cwWebSock.cpp : Now uses cwNbMpScQueue to cache outgoing messages.

This commit is contained in:
kevin 2024-03-25 13:32:46 -04:00
parent 99871c5bef
commit 693c87d598

View File

@ -1,13 +1,16 @@
#include "cwCommon.h" #include "cwCommon.h"
#include "cwLog.h" #include "cwLog.h"
#include "cwCommonImpl.h" #include "cwCommonImpl.h"
#include "cwMem.h" #include "cwMem.h"
#include "cwFileSys.h" #include "cwFileSys.h"
#include "cwObject.h"
#include "cwWebSock.h" #include "cwWebSock.h"
#include "cwMpScNbQueue.h" #include "cwNbMpScQueue.h"
#include "cwTime.h" #include "cwTime.h"
#include <libwebsockets.h> #include <libwebsockets.h>
#include <memory>
namespace cw namespace cw
{ {
@ -31,15 +34,18 @@ namespace cw
// Internal outgoing msg structure. // Internal outgoing msg structure.
typedef struct msg_str typedef struct msg_str
{ {
unsigned protocolId; // Protocol associated with this msg. unsigned protocolId; // 0 Protocol associated with this msg.
unsigned sessionId; // Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions. unsigned sessionId; // 4 Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions.
unsigned char* msg; // Msg data array. unsigned char* msg; // 8 Msg data array.
unsigned msgByteN; // Count of bytes in msg[]. unsigned msgByteN; // 16 Count of bytes in msg[].
unsigned msgId; // The msgId assigned when this msg is addded to the protocol state msg queue. unsigned msgId; // 20 The msgId assigned when this msg is addded to the protocol state msg queue.
unsigned sessionN; // Count of sessions to which this msg has been sent. unsigned sessionN; // 24 Count of sessions to which this msg has been sent.
struct msg_str* link; // Pointer to next message or nullptr if this is the last msg in the queue. struct msg_str* link; // 28 Pointer to next message or nullptr if this is the last msg in the queue.
unsigned pad; // 36-40 (make size of msg_t a multiple of 8)
} msg_t; } msg_t;
static_assert( sizeof(msg_t) % 8 == 0 );
typedef struct websock_str typedef struct websock_str
{ {
cbFunc_t _cbFunc; // cbFunc_t _cbFunc; //
@ -50,7 +56,7 @@ namespace cw
unsigned _nextSessionId = 0; // Next session id. unsigned _nextSessionId = 0; // Next session id.
unsigned _connSessionN = 0; // Count of connected sessions. unsigned _connSessionN = 0; // Count of connected sessions.
struct lws_http_mount* _mount = nullptr; // struct lws_http_mount* _mount = nullptr; //
MpScNbQueue<msg_t>* _q; // Thread safe, non-blocking, protocol independent msg queue. nbmpscq::handle_t _qH; // Thread safe, non-blocking, protocol independent msg queue.
lws_pollfd* _pollfdA; // socket handle array used by poll() lws_pollfd* _pollfdA; // socket handle array used by poll()
int _pollfdMaxN; int _pollfdMaxN;
@ -184,13 +190,12 @@ namespace cw
case LWS_CALLBACK_SERVER_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE:
{ {
msg_t* m1 = protoState->endMsg; msg_t* m = protoState->endMsg;
cwAssert(m1 != nullptr);
// for each possible msg // for each possible outgoing msg in this protocol state record
while( m1->link != nullptr ) while( m != nullptr )
{ {
msg_t* m = m1->link; bool msg_written_fl = false;
// if this msg has not already been sent to this session // if this msg has not already been sent to this session
if( m->msgId >= sess->nextMsgId ) if( m->msgId >= sess->nextMsgId )
@ -200,6 +205,7 @@ namespace cw
if( m->sessionId == kInvalidId || m->sessionId == sess->id ) if( m->sessionId == kInvalidId || m->sessionId == sess->id )
{ {
// ... then send the msg to this session // ... then send the msg to this session
// Note: msgByteN should not include LWS_PRE // Note: msgByteN should not include LWS_PRE
int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT); int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT);
@ -209,17 +215,25 @@ namespace cw
cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result); cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result);
return -1; return -1;
} }
msg_written_fl = true;
} }
// at this point the write succeeded or this session was skipped // at this point the write succeeded or this session was skipped
sess->nextMsgId = m->msgId + 1; sess->nextMsgId = m->msgId + 1;
// incr the msg session count - once m->sessionN >= protoState->sessionN this msg will be deleted in _cleanProtocoStateList()
m->sessionN += 1; m->sessionN += 1;
break; // If a record was actually sent then we are done since only one
// call to lws_write() per LWS_CALLBACK_SERVER_WRITEABLE callback
// is permitted
if(msg_written_fl)
break;
} }
m1 = m1->link; m = m->link;
} }
} }
@ -255,40 +269,41 @@ namespace cw
return nullptr; return nullptr;
} }
// Remove message from the send queue (p->_qH), and the protocol msg list,
void _cleanProtocolStateList( protocolState_t* ps ) // which have already been sent to all relevant sessions.
rc_t _cleanProtocolStateList( websock_t* p, protocolState_t* ps )
{ {
msg_t* m0 = nullptr; rc_t rc = kOkRC;
msg_t* m1 = ps->endMsg; nbmpscq::blob_t b = get(p->_qH);
while( m1->link != nullptr ) msg_t* oldest_msg = (msg_t*)b.blob;
// if the last protocol end msg has been sent to all sessions ...
while( oldest_msg != nullptr && ps->endMsg != nullptr && ps->endMsg->sessionN >= ps->sessionN )
{ {
if( m1->link->sessionN >= ps->sessionN ) // ... and this msg is also the oldest msg in the send queue
if( oldest_msg->msgId == ps->endMsg->msgId )
{ {
if( m0 == nullptr ) ps->endMsg = ps->endMsg->link;
ps->endMsg = m1->link; b = advance(p->_qH); // ... then pop the msg off the send queue
else oldest_msg = (msg_t*)b.blob;
m0->link = m1->link; }
else // ... otherwise the no further progress can be made
msg_t* t = m1->link; {
break;
//mem::free(m1->msg);
mem::free(m1);
m1 = t;
continue;
} }
m0 = m1;
m1 = m1->link;
} }
}
if( ps->endMsg == nullptr )
ps->begMsg = nullptr;
return rc;
}
rc_t _destroy( websock_t* p ) rc_t _destroy( websock_t* p )
{ {
msg_t* m; rc_t rc = kOkRC;
//msg_t* m;
cwLogInfo("Websock: sent: msgs:%i largest msg:%i - recv: msgs:%i largest msg:%i - LWS_PRE:%i",p->_sendMsgCnt,p->_sendMaxByteN,p->_recvMsgCnt,p->_recvMaxByteN,LWS_PRE); cwLogInfo("Websock: sent: msgs:%i largest msg:%i - recv: msgs:%i largest msg:%i - LWS_PRE:%i",p->_sendMsgCnt,p->_sendMaxByteN,p->_recvMsgCnt,p->_recvMaxByteN,LWS_PRE);
cwLogInfo("Exec Time: %i avg ms, %i total ms, %i cnt", p->_execN==0 ? 0 : p->_execSumMs/p->_execN, p->_execSumMs, p->_execN ); cwLogInfo("Exec Time: %i avg ms, %i total ms, %i cnt", p->_execN==0 ? 0 : p->_execSumMs/p->_execN, p->_execSumMs, p->_execN );
@ -299,18 +314,6 @@ namespace cw
p->_ctx = nullptr; p->_ctx = nullptr;
} }
if( p->_q != nullptr )
{
while((m = p->_q->pop()) != nullptr)
{
//mem::free(m->msg);
mem::free(m);
}
delete p->_q;
}
for(int i=0; p->_protocolA!=nullptr and p->_protocolA[i].callback != nullptr; ++i) for(int i=0; p->_protocolA!=nullptr and p->_protocolA[i].callback != nullptr; ++i)
{ {
mem::free(const_cast<char*>(p->_protocolA[i].name)); mem::free(const_cast<char*>(p->_protocolA[i].name));
@ -318,19 +321,12 @@ namespace cw
// TODO: delete any msgs in the protocol state here // TODO: delete any msgs in the protocol state here
auto ps = static_cast<protocolState_t*>(p->_protocolA[i].user); auto ps = static_cast<protocolState_t*>(p->_protocolA[i].user);
m = ps->endMsg;
while( m != nullptr )
{
msg_t* tmp = m->link;
//mem::free(m->msg);
mem::free(m);
m = tmp;
}
mem::free(ps); mem::free(ps);
} }
if((rc = nbmpscq::destroy(p->_qH)) != kOkRC )
cwLogError(rc,"Websock queue destroy failed.");
mem::release(p->_protocolA); mem::release(p->_protocolA);
p->_protocolN = 0; p->_protocolN = 0;
@ -573,11 +569,11 @@ cw::rc_t cw::websock::create(
{ {
// Allocate the application protocol state array where this application can keep protocol related info // Allocate the application protocol state array where this application can keep protocol related info
auto protocolState = mem::allocZ<protocolState_t>(1); auto protocolState = mem::allocZ<protocolState_t>(1);
auto dummy = mem::allocZ<msg_t>(1); //auto dummy = mem::allocZ<msg_t>(1);
protocolState->thisPtr = p; protocolState->thisPtr = p;
protocolState->begMsg = dummy; protocolState->begMsg = nullptr;
protocolState->endMsg = dummy; protocolState->endMsg = nullptr;
// Setup the interal lws_protocols record // Setup the interal lws_protocols record
struct lws_protocols* pr = p->_protocolA + i; struct lws_protocols* pr = p->_protocolA + i;
@ -607,7 +603,12 @@ cw::rc_t cw::websock::create(
foreign_loops[0] = p; // pass in the custom poll object as the foreign loop object we will bind to foreign_loops[0] = p; // pass in the custom poll object as the foreign loop object we will bind to
info.foreign_loops = foreign_loops; info.foreign_loops = foreign_loops;
p->_q = new MpScNbQueue<msg_t>(); if((rc = nbmpscq::create(p->_qH,3,4*4096)) != kOkRC )
{
rc = cwLogError(rc,"Websock queue create failed.");
goto errLabel;
}
p->_cbFunc = cbFunc; p->_cbFunc = cbFunc;
p->_cbArg = cbArg; p->_cbArg = cbArg;
@ -651,23 +652,33 @@ cw::rc_t cw::websock::destroy( handle_t& h )
cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN ) cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
uint8_t* mem = mem::allocZ<uint8_t>( sizeof(msg_t) + LWS_PRE + byteN );
msg_t* m = (msg_t*)mem;
m->msg = mem + sizeof(msg_t);
memcpy(m->msg+LWS_PRE,msg,byteN);
m->msgByteN = byteN;
m->protocolId = protocolId;
m->sessionId = sessionId;
websock_t* p = _handleToPtr(h); websock_t* p = _handleToPtr(h);
p->_q->push(m);
unsigned dataByteN = sizeof(msg_t) + LWS_PRE + byteN;
uint8_t mem[ dataByteN ];
msg_t* m = (msg_t*)mem;
memcpy(mem + sizeof(msg_t) + LWS_PRE,msg,byteN);
m->protocolId = protocolId;
m->sessionId = sessionId;
m->msg = nullptr;
m->msgByteN = byteN; // length of msg w/o LWS_PRE
m->msgId = kInvalidId;
m->sessionN = 0;
m->link = nullptr;
// put the outgoing msgs on the queue
if((rc = nbmpscq::push(p->_qH,mem,dataByteN)) != kOkRC )
{
rc = cwLogError(rc,"Websock queue push failed.");
goto errLabel;
}
p->_sendMsgCnt += 1; p->_sendMsgCnt += 1;
p->_sendMaxByteN = std::max(p->_sendMaxByteN,byteN); p->_sendMaxByteN = std::max(p->_sendMaxByteN,byteN);
errLabel:
return rc; return rc;
} }
@ -704,28 +715,61 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs )
time::spec_t t0 = time::current_time(); time::spec_t t0 = time::current_time();
// service any pending websocket activity - with no-timeout // service any pending websocket activity - with no timeout
lws_service_tsi(p->_ctx, -1, 0 ); lws_service_tsi(p->_ctx, -1, 0 );
msg_t* m; // clean already sent messages from each protocol outgoing msg list
for(unsigned i=0; i<p->_protocolN-1; ++i)
{
protocolState_t* ps = static_cast<protocolState_t*>(p->_protocolA[i].user);
_cleanProtocolStateList( p, ps );
}
// Get the next pending outgoing message. // Get the next pending outgoing message.
while((m = p->_q->pop()) != nullptr ) while(1)
{ {
auto protocol = _idToProtocol(p,m->protocolId); nbmpscq::blob_t b = nbmpscq::peek(p->_qH);
// Get the application protcol record for this message // if the outgoing queue is empty
if( b.blob == nullptr )
break;
msg_t* m = (msg_t*)b.blob;
// if msg data ptr is not null then this msg has been seen in an earlier call to this function.
if( m->msg != nullptr )
break;
m->msg = (unsigned char*)(m+1);
// Get the protocol record for this msg.
struct lws_protocols* protocol = _idToProtocol(p,m->protocolId);
// Get the application protcol state record from the protocol 'user' field
protocolState_t* ps = static_cast<protocolState_t*>(protocol->user); protocolState_t* ps = static_cast<protocolState_t*>(protocol->user);
// remove messages from the protocol message queue which have already been sent // remove messages from the protocol message queue which have already been sent
_cleanProtocolStateList( ps ); //_cleanProtocolStateList( p, ps );
// Put the msg in the front of the protocal state list (msg's are removed from the back of the list)
m->msgId = ps->nextNewMsgId; // set the msg id m->msgId = ps->nextNewMsgId; // set the msg id
ps->begMsg->link = m; // put the msg on the front of the outgoing queue m->link = nullptr;
ps->begMsg = m; //
ps->nextNewMsgId += 1; ps->nextNewMsgId += 1;
lws_callback_on_writable_all_protocol(p->_ctx,protocol); if( ps->begMsg == nullptr )
{
ps->begMsg = m;
ps->endMsg = m;
}
else
{
ps->begMsg->link = m;
ps->begMsg = m;
}
// we want one callback for each session
for(unsigned i=0; i<ps->sessionN; ++i)
lws_callback_on_writable_all_protocol(p->_ctx,protocol);
lws_service_tsi(p->_ctx, -1, 0 ); lws_service_tsi(p->_ctx, -1, 0 );
} }