cwWebSock.h/cpp : Added 'sessionId' to send() (and msg_t) to allow a message to be sent to a specific endpoint.

This commit is contained in:
kevin.larke 2020-03-31 12:52:58 -04:00
parent 10b2a36970
commit e0f1e6b948
3 changed files with 44 additions and 35 deletions

View File

@ -16,6 +16,7 @@ namespace cw
typedef struct msg_str typedef struct msg_str
{ {
unsigned protocolId; // Protocol associated with this msg. unsigned protocolId; // Protocol associated with this msg.
unsigned sessionId; // Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions.
unsigned char* msg; // Msg data array. unsigned char* msg; // Msg data array.
unsigned msgByteN; // Count of bytes in msg[]. unsigned msgByteN; // Count of bytes in msg[].
unsigned msgId; // The msgId assigned when this msg is addded to the protocol state msg queue. unsigned msgId; // The msgId assigned when this msg is addded to the protocol state msg queue.
@ -91,7 +92,7 @@ namespace cw
break; break;
case LWS_CALLBACK_ESTABLISHED: case LWS_CALLBACK_ESTABLISHED:
cwLogInfo("Websocket connection opened\n"); cwLogInfo("Websocket session:%i opened: \n",thisPtr->_nextSessionId);
sess->id = thisPtr->_nextSessionId++; sess->id = thisPtr->_nextSessionId++;
sess->protocolId = proto->id; sess->protocolId = proto->id;
@ -119,8 +120,6 @@ namespace cw
case LWS_CALLBACK_SERVER_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE:
{ {
//printf("writable: sess:%i proto:%s\n",sess->id,proto->name);
msg_t* m1 = protoState->endMsg; msg_t* m1 = protoState->endMsg;
cwAssert(m1 != nullptr); cwAssert(m1 != nullptr);
@ -129,28 +128,29 @@ namespace cw
{ {
msg_t* m = m1->link; msg_t* m = m1->link;
//printf("writing: %i %i : %i %i\n",m->msgId,sess->nextMsgId,m->sessionN,protoState->sessionN);
// if this msg has not already been sent to this session // if this msg has not already been sent to this session
if( m->msgId >= sess->nextMsgId ) if( m->msgId >= sess->nextMsgId )
{ {
// Send the msg to this session // if the msg sessiond id is not valid or matches this session id ...
// Note: msgByteN should not include LWS_PRE if( m->sessionId == kInvalidId || m->sessionId == sess->id )
int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT);
// if the write failed
if(lws_result < (int)m->msgByteN)
{ {
cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result); // ... then send the msg to this session
return -1; // Note: msgByteN should not include LWS_PRE
} int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT);
else // otherwise the write succeeded
{
sess->nextMsgId = m->msgId + 1;
m->sessionN += 1;
}
// if the write failed
if(lws_result < (int)m->msgByteN)
{
cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result);
return -1;
}
}
// at this point the write succeeded or this session was skipped
sess->nextMsgId = m->msgId + 1;
m->sessionN += 1;
break; break;
} }
@ -378,7 +378,7 @@ cw::rc_t cw::websock::destroy( handle_t& h )
} }
cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, const void* msg, unsigned byteN ) cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
@ -387,6 +387,7 @@ cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, const void* msg, un
memcpy(m->msg,msg,byteN); memcpy(m->msg,msg,byteN);
m->msgByteN = byteN; m->msgByteN = byteN;
m->protocolId = protocolId; m->protocolId = protocolId;
m->sessionId = sessionId;
websock_t* p = _handleToPtr(h); websock_t* p = _handleToPtr(h);
p->_q->push(m); p->_q->push(m);
@ -394,7 +395,7 @@ cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, const void* msg, un
return rc; return rc;
} }
cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, const char* fmt, va_list vl0 ) cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, va_list vl0 )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
va_list vl1; va_list vl1;
@ -405,17 +406,17 @@ cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, const char* fmt, v
unsigned n = vsnprintf(buf,bufN+1,fmt,vl1); unsigned n = vsnprintf(buf,bufN+1,fmt,vl1);
rc = send(h,protocolId,buf,n); rc = send(h,protocolId,sessionId,buf,n);
va_end(vl1); va_end(vl1);
return rc; return rc;
} }
cw::rc_t cw::websock::sendF( handle_t h, unsigned protocolId, const char* fmt, ... ) cw::rc_t cw::websock::sendF( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, ... )
{ {
va_list vl; va_list vl;
va_start(vl,fmt); va_start(vl,fmt);
rc_t rc = sendV(h,protocolId,fmt,vl); rc_t rc = sendV(h,protocolId,sessionId,fmt,vl);
va_end(vl); va_end(vl);
return rc; return rc;
} }

