cmRtNet.h/c: Completed initial working cmRtNetTest().

This commit is contained in:
kpl 2013-04-27 11:02:01 -07:00
parent c6906efbea
commit 8f9689faa8
2 changed files with 109 additions and 28 deletions

View File

@ -12,7 +12,9 @@
enum enum
{ {
kLocalNetFl = 0x01, kLocalNetFl = 0x01,
kSockAddrNetFl = 0x02 kSockAddrNetFl = 0x02,
kRegNodeNetFl = 0x04,
kRecvNodeNetFl = 0x08
}; };
typedef enum typedef enum
@ -31,7 +33,8 @@ typedef enum
kHelloSelNetId, kHelloSelNetId,
kHelloAckSelNetId, kHelloAckSelNetId,
kEndpointSelNetId, kEndpointSelNetId,
kEndpointAckSelNetId kEndpointAckSelNetId,
kDoneSelNetId
} cmRtNetSelId_t; } cmRtNetSelId_t;
typedef struct cmRtNetEnd_str typedef struct cmRtNetEnd_str
@ -50,7 +53,7 @@ typedef struct cmRtNetNode_str
cmUdpPort_t port; cmUdpPort_t port;
unsigned flags; unsigned flags;
cmRtNetNodeState_t state; cmRtNetNodeState_t state;
unsigned epIdx; unsigned epIdx; // tracks the next endpoint to send during sync-mode
cmTimeSpec_t lastSendTime; cmTimeSpec_t lastSendTime;
cmRtNetEnd_t* ends; cmRtNetEnd_t* ends;
struct cmRtNetNode_str* link; struct cmRtNetNode_str* link;
@ -65,6 +68,7 @@ typedef struct
cmRtNetNode_t* nodes; cmRtNetNode_t* nodes;
cmRtNetNode_t* localNode; cmRtNetNode_t* localNode;
bool syncModeFl; bool syncModeFl;
bool masterFl;
unsigned udpRecvBufByteCnt; unsigned udpRecvBufByteCnt;
unsigned udpTimeOutMs; unsigned udpTimeOutMs;
unsigned interSyncSendTimeMs; unsigned interSyncSendTimeMs;
@ -189,7 +193,7 @@ cmRtNetRC_t _cmRtNetReleaseNode( cmRtNet_t* p, cmRtNetNode_t* np )
return cmErrMsg(&p->err,kNodeNotFoundNetRC,"Node to release not found."); return cmErrMsg(&p->err,kNodeNotFoundNetRC,"Node to release not found.");
} }
cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr ) cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags )
{ {
cmRtNetRC_t rc = kOkNetRC; cmRtNetRC_t rc = kOkNetRC;
cmRtNetNode_t* np; cmRtNetNode_t* np;
@ -209,7 +213,7 @@ cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmCha
np->label = cmMemAllocStr(label); np->label = cmMemAllocStr(label);
np->addr = addr==NULL ? NULL : cmMemAllocStr(addr); np->addr = addr==NULL ? NULL : cmMemAllocStr(addr);
np->port = port; np->port = port;
np->flags = cmEnaFlag(np->flags,kLocalNetFl,localNodeFl); np->flags = cmEnaFlag(flags,kLocalNetFl,localNodeFl);
np->link = p->nodes; np->link = p->nodes;
p->nodes = np; p->nodes = np;
@ -323,6 +327,8 @@ cmRtNetRC_t _cmRtNetSendSyncMsg( cmRtNet_t* p, cmRtNetNode_t* np, cmRtNetSelId_t
// store this nodes current sync state // store this nodes current sync state
cmRtNetNodeState_t orgState = np->state; cmRtNetNodeState_t orgState = np->state;
if( nextStId != kInvalidStNetId )
np->state = nextStId; np->state = nextStId;
@ -425,7 +431,7 @@ cmRtNetRC_t cmRtNetCreateNode( cmRtNetH_t h, const cmChar_t* nodeLabel, const cm
cmRtNetRC_t rc; cmRtNetRC_t rc;
// create a node // create a node
if((rc = _cmRtNetCreateNode(p,nodeLabel,ipAddr, port, NULL)) != kOkNetRC ) if((rc = _cmRtNetCreateNode(p,nodeLabel,ipAddr, port, NULL, kRegNodeNetFl)) != kOkNetRC )
return rc; return rc;
// if this is not the local node // if this is not the local node
@ -476,7 +482,7 @@ cmRtNetRC_t cmRtNetBeginSyncMode( cmRtNetH_t h )
cmRtNet_t* p = _cmRtNetHandleToPtr(h); cmRtNet_t* p = _cmRtNetHandleToPtr(h);
p->syncModeFl = true; p->syncModeFl = true;
p->masterFl = true;
return rc; return rc;
} }
@ -513,13 +519,17 @@ cmRtNetRC_t _cmRtNetRecvAck( cmRtNet_t* p, const struct sockaddr_in* fromAddr, c
if( np->state != expectedState ) if( np->state != expectedState )
{ {
rc = cmErrMsg(&p->err,kNodeStateErrNetRC,"Node '%s' expected in state %i was in state %i.",kWaitHelloAckStNetId,np->state); rc = cmErrMsg(&p->err,kNodeStateErrNetRC,"Node '%s' expected in state %i was in state %i.",cmStringNullGuard(np->label),kWaitHelloAckStNetId,np->state);
np->state = kErrorStNetId; np->state = kErrorStNetId;
goto errLabel; goto errLabel;
} }
np->state = nextState; np->state = nextState;
// if we are about to send another endpoint - incr the endpoint index
if( nextState == kSendEndpointStNetId )
np->epIdx += 1;
errLabel: errLabel:
return rc; return rc;
} }
@ -532,6 +542,8 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB
cmRtNetNode_t* np = NULL; cmRtNetNode_t* np = NULL;
cmRtNetSyncMsg_t m; cmRtNetSyncMsg_t m;
m.endPtLabel = NULL;
assert( cmRtNetIsSyncModeMsg(data,dataByteCnt)); assert( cmRtNetIsSyncModeMsg(data,dataByteCnt));
if( _cmRtNetDeserializeSyncMsg(data,dataByteCnt,&m) != kOkNetRC ) if( _cmRtNetDeserializeSyncMsg(data,dataByteCnt,&m) != kOkNetRC )
@ -558,11 +570,12 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB
} }
// create a node proxy to represent the remote node // create a node proxy to represent the remote node
if(( rc = _cmRtNetCreateNode(p,m.endPtLabel,NULL,0,fromAddr)) != kOkNetRC ) if(( rc = _cmRtNetCreateNode(p,m.endPtLabel,NULL,0,fromAddr,kRecvNodeNetFl)) != kOkNetRC )
goto errLabel; goto errLabel;
// send an ackknowledgement of the 'hello' msg // send an ackknowledgement of the 'hello' msg
rc = _cmRtNetSendAck(p,kHelloAckSelNetId,fromAddr); rc = _cmRtNetSendAck(p,kHelloAckSelNetId,fromAddr);
} }
break; break;
@ -573,7 +586,6 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB
_cmRtNetRpt(p,"rcv endpoint\n"); _cmRtNetRpt(p,"rcv endpoint\n");
// locate the remote node which sent the endpoint // locate the remote node which sent the endpoint
if((np = _cmRtNetFindNodeFromSockAddr(p,fromAddr)) == NULL ) if((np = _cmRtNetFindNodeFromSockAddr(p,fromAddr)) == NULL )
{ {
@ -596,16 +608,29 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB
} }
break; break;
case kDoneSelNetId:
{
_cmRtNetRpt(p,"rcv done\n");
if( p->masterFl==false )
p->syncModeFl = true;
}
break;
case kHelloAckSelNetId: // master response case kHelloAckSelNetId: // master response
{
assert( p->syncModeFl ); assert( p->syncModeFl );
_cmRtNetRpt(p,"rcv hello ack\n"); _cmRtNetRpt(p,"rcv hello ack\n");
rc = _cmRtNetRecvAck(p,fromAddr,kWaitHelloAckStNetId,kSendEndpointStNetId); rc = _cmRtNetRecvAck(p,fromAddr,kWaitHelloAckStNetId,kSendEndpointStNetId);
}
break; break;
case kEndpointAckSelNetId: // master response case kEndpointAckSelNetId: // master response
{
assert( p->syncModeFl ); assert( p->syncModeFl );
_cmRtNetRpt(p,"rcv endpoint ack\n"); _cmRtNetRpt(p,"rcv endpoint ack\n");
rc = _cmRtNetRecvAck(p,fromAddr,kWaitEndpointAckStNetId,kSendEndpointStNetId); rc = _cmRtNetRecvAck(p,fromAddr,kWaitEndpointAckStNetId,kSendEndpointStNetId);
}
break; break;
default: default:
@ -613,6 +638,8 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB
} }
errLabel: errLabel:
cmMemFree((cmChar_t*)m.endPtLabel);
return rc; return rc;
} }
@ -624,11 +651,15 @@ cmRtNetRC_t _cmRtNetSendNodeSync( cmRtNet_t* p, cmRtNetNode_t* np )
switch( np->state ) switch( np->state )
{ {
case kSendHelloStNetId: case kSendHelloStNetId:
{
np->epIdx = -1;
// send a 'hello' to this remote node // send a 'hello' to this remote node
if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,p->localNode->label, cmInvalidId, kWaitHelloAckStNetId )) != kOkNetRC ) if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,p->localNode->label, cmInvalidId, kWaitHelloAckStNetId )) != kOkNetRC )
rc = cmErrMsg(&p->err,rc,"Send 'hello' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port); rc = cmErrMsg(&p->err,rc,"Send 'hello' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port);
else else
_cmRtNetRpt(p,"send hello\n"); _cmRtNetRpt(p,"%s sent hello\n",cmStringNullGuard(np->label));
}
break; break;
case kSendEndpointStNetId: case kSendEndpointStNetId:
@ -637,14 +668,20 @@ cmRtNetRC_t _cmRtNetSendNodeSync( cmRtNet_t* p, cmRtNetNode_t* np )
// if all of the endpoints have been sent to this node ... // if all of the endpoints have been sent to this node ...
if((ep = _cmRtNetIndexToEndpoint(p,p->localNode,np->epIdx)) == NULL ) if((ep = _cmRtNetIndexToEndpoint(p,p->localNode,np->epIdx)) == NULL )
np->state = kDoneStNetId; // ... we are done {
// notify the remote node that all endpoints have been sent
if((rc = _cmRtNetSendSyncMsg(p,np,kDoneSelNetId,p->localNode->label,cmInvalidId, kDoneStNetId )) != kOkNetRC )
rc = cmErrMsg(&p->err,rc,"Send 'done' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port);
else
_cmRtNetRpt(p,"Node %s done.\n",cmStringNullGuard(np->label));
}
else else
{ {
// send an endpoint to this node // send an endpoint to this node
if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,ep->endPtLabel, ep->endPtId, kWaitEndpointAckStNetId )) != kOkNetRC ) if((rc = _cmRtNetSendSyncMsg(p,np,kEndpointSelNetId,ep->endPtLabel, ep->endPtId, kWaitEndpointAckStNetId )) != kOkNetRC )
rc = cmErrMsg(&p->err,rc,"Endpoint (%s index:%i) transmission to %s:%s:%i failed.",cmStringNullGuard(ep->endPtLabel),cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port); rc = cmErrMsg(&p->err,rc,"Endpoint (%s index:%i) transmission to %s:%s:%i failed.",cmStringNullGuard(ep->endPtLabel),cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port);
else else
_cmRtNetRpt(p,"send endpoint\n"); _cmRtNetRpt(p,"%s sent endpoint %s\n",cmStringNullGuard(np->label),cmStringNullGuard(ep->endPtLabel));
} }
} }
@ -688,14 +725,21 @@ cmRtNetRC_t cmRtNetSyncModeSend( cmRtNetH_t h )
unsigned activeCnt = 0; unsigned activeCnt = 0;
cmRtNetNode_t* np = p->nodes; cmRtNetNode_t* np = p->nodes;
for(; np != NULL; np=np->link ) for(; np != NULL; np=np->link )
if( np != p->localNode && np->state != kDoneStNetId && np->state != kErrorStNetId ) {
bool fl = (p->masterFl && cmIsFlag(np->flags,kRegNodeNetFl)) || (p->masterFl==false && cmIsFlag(np->flags,kRecvNodeNetFl));
if( fl && np != p->localNode && np->state != kDoneStNetId && np->state != kErrorStNetId )
{ {
_cmRtNetSendNodeSync(p,np); _cmRtNetSendNodeSync(p,np);
activeCnt += 1; activeCnt += 1;
} }
}
if( activeCnt == 0 ) if( activeCnt == 0 )
{
p->syncModeFl = false; p->syncModeFl = false;
_cmRtNetRpt(p,"sync mode complete.\n");
}
return rc; return rc;
} }
@ -853,6 +897,12 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
cmRptPrintf(&ctx->rpt,"%s q=quit\n", mstrFl ? "Master: " : "Slave: "); cmRptPrintf(&ctx->rpt,"%s q=quit\n", mstrFl ? "Master: " : "Slave: ");
while( (c=getchar()) != 'q' ) while( (c=getchar()) != 'q' )
{ {
switch(c)
{
case 'r':
cmRtNetReport(p->netH);
break;
}
} }

