libcm is a C development framework with an emphasis on audio signal processing applications.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cmDspNet.c 25KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. #include "cmPrefix.h"
  2. #include "cmGlobal.h"
  3. #include "cmFloatTypes.h"
  4. #include "cmRpt.h"
  5. #include "cmErr.h"
  6. #include "cmCtx.h"
  7. #include "cmMem.h"
  8. #include "cmMallocDebug.h"
  9. #include "cmLinkedHeap.h"
  10. #include "cmFileSys.h"
  11. #include "cmSymTbl.h"
  12. #include "cmJson.h"
  13. #include "cmPrefs.h"
  14. #include "cmDspValue.h"
  15. #include "cmMsgProtocol.h"
  16. #include "cmThread.h"
  17. #include "cmUdpPort.h"
  18. #include "cmUdpNet.h"
  19. #include "cmAudioSys.h"
  20. #include "cmProcObj.h"
  21. #include "cmDspCtx.h"
  22. #include "cmDspClass.h"
  23. #include "cmDspStore.h"
  24. #include "cmDspSys.h"
  25. #include "cmDspPreset.h"
  26. #include "cmDspNet.h"
  27. ///============================================================================================
  28. ///============================================================================================
  29. /*
  30. cmDspSysAlloc()
  31. - create the p->netNodeArray[]
  32. - create the p->thH network thread but leave it paused
  33. cmDspSysLoad()
  34. - _cmDspSysNetPreload() - set srcConnList to NULL
  35. - load the progam - this result in calls to _cmDspSysNetCreateSrcConn()
  36. which creates the p->srcConnList.
  37. - _cmDspSysNetSync() - just before exiting the load process go into sync mode:
  38. - initialize the p->netNodeArray[].
  39. - set the srcId of each record in the p->srcConnList.
  40. - start the sync thread
  41. while(1)
  42. {
  43. 1) send connection requests to dst machines
  44. 2) if( connFl = all src conn's have dst id's )
  45. send 'done' to all remote nodes
  46. 3) nodeFl = if have recv'd 'done' from all remote nodes
  47. 4) if connFl && nodeFl
  48. p->syncState = kSyncSuccessDspId;
  49. break;
  50. }
  51. Enable Audio:
  52. - Call reset on all instances
  53. cmDspSysUnload()
  54. - p->syncState = kSyncPreDspId
  55. cmDspSysFree()
  56. - delete p->netNodeArray[]
  57. - delete p->thH network thread
  58. //--------------------------------------------------------
  59. */
  60. ///============================================================================================
  61. // implemented in cmDspSys.c
  62. cmDspInst_t* _cmDspSysInstSymIdToPtr( cmDsp_t* p, unsigned instSymId );
  63. cmDsp_t* _cmDspHandleToPtr( cmDspSysH_t h );
  64. cmDspRC_t _cmDspSysNetSend( cmDsp_t* p, unsigned remoteNetNodeId, unsigned subSelId, unsigned srcId, unsigned dstId, const cmChar_t* errMsg )
  65. {
  66. cmDspRC_t rc = kOkDspRC;
  67. cmDspNetMsg_t m;
  68. memset(&m,0,sizeof(m));
  69. // we should never be sending to ourselves
  70. assert( remoteNetNodeId != cmUdpNetLocalNodeId(p->netH));
  71. // form the error msg
  72. m.asSubIdx = cmDspSys_AsSubIdx_Zero;
  73. m.selId = kNetSyncSelAsId;
  74. m.subSelId = subSelId;
  75. m.srcId = srcId;
  76. m.dstId = dstId;
  77. if( cmUdpNetSendById(p->netH, remoteNetNodeId, &m, sizeof(m) ) == kOkUnRC )
  78. {
  79. //usleep(p->sendWaitMs*1000);
  80. }
  81. else
  82. {
  83. rc = kNetFailDspRC;
  84. if( errMsg != NULL )
  85. cmErrMsg(&p->err,rc,errMsg);
  86. }
  87. return rc;
  88. }
  89. // set echoFl if the remote node should respond with it's own 'hello' msg
  90. cmDspRC_t _cmDspSysNetSendHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  91. {
  92. return _cmDspSysNetSend(p, remoteNetNodeId, kNetHelloSelAsId, echoFl ? 1 : 0, cmInvalidId, "A network send failed while sending 'hello'." );
  93. }
  94. cmDspRC_t _cmDspSysNetSendSyncError( cmDsp_t* p, cmDspRC_t errRc )
  95. {
  96. cmDspRC_t rc = kOkDspRC;
  97. unsigned i;
  98. // for each non-local node
  99. for(i=0; i<p->netNodeCnt; ++i)
  100. if( p->netNodeArray[i].localFl == false)
  101. {
  102. unsigned remoteNetNodeId = p->netNodeArray[i].id;
  103. cmDspRC_t rc0;
  104. // send the error code in the dstId
  105. if((rc0 = _cmDspSysNetSend(p, remoteNetNodeId, kNetErrSelAsId, cmUdpNetLocalNodeId(p->netH), errRc, "A network send failed while signaling an error." )) != kOkDspRC )
  106. rc = rc0;
  107. }
  108. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  109. rc = cmErrMsg(&p->err,kThreadFailDspRC,"An attempt to pause the sync. thread failed after signaling an error.");
  110. if( p->netVerbosity > 0 )
  111. cmRptPrintf(p->err.rpt,"Sync:Done - Fail\n");
  112. return rc;
  113. }
  114. bool _cmDspSysNetIsNodeAwake( cmDsp_t* p, unsigned netNodeId )
  115. {
  116. unsigned i;
  117. for(i=0; i<p->netNodeCnt; ++i)
  118. if( p->netNodeArray[i].id == netNodeId )
  119. return p->netNodeArray[i].helloFl;
  120. assert(0); // unknown net node id
  121. return false;
  122. }
  123. bool _cmDspSysNet_AreAllDstIdsResolved( cmDsp_t* p )
  124. {
  125. // check that we have received all dst id req's from each remote node
  126. unsigned i;
  127. for(i=0; i<p->netNodeCnt; ++i)
  128. if( p->netNodeArray[i].localFl == false && p->netNodeArray[i].reqDoneFl==false)
  129. return false;
  130. _cmDspSrcConn_t* rp = p->srcConnList;
  131. // for each src connection which does not yet have a destination id.
  132. for(; rp != NULL; rp = rp->link)
  133. if( rp->dstId == cmInvalidId )
  134. return false;
  135. return true;
  136. }
  137. // send connection requests to the specified remote node
  138. cmDspRC_t _cmDspSysNetSendConnRequests( cmDsp_t* p, unsigned dstNetNodeId )
  139. {
  140. cmDspRC_t rc = kOkDspRC;
  141. _cmDspSrcConn_t* rp = p->srcConnList;
  142. // for each src connection which does not yet have a destination id.
  143. for(; rp != NULL; rp = rp->link)
  144. {
  145. // if this src conn has not been assigned a dstId and it's source node is awake
  146. if( rp->dstId == cmInvalidId && rp->dstNetNodeId==dstNetNodeId && _cmDspSysNetIsNodeAwake(p,rp->dstNetNodeId) )
  147. {
  148. // calc the msg size
  149. unsigned sn0 = strlen(rp->dstInstLabel) + 1;
  150. unsigned sn1 = strlen(rp->dstVarLabel) + 1;
  151. unsigned byteCnt = sizeof(cmDspNetMsg_t) + sn0 + sn1;
  152. // create msg buffer
  153. char buf[ byteCnt ];
  154. memset(buf,0,byteCnt);
  155. // fill in the msg
  156. cmDspNetMsg_t* cp = (cmDspNetMsg_t*)buf;
  157. cp->asSubIdx = cmDspSys_AsSubIdx_Zero;
  158. cp->selId = kNetSyncSelAsId;
  159. cp->subSelId = kNetDstIdReqSelAsId;
  160. cp->srcId = rp->srcId;
  161. cp->dstId = cmInvalidId;
  162. char* dp = buf + sizeof(*cp);
  163. memcpy(dp,rp->dstInstLabel,sn0);
  164. dp += sn0;
  165. memcpy(dp,rp->dstVarLabel,sn1);
  166. dp += sn1;
  167. assert(dp == buf + byteCnt );
  168. // send the msg
  169. if( cmUdpNetSendById(p->netH, rp->dstNetNodeId, buf, byteCnt ) != kOkUnRC )
  170. {
  171. rc = cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while registering remote nodes.");
  172. goto errLabel;
  173. }
  174. if( p->netVerbosity > 1 )
  175. cmRptPrintf(p->err.rpt,"Sync: send req to %i\n",rp->dstNetNodeId);
  176. //usleep(p->sendWaitMs*1000); // wait between transmissions
  177. }
  178. }
  179. errLabel:
  180. if( rc != kOkDspRC )
  181. _cmDspSysNetSendSyncError(p,rc);
  182. return rc;
  183. }
  184. // Return true when the 'doneFl' on all nodes has been set.
  185. // The doneFl is cleared on the beginning of the sync. process.
  186. // The doneFl is set as each remote node signals that it has
  187. // all of the dstId's that it needs by sending kNetDoneSelAsId
  188. // messages.
  189. bool _cmDspSysNetCheckNetNodeStatus( cmDsp_t* p )
  190. {
  191. unsigned i;
  192. for(i=0; i<p->netNodeCnt; ++i)
  193. if( p->netNodeArray[i].doneFl == false )
  194. return false;
  195. return true;
  196. }
  197. // Send kNetDoneSelAsId msgs to all remote nodes to indicate that
  198. // this node has all of it's dstId's.
  199. cmDspRC_t _cmDspSysNetSendSyncDone( cmDsp_t* p )
  200. {
  201. cmDspRC_t rc = kOkDspRC;
  202. unsigned i;
  203. // broadcast the sync 'done' msg to each non-local node
  204. if( p->netDoneSentFl )
  205. return rc;
  206. for(i=0; i<p->netNodeCnt; ++i)
  207. if( p->netNodeArray[i].localFl == false )
  208. {
  209. if((rc = _cmDspSysNetSend(p, p->netNodeArray[i].id, kNetDoneSelAsId, cmInvalidId, cmInvalidId, "A network send failed while signaling sync. completion." )) != kOkDspRC )
  210. goto errLabel;
  211. }
  212. // create the src connection map
  213. _cmDspSrcConn_t* sp = p->srcConnList;
  214. if( sp != NULL && p->srcConnMapCnt == 0 )
  215. {
  216. if( p->netVerbosity > 0 )
  217. cmRptPrintf(p->err.rpt,"Sync:Creating src map\n");
  218. // get the count of src nodes
  219. unsigned n;
  220. for(n=0; sp != NULL; sp = sp->link )
  221. ++n;
  222. // allocate the srcConnMap
  223. p->srcConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspSrcConn_t*, p->srcConnMap, n );
  224. p->srcConnMapCnt = n;
  225. // load the srcConnMap
  226. for(sp = p->srcConnList; sp != NULL; sp = sp->link )
  227. {
  228. assert( sp->srcId < n );
  229. p->srcConnMap[ sp->srcId ] = sp;
  230. }
  231. }
  232. // create the dst connection map
  233. _cmDspDstConn_t* dp = p->dstConnList;
  234. if( dp != NULL && p->dstConnMapCnt == 0 )
  235. {
  236. if( p->netVerbosity > 0 )
  237. cmRptPrintf(p->err.rpt,"Sync:Creating dst map\n");
  238. unsigned n;
  239. // get the count of dst nodes
  240. for(n=0; dp != NULL; dp = dp->link )
  241. ++n;
  242. // allocate the dstConnMap
  243. p->dstConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspDstConn_t*, p->dstConnMap, n );
  244. p->dstConnMapCnt = n;
  245. // load the dstConnMap
  246. for(dp = p->dstConnList; dp != NULL; dp = dp->link )
  247. {
  248. assert( dp->dstId < n );
  249. p->dstConnMap[ dp->dstId ] = dp;
  250. }
  251. }
  252. p->netDoneSentFl = true;
  253. if( p->netVerbosity > 0 )
  254. cmRptPrintf(p->err.rpt,"Sync: Done Sent\n",i);
  255. errLabel:
  256. if( rc != kOkDspRC )
  257. _cmDspSysNetSendSyncError(p,rc);
  258. return rc;
  259. }
  260. // Sync thread callback function
  261. bool _cmDspSysNetSyncThreadCb( void* param )
  262. {
  263. cmDsp_t* p = (cmDsp_t*)param;
  264. bool connFl = true; //
  265. bool nodeFl = true;
  266. // receive a group of waiting messages from remote nodes
  267. // WARNING: calling cmUdpNetReceive() here means that the audio engine cannot be
  268. // enabled - otherwise this thread and the audio system thread will simultaneously
  269. // attempt to read the UDP port. This will result in unsafe thread conflicts.
  270. if( cmUdpNetReceive(p->netH, NULL ) != kOkUnRC )
  271. {
  272. cmErrMsg(&p->err,kNetFailDspRC,"UDP Net receive failed during sync. mode.");
  273. _cmDspSysNetSendSyncError(p,kNetFailDspRC);
  274. }
  275. // check if all the src connections have been assigned dst id's
  276. connFl = _cmDspSysNet_AreAllDstIdsResolved(p);
  277. // if all the src connections have dst id's then send a 'done' signal
  278. // to all other nodes so that they know this node is ready to leave
  279. // sync mode
  280. if( connFl )
  281. {
  282. if( _cmDspSysNetSendSyncDone(p) != kOkDspRC )
  283. goto errLabel;
  284. }
  285. // prevent the thread from burning too much time
  286. usleep(p->sendWaitMs*1000);
  287. // check if all nodes have completed transmission to this node
  288. nodeFl = _cmDspSysNetCheckNetNodeStatus(p);
  289. // if the connections have all been setup and all the net nodes have
  290. // received a 'done' signal
  291. if( connFl && nodeFl )
  292. {
  293. // mark the sync as complete
  294. p->syncState = kSyncSuccessDspId;
  295. // the sync. is done - pause this thread
  296. if( cmThreadPause( p->thH, kPauseThFl ) != kOkThRC )
  297. cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the sync. thread upon completion failed.");
  298. if( p->netVerbosity > 0 )
  299. cmRptPrintf(p->err.rpt,"Sync Done!\n");
  300. }
  301. errLabel:
  302. return true;
  303. }
  304. // During DSP network allocation and connection this function is called
  305. // to register remote instance/var targets.
  306. _cmDspSrcConn_t* _cmDspSysNetCreateSrcConn( cmDsp_t* p, unsigned dstNetNodeId, const cmChar_t* dstInstLabel, const cmChar_t* dstVarLabel )
  307. {
  308. if( dstNetNodeId == cmUdpNetLocalNodeId(p->netH) )
  309. {
  310. cmErrMsg(&p->err,kNetFailDspRC,"Cannot connect a network node (node:%s inst:%s label:%s)to itself.",cmStringNullGuard(cmUdpNetNodeIdToLabel(p->netH,dstNetNodeId)),cmStringNullGuard(dstInstLabel),cmStringNullGuard(dstVarLabel));
  311. return NULL;
  312. }
  313. // register the remote node
  314. _cmDspSrcConn_t* rp = cmLhAllocZ( p->ctx.lhH, _cmDspSrcConn_t, 1 );
  315. rp->dstNetNodeId = dstNetNodeId;
  316. rp->dstInstLabel = cmLhAllocStr( p->ctx.lhH, dstInstLabel );
  317. rp->dstVarLabel = cmLhAllocStr( p->ctx.lhH, dstVarLabel );
  318. rp->link = p->srcConnList;
  319. p->srcConnList = rp;
  320. if( p->netVerbosity > 1 )
  321. cmRptPrintf(p->err.rpt,"Sync: create src for %i\n", dstNetNodeId );
  322. return rp;
  323. }
  324. // This function is called in response to receiving a connection request on the
  325. // dst machine. It sends a dstId to match the srcId provided by the src machine.
  326. cmDspRC_t _cmDspSysNetSendDstConnId( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId, unsigned dstId )
  327. {
  328. cmDspRC_t rc = kOkDspRC;
  329. if((rc = _cmDspSysNetSend(p, srcNetNodeId, kNetDstIdSelAsId, srcId, dstId, "A network send failed while sending a dst. id." )) == kOkDspRC )
  330. {
  331. if( p->netVerbosity > 1 )
  332. cmRptPrintf(p->err.rpt,"Sync:Sent dst id to %i\n",srcNetNodeId);
  333. }
  334. return rc;
  335. }
  336. // Handle 'hello' messages
  337. cmDspRC_t _cmDspSysNetReceiveHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  338. {
  339. cmDspRC_t rc = kOkDspRC;
  340. unsigned i;
  341. for(i=0; i<p->netNodeCnt; ++i)
  342. if( p->netNodeArray[i].id == remoteNetNodeId )
  343. {
  344. p->netNodeArray[i].helloFl = true;
  345. // if echo was requested then respond ...
  346. if( echoFl )
  347. {
  348. // ... but the remote node should not respond - because we already know about it
  349. _cmDspSysNetSendHello(p,remoteNetNodeId,false);
  350. }
  351. if( p->netVerbosity > 0 )
  352. cmRptPrintf(p->err.rpt,"Sync:hello from %i\n",remoteNetNodeId);
  353. // send connection requests to the remote net node
  354. if((rc = _cmDspSysNetSendConnRequests(p, remoteNetNodeId )) == kOkDspRC )
  355. {
  356. // send 'req done' msg
  357. rc = _cmDspSysNetSend(p, remoteNetNodeId, kNetDstIdReqDoneAsId, 0, cmInvalidId, "A network send failed while sending 'dst req done'." );
  358. }
  359. return rc;
  360. }
  361. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'hello' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  362. }
  363. cmDspRC_t _cmDspSysNetReceiveReqDone( cmDsp_t* p, unsigned remoteNetNodeId )
  364. {
  365. unsigned i;
  366. for(i=0; i<p->netNodeCnt; ++i)
  367. if( p->netNodeArray[i].id == remoteNetNodeId )
  368. {
  369. p->netNodeArray[i].reqDoneFl = true;
  370. if( p->netVerbosity > 0 )
  371. cmRptPrintf(p->err.rpt,"Sync: Req Done from %i\n",remoteNetNodeId);
  372. return kOkDspRC;
  373. }
  374. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'req done' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  375. }
  376. // given a srcNetNodeId/srcId return the associated dst connection
  377. _cmDspDstConn_t* _cmDspSysNetSrcIdToDstConn( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId )
  378. {
  379. _cmDspDstConn_t* rp = p->dstConnList;
  380. for(; rp != NULL; rp = rp->link )
  381. if( rp->srcNetNodeId == srcNetNodeId && rp->srcId == srcId )
  382. return rp;
  383. return NULL;
  384. }
  385. // Handle kNetSyncSelAsId messages.
  386. // Called on the dst machine to receive a src connection request
  387. cmDspRC_t _cmDspSysNetReceiveSrcConnRequest( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned srcNetNodeId )
  388. {
  389. cmDspRC_t rc = kOkDspRC;
  390. _cmDspDstConn_t* rp;
  391. // if a dstId has not already been assigned to this src
  392. if((rp = _cmDspSysNetSrcIdToDstConn(p, srcNetNodeId, msg->srcId )) == NULL )
  393. {
  394. if( p->netVerbosity > 1 )
  395. cmRptPrintf(p->err.rpt,"Sync:Rcvd src conn request from %i\n",srcNetNodeId);
  396. cmDspInst_t* instPtr = NULL;
  397. const cmDspVar_t* varPtr;
  398. const cmChar_t* instLabel = (const cmChar_t*)(msg+1);
  399. const cmChar_t* varLabel = instLabel + strlen(instLabel) + 1;
  400. // get the symbols assoc'd with the dst inst/var
  401. unsigned instSymId = cmSymTblId(p->ctx.stH,instLabel);
  402. unsigned varSymId = cmSymTblId(p->ctx.stH,varLabel);
  403. assert( instSymId != cmInvalidId );
  404. // find the dst inst
  405. if((instPtr = _cmDspSysInstSymIdToPtr(p,instSymId)) == NULL )
  406. {
  407. rc = cmErrMsg(&p->err,kInstNotFoundDspRC,"The instance associated with '%s' was not found.",cmStringNullGuard(instLabel));
  408. goto errLabel;
  409. }
  410. // find dst var
  411. if((varPtr = cmDspVarSymbolToPtr(&p->ctx, instPtr, varSymId, kInDsvFl )) == NULL )
  412. {
  413. rc = cmErrMsg(&p->err,kVarNotFoundDspRC,"The variable '%s' on the instance '%s' was not found.",cmStringNullGuard(varLabel),cmStringNullGuard(instLabel));
  414. goto errLabel;
  415. }
  416. // register the new dst connection
  417. rp = cmLhAllocZ( p->ctx.lhH, _cmDspDstConn_t, 1 );
  418. rp->srcNetNodeId = srcNetNodeId;
  419. rp->srcId = msg->srcId;
  420. rp->dstId = p->nextDstId++;
  421. rp->dstInst = instPtr;
  422. rp->dstVarId = varPtr->constId;
  423. rp->link = p->dstConnList;
  424. p->dstConnList = rp;
  425. }
  426. assert( rp != NULL );
  427. // send the dstId associated with this connection back to the source
  428. if((rc = _cmDspSysNetSendDstConnId(p,srcNetNodeId,msg->srcId,rp->dstId)) != kOkDspRC )
  429. goto errLabel;
  430. errLabel:
  431. if( rc != kOkDspRC )
  432. _cmDspSysNetSendSyncError(p,rc);
  433. return rc;
  434. }
  435. cmDspRC_t _cmDspSysNetReceiveDstId(cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId)
  436. {
  437. cmDspRC_t rc;
  438. _cmDspSrcConn_t* sp = p->srcConnList;
  439. for(; sp != NULL; sp = sp->link )
  440. if( msg->srcId == sp->srcId )
  441. {
  442. sp->dstId = msg->dstId;
  443. return kOkDspRC;
  444. }
  445. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"The src id %i associated with the dst id %i could not be found.",msg->srcId,msg->dstId);
  446. _cmDspSysNetSendSyncError(p,rc);
  447. return rc;
  448. }
  449. // Handle kNetDoneSelAsId messages.
  450. cmDspRC_t _cmDspSysNetReceiveRemoteSyncDone( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  451. {
  452. cmDspRC_t rc = kOkDspRC;
  453. unsigned i;
  454. for(i=0; i<p->netNodeCnt; ++i)
  455. if( p->netNodeArray[i].id == remoteNetNodeId )
  456. {
  457. p->netNodeArray[i].doneFl = true;
  458. if( p->netVerbosity > 0 )
  459. cmRptPrintf(p->err.rpt,"Sync: Rcvd done from %i\n",remoteNetNodeId);
  460. break;
  461. }
  462. return rc;
  463. }
  464. // Handle kNetErrSelAsId messages
  465. cmDspRC_t _cmDspSysNetReceiveRemoteSyncError( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  466. {
  467. cmDspRC_t rc = kOkDspRC;
  468. if( p->netVerbosity > 0 )
  469. cmRptPrintf(p->err.rpt,"Sync: Rcvd error from %i\n",remoteNetNodeId);
  470. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  471. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the thread due to a remote error failed.");
  472. return rc;
  473. }
  474. // Verifify that a net node id is valid.
  475. cmDspRC_t _cmDspSysNetValidateNetNodeId( cmDsp_t* p, unsigned netNodeId )
  476. {
  477. cmDspRC_t rc = kOkDspRC;
  478. unsigned i;
  479. for(i=0; i<p->netNodeCnt; ++i)
  480. if( p->netNodeArray[i].id == netNodeId )
  481. break;
  482. if( i == p->netNodeCnt )
  483. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"A message arrived from a unknown net node (id=%i).",netNodeId);
  484. return rc;
  485. }
  486. ///============================================================================================
  487. // main hooks to the DSP system
  488. // called by cmDspSysInitialize()
  489. cmDspRC_t _cmDspSysNetAlloc( cmDsp_t* p )
  490. {
  491. if((p->netNodeCnt = cmUdpNetNodeCount(p->netH)) != 0)
  492. p->netNodeArray = cmMemAllocZ(_cmDspNetNode_t, p->netNodeCnt );
  493. if( cmThreadCreate(&p->thH,_cmDspSysNetSyncThreadCb,p, p->err.rpt) != kOkThRC )
  494. return cmErrMsg(&p->err,kThreadFailDspRC,"The network syncronization thread create failed.");
  495. p->sendWaitMs = 10;
  496. p->syncState = kSyncPreDspId;
  497. p->netVerbosity = 1;
  498. return kOkDspRC;
  499. }
  500. // called by cmDspSysFinalize()
  501. cmDspRC_t _cmDspSysNetFree( cmDsp_t* p )
  502. {
  503. cmMemFree(p->netNodeArray);
  504. if( cmThreadDestroy(&p->thH) != kOkThRC )
  505. cmErrMsg(&p->err,kThreadFailDspRC,"The network syncrhonization thread destroy failed.");
  506. return kOkDspRC;
  507. }
  508. // called by cmDspSysLoad()
  509. cmDspRC_t _cmDspSysNetPreLoad( cmDsp_t* p )
  510. {
  511. p->srcConnList = NULL;
  512. return kOkDspRC;
  513. }
  514. // called by cmDspSysUnload()
  515. cmDspRC_t _cmDspSysNetUnload( cmDsp_t* p )
  516. {
  517. if( cmUdpNetEnableListen(p->netH, false ) != kOkUnRC )
  518. return cmErrMsg(&p->err,kNetFailDspRC,"The network failed to exit 'listening' mode.");
  519. p->syncState = kSyncPreDspId;
  520. p->nextDstId = 0;
  521. p->dstConnList = NULL;
  522. p->srcConnMap = NULL;
  523. p->srcConnMapCnt = 0;
  524. p->dstConnMap = NULL;
  525. p->dstConnMapCnt = 0;
  526. p->netDoneSentFl = false;
  527. return kOkDspRC;
  528. }
  529. // Call this function to enter 'sync' mode.
  530. cmDspRC_t _cmDspSysNetSync( cmDsp_t* p )
  531. {
  532. cmDspRC_t rc = kOkDspRC;
  533. unsigned localNodeId = cmUdpNetLocalNodeId(p->netH);
  534. unsigned i;
  535. // if there is no network then there is nothing to do
  536. if( p->netNodeCnt == 0 )
  537. {
  538. p->syncState = kSyncSuccessDspId;
  539. return kOkDspRC;
  540. }
  541. p->syncState = kSyncPendingDspId;
  542. // be sure the sync thread is paused before continuing
  543. if( cmThreadPause(p->thH, kPauseThFl | kWaitThFl ) != kOkThRC )
  544. {
  545. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be paused prior to initialization.");
  546. goto errLabel;
  547. }
  548. // initialize the netNodeArry[]
  549. for(i=0; i<p->netNodeCnt; ++i)
  550. {
  551. p->netNodeArray[i].id = cmUdpNetNodeId(p->netH,i);
  552. p->netNodeArray[i].localFl = p->netNodeArray[i].id == localNodeId;
  553. p->netNodeArray[i].doneFl = p->netNodeArray[i].localFl;
  554. p->netNodeArray[i].helloFl = false;
  555. p->netNodeArray[i].reqDoneFl = false;
  556. p->netNodeArray[i].doneFl = false;
  557. assert( p->netNodeArray[i].id != cmInvalidId );
  558. }
  559. // initialize the src connection array - this array was created when the
  560. // DSP instance network was formed
  561. _cmDspSrcConn_t* rp = p->srcConnList;
  562. for(i=0; rp != NULL; rp = rp->link,++i)
  563. {
  564. rp->srcId = i;
  565. rp->dstId = cmInvalidId;
  566. }
  567. // clear the dst conn records
  568. _cmDspDstConn_t* dp = p->dstConnList;
  569. while( dp!=NULL)
  570. {
  571. _cmDspDstConn_t* np = dp->link;
  572. cmLhFree(p->ctx.lhH,dp);
  573. dp = np;
  574. }
  575. // clear the connection maps
  576. p->srcConnMapCnt = 0;
  577. p->dstConnMapCnt = 0;
  578. p->netDoneSentFl = false;
  579. // enter listening mode
  580. if( cmUdpNetEnableListen(p->netH, true ) != kOkUnRC )
  581. {
  582. rc = cmErrMsg(&p->err,kNetFailDspRC,"The network failed to go into 'listening' mode.");
  583. goto errLabel;
  584. }
  585. // start the sync process
  586. if( cmThreadPause(p->thH,0) != kOkThRC )
  587. {
  588. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be un-paused prior to begin synchronization.");
  589. goto errLabel;
  590. }
  591. // broadcast 'hello' to all remote listeners and request that they respond with their own 'hello'.
  592. for(i=0; i<p->netNodeCnt; ++i)
  593. if( p->netNodeArray[i].localFl == false )
  594. if((rc = _cmDspSysNetSendHello(p,p->netNodeArray[i].id,true)) != kOkDspRC )
  595. goto errLabel;
  596. if( p->netVerbosity > 0 )
  597. cmRptPrintf(p->err.rpt,"Sync:Entering sync loop\n");
  598. errLabel:
  599. if( rc != kOkDspRC )
  600. p->syncState = kSyncFailDspId;
  601. return rc;
  602. }
  603. cmDspRC_t _cmDspSysNetRecvEvent( cmDsp_t* p, const void* msgPtr, unsigned msgByteCnt )
  604. {
  605. cmDspNetMsg_t* m = (cmDspNetMsg_t*)msgPtr;
  606. cmRC_t rc = cmOkRC;
  607. //bool jsFl = false;
  608. // if the value associated with this msg is a mtx then set
  609. // its mtx data area pointer to just after the msg header.
  610. if( cmDsvIsJson(&m->value) )
  611. {
  612. //rc = cmDsvDeserializeJson(&m->value,p->jsH);
  613. assert(0);
  614. //jsFl = true;
  615. }
  616. else
  617. rc = cmDsvDeserializeInPlace(&m->value,msgByteCnt-sizeof(cmDspNetMsg_t));
  618. if( rc != kOkDsvRC )
  619. rc = cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");
  620. else
  621. {
  622. assert( m->dstId < p->dstConnMapCnt );
  623. // form the event
  624. cmDspEvt_t e;
  625. e.flags = 0;
  626. e.srcInstPtr = NULL;
  627. e.srcVarId = cmInvalidId;
  628. e.valuePtr = &m->value;
  629. e.dstVarId = p->dstConnMap[ m->dstId ]->dstVarId;
  630. e.dstDataPtr = NULL;
  631. cmDspInst_t* instPtr = p->dstConnMap[ m->dstId ]->dstInst;
  632. assert(instPtr != NULL );
  633. // send the event
  634. if( instPtr->recvFunc != NULL )
  635. rc = instPtr->recvFunc(&p->ctx,instPtr,&e);
  636. }
  637. //if( jsFl )
  638. // cmJsonClearTree(p->jsH);
  639. return rc;
  640. }
  641. // Called from cmAudDsp.c:_cmAdUdpNetCallback() to to send an incoming msg to the DSP system.
  642. cmDspRC_t _cmDspSysNetRecv( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned msgByteCnt, unsigned remoteNetNodeId )
  643. {
  644. assert( msg->selId == kNetSyncSelAsId );
  645. cmDspRC_t rc = kOkDspRC;
  646. switch( msg->subSelId )
  647. {
  648. case kNetHelloSelAsId:
  649. rc = _cmDspSysNetReceiveHello(p,remoteNetNodeId,msg->srcId!=0);
  650. break;
  651. case kNetDstIdReqSelAsId:
  652. rc = _cmDspSysNetReceiveSrcConnRequest(p,msg,remoteNetNodeId);
  653. break;
  654. case kNetDstIdReqDoneAsId:
  655. rc = _cmDspSysNetReceiveReqDone(p,remoteNetNodeId);
  656. break;
  657. case kNetDstIdSelAsId:
  658. rc = _cmDspSysNetReceiveDstId(p,msg,remoteNetNodeId);
  659. break;
  660. case kNetDoneSelAsId:
  661. rc = _cmDspSysNetReceiveRemoteSyncDone(p,msg,remoteNetNodeId);
  662. break;
  663. case kNetErrSelAsId:
  664. rc = _cmDspSysNetReceiveRemoteSyncError(p,msg,remoteNetNodeId);
  665. break;
  666. case kNetEvtSelAsId:
  667. rc = _cmDspSysNetRecvEvent(p, msg, msgByteCnt );
  668. break;
  669. default:
  670. cmErrMsg(&p->err,kNetFailDspRC,"Unexpected message type 'selId=%i' received by the network sync. message dispatcher.",msg->selId);
  671. break;
  672. }
  673. return rc;
  674. }
  675. cmDspRC_t _cmDspSysNetSendEvent( cmDspSysH_t h, unsigned dstNetNodeId, unsigned dstId, const cmDspEvt_t* evt )
  676. {
  677. cmDsp_t* p = _cmDspHandleToPtr(h);
  678. unsigned bufByteCnt = sizeof(cmDspNetMsg_t);
  679. unsigned dataByteCnt = 0;
  680. if( evt->valuePtr != NULL )
  681. dataByteCnt = cmDsvSerialDataByteCount(evt->valuePtr);
  682. bufByteCnt += dataByteCnt;
  683. char buf[ bufByteCnt ];
  684. cmDspNetMsg_t* hdr = (cmDspNetMsg_t*)buf;
  685. hdr->asSubIdx = cmDspSys_AsSubIdx_Zero;
  686. hdr->selId = kNetSyncSelAsId;
  687. hdr->subSelId = kNetEvtSelAsId;
  688. hdr->srcId = cmInvalidId;
  689. hdr->dstId = dstId;
  690. if( evt->valuePtr == NULL )
  691. cmDsvSetNull(&hdr->value);
  692. else
  693. {
  694. // this function relies on the 'ne->value' field being the last field in the 'hdr'.
  695. if( cmDsvSerialize( evt->valuePtr, &hdr->value, sizeof(cmDspValue_t) + dataByteCnt) != kOkDsvRC )
  696. return cmErrMsg(&p->err,kSerializeFailMsgRC,"An attempt to serialize a network event msg failed.");
  697. }
  698. if( cmUdpNetSendById(p->netH, dstNetNodeId, &buf, bufByteCnt ) != kOkUnRC )
  699. return cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while sending a DSP event message.");
  700. return kOkDspRC;
  701. }