From 963457454826ce836238f5b13cc7c6a5b91a170c Mon Sep 17 00:00:00 2001 From: kevin Date: Sat, 24 Feb 2024 13:57:47 -0500 Subject: [PATCH] cwWebSock.cpp : Updated to use custom event loop plugin and thereby correctly block in exec(). --- cwWebSock.cpp | 339 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 229 insertions(+), 110 deletions(-) diff --git a/cwWebSock.cpp b/cwWebSock.cpp index 23a32d0..3d4bdfc 100644 --- a/cwWebSock.cpp +++ b/cwWebSock.cpp @@ -5,6 +5,7 @@ #include "cwFileSys.h" #include "cwWebSock.h" #include "cwMpScNbQueue.h" +#include "cwTime.h" #include @@ -12,6 +13,20 @@ namespace cw { namespace websock { + + #define ALLOC_FD_CNT 16 + + struct websock_str; + + static int _custom_init_private(struct lws_context *cx, void *_loop, int tsi); + static int _custom_sock_accept(struct lws *wsi); + static void _custom_io(struct lws *wsi, unsigned int flags); + static int _custom_wsi_logical_close(struct lws *wsi); + + struct pt_eventlibs_custom + { + struct websock_str* ws; + }; // Internal outgoing msg structure. typedef struct msg_str @@ -46,9 +61,16 @@ namespace cw unsigned _recvMsgCnt; // Count of msg's recv'd unsigned _recvMaxByteN; // Max size across all recv'd msg's + + unsigned _execN; + unsigned _execSumMs; + + struct lws_event_loop_ops _event_loop_ops_custom; + lws_plugin_evlib_t _evlib_custom; } websock_t; + inline websock_t* _handleToPtr(handle_t h) { return handleToPtr(h); } @@ -90,70 +112,7 @@ namespace cw cwLogError(kInvalidArgRC,"Invalid protocol state record on http websock callback."); return 0; // TODO: issue a warning } - - websock_t* ws = ps->thisPtr; - - switch( reason ) - { - // called when libwebsocket opens a new socket - case LWS_CALLBACK_ADD_POLL_FD: - { - lws_pollargs* a = static_cast(in); - int i = 0; - // find an open slot in the polling array - for(; i < ws->_pollfdN; ++i ) - if( ws->_pollfdA[i].fd == LWS_SOCK_INVALID ) - break; - - // if an open socket was found - if( i == ws->_pollfdMaxN ) - { - cwLogError(kResourceNotAvailableRC,"All websocket poll slots are alreadry in use. Proper socket polling will not occur."); - } - else - { - // setup the poll array to be notified of incoming (browser->server) messages. - ws->_pollfdA[ i ].fd = a->fd; - ws->_pollfdA[ i ].events = LWS_POLLIN; - - if( i == ws->_pollfdN ) - ws->_pollfdN += 1; - - } - } - break; - - // called when libwebsocket closes a socket - case LWS_CALLBACK_DEL_POLL_FD: - { - lws_pollargs* a = static_cast(in); - int i = 0; - - // locate the socket that is being closed - for(; i_pollfdN; ++i) - { - if( ws->_pollfdA[i].fd == a->fd ) - { - ws->_pollfdA[i].fd = LWS_SOCK_INVALID; - ws->_pollfdA[ i ].events = 0; - break; - } - } - // Note that the libwebsock semms to send this mesg twice for every closed socket. - // This means that the socket has already been removed from pollfdA[] on the second call. - // We therefore don't warn when the socket is not found since it - // will happen on every socket. - } - break; - - case LWS_CALLBACK_CHANGE_MODE_POLL_FD: - break; - - default: - break; - } - return lws_callback_http_dummy(wsi,reason,user,in,len); } @@ -332,7 +291,8 @@ namespace cw msg_t* m; cwLogInfo("Websock: sent: msgs:%i largest msg:%i - recv: msgs:%i largest msg:%i - LWS_PRE:%i",p->_sendMsgCnt,p->_sendMaxByteN,p->_recvMsgCnt,p->_recvMaxByteN,LWS_PRE); - + cwLogInfo("Exec Time: %i avg ms, %i total ms, %i cnt", p->_execN==0 ? 0 : p->_execSumMs/p->_execN, p->_execSumMs, p->_execN ); + if( p->_ctx != nullptr ) { lws_context_destroy(p->_ctx); @@ -393,6 +353,170 @@ namespace cw return kOkRC; } + + static struct lws_pollfd *_custom_poll_find_fd(websock_t *p, lws_sockfd_type fd) + { + for (int i = 0; i < p->_pollfdN; i++) + if (p->_pollfdA[i].fd == fd) + return &p->_pollfdA[i]; + + return nullptr; + } + + // During lws context creation, we get called with the foreign loop pointer + // that was passed in the creation info struct. Stash it in our private part + // of the pt, so we can reference it in the other callbacks subsequently. + static int _custom_init_private(struct lws_context *cx, void *_loop, int tsi) + { + struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_tsi_to_evlib_pt(cx, tsi); + + // store the loop we are bound to in our private part of the pt + + priv->ws = (websock_t *)_loop; + + return 0; + } + + static int _custom_sock_accept(struct lws *wsi) + { + struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_wsi_to_evlib_pt(wsi); + websock_t *p = priv->ws; + lws_sockfd_type fd = lws_get_socket_fd(wsi); + int events = POLLIN; + struct lws_pollfd *pfd; + + lwsl_info("%s: ADD fd %d, ev %d\n", __func__, fd, events); + + if((pfd = _custom_poll_find_fd(p, fd)) != nullptr ) + { + lwsl_err("%s: ADD fd %d already in ext table\n", __func__, fd); + return 1; + } + + if (p->_pollfdN == p->_pollfdMaxN /*LWS_ARRAY_SIZE(cpcx->pollfds)*/ ) + { + lwsl_err("%s: no room left\n", __func__); + return 1; + } + + pfd = &p->_pollfdA[p->_pollfdN++]; + pfd->fd = fd; + pfd->events = (short)events; + pfd->revents = 0; + + return 0; + //return custom_poll_add_fd(priv->ws, lws_get_socket_fd(wsi), POLLIN); + } + + static void _custom_io(struct lws *wsi, unsigned int flags) + { + struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_wsi_to_evlib_pt(wsi); + websock_t *p = priv->ws; + int events_add = 0; + int events_remove = 0; + lws_sockfd_type fd = lws_get_socket_fd(wsi); + struct lws_pollfd *pfd; + + if (flags & LWS_EV_START) + { + if (flags & LWS_EV_WRITE) + events_add |= POLLOUT; + + if (flags & LWS_EV_READ) + events_add |= POLLIN; + } + else + { + if (flags & LWS_EV_WRITE) + events_remove |= POLLOUT; + + if (flags & LWS_EV_READ) + events_remove |= POLLIN; + } + + lwsl_info("%s: CHG fd %d, ev_add %d, ev_rem %d\n", __func__, fd, events_add, events_remove); + + if((pfd = _custom_poll_find_fd(p, fd)) != nullptr ) + pfd->events = (short)((pfd->events & (~events_remove)) | events_add); + } + + static int _custom_wsi_logical_close(struct lws *wsi) + { + struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_wsi_to_evlib_pt(wsi); + websock_t *p = priv->ws; + lws_sockfd_type fd = lws_get_socket_fd(wsi); + struct lws_pollfd *pfd; + + lwsl_info("%s: DEL fd %d\n", __func__, fd); + + if((pfd = _custom_poll_find_fd(p, fd)) == nullptr) + { + lwsl_err("%s: DEL fd %d missing in ext table\n", __func__, fd); + return 1; + } + + if (p->_pollfdN > 1) + *pfd = p->_pollfdA[p->_pollfdN - 1]; + + p->_pollfdN--; + + return 0; + } + + + rc_t _exec( websock_t* p, unsigned timeOutMs ) + { + rc_t rc = kOkRC; + int adjTimeOut = lws_service_adjust_timeout(p->_ctx, timeOutMs, 0); + int sysRC = 0; + + if( p->_pollfdN > 0 ) + sysRC = poll(p->_pollfdA, p->_pollfdN, adjTimeOut); + + // if poll timed-out + if( sysRC == 0 ) + return rc; + + // if error from poll() + if (sysRC < 0) + { + cwLogSysError(kOpFailRC,sysRC,"Websocket poll failed."); + goto errLabel; + } + + for(int i = 0; i < p->_pollfdN; i++) + { + lws_sockfd_type fd = p->_pollfdA[i].fd; + int ws_rc; + + + if (!p->_pollfdA[i].revents) + continue; + + ws_rc = lws_service_fd(p->_ctx, &p->_pollfdA[i]); + + // if something closed, retry this slot since may have been swapped with end + if(ws_rc && p->_pollfdA[i].fd != fd) + i--; + + // if an error occurred + if(ws_rc < 0) + { + // lws feels something bad happened, but the outer application may not care + cwLogError(kOpFailRC,"libwebsocket lws_service_fd() failed with error %i.", ws_rc); + goto errLabel; + } + + if(!ws_rc) + { + // check if it is an fd owned by the application + } + } + + errLabel: + return rc; + } + } } @@ -408,14 +532,37 @@ cw::rc_t cw::websock::create( { rc_t rc; struct lws_context_creation_info info; + void *foreign_loops[1]; if((rc = destroy(h)) != kOkRC ) return rc; - + websock_t* p = mem::allocZ(); int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE; - lws_set_log_level(logs, NULL); + lws_set_log_level(logs, nullptr); + + p->_event_loop_ops_custom = { + .name = "custom", + .init_vhost_listen_wsi = _custom_sock_accept, + .init_pt = _custom_init_private, + .wsi_logical_close = _custom_wsi_logical_close, + .sock_accept = _custom_sock_accept, + .io = _custom_io, + .evlib_size_pt = sizeof(struct pt_eventlibs_custom) + }; + + p->_evlib_custom = { + .hdr = { + "custom event loop", + "lws_evlib_plugin", + LWS_BUILD_HASH, + LWS_PLUGIN_API_MAGIC + }, + + .ops = &p->_event_loop_ops_custom + }; + // Allocate one extra record to act as the end-of-list sentinel. p->_protocolN = protocolN + 1; @@ -456,11 +603,15 @@ cw::rc_t cw::websock::create( info.mounts = p->_mount; info.protocols = p->_protocolA; + info.event_lib_custom = &p->_evlib_custom; // bind lws to our custom event lib implementation above + foreign_loops[0] = p; // pass in the custom poll object as the foreign loop object we will bind to + info.foreign_loops = foreign_loops; + p->_q = new MpScNbQueue(); p->_cbFunc = cbFunc; p->_cbArg = cbArg; - p->_pollfdMaxN = sysconf(_SC_OPEN_MAX); + p->_pollfdMaxN = ALLOC_FD_CNT; p->_pollfdA = mem::allocZ(p->_pollfdMaxN); p->_pollfdN = 0; for(int i=0; i_pollfdMaxN; ++i) @@ -481,7 +632,6 @@ cw::rc_t cw::websock::create( return rc; } - cw::rc_t cw::websock::destroy( handle_t& h ) { rc_t rc = kOkRC; @@ -498,14 +648,11 @@ cw::rc_t cw::websock::destroy( handle_t& h ) return rc; } - cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN ) { rc_t rc = kOkRC; uint8_t* mem = mem::allocZ( sizeof(msg_t) + LWS_PRE + byteN ); - //msg_t* m = mem::allocZ(1); - //m->msg = mem::allocZ(byteN); msg_t* m = (msg_t*)mem; m->msg = mem + sizeof(msg_t); @@ -530,7 +677,7 @@ cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, unsigned sessionId va_list vl1; va_copy(vl1,vl0); - unsigned bufN = vsnprintf(NULL,0,fmt,vl0); + unsigned bufN = vsnprintf(nullptr,0,fmt,vl0); char buf[bufN+1]; unsigned n = vsnprintf(buf,bufN+1,fmt,vl1); @@ -555,37 +702,14 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs ) rc_t rc = kOkRC; websock_t* p = _handleToPtr(h); - // TODO: implement the external polling version of lws_service_fd(). - // See this: LWS_CALLBACK_ADD_POLL_FD to get the fd of the websocket. - // As of 11/20 this callback is never made - even when the websock library - // is explicitely built with -DLWS_WITH_EXTERNAL_POLL=ON - // Note the libwebsocket/test_apps/test-server.c also is incomplete and does - // not apparently represent a working version of an externally polled server. - - // Also see: https://stackoverflow.com/questions/27192071/libwebsockets-for-c-can-i-use-websocket-file-descriptor-with-select - int sysRC = 0; - - if( p->_pollfdN > 0 ) - { - sysRC = poll(p->_pollfdA, p->_pollfdN, timeOutMs ); - } - - if( sysRC < 0 ) - return cwLogSysError(kReadFailRC,errno,"Poll failed on socket."); - - if(sysRC) - { - for(int i = 0; i < p->_pollfdN; i++) - if(p->_pollfdA[i].revents) - lws_service_fd(p->_ctx, p->_pollfdA + i); - } + time::spec_t t0 = time::current_time(); + // service any pending websocket activity - with no-timeout lws_service_tsi(p->_ctx, -1, 0 ); - msg_t* m; - // Get the next pending message. + // Get the next pending outgoing message. while((m = p->_q->pop()) != nullptr ) { auto protocol = _idToProtocol(p,m->protocolId); @@ -595,28 +719,23 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs ) // remove messages from the protocol message queue which have already been sent _cleanProtocolStateList( ps ); - - - // add the pre-padding bytes to the msg - //unsigned char* msg = mem::allocZ(LWS_PRE + m->msgByteN); - //memcpy( msg+LWS_PRE, m->msg, m->msgByteN ); - - //mem::free(m->msg); // free the original msg buffer - - //m->msg = msg; m->msgId = ps->nextNewMsgId; // set the msg id ps->begMsg->link = m; // put the msg on the front of the outgoing queue ps->begMsg = m; // ps->nextNewMsgId += 1; - - + lws_callback_on_writable_all_protocol(p->_ctx,protocol); lws_service_tsi(p->_ctx, -1, 0 ); - } + + // block waiting for incoming messages + _exec(p,timeOutMs); + p->_execSumMs += time::elapsedMs(t0); + p->_execN += 1; + return rc; }