View File

@ -105,6 +105,37 @@ extern "C" {
if( cmRtNetIsSyncModeMsg(dataV,dataN) ) if( cmRtNetIsSyncModeMsg(dataV,dataN) )
cmRtNetSyncModeRecv(dataV,dataN,addr) cmRtNetSyncModeRecv(dataV,dataN,addr)
} }
The 'master' is the machine which cmRtNetBeginSyncMode() is called on.
1) 'master' sends local endpoints to all registered remote nodes.
2) When a 'slave' receives the kDoneSelNetId msg it transmits
it's own local endpoints back to the master.
a. Each node in the node list has a type id:
1. local
2. registered - remote node that was explicitely registered on a master
3. received - remote node that was received from a master
b.
1. All nodes are created in the 'send-hello' state.
2. If a master machine is in 'sync-mode' then it systematically sends
each of it's local endpoints to all 'registered' nodes.
3. When a slave machine recives a 'hello' it creates a
'received' node.
4. When a slave machine recieves a 'done' it enters sync mode
and systematically sends each of its local endpoints to
the 'done' source.
Protocol:
1. A: broadcast - 'hello'
2. Bs: respond 'hello' ack
3. A: send local node and endpoints to each responder
4. A: send done
5. Bs: send local endpoints to A
*/ */
#ifdef __cplusplus #ifdef __cplusplus