cmRtNet.h/c : Added cmRtNetAddrToNodeIndex(),

cmRtNetEndpointIsValid(),cmRtNetEndPointId(),cmRtNetEndPointLabel().

Added srcEndPtId to all 'sending' functions.

_cmRtNetRecv() now sets cmRtNetMsg_t.srcNodeIdx.
This commit is contained in:
kpl 2014-06-15 22:16:22 -07:00
parent c0d05985e6
commit c69698716c
2 changed files with 97 additions and 16 deletions

View File

@ -76,6 +76,13 @@ cmRtNet_t* _cmRtNetHandleToPtr( cmRtNetH_t h )
return p; return p;
} }
cmRtNetEnd_t* _cmRtNetEndptHandleToPtr( cmRtNetEndptH_t h )
{
cmRtNetEnd_t* p = (cmRtNetEnd_t*)h.h;
assert( p != NULL );
return p;
}
void _cmRtNetVRpt( cmRtNet_t* p, const cmChar_t* fmt, va_list vl ) void _cmRtNetVRpt( cmRtNet_t* p, const cmChar_t* fmt, va_list vl )
{ {
if( cmIsFlag(p->flags,kReportSyncNetFl) ) if( cmIsFlag(p->flags,kReportSyncNetFl) )
@ -643,14 +650,38 @@ cmRtNetRC_t _cmRtNetSyncModeRecv( cmRtNet_t* p, const char* data, unsigned data
return rc; return rc;
} }
unsigned _cmRtNetAddrToNodeIndex( cmRtNet_t* p, const struct sockaddr_in* addr )
{
unsigned i;
cmRtNetNode_t* np = p->nodes;
for(i=0; np!=NULL; np=np->link,++i)
if( cmUdpAddrIsEqual( addr, &np->sockaddr ) )
return i;
return cmInvalidIdx;
}
// This is called in the context of cmRtNetReceive(). // This is called in the context of cmRtNetReceive().
void _cmRtNetRecv( void* cbArg, const char* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr ) void _cmRtNetRecv( void* cbArg, const char* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr )
{ {
cmRtNet_t* p = (cmRtNet_t*)cbArg; cmRtNet_t* p = (cmRtNet_t*)cbArg;
// if this is a sync msg - then handle it here
if( _cmRtNetIsSyncModeMsg(data,dataByteCnt)) if( _cmRtNetIsSyncModeMsg(data,dataByteCnt))
_cmRtNetSyncModeRecv(p,data,dataByteCnt,fromAddr); _cmRtNetSyncModeRecv(p,data,dataByteCnt,fromAddr);
else
{
// All non-sync messages arriving here are prefixed by a cmRtNetMsg_t header - fill in the source addr here.
// NOTE: the source addr could be filled in by the sender but this would increase the size
// of the msg. Instead we choose the more time consuming method of looking up the
// soure node here - hmmmm????.
cmRtNetMsg_t* hdr = (cmRtNetMsg_t*)(data);
hdr->srcNodeIdx = _cmRtNetAddrToNodeIndex(p,fromAddr);
}
p->cbFunc(p->cbArg,data,dataByteCnt,fromAddr); p->cbFunc(p->cbArg,data,dataByteCnt,fromAddr);
} }
@ -753,6 +784,13 @@ cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h )
return rc; return rc;
} }
unsigned cmRtNetAddrToNodeIndex( cmRtNetH_t h, const struct sockaddr_in* a )
{
cmRtNet_t* p = _cmRtNetHandleToPtr(h);
return _cmRtNetAddrToNodeIndex(p,a);
}
cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ) cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp )
{ {
cmRtNetRC_t rc = kOkNetRC; cmRtNetRC_t rc = kOkNetRC;
@ -772,7 +810,29 @@ cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, cons
return rc; return rc;
} }
cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg, unsigned msgByteCnt ) bool cmRtNetEndpointIsValid( cmRtNetEndptH_t endPtH )
{ return endPtH.h != NULL; }
unsigned cmRtNetEndpointId( cmRtNetEndptH_t endPtH )
{
if( !cmRtNetEndpointIsValid(endPtH) )
return cmInvalidId;
cmRtNetEnd_t* ep = _cmRtNetEndptHandleToPtr( endPtH );
return ep->id;
}
const cmChar_t* cmRtNetEndpointLabel( cmRtNetEndptH_t endPtH )
{
if( !cmRtNetEndpointIsValid(endPtH) )
return NULL;
cmRtNetEnd_t* ep = _cmRtNetEndptHandleToPtr( endPtH );
return ep->label;
}
cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, unsigned srcEndPtId, const cmRtNetEnd_t* ep, const void* msg, unsigned msgByteCnt )
{ {
cmRtNetRC_t rc = kOkNetRC; cmRtNetRC_t rc = kOkNetRC;
@ -783,7 +843,8 @@ cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg,
cmRtNetMsg_t* r = (cmRtNetMsg_t*)data; cmRtNetMsg_t* r = (cmRtNetMsg_t*)data;
r->hdr.rtSubIdx = ep->np->rtSubIdx; r->hdr.rtSubIdx = ep->np->rtSubIdx;
r->hdr.selId = kMsgSelRtId; r->hdr.selId = kMsgSelRtId;
r->endptId = ep->id; r->dstEndPtId = ep->id;
r->srcEndPtId = srcEndPtId;
memcpy(data+hN,msg,msgByteCnt); memcpy(data+hN,msg,msgByteCnt);
// ep->np->sockaddr identifies the node on the receiving cmRtNet. // ep->np->sockaddr identifies the node on the receiving cmRtNet.
@ -796,17 +857,17 @@ cmRtNetRC_t _cmRtNetSend( cmRtNet_t* p, const cmRtNetEnd_t* ep, const void* msg,
return rc; return rc;
} }
cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ) cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, unsigned srcEndPtId, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt )
{ {
cmRtNet_t* p = _cmRtNetHandleToPtr(h); cmRtNet_t* p = _cmRtNetHandleToPtr(h);
cmRtNetEnd_t* ep = (cmRtNetEnd_t*)epH.h; cmRtNetEnd_t* ep = _cmRtNetEndptHandleToPtr(epH);
assert( ep != NULL ); assert( ep != NULL );
return _cmRtNetSend(p,ep,msg,msgByteCnt); return _cmRtNetSend(p,srcEndPtId,ep,msg,msgByteCnt);
} }
cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ) cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, unsigned srcEndPtId, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt )
{ {
cmRtNetRC_t rc = kOkNetRC; cmRtNetRC_t rc = kOkNetRC;
cmRtNetEndptH_t epH = cmRtNetEndptNullHandle; cmRtNetEndptH_t epH = cmRtNetEndptNullHandle;
@ -814,10 +875,10 @@ cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const
if((rc = cmRtNetEndpointHandle(h,nodeLabel,endptLabel,&epH)) != kOkNetRC ) if((rc = cmRtNetEndpointHandle(h,nodeLabel,endptLabel,&epH)) != kOkNetRC )
return rc; return rc;
return cmRtNetSend(h,epH,msg,msgByteCnt); return cmRtNetSend(h,srcEndPtId,epH,msg,msgByteCnt);
} }
cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt ) cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned srcEndPtId, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt )
{ {
cmRtNet_t* p = _cmRtNetHandleToPtr(h); cmRtNet_t* p = _cmRtNetHandleToPtr(h);
@ -826,7 +887,7 @@ cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptId
if((ep = _cmRtNetFindEndpt(p, nodeIdx, endptIdx )) == NULL ) if((ep = _cmRtNetFindEndpt(p, nodeIdx, endptIdx )) == NULL )
return cmErrMsg(&p->err,kEndNotFoundNetRC,"The endpoint at node index %i endpoint index %i was not found.",nodeIdx,endptIdx); return cmErrMsg(&p->err,kEndNotFoundNetRC,"The endpoint at node index %i endpoint index %i was not found.",nodeIdx,endptIdx);
return _cmRtNetSend( p, ep, msg, msgByteCnt ); return _cmRtNetSend( p, srcEndPtId, ep, msg, msgByteCnt );
} }
@ -968,7 +1029,7 @@ void _cmRtNetTestRecv( void* cbArg, const char* data, unsigned dataByteCnt, cons
cmRtNetMsg_t* r = (cmRtNetMsg_t*)data; cmRtNetMsg_t* r = (cmRtNetMsg_t*)data;
unsigned i = *(unsigned*)(data + sizeof(cmRtNetMsg_t)); unsigned i = *(unsigned*)(data + sizeof(cmRtNetMsg_t));
printf("rtSubIdx:%i endptId:%i %i\n",r->hdr.rtSubIdx,r->endptId,i); printf("rtSubIdx:%i endptId:%i %i\n",r->hdr.rtSubIdx,r->dstEndPtId,i);
} }
@ -1000,7 +1061,9 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
const cmChar_t* remoteHostStr = !mstrFl ? "master" : "slave"; const cmChar_t* remoteHostStr = !mstrFl ? "master" : "slave";
const cmChar_t* remoteEndpStr = !mstrFl ? "master_ep" : "slave_ep"; const cmChar_t* remoteEndpStr = !mstrFl ? "master_ep" : "slave_ep";
const cmChar_t* bcastAddr = "192.168.15.255"; const cmChar_t* bcastAddr = "192.168.15.255";
cmRtNetEndptH_t eH = cmRtNetEndptNullHandle;
unsigned srcEndPtId = cmInvalidId;
memset(&t,0,sizeof(t)); memset(&t,0,sizeof(t));
if( cmThreadCreate(&p->thH,_cmRtNetTestThreadFunc,p,&ctx->rpt) != kOkThRC ) if( cmThreadCreate(&p->thH,_cmRtNetTestThreadFunc,p,&ctx->rpt) != kOkThRC )
@ -1016,6 +1079,12 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
if((rc = cmRtNetRegisterEndPoint(p->netH,localEndpStr, 0 )) != kOkNetRC ) if((rc = cmRtNetRegisterEndPoint(p->netH,localEndpStr, 0 )) != kOkNetRC )
goto errLabel; goto errLabel;
if((rc = cmRtNetEndpointHandle(p->netH, localHostStr, localEndpStr, &eH )) != kOkNetRC )
goto errLabel;
if((srcEndPtId = cmRtNetEndpointId(eH)) == cmInvalidIdx )
goto errLabel;
if( cmThreadPause(p->thH,0) != kOkThRC ) if( cmThreadPause(p->thH,0) != kOkThRC )
goto errLabel; goto errLabel;
@ -1036,7 +1105,7 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
case 't': case 't':
{ {
if( cmRtNetSendByLabels(p->netH, remoteHostStr, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC ) if( cmRtNetSendByLabels(p->netH, srcEndPtId, remoteHostStr, remoteEndpStr, &p->msgVal, sizeof(p->msgVal)) == kOkNetRC )
p->msgVal += 1; p->msgVal += 1;
} }

View File

@ -116,17 +116,29 @@ extern "C" {
// an cmRtSysMsgHdr_t header (See cmRtSysMsg.h). // an cmRtSysMsgHdr_t header (See cmRtSysMsg.h).
cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h ); cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h );
// Return the index of the node associated with sockaddr_in.
unsigned cmRtNetAddrToNodeIndex( cmRtNetH_t h, const struct sockaddr_in* a );
// Get a remote end point handle for use with cmRtNetSend. // Get a remote end point handle for use with cmRtNetSend.
cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp ); cmRtNetRC_t cmRtNetEndpointHandle( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, cmRtNetEndptH_t* hp );
bool cmRtNetEndpointIsValid( cmRtNetEndptH_t endPtH );
// Given an endpoint handle return the id/label of the associated endpoint.
unsigned cmRtNetEndpointId( cmRtNetEndptH_t endPtH );
const cmChar_t* cmRtNetEndpointLabel( cmRtNetEndptH_t endPtH );
// Send a message to a remote endpoint. // Send a message to a remote endpoint.
cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt ); // Note that srcEndPtId is used only to inform the receiver of the endpoint
// of the transmitter. It is not used in any part of the transmit or receive
// process.
cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, unsigned srcEndPtId, cmRtNetEndptH_t epH, const void* msg, unsigned msgByteCnt );
// Send a message to a remote endpoint. This function is a composite // Send a message to a remote endpoint. This function is a composite
// of cmRtNetEndpointHandle() and cmRtNetSend(). // of cmRtNetEndpointHandle() and cmRtNetSend().
cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt ); cmRtNetRC_t cmRtNetSendByLabels( cmRtNetH_t h, unsigned srcEndPtId, const cmChar_t* nodeLabel, const cmChar_t* endptLabel, const void* msg, unsigned msgByteCnt );
cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned nodeIdx, unsigned endptIdx, const void* msg, unsigned msgByteCnt ); cmRtNetRC_t cmRtNetSendByIndex( cmRtNetH_t h, unsigned srcEndPtId, unsigned dstNodeIdx, unsigned dstEndptIdx, const void* msg, unsigned msgByteCnt );
// Enable/disable synchronization protocol reporting. // Enable/disable synchronization protocol reporting.
// Return the previous state of the report sync. flag. // Return the previous state of the report sync. flag.