libcm is a C development framework with an emphasis on audio signal processing applications.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

cmDspNet.c 25KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. #include "cmPrefix.h"
  2. #include "cmGlobal.h"
  3. #include "cmFloatTypes.h"
  4. #include "cmRpt.h"
  5. #include "cmErr.h"
  6. #include "cmCtx.h"
  7. #include "cmMem.h"
  8. #include "cmMallocDebug.h"
  9. #include "cmLinkedHeap.h"
  10. #include "cmFileSys.h"
  11. #include "cmSymTbl.h"
  12. #include "cmJson.h"
  13. #include "cmPrefs.h"
  14. #include "cmDspValue.h"
  15. #include "cmMsgProtocol.h"
  16. #include "cmThread.h"
  17. #include "cmUdpPort.h"
  18. #include "cmUdpNet.h"
  19. #include "cmSerialPort.h"
  20. #include "cmTime.h"
  21. #include "cmAudioSys.h"
  22. #include "cmProcObj.h"
  23. #include "cmDspCtx.h"
  24. #include "cmDspClass.h"
  25. #include "cmDspStore.h"
  26. #include "cmDspSys.h"
  27. #include "cmDspPreset.h"
  28. #include "cmDspNet.h"
  29. ///============================================================================================
  30. ///============================================================================================
  31. /*
  32. cmDspSysAlloc()
  33. - create the p->netNodeArray[]
  34. - create the p->thH network thread but leave it paused
  35. cmDspSysLoad()
  36. - _cmDspSysNetPreload() - set srcConnList to NULL
  37. - load the program - this results in calls to _cmDspSysNetCreateSrcConn()
  38. which creates the p->srcConnList.
  39. - _cmDspSysNetSync() - just before exiting the load process go into sync mode:
  40. - initialize the p->netNodeArray[].
  41. - set the srcId of each record in the p->srcConnList.
  42. - start the sync thread
  43. while(1)
  44. {
  45. 1) send connection requests to dst machines
  46. 2) if( connFl = all src conn's have dst id's )
  47. send 'done' to all remote nodes
  48. 3) nodeFl = if have recv'd 'done' from all remote nodes
  49. 4) if connFl && nodeFl
  50. p->syncState = kSyncSuccessDspId;
  51. break;
  52. }
  53. Enable Audio:
  54. - Call reset on all instances
  55. cmDspSysUnload()
  56. - p->syncState = kSyncPreDspId
  57. cmDspSysFree()
  58. - delete p->netNodeArray[]
  59. - delete p->thH network thread
  60. //--------------------------------------------------------
  61. */
  62. ///============================================================================================
  63. // implemented in cmDspSys.c
  64. cmDspInst_t* _cmDspSysInstSymIdToPtr( cmDsp_t* p, unsigned instSymId );
  65. cmDsp_t* _cmDspHandleToPtr( cmDspSysH_t h );
  66. cmDspRC_t _cmDspSysNetSend( cmDsp_t* p, unsigned remoteNetNodeId, unsigned subSelId, unsigned srcId, unsigned dstId, const cmChar_t* errMsg )
  67. {
  68. cmDspRC_t rc = kOkDspRC;
  69. cmDspNetMsg_t m;
  70. memset(&m,0,sizeof(m));
  71. // we should never be sending to ourselves
  72. assert( remoteNetNodeId != cmUdpNetLocalNodeId(p->netH));
  73. // form the error msg
  74. m.asSubIdx = cmDspSys_AsSubIdx_Zero;
  75. m.selId = kNetSyncSelAsId;
  76. m.subSelId = subSelId;
  77. m.srcId = srcId;
  78. m.dstId = dstId;
  79. if( cmUdpNetSendById(p->netH, remoteNetNodeId, &m, sizeof(m) ) == kOkUnRC )
  80. {
  81. //cmSleepUs(p->sendWaitMs*1000);
  82. }
  83. else
  84. {
  85. rc = kNetFailDspRC;
  86. if( errMsg != NULL )
  87. cmErrMsg(&p->err,rc,errMsg);
  88. }
  89. return rc;
  90. }
  91. // set echoFl if the remote node should respond with it's own 'hello' msg
  92. cmDspRC_t _cmDspSysNetSendHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  93. {
  94. return _cmDspSysNetSend(p, remoteNetNodeId, kNetHelloSelAsId, echoFl ? 1 : 0, cmInvalidId, "A network send failed while sending 'hello'." );
  95. }
  96. cmDspRC_t _cmDspSysNetSendSyncError( cmDsp_t* p, cmDspRC_t errRc )
  97. {
  98. cmDspRC_t rc = kOkDspRC;
  99. unsigned i;
  100. // for each non-local node
  101. for(i=0; i<p->netNodeCnt; ++i)
  102. if( p->netNodeArray[i].localFl == false)
  103. {
  104. unsigned remoteNetNodeId = p->netNodeArray[i].id;
  105. cmDspRC_t rc0;
  106. // send the error code in the dstId
  107. if((rc0 = _cmDspSysNetSend(p, remoteNetNodeId, kNetErrSelAsId, cmUdpNetLocalNodeId(p->netH), errRc, "A network send failed while signaling an error." )) != kOkDspRC )
  108. rc = rc0;
  109. }
  110. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  111. rc = cmErrMsg(&p->err,kThreadFailDspRC,"An attempt to pause the sync. thread failed after signaling an error.");
  112. if( p->netVerbosity > 0 )
  113. cmRptPrintf(p->err.rpt,"Sync:Done - Fail\n");
  114. return rc;
  115. }
  116. bool _cmDspSysNetIsNodeAwake( cmDsp_t* p, unsigned netNodeId )
  117. {
  118. unsigned i;
  119. for(i=0; i<p->netNodeCnt; ++i)
  120. if( p->netNodeArray[i].id == netNodeId )
  121. return p->netNodeArray[i].helloFl;
  122. assert(0); // unknown net node id
  123. return false;
  124. }
  125. bool _cmDspSysNet_AreAllDstIdsResolved( cmDsp_t* p )
  126. {
  127. // check that we have received all dst id req's from each remote node
  128. unsigned i;
  129. for(i=0; i<p->netNodeCnt; ++i)
  130. if( p->netNodeArray[i].localFl == false && p->netNodeArray[i].reqDoneFl==false)
  131. return false;
  132. _cmDspSrcConn_t* rp = p->srcConnList;
  133. // for each src connection which does not yet have a destination id.
  134. for(; rp != NULL; rp = rp->link)
  135. if( rp->dstId == cmInvalidId )
  136. return false;
  137. return true;
  138. }
  139. // send connection requests to the specified remote node
  140. cmDspRC_t _cmDspSysNetSendConnRequests( cmDsp_t* p, unsigned dstNetNodeId )
  141. {
  142. cmDspRC_t rc = kOkDspRC;
  143. _cmDspSrcConn_t* rp = p->srcConnList;
  144. // for each src connection which does not yet have a destination id.
  145. for(; rp != NULL; rp = rp->link)
  146. {
  147. // if this src conn has not been assigned a dstId and it's source node is awake
  148. if( rp->dstId == cmInvalidId && rp->dstNetNodeId==dstNetNodeId && _cmDspSysNetIsNodeAwake(p,rp->dstNetNodeId) )
  149. {
  150. // calc the msg size
  151. unsigned sn0 = strlen(rp->dstInstLabel) + 1;
  152. unsigned sn1 = strlen(rp->dstVarLabel) + 1;
  153. unsigned byteCnt = sizeof(cmDspNetMsg_t) + sn0 + sn1;
  154. // create msg buffer
  155. char buf[ byteCnt ];
  156. memset(buf,0,byteCnt);
  157. // fill in the msg
  158. cmDspNetMsg_t* cp = (cmDspNetMsg_t*)buf;
  159. cp->asSubIdx = cmDspSys_AsSubIdx_Zero;
  160. cp->selId = kNetSyncSelAsId;
  161. cp->subSelId = kNetDstIdReqSelAsId;
  162. cp->srcId = rp->srcId;
  163. cp->dstId = cmInvalidId;
  164. char* dp = buf + sizeof(*cp);
  165. memcpy(dp,rp->dstInstLabel,sn0);
  166. dp += sn0;
  167. memcpy(dp,rp->dstVarLabel,sn1);
  168. dp += sn1;
  169. assert(dp == buf + byteCnt );
  170. // send the msg
  171. if( cmUdpNetSendById(p->netH, rp->dstNetNodeId, buf, byteCnt ) != kOkUnRC )
  172. {
  173. rc = cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while registering remote nodes.");
  174. goto errLabel;
  175. }
  176. if( p->netVerbosity > 1 )
  177. cmRptPrintf(p->err.rpt,"Sync: send req to %i\n",rp->dstNetNodeId);
  178. //cmSleepUs(p->sendWaitMs*1000); // wait between transmissions
  179. }
  180. }
  181. errLabel:
  182. if( rc != kOkDspRC )
  183. _cmDspSysNetSendSyncError(p,rc);
  184. return rc;
  185. }
  186. // Return true when the 'doneFl' on all nodes has been set.
  187. // The doneFl is cleared on the beginning of the sync. process.
  188. // The doneFl is set as each remote node signals that it has
  189. // all of the dstId's that it needs by sending kNetDoneSelAsId
  190. // messages.
  191. bool _cmDspSysNetCheckNetNodeStatus( cmDsp_t* p )
  192. {
  193. unsigned i;
  194. for(i=0; i<p->netNodeCnt; ++i)
  195. if( p->netNodeArray[i].doneFl == false )
  196. return false;
  197. return true;
  198. }
  199. // Send kNetDoneSelAsId msgs to all remote nodes to indicate that
  200. // this node has all of it's dstId's.
  201. cmDspRC_t _cmDspSysNetSendSyncDone( cmDsp_t* p )
  202. {
  203. cmDspRC_t rc = kOkDspRC;
  204. unsigned i;
  205. // broadcast the sync 'done' msg to each non-local node
  206. if( p->netDoneSentFl )
  207. return rc;
  208. for(i=0; i<p->netNodeCnt; ++i)
  209. if( p->netNodeArray[i].localFl == false )
  210. {
  211. if((rc = _cmDspSysNetSend(p, p->netNodeArray[i].id, kNetDoneSelAsId, cmInvalidId, cmInvalidId, "A network send failed while signaling sync. completion." )) != kOkDspRC )
  212. goto errLabel;
  213. }
  214. // create the src connection map
  215. _cmDspSrcConn_t* sp = p->srcConnList;
  216. if( sp != NULL && p->srcConnMapCnt == 0 )
  217. {
  218. if( p->netVerbosity > 0 )
  219. cmRptPrintf(p->err.rpt,"Sync:Creating src map\n");
  220. // get the count of src nodes
  221. unsigned n;
  222. for(n=0; sp != NULL; sp = sp->link )
  223. ++n;
  224. // allocate the srcConnMap
  225. p->srcConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspSrcConn_t*, p->srcConnMap, n );
  226. p->srcConnMapCnt = n;
  227. // load the srcConnMap
  228. for(sp = p->srcConnList; sp != NULL; sp = sp->link )
  229. {
  230. assert( sp->srcId < n );
  231. p->srcConnMap[ sp->srcId ] = sp;
  232. }
  233. }
  234. // create the dst connection map
  235. _cmDspDstConn_t* dp = p->dstConnList;
  236. if( dp != NULL && p->dstConnMapCnt == 0 )
  237. {
  238. if( p->netVerbosity > 0 )
  239. cmRptPrintf(p->err.rpt,"Sync:Creating dst map\n");
  240. unsigned n;
  241. // get the count of dst nodes
  242. for(n=0; dp != NULL; dp = dp->link )
  243. ++n;
  244. // allocate the dstConnMap
  245. p->dstConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspDstConn_t*, p->dstConnMap, n );
  246. p->dstConnMapCnt = n;
  247. // load the dstConnMap
  248. for(dp = p->dstConnList; dp != NULL; dp = dp->link )
  249. {
  250. assert( dp->dstId < n );
  251. p->dstConnMap[ dp->dstId ] = dp;
  252. }
  253. }
  254. p->netDoneSentFl = true;
  255. if( p->netVerbosity > 0 )
  256. cmRptPrintf(p->err.rpt,"Sync: Done Sent\n",i);
  257. errLabel:
  258. if( rc != kOkDspRC )
  259. _cmDspSysNetSendSyncError(p,rc);
  260. return rc;
  261. }
  262. // Sync thread callback function
  263. bool _cmDspSysNetSyncThreadCb( void* param )
  264. {
  265. cmDsp_t* p = (cmDsp_t*)param;
  266. bool connFl = true; //
  267. bool nodeFl = true;
  268. // receive a group of waiting messages from remote nodes
  269. // WARNING: calling cmUdpNetReceive() here means that the audio engine cannot be
  270. // enabled - otherwise this thread and the audio system thread will simultaneously
  271. // attempt to read the UDP port. This will result in unsafe thread conflicts.
  272. if( cmUdpNetReceive(p->netH, NULL ) != kOkUnRC )
  273. {
  274. cmErrMsg(&p->err,kNetFailDspRC,"UDP Net receive failed during sync. mode.");
  275. _cmDspSysNetSendSyncError(p,kNetFailDspRC);
  276. }
  277. // check if all the src connections have been assigned dst id's
  278. connFl = _cmDspSysNet_AreAllDstIdsResolved(p);
  279. // if all the src connections have dst id's then send a 'done' signal
  280. // to all other nodes so that they know this node is ready to leave
  281. // sync mode
  282. if( connFl )
  283. {
  284. if( _cmDspSysNetSendSyncDone(p) != kOkDspRC )
  285. goto errLabel;
  286. }
  287. // prevent the thread from burning too much time
  288. cmSleepUs(p->sendWaitMs*1000);
  289. // check if all nodes have completed transmission to this node
  290. nodeFl = _cmDspSysNetCheckNetNodeStatus(p);
  291. // if the connections have all been setup and all the net nodes have
  292. // received a 'done' signal
  293. if( connFl && nodeFl )
  294. {
  295. // mark the sync as complete
  296. p->syncState = kSyncSuccessDspId;
  297. // the sync. is done - pause this thread
  298. if( cmThreadPause( p->thH, kPauseThFl ) != kOkThRC )
  299. cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the sync. thread upon completion failed.");
  300. if( p->netVerbosity > 0 )
  301. cmRptPrintf(p->err.rpt,"Sync Done!\n");
  302. }
  303. errLabel:
  304. return true;
  305. }
  306. // During DSP network allocation and connection this function is called
  307. // to register remote instance/var targets.
  308. _cmDspSrcConn_t* _cmDspSysNetCreateSrcConn( cmDsp_t* p, unsigned dstNetNodeId, const cmChar_t* dstInstLabel, const cmChar_t* dstVarLabel )
  309. {
  310. if( dstNetNodeId == cmUdpNetLocalNodeId(p->netH) )
  311. {
  312. cmErrMsg(&p->err,kNetFailDspRC,"Cannot connect a network node (node:%s inst:%s label:%s)to itself.",cmStringNullGuard(cmUdpNetNodeIdToLabel(p->netH,dstNetNodeId)),cmStringNullGuard(dstInstLabel),cmStringNullGuard(dstVarLabel));
  313. return NULL;
  314. }
  315. // register the remote node
  316. _cmDspSrcConn_t* rp = cmLhAllocZ( p->ctx.lhH, _cmDspSrcConn_t, 1 );
  317. rp->dstNetNodeId = dstNetNodeId;
  318. rp->dstInstLabel = cmLhAllocStr( p->ctx.lhH, dstInstLabel );
  319. rp->dstVarLabel = cmLhAllocStr( p->ctx.lhH, dstVarLabel );
  320. rp->link = p->srcConnList;
  321. p->srcConnList = rp;
  322. if( p->netVerbosity > 1 )
  323. cmRptPrintf(p->err.rpt,"Sync: create src for %i\n", dstNetNodeId );
  324. return rp;
  325. }
  326. // This function is called in response to receiving a connection request on the
  327. // dst machine. It sends a dstId to match the srcId provided by the src machine.
  328. cmDspRC_t _cmDspSysNetSendDstConnId( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId, unsigned dstId )
  329. {
  330. cmDspRC_t rc = kOkDspRC;
  331. if((rc = _cmDspSysNetSend(p, srcNetNodeId, kNetDstIdSelAsId, srcId, dstId, "A network send failed while sending a dst. id." )) == kOkDspRC )
  332. {
  333. if( p->netVerbosity > 1 )
  334. cmRptPrintf(p->err.rpt,"Sync:Sent dst id to %i\n",srcNetNodeId);
  335. }
  336. return rc;
  337. }
  338. // Handle 'hello' messages
  339. cmDspRC_t _cmDspSysNetReceiveHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  340. {
  341. cmDspRC_t rc = kOkDspRC;
  342. unsigned i;
  343. for(i=0; i<p->netNodeCnt; ++i)
  344. if( p->netNodeArray[i].id == remoteNetNodeId )
  345. {
  346. p->netNodeArray[i].helloFl = true;
  347. // if echo was requested then respond ...
  348. if( echoFl )
  349. {
  350. // ... but the remote node should not respond - because we already know about it
  351. _cmDspSysNetSendHello(p,remoteNetNodeId,false);
  352. }
  353. if( p->netVerbosity > 0 )
  354. cmRptPrintf(p->err.rpt,"Sync:hello from %i\n",remoteNetNodeId);
  355. // send connection requests to the remote net node
  356. if((rc = _cmDspSysNetSendConnRequests(p, remoteNetNodeId )) == kOkDspRC )
  357. {
  358. // send 'req done' msg
  359. rc = _cmDspSysNetSend(p, remoteNetNodeId, kNetDstIdReqDoneAsId, 0, cmInvalidId, "A network send failed while sending 'dst req done'." );
  360. }
  361. return rc;
  362. }
  363. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'hello' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  364. }
  365. cmDspRC_t _cmDspSysNetReceiveReqDone( cmDsp_t* p, unsigned remoteNetNodeId )
  366. {
  367. unsigned i;
  368. for(i=0; i<p->netNodeCnt; ++i)
  369. if( p->netNodeArray[i].id == remoteNetNodeId )
  370. {
  371. p->netNodeArray[i].reqDoneFl = true;
  372. if( p->netVerbosity > 0 )
  373. cmRptPrintf(p->err.rpt,"Sync: Req Done from %i\n",remoteNetNodeId);
  374. return kOkDspRC;
  375. }
  376. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'req done' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  377. }
  378. // given a srcNetNodeId/srcId return the associated dst connection
  379. _cmDspDstConn_t* _cmDspSysNetSrcIdToDstConn( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId )
  380. {
  381. _cmDspDstConn_t* rp = p->dstConnList;
  382. for(; rp != NULL; rp = rp->link )
  383. if( rp->srcNetNodeId == srcNetNodeId && rp->srcId == srcId )
  384. return rp;
  385. return NULL;
  386. }
  387. // Handle kNetSyncSelAsId messages.
  388. // Called on the dst machine to receive a src connection request
  389. cmDspRC_t _cmDspSysNetReceiveSrcConnRequest( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned srcNetNodeId )
  390. {
  391. cmDspRC_t rc = kOkDspRC;
  392. _cmDspDstConn_t* rp;
  393. // if a dstId has not already been assigned to this src
  394. if((rp = _cmDspSysNetSrcIdToDstConn(p, srcNetNodeId, msg->srcId )) == NULL )
  395. {
  396. if( p->netVerbosity > 1 )
  397. cmRptPrintf(p->err.rpt,"Sync:Rcvd src conn request from %i\n",srcNetNodeId);
  398. cmDspInst_t* instPtr = NULL;
  399. const cmDspVar_t* varPtr;
  400. const cmChar_t* instLabel = (const cmChar_t*)(msg+1);
  401. const cmChar_t* varLabel = instLabel + strlen(instLabel) + 1;
  402. // get the symbols assoc'd with the dst inst/var
  403. unsigned instSymId = cmSymTblId(p->ctx.stH,instLabel);
  404. unsigned varSymId = cmSymTblId(p->ctx.stH,varLabel);
  405. assert( instSymId != cmInvalidId );
  406. // find the dst inst
  407. if((instPtr = _cmDspSysInstSymIdToPtr(p,instSymId)) == NULL )
  408. {
  409. rc = cmErrMsg(&p->err,kInstNotFoundDspRC,"The instance associated with '%s' was not found.",cmStringNullGuard(instLabel));
  410. goto errLabel;
  411. }
  412. // find dst var
  413. if((varPtr = cmDspVarSymbolToPtr(&p->ctx, instPtr, varSymId, kInDsvFl )) == NULL )
  414. {
  415. rc = cmErrMsg(&p->err,kVarNotFoundDspRC,"The variable '%s' on the instance '%s' was not found.",cmStringNullGuard(varLabel),cmStringNullGuard(instLabel));
  416. goto errLabel;
  417. }
  418. // register the new dst connection
  419. rp = cmLhAllocZ( p->ctx.lhH, _cmDspDstConn_t, 1 );
  420. rp->srcNetNodeId = srcNetNodeId;
  421. rp->srcId = msg->srcId;
  422. rp->dstId = p->nextDstId++;
  423. rp->dstInst = instPtr;
  424. rp->dstVarId = varPtr->constId;
  425. rp->link = p->dstConnList;
  426. p->dstConnList = rp;
  427. }
  428. assert( rp != NULL );
  429. // send the dstId associated with this connection back to the source
  430. if((rc = _cmDspSysNetSendDstConnId(p,srcNetNodeId,msg->srcId,rp->dstId)) != kOkDspRC )
  431. goto errLabel;
  432. errLabel:
  433. if( rc != kOkDspRC )
  434. _cmDspSysNetSendSyncError(p,rc);
  435. return rc;
  436. }
  437. cmDspRC_t _cmDspSysNetReceiveDstId(cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId)
  438. {
  439. cmDspRC_t rc;
  440. _cmDspSrcConn_t* sp = p->srcConnList;
  441. for(; sp != NULL; sp = sp->link )
  442. if( msg->srcId == sp->srcId )
  443. {
  444. sp->dstId = msg->dstId;
  445. return kOkDspRC;
  446. }
  447. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"The src id %i associated with the dst id %i could not be found.",msg->srcId,msg->dstId);
  448. _cmDspSysNetSendSyncError(p,rc);
  449. return rc;
  450. }
  451. // Handle kNetDoneSelAsId messages.
  452. cmDspRC_t _cmDspSysNetReceiveRemoteSyncDone( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  453. {
  454. cmDspRC_t rc = kOkDspRC;
  455. unsigned i;
  456. for(i=0; i<p->netNodeCnt; ++i)
  457. if( p->netNodeArray[i].id == remoteNetNodeId )
  458. {
  459. p->netNodeArray[i].doneFl = true;
  460. if( p->netVerbosity > 0 )
  461. cmRptPrintf(p->err.rpt,"Sync: Rcvd done from %i\n",remoteNetNodeId);
  462. break;
  463. }
  464. return rc;
  465. }
  466. // Handle kNetErrSelAsId messages
  467. cmDspRC_t _cmDspSysNetReceiveRemoteSyncError( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  468. {
  469. cmDspRC_t rc = kOkDspRC;
  470. if( p->netVerbosity > 0 )
  471. cmRptPrintf(p->err.rpt,"Sync: Rcvd error from %i\n",remoteNetNodeId);
  472. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  473. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the thread due to a remote error failed.");
  474. return rc;
  475. }
  476. // Verifify that a net node id is valid.
  477. cmDspRC_t _cmDspSysNetValidateNetNodeId( cmDsp_t* p, unsigned netNodeId )
  478. {
  479. cmDspRC_t rc = kOkDspRC;
  480. unsigned i;
  481. for(i=0; i<p->netNodeCnt; ++i)
  482. if( p->netNodeArray[i].id == netNodeId )
  483. break;
  484. if( i == p->netNodeCnt )
  485. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"A message arrived from a unknown net node (id=%i).",netNodeId);
  486. return rc;
  487. }
  488. ///============================================================================================
  489. // main hooks to the DSP system
  490. // called by cmDspSysInitialize()
  491. cmDspRC_t _cmDspSysNetAlloc( cmDsp_t* p )
  492. {
  493. if((p->netNodeCnt = cmUdpNetNodeCount(p->netH)) != 0)
  494. p->netNodeArray = cmMemAllocZ(_cmDspNetNode_t, p->netNodeCnt );
  495. if( cmThreadCreate(&p->thH,_cmDspSysNetSyncThreadCb,p, p->err.rpt) != kOkThRC )
  496. return cmErrMsg(&p->err,kThreadFailDspRC,"The network syncronization thread create failed.");
  497. p->sendWaitMs = 10;
  498. p->syncState = kSyncPreDspId;
  499. p->netVerbosity = 1;
  500. return kOkDspRC;
  501. }
  502. // called by cmDspSysFinalize()
  503. cmDspRC_t _cmDspSysNetFree( cmDsp_t* p )
  504. {
  505. cmMemFree(p->netNodeArray);
  506. if( cmThreadDestroy(&p->thH) != kOkThRC )
  507. cmErrMsg(&p->err,kThreadFailDspRC,"The network syncrhonization thread destroy failed.");
  508. return kOkDspRC;
  509. }
  510. // called by cmDspSysLoad()
  511. cmDspRC_t _cmDspSysNetPreLoad( cmDsp_t* p )
  512. {
  513. p->srcConnList = NULL;
  514. return kOkDspRC;
  515. }
  516. // called by cmDspSysUnload()
  517. cmDspRC_t _cmDspSysNetUnload( cmDsp_t* p )
  518. {
  519. if( cmUdpNetEnableListen(p->netH, false ) != kOkUnRC )
  520. return cmErrMsg(&p->err,kNetFailDspRC,"The network failed to exit 'listening' mode.");
  521. p->syncState = kSyncPreDspId;
  522. p->nextDstId = 0;
  523. p->dstConnList = NULL;
  524. p->srcConnMap = NULL;
  525. p->srcConnMapCnt = 0;
  526. p->dstConnMap = NULL;
  527. p->dstConnMapCnt = 0;
  528. p->netDoneSentFl = false;
  529. return kOkDspRC;
  530. }
  531. // Call this function to enter 'sync' mode.
  532. cmDspRC_t _cmDspSysNetSync( cmDsp_t* p )
  533. {
  534. cmDspRC_t rc = kOkDspRC;
  535. unsigned localNodeId = cmUdpNetLocalNodeId(p->netH);
  536. unsigned i;
  537. // if there is no network then there is nothing to do
  538. if( p->netNodeCnt == 0 )
  539. {
  540. p->syncState = kSyncSuccessDspId;
  541. return kOkDspRC;
  542. }
  543. p->syncState = kSyncPendingDspId;
  544. // be sure the sync thread is paused before continuing
  545. if( cmThreadPause(p->thH, kPauseThFl | kWaitThFl ) != kOkThRC )
  546. {
  547. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be paused prior to initialization.");
  548. goto errLabel;
  549. }
  550. // initialize the netNodeArry[]
  551. for(i=0; i<p->netNodeCnt; ++i)
  552. {
  553. p->netNodeArray[i].id = cmUdpNetNodeId(p->netH,i);
  554. p->netNodeArray[i].localFl = p->netNodeArray[i].id == localNodeId;
  555. p->netNodeArray[i].doneFl = p->netNodeArray[i].localFl;
  556. p->netNodeArray[i].helloFl = false;
  557. p->netNodeArray[i].reqDoneFl = false;
  558. p->netNodeArray[i].doneFl = false;
  559. assert( p->netNodeArray[i].id != cmInvalidId );
  560. }
  561. // initialize the src connection array - this array was created when the
  562. // DSP instance network was formed
  563. _cmDspSrcConn_t* rp = p->srcConnList;
  564. for(i=0; rp != NULL; rp = rp->link,++i)
  565. {
  566. rp->srcId = i;
  567. rp->dstId = cmInvalidId;
  568. }
  569. // clear the dst conn records
  570. _cmDspDstConn_t* dp = p->dstConnList;
  571. while( dp!=NULL)
  572. {
  573. _cmDspDstConn_t* np = dp->link;
  574. cmLhFree(p->ctx.lhH,dp);
  575. dp = np;
  576. }
  577. // clear the connection maps
  578. p->srcConnMapCnt = 0;
  579. p->dstConnMapCnt = 0;
  580. p->netDoneSentFl = false;
  581. // enter listening mode
  582. if( cmUdpNetEnableListen(p->netH, true ) != kOkUnRC )
  583. {
  584. rc = cmErrMsg(&p->err,kNetFailDspRC,"The network failed to go into 'listening' mode.");
  585. goto errLabel;
  586. }
  587. // start the sync process
  588. if( cmThreadPause(p->thH,0) != kOkThRC )
  589. {
  590. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be un-paused prior to begin synchronization.");
  591. goto errLabel;
  592. }
  593. // broadcast 'hello' to all remote listeners and request that they respond with their own 'hello'.
  594. for(i=0; i<p->netNodeCnt; ++i)
  595. if( p->netNodeArray[i].localFl == false )
  596. if((rc = _cmDspSysNetSendHello(p,p->netNodeArray[i].id,true)) != kOkDspRC )
  597. goto errLabel;
  598. if( p->netVerbosity > 0 )
  599. cmRptPrintf(p->err.rpt,"Sync:Entering sync loop\n");
  600. errLabel:
  601. if( rc != kOkDspRC )
  602. p->syncState = kSyncFailDspId;
  603. return rc;
  604. }
  605. cmDspRC_t _cmDspSysNetRecvEvent( cmDsp_t* p, const void* msgPtr, unsigned msgByteCnt )
  606. {
  607. cmDspNetMsg_t* m = (cmDspNetMsg_t*)msgPtr;
  608. cmRC_t rc = cmOkRC;
  609. //bool jsFl = false;
  610. // if the value associated with this msg is a mtx then set
  611. // its mtx data area pointer to just after the msg header.
  612. if( cmDsvIsJson(&m->value) )
  613. {
  614. //rc = cmDsvDeserializeJson(&m->value,p->jsH);
  615. assert(0);
  616. //jsFl = true;
  617. }
  618. else
  619. rc = cmDsvDeserializeInPlace(&m->value,msgByteCnt-sizeof(cmDspNetMsg_t));
  620. if( rc != kOkDsvRC )
  621. rc = cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");
  622. else
  623. {
  624. assert( m->dstId < p->dstConnMapCnt );
  625. // form the event
  626. cmDspEvt_t e;
  627. e.flags = 0;
  628. e.srcInstPtr = NULL;
  629. e.srcVarId = cmInvalidId;
  630. e.valuePtr = &m->value;
  631. e.dstVarId = p->dstConnMap[ m->dstId ]->dstVarId;
  632. e.dstDataPtr = NULL;
  633. cmDspInst_t* instPtr = p->dstConnMap[ m->dstId ]->dstInst;
  634. assert(instPtr != NULL );
  635. // send the event
  636. if( instPtr->recvFunc != NULL )
  637. rc = instPtr->recvFunc(&p->ctx,instPtr,&e);
  638. }
  639. //if( jsFl )
  640. // cmJsonClearTree(p->jsH);
  641. return rc;
  642. }
  643. // Called from cmAudDsp.c:_cmAdUdpNetCallback() to to send an incoming msg to the DSP system.
  644. cmDspRC_t _cmDspSysNetRecv( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned msgByteCnt, unsigned remoteNetNodeId )
  645. {
  646. assert( msg->selId == kNetSyncSelAsId );
  647. cmDspRC_t rc = kOkDspRC;
  648. switch( msg->subSelId )
  649. {
  650. case kNetHelloSelAsId:
  651. rc = _cmDspSysNetReceiveHello(p,remoteNetNodeId,msg->srcId!=0);
  652. break;
  653. case kNetDstIdReqSelAsId:
  654. rc = _cmDspSysNetReceiveSrcConnRequest(p,msg,remoteNetNodeId);
  655. break;
  656. case kNetDstIdReqDoneAsId:
  657. rc = _cmDspSysNetReceiveReqDone(p,remoteNetNodeId);
  658. break;
  659. case kNetDstIdSelAsId:
  660. rc = _cmDspSysNetReceiveDstId(p,msg,remoteNetNodeId);
  661. break;
  662. case kNetDoneSelAsId:
  663. rc = _cmDspSysNetReceiveRemoteSyncDone(p,msg,remoteNetNodeId);
  664. break;
  665. case kNetErrSelAsId:
  666. rc = _cmDspSysNetReceiveRemoteSyncError(p,msg,remoteNetNodeId);
  667. break;
  668. case kNetEvtSelAsId:
  669. rc = _cmDspSysNetRecvEvent(p, msg, msgByteCnt );
  670. break;
  671. default:
  672. cmErrMsg(&p->err,kNetFailDspRC,"Unexpected message type 'selId=%i' received by the network sync. message dispatcher.",msg->selId);
  673. break;
  674. }
  675. return rc;
  676. }
  677. cmDspRC_t _cmDspSysNetSendEvent( cmDspSysH_t h, unsigned dstNetNodeId, unsigned dstId, const cmDspEvt_t* evt )
  678. {
  679. cmDsp_t* p = _cmDspHandleToPtr(h);
  680. unsigned bufByteCnt = sizeof(cmDspNetMsg_t);
  681. unsigned dataByteCnt = 0;
  682. if( evt->valuePtr != NULL )
  683. dataByteCnt = cmDsvSerialDataByteCount(evt->valuePtr);
  684. bufByteCnt += dataByteCnt;
  685. char buf[ bufByteCnt ];
  686. cmDspNetMsg_t* hdr = (cmDspNetMsg_t*)buf;
  687. hdr->asSubIdx = cmDspSys_AsSubIdx_Zero;
  688. hdr->selId = kNetSyncSelAsId;
  689. hdr->subSelId = kNetEvtSelAsId;
  690. hdr->srcId = cmInvalidId;
  691. hdr->dstId = dstId;
  692. if( evt->valuePtr == NULL )
  693. cmDsvSetNull(&hdr->value);
  694. else
  695. {
  696. // this function relies on the 'ne->value' field being the last field in the 'hdr'.
  697. if( cmDsvSerialize( evt->valuePtr, &hdr->value, sizeof(cmDspValue_t) + dataByteCnt) != kOkDsvRC )
  698. return cmErrMsg(&p->err,kSerializeFailMsgRC,"An attempt to serialize a network event msg failed.");
  699. }
  700. if( cmUdpNetSendById(p->netH, dstNetNodeId, &buf, bufByteCnt ) != kOkUnRC )
  701. return cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while sending a DSP event message.");
  702. return kOkDspRC;
  703. }