diff --git a/cmRtNet.c b/cmRtNet.c index 95ec277..a23a7a5 100644 --- a/cmRtNet.c +++ b/cmRtNet.c @@ -76,6 +76,13 @@ cmRtNet_t* _cmRtNetHandleToPtr( cmRtNetH_t h ) return p; } +cmRtNetEnd_t* _cmRtNetEndptHandleToPtr( cmRtNetEndptH_t h ) +{ + cmRtNetEnd_t* p = (cmRtNetEnd_t*)h.h; + assert( p != NULL ); + return p; +} + void _cmRtNetVRpt( cmRtNet_t* p, const cmChar_t* fmt, va_list vl ) { if( cmIsFlag(p->flags,kReportSyncNetFl) ) @@ -643,14 +650,38 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data return rc; } +unsigned _cmRtNetAddrToNodeIndex( cmRtNet_t* p, const struct sockaddr_in* addr ) +{ + unsigned i; + cmRtNetNode_t* np = p->nodes; + for(i=0; np!=NULL; np=np->link,++i) + if( cmUdpAddrIsEqual( addr, &np->sockaddr ) ) + return i; + + return cmInvalidIdx; +} + + // This is called in the context of cmRtNetReceive(). void _cmRtNetRecv( void* cbArg, const char* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr ) { cmRtNet_t* p = (cmRtNet_t*)cbArg; + // if this is a sync msg - then handle it here if( _cmRtNetIsSyncModeMsg(data,dataByteCnt)) _cmRtNetSyncModeRecv(p,data,dataByteCnt,fromAddr); - + else + { + // All non-sync messages arriving here are prefixed by a cmRtNetMsg_t header - fill in the source addr here. + + // NOTE: the source addr could be filled in by the sender but this would increase the size + // of the msg. Instead we choose the more time consuming method of looking up the + // soure node here - hmmmm????. + + cmRtNetMsg_t* hdr = (cmRtNetMsg_t*)(data); + hdr->srcNodeIdx = _cmRtNetAddrToNodeIndex(p,fromAddr); + } + p->cbFunc(p->cbArg,data,dataByteCnt,fromAddr); } @@ -753,6 +784,13 @@ cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h ) return rc; } +unsigned cmRtNetAddrToNodeIndex( cmRtNetH_t h, const struct sockaddr_in* a ) +{ + cmRtNet_t* p = _cmRtNetHandleToPtr(h); + return _cmRtNetAddrToNodeIndex(p,a); +} + + cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ) { cmRtNetRC_t rc = kOkNetRC; @@ -772,7 +810,29 @@ cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, cons return rc; } -cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg, unsigned msgByteCnt ) +bool cmRtNetEndpointIsValid( cmRtNetEndptH_t endPtH ) +{ return endPtH.h != NULL; } + +unsigned cmRtNetEndpointId( cmRtNetEndptH_t endPtH ) +{ + if( !cmRtNetEndpointIsValid(endPtH) ) + return cmInvalidId; + + cmRtNetEnd_t* ep = _cmRtNetEndptHandleToPtr( endPtH ); + return ep->id; +} + +const cmChar_t* cmRtNetEndpointLabel( cmRtNetEndptH_t endPtH ) +{ + if( !cmRtNetEndpointIsValid(endPtH) ) + return NULL; + + cmRtNetEnd_t* ep = _cmRtNetEndptHandleToPtr( endPtH ); + return ep->label; +} + + +cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, unsigned srcEndPtId, const cmRtNetEnd_t* ep, const void* msg, unsigned msgByteCnt ) { cmRtNetRC_t rc = kOkNetRC; @@ -783,7 +843,8 @@ cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg, cmRtNetMsg_t* r = (cmRtNetMsg_t*)data; r->hdr.rtSubIdx = ep->np->rtSubIdx; r->hdr.selId = kMsgSelRtId; - r->endptId = ep->id; + r->dstEndPtId = ep->id; + r->srcEndPtId = srcEndPtId; memcpy(data+hN,msg,msgByteCnt); // ep->np->sockaddr identifies the node on the receiving cmRtNet. @@ -796,17 +857,17 @@ cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg, return rc; } -cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ) +cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, unsigned srcEndPtId, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ) { cmRtNet_t* p = _cmRtNetHandleToPtr(h); - cmRtNetEnd_t* ep = (cmRtNetEnd_t*)epH.h; + cmRtNetEnd_t* ep = _cmRtNetEndptHandleToPtr(epH); assert( ep != NULL ); - return _cmRtNetSend(p,ep,msg,msgByteCnt); + return _cmRtNetSend(p,srcEndPtId,ep,msg,msgByteCnt); } -cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ) +cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, unsigned srcEndPtId, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ) { cmRtNetRC_t rc = kOkNetRC; cmRtNetEndptH_t epH = cmRtNetEndptNullHandle; @@ -814,10 +875,10 @@ cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const if((rc = cmRtNetEndpointHandle(h,nodeLabel,endptLabel,&epH)) != kOkNetRC ) return rc; - return cmRtNetSend(h,epH,msg,msgByteCnt); + return cmRtNetSend(h,srcEndPtId,epH,msg,msgByteCnt); } -cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt ) +cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned srcEndPtId, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt ) { cmRtNet_t* p = _cmRtNetHandleToPtr(h); @@ -826,7 +887,7 @@ cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptId if((ep = _cmRtNetFindEndpt(p, nodeIdx, endptIdx )) == NULL ) return cmErrMsg(&p->err,kEndNotFoundNetRC,"The endpoint at node index %i endpoint index %i was not found.",nodeIdx,endptIdx); - return _cmRtNetSend( p, ep, msg, msgByteCnt ); + return _cmRtNetSend( p, srcEndPtId, ep, msg, msgByteCnt ); } @@ -968,7 +1029,7 @@ void _cmRtNetTestRecv( void* cbArg, const char* data, unsigned dataByteCnt, cons cmRtNetMsg_t* r = (cmRtNetMsg_t*)data; unsigned i = *(unsigned*)(data + sizeof(cmRtNetMsg_t)); - printf("rtSubIdx:%i endptId:%i %i\n",r->hdr.rtSubIdx,r->endptId,i); + printf("rtSubIdx:%i endptId:%i %i\n",r->hdr.rtSubIdx,r->dstEndPtId,i); } @@ -1000,7 +1061,9 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) const cmChar_t* remoteHostStr = !mstrFl ? "master" : "slave"; const cmChar_t* remoteEndpStr = !mstrFl ? "master_ep" : "slave_ep"; const cmChar_t* bcastAddr = "192.168.15.255"; - + cmRtNetEndptH_t eH = cmRtNetEndptNullHandle; + unsigned srcEndPtId = cmInvalidId; + memset(&t,0,sizeof(t)); if( cmThreadCreate(&p->thH,_cmRtNetTestThreadFunc,p,&ctx->rpt) != kOkThRC ) @@ -1016,6 +1079,12 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) if((rc = cmRtNetRegisterEndPoint(p->netH,localEndpStr, 0 )) != kOkNetRC ) goto errLabel; + + if((rc = cmRtNetEndpointHandle(p->netH, localHostStr, localEndpStr, &eH )) != kOkNetRC ) + goto errLabel; + + if((srcEndPtId = cmRtNetEndpointId(eH)) == cmInvalidIdx ) + goto errLabel; if( cmThreadPause(p->thH,0) != kOkThRC ) goto errLabel; @@ -1036,7 +1105,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) case 't': { - if( cmRtNetSendByLabels(p->netH, remoteHostStr, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC ) + if( cmRtNetSendByLabels(p->netH, srcEndPtId, remoteHostStr, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC ) p->msgVal += 1; } diff --git a/cmRtNet.h b/cmRtNet.h index 5985081..2b091e7 100644 --- a/cmRtNet.h +++ b/cmRtNet.h @@ -116,17 +116,29 @@ extern "C" { // an cmRtSysMsgHdr_t header (See cmRtSysMsg.h). cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h ); + // Return the index of the node associated with sockaddr_in. + unsigned cmRtNetAddrToNodeIndex( cmRtNetH_t h, const struct sockaddr_in* a ); + // Get a remote end point handle for use with cmRtNetSend. cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ); + bool cmRtNetEndpointIsValid( cmRtNetEndptH_t endPtH ); + + // Given an endpoint handle return the id/label of the associated endpoint. + unsigned cmRtNetEndpointId( cmRtNetEndptH_t endPtH ); + const cmChar_t* cmRtNetEndpointLabel( cmRtNetEndptH_t endPtH ); + // Send a message to a remote endpoint. - cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ); + // Note that srcEndPtId is used only to inform the receiver of the endpoint + // of the transmitter. It is not used in any part of the transmit or receive + // process. + cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, unsigned srcEndPtId, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ); // Send a message to a remote endpoint. This function is a composite // of cmRtNetEndpointHandle() and cmRtNetSend(). - cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ); + cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, unsigned srcEndPtId, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ); - cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt ); + cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned srcEndPtId, unsigned dstNodeIdx, unsigned dstEndptIdx, const void* msg, unsigned msgByteCnt ); // Enable/disable synchronization protocol reporting. // Return the previous state of the report sync. flag.