#include "cwCommon.h" #include "cwLog.h" #include "cwCommonImpl.h" #include "cwMem.h" #include "cwFileSys.h" #include "cwWebSock.h" #include "cwMpScNbQueue.h" #include "cwTime.h" #include namespace cw { namespace websock { #define ALLOC_FD_CNT 16 struct websock_str; static int _custom_init_private(struct lws_context *cx, void *_loop, int tsi); static int _custom_sock_accept(struct lws *wsi); static void _custom_io(struct lws *wsi, unsigned int flags); static int _custom_wsi_logical_close(struct lws *wsi); struct pt_eventlibs_custom { struct websock_str* ws; }; // Internal outgoing msg structure. typedef struct msg_str { unsigned protocolId; // Protocol associated with this msg. unsigned sessionId; // Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions. unsigned char* msg; // Msg data array. unsigned msgByteN; // Count of bytes in msg[]. unsigned msgId; // The msgId assigned when this msg is addded to the protocol state msg queue. unsigned sessionN; // Count of sessions to which this msg has been sent. struct msg_str* link; // Pointer to next message or nullptr if this is the last msg in the queue. } msg_t; typedef struct websock_str { cbFunc_t _cbFunc; // void* _cbArg; // struct lws_context* _ctx = nullptr; // struct lws_protocols* _protocolA = nullptr; // Websocket internal protocol state array unsigned _protocolN = 0; // Count of protocol records in _protocolA[]. unsigned _nextSessionId = 0; // Next session id. unsigned _connSessionN = 0; // Count of connected sessions. struct lws_http_mount* _mount = nullptr; // MpScNbQueue* _q; // Thread safe, non-blocking, protocol independent msg queue. lws_pollfd* _pollfdA; // socket handle array used by poll() int _pollfdMaxN; int _pollfdN; unsigned _sendMsgCnt; // Count of msg's sent unsigned _sendMaxByteN; // Max size across all sent msg's unsigned _recvMsgCnt; // Count of msg's recv'd unsigned _recvMaxByteN; // Max size across all recv'd msg's unsigned _execN; unsigned _execSumMs; struct lws_event_loop_ops _event_loop_ops_custom; lws_plugin_evlib_t _evlib_custom; } websock_t; inline websock_t* _handleToPtr(handle_t h) { return handleToPtr(h); } // Internal session record. typedef struct session_str { unsigned id; // This sessions id. unsigned protocolId; // This sessions protocol. unsigned nextMsgId; // Id of the next msg this session will receieve. } session_t; // Application protocol state record - each lws_protocols record in _protocolA[] points to one of these records. typedef struct protocolState_str { websock_t* thisPtr; // Pointer to this websocket. unsigned nextNewMsgId; // Id of the next message to add to this outgoing msg queue. msg_t* endMsg; // End of the protocol outgoing msg queue: next message to be written to the remote endpoint. msg_t* begMsg; // Begin of the protocol outgoing msg queue: last msg added to the outgoing queue by the application. unsigned sessionN; // Count of sessions using this protocol. } protocolState_t; // This callback is always from protocol 0 which receives messages when a system socket is created,deleted, or changed. int _httpCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) { const struct lws_protocols* p = lws_get_protocol(wsi); if( p == nullptr || p->user == nullptr ) { cwLogError(kInvalidArgRC,"Invalid protocol record on http websock callback."); return 0; // TODO: issue a warning } protocolState_t* ps = static_cast(p->user); if( ps == nullptr || ps->thisPtr == nullptr ) { cwLogError(kInvalidArgRC,"Invalid protocol state record on http websock callback."); return 0; // TODO: issue a warning } return lws_callback_http_dummy(wsi,reason,user,in,len); } int _internalCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) { const struct lws_protocols* p = lws_get_protocol(wsi); if( p == nullptr || p->user == nullptr ) { cwLogError(kInvalidArgRC,"Invalid protocol record on websock callback."); return 0; } protocolState_t* ps = static_cast(p->user); if( ps == nullptr || ps->thisPtr == nullptr ) { cwLogError(kInvalidArgRC,"Invalid protocol state record on websock callback."); return 0; } session_t* sess = static_cast(user); const struct lws_protocols* proto = lws_get_protocol(wsi); protocolState_t* protoState = static_cast(proto->user); websock_t* ws = ps->thisPtr; //char buf[32]; //printf("i: %i %i\n",reason,reason==LWS_CALLBACK_ADD_POLL_FD); switch( reason ) { case LWS_CALLBACK_PROTOCOL_INIT: cwLogInfo("Websocket init"); break; case LWS_CALLBACK_PROTOCOL_DESTROY: cwLogInfo("Websocket destroy"); break; case LWS_CALLBACK_ESTABLISHED: cwLogInfo("Websocket session:%i opened: \n",ws->_nextSessionId); sess->id = ws->_nextSessionId++; sess->protocolId = proto->id; protoState->sessionN += 1; ws->_connSessionN += 1; if( ws->_cbFunc != nullptr) ws->_cbFunc(ws->_cbArg, proto->id, sess->id, kConnectTId, nullptr, 0); //if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0) // printf("conn:%p %s\n",user,buf); break; case LWS_CALLBACK_CLOSED: cwLogInfo("Websocket connection closed.\n"); ws->_connSessionN -= 1; protoState->sessionN -= 1; cwAssert( protoState->sessionN >= 0 ); if( ws->_cbFunc != nullptr) ws->_cbFunc(ws->_cbArg,proto->id,sess->id,kDisconnectTId,nullptr,0); break; case LWS_CALLBACK_SERVER_WRITEABLE: { msg_t* m1 = protoState->endMsg; cwAssert(m1 != nullptr); // for each possible msg while( m1->link != nullptr ) { msg_t* m = m1->link; // if this msg has not already been sent to this session if( m->msgId >= sess->nextMsgId ) { // if the msg sessiond id is not valid or matches this session id ... if( m->sessionId == kInvalidId || m->sessionId == sess->id ) { // ... then send the msg to this session // Note: msgByteN should not include LWS_PRE int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT); // if the write failed if(lws_result < (int)m->msgByteN) { cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result); return -1; } } // at this point the write succeeded or this session was skipped sess->nextMsgId = m->msgId + 1; m->sessionN += 1; break; } m1 = m1->link; } } break; case LWS_CALLBACK_RECEIVE: //printf("recv: sess:%i proto:%s : %p : len:%li\n",sess->id,proto->name,ws->_cbFunc,len); if( ws->_cbFunc != nullptr && len>0) { ws->_cbFunc(ws->_cbArg,proto->id,sess->id,kMessageTId,in,len); ws->_recvMsgCnt += 1; ws->_recvMaxByteN = std::max(ws->_recvMaxByteN,(unsigned)len); } break; default: break; } return 0; } struct lws_protocols* _idToProtocol( websock_t* p, unsigned protocolId ) { for(unsigned i=0; i_protocolN; ++i) if( p->_protocolA[i].id == protocolId ) return p->_protocolA + i; cwAssert(0); return nullptr; } void _cleanProtocolStateList( protocolState_t* ps ) { msg_t* m0 = nullptr; msg_t* m1 = ps->endMsg; while( m1->link != nullptr ) { if( m1->link->sessionN >= ps->sessionN ) { if( m0 == nullptr ) ps->endMsg = m1->link; else m0->link = m1->link; msg_t* t = m1->link; //mem::free(m1->msg); mem::free(m1); m1 = t; continue; } m0 = m1; m1 = m1->link; } } rc_t _destroy( websock_t* p ) { msg_t* m; cwLogInfo("Websock: sent: msgs:%i largest msg:%i - recv: msgs:%i largest msg:%i - LWS_PRE:%i",p->_sendMsgCnt,p->_sendMaxByteN,p->_recvMsgCnt,p->_recvMaxByteN,LWS_PRE); cwLogInfo("Exec Time: %i avg ms, %i total ms, %i cnt", p->_execN==0 ? 0 : p->_execSumMs/p->_execN, p->_execSumMs, p->_execN ); if( p->_ctx != nullptr ) { lws_context_destroy(p->_ctx); p->_ctx = nullptr; } if( p->_q != nullptr ) { while((m = p->_q->pop()) != nullptr) { //mem::free(m->msg); mem::free(m); } delete p->_q; } for(int i=0; p->_protocolA!=nullptr and p->_protocolA[i].callback != nullptr; ++i) { mem::free(const_cast(p->_protocolA[i].name)); // TODO: delete any msgs in the protocol state here auto ps = static_cast(p->_protocolA[i].user); m = ps->endMsg; while( m != nullptr ) { msg_t* tmp = m->link; //mem::free(m->msg); mem::free(m); m = tmp; } mem::free(ps); } mem::release(p->_protocolA); p->_protocolN = 0; if( p->_mount != nullptr ) { mem::free(const_cast(p->_mount->origin)); mem::free(const_cast(p->_mount->def)); mem::release(p->_mount); } mem::release(p->_pollfdA); p->_pollfdMaxN = 0; p->_pollfdN = 0; p->_nextSessionId = 0; p->_connSessionN = 0; mem::release(p); return kOkRC; } static struct lws_pollfd *_custom_poll_find_fd(websock_t *p, lws_sockfd_type fd) { for (int i = 0; i < p->_pollfdN; i++) if (p->_pollfdA[i].fd == fd) return &p->_pollfdA[i]; return nullptr; } // During lws context creation, we get called with the foreign loop pointer // that was passed in the creation info struct. Stash it in our private part // of the pt, so we can reference it in the other callbacks subsequently. static int _custom_init_private(struct lws_context *cx, void *_loop, int tsi) { struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_tsi_to_evlib_pt(cx, tsi); // store the loop we are bound to in our private part of the pt priv->ws = (websock_t *)_loop; return 0; } static int _custom_sock_accept(struct lws *wsi) { struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_wsi_to_evlib_pt(wsi); websock_t *p = priv->ws; lws_sockfd_type fd = lws_get_socket_fd(wsi); int events = POLLIN; struct lws_pollfd *pfd; lwsl_info("%s: ADD fd %d, ev %d\n", __func__, fd, events); if((pfd = _custom_poll_find_fd(p, fd)) != nullptr ) { lwsl_err("%s: ADD fd %d already in ext table\n", __func__, fd); return 1; } if (p->_pollfdN == p->_pollfdMaxN /*LWS_ARRAY_SIZE(cpcx->pollfds)*/ ) { lwsl_err("%s: no room left\n", __func__); return 1; } pfd = &p->_pollfdA[p->_pollfdN++]; pfd->fd = fd; pfd->events = (short)events; pfd->revents = 0; return 0; //return custom_poll_add_fd(priv->ws, lws_get_socket_fd(wsi), POLLIN); } static void _custom_io(struct lws *wsi, unsigned int flags) { struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_wsi_to_evlib_pt(wsi); websock_t *p = priv->ws; int events_add = 0; int events_remove = 0; lws_sockfd_type fd = lws_get_socket_fd(wsi); struct lws_pollfd *pfd; if (flags & LWS_EV_START) { if (flags & LWS_EV_WRITE) events_add |= POLLOUT; if (flags & LWS_EV_READ) events_add |= POLLIN; } else { if (flags & LWS_EV_WRITE) events_remove |= POLLOUT; if (flags & LWS_EV_READ) events_remove |= POLLIN; } lwsl_info("%s: CHG fd %d, ev_add %d, ev_rem %d\n", __func__, fd, events_add, events_remove); if((pfd = _custom_poll_find_fd(p, fd)) != nullptr ) pfd->events = (short)((pfd->events & (~events_remove)) | events_add); } static int _custom_wsi_logical_close(struct lws *wsi) { struct pt_eventlibs_custom *priv = (struct pt_eventlibs_custom *)lws_evlib_wsi_to_evlib_pt(wsi); websock_t *p = priv->ws; lws_sockfd_type fd = lws_get_socket_fd(wsi); struct lws_pollfd *pfd; lwsl_info("%s: DEL fd %d\n", __func__, fd); if((pfd = _custom_poll_find_fd(p, fd)) == nullptr) { lwsl_err("%s: DEL fd %d missing in ext table\n", __func__, fd); return 1; } if (p->_pollfdN > 1) *pfd = p->_pollfdA[p->_pollfdN - 1]; p->_pollfdN--; return 0; } rc_t _exec( websock_t* p, unsigned timeOutMs ) { rc_t rc = kOkRC; int adjTimeOut = lws_service_adjust_timeout(p->_ctx, timeOutMs, 0); int sysRC = 0; if( p->_pollfdN > 0 ) sysRC = poll(p->_pollfdA, p->_pollfdN, adjTimeOut); // if poll timed-out if( sysRC == 0 ) return rc; // if error from poll() if (sysRC < 0) { cwLogSysError(kOpFailRC,sysRC,"Websocket poll failed."); goto errLabel; } for(int i = 0; i < p->_pollfdN; i++) { lws_sockfd_type fd = p->_pollfdA[i].fd; int ws_rc; if (!p->_pollfdA[i].revents) continue; ws_rc = lws_service_fd(p->_ctx, &p->_pollfdA[i]); // if something closed, retry this slot since may have been swapped with end if(ws_rc && p->_pollfdA[i].fd != fd) i--; // if an error occurred if(ws_rc < 0) { // lws feels something bad happened, but the outer application may not care cwLogError(kOpFailRC,"libwebsocket lws_service_fd() failed with error %i.", ws_rc); goto errLabel; } if(!ws_rc) { // check if it is an fd owned by the application } } errLabel: return rc; } } } cw::rc_t cw::websock::create( handle_t& h, cbFunc_t cbFunc, void* cbArg, const char* physRootDir, const char* dfltHtmlPageFn, int port, const protocol_t* protocolArgA, unsigned protocolN ) { rc_t rc; struct lws_context_creation_info info; void *foreign_loops[1]; if((rc = destroy(h)) != kOkRC ) return rc; websock_t* p = mem::allocZ(); int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE; lws_set_log_level(logs, nullptr); p->_event_loop_ops_custom = { .name = "custom", .init_vhost_listen_wsi = _custom_sock_accept, .init_pt = _custom_init_private, .wsi_logical_close = _custom_wsi_logical_close, .sock_accept = _custom_sock_accept, .io = _custom_io, .evlib_size_pt = sizeof(struct pt_eventlibs_custom) }; p->_evlib_custom = { .hdr = { "custom event loop", "lws_evlib_plugin", LWS_BUILD_HASH, LWS_PLUGIN_API_MAGIC }, .ops = &p->_event_loop_ops_custom }; // Allocate one extra record to act as the end-of-list sentinel. p->_protocolN = protocolN + 1; p->_protocolA = mem::allocZ(p->_protocolN); // Setup the websocket internal protocol state array for(unsigned i=0; i(1); auto dummy = mem::allocZ(1); protocolState->thisPtr = p; protocolState->begMsg = dummy; protocolState->endMsg = dummy; // Setup the interal lws_protocols record struct lws_protocols* pr = p->_protocolA + i; pr->name = mem::allocStr(protocolArgA[i].label); pr->id = protocolArgA[i].id; pr->rx_buffer_size = protocolArgA[i].rcvBufByteN; pr->tx_packet_size = 0; //protocolArgA[i].xmtBufByteN; pr->per_session_data_size = sizeof(session_t); pr->callback = strcmp(pr->name,"http")==0 ? _httpCallback : _internalCallback; pr->user = protocolState; // maintain a ptr to the application protocol state } static const char* slash = {"/"}; p->_mount = mem::allocZ(1); p->_mount->mountpoint = slash; p->_mount->mountpoint_len = strlen(slash); p->_mount->origin = filesys::expandPath(physRootDir); // physical directory assoc'd with http "/" p->_mount->def = mem::allocStr(dfltHtmlPageFn); p->_mount->origin_protocol= LWSMPRO_FILE; memset(&info,0,sizeof(info)); info.port = port; info.mounts = p->_mount; info.protocols = p->_protocolA; info.event_lib_custom = &p->_evlib_custom; // bind lws to our custom event lib implementation above foreign_loops[0] = p; // pass in the custom poll object as the foreign loop object we will bind to info.foreign_loops = foreign_loops; p->_q = new MpScNbQueue(); p->_cbFunc = cbFunc; p->_cbArg = cbArg; p->_pollfdMaxN = ALLOC_FD_CNT; p->_pollfdA = mem::allocZ(p->_pollfdMaxN); p->_pollfdN = 0; for(int i=0; i_pollfdMaxN; ++i) p->_pollfdA[i].fd = LWS_SOCK_INVALID; if((p->_ctx = lws_create_context(&info)) == 0) { rc = cwLogError(kObjAllocFailRC,"Unable to create the websocket context."); goto errLabel; } errLabel: if( rc != kOkRC ) _destroy(p); else h.set(p); return rc; } cw::rc_t cw::websock::destroy( handle_t& h ) { rc_t rc = kOkRC; if(!h.isValid()) return rc; websock_t* p = _handleToPtr(h); if((rc = _destroy(p)) != kOkRC ) return rc; h.clear(); return rc; } cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN ) { rc_t rc = kOkRC; uint8_t* mem = mem::allocZ( sizeof(msg_t) + LWS_PRE + byteN ); msg_t* m = (msg_t*)mem; m->msg = mem + sizeof(msg_t); memcpy(m->msg+LWS_PRE,msg,byteN); m->msgByteN = byteN; m->protocolId = protocolId; m->sessionId = sessionId; websock_t* p = _handleToPtr(h); p->_q->push(m); p->_sendMsgCnt += 1; p->_sendMaxByteN = std::max(p->_sendMaxByteN,byteN); return rc; } cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, va_list vl0 ) { rc_t rc = kOkRC; va_list vl1; va_copy(vl1,vl0); unsigned bufN = vsnprintf(nullptr,0,fmt,vl0); char buf[bufN+1]; unsigned n = vsnprintf(buf,bufN+1,fmt,vl1); rc = send(h,protocolId,sessionId,buf,n); va_end(vl1); return rc; } cw::rc_t cw::websock::sendF( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, ... ) { va_list vl; va_start(vl,fmt); rc_t rc = sendV(h,protocolId,sessionId,fmt,vl); va_end(vl); return rc; } cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs ) { rc_t rc = kOkRC; websock_t* p = _handleToPtr(h); time::spec_t t0 = time::current_time(); // service any pending websocket activity - with no-timeout lws_service_tsi(p->_ctx, -1, 0 ); msg_t* m; // Get the next pending outgoing message. while((m = p->_q->pop()) != nullptr ) { auto protocol = _idToProtocol(p,m->protocolId); // Get the application protcol record for this message protocolState_t* ps = static_cast(protocol->user); // remove messages from the protocol message queue which have already been sent _cleanProtocolStateList( ps ); m->msgId = ps->nextNewMsgId; // set the msg id ps->begMsg->link = m; // put the msg on the front of the outgoing queue ps->begMsg = m; // ps->nextNewMsgId += 1; lws_callback_on_writable_all_protocol(p->_ctx,protocol); lws_service_tsi(p->_ctx, -1, 0 ); } // block waiting for incoming messages _exec(p,timeOutMs); p->_execSumMs += time::elapsedMs(t0); p->_execN += 1; return rc; }