libcm is a C development framework with an emphasis on audio signal processing applications.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

cmDspNet.c 25KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. #include "cmPrefix.h"
  2. #include "cmGlobal.h"
  3. #include "cmFloatTypes.h"
  4. #include "cmRpt.h"
  5. #include "cmErr.h"
  6. #include "cmCtx.h"
  7. #include "cmMem.h"
  8. #include "cmMallocDebug.h"
  9. #include "cmLinkedHeap.h"
  10. #include "cmFileSys.h"
  11. #include "cmSymTbl.h"
  12. #include "cmJson.h"
  13. #include "cmPrefs.h"
  14. #include "cmDspValue.h"
  15. #include "cmMsgProtocol.h"
  16. #include "cmThread.h"
  17. #include "cmUdpPort.h"
  18. #include "cmUdpNet.h"
  19. #include "cmAudioSys.h"
  20. #include "cmProcObj.h"
  21. #include "cmDspCtx.h"
  22. #include "cmDspClass.h"
  23. #include "cmDspStore.h"
  24. #include "cmDspSys.h"
  25. #include "cmDspPreset.h"
  26. #include "cmDspNet.h"
  27. ///============================================================================================
  28. ///============================================================================================
  29. /*
  30. cmDspSysAlloc()
  31. - create the p->netNodeArray[]
  32. - create the p->thH network thread but leave it paused
  33. cmDspSysLoad()
  34. - _cmDspSysNetPreload() - set srcConnList to NULL
  35. - load the progam - this result in calls to _cmDspSysNetCreateSrcConn()
  36. which creates the p->srcConnList.
  37. - _cmDspSysNetSync() - just before exiting the load process go into sync mode:
  38. - initialize the p->netNodeArray[].
  39. - set the srcId of each record in the p->srcConnList.
  40. - start the sync thread
  41. while(1)
  42. {
  43. 1) send connection requests to dst machines
  44. 2) if( connFl = all src conn's have dst id's )
  45. send 'done' to all remote nodes
  46. 3) nodeFl = if have recv'd 'done' from all remote nodes
  47. 4) if connFl && nodeFl
  48. p->syncState = kSyncSuccessDspId;
  49. break;
  50. }
  51. Enable Audio:
  52. - Call reset on all instances
  53. cmDspSysUnload()
  54. - p->syncState = kSyncPreDspId
  55. cmDspSysFree()
  56. - delete p->netNodeArray[]
  57. - delete p->thH network thread
  58. //--------------------------------------------------------
  59. */
  60. ///============================================================================================
  61. // implemented in cmDspSys.c
  62. cmDspInst_t* _cmDspSysInstSymIdToPtr( cmDsp_t* p, unsigned instSymId );
  63. cmDsp_t* _cmDspHandleToPtr( cmDspSysH_t h );
  64. cmDspRC_t _cmDspSysNetSend( cmDsp_t* p, unsigned remoteNetNodeId, unsigned subSelId, unsigned srcId, unsigned dstId, const cmChar_t* errMsg )
  65. {
  66. cmDspRC_t rc = kOkDspRC;
  67. cmDspNetMsg_t m;
  68. memset(&m,0,sizeof(m));
  69. // we should never be sending to ourselves
  70. assert( remoteNetNodeId != cmUdpNetLocalNodeId(p->netH));
  71. // form the error msg
  72. m.asSubIdx = cmDspSys_AsSubIdx_Zero;
  73. m.selId = kNetSyncSelAsId;
  74. m.subSelId = subSelId;
  75. m.srcId = srcId;
  76. m.dstId = dstId;
  77. if( cmUdpNetSendById(p->netH, remoteNetNodeId, &m, sizeof(m) ) == kOkUnRC )
  78. {
  79. //cmSleepUs(p->sendWaitMs*1000);
  80. }
  81. else
  82. {
  83. rc = kNetFailDspRC;
  84. if( errMsg != NULL )
  85. cmErrMsg(&p->err,rc,errMsg);
  86. }
  87. return rc;
  88. }
  89. // set echoFl if the remote node should respond with it's own 'hello' msg
  90. cmDspRC_t _cmDspSysNetSendHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  91. {
  92. return _cmDspSysNetSend(p, remoteNetNodeId, kNetHelloSelAsId, echoFl ? 1 : 0, cmInvalidId, "A network send failed while sending 'hello'." );
  93. }
  94. cmDspRC_t _cmDspSysNetSendSyncError( cmDsp_t* p, cmDspRC_t errRc )
  95. {
  96. cmDspRC_t rc = kOkDspRC;
  97. unsigned i;
  98. // for each non-local node
  99. for(i=0; i<p->netNodeCnt; ++i)
  100. if( p->netNodeArray[i].localFl == false)
  101. {
  102. unsigned remoteNetNodeId = p->netNodeArray[i].id;
  103. cmDspRC_t rc0;
  104. // send the error code in the dstId
  105. if((rc0 = _cmDspSysNetSend(p, remoteNetNodeId, kNetErrSelAsId, cmUdpNetLocalNodeId(p->netH), errRc, "A network send failed while signaling an error." )) != kOkDspRC )
  106. rc = rc0;
  107. }
  108. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  109. rc = cmErrMsg(&p->err,kThreadFailDspRC,"An attempt to pause the sync. thread failed after signaling an error.");
  110. if( p->netVerbosity > 0 )
  111. cmRptPrintf(p->err.rpt,"Sync:Done - Fail\n");
  112. return rc;
  113. }
  114. bool _cmDspSysNetIsNodeAwake( cmDsp_t* p, unsigned netNodeId )
  115. {
  116. unsigned i;
  117. for(i=0; i<p->netNodeCnt; ++i)
  118. if( p->netNodeArray[i].id == netNodeId )
  119. return p->netNodeArray[i].helloFl;
  120. assert(0); // unknown net node id
  121. return false;
  122. }
  123. bool _cmDspSysNet_AreAllDstIdsResolved( cmDsp_t* p )
  124. {
  125. // check that we have received all dst id req's from each remote node
  126. unsigned i;
  127. for(i=0; i<p->netNodeCnt; ++i)
  128. if( p->netNodeArray[i].localFl == false && p->netNodeArray[i].reqDoneFl==false)
  129. return false;
  130. _cmDspSrcConn_t* rp = p->srcConnList;
  131. // for each src connection which does not yet have a destination id.
  132. for(; rp != NULL; rp = rp->link)
  133. if( rp->dstId == cmInvalidId )
  134. return false;
  135. return true;
  136. }
  137. // send connection requests to the specified remote node
  138. cmDspRC_t _cmDspSysNetSendConnRequests( cmDsp_t* p, unsigned dstNetNodeId )
  139. {
  140. cmDspRC_t rc = kOkDspRC;
  141. _cmDspSrcConn_t* rp = p->srcConnList;
  142. // for each src connection which does not yet have a destination id.
  143. for(; rp != NULL; rp = rp->link)
  144. {
  145. // if this src conn has not been assigned a dstId and it's source node is awake
  146. if( rp->dstId == cmInvalidId && rp->dstNetNodeId==dstNetNodeId && _cmDspSysNetIsNodeAwake(p,rp->dstNetNodeId) )
  147. {
  148. // calc the msg size
  149. unsigned sn0 = strlen(rp->dstInstLabel) + 1;
  150. unsigned sn1 = strlen(rp->dstVarLabel) + 1;
  151. unsigned byteCnt = sizeof(cmDspNetMsg_t) + sn0 + sn1;
  152. // create msg buffer
  153. char buf[ byteCnt ];
  154. memset(buf,0,byteCnt);
  155. // fill in the msg
  156. cmDspNetMsg_t* cp = (cmDspNetMsg_t*)buf;
  157. cp->asSubIdx = cmDspSys_AsSubIdx_Zero;
  158. cp->selId = kNetSyncSelAsId;
  159. cp->subSelId = kNetDstIdReqSelAsId;
  160. cp->srcId = rp->srcId;
  161. cp->dstId = cmInvalidId;
  162. char* dp = buf + sizeof(*cp);
  163. memcpy(dp,rp->dstInstLabel,sn0);
  164. dp += sn0;
  165. memcpy(dp,rp->dstVarLabel,sn1);
  166. dp += sn1;
  167. assert(dp == buf + byteCnt );
  168. // send the msg
  169. if( cmUdpNetSendById(p->netH, rp->dstNetNodeId, buf, byteCnt ) != kOkUnRC )
  170. {
  171. rc = cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while registering remote nodes.");
  172. goto errLabel;
  173. }
  174. if( p->netVerbosity > 1 )
  175. cmRptPrintf(p->err.rpt,"Sync: send req to %i\n",rp->dstNetNodeId);
  176. //cmSleepUs(p->sendWaitMs*1000); // wait between transmissions
  177. }
  178. }
  179. errLabel:
  180. if( rc != kOkDspRC )
  181. _cmDspSysNetSendSyncError(p,rc);
  182. return rc;
  183. }
  184. // Return true when the 'doneFl' on all nodes has been set.
  185. // The doneFl is cleared on the beginning of the sync. process.
  186. // The doneFl is set as each remote node signals that it has
  187. // all of the dstId's that it needs by sending kNetDoneSelAsId
  188. // messages.
  189. bool _cmDspSysNetCheckNetNodeStatus( cmDsp_t* p )
  190. {
  191. unsigned i;
  192. for(i=0; i<p->netNodeCnt; ++i)
  193. if( p->netNodeArray[i].doneFl == false )
  194. return false;
  195. return true;
  196. }
  197. // Send kNetDoneSelAsId msgs to all remote nodes to indicate that
  198. // this node has all of it's dstId's.
  199. cmDspRC_t _cmDspSysNetSendSyncDone( cmDsp_t* p )
  200. {
  201. cmDspRC_t rc = kOkDspRC;
  202. unsigned i;
  203. // broadcast the sync 'done' msg to each non-local node
  204. if( p->netDoneSentFl )
  205. return rc;
  206. for(i=0; i<p->netNodeCnt; ++i)
  207. if( p->netNodeArray[i].localFl == false )
  208. {
  209. if((rc = _cmDspSysNetSend(p, p->netNodeArray[i].id, kNetDoneSelAsId, cmInvalidId, cmInvalidId, "A network send failed while signaling sync. completion." )) != kOkDspRC )
  210. goto errLabel;
  211. }
  212. // create the src connection map
  213. _cmDspSrcConn_t* sp = p->srcConnList;
  214. if( sp != NULL && p->srcConnMapCnt == 0 )
  215. {
  216. if( p->netVerbosity > 0 )
  217. cmRptPrintf(p->err.rpt,"Sync:Creating src map\n");
  218. // get the count of src nodes
  219. unsigned n;
  220. for(n=0; sp != NULL; sp = sp->link )
  221. ++n;
  222. // allocate the srcConnMap
  223. p->srcConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspSrcConn_t*, p->srcConnMap, n );
  224. p->srcConnMapCnt = n;
  225. // load the srcConnMap
  226. for(sp = p->srcConnList; sp != NULL; sp = sp->link )
  227. {
  228. assert( sp->srcId < n );
  229. p->srcConnMap[ sp->srcId ] = sp;
  230. }
  231. }
  232. // create the dst connection map
  233. _cmDspDstConn_t* dp = p->dstConnList;
  234. if( dp != NULL && p->dstConnMapCnt == 0 )
  235. {
  236. if( p->netVerbosity > 0 )
  237. cmRptPrintf(p->err.rpt,"Sync:Creating dst map\n");
  238. unsigned n;
  239. // get the count of dst nodes
  240. for(n=0; dp != NULL; dp = dp->link )
  241. ++n;
  242. // allocate the dstConnMap
  243. p->dstConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspDstConn_t*, p->dstConnMap, n );
  244. p->dstConnMapCnt = n;
  245. // load the dstConnMap
  246. for(dp = p->dstConnList; dp != NULL; dp = dp->link )
  247. {
  248. assert( dp->dstId < n );
  249. p->dstConnMap[ dp->dstId ] = dp;
  250. }
  251. }
  252. p->netDoneSentFl = true;
  253. if( p->netVerbosity > 0 )
  254. cmRptPrintf(p->err.rpt,"Sync: Done Sent\n",i);
  255. errLabel:
  256. if( rc != kOkDspRC )
  257. _cmDspSysNetSendSyncError(p,rc);
  258. return rc;
  259. }
  260. // Sync thread callback function
  261. bool _cmDspSysNetSyncThreadCb( void* param )
  262. {
  263. cmDsp_t* p = (cmDsp_t*)param;
  264. bool connFl = true; //
  265. bool nodeFl = true;
  266. // receive a group of waiting messages from remote nodes
  267. // WARNING: calling cmUdpNetReceive() here means that the audio engine cannot be
  268. // enabled - otherwise this thread and the audio system thread will simultaneously
  269. // attempt to read the UDP port. This will result in unsafe thread conflicts.
  270. if( cmUdpNetReceive(p->netH, NULL ) != kOkUnRC )
  271. {
  272. cmErrMsg(&p->err,kNetFailDspRC,"UDP Net receive failed during sync. mode.");
  273. _cmDspSysNetSendSyncError(p,kNetFailDspRC);
  274. }
  275. // check if all the src connections have been assigned dst id's
  276. connFl = _cmDspSysNet_AreAllDstIdsResolved(p);
  277. // if all the src connections have dst id's then send a 'done' signal
  278. // to all other nodes so that they know this node is ready to leave
  279. // sync mode
  280. if( connFl )
  281. {
  282. if( _cmDspSysNetSendSyncDone(p) != kOkDspRC )
  283. goto errLabel;
  284. }
  285. // prevent the thread from burning too much time
  286. cmSleepUs(p->sendWaitMs*1000);
  287. // check if all nodes have completed transmission to this node
  288. nodeFl = _cmDspSysNetCheckNetNodeStatus(p);
  289. // if the connections have all been setup and all the net nodes have
  290. // received a 'done' signal
  291. if( connFl && nodeFl )
  292. {
  293. // mark the sync as complete
  294. p->syncState = kSyncSuccessDspId;
  295. // the sync. is done - pause this thread
  296. if( cmThreadPause( p->thH, kPauseThFl ) != kOkThRC )
  297. cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the sync. thread upon completion failed.");
  298. if( p->netVerbosity > 0 )
  299. cmRptPrintf(p->err.rpt,"Sync Done!\n");
  300. }
  301. errLabel:
  302. return true;
  303. }
  304. // During DSP network allocation and connection this function is called
  305. // to register remote instance/var targets.
  306. _cmDspSrcConn_t* _cmDspSysNetCreateSrcConn( cmDsp_t* p, unsigned dstNetNodeId, const cmChar_t* dstInstLabel, const cmChar_t* dstVarLabel )
  307. {
  308. if( dstNetNodeId == cmUdpNetLocalNodeId(p->netH) )
  309. {
  310. cmErrMsg(&p->err,kNetFailDspRC,"Cannot connect a network node (node:%s inst:%s label:%s)to itself.",cmStringNullGuard(cmUdpNetNodeIdToLabel(p->netH,dstNetNodeId)),cmStringNullGuard(dstInstLabel),cmStringNullGuard(dstVarLabel));
  311. return NULL;
  312. }
  313. // register the remote node
  314. _cmDspSrcConn_t* rp = cmLhAllocZ( p->ctx.lhH, _cmDspSrcConn_t, 1 );
  315. rp->dstNetNodeId = dstNetNodeId;
  316. rp->dstInstLabel = cmLhAllocStr( p->ctx.lhH, dstInstLabel );
  317. rp->dstVarLabel = cmLhAllocStr( p->ctx.lhH, dstVarLabel );
  318. rp->link = p->srcConnList;
  319. p->srcConnList = rp;
  320. if( p->netVerbosity > 1 )
  321. cmRptPrintf(p->err.rpt,"Sync: create src for %i\n", dstNetNodeId );
  322. return rp;
  323. }
  324. // This function is called in response to receiving a connection request on the
  325. // dst machine. It sends a dstId to match the srcId provided by the src machine.
  326. cmDspRC_t _cmDspSysNetSendDstConnId( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId, unsigned dstId )
  327. {
  328. cmDspRC_t rc = kOkDspRC;
  329. if((rc = _cmDspSysNetSend(p, srcNetNodeId, kNetDstIdSelAsId, srcId, dstId, "A network send failed while sending a dst. id." )) == kOkDspRC )
  330. {
  331. if( p->netVerbosity > 1 )
  332. cmRptPrintf(p->err.rpt,"Sync:Sent dst id to %i\n",srcNetNodeId);
  333. }
  334. return rc;
  335. }
  336. // Handle 'hello' messages
  337. cmDspRC_t _cmDspSysNetReceiveHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  338. {
  339. cmDspRC_t rc = kOkDspRC;
  340. unsigned i;
  341. for(i=0; i<p->netNodeCnt; ++i)
  342. if( p->netNodeArray[i].id == remoteNetNodeId )
  343. {
  344. p->netNodeArray[i].helloFl = true;
  345. // if echo was requested then respond ...
  346. if( echoFl )
  347. {
  348. // ... but the remote node should not respond - because we already know about it
  349. _cmDspSysNetSendHello(p,remoteNetNodeId,false);
  350. }
  351. if( p->netVerbosity > 0 )
  352. cmRptPrintf(p->err.rpt,"Sync:hello from %i\n",remoteNetNodeId);
  353. // send connection requests to the remote net node
  354. if((rc = _cmDspSysNetSendConnRequests(p, remoteNetNodeId )) == kOkDspRC )
  355. {
  356. // send 'req done' msg
  357. rc = _cmDspSysNetSend(p, remoteNetNodeId, kNetDstIdReqDoneAsId, 0, cmInvalidId, "A network send failed while sending 'dst req done'." );
  358. }
  359. return rc;
  360. }
  361. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'hello' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  362. }
  363. cmDspRC_t _cmDspSysNetReceiveReqDone( cmDsp_t* p, unsigned remoteNetNodeId )
  364. {
  365. unsigned i;
  366. for(i=0; i<p->netNodeCnt; ++i)
  367. if( p->netNodeArray[i].id == remoteNetNodeId )
  368. {
  369. p->netNodeArray[i].reqDoneFl = true;
  370. if( p->netVerbosity > 0 )
  371. cmRptPrintf(p->err.rpt,"Sync: Req Done from %i\n",remoteNetNodeId);
  372. return kOkDspRC;
  373. }
  374. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'req done' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  375. }
  376. // given a srcNetNodeId/srcId return the associated dst connection
  377. _cmDspDstConn_t* _cmDspSysNetSrcIdToDstConn( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId )
  378. {
  379. _cmDspDstConn_t* rp = p->dstConnList;
  380. for(; rp != NULL; rp = rp->link )
  381. if( rp->srcNetNodeId == srcNetNodeId && rp->srcId == srcId )
  382. return rp;
  383. return NULL;
  384. }
  385. // Handle kNetSyncSelAsId messages.
  386. // Called on the dst machine to receive a src connection request
  387. cmDspRC_t _cmDspSysNetReceiveSrcConnRequest( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned srcNetNodeId )
  388. {
  389. cmDspRC_t rc = kOkDspRC;
  390. _cmDspDstConn_t* rp;
  391. // if a dstId has not already been assigned to this src
  392. if((rp = _cmDspSysNetSrcIdToDstConn(p, srcNetNodeId, msg->srcId )) == NULL )
  393. {
  394. if( p->netVerbosity > 1 )
  395. cmRptPrintf(p->err.rpt,"Sync:Rcvd src conn request from %i\n",srcNetNodeId);
  396. cmDspInst_t* instPtr = NULL;
  397. const cmDspVar_t* varPtr;
  398. const cmChar_t* instLabel = (const cmChar_t*)(msg+1);
  399. const cmChar_t* varLabel = instLabel + strlen(instLabel) + 1;
  400. // get the symbols assoc'd with the dst inst/var
  401. unsigned instSymId = cmSymTblId(p->ctx.stH,instLabel);
  402. unsigned varSymId = cmSymTblId(p->ctx.stH,varLabel);
  403. assert( instSymId != cmInvalidId );
  404. // find the dst inst
  405. if((instPtr = _cmDspSysInstSymIdToPtr(p,instSymId)) == NULL )
  406. {
  407. rc = cmErrMsg(&p->err,kInstNotFoundDspRC,"The instance associated with '%s' was not found.",cmStringNullGuard(instLabel));
  408. goto errLabel;
  409. }
  410. // find dst var
  411. if((varPtr = cmDspVarSymbolToPtr(&p->ctx, instPtr, varSymId, kInDsvFl )) == NULL )
  412. {
  413. rc = cmErrMsg(&p->err,kVarNotFoundDspRC,"The variable '%s' on the instance '%s' was not found.",cmStringNullGuard(varLabel),cmStringNullGuard(instLabel));
  414. goto errLabel;
  415. }
  416. // register the new dst connection
  417. rp = cmLhAllocZ( p->ctx.lhH, _cmDspDstConn_t, 1 );
  418. rp->srcNetNodeId = srcNetNodeId;
  419. rp->srcId = msg->srcId;
  420. rp->dstId = p->nextDstId++;
  421. rp->dstInst = instPtr;
  422. rp->dstVarId = varPtr->constId;
  423. rp->link = p->dstConnList;
  424. p->dstConnList = rp;
  425. }
  426. assert( rp != NULL );
  427. // send the dstId associated with this connection back to the source
  428. if((rc = _cmDspSysNetSendDstConnId(p,srcNetNodeId,msg->srcId,rp->dstId)) != kOkDspRC )
  429. goto errLabel;
  430. errLabel:
  431. if( rc != kOkDspRC )
  432. _cmDspSysNetSendSyncError(p,rc);
  433. return rc;
  434. }
  435. cmDspRC_t _cmDspSysNetReceiveDstId(cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId)
  436. {
  437. cmDspRC_t rc;
  438. _cmDspSrcConn_t* sp = p->srcConnList;
  439. for(; sp != NULL; sp = sp->link )
  440. if( msg->srcId == sp->srcId )
  441. {
  442. sp->dstId = msg->dstId;
  443. return kOkDspRC;
  444. }
  445. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"The src id %i associated with the dst id %i could not be found.",msg->srcId,msg->dstId);
  446. _cmDspSysNetSendSyncError(p,rc);
  447. return rc;
  448. }
  449. // Handle kNetDoneSelAsId messages.
  450. cmDspRC_t _cmDspSysNetReceiveRemoteSyncDone( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  451. {
  452. cmDspRC_t rc = kOkDspRC;
  453. unsigned i;
  454. for(i=0; i<p->netNodeCnt; ++i)
  455. if( p->netNodeArray[i].id == remoteNetNodeId )
  456. {
  457. p->netNodeArray[i].doneFl = true;
  458. if( p->netVerbosity > 0 )
  459. cmRptPrintf(p->err.rpt,"Sync: Rcvd done from %i\n",remoteNetNodeId);
  460. break;
  461. }
  462. return rc;
  463. }
  464. // Handle kNetErrSelAsId messages
  465. cmDspRC_t _cmDspSysNetReceiveRemoteSyncError( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  466. {
  467. cmDspRC_t rc = kOkDspRC;
  468. if( p->netVerbosity > 0 )
  469. cmRptPrintf(p->err.rpt,"Sync: Rcvd error from %i\n",remoteNetNodeId);
  470. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  471. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the thread due to a remote error failed.");
  472. return rc;
  473. }
  474. // Verifify that a net node id is valid.
  475. cmDspRC_t _cmDspSysNetValidateNetNodeId( cmDsp_t* p, unsigned netNodeId )
  476. {
  477. cmDspRC_t rc = kOkDspRC;
  478. unsigned i;
  479. for(i=0; i<p->netNodeCnt; ++i)
  480. if( p->netNodeArray[i].id == netNodeId )
  481. break;
  482. if( i == p->netNodeCnt )
  483. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"A message arrived from a unknown net node (id=%i).",netNodeId);
  484. return rc;
  485. }
  486. ///============================================================================================
  487. // main hooks to the DSP system
  488. // called by cmDspSysInitialize()
  489. cmDspRC_t _cmDspSysNetAlloc( cmDsp_t* p )
  490. {
  491. if((p->netNodeCnt = cmUdpNetNodeCount(p->netH)) != 0)
  492. p->netNodeArray = cmMemAllocZ(_cmDspNetNode_t, p->netNodeCnt );
  493. if( cmThreadCreate(&p->thH,_cmDspSysNetSyncThreadCb,p, p->err.rpt) != kOkThRC )
  494. return cmErrMsg(&p->err,kThreadFailDspRC,"The network syncronization thread create failed.");
  495. p->sendWaitMs = 10;
  496. p->syncState = kSyncPreDspId;
  497. p->netVerbosity = 1;
  498. return kOkDspRC;
  499. }
  500. // called by cmDspSysFinalize()
  501. cmDspRC_t _cmDspSysNetFree( cmDsp_t* p )
  502. {
  503. cmMemFree(p->netNodeArray);
  504. if( cmThreadDestroy(&p->thH) != kOkThRC )
  505. cmErrMsg(&p->err,kThreadFailDspRC,"The network syncrhonization thread destroy failed.");
  506. return kOkDspRC;
  507. }
  508. // called by cmDspSysLoad()
  509. cmDspRC_t _cmDspSysNetPreLoad( cmDsp_t* p )
  510. {
  511. p->srcConnList = NULL;
  512. return kOkDspRC;
  513. }
  514. // called by cmDspSysUnload()
  515. cmDspRC_t _cmDspSysNetUnload( cmDsp_t* p )
  516. {
  517. if( cmUdpNetEnableListen(p->netH, false ) != kOkUnRC )
  518. return cmErrMsg(&p->err,kNetFailDspRC,"The network failed to exit 'listening' mode.");
  519. p->syncState = kSyncPreDspId;
  520. p->nextDstId = 0;
  521. p->dstConnList = NULL;
  522. p->srcConnMap = NULL;
  523. p->srcConnMapCnt = 0;
  524. p->dstConnMap = NULL;
  525. p->dstConnMapCnt = 0;
  526. p->netDoneSentFl = false;
  527. return kOkDspRC;
  528. }
  529. // Call this function to enter 'sync' mode.
  530. cmDspRC_t _cmDspSysNetSync( cmDsp_t* p )
  531. {
  532. cmDspRC_t rc = kOkDspRC;
  533. unsigned localNodeId = cmUdpNetLocalNodeId(p->netH);
  534. unsigned i;
  535. // if there is no network then there is nothing to do
  536. if( p->netNodeCnt == 0 )
  537. {
  538. p->syncState = kSyncSuccessDspId;
  539. return kOkDspRC;
  540. }
  541. p->syncState = kSyncPendingDspId;
  542. // be sure the sync thread is paused before continuing
  543. if( cmThreadPause(p->thH, kPauseThFl | kWaitThFl ) != kOkThRC )
  544. {
  545. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be paused prior to initialization.");
  546. goto errLabel;
  547. }
  548. // initialize the netNodeArry[]
  549. for(i=0; i<p->netNodeCnt; ++i)
  550. {
  551. p->netNodeArray[i].id = cmUdpNetNodeId(p->netH,i);
  552. p->netNodeArray[i].localFl = p->netNodeArray[i].id == localNodeId;
  553. p->netNodeArray[i].doneFl = p->netNodeArray[i].localFl;
  554. p->netNodeArray[i].helloFl = false;
  555. p->netNodeArray[i].reqDoneFl = false;
  556. p->netNodeArray[i].doneFl = false;
  557. assert( p->netNodeArray[i].id != cmInvalidId );
  558. }
  559. // initialize the src connection array - this array was created when the
  560. // DSP instance network was formed
  561. _cmDspSrcConn_t* rp = p->srcConnList;
  562. for(i=0; rp != NULL; rp = rp->link,++i)
  563. {
  564. rp->srcId = i;
  565. rp->dstId = cmInvalidId;
  566. }
  567. // clear the dst conn records
  568. _cmDspDstConn_t* dp = p->dstConnList;
  569. while( dp!=NULL)
  570. {
  571. _cmDspDstConn_t* np = dp->link;
  572. cmLhFree(p->ctx.lhH,dp);
  573. dp = np;
  574. }
  575. // clear the connection maps
  576. p->srcConnMapCnt = 0;
  577. p->dstConnMapCnt = 0;
  578. p->netDoneSentFl = false;
  579. // enter listening mode
  580. if( cmUdpNetEnableListen(p->netH, true ) != kOkUnRC )
  581. {
  582. rc = cmErrMsg(&p->err,kNetFailDspRC,"The network failed to go into 'listening' mode.");
  583. goto errLabel;
  584. }
  585. // start the sync process
  586. if( cmThreadPause(p->thH,0) != kOkThRC )
  587. {
  588. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be un-paused prior to begin synchronization.");
  589. goto errLabel;
  590. }
  591. // broadcast 'hello' to all remote listeners and request that they respond with their own 'hello'.
  592. for(i=0; i<p->netNodeCnt; ++i)
  593. if( p->netNodeArray[i].localFl == false )
  594. if((rc = _cmDspSysNetSendHello(p,p->netNodeArray[i].id,true)) != kOkDspRC )
  595. goto errLabel;
  596. if( p->netVerbosity > 0 )
  597. cmRptPrintf(p->err.rpt,"Sync:Entering sync loop\n");
  598. errLabel:
  599. if( rc != kOkDspRC )
  600. p->syncState = kSyncFailDspId;
  601. return rc;
  602. }
  603. cmDspRC_t _cmDspSysNetRecvEvent( cmDsp_t* p, const void* msgPtr, unsigned msgByteCnt )
  604. {
  605. cmDspNetMsg_t* m = (cmDspNetMsg_t*)msgPtr;
  606. cmRC_t rc = cmOkRC;
  607. //bool jsFl = false;
  608. // if the value associated with this msg is a mtx then set
  609. // its mtx data area pointer to just after the msg header.
  610. if( cmDsvIsJson(&m->value) )
  611. {
  612. //rc = cmDsvDeserializeJson(&m->value,p->jsH);
  613. assert(0);
  614. //jsFl = true;
  615. }
  616. else
  617. rc = cmDsvDeserializeInPlace(&m->value,msgByteCnt-sizeof(cmDspNetMsg_t));
  618. if( rc != kOkDsvRC )
  619. rc = cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");
  620. else
  621. {
  622. assert( m->dstId < p->dstConnMapCnt );
  623. // form the event
  624. cmDspEvt_t e;
  625. e.flags = 0;
  626. e.srcInstPtr = NULL;
  627. e.srcVarId = cmInvalidId;
  628. e.valuePtr = &m->value;
  629. e.dstVarId = p->dstConnMap[ m->dstId ]->dstVarId;
  630. e.dstDataPtr = NULL;
  631. cmDspInst_t* instPtr = p->dstConnMap[ m->dstId ]->dstInst;
  632. assert(instPtr != NULL );
  633. // send the event
  634. if( instPtr->recvFunc != NULL )
  635. rc = instPtr->recvFunc(&p->ctx,instPtr,&e);
  636. }
  637. //if( jsFl )
  638. // cmJsonClearTree(p->jsH);
  639. return rc;
  640. }
  641. // Called from cmAudDsp.c:_cmAdUdpNetCallback() to to send an incoming msg to the DSP system.
  642. cmDspRC_t _cmDspSysNetRecv( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned msgByteCnt, unsigned remoteNetNodeId )
  643. {
  644. assert( msg->selId == kNetSyncSelAsId );
  645. cmDspRC_t rc = kOkDspRC;
  646. switch( msg->subSelId )
  647. {
  648. case kNetHelloSelAsId:
  649. rc = _cmDspSysNetReceiveHello(p,remoteNetNodeId,msg->srcId!=0);
  650. break;
  651. case kNetDstIdReqSelAsId:
  652. rc = _cmDspSysNetReceiveSrcConnRequest(p,msg,remoteNetNodeId);
  653. break;
  654. case kNetDstIdReqDoneAsId:
  655. rc = _cmDspSysNetReceiveReqDone(p,remoteNetNodeId);
  656. break;
  657. case kNetDstIdSelAsId:
  658. rc = _cmDspSysNetReceiveDstId(p,msg,remoteNetNodeId);
  659. break;
  660. case kNetDoneSelAsId:
  661. rc = _cmDspSysNetReceiveRemoteSyncDone(p,msg,remoteNetNodeId);
  662. break;
  663. case kNetErrSelAsId:
  664. rc = _cmDspSysNetReceiveRemoteSyncError(p,msg,remoteNetNodeId);
  665. break;
  666. case kNetEvtSelAsId:
  667. rc = _cmDspSysNetRecvEvent(p, msg, msgByteCnt );
  668. break;
  669. default:
  670. cmErrMsg(&p->err,kNetFailDspRC,"Unexpected message type 'selId=%i' received by the network sync. message dispatcher.",msg->selId);
  671. break;
  672. }
  673. return rc;
  674. }
  675. cmDspRC_t _cmDspSysNetSendEvent( cmDspSysH_t h, unsigned dstNetNodeId, unsigned dstId, const cmDspEvt_t* evt )
  676. {
  677. cmDsp_t* p = _cmDspHandleToPtr(h);
  678. unsigned bufByteCnt = sizeof(cmDspNetMsg_t);
  679. unsigned dataByteCnt = 0;
  680. if( evt->valuePtr != NULL )
  681. dataByteCnt = cmDsvSerialDataByteCount(evt->valuePtr);
  682. bufByteCnt += dataByteCnt;
  683. char buf[ bufByteCnt ];
  684. cmDspNetMsg_t* hdr = (cmDspNetMsg_t*)buf;
  685. hdr->asSubIdx = cmDspSys_AsSubIdx_Zero;
  686. hdr->selId = kNetSyncSelAsId;
  687. hdr->subSelId = kNetEvtSelAsId;
  688. hdr->srcId = cmInvalidId;
  689. hdr->dstId = dstId;
  690. if( evt->valuePtr == NULL )
  691. cmDsvSetNull(&hdr->value);
  692. else
  693. {
  694. // this function relies on the 'ne->value' field being the last field in the 'hdr'.
  695. if( cmDsvSerialize( evt->valuePtr, &hdr->value, sizeof(cmDspValue_t) + dataByteCnt) != kOkDsvRC )
  696. return cmErrMsg(&p->err,kSerializeFailMsgRC,"An attempt to serialize a network event msg failed.");
  697. }
  698. if( cmUdpNetSendById(p->netH, dstNetNodeId, &buf, bufByteCnt ) != kOkUnRC )
  699. return cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while sending a DSP event message.");
  700. return kOkDspRC;
  701. }