非阻塞Accept和Connect的封装器,WINDOWS版本
acceptor
#ifndef _ACCEPTOR_H#define _ACCEPTOR_Htypedef struct acceptor* acceptor_t;typedef void (*on_accept)(SOCKET);acceptor_t create_acceptor(const char *ip,unsigned long port,on_accept accept_callback);void destroy_acceptor(acceptor_t*);void acceptor_run(acceptor_t,int ms);#endif
#include#include #include #include #include "Acceptor.h"struct acceptor{ on_accept accept_callback; SOCKET sock; FD_SET Set;};acceptor_t create_acceptor(const char *ip,unsigned long port,on_accept accept_callback){ SOCKET ListenSocket; struct sockaddr_in addr; int optval=1; //Socket属性值 unsigned long ul=1; acceptor_t a; ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (INVALID_SOCKET == ListenSocket) { printf("\nError occurred while opening socket: %d.", WSAGetLastError()); return NULL; } addr.sin_family = AF_INET; addr.sin_port = htons((u_short)port); addr.sin_addr.S_un.S_addr = inet_addr(ip); if ((bind(ListenSocket, (struct sockaddr *)&addr, sizeof( struct sockaddr_in))) == SOCKET_ERROR) { closesocket(ListenSocket); return NULL; } if((listen(ListenSocket, 5)) == SOCKET_ERROR) { closesocket(ListenSocket); return NULL; } ioctlsocket(ListenSocket,FIONBIO,(unsigned long*)&ul); a = malloc(sizeof(*a)); a->sock = ListenSocket; a->accept_callback = accept_callback; FD_ZERO(&a->Set); FD_SET(a->sock,&a->Set); return a;}void destroy_acceptor(acceptor_t *a){ closesocket((*a)->sock); free(*a); *a = NULL;}void acceptor_run(acceptor_t a,int ms){ struct timeval timeout; SOCKET client; struct sockaddr_in ClientAddress; int nClientLength = sizeof(ClientAddress); timeout.tv_sec = 0; timeout.tv_usec = 1000*ms; if(select(0, &a->Set,NULL, NULL, &timeout) >0 ) { for(;;) { client = accept(a->sock, (struct sockaddr*)&ClientAddress, &nClientLength); if (INVALID_SOCKET == client) break; else { a->accept_callback(client); } } } FD_ZERO(&a->Set); FD_SET(a->sock,&a->Set);}
connector
#ifndef _CONNECTOR_H#define _CONNECTOR_Htypedef struct connector *connector_t;typedef void (*on_connect)(SOCKET,const char *ip,unsigned long port);connector_t connector_create();void connector_destroy(connector_t*);int connector_connect(connector_t,const char *ip,unsigned long port,on_connect,unsigned long ms);void connector_run(connector_t,unsigned long ms);#endif
#include#include #include #include "Connector.h"#include #include "link_list.h"typedef struct pending_connect{ list_node lnode; const char *ip; unsigned long port; SOCKET sock; on_connect call_back; unsigned long timeout;};struct connector{ FD_SET Set; struct link_list *_pending_connect; unsigned long fd_seisize;};connector_t connector_create(){ connector_t c = malloc(sizeof(*c)); c->fd_seisize = 0; FD_ZERO(&c->Set); c->_pending_connect = create_list(); return c;}void connector_destroy(connector_t *c){ struct pending_connect *pc; while(pc = LIST_POP(struct pending_connect*,(*c)->_pending_connect)) free(pc); free(*c); *c = 0;}int connector_connect(connector_t c,const char *ip,unsigned long port,on_connect call_back,unsigned long ms){ struct sockaddr_in remote; ULONG NonBlock = 1; SOCKET sock; struct pending_connect *pc; int slot = -1; if(c->fd_seisize >= FD_SETSIZE) return -1; sock =socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (sock == INVALID_SOCKET) { printf("\nError occurred while opening socket: %d.", WSAGetLastError()); return -1; } remote.sin_family = AF_INET; remote.sin_port = htons((u_short)port); remote.sin_addr.s_addr = inet_addr(ip); if(ms>0) { if (ioctlsocket(sock, FIONBIO, &NonBlock) == SOCKET_ERROR) { closesocket(sock); printf("ioctlsocket() failed with error %d\n", WSAGetLastError()); return -1; } } if(connect(sock, (struct sockaddr *)&remote, sizeof(remote)) >=0 ) { //连接成功,无需要后续处理了,直接调用回调函数 call_back(sock,ip,port); return 0; } pc = malloc(sizeof(*pc)); pc->lnode.next = NULL; pc->sock = sock; pc->ip = ip; pc->port = port; pc->call_back = call_back; pc->timeout = GetTickCount() + ms; FD_SET(sock,&c->Set); LIST_PUSH_BACK(c->_pending_connect,pc); ++c->fd_seisize; return 0;}void connector_run(connector_t c,unsigned long ms){ int i = 0; DWORD tick; int size; int total; struct pending_connect *pc; struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 1000*ms; size = list_size(c->_pending_connect); if(size == 0) return; if((total = select(0, NULL,&c->Set, NULL, &timeout)) >0 ) { for(; i < size; ++i) { pc = LIST_POP(struct pending_connect*,c->_pending_connect); if(pc) { if(FD_ISSET(pc->sock, &c->Set)) { pc->call_back(pc->sock,pc->ip,pc->port); free(pc); --c->fd_seisize; } else LIST_PUSH_BACK(c->_pending_connect,pc); } } } FD_ZERO(&c->Set); tick = GetTickCount(); size = list_size(c->_pending_connect); i = 0; for(; i < (int)size; ++i) { pc = LIST_POP(struct pending_connect*,c->_pending_connect); if(tick >= pc->timeout) { //超时了 pc->call_back(INVALID_SOCKET,pc->ip,pc->port); free(pc); --c->fd_seisize; } else { LIST_PUSH_BACK(c->_pending_connect,pc); FD_SET(pc->sock,&c->Set); } }}
使用方式
//acceptoracceptor_t a = create_acceptor("192.168.6.13",8010,&accept_callback);while(1) acceptor_run(a,10);//connectorconnector_t c = NULL;int ret;int i = 0;c = connector_create();for( ; i < 100;++i){ ret = connector_connect(c,"192.168.6.13",8010,on_connect_callback,1000*20); Sleep(1);}while(1){ connector_run(c,0);}