From 693c87d5981b4f7e8f69c76753772426d8d4f0b5 Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 25 Mar 2024 13:32:46 -0400 Subject: [PATCH] cwWebSock.cpp : Now uses cwNbMpScQueue to cache outgoing messages. --- cwWebSock.cpp | 238 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 141 insertions(+), 97 deletions(-) diff --git a/cwWebSock.cpp b/cwWebSock.cpp index 3d4bdfc..156dfca 100644 --- a/cwWebSock.cpp +++ b/cwWebSock.cpp @@ -1,13 +1,16 @@ + #include "cwCommon.h" #include "cwLog.h" #include "cwCommonImpl.h" #include "cwMem.h" #include "cwFileSys.h" +#include "cwObject.h" #include "cwWebSock.h" -#include "cwMpScNbQueue.h" +#include "cwNbMpScQueue.h" #include "cwTime.h" #include +#include namespace cw { @@ -31,15 +34,18 @@ namespace cw // Internal outgoing msg structure. typedef struct msg_str { - unsigned protocolId; // Protocol associated with this msg. - unsigned sessionId; // Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions. - unsigned char* msg; // Msg data array. - unsigned msgByteN; // Count of bytes in msg[]. - unsigned msgId; // The msgId assigned when this msg is addded to the protocol state msg queue. - unsigned sessionN; // Count of sessions to which this msg has been sent. - struct msg_str* link; // Pointer to next message or nullptr if this is the last msg in the queue. + unsigned protocolId; // 0 Protocol associated with this msg. + unsigned sessionId; // 4 Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions. + unsigned char* msg; // 8 Msg data array. + unsigned msgByteN; // 16 Count of bytes in msg[]. + unsigned msgId; // 20 The msgId assigned when this msg is addded to the protocol state msg queue. + unsigned sessionN; // 24 Count of sessions to which this msg has been sent. + struct msg_str* link; // 28 Pointer to next message or nullptr if this is the last msg in the queue. + unsigned pad; // 36-40 (make size of msg_t a multiple of 8) } msg_t; + static_assert( sizeof(msg_t) % 8 == 0 ); + typedef struct websock_str { cbFunc_t _cbFunc; // @@ -50,7 +56,7 @@ namespace cw unsigned _nextSessionId = 0; // Next session id. unsigned _connSessionN = 0; // Count of connected sessions. struct lws_http_mount* _mount = nullptr; // - MpScNbQueue* _q; // Thread safe, non-blocking, protocol independent msg queue. + nbmpscq::handle_t _qH; // Thread safe, non-blocking, protocol independent msg queue. lws_pollfd* _pollfdA; // socket handle array used by poll() int _pollfdMaxN; @@ -184,14 +190,13 @@ namespace cw case LWS_CALLBACK_SERVER_WRITEABLE: { - msg_t* m1 = protoState->endMsg; - cwAssert(m1 != nullptr); + msg_t* m = protoState->endMsg; - // for each possible msg - while( m1->link != nullptr ) + // for each possible outgoing msg in this protocol state record + while( m != nullptr ) { - msg_t* m = m1->link; - + bool msg_written_fl = false; + // if this msg has not already been sent to this session if( m->msgId >= sess->nextMsgId ) { @@ -199,7 +204,8 @@ namespace cw // if the msg sessiond id is not valid or matches this session id ... if( m->sessionId == kInvalidId || m->sessionId == sess->id ) { - // ... then send the msg to this session + // ... then send the msg to this session + // Note: msgByteN should not include LWS_PRE int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT); @@ -209,17 +215,25 @@ namespace cw cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result); return -1; } + + msg_written_fl = true; } // at this point the write succeeded or this session was skipped sess->nextMsgId = m->msgId + 1; - m->sessionN += 1; - - break; + + // incr the msg session count - once m->sessionN >= protoState->sessionN this msg will be deleted in _cleanProtocoStateList() + m->sessionN += 1; + + // If a record was actually sent then we are done since only one + // call to lws_write() per LWS_CALLBACK_SERVER_WRITEABLE callback + // is permitted + if(msg_written_fl) + break; } - m1 = m1->link; + m = m->link; } } @@ -255,40 +269,41 @@ namespace cw return nullptr; } - - void _cleanProtocolStateList( protocolState_t* ps ) + // Remove message from the send queue (p->_qH), and the protocol msg list, + // which have already been sent to all relevant sessions. + rc_t _cleanProtocolStateList( websock_t* p, protocolState_t* ps ) { - msg_t* m0 = nullptr; - msg_t* m1 = ps->endMsg; + rc_t rc = kOkRC; + nbmpscq::blob_t b = get(p->_qH); - while( m1->link != nullptr ) + msg_t* oldest_msg = (msg_t*)b.blob; + + // if the last protocol end msg has been sent to all sessions ... + while( oldest_msg != nullptr && ps->endMsg != nullptr && ps->endMsg->sessionN >= ps->sessionN ) { - if( m1->link->sessionN >= ps->sessionN ) + // ... and this msg is also the oldest msg in the send queue + if( oldest_msg->msgId == ps->endMsg->msgId ) { - if( m0 == nullptr ) - ps->endMsg = m1->link; - else - m0->link = m1->link; - - msg_t* t = m1->link; - - //mem::free(m1->msg); - mem::free(m1); - - m1 = t; - - continue; + ps->endMsg = ps->endMsg->link; + b = advance(p->_qH); // ... then pop the msg off the send queue + oldest_msg = (msg_t*)b.blob; + } + else // ... otherwise the no further progress can be made + { + break; } - - m0 = m1; - m1 = m1->link; } + + if( ps->endMsg == nullptr ) + ps->begMsg = nullptr; + + return rc; } - rc_t _destroy( websock_t* p ) { - msg_t* m; + rc_t rc = kOkRC; + //msg_t* m; cwLogInfo("Websock: sent: msgs:%i largest msg:%i - recv: msgs:%i largest msg:%i - LWS_PRE:%i",p->_sendMsgCnt,p->_sendMaxByteN,p->_recvMsgCnt,p->_recvMaxByteN,LWS_PRE); cwLogInfo("Exec Time: %i avg ms, %i total ms, %i cnt", p->_execN==0 ? 0 : p->_execSumMs/p->_execN, p->_execSumMs, p->_execN ); @@ -298,39 +313,20 @@ namespace cw lws_context_destroy(p->_ctx); p->_ctx = nullptr; } - - if( p->_q != nullptr ) - { - while((m = p->_q->pop()) != nullptr) - { - //mem::free(m->msg); - mem::free(m); - } - - delete p->_q; - } - for(int i=0; p->_protocolA!=nullptr and p->_protocolA[i].callback != nullptr; ++i) { mem::free(const_cast(p->_protocolA[i].name)); // TODO: delete any msgs in the protocol state here auto ps = static_cast(p->_protocolA[i].user); - - m = ps->endMsg; - while( m != nullptr ) - { - msg_t* tmp = m->link; - - //mem::free(m->msg); - mem::free(m); - m = tmp; - } - + mem::free(ps); } - + + if((rc = nbmpscq::destroy(p->_qH)) != kOkRC ) + cwLogError(rc,"Websock queue destroy failed."); + mem::release(p->_protocolA); p->_protocolN = 0; @@ -573,11 +569,11 @@ cw::rc_t cw::websock::create( { // Allocate the application protocol state array where this application can keep protocol related info auto protocolState = mem::allocZ(1); - auto dummy = mem::allocZ(1); + //auto dummy = mem::allocZ(1); protocolState->thisPtr = p; - protocolState->begMsg = dummy; - protocolState->endMsg = dummy; + protocolState->begMsg = nullptr; + protocolState->endMsg = nullptr; // Setup the interal lws_protocols record struct lws_protocols* pr = p->_protocolA + i; @@ -607,7 +603,12 @@ cw::rc_t cw::websock::create( foreign_loops[0] = p; // pass in the custom poll object as the foreign loop object we will bind to info.foreign_loops = foreign_loops; - p->_q = new MpScNbQueue(); + if((rc = nbmpscq::create(p->_qH,3,4*4096)) != kOkRC ) + { + rc = cwLogError(rc,"Websock queue create failed."); + goto errLabel; + } + p->_cbFunc = cbFunc; p->_cbArg = cbArg; @@ -651,23 +652,33 @@ cw::rc_t cw::websock::destroy( handle_t& h ) cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN ) { rc_t rc = kOkRC; - - uint8_t* mem = mem::allocZ( sizeof(msg_t) + LWS_PRE + byteN ); - - msg_t* m = (msg_t*)mem; - m->msg = mem + sizeof(msg_t); - - memcpy(m->msg+LWS_PRE,msg,byteN); - m->msgByteN = byteN; - m->protocolId = protocolId; - m->sessionId = sessionId; - websock_t* p = _handleToPtr(h); - p->_q->push(m); + + unsigned dataByteN = sizeof(msg_t) + LWS_PRE + byteN; + uint8_t mem[ dataByteN ]; + msg_t* m = (msg_t*)mem; + + memcpy(mem + sizeof(msg_t) + LWS_PRE,msg,byteN); + m->protocolId = protocolId; + m->sessionId = sessionId; + m->msg = nullptr; + m->msgByteN = byteN; // length of msg w/o LWS_PRE + m->msgId = kInvalidId; + m->sessionN = 0; + m->link = nullptr; + + + // put the outgoing msgs on the queue + if((rc = nbmpscq::push(p->_qH,mem,dataByteN)) != kOkRC ) + { + rc = cwLogError(rc,"Websock queue push failed."); + goto errLabel; + } p->_sendMsgCnt += 1; p->_sendMaxByteN = std::max(p->_sendMaxByteN,byteN); +errLabel: return rc; } @@ -704,28 +715,61 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs ) time::spec_t t0 = time::current_time(); - // service any pending websocket activity - with no-timeout + // service any pending websocket activity - with no timeout lws_service_tsi(p->_ctx, -1, 0 ); - msg_t* m; - - // Get the next pending outgoing message. - while((m = p->_q->pop()) != nullptr ) + // clean already sent messages from each protocol outgoing msg list + for(unsigned i=0; i_protocolN-1; ++i) { - auto protocol = _idToProtocol(p,m->protocolId); + protocolState_t* ps = static_cast(p->_protocolA[i].user); + _cleanProtocolStateList( p, ps ); + } - // Get the application protcol record for this message + // Get the next pending outgoing message. + while(1) + { + nbmpscq::blob_t b = nbmpscq::peek(p->_qH); + + // if the outgoing queue is empty + if( b.blob == nullptr ) + break; + + msg_t* m = (msg_t*)b.blob; + + // if msg data ptr is not null then this msg has been seen in an earlier call to this function. + if( m->msg != nullptr ) + break; + + m->msg = (unsigned char*)(m+1); + + // Get the protocol record for this msg. + struct lws_protocols* protocol = _idToProtocol(p,m->protocolId); + + // Get the application protcol state record from the protocol 'user' field protocolState_t* ps = static_cast(protocol->user); // remove messages from the protocol message queue which have already been sent - _cleanProtocolStateList( ps ); - + //_cleanProtocolStateList( p, ps ); + + // Put the msg in the front of the protocal state list (msg's are removed from the back of the list) m->msgId = ps->nextNewMsgId; // set the msg id - ps->begMsg->link = m; // put the msg on the front of the outgoing queue - ps->begMsg = m; // + m->link = nullptr; ps->nextNewMsgId += 1; - - lws_callback_on_writable_all_protocol(p->_ctx,protocol); + + if( ps->begMsg == nullptr ) + { + ps->begMsg = m; + ps->endMsg = m; + } + else + { + ps->begMsg->link = m; + ps->begMsg = m; + } + + // we want one callback for each session + for(unsigned i=0; isessionN; ++i) + lws_callback_on_writable_all_protocol(p->_ctx,protocol); lws_service_tsi(p->_ctx, -1, 0 ); }