diff --git a/cmRtNet.c b/cmRtNet.c index bbc1f2f..acff8a0 100644 --- a/cmRtNet.c +++ b/cmRtNet.c @@ -12,7 +12,9 @@ enum { kLocalNetFl = 0x01, - kSockAddrNetFl = 0x02 + kSockAddrNetFl = 0x02, + kRegNodeNetFl = 0x04, + kRecvNodeNetFl = 0x08 }; typedef enum @@ -31,7 +33,8 @@ typedef enum kHelloSelNetId, kHelloAckSelNetId, kEndpointSelNetId, - kEndpointAckSelNetId + kEndpointAckSelNetId, + kDoneSelNetId } cmRtNetSelId_t; typedef struct cmRtNetEnd_str @@ -50,7 +53,7 @@ typedef struct cmRtNetNode_str cmUdpPort_t port; unsigned flags; cmRtNetNodeState_t state; - unsigned epIdx; + unsigned epIdx; // tracks the next endpoint to send during sync-mode cmTimeSpec_t lastSendTime; cmRtNetEnd_t* ends; struct cmRtNetNode_str* link; @@ -65,6 +68,7 @@ typedef struct cmRtNetNode_t* nodes; cmRtNetNode_t* localNode; bool syncModeFl; + bool masterFl; unsigned udpRecvBufByteCnt; unsigned udpTimeOutMs; unsigned interSyncSendTimeMs; @@ -189,7 +193,7 @@ cmRtNetRC_t _cmRtNetReleaseNode( cmRtNet_t* p, cmRtNetNode_t* np ) return cmErrMsg(&p->err,kNodeNotFoundNetRC,"Node to release not found."); } -cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr ) +cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags ) { cmRtNetRC_t rc = kOkNetRC; cmRtNetNode_t* np; @@ -209,7 +213,7 @@ cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmCha np->label = cmMemAllocStr(label); np->addr = addr==NULL ? NULL : cmMemAllocStr(addr); np->port = port; - np->flags = cmEnaFlag(np->flags,kLocalNetFl,localNodeFl); + np->flags = cmEnaFlag(flags,kLocalNetFl,localNodeFl); np->link = p->nodes; p->nodes = np; @@ -323,7 +327,9 @@ cmRtNetRC_t _cmRtNetSendSyncMsg( cmRtNet_t* p, cmRtNetNode_t* np, cmRtNetSelId_t // store this nodes current sync state cmRtNetNodeState_t orgState = np->state; - np->state = nextStId; + + if( nextStId != kInvalidStNetId ) + np->state = nextStId; // send the msg @@ -425,7 +431,7 @@ cmRtNetRC_t cmRtNetCreateNode( cmRtNetH_t h, const cmChar_t* nodeLabel, const cm cmRtNetRC_t rc; // create a node - if((rc = _cmRtNetCreateNode(p,nodeLabel,ipAddr, port, NULL)) != kOkNetRC ) + if((rc = _cmRtNetCreateNode(p,nodeLabel,ipAddr, port, NULL, kRegNodeNetFl)) != kOkNetRC ) return rc; // if this is not the local node @@ -476,7 +482,7 @@ cmRtNetRC_t cmRtNetBeginSyncMode( cmRtNetH_t h ) cmRtNet_t* p = _cmRtNetHandleToPtr(h); p->syncModeFl = true; - + p->masterFl = true; return rc; } @@ -513,13 +519,17 @@ cmRtNetRC_t _cmRtNetRecvAck( cmRtNet_t* p, const struct sockaddr_in* fromAddr, c if( np->state != expectedState ) { - rc = cmErrMsg(&p->err,kNodeStateErrNetRC,"Node '%s' expected in state %i was in state %i.",kWaitHelloAckStNetId,np->state); + rc = cmErrMsg(&p->err,kNodeStateErrNetRC,"Node '%s' expected in state %i was in state %i.",cmStringNullGuard(np->label),kWaitHelloAckStNetId,np->state); np->state = kErrorStNetId; goto errLabel; } np->state = nextState; + // if we are about to send another endpoint - incr the endpoint index + if( nextState == kSendEndpointStNetId ) + np->epIdx += 1; + errLabel: return rc; } @@ -532,6 +542,8 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB cmRtNetNode_t* np = NULL; cmRtNetSyncMsg_t m; + m.endPtLabel = NULL; + assert( cmRtNetIsSyncModeMsg(data,dataByteCnt)); if( _cmRtNetDeserializeSyncMsg(data,dataByteCnt,&m) != kOkNetRC ) @@ -558,11 +570,12 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB } // create a node proxy to represent the remote node - if(( rc = _cmRtNetCreateNode(p,m.endPtLabel,NULL,0,fromAddr)) != kOkNetRC ) + if(( rc = _cmRtNetCreateNode(p,m.endPtLabel,NULL,0,fromAddr,kRecvNodeNetFl)) != kOkNetRC ) goto errLabel; // send an ackknowledgement of the 'hello' msg rc = _cmRtNetSendAck(p,kHelloAckSelNetId,fromAddr); + } break; @@ -573,7 +586,6 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB _cmRtNetRpt(p,"rcv endpoint\n"); - // locate the remote node which sent the endpoint if((np = _cmRtNetFindNodeFromSockAddr(p,fromAddr)) == NULL ) { @@ -596,16 +608,29 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB } break; + case kDoneSelNetId: + { + _cmRtNetRpt(p,"rcv done\n"); + + if( p->masterFl==false ) + p->syncModeFl = true; + } + break; + case kHelloAckSelNetId: // master response - assert( p->syncModeFl ); - _cmRtNetRpt(p,"rcv hello ack\n"); - rc = _cmRtNetRecvAck(p,fromAddr,kWaitHelloAckStNetId,kSendEndpointStNetId); + { + assert( p->syncModeFl ); + _cmRtNetRpt(p,"rcv hello ack\n"); + rc = _cmRtNetRecvAck(p,fromAddr,kWaitHelloAckStNetId,kSendEndpointStNetId); + } break; case kEndpointAckSelNetId: // master response - assert( p->syncModeFl ); - _cmRtNetRpt(p,"rcv endpoint ack\n"); - rc = _cmRtNetRecvAck(p,fromAddr,kWaitEndpointAckStNetId,kSendEndpointStNetId); + { + assert( p->syncModeFl ); + _cmRtNetRpt(p,"rcv endpoint ack\n"); + rc = _cmRtNetRecvAck(p,fromAddr,kWaitEndpointAckStNetId,kSendEndpointStNetId); + } break; default: @@ -613,6 +638,8 @@ cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataB } errLabel: + + cmMemFree((cmChar_t*)m.endPtLabel); return rc; } @@ -624,11 +651,15 @@ cmRtNetRC_t _cmRtNetSendNodeSync( cmRtNet_t* p, cmRtNetNode_t* np ) switch( np->state ) { case kSendHelloStNetId: - // send a 'hello' to this remote node - if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,p->localNode->label, cmInvalidId, kWaitHelloAckStNetId )) != kOkNetRC ) - rc = cmErrMsg(&p->err,rc,"Send 'hello' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port); - else - _cmRtNetRpt(p,"send hello\n"); + { + np->epIdx = -1; + + // send a 'hello' to this remote node + if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,p->localNode->label, cmInvalidId, kWaitHelloAckStNetId )) != kOkNetRC ) + rc = cmErrMsg(&p->err,rc,"Send 'hello' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port); + else + _cmRtNetRpt(p,"%s sent hello\n",cmStringNullGuard(np->label)); + } break; case kSendEndpointStNetId: @@ -637,14 +668,20 @@ cmRtNetRC_t _cmRtNetSendNodeSync( cmRtNet_t* p, cmRtNetNode_t* np ) // if all of the endpoints have been sent to this node ... if((ep = _cmRtNetIndexToEndpoint(p,p->localNode,np->epIdx)) == NULL ) - np->state = kDoneStNetId; // ... we are done + { + // notify the remote node that all endpoints have been sent + if((rc = _cmRtNetSendSyncMsg(p,np,kDoneSelNetId,p->localNode->label,cmInvalidId, kDoneStNetId )) != kOkNetRC ) + rc = cmErrMsg(&p->err,rc,"Send 'done' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port); + else + _cmRtNetRpt(p,"Node %s done.\n",cmStringNullGuard(np->label)); + } else { // send an endpoint to this node - if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,ep->endPtLabel, ep->endPtId, kWaitEndpointAckStNetId )) != kOkNetRC ) + if((rc = _cmRtNetSendSyncMsg(p,np,kEndpointSelNetId,ep->endPtLabel, ep->endPtId, kWaitEndpointAckStNetId )) != kOkNetRC ) rc = cmErrMsg(&p->err,rc,"Endpoint (%s index:%i) transmission to %s:%s:%i failed.",cmStringNullGuard(ep->endPtLabel),cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port); else - _cmRtNetRpt(p,"send endpoint\n"); + _cmRtNetRpt(p,"%s sent endpoint %s\n",cmStringNullGuard(np->label),cmStringNullGuard(ep->endPtLabel)); } } @@ -688,14 +725,21 @@ cmRtNetRC_t cmRtNetSyncModeSend( cmRtNetH_t h ) unsigned activeCnt = 0; cmRtNetNode_t* np = p->nodes; for(; np != NULL; np=np->link ) - if( np != p->localNode && np->state != kDoneStNetId && np->state != kErrorStNetId ) + { + bool fl = (p->masterFl && cmIsFlag(np->flags,kRegNodeNetFl)) || (p->masterFl==false && cmIsFlag(np->flags,kRecvNodeNetFl)); + + if( fl && np != p->localNode && np->state != kDoneStNetId && np->state != kErrorStNetId ) { _cmRtNetSendNodeSync(p,np); activeCnt += 1; } - + } + if( activeCnt == 0 ) + { p->syncModeFl = false; + _cmRtNetRpt(p,"sync mode complete.\n"); + } return rc; } @@ -853,6 +897,12 @@ void cmRtNetTest( cmCtx_t* ctx, bool mstrFl ) cmRptPrintf(&ctx->rpt,"%s q=quit\n", mstrFl ? "Master: " : "Slave: "); while( (c=getchar()) != 'q' ) { + switch(c) + { + case 'r': + cmRtNetReport(p->netH); + break; + } } diff --git a/cmRtNet.h b/cmRtNet.h index 3121e23..0939cdc 100644 --- a/cmRtNet.h +++ b/cmRtNet.h @@ -104,7 +104,38 @@ extern "C" { { if( cmRtNetIsSyncModeMsg(dataV,dataN) ) cmRtNetSyncModeRecv(dataV,dataN,addr) - } + } + + + The 'master' is the machine which cmRtNetBeginSyncMode() is called on. + 1) 'master' sends local endpoints to all registered remote nodes. + 2) When a 'slave' receives the kDoneSelNetId msg it transmits + it's own local endpoints back to the master. + + a. Each node in the node list has a type id: + 1. local + 2. registered - remote node that was explicitely registered on a master + 3. received - remote node that was received from a master + + b. + 1. All nodes are created in the 'send-hello' state. + 2. If a master machine is in 'sync-mode' then it systematically sends + each of it's local endpoints to all 'registered' nodes. + 3. When a slave machine recives a 'hello' it creates a + 'received' node. + 4. When a slave machine recieves a 'done' it enters sync mode + and systematically sends each of its local endpoints to + the 'done' source. + + + Protocol: + 1. A: broadcast - 'hello' + 2. Bs: respond 'hello' ack + 3. A: send local node and endpoints to each responder + 4. A: send done + 5. Bs: send local endpoints to A + + */ #ifdef __cplusplus