View File

@ -35,7 +35,7 @@ namespace cw
{ {
typedef handle<struct websock_str> handle_t; typedef handle<struct websock_str> handle_t;
typedef void (*cbFunc_t)( void* cbArg, unsigned protocolId, unsigned connectionId, msgTypeId_t msg_type, const void* msg, unsigned byteN ); typedef void (*cbFunc_t)( void* cbArg, unsigned protocolId, unsigned sessionId, msgTypeId_t msg_type, const void* msg, unsigned byteN );
rc_t create( rc_t create(
handle_t& h, handle_t& h,
@ -49,10 +49,10 @@ namespace cw
rc_t destroy( handle_t& h ); rc_t destroy( handle_t& h );
// // Set 'sessionId' to kInvalid
rc_t send( handle_t h, unsigned protocolId, const void* msg, unsigned byteN ); rc_t send( handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN );
rc_t sendV( handle_t h, unsigned protocolId, const char* fmt, va_list vl ); rc_t sendV( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, va_list vl );
rc_t sendF( handle_t h, unsigned protocolId, const char* fmt, ... ); rc_t sendF( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, ... );
// Call periodically from the same thread to send/recv messages. // Call periodically from the same thread to send/recv messages.
rc_t exec( handle_t h, unsigned timeOutMs ); rc_t exec( handle_t h, unsigned timeOutMs );

View File

@ -134,20 +134,27 @@ namespace cw
// Note that this function is called from context of the websockSrv internal thread // Note that this function is called from context of the websockSrv internal thread
// and from within the websockExec() call. // and from within the websockExec() call.
void websockCb( void* cbArg, unsigned protocolId, unsigned connectionId, websock::msgTypeId_t msg_type, const void* vmsg, unsigned byteN ) void websockCb( void* cbArg, unsigned protocolId, unsigned sessionId, websock::msgTypeId_t msg_type, const void* vmsg, unsigned byteN )
{ {
appCtx_t* app = static_cast<appCtx_t*>(cbArg); appCtx_t* app = static_cast<appCtx_t*>(cbArg);
const char* msg = static_cast<const char*>(vmsg); const char* msg = static_cast<const char*>(vmsg);
printf("protcol:%i connection:%i type:%i bytes:%i %.*s\n",protocolId,connectionId, msg_type, byteN, byteN, msg); cwLogInfo("protcol:%i connection:%i type:%i bytes:%i %.*s ",protocolId,sessionId, msg_type, byteN, byteN, msg);
if( msg_type == websock::kMessageTId ) if( msg_type == websock::kMessageTId )
{ {
if( textCompare(msg,"quit",4) == 0) if( textCompare(msg,"quit",4) == 0)
app->quitFl = true; app->quitFl = true;
else
websock::send(app->wsH, app->protocolId, vmsg, byteN ); if( textCompare(msg,"bcast",5) == 0 )
{
sessionId = kInvalidId; // send msg to all sessions
vmsg = ((const char*)(vmsg)) + 6; // remove the 'bcast' prefix
byteN -=6;
}
websock::send(app->wsH, app->protocolId, sessionId, vmsg, byteN );
} }
@ -159,10 +166,10 @@ cw::rc_t cw::websockSrvTest()
{ {
rc_t rc; rc_t rc;
websockSrv::handle_t h; websockSrv::handle_t h;
const char* physRootDir = "/home/kevin/src/cw_rt/html/websockSrvTest"; const char* physRootDir = "/home/kevin/src/cwtest/src/libcw/html/websockSrvTest";
const char* dfltHtmlPageFn = "test_websocket.html"; const char* dfltHtmlPageFn = "test_websocket.html";
unsigned timeOutMs = 50; unsigned timeOutMs = 50;
int port = 7681; int port = 5687;
unsigned rcvBufByteN = 128; unsigned rcvBufByteN = 128;
unsigned xmtBufByteN = 128; unsigned xmtBufByteN = 128;
appCtx_t appCtx; appCtx_t appCtx;
@ -180,6 +187,7 @@ cw::rc_t cw::websockSrvTest()
}; };
unsigned protocolN = sizeof(protocolA)/sizeof(protocolA[0]); unsigned protocolN = sizeof(protocolA)/sizeof(protocolA[0]);
if((rc = websockSrv::create( h, websockCb, &appCtx, physRootDir, dfltHtmlPageFn, port, protocolA, protocolN, timeOutMs )) != kOkRC ) if((rc = websockSrv::create( h, websockCb, &appCtx, physRootDir, dfltHtmlPageFn, port, protocolA, protocolN, timeOutMs )) != kOkRC )
return rc; return rc;