Merge branch 'master' of gitea.larke.org:kevin/libcw

This commit is contained in:
kevin 2020-11-12 21:18:44 -05:00
commit 9e925e7996

View File

@ -37,6 +37,10 @@ namespace cw
struct lws_http_mount* _mount = nullptr; //
MpScNbQueue<msg_t>* _q; // Thread safe, non-blocking, protocol independent msg queue.
lws_pollfd* _pollfdA; // socket handle array used by poll()
int _pollfdMaxN;
int _pollfdN;
} websock_t;
inline websock_t* _handleToPtr(handle_t h)
@ -61,6 +65,91 @@ namespace cw
unsigned sessionN; // Count of sessions using this protocol.
} protocolState_t;
// This callback is always from protocol 0 which receives messages when a system socket is created,deleted, or changed.
int _httpCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
{
const struct lws_protocols* p = lws_get_protocol(wsi);
if( p == nullptr || p->user == nullptr )
{
cwLogError(kInvalidArgRC,"Invalid protocol record on http websock callback.");
return 0; // TODO: issue a warning
}
protocolState_t* ps = static_cast<protocolState_t*>(p->user);
if( ps == nullptr || ps->thisPtr == nullptr )
{
cwLogError(kInvalidArgRC,"Invalid protocol state record on http websock callback.");
return 0; // TODO: issue a warning
}
websock_t* ws = ps->thisPtr;
switch( reason )
{
// called when libwebsocket opens a new socket
case LWS_CALLBACK_ADD_POLL_FD:
{
lws_pollargs* a = static_cast<lws_pollargs*>(in);
int i = 0;
// find an open slot in the polling array
for(; i < ws->_pollfdN; ++i )
if( ws->_pollfdA[i].fd == LWS_SOCK_INVALID )
break;
// if an open socket was found
if( i == ws->_pollfdMaxN )
{
cwLogError(kResourceNotAvailableRC,"All websocket poll slots are alreadry in use. Proper socket polling will not occur.");
}
else
{
// setup the poll array to be notified of incoming (browser->server) messages.
ws->_pollfdA[ i ].fd = a->fd;
ws->_pollfdA[ i ].events = LWS_POLLIN;
if( i == ws->_pollfdN )
ws->_pollfdN += 1;
}
}
break;
// called when libwebsocket closes a socket
case LWS_CALLBACK_DEL_POLL_FD:
{
lws_pollargs* a = static_cast<lws_pollargs*>(in);
int i = 0;
// locate the socket that is being closed
for(; i<ws->_pollfdN; ++i)
{
if( ws->_pollfdA[i].fd == a->fd )
{
ws->_pollfdA[i].fd = LWS_SOCK_INVALID;
ws->_pollfdA[ i ].events = 0;
break;
}
}
// Note that the libwebsock semms to send this mesg twice for every closed socket.
// This means that the socket has already been removed from pollfdA[] on the second call.
// We therefore don't warn when the socket is not found since it
// will happen on every socket.
}
break;
case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
break;
default:
break;
}
return lws_callback_http_dummy(wsi,reason,user,in,len);
}
int _internalCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
{
@ -68,20 +157,29 @@ namespace cw
const struct lws_protocols* p = lws_get_protocol(wsi);
if( p == nullptr || p->user == nullptr )
return 0; // TODO: issue a warning
{
cwLogError(kInvalidArgRC,"Invalid protocol record on websock callback.");
return 0;
}
protocolState_t* ps = static_cast<protocolState_t*>(p->user);
if( ps == nullptr || ps->thisPtr == nullptr )
return 0; // TODO: issue a warning
{
cwLogError(kInvalidArgRC,"Invalid protocol state record on websock callback.");
return 0;
}
session_t* sess = static_cast<session_t*>(user);
const struct lws_protocols* proto = lws_get_protocol(wsi);
protocolState_t* protoState = static_cast<protocolState_t*>(proto->user);
websock_t* thisPtr = ps->thisPtr;
websock_t* ws = ps->thisPtr;
//char buf[32];
//printf("i: %i %i\n",reason,reason==LWS_CALLBACK_ADD_POLL_FD);
switch( reason )
{
case LWS_CALLBACK_PROTOCOL_INIT:
@ -93,15 +191,15 @@ namespace cw
break;
case LWS_CALLBACK_ESTABLISHED:
cwLogInfo("Websocket session:%i opened: \n",thisPtr->_nextSessionId);
cwLogInfo("Websocket session:%i opened: \n",ws->_nextSessionId);
sess->id = thisPtr->_nextSessionId++;
sess->id = ws->_nextSessionId++;
sess->protocolId = proto->id;
protoState->sessionN += 1;
thisPtr->_connSessionN += 1;
ws->_connSessionN += 1;
if( thisPtr->_cbFunc != nullptr)
thisPtr->_cbFunc(thisPtr->_cbArg, proto->id, sess->id, kConnectTId, nullptr, 0);
if( ws->_cbFunc != nullptr)
ws->_cbFunc(ws->_cbArg, proto->id, sess->id, kConnectTId, nullptr, 0);
//if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0)
// printf("conn:%p %s\n",user,buf);
@ -110,12 +208,12 @@ namespace cw
case LWS_CALLBACK_CLOSED:
cwLogInfo("Websocket connection closed.\n");
thisPtr->_connSessionN -= 1;
ws->_connSessionN -= 1;
protoState->sessionN -= 1;
cwAssert( protoState->sessionN >= 0 );
if( thisPtr->_cbFunc != nullptr)
thisPtr->_cbFunc(thisPtr->_cbArg,proto->id,sess->id,kDisconnectTId,nullptr,0);
if( ws->_cbFunc != nullptr)
ws->_cbFunc(ws->_cbArg,proto->id,sess->id,kDisconnectTId,nullptr,0);
break;
@ -163,10 +261,10 @@ namespace cw
break;
case LWS_CALLBACK_RECEIVE:
//printf("recv: sess:%i proto:%s : %p : len:%li\n",sess->id,proto->name,thisPtr->_cbFunc,len);
//printf("recv: sess:%i proto:%s : %p : len:%li\n",sess->id,proto->name,ws->_cbFunc,len);
if( thisPtr->_cbFunc != nullptr && len>0)
thisPtr->_cbFunc(thisPtr->_cbArg,proto->id,sess->id,kMessageTId,in,len);
if( ws->_cbFunc != nullptr && len>0)
ws->_cbFunc(ws->_cbArg,proto->id,sess->id,kMessageTId,in,len);
break;
@ -271,6 +369,10 @@ namespace cw
mem::release(p->_mount);
}
mem::release(p->_pollfdA);
p->_pollfdMaxN = 0;
p->_pollfdN = 0;
p->_nextSessionId = 0;
p->_connSessionN = 0;
@ -325,7 +427,7 @@ cw::rc_t cw::websock::create(
pr->rx_buffer_size = protocolArgA[i].rcvBufByteN;
pr->tx_packet_size = 0; //protocolArgA[i].xmtBufByteN;
pr->per_session_data_size = sizeof(session_t);
pr->callback = strcmp(pr->name,"http")==0 ? lws_callback_http_dummy : _internalCallback;
pr->callback = strcmp(pr->name,"http")==0 ? _httpCallback : _internalCallback;
pr->user = protocolState; // maintain a ptr to the application protocol state
}
@ -346,6 +448,12 @@ cw::rc_t cw::websock::create(
p->_cbFunc = cbFunc;
p->_cbArg = cbArg;
p->_pollfdMaxN = sysconf(_SC_OPEN_MAX);
p->_pollfdA = mem::allocZ<lws_pollfd>(p->_pollfdMaxN);
p->_pollfdN = 0;
for(int i=0; i<p->_pollfdMaxN; ++i)
p->_pollfdA[i].fd = LWS_SOCK_INVALID;
if((p->_ctx = lws_create_context(&info)) == 0)
{
rc = cwLogError(kObjAllocFailRC,"Unable to create the websocket context.");
@ -427,13 +535,38 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs )
rc_t rc = kOkRC;
websock_t* p = _handleToPtr(h);
// TODO: The return value of lws_service() is undocumented - look at the source code.
lws_service(p->_ctx, timeOutMs );
// TODO: implement the external polling version of lws_service_fd().
// See this: LWS_CALLBACK_ADD_POLL_FD to get the fd of the websocket.
// As of 11/20 this callback is never made - even when the websock library
// is explicitely built with -DLWS_WITH_EXTERNAL_POLL=ON
// Note the libwebsocket/test_apps/test-server.c also is incomplete and does
// not apparently represent a working version of an externally polled server.
// Also see: https://stackoverflow.com/questions/27192071/libwebsockets-for-c-can-i-use-websocket-file-descriptor-with-select
int sysRC = 0;
if( p->_pollfdN > 0 )
{
sysRC = poll(p->_pollfdA, p->_pollfdN, timeOutMs );
}
if( sysRC < 0 )
return cwLogSysError(kReadFailRC,errno,"Poll failed on socket.");
if(sysRC)
{
for(int i = 0; i < p->_pollfdN; i++)
if(p->_pollfdA[i].revents)
lws_service_fd(p->_ctx, p->_pollfdA + i);
}
lws_service_tsi(p->_ctx, -1, 0 );
msg_t* m;
// Get the next pending message.
if((m = p->_q->pop()) != nullptr )
while((m = p->_q->pop()) != nullptr )
{
auto protocol = _idToProtocol(p,m->protocolId);
@ -459,6 +592,8 @@ cw::rc_t cw::websock::exec( handle_t h, unsigned timeOutMs )
lws_callback_on_writable_all_protocol(p->_ctx,protocol);
lws_service_tsi(p->_ctx, -1, 0 );
}