From 10943e18e3929939d3143a8181b97a5e0efd2b7e Mon Sep 17 00:00:00 2001 From: kpl Date: Sun, 15 Jun 2014 14:36:12 -0700 Subject: [PATCH] cmRtNet.h/c : The rtSubIdx of the owning cmRtSys sub-system is now set in cmNetAlloc(). rtSubIdx is no longer required to locate a remote endpoint because IP/"node label" is enough information to determine the rtSubIdx. Several changes were made to simplify the interface to cmRtNet based on this observation. --- cmRtNet.c | 74 ++++++++++++++++++++++++++++++------------------------- cmRtNet.h | 26 +++++++++++++++---- 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/cmRtNet.c b/cmRtNet.c index 21dee99..95ec277 100644 --- a/cmRtNet.c +++ b/cmRtNet.c @@ -30,14 +30,15 @@ typedef struct cmRtNetEnd_str { cmChar_t* label; unsigned id; - unsigned rtSubIdx; struct cmRtNetNode_str* np; // Owner node. struct cmRtNetEnd_str* link; } cmRtNetEnd_t; +struct cmRtNet_str; typedef struct cmRtNetNode_str { + unsigned rtSubIdx; // rtSubIdx of the sub-system which owns this node cmChar_t* label; // Node label. struct sockaddr_in sockaddr; // Socket address cmChar_t* addr; // IP Address (human readable) @@ -50,10 +51,11 @@ typedef struct cmRtNetNode_str struct cmRtNetNode_str* link; } cmRtNetNode_t; -typedef struct +typedef struct cmRtNet_str { cmErr_t err; // Error state object unsigned flags; // See kXXXNetFl above. + unsigned rtSubIdx; // rtSubIdx of the owning sub-system cmUdpH_t udpH; // UDP port handle cmUdpCallback_t cbFunc; // Client callback to receive incoming messages from network. void* cbArg; // @@ -173,7 +175,7 @@ cmRtNetRC_t _cmRtNetReleaseNode( cmRtNet_t* p, cmRtNetNode_t* np ) return cmErrMsg(&p->err,kNodeNotFoundNetRC,"Node to release not found."); } -cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags, unsigned endPtCnt ) +cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, unsigned rtSubIdx, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags, unsigned endPtCnt ) { cmRtNetRC_t rc = kOkNetRC; cmRtNetNode_t* np; @@ -190,6 +192,7 @@ cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmCha if( saddr != NULL ) np->sockaddr = *saddr; + np->rtSubIdx = rtSubIdx; np->addr = addr==NULL ? NULL : cmMemAllocStr(addr); np->port = port; np->flags = flags; @@ -200,11 +203,11 @@ cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmCha return rc; } -cmRtNetEnd_t* _cmRtNetFindNodeEnd(cmRtNetNode_t* np, unsigned rtSubIdx, const cmChar_t* endPtLabel ) +cmRtNetEnd_t* _cmRtNetFindNodeEnd(cmRtNetNode_t* np, const cmChar_t* endPtLabel ) { cmRtNetEnd_t* ep = np->ends; for(; ep!=NULL; ep=ep->link) - if( ep->rtSubIdx==rtSubIdx && strcmp(ep->label,endPtLabel)==0 ) + if( strcmp(ep->label,endPtLabel)==0 ) return ep; return NULL; } @@ -223,12 +226,12 @@ cmRtNetEnd_t* _cmRtNetIndexToEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned return NULL; } -cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned rtSubIdx, const cmChar_t* endPtLabel, unsigned endPtId ) +cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, const cmChar_t* endPtLabel, unsigned endPtId ) { if( endPtLabel == NULL ) return cmErrMsg(&p->err,kInvalidLabelNetRC,"A null or blank node label was encountered."); - if( _cmRtNetFindNodeEnd( np, rtSubIdx, endPtLabel) != NULL) + if( _cmRtNetFindNodeEnd( np, endPtLabel) != NULL) return cmErrMsg(&p->err,kDuplEndNetRC,"A duplicate endpoint ('%s') was encountered on node '%s'.",endPtLabel,np->label); cmRtNetRC_t rc = kOkNetRC; @@ -236,7 +239,6 @@ cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned rt ep->label = cmMemAllocStr(endPtLabel); ep->id = endPtId; - ep->rtSubIdx = rtSubIdx; ep->np = np; ep->link = np->ends; np->ends = ep; @@ -386,7 +388,7 @@ const cmChar_t* cmRtNetSyncMsgLabel( const cmRtNetSyncMsg_t* m ) return ""; } -cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc, void* cbArg ) +cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, unsigned rtSubIdx, cmUdpCallback_t cbFunc, void* cbArg ) { cmRtNetRC_t rc; if((rc = cmRtNetFree(hp)) != kOkNetRC ) @@ -402,10 +404,11 @@ cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc, goto errLabel; } - p->udpTimeOutMs = 50; - p->udpRecvBufByteCnt = 8192; - p->cbFunc = cbFunc; - p->cbArg = cbArg; + p->rtSubIdx = rtSubIdx; + p->udpTimeOutMs = 50; + p->udpRecvBufByteCnt = 8192; + p->cbFunc = cbFunc; + p->cbArg = cbArg; hp->h = p; @@ -478,7 +481,7 @@ cmRtNetRC_t _cmRtNetSendEndpointReplyMsg( cmRtNet_t* p, cmRtNetNode_t* np, cmRt { msgLabel = ep->label; // ... send next local endpoint msgId = ep->id; - msgRtSubIdx = ep->rtSubIdx; + msgRtSubIdx = ep->np->rtSubIdx; } else // .... all local endpoints have been sent { @@ -573,7 +576,7 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data // create a node proxy to represent the remote node // (Note:m.id == remote node endpoint count (i.e. the count of endpoints expected for the remote node.)) - if(( rc = _cmRtNetCreateNode(p,m.label,NULL,0,fromAddr,0,m.id)) != kOkNetRC ) + if(( rc = _cmRtNetCreateNode(p,m.label,m.rtSubIdx,NULL,0,fromAddr,0,m.id)) != kOkNetRC ) goto errLabel; np = p->nodes; // newest node is always the first node @@ -583,7 +586,7 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data { case kHelloSelNetId: _cmRtNetRpt(p,"rcv hello\n"); // reply with local node - rc = _cmRtNetSendSyncMsg( p, np, kNodeSelNetId, p->localNode->label, p->localNode->endPtCnt, cmInvalidIdx ); + rc = _cmRtNetSendSyncMsg( p, np, kNodeSelNetId, p->localNode->label, p->localNode->endPtCnt, p->localNode->rtSubIdx ); break; case kNodeSelNetId: @@ -615,12 +618,12 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data } // attempt to find the end point - if((ep = _cmRtNetFindNodeEnd(np, m.rtSubIdx, m.label)) != NULL ) + if((ep = _cmRtNetFindNodeEnd(np, m.label)) != NULL ) ep->id = m.id; // the endpoint was found update the endPtId else { // create a local proxy for the endpoint - if((rc = _cmRtNetCreateEndpoint(p,np,m.rtSubIdx,m.label,m.id)) != kOkNetRC ) + if((rc = _cmRtNetCreateEndpoint(p,np,m.label,m.id)) != kOkNetRC ) goto errLabel; } @@ -675,7 +678,7 @@ cmRtNetRC_t cmRtNetInitialize( cmRtNetH_t h, const cmChar_t* bcastAddr, const cm } // create the local node - if((rc = _cmRtNetCreateNode(p,nodeLabel, ipAddr, port, NULL, kLocalNodeNetFl, 0)) != kOkNetRC ) + if((rc = _cmRtNetCreateNode(p,nodeLabel, p->rtSubIdx, ipAddr, port, NULL, kLocalNodeNetFl, 0)) != kOkNetRC ) goto errLabel; // the last created node is always the first node on the list @@ -705,7 +708,7 @@ bool cmRtNetIsInitialized( cmRtNetH_t h ) } -cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, unsigned rtSubIdx, const cmChar_t* endPtLabel, unsigned endPtId ) +cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, const cmChar_t* endPtLabel, unsigned endPtId ) { cmRtNetRC_t rc = kOkNetRC; cmRtNet_t* p = _cmRtNetHandleToPtr(h); @@ -713,7 +716,7 @@ cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, unsigned rtSubIdx, const cmCh if( p->localNode == NULL ) return cmErrMsg(&p->err,kLocalNodeNetRC,"Local endpoints may not be added if a local node has not been defined."); - if((rc = _cmRtNetCreateEndpoint(p, p->localNode,rtSubIdx,endPtLabel,endPtId )) == kOkNetRC ) + if((rc = _cmRtNetCreateEndpoint(p, p->localNode,endPtLabel,endPtId )) == kOkNetRC ) p->localNode->endPtCnt += 1; return rc; @@ -750,7 +753,7 @@ cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h ) return rc; } -cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ) +cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ) { cmRtNetRC_t rc = kOkNetRC; cmRtNet_t* p = _cmRtNetHandleToPtr(h); @@ -761,7 +764,7 @@ cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, unsi return cmErrMsg(&p->err,kNodeNotFoundNetRC,"The node '%s' was not found.",cmStringNullGuard(nodeLabel)); - if(( ep = _cmRtNetFindNodeEnd(np, rtSubIdx, endptLabel )) == NULL ) + if(( ep = _cmRtNetFindNodeEnd(np, endptLabel )) == NULL ) return cmErrMsg(&p->err,kEndNotFoundNetRC,"The endpoint '%s' on '%s' on node was not found.",cmStringNullGuard(endptLabel),cmStringNullGuard(nodeLabel)); hp->h = ep; @@ -773,14 +776,19 @@ cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg, { cmRtNetRC_t rc = kOkNetRC; - unsigned dN = sizeof(cmRtNetMsg_t) + msgByteCnt; + unsigned hN = sizeof(cmRtNetMsg_t); + unsigned dN = hN + msgByteCnt; char data[ dN ]; cmRtNetMsg_t* r = (cmRtNetMsg_t*)data; - r->hdr.rtSubIdx = ep->rtSubIdx; + r->hdr.rtSubIdx = ep->np->rtSubIdx; r->hdr.selId = kMsgSelRtId; - r->endptId = ep->id; - memcpy(data+sizeof(cmRtNetMsg_t),msg,msgByteCnt); + r->endptId = ep->id; + memcpy(data+hN,msg,msgByteCnt); + + // ep->np->sockaddr identifies the node on the receiving cmRtNet. + // cmRtNetMsg_t* r.endptId is then used by the receiving cmRtNet to indicate which endpoint on + // the node the incoming message should be associated with. if( cmUdpSendTo(p->udpH, data, dN, &ep->np->sockaddr ) != kOkUdpRC ) return cmErrMsg(&p->err,kUdpPortFailNetRC,"Send to node:%s endpt:%s failed.\n",cmStringNullGuard(ep->np->label),cmStringNullGuard(ep->label)); @@ -798,12 +806,12 @@ cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, uns } -cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ) +cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ) { cmRtNetRC_t rc = kOkNetRC; cmRtNetEndptH_t epH = cmRtNetEndptNullHandle; - if((rc = cmRtNetEndpointHandle(h,nodeLabel,rtSubIdx,endptLabel,&epH)) != kOkNetRC ) + if((rc = cmRtNetEndpointHandle(h,nodeLabel,endptLabel,&epH)) != kOkNetRC ) return rc; return cmRtNetSend(h,epH,msg,msgByteCnt); @@ -935,7 +943,7 @@ cmRtNetRC_t cmRtNetRemoteNodeEndPoint( *labelRef = ep->label; *idRef = ep->id; - *rsiRef = ep->rtSubIdx; + *rsiRef = ep->np->rtSubIdx; return kOkNetRC; } @@ -998,7 +1006,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) if( cmThreadCreate(&p->thH,_cmRtNetTestThreadFunc,p,&ctx->rpt) != kOkThRC ) goto errLabel; - if((rc = cmRtNetAlloc(ctx,&p->netH,_cmRtNetTestRecv,p)) != kOkNetRC ) + if((rc = cmRtNetAlloc(ctx,&p->netH,rtSubIdx,_cmRtNetTestRecv,p)) != kOkNetRC ) goto errLabel; cmRtNetReportSyncEnable(p->netH,true); // enable sync. protocol reporting @@ -1006,7 +1014,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) if((rc = cmRtNetInitialize(p->netH, bcastAddr, localHostStr, NULL, port )) != kOkNetRC) goto errLabel; - if((rc = cmRtNetRegisterEndPoint(p->netH,rtSubIdx,localEndpStr, 0 )) != kOkNetRC ) + if((rc = cmRtNetRegisterEndPoint(p->netH,localEndpStr, 0 )) != kOkNetRC ) goto errLabel; if( cmThreadPause(p->thH,0) != kOkThRC ) @@ -1028,7 +1036,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) case 't': { - if( cmRtNetSendByLabels(p->netH, remoteHostStr, rtSubIdx, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC ) + if( cmRtNetSendByLabels(p->netH, remoteHostStr, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC ) p->msgVal += 1; } diff --git a/cmRtNet.h b/cmRtNet.h index 397725e..5985081 100644 --- a/cmRtNet.h +++ b/cmRtNet.h @@ -5,6 +5,21 @@ extern "C" { #endif + /* + Nodes and Endpoints: + --------------------- + A node corresponds to a process and owns a socket. It also has a label which is + unique among all other nodes on the network. A node also has a set of application + defined 'endpoints'. Each endpoint has a label and id that is unique among all + other endpoints on the same node. Endpoints on different nodes however may share + use the same label and id. Endpoints are used by remote senders to identify + a particular receiver which is sharing the node with other receivers. Endpoints + are therefore analogous to port numbers on sockets. + + See gt/doc/notes.txt for more discussion of cmRtNet. + + */ + enum { kOkNetRC = cmOkRC, @@ -62,7 +77,8 @@ extern "C" { // 'cbFunc' will be called within the context of cmRtNetReceive() to receive // incoming network messages. - cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc, void* cbArg ); + // rtSubIdx is the rtSubIdx of the cmRtSys which owns this cmRtNet. + cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, unsigned rtSubIdx, cmUdpCallback_t cbFunc, void* cbArg ); cmRtNetRC_t cmRtNetFree( cmRtNetH_t* hp ); bool cmRtNetIsValid( cmRtNetH_t h ); @@ -85,7 +101,7 @@ extern "C" { // cmRtNetInitialize(). // Remote nodes will be able to send messages to these endpoints by // referring to (nodeLabel/endPtLabel) - cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, unsigned rtSubIdx, const cmChar_t* endPtLabel, unsigned endPtId ); + cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, const cmChar_t* endPtLabel, unsigned endPtId ); // Delete all nodes and endpoints. cmRtNetRC_t cmRtNetFinalize( cmRtNetH_t h ); @@ -100,15 +116,15 @@ extern "C" { // an cmRtSysMsgHdr_t header (See cmRtSysMsg.h). cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h ); - // Get an end point handle for use with cmRtNetSend. - cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ); + // Get a remote end point handle for use with cmRtNetSend. + cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ); // Send a message to a remote endpoint. cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ); // Send a message to a remote endpoint. This function is a composite // of cmRtNetEndpointHandle() and cmRtNetSend(). - cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, unsigned rtSubIdx, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ); + cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ); cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt );