diff --git a/cwWebSock.cpp b/cwWebSock.cpp index b62e416..16ddfd7 100644 --- a/cwWebSock.cpp +++ b/cwWebSock.cpp @@ -36,6 +36,10 @@ namespace cw unsigned _connSessionN = 0; // Count of connected sessions. struct lws_http_mount* _mount = nullptr; // MpScNbQueue* _q; // Thread safe, non-blocking, protocol independent msg queue. + + lws_pollfd* _pollfdA; // socket handle array used by poll() + int _pollfdMaxN; + int _pollfdN; } websock_t; @@ -61,27 +65,121 @@ namespace cw unsigned sessionN; // Count of sessions using this protocol. } protocolState_t; - - int _internalCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) + // This callback is always from protocol 0 which receives messages when a system socket is created,deleted, or changed. + int _httpCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) { const struct lws_protocols* p = lws_get_protocol(wsi); if( p == nullptr || p->user == nullptr ) + { + cwLogError(kInvalidArgRC,"Invalid protocol record on http websock callback."); return 0; // TODO: issue a warning - + } + protocolState_t* ps = static_cast(p->user); if( ps == nullptr || ps->thisPtr == nullptr ) + { + cwLogError(kInvalidArgRC,"Invalid protocol state record on http websock callback."); return 0; // TODO: issue a warning + } + websock_t* ws = ps->thisPtr; + + switch( reason ) + { + // called when libwebsocket opens a new socket + case LWS_CALLBACK_ADD_POLL_FD: + { + lws_pollargs* a = static_cast(in); + int i = 0; + + // find an open slot in the polling array + for(; i < ws->_pollfdN; ++i ) + if( ws->_pollfdA[i].fd == LWS_SOCK_INVALID ) + break; + + // if an open socket was found + if( i == ws->_pollfdMaxN ) + { + cwLogError(kResourceNotAvailableRC,"All websocket poll slots are alreadry in use. Proper socket polling will not occur."); + } + else + { + // setup the poll array to be notified of incoming (browser->server) messages. + ws->_pollfdA[ i ].fd = a->fd; + ws->_pollfdA[ i ].events = LWS_POLLIN; + + if( i == ws->_pollfdN ) + ws->_pollfdN += 1; + + } + } + break; + + // called when libwebsocket closes a socket + case LWS_CALLBACK_DEL_POLL_FD: + { + lws_pollargs* a = static_cast(in); + int i = 0; + + // locate the socket that is being closed + for(; i_pollfdN; ++i) + { + if( ws->_pollfdA[i].fd == a->fd ) + { + ws->_pollfdA[i].fd = LWS_SOCK_INVALID; + ws->_pollfdA[ i ].events = 0; + break; + } + } + // Note that the libwebsock semms to send this mesg twice for every closed socket. + // This means that the socket has already been removed from pollfdA[] on the second call. + // We therefore don't warn when the socket is not found since it + // will happen on every socket. + } + break; + + case LWS_CALLBACK_CHANGE_MODE_POLL_FD: + break; + + default: + break; + } + + return lws_callback_http_dummy(wsi,reason,user,in,len); + } + + int _internalCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) + { + + const struct lws_protocols* p = lws_get_protocol(wsi); + + if( p == nullptr || p->user == nullptr ) + { + cwLogError(kInvalidArgRC,"Invalid protocol record on websock callback."); + return 0; + } + + protocolState_t* ps = static_cast(p->user); + + if( ps == nullptr || ps->thisPtr == nullptr ) + { + cwLogError(kInvalidArgRC,"Invalid protocol state record on websock callback."); + return 0; + } + session_t* sess = static_cast(user); const struct lws_protocols* proto = lws_get_protocol(wsi); protocolState_t* protoState = static_cast(proto->user); - websock_t* thisPtr = ps->thisPtr; + websock_t* ws = ps->thisPtr; //char buf[32]; - + + //printf("i: %i %i\n",reason,reason==LWS_CALLBACK_ADD_POLL_FD); + + switch( reason ) { case LWS_CALLBACK_PROTOCOL_INIT: @@ -93,15 +191,15 @@ namespace cw break; case LWS_CALLBACK_ESTABLISHED: - cwLogInfo("Websocket session:%i opened: \n",thisPtr->_nextSessionId); + cwLogInfo("Websocket session:%i opened: \n",ws->_nextSessionId); - sess->id = thisPtr->_nextSessionId++; + sess->id = ws->_nextSessionId++; sess->protocolId = proto->id; protoState->sessionN += 1; - thisPtr->_connSessionN += 1; + ws->_connSessionN += 1; - if( thisPtr->_cbFunc != nullptr) - thisPtr->_cbFunc(thisPtr->_cbArg, proto->id, sess->id, kConnectTId, nullptr, 0); + if( ws->_cbFunc != nullptr) + ws->_cbFunc(ws->_cbArg, proto->id, sess->id, kConnectTId, nullptr, 0); //if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0) // printf("conn:%p %s\n",user,buf); @@ -110,12 +208,12 @@ namespace cw case LWS_CALLBACK_CLOSED: cwLogInfo("Websocket connection closed.\n"); - thisPtr->_connSessionN -= 1; + ws->_connSessionN -= 1; protoState->sessionN -= 1; cwAssert( protoState->sessionN >= 0 ); - if( thisPtr->_cbFunc != nullptr) - thisPtr->_cbFunc(thisPtr->_cbArg,proto->id,sess->id,kDisconnectTId,nullptr,0); + if( ws->_cbFunc != nullptr) + ws->_cbFunc(ws->_cbArg,proto->id,sess->id,kDisconnectTId,nullptr,0); break; @@ -163,10 +261,10 @@ namespace cw break; case LWS_CALLBACK_RECEIVE: - //printf("recv: sess:%i proto:%s : %p : len:%li\n",sess->id,proto->name,thisPtr->_cbFunc,len); + //printf("recv: sess:%i proto:%s : %p : len:%li\n",sess->id,proto->name,ws->_cbFunc,len); - if( thisPtr->_cbFunc != nullptr && len>0) - thisPtr->_cbFunc(thisPtr->_cbArg,proto->id,sess->id,kMessageTId,in,len); + if( ws->_cbFunc != nullptr && len>0) + ws->_cbFunc(ws->_cbArg,proto->id,sess->id,kMessageTId,in,len); break; @@ -271,6 +369,10 @@ namespace cw mem::release(p->_mount); } + mem::release(p->_pollfdA); + p->_pollfdMaxN = 0; + p->_pollfdN = 0; + p->_nextSessionId = 0; p->_connSessionN = 0; @@ -325,7 +427,7 @@ cw::rc_t cw::websock::create( pr->rx_buffer_size = protocolArgA[i].rcvBufByteN; pr->tx_packet_size = 0; //protocolArgA[i].xmtBufByteN; pr->per_session_data_size = sizeof(session_t); - pr->callback = strcmp(pr->name,"http")==0 ? lws_callback_http_dummy : _internalCallback; + pr->callback = strcmp(pr->name,"http")==0 ? _httpCallback : _internalCallback; pr->user = protocolState; // maintain a ptr to the application protocol state } @@ -345,6 +447,12 @@ cw::rc_t cw::websock::create( p->_q = new MpScNbQueue(); p->_cbFunc = cbFunc; p->_cbArg = cbArg; + + p->_pollfdMaxN = sysconf(_SC_OPEN_MAX); + p->_pollfdA = mem::allocZ(p->_pollfdMaxN); + p->_pollfdN = 0; + for(int i=0; i_pollfdMaxN; ++i) + p->_pollfdA[i].fd = LWS_SOCK_INVALID; if((p->_ctx = lws_create_context(&info)) == 0) { @@ -427,13 +535,38 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs ) rc_t rc = kOkRC; websock_t* p = _handleToPtr(h); - // TODO: The return value of lws_service() is undocumented - look at the source code. - lws_service(p->_ctx, timeOutMs ); + // TODO: implement the external polling version of lws_service_fd(). + // See this: LWS_CALLBACK_ADD_POLL_FD to get the fd of the websocket. + // As of 11/20 this callback is never made - even when the websock library + // is explicitely built with -DLWS_WITH_EXTERNAL_POLL=ON + // Note the libwebsocket/test_apps/test-server.c also is incomplete and does + // not apparently represent a working version of an externally polled server. + + // Also see: https://stackoverflow.com/questions/27192071/libwebsockets-for-c-can-i-use-websocket-file-descriptor-with-select + int sysRC = 0; + + if( p->_pollfdN > 0 ) + { + sysRC = poll(p->_pollfdA, p->_pollfdN, timeOutMs ); + } + + if( sysRC < 0 ) + return cwLogSysError(kReadFailRC,errno,"Poll failed on socket."); + + if(sysRC) + { + for(int i = 0; i < p->_pollfdN; i++) + if(p->_pollfdA[i].revents) + lws_service_fd(p->_ctx, p->_pollfdA + i); + } + + lws_service_tsi(p->_ctx, -1, 0 ); + msg_t* m; // Get the next pending message. - if((m = p->_q->pop()) != nullptr ) + while((m = p->_q->pop()) != nullptr ) { auto protocol = _idToProtocol(p,m->protocolId); @@ -458,7 +591,9 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs ) lws_callback_on_writable_all_protocol(p->_ctx,protocol); - + + lws_service_tsi(p->_ctx, -1, 0 ); + }