// libcm/cmVirtNet.c
// 2012-10-29 20:52:39 -07:00
//
// 573 lines
// 15 KiB
// C

#include "cmGlobal.h"
#include "cmRpt.h"
#include "cmErr.h"
#include "cmCtx.h"
#include "cmMem.h"
#include "cmMallocDebug.h"
#include "cmLinkedHeap.h"
#include "cmJson.h"
#include "cmThread.h"
#include "cmUdpPort.h"
#include "cmUdpNet.h"
#include "cmVirtNet.h"
#include "cmText.h" // use by cmVnTest()
// Bit flags stored in cmVnNode_t.flags.
// kOwnerVnFl is set on the node created by _cmVnCreateOwnerNode(),
// kLocalVnFl is set on nodes created by cmVnCreateLocalNode(),
// and nodes created by cmVnCreateRemoteNode() carry neither flag.
enum { kOwnerVnFl = 0x01, kLocalVnFl = 0x02 };
// Forward declaration so that a node can point back to the cmVn_t that owns it.
struct cmVn_str;
// One entry in the singly linked node list of a virtual net (cmVn_t.nodes).
typedef struct cmVnNode_str
{
cmChar_t* label; // node label (allocated with cmMemAllocStr() in _cmVnCreateNode())
unsigned id; // node id; _cmVnCreateNode() rejects ids already present in the owning net
unsigned flags; // kOwnerVnFl, kLocalVnFl, or 0 for a remote node
cmVnH_t vnH; // handle of a local virtual net that is owned by this node
cmTsMp1cH_t qH; // MPSC non-blocking queue to hold incoming msg's from local virt. net's
struct cmVn_str* p; // pointer back to the cmVn_t which owns this node
struct cmVnNode_str* link; // next node in the list or NULL for the last node
} cmVnNode_t;
// Internal state of one virtual net. The public handle (cmVnH_t.h) points to this record.
typedef struct cmVn_str
{
cmErr_t err; // error reporting state, configured in cmVnCreate()
cmUdpNetH_t udpH; // UDP net used to send to and receive from remote nodes
cmVnNode_t* nodes; // head of the singly linked list of known nodes
cmVnCb_t cbFunc; // client callback invoked for every received message
void* cbArg; // first argument handed to cbFunc
unsigned ownerNodeId; // id of the owner node (set by _cmVnCreateOwnerNode() and cmVnCreate())
} cmVn_t;
// Null handle value; used in this file to reset cmVnNode_t.vnH and to
// initialize the handles of the test node table.
cmVnH_t cmVnNullHandle = cmSTATIC_NULL_HANDLE;
// Unwrap a public handle into the internal cmVn_t record.
// The handle is expected to be valid; a NULL record trips the assert.
cmVn_t* _cmVnHandleToPtr( cmVnH_t h )
{
  cmVn_t* vn = (cmVn_t*)h.h;

  assert( vn != NULL );

  return vn;
}
// Release one node record: its queue (if one was created), its label and
// the record itself.  Passing NULL is a no-op.
void _cmVnNodeFree( cmVnNode_t* np )
{
  if( np != NULL )
  {
    // only local nodes own a queue
    if( cmTsMp1cIsValid(np->qH) )
      cmTsMp1cDestroy(&np->qH);

    np->vnH = cmVnNullHandle;

    cmMemFree(np->label);
    cmMemFree(np);
  }
}
// Release a virtual net record: the UDP net first, then every node in the
// list and finally the record itself.
// If the UDP net cannot be released the error is reported and nothing else
// is freed.
cmVnRC_t _cmVnDestroy( cmVn_t* p )
{
  cmVnNode_t* cur;
  cmVnNode_t* next;

  if( cmUdpNetFree(&p->udpH) != kOkUnRC )
    return cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network release failed.");

  // grab the link before the node is released
  for(cur=p->nodes; cur!=NULL; cur=next)
  {
    next = cur->link;
    _cmVnNodeFree(cur);
  }

  cmMemFree(p);

  return kOkVnRC;
}
// This function is called by cmUdpNetReceive() which is called by cmVnReceive().
//
// cbArg        : the owner node record registered with cmUdpNetInit().
// h            : handle of the UDP net delivering the message (unused here).
// data         : message payload.
// dataByteCnt  : payload size in bytes.
// remoteNodeId : id of the node the message came from.
//
// The client callback is given 'remoteNodeId' as the source node id.
// cbArg is always the owner node, so its id identifies the receiver, not the
// sender; the previous code asserted that the two ids were equal and reported
// the owner id as the source.
// NOTE(review): this relies on cmUdpNet passing the sender's id in
// 'remoteNodeId' - confirm against cmUdpNet.h.
void _cmVnUdpNetCb( void* cbArg, cmUdpNetH_t h, const char* data, unsigned dataByteCnt, unsigned remoteNodeId )
{
  cmVnNode_t* np = (cmVnNode_t*)cbArg;

  assert( np != NULL && np->p != NULL );

  np->p->cbFunc(np->p->cbArg,remoteNodeId,dataByteCnt,data);
}
// Deliver 'byteCnt' bytes from 'buf' to the node 'np'.
// Remote nodes are reached through the UDP net.  Local nodes are reached by
// handing the message directly to the virtual net referenced by np->vnH;
// when that handle is not valid nothing is sent and kOkVnRC is returned.
cmVnRC_t _cmVnSend( cmVn_t* p, cmVnNode_t* np, unsigned byteCnt, const cmChar_t* buf )
{
  // remote node: send the msg via UDP
  if( !cmIsFlag(np->flags,kLocalVnFl) )
  {
    if( cmUdpNetSendById(p->udpH,np->id,buf,byteCnt) != kOkUnRC )
      return cmErrMsg( &p->err, kUdpNetFailVnRC, "UDP Net send to remote node '%s' failed.",cmStringNullGuard(np->label));

    return kOkVnRC;
  }

  // local node without a virtual net attached: nothing to do
  if( !cmVnIsValid(np->vnH) )
    return kOkVnRC;

  // local node: hand the msg directly to the destination net
  return cmVnRecvFromLocal(np->vnH,p->ownerNodeId,byteCnt,buf);
}
// Return the first node in p's node list whose id equals 'id',
// or NULL when no node matches.
cmVnNode_t* _cmVnNodeFindById( cmVn_t* p, unsigned id )
{
  cmVnNode_t* cur = p->nodes;

  while( cur != NULL && cur->id != id )
    cur = cur->link;

  return cur;
}
// Return the first node in p's node list whose label equals 'label',
// or NULL when no node matches.
// A NULL 'label' never matches, and nodes without a label are skipped,
// so strcmp() is never handed a NULL pointer.
cmVnNode_t* _cmVnNodeFindByLabel( cmVn_t* p, const cmChar_t* label )
{
  cmVnNode_t* np;

  if( label == NULL )
    return NULL;

  for(np=p->nodes; np!=NULL; np=np->link)
    if( np->label != NULL && strcmp(np->label,label)==0 )
      return np;

  return NULL;
}
// Return the first node in p's node list which has kOwnerVnFl set,
// or NULL when there is no such node.
cmVnNode_t* _cmVnNodeFindOwner( cmVn_t* p )
{
  cmVnNode_t* cur = p->nodes;

  while( cur != NULL )
  {
    if( cmIsFlag(cur->flags,kOwnerVnFl) )
      break;

    cur = cur->link;
  }

  return cur;
}
// Allocate a new, unlinked node record.
//
// p      : virtual net which will own the node.
// label  : node label; must not already be used by a node of 'p'.
// nodeId : node id; must not already be used by a node of 'p'.
// flags  : initial value of the node flags (kOwnerVnFl, kLocalVnFl or 0).
// npp    : receives the new node on success and NULL on failure.
//
// Returns kOkVnRC, kDuplicateNodeIdVnRC or kDuplicateNodeLabelVnRC.
// The node is NOT added to the node list; see _cmVnNodeLink().
cmVnRC_t _cmVnCreateNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned flags, cmVnNode_t** npp )
{
  cmVnNode_t* np;

  // guarantee that the caller's cleanup code never sees an indeterminate pointer
  *npp = NULL;

  // the format has two conversions: the id and the label of the node already using it
  if((np = _cmVnNodeFindById(p,nodeId)) != NULL )
    return cmErrMsg(&p->err,kDuplicateNodeIdVnRC,"The node id=%i is already in use for '%s'.",nodeId,cmStringNullGuard(np->label));

  if((np = _cmVnNodeFindByLabel(p,label)) != NULL )
    return cmErrMsg(&p->err,kDuplicateNodeLabelVnRC,"The node label '%s' is already used by another node.",cmStringNullGuard(label));

  np        = cmMemAllocZ(cmVnNode_t,1);
  np->label = cmMemAllocStr(label);
  np->id    = nodeId;
  np->flags = flags;
  np->p     = p;

  *npp = np;

  return kOkVnRC;
}
// Append the node 'np' to the end of p's node list.
void _cmVnNodeLink( cmVn_t* p, cmVnNode_t* np )
{
  // walk to the pointer which currently terminates the list
  cmVnNode_t** tailPtr = &p->nodes;

  while( *tailPtr != NULL )
    tailPtr = &(*tailPtr)->link;

  *tailPtr = np;
}
// Create the owner node of the virtual net 'p' and initialize the UDP net with it.
//
// label, nodeId      : label and id of the owner node.
// udpRecvBufByteCnt,
// udpRecvTimeOutMs,
// ipPort             : passed through to cmUdpNetInit().
// ipAddr             : currently not used by this function.
//
// On success the node is linked into the node list and p->ownerNodeId is set.
// On failure the partially created node is released.
cmVnRC_t _cmVnCreateOwnerNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
{
  cmVnRC_t    rc = kOkVnRC;
  cmVnNode_t* np = NULL;  // must start as NULL: the cleanup code below frees it when node creation fails

  // create a generic node
  if((rc = _cmVnCreateNode(p, label,nodeId, kOwnerVnFl, &np )) != kOkVnRC )
    goto errLabel;

  // initialize the UDP net with the owner node
  if( cmUdpNetInit(p->udpH,label,nodeId,ipPort,_cmVnUdpNetCb,np,udpRecvBufByteCnt,udpRecvTimeOutMs) != kOkUnRC )
  {
    rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network initialization failed for node:'%s'.",cmStringNullGuard(label));
    goto errLabel;
  }

  _cmVnNodeLink(p,np);

  p->ownerNodeId = nodeId;

 errLabel:
  if( rc != kOkVnRC )
    _cmVnNodeFree(np);

  return rc;
}
//------------------------------------------------------------------------------------------------------------
// Create a virtual net and its owner node.
//
// ctx     : application context; its report object is used for error reporting.
// hp      : receives the new handle.  A valid handle passed in is destroyed first.
// cbFunc,
// cbArg   : client callback (and its first argument) invoked for received messages.
// label,
// nodeId  : label and id of the owner node.
// udpRecvBufByteCnt, udpRecvTimeOutMs, ipAddr, ipPort :
//           passed through to _cmVnCreateOwnerNode().
//
// Returns kOkVnRC on success.  On failure the partially constructed net is
// released and *hp is left as a null handle.
cmVnRC_t cmVnCreate( cmCtx_t* ctx, cmVnH_t* hp, cmVnCb_t cbFunc, void* cbArg, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
{
  cmVnRC_t rc;

  if((rc = cmVnDestroy(hp)) != kOkVnRC )
    return rc;

  cmVn_t* p = cmMemAllocZ(cmVn_t,1);

  cmErrSetup(&p->err,&ctx->rpt,"cmVirtNet");

  if( cmUdpNetAlloc(ctx,&p->udpH) != kOkUnRC )
  {
    rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network allocation failed.");
    goto errLabel;
  }

  // create the owner node
  if((rc = _cmVnCreateOwnerNode(p,label,nodeId, udpRecvBufByteCnt, udpRecvTimeOutMs, ipAddr, ipPort )) != kOkVnRC )
    goto errLabel;

  p->ownerNodeId = nodeId;
  p->cbFunc      = cbFunc;
  p->cbArg       = cbArg;
  hp->h          = p;

 errLabel:
  // 'rc' is a cmVnRC_t and must be compared with the virtual net success code
  if( rc != kOkVnRC )
    _cmVnDestroy(p);

  return rc;
}
// Release the virtual net referenced by *hp and reset the handle.
// A NULL 'hp' or an invalid handle is accepted and reported as success.
// When the release fails the handle is left untouched.
cmVnRC_t cmVnDestroy( cmVnH_t* hp )
{
  cmVnRC_t rc = kOkVnRC;

  if( hp != NULL && cmVnIsValid(*hp) )
  {
    rc = _cmVnDestroy( _cmVnHandleToPtr(*hp) );

    if( rc == kOkVnRC )
      hp->h = NULL;
  }

  return rc;
}
// A handle is valid when it refers to a virtual net record.
bool cmVnIsValid( cmVnH_t h )
{
  if( h.h == NULL )
    return false;

  return true;
}
// Forward the listen enable/disable request to the underlying UDP net.
// Returns kUdpNetFailVnRC when the UDP net reports a failure.
cmVnRC_t cmVnEnableListen( cmVnH_t h, bool enableFl )
{
  cmVn_t* vn = _cmVnHandleToPtr(h);

  if( cmUdpNetEnableListen( vn->udpH, enableFl ) == kOkUnRC )
    return kOkVnRC;

  return cmErrMsg(&vn->err,kUdpNetFailVnRC,"UDP listen enable failed.");
}
// This function is called by cmTsMp1cDequeueMsg() which is called by cmVnReceive().
cmRC_t _cmVnTsQueueCb(void* userCbPtr, unsigned msgByteCnt, const void* msgDataPtr )
{
cmVnNode_t* np = (cmVnNode_t*)userCbPtr;
return np->p->cbFunc(np->p->cbArg,np->id,msgByteCnt,msgDataPtr);
}
// Add a local node to the virtual net 'h'.
//
// label, nodeId : label and id of the new node (both must be unused in 'h').
// vnH           : handle of the local virtual net which the node represents.
// queBufByteCnt : size passed to the node's incoming message queue.
//
// The node is linked into the node list only when every step succeeded;
// otherwise whatever was created is released again.
cmVnRC_t cmVnCreateLocalNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, cmVnH_t vnH, unsigned queBufByteCnt )
{
  cmVn_t*     p  = _cmVnHandleToPtr(h);
  cmVnNode_t* np = NULL;
  cmVnRC_t    rc;

  // create a generic node
  rc = _cmVnCreateNode(p,label,nodeId,kLocalVnFl,&np );

  if( rc == kOkVnRC )
  {
    // create the queue which holds the msg's arriving from the local net
    if( cmTsMp1cCreate( &np->qH, queBufByteCnt, _cmVnTsQueueCb, np, p->err.rpt ) != kOkThRC )
      rc = cmErrMsg(&p->err,kQueueFailVnRC,"The internal thread-safe queue creation failed for local node '%s'.",cmStringNullGuard(label));
    else
    {
      np->vnH = vnH;
      _cmVnNodeLink(p,np);
    }
  }

  if( rc != kOkVnRC )
    _cmVnNodeFree(np);

  return rc;
}
// Add a remote node to the virtual net 'h'.
//
// label, nodeId  : label and id of the new node (both must be unused in 'h').
// ipAddr, ipPort : network address of the node.  When 'ipAddr' is NULL the
//                  node is not registered with the UDP net.
//
// The node is linked into the node list only when every step succeeded;
// otherwise the node record is released again.
cmVnRC_t cmVnCreateRemoteNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, const cmChar_t* ipAddr, unsigned ipPort )
{
  cmVnRC_t    rc = kOkVnRC;
  cmVn_t*     p  = _cmVnHandleToPtr(h);
  cmVnNode_t* np = NULL;  // must start as NULL: the cleanup code below frees it when node creation fails

  // create a generic node
  if((rc = _cmVnCreateNode(p, label,nodeId, 0, &np )) != kOkVnRC )
    goto errLabel;

  if( ipAddr!=NULL )
  {
    if( cmUdpNetRegisterRemote(p->udpH,label,nodeId,ipAddr,ipPort) != kOkUnRC )
    {
      rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network remote registration failed for node:'%s'.",cmStringNullGuard(label));
      goto errLabel;
    }
  }

  _cmVnNodeLink(p,np);

 errLabel:
  if( rc != kOkVnRC )
    _cmVnNodeFree(np);

  return rc;
}
// Accept a message sent by another local virtual net.
// The message is copied into the queue of the node of 'h' whose id is
// 'srcNodeId'; it is delivered to the client callback when the queue is
// emptied by cmVnReceive().
//
// Returns kNodeNotFoundVnRC when 'srcNodeId' is unknown and kQueueFailVnRC
// when the node has no valid queue or the enqueue operation fails.
cmVnRC_t cmVnRecvFromLocal( cmVnH_t h, unsigned srcNodeId, unsigned byteCnt, const cmChar_t* buf )
{
  cmVn_t*     p      = _cmVnHandleToPtr(h);
  cmVnNode_t* srcPtr = _cmVnNodeFindById(p,srcNodeId);

  if( srcPtr == NULL )
    return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",srcNodeId);

  if( !cmTsMp1cIsValid(srcPtr->qH) )
    return cmErrMsg(&p->err,kQueueFailVnRC,"The internal MPSC queue for the node '%s' is not valid. Is this a local node?",cmStringNullGuard(srcPtr->label));

  if( cmTsMp1cEnqueueMsg(srcPtr->qH,buf,byteCnt) != kOkThRC )
    return cmErrMsg(&p->err,kQueueFailVnRC,"Enqueue failed on the internal MPSC queue for the node '%s'.",cmStringNullGuard(srcPtr->label));

  return kOkVnRC;
}
// Send 'byteCnt' bytes from 'buf' to the node of 'h' identified by 'nodeId'.
// Returns kNodeNotFoundVnRC when no such node exists, otherwise the result
// of _cmVnSend().
cmVnRC_t cmVnSendById( cmVnH_t h, unsigned nodeId, unsigned byteCnt, const cmChar_t* buf )
{
  cmVn_t*     p    = _cmVnHandleToPtr(h);
  cmVnNode_t* dest = _cmVnNodeFindById(p,nodeId);

  if( dest != NULL )
    return _cmVnSend(p,dest,byteCnt,buf);

  return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",nodeId);
}
// Send 'byteCnt' bytes from 'buf' to the node of 'h' named 'nodeLabel'.
// Returns kNodeNotFoundVnRC when no such node exists, otherwise the result
// of _cmVnSend().
cmVnRC_t cmVnSendByLabel( cmVnH_t h, const cmChar_t* nodeLabel, unsigned byteCnt, const cmChar_t* buf )
{
  cmVn_t*     p    = _cmVnHandleToPtr(h);
  cmVnNode_t* dest = _cmVnNodeFindByLabel(p,nodeLabel);

  if( dest != NULL )
    return _cmVnSend(p,dest,byteCnt,buf);

  return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node named '%s' could not be found.",cmStringNullGuard(nodeLabel));
}
// Dequeue the messages waiting in the queue of the local node 'np'.
//
// On entry *msgCntPtr holds the maximum count of messages to dequeue
// (cmInvalidCnt removes the limit).  On successful return it holds the
// count of messages actually dequeued.  If a dequeue operation fails
// *msgCntPtr is left at 0 and kQueueFailVnRC is returned.
cmVnRC_t _cmVnRecvQueueMsgs( cmVn_t* p, cmVnNode_t* np, unsigned *msgCntPtr )
{
  const unsigned maxCnt = *msgCntPtr;
  unsigned       cnt    = 0;

  *msgCntPtr = 0;

  while( (cnt<maxCnt || maxCnt==cmInvalidCnt) && cmTsMp1cMsgWaiting(np->qH) )
  {
    // calling this function results in calls to _cmVnTsQueueCb()
    if( cmTsMp1cDequeueMsg( np->qH,NULL,0) != kOkThRC )
      return cmErrMsg(&p->err,kQueueFailVnRC,"Msg deque failed for node '%s'.",cmStringNullGuard(np->label));

    ++cnt;
  }

  *msgCntPtr = cnt;

  return kOkVnRC;
}
// Receive the messages waiting on the virtual net 'h'.  Every received
// message results in a call to the client callback.
//
// msgCntPtr : optional.  On entry *msgCntPtr is the per-node limit on the
//             count of messages to receive.  On return it holds the total
//             count of messages received.  Pass NULL to receive without a
//             limit and without a count.
//
// The node list is walked once: the owner node receives nothing, local
// nodes empty their queue, and for every remote node the UDP net is polled.
// The walk stops at the first error.
//
// NOTE(review): the limit is applied per node, so the returned total may
// exceed the limit passed in, and cmUdpNetReceive() is invoked once per
// remote node although it is given the net handle rather than the node -
// confirm that this is intended.
cmVnRC_t cmVnReceive( cmVnH_t h, unsigned* msgCntPtr )
{
  cmVnRC_t    rc = kOkVnRC;
  cmVn_t*     p  = _cmVnHandleToPtr(h);
  cmVnNode_t* np = p->nodes;
  unsigned    mn = msgCntPtr == NULL ? cmInvalidCnt : *msgCntPtr;

  if( msgCntPtr != NULL )
    *msgCntPtr = 0;

  for(; np!=NULL && rc==kOkVnRC; np=np->link)
  {
    unsigned msgCnt = mn;

    switch( np->flags & (kOwnerVnFl | kLocalVnFl) )
    {
      case kOwnerVnFl:
        // nothing is received on behalf of the owner node, so it must not
        // contribute the unused limit to the reported total
        msgCnt = 0;
        break;

      case kLocalVnFl:
        rc = _cmVnRecvQueueMsgs(p,np,&msgCnt);
        break;

      default:
        if( cmUdpNetReceive(p->udpH,msgCntPtr==NULL?NULL:&msgCnt) != kOkUnRC )
          rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP net receive failed on node '%s'.",cmStringNullGuard(np->label));
        break;
    }

    if( rc == kOkVnRC && msgCntPtr != NULL )
      *msgCntPtr += msgCnt;
  }

  return rc;
}
//---------------------------------------------------------------------------------------------------
// File-scope settings shared by the cmVnTest() helper functions below.
unsigned udpRecvBufByteCnt = 8192; // passed to cmVnCreate() as the UDP receive buffer size
unsigned udpRecvTimeOutMs = 100; // passed to cmVnCreate() as the UDP receive time out
unsigned queueBufByteCnt = 8192; // passed to cmVnCreateLocalNode() as the queue buffer size
void* cbArg = NULL; // callback argument passed to cmVnCreate()
// Description of one node in the test network used by cmVnTest().
// A record with a NULL label terminates the node table.
typedef struct
{
const cmChar_t* label; // node label
unsigned id; // node id
const cmChar_t* ipAddr; // nodes which share an address are treated as local to each other
unsigned ipPort; // port passed to cmVnCreate() or cmVnCreateRemoteNode()
cmVnH_t vnH; // virtual net created for this node (only set for local nodes)
} node_t;
// Client callback used by cmVnTest(): print the source node id, the size
// of the message and the message itself (which is printed as a string).
cmVnRC_t _cmVnTestCb( void* cbArg, unsigned srcNodeId, unsigned byteCnt, const char* buf )
{
  fprintf(stdout,"src node:%i bytes:%i %s\n",srcNodeId,byteCnt,buf);

  return kOkVnRC;
}
// Create the virtual net for the test node at index 'id' of 'nodeArray'
// and store its handle in that node record.
cmVnRC_t _cmVnTestCreateLocalNet( cmCtx_t* ctx, cmErr_t* err, unsigned id, node_t* nodeArray )
{
  node_t*  owner = nodeArray + id;
  cmVnRC_t rc    = cmVnCreate(ctx, &owner->vnH, _cmVnTestCb, cbArg, owner->label, owner->id, udpRecvBufByteCnt, udpRecvTimeOutMs, owner->ipAddr, owner->ipPort );

  if( rc == kOkVnRC )
    return rc;

  return cmErrMsg(err,rc,"Virtual network create failed.");
}
// Populate the virtual net owned by the test node at index 'id' with one
// node per entry of 'nodeArray'.
// Entries which share the owner's IP address become local nodes (the owner
// itself is skipped); all other entries become remote nodes.
// Stops at, reports and returns the first error.
cmVnRC_t _cmVnTestCreateVirtNodes( cmErr_t* err, unsigned id, node_t* nodeArray )
{
  cmVnRC_t rc = kOkVnRC;  // same type as the function result and the callee results
  unsigned i;

  for(i=0; nodeArray[i].label != NULL; ++i)
  {
    // if this is a local node
    if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr) == 0 )
    {
      // if this is not the owner node
      if( i != id )
      {
        if((rc = cmVnCreateLocalNode( nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].vnH, queueBufByteCnt )) != kOkVnRC )
        {
          cmErrMsg(err,rc,"Local node create failed for node:'%s'.",nodeArray[i].label);
          goto errLabel;
        }
      }
    }
    else  // this must be a remote node
    {
      if((rc = cmVnCreateRemoteNode(nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].ipAddr, nodeArray[i].ipPort )) != kOkVnRC )
      {
        cmErrMsg(err,rc,"Remote node create failed for node '%s'.",nodeArray[i].label);
        goto errLabel;
      }
    }
  }

 errLabel:
  return rc;
}
// Send the text "msg-<n>" (including the terminating zero) from the test
// node at index 'id' to the test node at index 'i'.
// A request to send to the sending node itself is only reported on stdout.
cmVnRC_t _cmVnTestSend( cmErr_t* err, node_t* nodeArray, unsigned id, unsigned i, unsigned n )
{
  cmVnRC_t rc = kOkVnRC;

  if( id != i )
  {
    const cmChar_t* msg        = cmTsPrintfS("msg-%i",n);
    unsigned        msgByteCnt = strlen(msg)+1;
    const cmChar_t* srcLabel   = cmStringNullGuard(nodeArray[id].label);
    const cmChar_t* dstLabel   = cmStringNullGuard(nodeArray[i].label);

    printf("Sending from '%s' to '%s'.\n", srcLabel, dstLabel);

    rc = cmVnSendById(nodeArray[id].vnH, nodeArray[i].id, msgByteCnt, msg );

    if( rc != kOkVnRC )
      cmErrMsg(err,rc,"Send from '%s' to '%s' failed.", cmStringNullGuard(nodeArray[id].label), cmStringNullGuard(nodeArray[i].label));
  }
  else
  {
    printf("A node cannot send to itself.\n");
  }

  return rc;
}
// Interactive test of the virtual net.
//
// A virtual net is created for every node of the built-in node table which
// shares the IP address of the first node, and every such net is populated
// with the local and remote nodes of the table.
//
// Characters are then read from stdin until 'q' or end-of-input:
//   '0','1','2' : send a message from the first node to the node with that index.
// After every character the local nets are polled for incoming messages
// and a prompt is printed.
//
// Returns the first error encountered (setup or tear down), or kOkVnRC.
cmVnRC_t cmVnTest( cmCtx_t* ctx )
{
  cmVnRC_t rc = kOkVnRC;
  cmErr_t  err;
  unsigned id = 0;
  unsigned i;
  unsigned n  = 0;
  int      c;   // must be an 'int' so that EOF can be told apart from every character

  node_t nodeArray[] =
  {
    { "whirl-0", 10, "192.168.15.109", 5768, cmVnNullHandle },
    { "whirl-1", 20, "192.168.15.109", 5767, cmVnNullHandle },
    { "thunk-0", 30, "192.168.15.111", 5766, cmVnNullHandle },
    { NULL,      -1, NULL }
  };

  cmErrSetup(&err,&ctx->rpt,"cmVnTest");

  // create the virt networks for the local nodes
  for(i=0; nodeArray[i].label !=NULL; ++i)
    if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr ) == 0 )
      if((rc = _cmVnTestCreateLocalNet(ctx,&err,i,nodeArray)) != kOkVnRC )
        goto errLabel;

  // create the virtual nodes for each local virtual network
  for(i=0; nodeArray[i].label != NULL; ++i)
    if( cmVnIsValid(nodeArray[i].vnH) )
      if((rc = _cmVnTestCreateVirtNodes(&err, i, nodeArray )) != kOkVnRC )
        goto errLabel;  // do not enter the interactive loop with a partially built network

  // stop on 'q' and also on end-of-input / read error
  while((c=getchar()) != EOF && c != 'q')
  {
    switch( c )
    {
      case '0':
      case '1':
      case '2':
        _cmVnTestSend(&err, nodeArray, id, (unsigned)(c - '0'), n );
        ++n;
        break;

      default:
        break;
    }

    // give every local net the chance to deliver its incoming msg's
    for(i=0; nodeArray[i].label!=NULL; ++i)
      if( cmVnIsValid(nodeArray[i].vnH) )
        cmVnReceive(nodeArray[i].vnH,NULL);

    cmRptPrintf(&ctx->rpt,"%i> ",n);
  }

 errLabel:
  // destroy the virtual networks
  for(i=0; nodeArray[i].label!=NULL; ++i)
    if( cmVnIsValid(nodeArray[i].vnH) )
    {
      cmVnRC_t destroyRC;

      if((destroyRC = cmVnDestroy(&nodeArray[i].vnH)) != kOkVnRC )
      {
        cmErrMsg(&err,destroyRC,"Node destroy failed for node '%s'.",cmStringNullGuard(nodeArray[i].label));

        // keep the first error: a later successful destroy must not hide it
        if( rc == kOkVnRC )
          rc = destroyRC;
      }
    }

  return rc;
}