libcm is a C development framework with an emphasis on audio signal processing applications.
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

cmRtNet.c 23KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926
  1. #include "cmGlobal.h"
  2. #include "cmRpt.h"
  3. #include "cmErr.h"
  4. #include "cmCtx.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmUdpPort.h"
  8. #include "cmRtNet.h"
  9. #include "cmTime.h"
  10. #include "cmRtSysMsg.h"
  11. enum
  12. {
  13. kLocalNetFl = 0x01,
  14. kSockAddrNetFl = 0x02,
  15. kRegNodeNetFl = 0x04,
  16. kRecvNodeNetFl = 0x08
  17. };
  18. typedef enum
  19. {
  20. kSendHelloStNetId = 0,
  21. kWaitHelloAckStNetId,
  22. kSendEndpointStNetId,
  23. kWaitEndpointAckStNetId,
  24. kDoneStNetId,
  25. kErrorStNetId,
  26. kInvalidStNetId,
  27. } cmRtNetNodeState_t;
  28. typedef enum
  29. {
  30. kHelloSelNetId,
  31. kHelloAckSelNetId,
  32. kEndpointSelNetId,
  33. kEndpointAckSelNetId,
  34. kDoneSelNetId
  35. } cmRtNetSelId_t;
  36. typedef struct cmRtNetEnd_str
  37. {
  38. cmChar_t* endPtLabel;
  39. unsigned endPtId;
  40. struct cmRtNetEnd_str* link;
  41. } cmRtNetEnd_t;
  42. typedef struct cmRtNetNode_str
  43. {
  44. cmChar_t* label;
  45. struct sockaddr_in sockaddr;
  46. cmChar_t* addr;
  47. cmUdpPort_t port;
  48. unsigned flags;
  49. cmRtNetNodeState_t state;
  50. unsigned epIdx; // tracks the next endpoint to send during sync-mode
  51. cmTimeSpec_t lastSendTime;
  52. cmRtNetEnd_t* ends;
  53. struct cmRtNetNode_str* link;
  54. } cmRtNetNode_t;
  55. typedef struct
  56. {
  57. cmErr_t err;
  58. cmUdpH_t udpH;
  59. cmUdpCallback_t cbFunc;
  60. void* cbArg;
  61. cmRtNetNode_t* nodes;
  62. cmRtNetNode_t* localNode;
  63. bool syncModeFl;
  64. bool masterFl;
  65. unsigned udpRecvBufByteCnt;
  66. unsigned udpTimeOutMs;
  67. unsigned interSyncSendTimeMs;
  68. } cmRtNet_t;
  69. typedef struct
  70. {
  71. cmRtSysMsgHdr_t hdr;
  72. cmRtNetSelId_t selId;
  73. const cmChar_t* endPtLabel;
  74. unsigned endPtId;
  75. } cmRtNetSyncMsg_t;
  76. cmRtNetH_t cmRtNetNullHandle = cmSTATIC_NULL_HANDLE;
  77. cmRtNet_t* _cmRtNetHandleToPtr( cmRtNetH_t h )
  78. {
  79. cmRtNet_t* p = (cmRtNet_t*)h.h;
  80. assert( p != NULL );
  81. return p;
  82. }
  83. void _cmRtNetVRpt( cmRtNet_t* p, const cmChar_t* fmt, va_list vl )
  84. {
  85. cmRptVPrintf(p->err.rpt,fmt,vl);
  86. }
  87. void _cmRtNetRpt( cmRtNet_t* p, const cmChar_t* fmt, ... )
  88. {
  89. va_list vl;
  90. va_start(vl,fmt);
  91. _cmRtNetVRpt(p,fmt,vl);
  92. va_end(vl);
  93. }
  94. cmRtNetNode_t* _cmRtNetFindNode( cmRtNet_t* p, const cmChar_t* label )
  95. {
  96. if( label == NULL )
  97. return NULL;
  98. cmRtNetNode_t* np = p->nodes;
  99. for(; np!=NULL; np=np->link)
  100. if( strcmp(label,np->label)==0)
  101. return np;
  102. return NULL;
  103. }
  104. cmRtNetNode_t* _cmRtNetFindNodeFromSockAddr( cmRtNet_t* p, const struct sockaddr_in* saddr )
  105. {
  106. if( saddr == NULL )
  107. return NULL;
  108. cmRtNetNode_t* np = p->nodes;
  109. for(; np!=NULL; np=np->link)
  110. if( cmIsFlag(np->flags,kSockAddrNetFl) && np->sockaddr.sin_addr.s_addr == saddr->sin_addr.s_addr && np->sockaddr.sin_port == saddr->sin_port )
  111. return np;
  112. return NULL;
  113. }
  114. void _cmRtNetFreeNode( cmRtNetNode_t* np )
  115. {
  116. cmRtNetEnd_t* ep = np->ends;
  117. while( ep != NULL )
  118. {
  119. cmRtNetEnd_t* nep = ep->link;
  120. cmMemFree(ep->endPtLabel);
  121. cmMemFree(ep);
  122. ep = nep;
  123. }
  124. cmMemFree(np->label);
  125. cmMemFree(np->addr);
  126. cmMemFree(np);
  127. }
  128. void _cmRtNetReleaseNodes( cmRtNet_t* p )
  129. {
  130. cmRtNetNode_t* np = p->nodes;
  131. while( np != NULL )
  132. {
  133. cmRtNetNode_t* nnp = np->link;
  134. _cmRtNetFreeNode(np);
  135. np = nnp;
  136. }
  137. p->nodes = NULL;
  138. p->localNode = NULL;
  139. }
  140. cmRtNetRC_t _cmRtNetReleaseNode( cmRtNet_t* p, cmRtNetNode_t* np )
  141. {
  142. // we should never release the local node via this function
  143. assert( np != p->localNode );
  144. cmRtNetNode_t* cnp = p->nodes;
  145. cmRtNetNode_t* pnp = NULL;
  146. while( cnp != NULL )
  147. {
  148. cmRtNetNode_t* nnp = cnp->link;
  149. if( np == cnp )
  150. {
  151. if( pnp == NULL )
  152. p->nodes = np->link;
  153. else
  154. pnp->link = np->link;
  155. _cmRtNetFreeNode(np);
  156. return kOkNetRC;
  157. }
  158. pnp = np;
  159. cnp = nnp;
  160. }
  161. assert(0);
  162. return cmErrMsg(&p->err,kNodeNotFoundNetRC,"Node to release not found.");
  163. }
  164. cmRtNetRC_t _cmRtNetCreateNode( cmRtNet_t* p, const cmChar_t* label, const cmChar_t* addr, cmUdpPort_t port, const struct sockaddr_in* saddr, unsigned flags )
  165. {
  166. cmRtNetRC_t rc = kOkNetRC;
  167. cmRtNetNode_t* np;
  168. if( label == NULL )
  169. return cmErrMsg(&p->err,kInvalidLabelNetRC,"A null or blank node label was encountered.");
  170. if((np = _cmRtNetFindNode(p,label)) != NULL )
  171. return cmErrMsg(&p->err,kDuplLabelNetRC,"The node label '%s' is already in use.",cmStringNullGuard(label));
  172. bool localNodeFl = addr==NULL && saddr==NULL;
  173. if( localNodeFl && p->localNode != NULL )
  174. return cmErrMsg(&p->err,kDuplLocalNetRC,"The local node '%s' has already been set.",cmStringNullGuard(p->localNode->label));
  175. np = cmMemAllocZ(cmRtNetNode_t,1);
  176. np->label = cmMemAllocStr(label);
  177. np->addr = addr==NULL ? NULL : cmMemAllocStr(addr);
  178. np->port = port;
  179. np->flags = cmEnaFlag(flags,kLocalNetFl,localNodeFl);
  180. np->link = p->nodes;
  181. p->nodes = np;
  182. if( localNodeFl )
  183. p->localNode = np;
  184. if( saddr != NULL )
  185. np->sockaddr = *saddr;
  186. else
  187. {
  188. if( cmUdpInitAddr(p->udpH, np->addr, np->port, &np->sockaddr ) != kOkUdpRC )
  189. {
  190. rc = cmErrMsg(&p->err,kUdpPortFailNetRC,"IP::port to socket address conversion failed.");
  191. goto errLabel;
  192. }
  193. }
  194. np->flags = cmSetFlag(np->flags,kSockAddrNetFl);
  195. errLabel:
  196. return rc;
  197. }
  198. cmRtNetEnd_t* _cmRtNetFindNodeEnd(cmRtNetNode_t* np, const cmChar_t* endPtLabel )
  199. {
  200. cmRtNetEnd_t* ep = np->ends;
  201. for(; ep!=NULL; ep=ep->link)
  202. if( strcmp(ep->endPtLabel,endPtLabel)==0 )
  203. return ep;
  204. return NULL;
  205. }
  206. cmRtNetEnd_t* _cmRtNetIndexToEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, unsigned endIndex )
  207. {
  208. cmRtNetEnd_t* ep = np->ends;
  209. unsigned i;
  210. for(i=0; ep!=NULL; ep=ep->link)
  211. {
  212. if( i == endIndex )
  213. return ep;
  214. ++i;
  215. }
  216. return NULL;
  217. }
  218. cmRtNetRC_t _cmRtNetCreateEndpoint( cmRtNet_t* p, cmRtNetNode_t* np, const cmChar_t* endPtLabel, unsigned endPtId )
  219. {
  220. if( endPtLabel == NULL )
  221. return cmErrMsg(&p->err,kInvalidLabelNetRC,"A null or blank node label was encountered.");
  222. if( _cmRtNetFindNodeEnd( np, endPtLabel) != NULL)
  223. return cmErrMsg(&p->err,kDuplEndNetRC,"A duplicate endpoint ('%s') was encountered on node '%s'.",endPtLabel,np->label);
  224. cmRtNetRC_t rc = kOkNetRC;
  225. cmRtNetEnd_t* ep = cmMemAllocZ(cmRtNetEnd_t,1);
  226. ep->endPtLabel = cmMemAllocStr(endPtLabel);
  227. ep->endPtId = endPtId;
  228. ep->link = np->ends;
  229. np->ends = ep;
  230. return rc;
  231. }
  232. unsigned _cmRtNetSyncMsgSerialByteCount( const cmRtNetSyncMsg_t* m )
  233. { return sizeof(cmRtNetSyncMsg_t) + (m->endPtLabel==NULL ? 1 : strlen(m->endPtLabel) + 1); }
  234. cmRtNetRC_t _cmRtNetSerializeSyncMsg( cmRtNet_t* p, const cmRtNetSyncMsg_t* m, void* buf, unsigned n )
  235. {
  236. unsigned bn = _cmRtNetSyncMsgSerialByteCount(m);
  237. char* b = (char*)buf;
  238. if( bn > n )
  239. return cmErrMsg(&p->err,kBufToSmallNetRC,"Serialize buffer too small.");
  240. memcpy(b,m,sizeof(*m));
  241. strcpy(b + sizeof(*m),m->endPtLabel==NULL ? "" : m->endPtLabel);
  242. return kOkNetRC;
  243. }
  244. cmRtNetRC_t _cmRtNetDeserializeSyncMsg( const void* buf, unsigned n, cmRtNetSyncMsg_t* m )
  245. {
  246. assert( n > sizeof(*m));
  247. memcpy(m,buf,sizeof(*m));
  248. const cmRtNetSyncMsg_t* mp = (const cmRtNetSyncMsg_t*)buf;
  249. const cmChar_t* s = (const cmChar_t*)(mp+1);
  250. m->endPtLabel = cmMemAllocStr(s);
  251. return kOkNetRC;
  252. }
  253. cmRtNetRC_t _cmRtNetSendSyncMsg( cmRtNet_t* p, cmRtNetNode_t* np, cmRtNetSelId_t selId, const cmChar_t* endPtLabel, unsigned endPtId, cmRtNetNodeState_t nextStId )
  254. {
  255. cmRtNetSyncMsg_t m;
  256. cmRtNetRC_t rc = kOkNetRC;
  257. cmUdpRC_t udpRC = kOkUdpRC;
  258. m.hdr.rtSubIdx = cmInvalidIdx;
  259. m.hdr.selId = kNetSyncSelRtId;
  260. m.selId = selId;
  261. m.endPtLabel = endPtLabel;
  262. m.endPtId = endPtId;
  263. // determine size of msg to send
  264. unsigned n = _cmRtNetSyncMsgSerialByteCount(&m);
  265. cmChar_t buf[n];
  266. // serialize msg into buf[]
  267. if((rc = _cmRtNetSerializeSyncMsg(p,&m,buf,n)) != kOkNetRC )
  268. return rc;
  269. // store this nodes current sync state
  270. cmRtNetNodeState_t orgState = np->state;
  271. if( nextStId != kInvalidStNetId )
  272. np->state = nextStId;
  273. // send the msg
  274. if( cmIsFlag(np->flags,kSockAddrNetFl) == false )
  275. udpRC = cmUdpSend2(p->udpH, buf, n, np->addr, np->port );
  276. else
  277. udpRC = cmUdpSendTo(p->udpH, buf, n, &np->sockaddr );
  278. // check for send errors
  279. if( udpRC != kOkUdpRC )
  280. {
  281. rc = cmErrMsg(&p->err,kUdpPortFailNetRC,"Sync msg. send on UDP port failed.");
  282. np->state = orgState; // restore node state so we can try again
  283. }
  284. else
  285. {
  286. // record the last send time
  287. cmTimeGet(&np->lastSendTime);
  288. }
  289. return rc;
  290. }
  291. cmRtNetRC_t _cmRtNetFree( cmRtNet_t* p )
  292. {
  293. cmRtNetRC_t rc = kOkNetRC;
  294. if( cmUdpFree(&p->udpH) != kOkUdpRC )
  295. cmErrMsg(&p->err,kUdpPortFailNetRC,"UDP Port free failed.");
  296. _cmRtNetReleaseNodes(p);
  297. cmMemFree(p);
  298. return rc;
  299. }
  300. cmRtNetRC_t cmRtNetAlloc( cmCtx_t* ctx, cmRtNetH_t* hp, cmUdpCallback_t cbFunc, void* cbArg )
  301. {
  302. cmRtNetRC_t rc;
  303. if((rc = cmRtNetFree(hp)) != kOkNetRC )
  304. return rc;
  305. cmRtNet_t* p = cmMemAllocZ(cmRtNet_t,1);
  306. cmErrSetup(&p->err,&ctx->rpt,"cmRtNet");
  307. // allocate the UDP port
  308. if(cmUdpAlloc(ctx,&p->udpH) != kOkUdpRC )
  309. {
  310. cmErrMsg(&p->err,kUdpPortFailNetRC,"UDP Port allocate failed.");
  311. goto errLabel;
  312. }
  313. p->udpTimeOutMs = 50;
  314. p->udpRecvBufByteCnt = 8192;
  315. p->interSyncSendTimeMs = 10;
  316. p->cbFunc = cbFunc;
  317. p->cbArg = cbArg;
  318. hp->h = p;
  319. errLabel:
  320. if(rc != kOkNetRC )
  321. _cmRtNetFree(p);
  322. return rc;
  323. }
  324. cmRtNetRC_t cmRtNetFree( cmRtNetH_t* hp )
  325. {
  326. cmRtNetRC_t rc = kOkNetRC;
  327. if( hp==NULL || cmRtNetIsValid(*hp)==false )
  328. return rc;
  329. cmRtNet_t* p = _cmRtNetHandleToPtr(*hp);
  330. if((rc = _cmRtNetFree(p)) != kOkNetRC )
  331. return rc;
  332. hp->h = NULL;
  333. return rc;
  334. }
  335. const cmChar_t* cmRtNetLocalHostName( cmRtNetH_t h )
  336. {
  337. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  338. return cmUdpHostName(p->udpH);
  339. }
  340. bool cmRtNetIsValid( cmRtNetH_t h )
  341. { return h.h !=NULL; }
  342. cmUdpH_t cmRtNetUdpPortHandle( cmRtNetH_t h )
  343. {
  344. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  345. return p->udpH;
  346. }
  347. cmRtNetRC_t cmRtNetCreateNode( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* ipAddr, cmUdpPort_t port )
  348. {
  349. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  350. cmRtNetRC_t rc;
  351. // create a node
  352. if((rc = _cmRtNetCreateNode(p,nodeLabel,ipAddr, port, NULL, kRegNodeNetFl)) != kOkNetRC )
  353. return rc;
  354. // if this is not the local node
  355. if( ipAddr != NULL )
  356. return rc;
  357. // if this is the local node then initialze the local socket
  358. if( cmUdpInit(p->udpH,port,kNonBlockingUdpFl,p->cbFunc,p->cbArg,NULL,0,p->udpRecvBufByteCnt,p->udpTimeOutMs) != kOkUdpRC )
  359. {
  360. rc = cmErrMsg(&p->err,kUdpPortFailNetRC,"The UDP port initialization failed.");
  361. goto errLabel;
  362. }
  363. // begin listening on the local port
  364. if( cmUdpEnableListen(p->udpH, true ) != kOkUdpRC )
  365. {
  366. rc = cmErrMsg(&p->err,kUdpPortFailNetRC,"The UDP port failed to enter 'listen' mode.");
  367. goto errLabel;
  368. }
  369. errLabel:
  370. return rc;
  371. }
  372. cmRtNetRC_t cmRtNetRegisterEndPoint( cmRtNetH_t h, const cmChar_t* endPtLabel, unsigned endPtId )
  373. {
  374. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  375. if( p->localNode == NULL )
  376. return cmErrMsg(&p->err,kLocalNodeNetRC,"Local endpoints may not be added if a local node has not been defined.");
  377. return _cmRtNetCreateEndpoint(p, p->localNode,endPtLabel,endPtId );
  378. }
  379. cmRtNetRC_t cmRtNetClearAll( cmRtNetH_t h )
  380. {
  381. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  382. _cmRtNetReleaseNodes(p);
  383. return kOkNetRC;
  384. }
  385. cmRtNetRC_t cmRtNetBeginSyncMode( cmRtNetH_t h )
  386. {
  387. cmRtNetRC_t rc = kOkNetRC;
  388. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  389. p->syncModeFl = true;
  390. p->masterFl = true;
  391. return rc;
  392. }
  393. bool cmRtNetIsInSyncMode( cmRtNetH_t h )
  394. {
  395. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  396. return p->syncModeFl;
  397. }
  398. // Used by slaves to send the master an 'ack' msg.
  399. cmRtNetRC_t _cmRtNetSendAck( cmRtNet_t* p, cmRtNetSelId_t ackSelId, const struct sockaddr_in* saddr )
  400. {
  401. cmRtNetNode_t* np;
  402. if((np = _cmRtNetFindNodeFromSockAddr(p,saddr)) == NULL )
  403. return cmErrMsg(&p->err,kNodeNotFoundNetRC,"The net node associated with an ack cmd was not found. Ack not sent.");
  404. return _cmRtNetSendSyncMsg(p,np,ackSelId,NULL,cmInvalidId,kInvalidStNetId);
  405. }
  406. // Used by master to update state upon receipt of 'ack' msg
  407. cmRtNetRC_t _cmRtNetRecvAck( cmRtNet_t* p, const struct sockaddr_in* fromAddr, cmRtNetNodeState_t expectedState, cmRtNetNodeState_t nextState )
  408. {
  409. cmRtNetNode_t* np;
  410. cmRtNetRC_t rc = kOkNetRC;
  411. if((np = _cmRtNetFindNodeFromSockAddr(p,fromAddr)) == NULL )
  412. {
  413. rc = cmErrMsg(&p->err,kNodeNotFoundNetRC,"The net node associated with a ack receive was not found.");
  414. goto errLabel;
  415. }
  416. if( np->state != expectedState )
  417. {
  418. rc = cmErrMsg(&p->err,kNodeStateErrNetRC,"Node '%s' expected in state %i was in state %i.",cmStringNullGuard(np->label),kWaitHelloAckStNetId,np->state);
  419. np->state = kErrorStNetId;
  420. goto errLabel;
  421. }
  422. np->state = nextState;
  423. // if we are about to send another endpoint - incr the endpoint index
  424. if( nextState == kSendEndpointStNetId )
  425. np->epIdx += 1;
  426. errLabel:
  427. return rc;
  428. }
  429. cmRtNetRC_t cmRtNetSyncModeRecv( cmRtNetH_t h, const char* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr )
  430. {
  431. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  432. cmRtNetRC_t rc = kOkNetRC;
  433. cmRtNetNode_t* np = NULL;
  434. cmRtNetSyncMsg_t m;
  435. m.endPtLabel = NULL;
  436. assert( cmRtNetIsSyncModeMsg(data,dataByteCnt));
  437. if( _cmRtNetDeserializeSyncMsg(data,dataByteCnt,&m) != kOkNetRC )
  438. {
  439. rc = cmErrMsg(&p->err,rc,"Net sync. receive failed due to deserialize fail.");
  440. goto errLabel;
  441. }
  442. assert( m.hdr.selId == kNetSyncSelRtId );
  443. switch( m.selId )
  444. {
  445. case kHelloSelNetId: // slave response
  446. {
  447. _cmRtNetRpt(p,"rcv hello\n");
  448. // attempt to locate the remote node which sent the endpoint
  449. if((np = _cmRtNetFindNodeFromSockAddr(p,fromAddr)) != NULL )
  450. {
  451. // delete the existing node because we are about to get new info. about it.
  452. if((rc = _cmRtNetReleaseNode(p,np )) != kOkNetRC )
  453. goto errLabel;
  454. }
  455. // create a node proxy to represent the remote node
  456. if(( rc = _cmRtNetCreateNode(p,m.endPtLabel,NULL,0,fromAddr,kRecvNodeNetFl)) != kOkNetRC )
  457. goto errLabel;
  458. // send an ackknowledgement of the 'hello' msg
  459. rc = _cmRtNetSendAck(p,kHelloAckSelNetId,fromAddr);
  460. }
  461. break;
  462. case kEndpointSelNetId: // slave response
  463. {
  464. cmRtNetEnd_t* ep;
  465. _cmRtNetRpt(p,"rcv endpoint\n");
  466. // locate the remote node which sent the endpoint
  467. if((np = _cmRtNetFindNodeFromSockAddr(p,fromAddr)) == NULL )
  468. {
  469. rc = cmErrMsg(&p->err,kNodeNotFoundNetRC,"The net node associated with an endpoint receive was not found.");
  470. goto errLabel;
  471. }
  472. // attempt to find the end point
  473. if((ep = _cmRtNetFindNodeEnd(np,m.endPtLabel)) != NULL )
  474. ep->endPtId = m.endPtId; // the endpoint was found update the endPtId
  475. else
  476. {
  477. // create a local proxy for the endpoint
  478. if((rc = _cmRtNetCreateEndpoint(p,np,m.endPtLabel,m.endPtId)) != kOkNetRC )
  479. goto errLabel;
  480. }
  481. // ack. the endpoint msg
  482. rc = _cmRtNetSendAck(p,kEndpointAckSelNetId,fromAddr);
  483. }
  484. break;
  485. case kDoneSelNetId:
  486. {
  487. _cmRtNetRpt(p,"rcv done\n");
  488. if( p->masterFl==false )
  489. p->syncModeFl = true;
  490. }
  491. break;
  492. case kHelloAckSelNetId: // master response
  493. {
  494. assert( p->syncModeFl );
  495. _cmRtNetRpt(p,"rcv hello ack\n");
  496. rc = _cmRtNetRecvAck(p,fromAddr,kWaitHelloAckStNetId,kSendEndpointStNetId);
  497. }
  498. break;
  499. case kEndpointAckSelNetId: // master response
  500. {
  501. assert( p->syncModeFl );
  502. _cmRtNetRpt(p,"rcv endpoint ack\n");
  503. rc = _cmRtNetRecvAck(p,fromAddr,kWaitEndpointAckStNetId,kSendEndpointStNetId);
  504. }
  505. break;
  506. default:
  507. break;
  508. }
  509. errLabel:
  510. cmMemFree((cmChar_t*)m.endPtLabel);
  511. return rc;
  512. }
  513. cmRtNetRC_t _cmRtNetSendNodeSync( cmRtNet_t* p, cmRtNetNode_t* np )
  514. {
  515. cmRtNetRC_t rc = kOkNetRC;
  516. switch( np->state )
  517. {
  518. case kSendHelloStNetId:
  519. {
  520. np->epIdx = -1;
  521. // send a 'hello' to this remote node
  522. if((rc = _cmRtNetSendSyncMsg(p,np,kHelloSelNetId,p->localNode->label, cmInvalidId, kWaitHelloAckStNetId )) != kOkNetRC )
  523. rc = cmErrMsg(&p->err,rc,"Send 'hello' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port);
  524. else
  525. _cmRtNetRpt(p,"%s sent hello\n",cmStringNullGuard(np->label));
  526. }
  527. break;
  528. case kSendEndpointStNetId:
  529. {
  530. cmRtNetEnd_t* ep;
  531. // if all of the endpoints have been sent to this node ...
  532. if((ep = _cmRtNetIndexToEndpoint(p,p->localNode,np->epIdx)) == NULL )
  533. {
  534. // notify the remote node that all endpoints have been sent
  535. if((rc = _cmRtNetSendSyncMsg(p,np,kDoneSelNetId,p->localNode->label,cmInvalidId, kDoneStNetId )) != kOkNetRC )
  536. rc = cmErrMsg(&p->err,rc,"Send 'done' to %s:%s:%i failed.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port);
  537. else
  538. _cmRtNetRpt(p,"Node %s done.\n",cmStringNullGuard(np->label));
  539. }
  540. else
  541. {
  542. // send an endpoint to this node
  543. if((rc = _cmRtNetSendSyncMsg(p,np,kEndpointSelNetId,ep->endPtLabel, ep->endPtId, kWaitEndpointAckStNetId )) != kOkNetRC )
  544. rc = cmErrMsg(&p->err,rc,"Endpoint (%s index:%i) transmission to %s:%s:%i failed.",cmStringNullGuard(ep->endPtLabel),cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port);
  545. else
  546. _cmRtNetRpt(p,"%s sent endpoint %s\n",cmStringNullGuard(np->label),cmStringNullGuard(ep->endPtLabel));
  547. }
  548. }
  549. break;
  550. case kWaitHelloAckStNetId:
  551. case kWaitEndpointAckStNetId:
  552. {
  553. cmTimeSpec_t t;
  554. cmTimeGet(&t);
  555. unsigned twentySecs = 20000000;
  556. if( cmTimeElapsedMicros(&np->lastSendTime,&t) > twentySecs)
  557. {
  558. const cmChar_t* ackStr = np->state==kWaitHelloAckStNetId ? "hello" : "endpoint";
  559. rc = cmErrMsg(&p->err,kTimeOutErrNetRC,"The node %s:%s:%i did not give a '%s' acknowledge.",cmStringNullGuard(np->label),cmStringNullGuard(np->addr),np->port,ackStr);
  560. }
  561. }
  562. break;
  563. default:
  564. break;
  565. }
  566. // if an error occurred put the node into an error state
  567. if( rc != kOkNetRC )
  568. np->state = kErrorStNetId;
  569. return rc;
  570. }
  571. cmRtNetRC_t cmRtNetSyncModeSend( cmRtNetH_t h )
  572. {
  573. cmRtNetRC_t rc = kOkNetRC;
  574. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  575. if( p->syncModeFl == false )
  576. return rc;
  577. unsigned activeCnt = 0;
  578. cmRtNetNode_t* np = p->nodes;
  579. for(; np != NULL; np=np->link )
  580. {
  581. bool fl = (p->masterFl && cmIsFlag(np->flags,kRegNodeNetFl)) || (p->masterFl==false && cmIsFlag(np->flags,kRecvNodeNetFl));
  582. if( fl && np != p->localNode && np->state != kDoneStNetId && np->state != kErrorStNetId )
  583. {
  584. _cmRtNetSendNodeSync(p,np);
  585. activeCnt += 1;
  586. }
  587. }
  588. if( activeCnt == 0 )
  589. {
  590. p->syncModeFl = false;
  591. _cmRtNetRpt(p,"sync mode complete.\n");
  592. }
  593. return rc;
  594. }
  595. cmRtNetRC_t cmRtNetReceive( cmRtNetH_t h )
  596. {
  597. cmRtNetRC_t rc = kOkNetRC;
  598. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  599. if( cmUdpGetAvailData(p->udpH, NULL, NULL, NULL ) != kOkUdpRC )
  600. {
  601. cmErrMsg(&p->err,kUdpPortFailNetRC,"UDP port query failed.");
  602. goto errLabel;
  603. }
  604. errLabel:
  605. return rc;
  606. }
  607. bool cmRtNetIsSyncModeMsg( const void* data, unsigned dataByteCnt )
  608. {
  609. cmRtNetSyncMsg_t* m = (cmRtNetSyncMsg_t*)data;
  610. return dataByteCnt >= sizeof(cmRtNetSyncMsg_t) && m->hdr.selId == kNetSyncSelRtId;
  611. }
  612. unsigned cmRtNetEndPointIndex( cmRtNetH_t h, const cmChar_t* nodeLabel, const cmChar_t* endPtLabel )
  613. {
  614. //cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  615. return cmInvalidIdx;
  616. }
  617. cmRtNetRC_t cmRtNetSend( cmRtNetH_t h, unsigned endPointIndex, const void* msg, unsigned msgByteCnt )
  618. {
  619. cmRtNetRC_t rc = kOkNetRC;
  620. //cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  621. return rc;
  622. }
  623. void cmRtNetReport( cmRtNetH_t h )
  624. {
  625. cmRtNet_t* p = _cmRtNetHandleToPtr(h);
  626. cmRpt_t* rpt = p->err.rpt;
  627. cmRptPrintf(rpt,"Sync Mode:%s\n",p->syncModeFl ? "ON" : "OFF");
  628. cmRtNetNode_t* np = p->nodes;
  629. for(; np!=NULL; np=np->link)
  630. {
  631. cmRptPrintf(rpt,"Node: %s ",np->label);
  632. if( np->addr != NULL )
  633. cmRptPrintf(rpt,"%s ",np->addr );
  634. if( cmIsFlag(np->flags,kLocalNetFl) )
  635. cmRptPrintf(rpt,"LOCAL ");
  636. if( cmIsFlag(np->flags,kSockAddrNetFl) )
  637. cmRptPrintf(rpt,"%s ",cmStringNullGuard(cmUdpAddrToString(p->udpH,&np->sockaddr)));
  638. if( np->port != cmInvalidId )
  639. cmRptPrintf(rpt,"%i ",np->port );
  640. cmRptPrintf(rpt,"\n");
  641. cmRtNetEnd_t* ep = np->ends;
  642. for(; ep!=NULL; ep=ep->link)
  643. {
  644. cmRptPrintf(rpt," endpt: %i %s\n",ep->endPtId,cmStringNullGuard(ep->endPtLabel));
  645. }
  646. }
  647. }
  648. //==========================================================================
  649. #include "cmThread.h"
  650. typedef struct
  651. {
  652. cmThreadH_t thH;
  653. cmRtNetH_t netH;
  654. } _cmRtNetTest_t;
  655. void _cmRtNetTestRecv( void* cbArg, const char* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr )
  656. {
  657. _cmRtNetTest_t* p = (_cmRtNetTest_t*)cbArg;
  658. if( cmRtNetIsSyncModeMsg(data,dataByteCnt))
  659. cmRtNetSyncModeRecv(p->netH,data,dataByteCnt,fromAddr);
  660. }
  661. bool _cmRtNetTestThreadFunc(void* param)
  662. {
  663. _cmRtNetTest_t* p = (_cmRtNetTest_t*)param;
  664. if( cmRtNetIsValid(p->netH) )
  665. {
  666. if( cmRtNetIsInSyncMode(p->netH) )
  667. cmRtNetSyncModeSend(p->netH);
  668. cmRtNetReceive(p->netH);
  669. }
  670. cmSleepMs(40);
  671. return true;
  672. }
  673. void cmRtNetTest( cmCtx_t* ctx, bool mstrFl )
  674. {
  675. char c;
  676. _cmRtNetTest_t t;
  677. const cmChar_t* hostNameStr;
  678. cmUdpPort_t port = 5876;
  679. _cmRtNetTest_t* p = &t;
  680. cmRtNetRC_t rc = kOkNetRC;
  681. memset(&t,0,sizeof(t));
  682. if( cmThreadCreate(&p->thH,_cmRtNetTestThreadFunc,p,&ctx->rpt) != kOkThRC )
  683. goto errLabel;
  684. if((rc = cmRtNetAlloc(ctx,&p->netH,_cmRtNetTestRecv,p)) != kOkNetRC )
  685. goto errLabel;
  686. hostNameStr = cmRtNetLocalHostName(p->netH);
  687. if( hostNameStr == NULL )
  688. hostNameStr = "<no-host-name>";
  689. if((rc = cmRtNetCreateNode(p->netH, hostNameStr, NULL, port )) != kOkNetRC)
  690. goto errLabel;
  691. if( mstrFl )
  692. {
  693. if((rc = cmRtNetCreateNode(p->netH,"whirl", "192.168.15.109", port )) != kOkNetRC )
  694. goto errLabel;
  695. if((rc = cmRtNetRegisterEndPoint(p->netH,"thunk_ep0", 0 )) != kOkNetRC )
  696. goto errLabel;
  697. if(( rc = cmRtNetBeginSyncMode(p->netH)) != kOkNetRC )
  698. goto errLabel;
  699. }
  700. else
  701. {
  702. if((rc = cmRtNetRegisterEndPoint(p->netH,"whirl_ep0", 0 )) != kOkNetRC )
  703. goto errLabel;
  704. }
  705. if( cmThreadPause(p->thH,0) != kOkThRC )
  706. goto errLabel;
  707. cmRptPrintf(&ctx->rpt,"%s q=quit\n", mstrFl ? "Master: " : "Slave: ");
  708. while( (c=getchar()) != 'q' )
  709. {
  710. switch(c)
  711. {
  712. case 'r':
  713. cmRtNetReport(p->netH);
  714. break;
  715. }
  716. }
  717. errLabel:
  718. cmThreadDestroy(&p->thH);
  719. cmRtNetFree(&p->netH);
  720. return;
  721. }