cmRtNet.h/c : The rtSubIdx of the owning cmRtSys sub-system is now set in

cmNetAlloc().  rtSubIdx is no longer required to locate a remote endpoint
because IP/"node label" is enough information to determine the rtSubIdx.
Several changes were made to simplify the interface to cmRtNet based on this
observation.
This commit is contained in:
kpl 2014-06-15 14:36:12 -07:00
parent b6e71997f3
commit 10943e18e3
2 changed files with 62 additions and 38 deletions

View File

@ -30,14 +30,15 @@ typedef struct cmRtNetEnd_str
{
cmChar_t* label;
unsigned id;
unsigned rtSubIdx;
struct cmRtNetNode_str* np; // Owner node.
struct cmRtNetEnd_str* link;
} cmRtNetEnd_t;
struct cmRtNet_str;
typedef struct cmRtNetNode_str
{
unsigned rtSubIdx; // rtSubIdx of the sub-system which owns this node
cmChar_t* label; // Node label.
struct sockaddr_in sockaddr; // Socket address
cmChar_t* addr; // IP Address (human readable)
@ -50,10 +51,11 @@ typedef struct cmRtNetNode_str
struct cmRtNetNode_str* link;
} cmRtNetNode_t;
typedef struct
typedef struct cmRtNet_str
{
cmErr_t err; // Error state object
unsigned flags; // See kXXXNetFl above.
unsigned rtSubIdx; // rtSubIdx of the owning sub-system
cmUdpH_t udpH; // UDP port handle
cmUdpCallback_t cbFunc; // Client callback to receive incoming messages from network.
void* cbArg; //
@ -173,7 +175,7 @@ cmRtNetRC_t _cmRtNetReleaseNode( cmRtNet_t* p, cmRtNetNode_t* np )
return cmErrMsg(&p->err,kNodeNotFoundNetRC,"Node to release not found.");
}
cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags, unsigned endPtCnt )
cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, unsigned rtSubIdx, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags, unsigned endPtCnt )
{
cmRtNetRC_t rc = kOkNetRC;
cmRtNetNode_t* np;
@ -190,6 +192,7 @@ cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmCha
if( saddr != NULL )
np->sockaddr = *saddr;
np->rtSubIdx = rtSubIdx;
np->addr = addr==NULL ? NULL : cmMemAllocStr(addr);
np->port = port;
np->flags = flags;
@ -200,11 +203,11 @@ cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmCha
return rc;
}
cmRtNetEnd_t* _cmRtNetFindNodeEnd(cmRtNetNode_t* np, unsigned rtSubIdx, const cmChar_t* endPtLabel )
cmRtNetEnd_t* _cmRtNetFindNodeEnd(cmRtNetNode_t* np, const cmChar_t* endPtLabel )
{
cmRtNetEnd_t* ep = np->ends;
for(; ep!=NULL; ep=ep->link)
if( ep->rtSubIdx==rtSubIdx && strcmp(ep->label,endPtLabel)==0 )
if( strcmp(ep->label,endPtLabel)==0 )
return ep;
return NULL;
}
@ -223,12 +226,12 @@ cmRtNetEnd_t* _cmRtNetIndexToEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned
return NULL;
}
cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned rtSubIdx, const cmChar_t* endPtLabel, unsigned endPtId )
cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, const cmChar_t* endPtLabel, unsigned endPtId )
{
if( endPtLabel == NULL )
return cmErrMsg(&p->err,kInvalidLabelNetRC,"A null or blank node label was encountered.");
if( _cmRtNetFindNodeEnd( np, rtSubIdx, endPtLabel) != NULL)
if( _cmRtNetFindNodeEnd( np, endPtLabel) != NULL)
return cmErrMsg(&p->err,kDuplEndNetRC,"A duplicate endpoint ('%s') was encountered on node '%s'.",endPtLabel,np->label);
cmRtNetRC_t rc = kOkNetRC;
@ -236,7 +239,6 @@ cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned rt
ep->label = cmMemAllocStr(endPtLabel);
ep->id = endPtId;
ep->rtSubIdx = rtSubIdx;
ep->np = np;
ep->link = np->ends;
np->ends = ep;
@ -386,7 +388,7 @@ const cmChar_t* cmRtNetSyncMsgLabel( const cmRtNetSyncMsg_t* m )
return "";
}
cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc, void* cbArg )
cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, unsigned rtSubIdx, cmUdpCallback_t cbFunc, void* cbArg )
{
cmRtNetRC_t rc;
if((rc = cmRtNetFree(hp)) != kOkNetRC )
@ -402,6 +404,7 @@ cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc,
goto errLabel;
}
p->rtSubIdx = rtSubIdx;
p->udpTimeOutMs = 50;
p->udpRecvBufByteCnt = 8192;
p->cbFunc = cbFunc;
@ -478,7 +481,7 @@ cmRtNetRC_t _cmRtNetSendEndpointReplyMsg( cmRtNet_t* p, cmRtNetNode_t* np, cmRt
{
msgLabel = ep->label; // ... send next local endpoint
msgId = ep->id;
msgRtSubIdx = ep->rtSubIdx;
msgRtSubIdx = ep->np->rtSubIdx;
}
else // .... all local endpoints have been sent
{
@ -573,7 +576,7 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data
// create a node proxy to represent the remote node
// (Note:m.id == remote node endpoint count (i.e. the count of endpoints expected for the remote node.))
if(( rc = _cmRtNetCreateNode(p,m.label,NULL,0,fromAddr,0,m.id)) != kOkNetRC )
if(( rc = _cmRtNetCreateNode(p,m.label,m.rtSubIdx,NULL,0,fromAddr,0,m.id)) != kOkNetRC )
goto errLabel;
np = p->nodes; // newest node is always the first node
@ -583,7 +586,7 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data
{
case kHelloSelNetId:
_cmRtNetRpt(p,"rcv hello\n"); // reply with local node
rc = _cmRtNetSendSyncMsg( p, np, kNodeSelNetId, p->localNode->label, p->localNode->endPtCnt, cmInvalidIdx );
rc = _cmRtNetSendSyncMsg( p, np, kNodeSelNetId, p->localNode->label, p->localNode->endPtCnt, p->localNode->rtSubIdx );
break;
case kNodeSelNetId:
@ -615,12 +618,12 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data
}
// attempt to find the end point
if((ep = _cmRtNetFindNodeEnd(np, m.rtSubIdx, m.label)) != NULL )
if((ep = _cmRtNetFindNodeEnd(np, m.label)) != NULL )
ep->id = m.id; // the endpoint was found update the endPtId
else
{
// create a local proxy for the endpoint
if((rc = _cmRtNetCreateEndpoint(p,np,m.rtSubIdx,m.label,m.id)) != kOkNetRC )
if((rc = _cmRtNetCreateEndpoint(p,np,m.label,m.id)) != kOkNetRC )
goto errLabel;
}
@ -675,7 +678,7 @@ cmRtNetRC_t cmRtNetInitialize( cmRtNetH_t h, const cmChar_t* bcastAddr, const cm
}
// create the local node
if((rc = _cmRtNetCreateNode(p,nodeLabel, ipAddr, port, NULL, kLocalNodeNetFl, 0)) != kOkNetRC )
if((rc = _cmRtNetCreateNode(p,nodeLabel, p->rtSubIdx, ipAddr, port, NULL, kLocalNodeNetFl, 0)) != kOkNetRC )
goto errLabel;
// the last created node is always the first node on the list
@ -705,7 +708,7 @@ bool cmRtNetIsInitialized( cmRtNetH_t h )
}
cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, unsigned rtSubIdx, const cmChar_t* endPtLabel, unsigned endPtId )
cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, const cmChar_t* endPtLabel, unsigned endPtId )
{
cmRtNetRC_t rc = kOkNetRC;
cmRtNet_t* p = _cmRtNetHandleToPtr(h);
@ -713,7 +716,7 @@ cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, unsigned rtSubIdx, const cmCh
if( p->localNode == NULL )
return cmErrMsg(&p->err,kLocalNodeNetRC,"Local endpoints may not be added if a local node has not been defined.");
if((rc = _cmRtNetCreateEndpoint(p, p->localNode,rtSubIdx,endPtLabel,endPtId )) == kOkNetRC )
if((rc = _cmRtNetCreateEndpoint(p, p->localNode,endPtLabel,endPtId )) == kOkNetRC )
p->localNode->endPtCnt += 1;
return rc;
@ -750,7 +753,7 @@ cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h )
return rc;
}
cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp )
cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp )
{
cmRtNetRC_t rc = kOkNetRC;
cmRtNet_t* p = _cmRtNetHandleToPtr(h);
@ -761,7 +764,7 @@ cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, unsi
return cmErrMsg(&p->err,kNodeNotFoundNetRC,"The node '%s' was not found.",cmStringNullGuard(nodeLabel));
if(( ep = _cmRtNetFindNodeEnd(np, rtSubIdx, endptLabel )) == NULL )
if(( ep = _cmRtNetFindNodeEnd(np, endptLabel )) == NULL )
return cmErrMsg(&p->err,kEndNotFoundNetRC,"The endpoint '%s' on '%s' on node was not found.",cmStringNullGuard(endptLabel),cmStringNullGuard(nodeLabel));
hp->h = ep;
@ -773,14 +776,19 @@ cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg,
{
cmRtNetRC_t rc = kOkNetRC;
unsigned dN = sizeof(cmRtNetMsg_t) + msgByteCnt;
unsigned hN = sizeof(cmRtNetMsg_t);
unsigned dN = hN + msgByteCnt;
char data[ dN ];
cmRtNetMsg_t* r = (cmRtNetMsg_t*)data;
r->hdr.rtSubIdx = ep->rtSubIdx;
r->hdr.rtSubIdx = ep->np->rtSubIdx;
r->hdr.selId = kMsgSelRtId;
r->endptId = ep->id;
memcpy(data+sizeof(cmRtNetMsg_t),msg,msgByteCnt);
memcpy(data+hN,msg,msgByteCnt);
// ep->np->sockaddr identifies the node on the receiving cmRtNet.
// cmRtNetMsg_t* r.endptId is then used by the receiving cmRtNet to indicate which endpoint on
// the node the incoming message should be associated with.
if( cmUdpSendTo(p->udpH, data, dN, &ep->np->sockaddr ) != kOkUdpRC )
return cmErrMsg(&p->err,kUdpPortFailNetRC,"Send to node:%s endpt:%s failed.\n",cmStringNullGuard(ep->np->label),cmStringNullGuard(ep->label));
@ -798,12 +806,12 @@ cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, uns
}
cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt )
cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt )
{
cmRtNetRC_t rc = kOkNetRC;
cmRtNetEndptH_t epH = cmRtNetEndptNullHandle;
if((rc = cmRtNetEndpointHandle(h,nodeLabel,rtSubIdx,endptLabel,&epH)) != kOkNetRC )
if((rc = cmRtNetEndpointHandle(h,nodeLabel,endptLabel,&epH)) != kOkNetRC )
return rc;
return cmRtNetSend(h,epH,msg,msgByteCnt);
@ -935,7 +943,7 @@ cmRtNetRC_t cmRtNetRemoteNodeEndPoint(
*labelRef = ep->label;
*idRef = ep->id;
*rsiRef = ep->rtSubIdx;
*rsiRef = ep->np->rtSubIdx;
return kOkNetRC;
}
@ -998,7 +1006,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
if( cmThreadCreate(&p->thH,_cmRtNetTestThreadFunc,p,&ctx->rpt) != kOkThRC )
goto errLabel;
if((rc = cmRtNetAlloc(ctx,&p->netH,_cmRtNetTestRecv,p)) != kOkNetRC )
if((rc = cmRtNetAlloc(ctx,&p->netH,rtSubIdx,_cmRtNetTestRecv,p)) != kOkNetRC )
goto errLabel;
cmRtNetReportSyncEnable(p->netH,true); // enable sync. protocol reporting
@ -1006,7 +1014,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
if((rc = cmRtNetInitialize(p->netH, bcastAddr, localHostStr, NULL, port )) != kOkNetRC)
goto errLabel;
if((rc = cmRtNetRegisterEndPoint(p->netH,rtSubIdx,localEndpStr, 0 )) != kOkNetRC )
if((rc = cmRtNetRegisterEndPoint(p->netH,localEndpStr, 0 )) != kOkNetRC )
goto errLabel;
if( cmThreadPause(p->thH,0) != kOkThRC )
@ -1028,7 +1036,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
case 't':
{
if( cmRtNetSendByLabels(p->netH, remoteHostStr, rtSubIdx, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC )
if( cmRtNetSendByLabels(p->netH, remoteHostStr, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC )
p->msgVal += 1;
}

View File

@ -5,6 +5,21 @@
extern "C" {
#endif
/*
Nodes and Endpoints:
---------------------
A node corresponds to a process and owns a socket. It also has a label which is
unique among all other nodes on the network. A node also has a set of application
defined 'endpoints'. Each endpoint has a label and id that is unique among all
other endpoints on the same node. Endpoints on different nodes however may share
use the same label and id. Endpoints are used by remote senders to identify
a particular receiver which is sharing the node with other receivers. Endpoints
are therefore analogous to port numbers on sockets.
See gt/doc/notes.txt for more discussion of cmRtNet.
*/
enum
{
kOkNetRC = cmOkRC,
@ -62,7 +77,8 @@ extern "C" {
// 'cbFunc' will be called within the context of cmRtNetReceive() to receive
// incoming network messages.
cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc, void* cbArg );
// rtSubIdx is the rtSubIdx of the cmRtSys which owns this cmRtNet.
cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, unsigned rtSubIdx, cmUdpCallback_t cbFunc, void* cbArg );
cmRtNetRC_t cmRtNetFree( cmRtNetH_t* hp );
bool cmRtNetIsValid( cmRtNetH_t h );
@ -85,7 +101,7 @@ extern "C" {
// cmRtNetInitialize().
// Remote nodes will be able to send messages to these endpoints by
// referring to (nodeLabel/endPtLabel)
cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, unsigned rtSubIdx, const cmChar_t* endPtLabel, unsigned endPtId );
cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, const cmChar_t* endPtLabel, unsigned endPtId );
// Delete all nodes and endpoints.
cmRtNetRC_t cmRtNetFinalize( cmRtNetH_t h );
@ -100,15 +116,15 @@ extern "C" {
// an cmRtSysMsgHdr_t header (See cmRtSysMsg.h).
cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h );
// Get an end point handle for use with cmRtNetSend.
cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp );
// Get a remote end point handle for use with cmRtNetSend.
cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp );
// Send a message to a remote endpoint.
cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt );
// Send a message to a remote endpoint. This function is a composite
// of cmRtNetEndpointHandle() and cmRtNetSend().
cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt );
cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt );
cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt );