libcm is a C development framework with an emphasis on audio signal processing applications.
Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

cmDspNet.c 25KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894
  1. #include "cmPrefix.h"
  2. #include "cmGlobal.h"
  3. #include "cmFloatTypes.h"
  4. #include "cmRpt.h"
  5. #include "cmErr.h"
  6. #include "cmCtx.h"
  7. #include "cmMem.h"
  8. #include "cmMallocDebug.h"
  9. #include "cmLinkedHeap.h"
  10. #include "cmFileSys.h"
  11. #include "cmSymTbl.h"
  12. #include "cmJson.h"
  13. #include "cmPrefs.h"
  14. #include "cmDspValue.h"
  15. #include "cmMsgProtocol.h"
  16. #include "cmThread.h"
  17. #include "cmUdpPort.h"
  18. #include "cmUdpNet.h"
  19. #include "cmAudioSys.h"
  20. #include "cmProcObj.h"
  21. #include "cmDspCtx.h"
  22. #include "cmDspClass.h"
  23. #include "cmDspSys.h"
  24. #include "cmDspPreset.h"
  25. #include "cmDspNet.h"
  26. ///============================================================================================
  27. ///============================================================================================
  28. /*
  29. cmDspSysAlloc()
  30. - create the p->netNodeArray[]
  31. - create the p->thH network thread but leave it paused
  32. cmDspSysLoad()
  33. - _cmDspSysNetPreload() - set srcConnList to NULL
  34. - load the progam - this result in calls to _cmDspSysNetCreateSrcConn()
  35. which creates the p->srcConnList.
  36. - _cmDspSysNetSync() - just before exiting the load process go into sync mode:
  37. - initialize the p->netNodeArray[].
  38. - set the srcId of each record in the p->srcConnList.
  39. - start the sync thread
  40. while(1)
  41. {
  42. 1) send connection requests to dst machines
  43. 2) if( connFl = all src conn's have dst id's )
  44. send 'done' to all remote nodes
  45. 3) nodeFl = if have recv'd 'done' from all remote nodes
  46. 4) if connFl && nodeFl
  47. p->syncState = kSyncSuccessDspId;
  48. break;
  49. }
  50. Enable Audio:
  51. - Call reset on all instances
  52. cmDspSysUnload()
  53. - p->syncState = kSyncPreDspId
  54. cmDspSysFree()
  55. - delete p->netNodeArray[]
  56. - delete p->thH network thread
  57. //--------------------------------------------------------
  58. */
  59. ///============================================================================================
  60. // implemented in cmDspSys.c
  61. cmDspInst_t* _cmDspSysInstSymIdToPtr( cmDsp_t* p, unsigned instSymId );
  62. cmDsp_t* _cmDspHandleToPtr( cmDspSysH_t h );
  63. cmDspRC_t _cmDspSysNetSend( cmDsp_t* p, unsigned remoteNetNodeId, unsigned subSelId, unsigned srcId, unsigned dstId, const cmChar_t* errMsg )
  64. {
  65. cmDspRC_t rc = kOkDspRC;
  66. cmDspNetMsg_t m;
  67. memset(&m,0,sizeof(m));
  68. // we should never be sending to ourselves
  69. assert( remoteNetNodeId != cmUdpNetLocalNodeId(p->netH));
  70. // form the error msg
  71. m.asSubIdx = cmDspSys_AsSubIdx_Zero;
  72. m.selId = kNetSyncSelAsId;
  73. m.subSelId = subSelId;
  74. m.srcId = srcId;
  75. m.dstId = dstId;
  76. if( cmUdpNetSendById(p->netH, remoteNetNodeId, &m, sizeof(m) ) == kOkUnRC )
  77. {
  78. //usleep(p->sendWaitMs*1000);
  79. }
  80. else
  81. {
  82. rc = kNetFailDspRC;
  83. if( errMsg != NULL )
  84. cmErrMsg(&p->err,rc,errMsg);
  85. }
  86. return rc;
  87. }
  88. // set echoFl if the remote node should respond with it's own 'hello' msg
  89. cmDspRC_t _cmDspSysNetSendHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  90. {
  91. return _cmDspSysNetSend(p, remoteNetNodeId, kNetHelloSelAsId, echoFl ? 1 : 0, cmInvalidId, "A network send failed while sending 'hello'." );
  92. }
  93. cmDspRC_t _cmDspSysNetSendSyncError( cmDsp_t* p, cmDspRC_t errRc )
  94. {
  95. cmDspRC_t rc = kOkDspRC;
  96. unsigned i;
  97. // for each non-local node
  98. for(i=0; i<p->netNodeCnt; ++i)
  99. if( p->netNodeArray[i].localFl == false)
  100. {
  101. unsigned remoteNetNodeId = p->netNodeArray[i].id;
  102. cmDspRC_t rc0;
  103. // send the error code in the dstId
  104. if((rc0 = _cmDspSysNetSend(p, remoteNetNodeId, kNetErrSelAsId, cmUdpNetLocalNodeId(p->netH), errRc, "A network send failed while signaling an error." )) != kOkDspRC )
  105. rc = rc0;
  106. }
  107. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  108. rc = cmErrMsg(&p->err,kThreadFailDspRC,"An attempt to pause the sync. thread failed after signaling an error.");
  109. if( p->netVerbosity > 0 )
  110. cmRptPrintf(p->err.rpt,"Sync:Done - Fail\n");
  111. return rc;
  112. }
  113. bool _cmDspSysNetIsNodeAwake( cmDsp_t* p, unsigned netNodeId )
  114. {
  115. unsigned i;
  116. for(i=0; i<p->netNodeCnt; ++i)
  117. if( p->netNodeArray[i].id == netNodeId )
  118. return p->netNodeArray[i].helloFl;
  119. assert(0); // unknown net node id
  120. return false;
  121. }
  122. bool _cmDspSysNet_AreAllDstIdsResolved( cmDsp_t* p )
  123. {
  124. // check that we have received all dst id req's from each remote node
  125. unsigned i;
  126. for(i=0; i<p->netNodeCnt; ++i)
  127. if( p->netNodeArray[i].localFl == false && p->netNodeArray[i].reqDoneFl==false)
  128. return false;
  129. _cmDspSrcConn_t* rp = p->srcConnList;
  130. // for each src connection which does not yet have a destination id.
  131. for(; rp != NULL; rp = rp->link)
  132. if( rp->dstId == cmInvalidId )
  133. return false;
  134. return true;
  135. }
  136. // send connection requests to the specified remote node
  137. cmDspRC_t _cmDspSysNetSendConnRequests( cmDsp_t* p, unsigned dstNetNodeId )
  138. {
  139. cmDspRC_t rc = kOkDspRC;
  140. _cmDspSrcConn_t* rp = p->srcConnList;
  141. // for each src connection which does not yet have a destination id.
  142. for(; rp != NULL; rp = rp->link)
  143. {
  144. // if this src conn has not been assigned a dstId and it's source node is awake
  145. if( rp->dstId == cmInvalidId && rp->dstNetNodeId==dstNetNodeId && _cmDspSysNetIsNodeAwake(p,rp->dstNetNodeId) )
  146. {
  147. // calc the msg size
  148. unsigned sn0 = strlen(rp->dstInstLabel) + 1;
  149. unsigned sn1 = strlen(rp->dstVarLabel) + 1;
  150. unsigned byteCnt = sizeof(cmDspNetMsg_t) + sn0 + sn1;
  151. // create msg buffer
  152. char buf[ byteCnt ];
  153. memset(buf,0,byteCnt);
  154. // fill in the msg
  155. cmDspNetMsg_t* cp = (cmDspNetMsg_t*)buf;
  156. cp->asSubIdx = cmDspSys_AsSubIdx_Zero;
  157. cp->selId = kNetSyncSelAsId;
  158. cp->subSelId = kNetDstIdReqSelAsId;
  159. cp->srcId = rp->srcId;
  160. cp->dstId = cmInvalidId;
  161. char* dp = buf + sizeof(*cp);
  162. memcpy(dp,rp->dstInstLabel,sn0);
  163. dp += sn0;
  164. memcpy(dp,rp->dstVarLabel,sn1);
  165. dp += sn1;
  166. assert(dp == buf + byteCnt );
  167. // send the msg
  168. if( cmUdpNetSendById(p->netH, rp->dstNetNodeId, buf, byteCnt ) != kOkUnRC )
  169. {
  170. rc = cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while registering remote nodes.");
  171. goto errLabel;
  172. }
  173. if( p->netVerbosity > 1 )
  174. cmRptPrintf(p->err.rpt,"Sync: send req to %i\n",rp->dstNetNodeId);
  175. //usleep(p->sendWaitMs*1000); // wait between transmissions
  176. }
  177. }
  178. errLabel:
  179. if( rc != kOkDspRC )
  180. _cmDspSysNetSendSyncError(p,rc);
  181. return rc;
  182. }
  183. // Return true when the 'doneFl' on all nodes has been set.
  184. // The doneFl is cleared on the beginning of the sync. process.
  185. // The doneFl is set as each remote node signals that it has
  186. // all of the dstId's that it needs by sending kNetDoneSelAsId
  187. // messages.
  188. bool _cmDspSysNetCheckNetNodeStatus( cmDsp_t* p )
  189. {
  190. unsigned i;
  191. for(i=0; i<p->netNodeCnt; ++i)
  192. if( p->netNodeArray[i].doneFl == false )
  193. return false;
  194. return true;
  195. }
  196. // Send kNetDoneSelAsId msgs to all remote nodes to indicate that
  197. // this node has all of it's dstId's.
  198. cmDspRC_t _cmDspSysNetSendSyncDone( cmDsp_t* p )
  199. {
  200. cmDspRC_t rc = kOkDspRC;
  201. unsigned i;
  202. // broadcast the sync 'done' msg to each non-local node
  203. if( p->netDoneSentFl )
  204. return rc;
  205. for(i=0; i<p->netNodeCnt; ++i)
  206. if( p->netNodeArray[i].localFl == false )
  207. {
  208. if((rc = _cmDspSysNetSend(p, p->netNodeArray[i].id, kNetDoneSelAsId, cmInvalidId, cmInvalidId, "A network send failed while signaling sync. completion." )) != kOkDspRC )
  209. goto errLabel;
  210. }
  211. // create the src connection map
  212. _cmDspSrcConn_t* sp = p->srcConnList;
  213. if( sp != NULL && p->srcConnMapCnt == 0 )
  214. {
  215. if( p->netVerbosity > 0 )
  216. cmRptPrintf(p->err.rpt,"Sync:Creating src map\n");
  217. // get the count of src nodes
  218. unsigned n;
  219. for(n=0; sp != NULL; sp = sp->link )
  220. ++n;
  221. // allocate the srcConnMap
  222. p->srcConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspSrcConn_t*, p->srcConnMap, n );
  223. p->srcConnMapCnt = n;
  224. // load the srcConnMap
  225. for(sp = p->srcConnList; sp != NULL; sp = sp->link )
  226. {
  227. assert( sp->srcId < n );
  228. p->srcConnMap[ sp->srcId ] = sp;
  229. }
  230. }
  231. // create the dst connection map
  232. _cmDspDstConn_t* dp = p->dstConnList;
  233. if( dp != NULL && p->dstConnMapCnt == 0 )
  234. {
  235. if( p->netVerbosity > 0 )
  236. cmRptPrintf(p->err.rpt,"Sync:Creating dst map\n");
  237. unsigned n;
  238. // get the count of dst nodes
  239. for(n=0; dp != NULL; dp = dp->link )
  240. ++n;
  241. // allocate the dstConnMap
  242. p->dstConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspDstConn_t*, p->dstConnMap, n );
  243. p->dstConnMapCnt = n;
  244. // load the dstConnMap
  245. for(dp = p->dstConnList; dp != NULL; dp = dp->link )
  246. {
  247. assert( dp->dstId < n );
  248. p->dstConnMap[ dp->dstId ] = dp;
  249. }
  250. }
  251. p->netDoneSentFl = true;
  252. if( p->netVerbosity > 0 )
  253. cmRptPrintf(p->err.rpt,"Sync: Done Sent\n",i);
  254. errLabel:
  255. if( rc != kOkDspRC )
  256. _cmDspSysNetSendSyncError(p,rc);
  257. return rc;
  258. }
  259. // Sync thread callback function
  260. bool _cmDspSysNetSyncThreadCb( void* param )
  261. {
  262. cmDsp_t* p = (cmDsp_t*)param;
  263. bool connFl = true; //
  264. bool nodeFl = true;
  265. // receive a group of waiting messages from remote nodes
  266. // WARNING: calling cmUdpNetReceive() here means that the audio engine cannot be
  267. // enabled - otherwise this thread and the audio system thread will simultaneously
  268. // attempt to read the UDP port. This will result in unsafe thread conflicts.
  269. if( cmUdpNetReceive(p->netH, NULL ) != kOkUnRC )
  270. {
  271. cmErrMsg(&p->err,kNetFailDspRC,"UDP Net receive failed during sync. mode.");
  272. _cmDspSysNetSendSyncError(p,kNetFailDspRC);
  273. }
  274. // check if all the src connections have been assigned dst id's
  275. connFl = _cmDspSysNet_AreAllDstIdsResolved(p);
  276. // if all the src connections have dst id's then send a 'done' signal
  277. // to all other nodes so that they know this node is ready to leave
  278. // sync mode
  279. if( connFl )
  280. {
  281. if( _cmDspSysNetSendSyncDone(p) != kOkDspRC )
  282. goto errLabel;
  283. }
  284. // prevent the thread from burning too much time
  285. usleep(p->sendWaitMs*1000);
  286. // check if all nodes have completed transmission to this node
  287. nodeFl = _cmDspSysNetCheckNetNodeStatus(p);
  288. // if the connections have all been setup and all the net nodes have
  289. // received a 'done' signal
  290. if( connFl && nodeFl )
  291. {
  292. // mark the sync as complete
  293. p->syncState = kSyncSuccessDspId;
  294. // the sync. is done - pause this thread
  295. if( cmThreadPause( p->thH, kPauseThFl ) != kOkThRC )
  296. cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the sync. thread upon completion failed.");
  297. if( p->netVerbosity > 0 )
  298. cmRptPrintf(p->err.rpt,"Sync Done!\n");
  299. }
  300. errLabel:
  301. return true;
  302. }
  303. // During DSP network allocation and connection this function is called
  304. // to register remote instance/var targets.
  305. _cmDspSrcConn_t* _cmDspSysNetCreateSrcConn( cmDsp_t* p, unsigned dstNetNodeId, const cmChar_t* dstInstLabel, const cmChar_t* dstVarLabel )
  306. {
  307. if( dstNetNodeId == cmUdpNetLocalNodeId(p->netH) )
  308. {
  309. cmErrMsg(&p->err,kNetFailDspRC,"Cannot connect a network node (node:%s inst:%s label:%s)to itself.",cmStringNullGuard(cmUdpNetNodeIdToLabel(p->netH,dstNetNodeId)),cmStringNullGuard(dstInstLabel),cmStringNullGuard(dstVarLabel));
  310. return NULL;
  311. }
  312. // register the remote node
  313. _cmDspSrcConn_t* rp = cmLhAllocZ( p->ctx.lhH, _cmDspSrcConn_t, 1 );
  314. rp->dstNetNodeId = dstNetNodeId;
  315. rp->dstInstLabel = cmLhAllocStr( p->ctx.lhH, dstInstLabel );
  316. rp->dstVarLabel = cmLhAllocStr( p->ctx.lhH, dstVarLabel );
  317. rp->link = p->srcConnList;
  318. p->srcConnList = rp;
  319. if( p->netVerbosity > 1 )
  320. cmRptPrintf(p->err.rpt,"Sync: create src for %i\n", dstNetNodeId );
  321. return rp;
  322. }
  323. // This function is called in response to receiving a connection request on the
  324. // dst machine. It sends a dstId to match the srcId provided by the src machine.
  325. cmDspRC_t _cmDspSysNetSendDstConnId( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId, unsigned dstId )
  326. {
  327. cmDspRC_t rc = kOkDspRC;
  328. if((rc = _cmDspSysNetSend(p, srcNetNodeId, kNetDstIdSelAsId, srcId, dstId, "A network send failed while sending a dst. id." )) == kOkDspRC )
  329. {
  330. if( p->netVerbosity > 1 )
  331. cmRptPrintf(p->err.rpt,"Sync:Sent dst id to %i\n",srcNetNodeId);
  332. }
  333. return rc;
  334. }
  335. // Handle 'hello' messages
  336. cmDspRC_t _cmDspSysNetReceiveHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  337. {
  338. cmDspRC_t rc = kOkDspRC;
  339. unsigned i;
  340. for(i=0; i<p->netNodeCnt; ++i)
  341. if( p->netNodeArray[i].id == remoteNetNodeId )
  342. {
  343. p->netNodeArray[i].helloFl = true;
  344. // if echo was requested then respond ...
  345. if( echoFl )
  346. {
  347. // ... but the remote node should not respond - because we already know about it
  348. _cmDspSysNetSendHello(p,remoteNetNodeId,false);
  349. }
  350. if( p->netVerbosity > 0 )
  351. cmRptPrintf(p->err.rpt,"Sync:hello from %i\n",remoteNetNodeId);
  352. // send connection requests to the remote net node
  353. if((rc = _cmDspSysNetSendConnRequests(p, remoteNetNodeId )) == kOkDspRC )
  354. {
  355. // send 'req done' msg
  356. rc = _cmDspSysNetSend(p, remoteNetNodeId, kNetDstIdReqDoneAsId, 0, cmInvalidId, "A network send failed while sending 'dst req done'." );
  357. }
  358. return rc;
  359. }
  360. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'hello' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  361. }
  362. cmDspRC_t _cmDspSysNetReceiveReqDone( cmDsp_t* p, unsigned remoteNetNodeId )
  363. {
  364. unsigned i;
  365. for(i=0; i<p->netNodeCnt; ++i)
  366. if( p->netNodeArray[i].id == remoteNetNodeId )
  367. {
  368. p->netNodeArray[i].reqDoneFl = true;
  369. if( p->netVerbosity > 0 )
  370. cmRptPrintf(p->err.rpt,"Sync: Req Done from %i\n",remoteNetNodeId);
  371. return kOkDspRC;
  372. }
  373. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'req done' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  374. }
  375. // given a srcNetNodeId/srcId return the associated dst connection
  376. _cmDspDstConn_t* _cmDspSysNetSrcIdToDstConn( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId )
  377. {
  378. _cmDspDstConn_t* rp = p->dstConnList;
  379. for(; rp != NULL; rp = rp->link )
  380. if( rp->srcNetNodeId == srcNetNodeId && rp->srcId == srcId )
  381. return rp;
  382. return NULL;
  383. }
  384. // Handle kNetSyncSelAsId messages.
  385. // Called on the dst machine to receive a src connection request
  386. cmDspRC_t _cmDspSysNetReceiveSrcConnRequest( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned srcNetNodeId )
  387. {
  388. cmDspRC_t rc = kOkDspRC;
  389. _cmDspDstConn_t* rp;
  390. // if a dstId has not already been assigned to this src
  391. if((rp = _cmDspSysNetSrcIdToDstConn(p, srcNetNodeId, msg->srcId )) == NULL )
  392. {
  393. if( p->netVerbosity > 1 )
  394. cmRptPrintf(p->err.rpt,"Sync:Rcvd src conn request from %i\n",srcNetNodeId);
  395. cmDspInst_t* instPtr = NULL;
  396. const cmDspVar_t* varPtr;
  397. const cmChar_t* instLabel = (const cmChar_t*)(msg+1);
  398. const cmChar_t* varLabel = instLabel + strlen(instLabel) + 1;
  399. // get the symbols assoc'd with the dst inst/var
  400. unsigned instSymId = cmSymTblId(p->ctx.stH,instLabel);
  401. unsigned varSymId = cmSymTblId(p->ctx.stH,varLabel);
  402. assert( instSymId != cmInvalidId );
  403. // find the dst inst
  404. if((instPtr = _cmDspSysInstSymIdToPtr(p,instSymId)) == NULL )
  405. {
  406. rc = cmErrMsg(&p->err,kInstNotFoundDspRC,"The instance associated with '%s' was not found.",cmStringNullGuard(instLabel));
  407. goto errLabel;
  408. }
  409. // find dst var
  410. if((varPtr = cmDspVarSymbolToPtr(&p->ctx, instPtr, varSymId, kInDsvFl )) == NULL )
  411. {
  412. rc = cmErrMsg(&p->err,kVarNotFoundDspRC,"The variable '%s' on the instance '%s' was not found.",cmStringNullGuard(varLabel),cmStringNullGuard(instLabel));
  413. goto errLabel;
  414. }
  415. // register the new dst connection
  416. rp = cmLhAllocZ( p->ctx.lhH, _cmDspDstConn_t, 1 );
  417. rp->srcNetNodeId = srcNetNodeId;
  418. rp->srcId = msg->srcId;
  419. rp->dstId = p->nextDstId++;
  420. rp->dstInst = instPtr;
  421. rp->dstVarId = varPtr->constId;
  422. rp->link = p->dstConnList;
  423. p->dstConnList = rp;
  424. }
  425. assert( rp != NULL );
  426. // send the dstId associated with this connection back to the source
  427. if((rc = _cmDspSysNetSendDstConnId(p,srcNetNodeId,msg->srcId,rp->dstId)) != kOkDspRC )
  428. goto errLabel;
  429. errLabel:
  430. if( rc != kOkDspRC )
  431. _cmDspSysNetSendSyncError(p,rc);
  432. return rc;
  433. }
  434. cmDspRC_t _cmDspSysNetReceiveDstId(cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId)
  435. {
  436. cmDspRC_t rc;
  437. _cmDspSrcConn_t* sp = p->srcConnList;
  438. for(; sp != NULL; sp = sp->link )
  439. if( msg->srcId == sp->srcId )
  440. {
  441. sp->dstId = msg->dstId;
  442. return kOkDspRC;
  443. }
  444. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"The src id %i associated with the dst id %i could not be found.",msg->srcId,msg->dstId);
  445. _cmDspSysNetSendSyncError(p,rc);
  446. return rc;
  447. }
  448. // Handle kNetDoneSelAsId messages.
  449. cmDspRC_t _cmDspSysNetReceiveRemoteSyncDone( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  450. {
  451. cmDspRC_t rc = kOkDspRC;
  452. unsigned i;
  453. for(i=0; i<p->netNodeCnt; ++i)
  454. if( p->netNodeArray[i].id == remoteNetNodeId )
  455. {
  456. p->netNodeArray[i].doneFl = true;
  457. if( p->netVerbosity > 0 )
  458. cmRptPrintf(p->err.rpt,"Sync: Rcvd done from %i\n",remoteNetNodeId);
  459. break;
  460. }
  461. return rc;
  462. }
  463. // Handle kNetErrSelAsId messages
  464. cmDspRC_t _cmDspSysNetReceiveRemoteSyncError( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  465. {
  466. cmDspRC_t rc = kOkDspRC;
  467. if( p->netVerbosity > 0 )
  468. cmRptPrintf(p->err.rpt,"Sync: Rcvd error from %i\n",remoteNetNodeId);
  469. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  470. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the thread due to a remote error failed.");
  471. return rc;
  472. }
  473. // Verifify that a net node id is valid.
  474. cmDspRC_t _cmDspSysNetValidateNetNodeId( cmDsp_t* p, unsigned netNodeId )
  475. {
  476. cmDspRC_t rc = kOkDspRC;
  477. unsigned i;
  478. for(i=0; i<p->netNodeCnt; ++i)
  479. if( p->netNodeArray[i].id == netNodeId )
  480. break;
  481. if( i == p->netNodeCnt )
  482. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"A message arrived from a unknown net node (id=%i).",netNodeId);
  483. return rc;
  484. }
  485. ///============================================================================================
  486. // main hooks to the DSP system
  487. // called by cmDspSysInitialize()
  488. cmDspRC_t _cmDspSysNetAlloc( cmDsp_t* p )
  489. {
  490. if((p->netNodeCnt = cmUdpNetNodeCount(p->netH)) != 0)
  491. p->netNodeArray = cmMemAllocZ(_cmDspNetNode_t, p->netNodeCnt );
  492. if( cmThreadCreate(&p->thH,_cmDspSysNetSyncThreadCb,p, p->err.rpt) != kOkThRC )
  493. return cmErrMsg(&p->err,kThreadFailDspRC,"The network syncronization thread create failed.");
  494. p->sendWaitMs = 10;
  495. p->syncState = kSyncPreDspId;
  496. p->netVerbosity = 1;
  497. return kOkDspRC;
  498. }
  499. // called by cmDspSysFinalize()
  500. cmDspRC_t _cmDspSysNetFree( cmDsp_t* p )
  501. {
  502. cmMemFree(p->netNodeArray);
  503. if( cmThreadDestroy(&p->thH) != kOkThRC )
  504. cmErrMsg(&p->err,kThreadFailDspRC,"The network syncrhonization thread destroy failed.");
  505. return kOkDspRC;
  506. }
  507. // called by cmDspSysLoad()
  508. cmDspRC_t _cmDspSysNetPreLoad( cmDsp_t* p )
  509. {
  510. p->srcConnList = NULL;
  511. return kOkDspRC;
  512. }
  513. // called by cmDspSysUnload()
  514. cmDspRC_t _cmDspSysNetUnload( cmDsp_t* p )
  515. {
  516. if( cmUdpNetEnableListen(p->netH, false ) != kOkUnRC )
  517. return cmErrMsg(&p->err,kNetFailDspRC,"The network failed to exit 'listening' mode.");
  518. p->syncState = kSyncPreDspId;
  519. p->nextDstId = 0;
  520. p->dstConnList = NULL;
  521. p->srcConnMap = NULL;
  522. p->srcConnMapCnt = 0;
  523. p->dstConnMap = NULL;
  524. p->dstConnMapCnt = 0;
  525. p->netDoneSentFl = false;
  526. return kOkDspRC;
  527. }
  528. // Call this function to enter 'sync' mode.
  529. cmDspRC_t _cmDspSysNetSync( cmDsp_t* p )
  530. {
  531. cmDspRC_t rc = kOkDspRC;
  532. unsigned localNodeId = cmUdpNetLocalNodeId(p->netH);
  533. unsigned i;
  534. // if there is no network then there is nothing to do
  535. if( p->netNodeCnt == 0 )
  536. {
  537. p->syncState = kSyncSuccessDspId;
  538. return kOkDspRC;
  539. }
  540. p->syncState = kSyncPendingDspId;
  541. // be sure the sync thread is paused before continuing
  542. if( cmThreadPause(p->thH, kPauseThFl | kWaitThFl ) != kOkThRC )
  543. {
  544. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be paused prior to initialization.");
  545. goto errLabel;
  546. }
  547. // initialize the netNodeArry[]
  548. for(i=0; i<p->netNodeCnt; ++i)
  549. {
  550. p->netNodeArray[i].id = cmUdpNetNodeId(p->netH,i);
  551. p->netNodeArray[i].localFl = p->netNodeArray[i].id == localNodeId;
  552. p->netNodeArray[i].doneFl = p->netNodeArray[i].localFl;
  553. p->netNodeArray[i].helloFl = false;
  554. p->netNodeArray[i].reqDoneFl = false;
  555. p->netNodeArray[i].doneFl = false;
  556. assert( p->netNodeArray[i].id != cmInvalidId );
  557. }
  558. // initialize the src connection array - this array was created when the
  559. // DSP instance network was formed
  560. _cmDspSrcConn_t* rp = p->srcConnList;
  561. for(i=0; rp != NULL; rp = rp->link,++i)
  562. {
  563. rp->srcId = i;
  564. rp->dstId = cmInvalidId;
  565. }
  566. // clear the dst conn records
  567. _cmDspDstConn_t* dp = p->dstConnList;
  568. while( dp!=NULL)
  569. {
  570. _cmDspDstConn_t* np = dp->link;
  571. cmLhFree(p->ctx.lhH,dp);
  572. dp = np;
  573. }
  574. // clear the connection maps
  575. p->srcConnMapCnt = 0;
  576. p->dstConnMapCnt = 0;
  577. p->netDoneSentFl = false;
  578. // enter listening mode
  579. if( cmUdpNetEnableListen(p->netH, true ) != kOkUnRC )
  580. {
  581. rc = cmErrMsg(&p->err,kNetFailDspRC,"The network failed to go into 'listening' mode.");
  582. goto errLabel;
  583. }
  584. // start the sync process
  585. if( cmThreadPause(p->thH,0) != kOkThRC )
  586. {
  587. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be un-paused prior to begin synchronization.");
  588. goto errLabel;
  589. }
  590. // broadcast 'hello' to all remote listeners and request that they respond with their own 'hello'.
  591. for(i=0; i<p->netNodeCnt; ++i)
  592. if( p->netNodeArray[i].localFl == false )
  593. if((rc = _cmDspSysNetSendHello(p,p->netNodeArray[i].id,true)) != kOkDspRC )
  594. goto errLabel;
  595. if( p->netVerbosity > 0 )
  596. cmRptPrintf(p->err.rpt,"Sync:Entering sync loop\n");
  597. errLabel:
  598. if( rc != kOkDspRC )
  599. p->syncState = kSyncFailDspId;
  600. return rc;
  601. }
  602. cmDspRC_t _cmDspSysNetRecvEvent( cmDsp_t* p, const void* msgPtr, unsigned msgByteCnt )
  603. {
  604. cmDspNetMsg_t* m = (cmDspNetMsg_t*)msgPtr;
  605. cmRC_t rc = cmOkRC;
  606. //bool jsFl = false;
  607. // if the value associated with this msg is a mtx then set
  608. // its mtx data area pointer to just after the msg header.
  609. if( cmDsvIsJson(&m->value) )
  610. {
  611. //rc = cmDsvDeserializeJson(&m->value,p->jsH);
  612. assert(0);
  613. //jsFl = true;
  614. }
  615. else
  616. rc = cmDsvDeserializeInPlace(&m->value,msgByteCnt-sizeof(cmDspNetMsg_t));
  617. if( rc != kOkDsvRC )
  618. rc = cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");
  619. else
  620. {
  621. assert( m->dstId < p->dstConnMapCnt );
  622. // form the event
  623. cmDspEvt_t e;
  624. e.flags = 0;
  625. e.srcInstPtr = NULL;
  626. e.srcVarId = cmInvalidId;
  627. e.valuePtr = &m->value;
  628. e.dstVarId = p->dstConnMap[ m->dstId ]->dstVarId;
  629. e.dstDataPtr = NULL;
  630. cmDspInst_t* instPtr = p->dstConnMap[ m->dstId ]->dstInst;
  631. assert(instPtr != NULL );
  632. // send the event
  633. if( instPtr->recvFunc != NULL )
  634. rc = instPtr->recvFunc(&p->ctx,instPtr,&e);
  635. }
  636. //if( jsFl )
  637. // cmJsonClearTree(p->jsH);
  638. return rc;
  639. }
  640. // Called from cmAudDsp.c:_cmAdUdpNetCallback() to to send an incoming msg to the DSP system.
  641. cmDspRC_t _cmDspSysNetRecv( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned msgByteCnt, unsigned remoteNetNodeId )
  642. {
  643. assert( msg->selId == kNetSyncSelAsId );
  644. cmDspRC_t rc = kOkDspRC;
  645. switch( msg->subSelId )
  646. {
  647. case kNetHelloSelAsId:
  648. rc = _cmDspSysNetReceiveHello(p,remoteNetNodeId,msg->srcId!=0);
  649. break;
  650. case kNetDstIdReqSelAsId:
  651. rc = _cmDspSysNetReceiveSrcConnRequest(p,msg,remoteNetNodeId);
  652. break;
  653. case kNetDstIdReqDoneAsId:
  654. rc = _cmDspSysNetReceiveReqDone(p,remoteNetNodeId);
  655. break;
  656. case kNetDstIdSelAsId:
  657. rc = _cmDspSysNetReceiveDstId(p,msg,remoteNetNodeId);
  658. break;
  659. case kNetDoneSelAsId:
  660. rc = _cmDspSysNetReceiveRemoteSyncDone(p,msg,remoteNetNodeId);
  661. break;
  662. case kNetErrSelAsId:
  663. rc = _cmDspSysNetReceiveRemoteSyncError(p,msg,remoteNetNodeId);
  664. break;
  665. case kNetEvtSelAsId:
  666. rc = _cmDspSysNetRecvEvent(p, msg, msgByteCnt );
  667. break;
  668. default:
  669. cmErrMsg(&p->err,kNetFailDspRC,"Unexpected message type 'selId=%i' received by the network sync. message dispatcher.",msg->selId);
  670. break;
  671. }
  672. return rc;
  673. }
  674. cmDspRC_t _cmDspSysNetSendEvent( cmDspSysH_t h, unsigned dstNetNodeId, unsigned dstId, const cmDspEvt_t* evt )
  675. {
  676. cmDsp_t* p = _cmDspHandleToPtr(h);
  677. unsigned bufByteCnt = sizeof(cmDspNetMsg_t);
  678. unsigned dataByteCnt = 0;
  679. if( evt->valuePtr != NULL )
  680. dataByteCnt = cmDsvSerialDataByteCount(evt->valuePtr);
  681. bufByteCnt += dataByteCnt;
  682. char buf[ bufByteCnt ];
  683. cmDspNetMsg_t* hdr = (cmDspNetMsg_t*)buf;
  684. hdr->asSubIdx = cmDspSys_AsSubIdx_Zero;
  685. hdr->selId = kNetSyncSelAsId;
  686. hdr->subSelId = kNetEvtSelAsId;
  687. hdr->srcId = cmInvalidId;
  688. hdr->dstId = dstId;
  689. if( evt->valuePtr == NULL )
  690. cmDsvSetNull(&hdr->value);
  691. else
  692. {
  693. // this function relies on the 'ne->value' field being the last field in the 'hdr'.
  694. if( cmDsvSerialize( evt->valuePtr, &hdr->value, sizeof(cmDspValue_t) + dataByteCnt) != kOkDsvRC )
  695. return cmErrMsg(&p->err,kSerializeFailMsgRC,"An attempt to serialize a network event msg failed.");
  696. }
  697. if( cmUdpNetSendById(p->netH, dstNetNodeId, &buf, bufByteCnt ) != kOkUnRC )
  698. return cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while sending a DSP event message.");
  699. return kOkDspRC;
  700. }