diff --git a/cwWebSock.cpp b/cwWebSock.cpp index 0eaf5e9..51ea07d 100644 --- a/cwWebSock.cpp +++ b/cwWebSock.cpp @@ -16,6 +16,7 @@ namespace cw typedef struct msg_str { unsigned protocolId; // Protocol associated with this msg. + unsigned sessionId; // Session Id that this msg should be sent to or kInvalidId if it should be sent to all sessions. unsigned char* msg; // Msg data array. unsigned msgByteN; // Count of bytes in msg[]. unsigned msgId; // The msgId assigned when this msg is addded to the protocol state msg queue. @@ -91,7 +92,7 @@ namespace cw break; case LWS_CALLBACK_ESTABLISHED: - cwLogInfo("Websocket connection opened\n"); + cwLogInfo("Websocket session:%i opened: \n",thisPtr->_nextSessionId); sess->id = thisPtr->_nextSessionId++; sess->protocolId = proto->id; @@ -119,8 +120,6 @@ namespace cw case LWS_CALLBACK_SERVER_WRITEABLE: { - //printf("writable: sess:%i proto:%s\n",sess->id,proto->name); - msg_t* m1 = protoState->endMsg; cwAssert(m1 != nullptr); @@ -129,28 +128,29 @@ namespace cw { msg_t* m = m1->link; - //printf("writing: %i %i : %i %i\n",m->msgId,sess->nextMsgId,m->sessionN,protoState->sessionN); - // if this msg has not already been sent to this session if( m->msgId >= sess->nextMsgId ) { - // Send the msg to this session - // Note: msgByteN should not include LWS_PRE - int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT); - - // if the write failed - if(lws_result < (int)m->msgByteN) + // if the msg sessiond id is not valid or matches this session id ... + if( m->sessionId == kInvalidId || m->sessionId == sess->id ) { - cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result); - return -1; - } - else // otherwise the write succeeded - { - sess->nextMsgId = m->msgId + 1; - m->sessionN += 1; - } + // ... then send the msg to this session + // Note: msgByteN should not include LWS_PRE + int lws_result = lws_write(wsi, m->msg + LWS_PRE , m->msgByteN, LWS_WRITE_TEXT); + // if the write failed + if(lws_result < (int)m->msgByteN) + { + cwLogError(kWriteFailRC,"Websocket error: %d on write", lws_result); + return -1; + } + } + + // at this point the write succeeded or this session was skipped + sess->nextMsgId = m->msgId + 1; + m->sessionN += 1; + break; } @@ -378,7 +378,7 @@ cw::rc_t cw::websock::destroy( handle_t& h ) } -cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, const void* msg, unsigned byteN ) +cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN ) { rc_t rc = kOkRC; @@ -387,6 +387,7 @@ cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, const void* msg, un memcpy(m->msg,msg,byteN); m->msgByteN = byteN; m->protocolId = protocolId; + m->sessionId = sessionId; websock_t* p = _handleToPtr(h); p->_q->push(m); @@ -394,7 +395,7 @@ cw::rc_t cw::websock::send(handle_t h, unsigned protocolId, const void* msg, un return rc; } -cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, const char* fmt, va_list vl0 ) +cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, va_list vl0 ) { rc_t rc = kOkRC; va_list vl1; @@ -405,17 +406,17 @@ cw::rc_t cw::websock::sendV( handle_t h, unsigned protocolId, const char* fmt, v unsigned n = vsnprintf(buf,bufN+1,fmt,vl1); - rc = send(h,protocolId,buf,n); + rc = send(h,protocolId,sessionId,buf,n); va_end(vl1); return rc; } -cw::rc_t cw::websock::sendF( handle_t h, unsigned protocolId, const char* fmt, ... ) +cw::rc_t cw::websock::sendF( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, ... ) { va_list vl; va_start(vl,fmt); - rc_t rc = sendV(h,protocolId,fmt,vl); + rc_t rc = sendV(h,protocolId,sessionId,fmt,vl); va_end(vl); return rc; } diff --git a/cwWebSock.h b/cwWebSock.h index d179686..e87fb04 100644 --- a/cwWebSock.h +++ b/cwWebSock.h @@ -35,7 +35,7 @@ namespace cw { typedef handle handle_t; - typedef void (*cbFunc_t)( void* cbArg, unsigned protocolId, unsigned connectionId, msgTypeId_t msg_type, const void* msg, unsigned byteN ); + typedef void (*cbFunc_t)( void* cbArg, unsigned protocolId, unsigned sessionId, msgTypeId_t msg_type, const void* msg, unsigned byteN ); rc_t create( handle_t& h, @@ -49,10 +49,10 @@ namespace cw rc_t destroy( handle_t& h ); - // - rc_t send( handle_t h, unsigned protocolId, const void* msg, unsigned byteN ); - rc_t sendV( handle_t h, unsigned protocolId, const char* fmt, va_list vl ); - rc_t sendF( handle_t h, unsigned protocolId, const char* fmt, ... ); + // Set 'sessionId' to kInvalid + rc_t send( handle_t h, unsigned protocolId, unsigned sessionId, const void* msg, unsigned byteN ); + rc_t sendV( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, va_list vl ); + rc_t sendF( handle_t h, unsigned protocolId, unsigned sessionId, const char* fmt, ... ); // Call periodically from the same thread to send/recv messages. rc_t exec( handle_t h, unsigned timeOutMs ); diff --git a/cwWebSockSvr.cpp b/cwWebSockSvr.cpp index 4ea0e53..46084dd 100644 --- a/cwWebSockSvr.cpp +++ b/cwWebSockSvr.cpp @@ -134,20 +134,27 @@ namespace cw // Note that this function is called from context of the websockSrv internal thread // and from within the websockExec() call. - void websockCb( void* cbArg, unsigned protocolId, unsigned connectionId, websock::msgTypeId_t msg_type, const void* vmsg, unsigned byteN ) + void websockCb( void* cbArg, unsigned protocolId, unsigned sessionId, websock::msgTypeId_t msg_type, const void* vmsg, unsigned byteN ) { appCtx_t* app = static_cast(cbArg); const char* msg = static_cast(vmsg); - printf("protcol:%i connection:%i type:%i bytes:%i %.*s\n",protocolId,connectionId, msg_type, byteN, byteN, msg); + cwLogInfo("protcol:%i connection:%i type:%i bytes:%i %.*s ",protocolId,sessionId, msg_type, byteN, byteN, msg); if( msg_type == websock::kMessageTId ) { if( textCompare(msg,"quit",4) == 0) app->quitFl = true; - - websock::send(app->wsH, app->protocolId, vmsg, byteN ); + else + if( textCompare(msg,"bcast",5) == 0 ) + { + sessionId = kInvalidId; // send msg to all sessions + vmsg = ((const char*)(vmsg)) + 6; // remove the 'bcast' prefix + byteN -=6; + } + + websock::send(app->wsH, app->protocolId, sessionId, vmsg, byteN ); } @@ -159,10 +166,10 @@ cw::rc_t cw::websockSrvTest() { rc_t rc; websockSrv::handle_t h; - const char* physRootDir = "/home/kevin/src/cw_rt/html/websockSrvTest"; + const char* physRootDir = "/home/kevin/src/cwtest/src/libcw/html/websockSrvTest"; const char* dfltHtmlPageFn = "test_websocket.html"; unsigned timeOutMs = 50; - int port = 7681; + int port = 5687; unsigned rcvBufByteN = 128; unsigned xmtBufByteN = 128; appCtx_t appCtx; @@ -180,6 +187,7 @@ cw::rc_t cw::websockSrvTest() }; unsigned protocolN = sizeof(protocolA)/sizeof(protocolA[0]); + if((rc = websockSrv::create( h, websockCb, &appCtx, physRootDir, dfltHtmlPageFn, port, protocolA, protocolN, timeOutMs )) != kOkRC ) return rc;