|
- #include "cmGlobal.h"
- #include "cmRpt.h"
- #include "cmErr.h"
- #include "cmCtx.h"
- #include "cmMem.h"
- #include "cmMallocDebug.h"
- #include "cmLinkedHeap.h"
- #include "cmJson.h"
- #include "cmThread.h"
- #include "cmUdpPort.h"
- #include "cmUdpNet.h"
- #include "cmVirtNet.h"
- #include "cmText.h" // use by cmVnTest()
-
- enum { kOwnerVnFl = 0x01, kLocalVnFl = 0x02 }; // cmVnNode_t.flags bits: kOwnerVnFl is set by _cmVnCreateOwnerNode(), kLocalVnFl by cmVnCreateLocalNode()
-
- struct cmVn_str; // forward declaration: cmVnNode_t holds a pointer back to its cmVn_t
-
- typedef struct cmVnNode_str
- {
- cmChar_t* label; // node label: allocated in _cmVnCreateNode() and released in _cmVnNodeFree()
- unsigned id; // node id: search key of _cmVnNodeFindById()
- unsigned flags; // kOwnerVnFl or kLocalVnFl (0 for nodes created by cmVnCreateRemoteNode())
- cmVnH_t vnH; // handle of a local virtual net that is owned by this node
- cmTsMp1cH_t qH; // MPSC non-blocking queue to hold incoming msg's from local virt. net's
- struct cmVn_str* p; // pointer back to the cmVn_t which owns this node
- struct cmVnNode_str* link; // next node on the singly linked list headed by cmVn_t.nodes
- } cmVnNode_t;
-
- typedef struct cmVn_str
- {
- cmErr_t err; // error reporting state: set up in cmVnCreate()
- cmUdpNetH_t udpH; // UDP network used to send to and receive from remote nodes
- cmVnNode_t* nodes; // head of the node list (nodes are appended by _cmVnNodeLink())
- cmVnCb_t cbFunc; // client callback which is handed every received message
- void* cbArg; // first argument passed to cbFunc
- unsigned ownerNodeId; // id of the owner node: passed as the source id by _cmVnSend()
- } cmVn_t;
-
- cmVnH_t cmVnNullHandle = cmSTATIC_NULL_HANDLE; // handle value meaning 'no virtual net': assigned to node handles in _cmVnNodeFree()
-
- // Convert a public handle into a pointer to the internal cmVn_t record.
- // The handle is expected to be valid: a NULL handle trips the assert.
- cmVn_t* _cmVnHandleToPtr( cmVnH_t h )
- {
-   assert( h.h != NULL );
-
-   return (cmVn_t*)h.h;
- }
-
- // Release a node record together with its label and (if one was created)
- // its incoming message queue. Passing NULL is allowed and does nothing.
- void _cmVnNodeFree( cmVnNode_t* np )
- {
-   if( np != NULL )
-   {
-     // a queue is only created by cmVnCreateLocalNode()
-     if( cmTsMp1cIsValid(np->qH) )
-       cmTsMp1cDestroy(&np->qH);
-
-     // the virtual net handle is only cleared here - the net itself is not destroyed
-     np->vnH = cmVnNullHandle;
-
-     cmMemFree(np->label);
-     cmMemFree(np);
-   }
- }
-
- // Release the UDP network, every node on the node list and finally the
- // cmVn_t record itself. If the UDP network release fails the error is
- // reported via cmErrMsg() and the record and its nodes are left untouched.
- cmVnRC_t _cmVnDestroy( cmVn_t* p )
- {
-   cmVnNode_t* cur;
-   cmVnNode_t* next;
-
-   if( cmUdpNetFree(&p->udpH) != kOkUnRC )
-     return cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network release failed.");
-
-   // the successor must be read before the current node is released
-   for(cur=p->nodes; cur!=NULL; cur=next)
-   {
-     next = cur->link;
-     _cmVnNodeFree(cur);
-   }
-
-   cmMemFree(p);
-
-   return kOkVnRC;
- }
-
- // UDP network receive callback: invoked from cmUdpNetReceive(), which is
- // in turn called by cmVnReceive(). cbArg is the node which was registered
- // with cmUdpNetInit() in _cmVnCreateOwnerNode(). The message is forwarded
- // unchanged to the client callback.
- // NOTE(review): the assert requires remoteNodeId to equal the id of the
- // node passed as cbArg - confirm this matches the cmUdpNet callback contract.
- void _cmVnUdpNetCb( void* cbArg, cmUdpNetH_t h, const char* data, unsigned dataByteCnt, unsigned remoteNodeId )
- {
-   cmVnNode_t* np = (cmVnNode_t*)cbArg;
-
-   assert( np->id == remoteNodeId );
-
-   cmVn_t* vn = np->p;
-
-   vn->cbFunc(vn->cbArg,np->id,dataByteCnt,data);
- }
-
-
- // Deliver a message to the node 'np'. Nodes flagged kLocalVnFl are handed
- // the message directly through their virtual net handle; all other nodes
- // are sent to via the UDP network.
- cmVnRC_t _cmVnSend( cmVn_t* p, cmVnNode_t* np, unsigned byteCnt, const cmChar_t* buf )
- {
-   // if the node is remote then send the msg via UDP
-   if( !cmIsFlag(np->flags,kLocalVnFl) )
-   {
-     if( cmUdpNetSendById(p->udpH,np->id,buf,byteCnt) == kOkUnRC )
-       return kOkVnRC;
-
-     return cmErrMsg( &p->err, kUdpNetFailVnRC, "UDP Net send to remote node '%s' failed.",cmStringNullGuard(np->label));
-   }
-
-   // a local node without a valid virtual net handle: nothing is sent and no error is raised
-   if( !cmVnIsValid(np->vnH) )
-     return kOkVnRC;
-
-   // the node is local: pass the msg directly, identifying this net's owner as the source
-   return cmVnRecvFromLocal(np->vnH,p->ownerNodeId,byteCnt,buf);
- }
-
- // Return the first node on the node list whose id equals 'id',
- // or NULL if no node matches.
- cmVnNode_t* _cmVnNodeFindById( cmVn_t* p, unsigned id )
- {
-   cmVnNode_t* cur = p->nodes;
-
-   while( cur != NULL && cur->id != id )
-     cur = cur->link;
-
-   return cur;
- }
-
- // Return the first node on the node list whose label equals 'label',
- // or NULL if no node matches. A NULL 'label' never matches, and nodes
- // without a label are skipped, because strcmp() must not be given NULL.
- cmVnNode_t* _cmVnNodeFindByLabel( cmVn_t* p, const cmChar_t* label )
- {
-   cmVnNode_t* np;
-
-   // callers guard their labels with cmStringNullGuard(), so NULL can arrive here
-   if( label == NULL )
-     return NULL;
-
-   for(np=p->nodes; np!=NULL; np=np->link)
-     if( np->label != NULL && strcmp(np->label,label)==0 )
-       return np;
-
-   return NULL;
- }
-
- // Return the first node on the node list which carries the kOwnerVnFl
- // flag, or NULL if there is no such node.
- cmVnNode_t* _cmVnNodeFindOwner( cmVn_t* p )
- {
-   cmVnNode_t* cur = p->nodes;
-
-   while( cur != NULL )
-   {
-     if( cmIsFlag(cur->flags,kOwnerVnFl) )
-       break;
-
-     cur = cur->link;
-   }
-
-   return cur;
- }
-
- // Allocate and fill in a new node record. The node is NOT linked onto
- // the node list - that is the callers job (see _cmVnNodeLink()).
- //
- // p      - virtual net which will own the node
- // label  - node label (copied); must not already be used by another node
- // nodeId - node id; must not already be used by another node
- // flags  - kOwnerVnFl, kLocalVnFl or 0
- // npp    - receives the new node on success and NULL on failure
- //
- // Returns kOkVnRC on success. A duplicate id or label is reported via
- // cmErrMsg() with kDuplicateNodeIdVnRC or kDuplicateNodeLabelVnRC.
- cmVnRC_t _cmVnCreateNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned flags, cmVnNode_t** npp )
- {
-   cmVnNode_t* np;
-
-   // callers release *npp on their error path: never leave it indeterminate
-   *npp = NULL;
-
-   // the format string consumes both the id and the label of the node already holding the id
-   if((np = _cmVnNodeFindById(p,nodeId)) != NULL )
-     return cmErrMsg(&p->err,kDuplicateNodeIdVnRC,"The node id=%i is already in use for '%s'.",nodeId,cmStringNullGuard(np->label));
-
-   if((np = _cmVnNodeFindByLabel(p,label)) != NULL )
-     return cmErrMsg(&p->err,kDuplicateNodeLabelVnRC,"The node label '%s' is already used by another node.",cmStringNullGuard(label));
-
-   np = cmMemAllocZ(cmVnNode_t,1);
-   np->label = cmMemAllocStr(label);
-   np->id = nodeId;
-   np->flags = flags;
-   np->p = p;
-
-   *npp = np;
-
-   return kOkVnRC;
- }
-
-
- // Append the node 'np' to the end of the node list of 'p'.
- void _cmVnNodeLink( cmVn_t* p, cmVnNode_t* np )
- {
-   // walk the chain of link fields until the terminating NULL slot is found
-   cmVnNode_t** slot = &p->nodes;
-
-   while( *slot != NULL )
-     slot = &(*slot)->link;
-
-   *slot = np;
- }
-
- // Create the owner node of the virtual net 'p' and initialize the UDP
- // network with it. On success the node is appended to the node list and
- // its id is recorded in p->ownerNodeId. On failure the partially created
- // node is released and the error code is returned.
- //
- // Note that 'ipAddr' is accepted but is not used by this function.
- cmVnRC_t _cmVnCreateOwnerNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
- {
-   cmVnRC_t rc = kOkVnRC;
-   cmVnNode_t* np = NULL; // must start as NULL: errLabel releases np even if the node was never created
-
-   // create a generic node
-   if((rc = _cmVnCreateNode(p, label,nodeId, kOwnerVnFl, &np )) != kOkVnRC )
-     goto errLabel;
-
-   // initialize the UDP net with the owner node
-   if( cmUdpNetInit(p->udpH,label,nodeId,ipPort,_cmVnUdpNetCb,np,udpRecvBufByteCnt,udpRecvTimeOutMs) != kOkUnRC )
-   {
-     rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network initialization failed for node:'%s'.",cmStringNullGuard(label));
-     goto errLabel;
-   }
-
-   _cmVnNodeLink(p,np);
-
-   p->ownerNodeId = nodeId;
-
-  errLabel:
-   if( rc != kOkVnRC )
-     _cmVnNodeFree(np);
-   return rc;
- }
-
- //------------------------------------------------------------------------------------------------------------
-
- // Create a virtual net and its owner node.
- //
- // ctx               - application context (supplies the report object used for errors)
- // hp                - receives the new handle; a valid handle already stored here is destroyed first
- // cbFunc, cbArg     - client callback (and its argument) which is handed every received message
- // label, nodeId     - label and id of the owner node
- // udpRecvBufByteCnt, udpRecvTimeOutMs, ipPort - forwarded to cmUdpNetInit()
- // ipAddr            - forwarded to _cmVnCreateOwnerNode()
- //
- // Returns kOkVnRC on success. On failure the partially constructed net is
- // released, *hp is not written and the error code is returned.
- cmVnRC_t cmVnCreate( cmCtx_t* ctx, cmVnH_t* hp, cmVnCb_t cbFunc, void* cbArg, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
- {
-   cmVnRC_t rc;
-   if((rc = cmVnDestroy(hp)) != kOkVnRC )
-     return rc;
-
-   cmVn_t* p = cmMemAllocZ(cmVn_t,1);
-
-   cmErrSetup(&p->err,&ctx->rpt,"cmVirtNet");
-
-   if( cmUdpNetAlloc(ctx,&p->udpH) != kOkUnRC )
-   {
-     rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network allocation failed.");
-     goto errLabel;
-   }
-
-   // create the owner node (this also records nodeId in p->ownerNodeId)
-   if((rc = _cmVnCreateOwnerNode(p,label,nodeId, udpRecvBufByteCnt, udpRecvTimeOutMs, ipAddr, ipPort )) != kOkVnRC )
-     goto errLabel;
-
-   p->cbFunc = cbFunc;
-   p->cbArg = cbArg;
-
-   hp->h = p;
-
-  errLabel:
-
-   // rc holds a cmVnRC_t value: it must be tested against kOkVnRC (not the UDP port module's code)
-   if( rc != kOkVnRC )
-     _cmVnDestroy(p);
-
-   return rc;
- }
-
- // Destroy the virtual net referenced by *hp and reset the handle.
- // A NULL 'hp' or an invalid handle is not an error: kOkVnRC is returned.
- // If the release fails the handle is left unchanged.
- cmVnRC_t cmVnDestroy( cmVnH_t* hp )
- {
-   cmVnRC_t rc = kOkVnRC;
-
-   if( hp != NULL && cmVnIsValid(*hp) )
-   {
-     // only clear the handle once the underlying record is really gone
-     if((rc = _cmVnDestroy(_cmVnHandleToPtr(*hp))) == kOkVnRC )
-       hp->h = NULL;
-   }
-
-   return rc;
- }
-
- // Return true if the handle holds a non-NULL pointer.
- bool cmVnIsValid( cmVnH_t h )
- {
-   return h.h == NULL ? false : true;
- }
-
- // Forward the listen enable/disable request to the UDP network.
- // A failure is reported via cmErrMsg() with kUdpNetFailVnRC.
- cmVnRC_t cmVnEnableListen( cmVnH_t h, bool enableFl )
- {
-   cmVn_t* p = _cmVnHandleToPtr(h);
-
-   if( cmUdpNetEnableListen( p->udpH, enableFl ) == kOkUnRC )
-     return kOkVnRC;
-
-   return cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP listen enable failed.");
- }
-
- // Queue dequeue callback: invoked from cmTsMp1cDequeueMsg(), which is in
- // turn called by cmVnReceive(). userCbPtr is the local node which owns the
- // queue (see cmVnCreateLocalNode()). The message is forwarded to the client
- // callback and the callback's result is returned.
- cmRC_t _cmVnTsQueueCb(void* userCbPtr, unsigned msgByteCnt, const void* msgDataPtr )
- {
-   cmVnNode_t* node = (cmVnNode_t*)userCbPtr;
-   cmVn_t* vn = node->p;
-
-   return vn->cbFunc(vn->cbArg,node->id,msgByteCnt,msgDataPtr);
- }
-
- // Add a node flagged kLocalVnFl to the virtual net 'h'.
- //
- // label, nodeId  - label and id of the new node
- // vnH            - virtual net handle stored in the node (used by _cmVnSend())
- // queBufByteCnt  - buffer size handed to cmTsMp1cCreate() for the node's incoming msg queue
- //
- // On any failure the partially created node is released.
- cmVnRC_t cmVnCreateLocalNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, cmVnH_t vnH, unsigned queBufByteCnt )
- {
-   cmVn_t* p = _cmVnHandleToPtr(h);
-   cmVnNode_t* np = NULL;
-   cmVnRC_t rc;
-
-   // create a generic node
-   if((rc = _cmVnCreateNode(p,label,nodeId,kLocalVnFl,&np )) == kOkVnRC )
-   {
-     // create the queue which holds msg's arriving via cmVnRecvFromLocal()
-     if( cmTsMp1cCreate( &np->qH, queBufByteCnt, _cmVnTsQueueCb, np, p->err.rpt ) != kOkThRC )
-       rc = cmErrMsg(&p->err,kQueueFailVnRC,"The internal thread-safe queue creation failed for local node '%s'.",cmStringNullGuard(label));
-     else
-     {
-       np->vnH = vnH;
-       _cmVnNodeLink(p,np);
-     }
-   }
-
-   if( rc != kOkVnRC )
-     _cmVnNodeFree(np);
-
-   return rc;
- }
-
- // Add a remote node (flags == 0) to the virtual net 'h'.
- //
- // label, nodeId  - label and id of the new node
- // ipAddr, ipPort - address of the remote node; if ipAddr is NULL the node is
- //                  not registered with the UDP network
- //
- // On any failure the partially created node is released.
- cmVnRC_t cmVnCreateRemoteNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, const cmChar_t* ipAddr, unsigned ipPort )
- {
-   cmVnRC_t rc = kOkVnRC;
-   cmVn_t* p = _cmVnHandleToPtr(h);
-   cmVnNode_t* np = NULL; // must start as NULL: errLabel releases np even if the node was never created
-
-   // create a generic node
-   if((rc = _cmVnCreateNode(p, label,nodeId, 0, &np )) != kOkVnRC )
-     goto errLabel;
-
-   if( ipAddr!=NULL )
-   {
-     if( cmUdpNetRegisterRemote(p->udpH,label,nodeId,ipAddr,ipPort) != kOkUnRC )
-     {
-       rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network remote registration failed for node:'%s'.",cmStringNullGuard(label));
-       goto errLabel;
-     }
-   }
-
-   _cmVnNodeLink(p,np);
-
-  errLabel:
-   if( rc != kOkVnRC )
-     _cmVnNodeFree(np);
-   return rc;
- }
-
- // Accept a message sent by another local virtual net. The message is
- // placed on the queue of the node whose id is 'srcNodeId' and is later
- // delivered to the client callback by cmVnReceive().
- //
- // Errors (reported via cmErrMsg()): the source node is unknown, the node
- // has no valid queue, or the enqueue operation fails.
- cmVnRC_t cmVnRecvFromLocal( cmVnH_t h, unsigned srcNodeId, unsigned byteCnt, const cmChar_t* buf )
- {
-   cmVn_t* p = _cmVnHandleToPtr(h);
-   cmVnNode_t* src = _cmVnNodeFindById(p,srcNodeId);
-   cmVnRC_t rc = kOkVnRC;
-
-   if( src == NULL )
-     rc = cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",srcNodeId);
-   else
-     if( !cmTsMp1cIsValid(src->qH) )
-       rc = cmErrMsg(&p->err,kQueueFailVnRC,"The internal MPSC queue for the node '%s' is not valid. Is this a local node?",cmStringNullGuard(src->label));
-     else
-       if( cmTsMp1cEnqueueMsg(src->qH,buf,byteCnt) != kOkThRC )
-         rc = cmErrMsg(&p->err,kQueueFailVnRC,"Enqueue failed on the internal MPSC queue for the node '%s'.",cmStringNullGuard(src->label));
-
-   return rc;
- }
-
- // Send 'byteCnt' bytes from 'buf' to the node identified by 'nodeId'.
- // An unknown node id is reported via cmErrMsg() with kNodeNotFoundVnRC.
- cmVnRC_t cmVnSendById( cmVnH_t h, unsigned nodeId, unsigned byteCnt, const cmChar_t* buf )
- {
-   cmVn_t* p = _cmVnHandleToPtr(h);
-   cmVnNode_t* dst = _cmVnNodeFindById(p,nodeId);
-
-   if( dst != NULL )
-     return _cmVnSend(p,dst,byteCnt,buf);
-
-   return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",nodeId);
- }
-
- // Send 'byteCnt' bytes from 'buf' to the node identified by 'nodeLabel'.
- // An unknown label is reported via cmErrMsg() with kNodeNotFoundVnRC.
- cmVnRC_t cmVnSendByLabel( cmVnH_t h, const cmChar_t* nodeLabel, unsigned byteCnt, const cmChar_t* buf )
- {
-   cmVn_t* p = _cmVnHandleToPtr(h);
-   cmVnNode_t* dst = _cmVnNodeFindByLabel(p,nodeLabel);
-
-   if( dst != NULL )
-     return _cmVnSend(p,dst,byteCnt,buf);
-
-   return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node named '%s' could not be found.",cmStringNullGuard(nodeLabel));
- }
-
- // Dequeue the messages waiting on the queue of the local node 'np'.
- //
- // msgCntPtr - on input the maximum count of msg's to dequeue (cmInvalidCnt
- //             removes the limit); on output the count actually dequeued.
- //             If a dequeue operation fails the output count is 0.
- cmVnRC_t _cmVnRecvQueueMsgs( cmVn_t* p, cmVnNode_t* np, unsigned *msgCntPtr )
- {
-   unsigned maxCnt = *msgCntPtr;
-   unsigned cnt = 0;
-
-   *msgCntPtr = 0;
-
-   while( (maxCnt==cmInvalidCnt || cnt<maxCnt) && cmTsMp1cMsgWaiting(np->qH) )
-   {
-     // calling this function results in calls to _cmVnTsQueueCb()
-     if( cmTsMp1cDequeueMsg( np->qH,NULL,0) != kOkThRC )
-       return cmErrMsg(&p->err,kQueueFailVnRC,"Msg deque failed for node '%s'.",cmStringNullGuard(np->label));
-
-     ++cnt;
-   }
-
-   *msgCntPtr = cnt;
-
-   return kOkVnRC;
- }
-
- // Deliver waiting messages to the client callback.
- //
- // The node list is traversed once: local nodes have their queue drained
- // (see _cmVnRecvQueueMsgs()), remote nodes trigger a call to
- // cmUdpNetReceive(), and nothing is received on behalf of the owner node.
- // The traversal stops at the first error.
- //
- // msgCntPtr - optional. On input the maximum count of msg's to receive per
- //             node; on output the total count of msg's received. If NULL
- //             the count is unlimited and no total is returned.
- cmVnRC_t cmVnReceive( cmVnH_t h, unsigned* msgCntPtr )
- {
-   cmVnRC_t rc = kOkVnRC;
-   cmVn_t* p = _cmVnHandleToPtr(h);
-   cmVnNode_t* np = p->nodes;
-   unsigned mn = msgCntPtr == NULL ? cmInvalidCnt : *msgCntPtr;
-
-   if( msgCntPtr != NULL )
-     *msgCntPtr = 0;
-
-   for(; np!=NULL && rc==kOkVnRC; np=np->link)
-   {
-     // in: max msg count for this node, out: msg's actually received
-     unsigned msgCnt = mn;
-
-     switch( np->flags & (kOwnerVnFl | kLocalVnFl) )
-     {
-       case kOwnerVnFl:
-         // no msg's are received for the owner node, so it must not
-         // contribute its (untouched) maximum to the returned total
-         msgCnt = 0;
-         break;
-
-       case kLocalVnFl:
-         rc = _cmVnRecvQueueMsgs(p,np,&msgCnt);
-         break;
-
-       default:
-         if( cmUdpNetReceive(p->udpH,msgCntPtr==NULL?NULL:&msgCnt) != kOkUnRC )
-           rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP net receive failed on node '%s'.",cmStringNullGuard(np->label));
-         break;
-     }
-
-     if( rc == kOkVnRC && msgCntPtr != NULL )
-       *msgCntPtr += msgCnt;
-
-   }
-
-   return rc;
- }
-
- //---------------------------------------------------------------------------------------------------
-
- unsigned udpRecvBufByteCnt = 8192; // test setting: UDP receive buffer size passed to cmVnCreate()
- unsigned udpRecvTimeOutMs = 100; // test setting: UDP receive time out passed to cmVnCreate()
- unsigned queueBufByteCnt = 8192; // test setting: queue buffer size passed to cmVnCreateLocalNode()
- void* cbArg = NULL; // test setting: callback argument passed to cmVnCreate()
-
-
- typedef struct
- {
- const cmChar_t* label; // node label: a NULL label terminates the node array in cmVnTest()
- unsigned id; // node id
- const cmChar_t* ipAddr; // node IP address: the test treats nodes with equal addresses as local to each other
- unsigned ipPort; // node IP port
- cmVnH_t vnH; // virtual net created for this node by _cmVnTestCreateLocalNet()
- } node_t;
-
- // Test receive callback: print the source node id, the byte count and the
- // payload. The payload is printed with %s and so is expected to be a zero
- // terminated string (which is what _cmVnTestSend() transmits).
- cmVnRC_t _cmVnTestCb( void* cbArg, unsigned srcNodeId, unsigned byteCnt, const char* buf )
- {
-   fprintf(stdout,"src node:%i bytes:%i %s\n",srcNodeId,byteCnt,buf);
-
-   return kOkVnRC;
- }
-
- // Create the virtual net owned by nodeArray[id] and store its handle in
- // that array element. A failure is additionally reported through 'err'.
- cmVnRC_t _cmVnTestCreateLocalNet( cmCtx_t* ctx, cmErr_t* err, unsigned id, node_t* nodeArray )
- {
-   node_t* owner = nodeArray + id;
-   cmVnRC_t rc = cmVnCreate(ctx, &owner->vnH, _cmVnTestCb, cbArg, owner->label, owner->id, udpRecvBufByteCnt, udpRecvTimeOutMs, owner->ipAddr, owner->ipPort );
-
-   if( rc != kOkVnRC )
-     rc = cmErrMsg(err,rc,"Virtual network create failed.");
-
-   return rc;
- }
-
- // Populate the virtual net owned by nodeArray[id] with one node for every
- // other entry of the (NULL label terminated) node array. Entries sharing
- // the owner's IP address become local nodes, all others become remote
- // nodes. The first failure is reported through 'err' and ends the loop.
- cmVnRC_t _cmVnTestCreateVirtNodes( cmErr_t* err, unsigned id, node_t* nodeArray )
- {
-   unsigned rc = kOkVnRC;
-   unsigned i;
-
-   for(i=0; nodeArray[i].label != NULL; ++i)
-   {
-     bool localFl = strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr) == 0;
-
-     // this must be a remote node
-     if( !localFl )
-     {
-       if((rc = cmVnCreateRemoteNode(nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].ipAddr, nodeArray[i].ipPort )) != kOkVnRC )
-       {
-         cmErrMsg(err,rc,"Remote node create failed for node '%s'.",nodeArray[i].label);
-         return rc;
-       }
-
-       continue;
-     }
-
-     // the owner node already exists - it was created along with the net
-     if( i == id )
-       continue;
-
-     // this is a local node which is not the owner
-     if((rc = cmVnCreateLocalNode( nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].vnH, queueBufByteCnt )) != kOkVnRC )
-     {
-       cmErrMsg(err,rc,"Local node create failed for node:'%s'.",nodeArray[i].label);
-       return rc;
-     }
-   }
-
-   return rc;
- }
-
- // Send the zero terminated text "msg-<n>" from the net of nodeArray[id] to
- // the node nodeArray[i]. A request to send to oneself is refused with a
- // console message and kOkVnRC. A send failure is reported through 'err'.
- cmVnRC_t _cmVnTestSend( cmErr_t* err, node_t* nodeArray, unsigned id, unsigned i, unsigned n )
- {
-   cmVnRC_t rc = kOkVnRC;
-
-   if( id != i )
-   {
-     const cmChar_t* msg = cmTsPrintfS("msg-%i",n);
-     unsigned msgByteCnt = strlen(msg)+1; // +1: transmit the terminating zero too
-     const cmChar_t* srcLabel = cmStringNullGuard(nodeArray[id].label);
-     const cmChar_t* dstLabel = cmStringNullGuard(nodeArray[i].label);
-
-     printf("Sending from '%s' to '%s'.\n", srcLabel, dstLabel);
-
-     rc = cmVnSendById(nodeArray[id].vnH, nodeArray[i].id, msgByteCnt, msg );
-
-     if( rc != kOkVnRC )
-       cmErrMsg(err,rc,"Send from '%s' to '%s' failed.", srcLabel, dstLabel);
-   }
-   else
-   {
-     printf("A node cannot send to itself.\n");
-   }
-
-   return rc;
- }
-
- // Interactive test driver.
- //
- // A virtual net is created for every node array entry sharing the IP
- // address of entry 0, and each of these nets is then populated with the
- // remaining entries as local or remote nodes. Characters are then read
- // from stdin: '0','1','2' send a message from entry 0 to the entry with
- // that index, 'q' (or end of input) quits. After every character all
- // nets are polled for incoming messages and a prompt is printed.
- //
- // Returns the first error which occurred, or kOkVnRC.
- cmVnRC_t cmVnTest( cmCtx_t* ctx )
- {
-   cmVnRC_t rc = kOkVnRC;
-   cmErr_t err;
-   unsigned id = 0;
-   unsigned i;
-   unsigned n = 0;
-   int c; // getchar() returns an int: a char cannot reliably represent EOF
-
-   node_t nodeArray[] =
-   {
-     { "whirl-0", 10, "192.168.15.109", 5768, cmVnNullHandle },
-     { "whirl-1", 20, "192.168.15.109", 5767, cmVnNullHandle },
-     { "thunk-0", 30, "192.168.15.111", 5766, cmVnNullHandle },
-     { NULL, -1, NULL }
-   };
-
-   cmErrSetup(&err,&ctx->rpt,"cmVnTest");
-
-   // create the virt networks for the local nodes
-   for(i=0; nodeArray[i].label !=NULL; ++i)
-     if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr ) == 0 )
-       if((rc = _cmVnTestCreateLocalNet(ctx,&err,i,nodeArray)) != kOkVnRC )
-         goto errLabel;
-
-   // create the virtual nodes for each local virtual network
-   // (on failure skip the interactive loop and go straight to the cleanup)
-   for(i=0; nodeArray[i].label != NULL; ++i)
-     if( cmVnIsValid(nodeArray[i].vnH) )
-       if((rc = _cmVnTestCreateVirtNodes(&err, i, nodeArray )) != kOkVnRC )
-         goto errLabel;
-
-   // stop on 'q' and also on end of input (otherwise EOF would loop forever)
-   while((c=getchar()) != 'q' && c != EOF )
-   {
-     switch( c )
-     {
-       case '0':
-       case '1':
-       case '2':
-         _cmVnTestSend(&err, nodeArray, id, (unsigned)c - (unsigned)'0', n );
-         ++n;
-         break;
-
-       default:
-         break;
-     }
-
-     for(i=0; nodeArray[i].label!=NULL; ++i)
-       if( cmVnIsValid(nodeArray[i].vnH) )
-         cmVnReceive(nodeArray[i].vnH,NULL);
-
-     cmRptPrintf(&ctx->rpt,"%i> ",n);
-
-   }
-
-  errLabel:
-   // destroy the virtual networks
-   for(i=0; nodeArray[i].label!=NULL; ++i)
-     if( cmVnIsValid(nodeArray[i].vnH) )
-     {
-       cmVnRC_t destroyRC;
-
-       if((destroyRC = cmVnDestroy(&nodeArray[i].vnH)) != kOkVnRC )
-       {
-         cmErrMsg(&err,destroyRC,"Node destroy failed for node '%s'.",cmStringNullGuard(nodeArray[i].label));
-
-         // keep the first error: a later successful destroy must not mask it
-         if( rc == kOkVnRC )
-           rc = destroyRC;
-       }
-     }
-
-   return rc;
- }
|