libcm is a C development framework with an emphasis on audio signal processing applications.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

cmDspNet.c 26KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899
  1. //| Copyright: (C) 2009-2020 Kevin Larke <contact AT larke DOT org>
  2. //| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
  3. #include "cmPrefix.h"
  4. #include "cmGlobal.h"
  5. #include "cmFloatTypes.h"
  6. #include "cmRpt.h"
  7. #include "cmErr.h"
  8. #include "cmCtx.h"
  9. #include "cmMem.h"
  10. #include "cmMallocDebug.h"
  11. #include "cmLinkedHeap.h"
  12. #include "cmFileSys.h"
  13. #include "cmSymTbl.h"
  14. #include "cmJson.h"
  15. #include "cmPrefs.h"
  16. #include "cmDspValue.h"
  17. #include "cmMsgProtocol.h"
  18. #include "cmThread.h"
  19. #include "cmUdpPort.h"
  20. #include "cmUdpNet.h"
  21. #include "cmSerialPort.h"
  22. #include "cmTime.h"
  23. #include "cmAudioSys.h"
  24. #include "cmProcObj.h"
  25. #include "cmDspCtx.h"
  26. #include "cmDspClass.h"
  27. #include "cmDspStore.h"
  28. #include "cmDspSys.h"
  29. #include "cmDspPreset.h"
  30. #include "cmDspNet.h"
  31. ///============================================================================================
  32. ///============================================================================================
  33. /*
  34. cmDspSysAlloc()
  35. - create the p->netNodeArray[]
  36. - create the p->thH network thread but leave it paused
  37. cmDspSysLoad()
  38. - _cmDspSysNetPreload() - set srcConnList to NULL
  39. - load the program - this results in calls to _cmDspSysNetCreateSrcConn()
  40. which creates the p->srcConnList.
  41. - _cmDspSysNetSync() - just before exiting the load process go into sync mode:
  42. - initialize the p->netNodeArray[].
  43. - set the srcId of each record in the p->srcConnList.
  44. - start the sync thread
  45. while(1)
  46. {
  47. 1) send connection requests to dst machines
  48. 2) if( connFl = all src conn's have dst id's )
  49. send 'done' to all remote nodes
  50. 3) nodeFl = if have recv'd 'done' from all remote nodes
  51. 4) if connFl && nodeFl
  52. p->syncState = kSyncSuccessDspId;
  53. break;
  54. }
  55. Enable Audio:
  56. - Call reset on all instances
  57. cmDspSysUnload()
  58. - p->syncState = kSyncPreDspId
  59. cmDspSysFree()
  60. - delete p->netNodeArray[]
  61. - delete p->thH network thread
  62. //--------------------------------------------------------
  63. */
  64. ///============================================================================================
  65. // implemented in cmDspSys.c
  66. cmDspInst_t* _cmDspSysInstSymIdToPtr( cmDsp_t* p, unsigned instSymId );
  67. cmDsp_t* _cmDspHandleToPtr( cmDspSysH_t h );
  68. cmDspRC_t _cmDspSysNetSend( cmDsp_t* p, unsigned remoteNetNodeId, unsigned subSelId, unsigned srcId, unsigned dstId, const cmChar_t* errMsg )
  69. {
  70. cmDspRC_t rc = kOkDspRC;
  71. cmDspNetMsg_t m;
  72. memset(&m,0,sizeof(m));
  73. // we should never be sending to ourselves
  74. assert( remoteNetNodeId != cmUdpNetLocalNodeId(p->netH));
  75. // form the error msg
  76. m.asSubIdx = cmDspSys_AsSubIdx_Zero;
  77. m.selId = kNetSyncSelAsId;
  78. m.subSelId = subSelId;
  79. m.srcId = srcId;
  80. m.dstId = dstId;
  81. if( cmUdpNetSendById(p->netH, remoteNetNodeId, &m, sizeof(m) ) == kOkUnRC )
  82. {
  83. //cmSleepUs(p->sendWaitMs*1000);
  84. }
  85. else
  86. {
  87. rc = kNetFailDspRC;
  88. if( errMsg != NULL )
  89. cmErrMsg(&p->err,rc,errMsg);
  90. }
  91. return rc;
  92. }
  93. // set echoFl if the remote node should respond with it's own 'hello' msg
  94. cmDspRC_t _cmDspSysNetSendHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  95. {
  96. return _cmDspSysNetSend(p, remoteNetNodeId, kNetHelloSelAsId, echoFl ? 1 : 0, cmInvalidId, "A network send failed while sending 'hello'." );
  97. }
  98. cmDspRC_t _cmDspSysNetSendSyncError( cmDsp_t* p, cmDspRC_t errRc )
  99. {
  100. cmDspRC_t rc = kOkDspRC;
  101. unsigned i;
  102. // for each non-local node
  103. for(i=0; i<p->netNodeCnt; ++i)
  104. if( p->netNodeArray[i].localFl == false)
  105. {
  106. unsigned remoteNetNodeId = p->netNodeArray[i].id;
  107. cmDspRC_t rc0;
  108. // send the error code in the dstId
  109. if((rc0 = _cmDspSysNetSend(p, remoteNetNodeId, kNetErrSelAsId, cmUdpNetLocalNodeId(p->netH), errRc, "A network send failed while signaling an error." )) != kOkDspRC )
  110. rc = rc0;
  111. }
  112. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  113. rc = cmErrMsg(&p->err,kThreadFailDspRC,"An attempt to pause the sync. thread failed after signaling an error.");
  114. if( p->netVerbosity > 0 )
  115. cmRptPrintf(p->err.rpt,"Sync:Done - Fail\n");
  116. return rc;
  117. }
  118. bool _cmDspSysNetIsNodeAwake( cmDsp_t* p, unsigned netNodeId )
  119. {
  120. unsigned i;
  121. for(i=0; i<p->netNodeCnt; ++i)
  122. if( p->netNodeArray[i].id == netNodeId )
  123. return p->netNodeArray[i].helloFl;
  124. assert(0); // unknown net node id
  125. return false;
  126. }
  127. bool _cmDspSysNet_AreAllDstIdsResolved( cmDsp_t* p )
  128. {
  129. // check that we have received all dst id req's from each remote node
  130. unsigned i;
  131. for(i=0; i<p->netNodeCnt; ++i)
  132. if( p->netNodeArray[i].localFl == false && p->netNodeArray[i].reqDoneFl==false)
  133. return false;
  134. _cmDspSrcConn_t* rp = p->srcConnList;
  135. // for each src connection which does not yet have a destination id.
  136. for(; rp != NULL; rp = rp->link)
  137. if( rp->dstId == cmInvalidId )
  138. return false;
  139. return true;
  140. }
  141. // send connection requests to the specified remote node
  142. cmDspRC_t _cmDspSysNetSendConnRequests( cmDsp_t* p, unsigned dstNetNodeId )
  143. {
  144. cmDspRC_t rc = kOkDspRC;
  145. _cmDspSrcConn_t* rp = p->srcConnList;
  146. // for each src connection which does not yet have a destination id.
  147. for(; rp != NULL; rp = rp->link)
  148. {
  149. // if this src conn has not been assigned a dstId and it's source node is awake
  150. if( rp->dstId == cmInvalidId && rp->dstNetNodeId==dstNetNodeId && _cmDspSysNetIsNodeAwake(p,rp->dstNetNodeId) )
  151. {
  152. // calc the msg size
  153. unsigned sn0 = strlen(rp->dstInstLabel) + 1;
  154. unsigned sn1 = strlen(rp->dstVarLabel) + 1;
  155. unsigned byteCnt = sizeof(cmDspNetMsg_t) + sn0 + sn1;
  156. // create msg buffer
  157. char buf[ byteCnt ];
  158. memset(buf,0,byteCnt);
  159. // fill in the msg
  160. cmDspNetMsg_t* cp = (cmDspNetMsg_t*)buf;
  161. cp->asSubIdx = cmDspSys_AsSubIdx_Zero;
  162. cp->selId = kNetSyncSelAsId;
  163. cp->subSelId = kNetDstIdReqSelAsId;
  164. cp->srcId = rp->srcId;
  165. cp->dstId = cmInvalidId;
  166. char* dp = buf + sizeof(*cp);
  167. memcpy(dp,rp->dstInstLabel,sn0);
  168. dp += sn0;
  169. memcpy(dp,rp->dstVarLabel,sn1);
  170. dp += sn1;
  171. assert(dp == buf + byteCnt );
  172. // send the msg
  173. if( cmUdpNetSendById(p->netH, rp->dstNetNodeId, buf, byteCnt ) != kOkUnRC )
  174. {
  175. rc = cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while registering remote nodes.");
  176. goto errLabel;
  177. }
  178. if( p->netVerbosity > 1 )
  179. cmRptPrintf(p->err.rpt,"Sync: send req to %i\n",rp->dstNetNodeId);
  180. //cmSleepUs(p->sendWaitMs*1000); // wait between transmissions
  181. }
  182. }
  183. errLabel:
  184. if( rc != kOkDspRC )
  185. _cmDspSysNetSendSyncError(p,rc);
  186. return rc;
  187. }
  188. // Return true when the 'doneFl' on all nodes has been set.
  189. // The doneFl is cleared on the beginning of the sync. process.
  190. // The doneFl is set as each remote node signals that it has
  191. // all of the dstId's that it needs by sending kNetDoneSelAsId
  192. // messages.
  193. bool _cmDspSysNetCheckNetNodeStatus( cmDsp_t* p )
  194. {
  195. unsigned i;
  196. for(i=0; i<p->netNodeCnt; ++i)
  197. if( p->netNodeArray[i].doneFl == false )
  198. return false;
  199. return true;
  200. }
  201. // Send kNetDoneSelAsId msgs to all remote nodes to indicate that
  202. // this node has all of it's dstId's.
  203. cmDspRC_t _cmDspSysNetSendSyncDone( cmDsp_t* p )
  204. {
  205. cmDspRC_t rc = kOkDspRC;
  206. unsigned i;
  207. // broadcast the sync 'done' msg to each non-local node
  208. if( p->netDoneSentFl )
  209. return rc;
  210. for(i=0; i<p->netNodeCnt; ++i)
  211. if( p->netNodeArray[i].localFl == false )
  212. {
  213. if((rc = _cmDspSysNetSend(p, p->netNodeArray[i].id, kNetDoneSelAsId, cmInvalidId, cmInvalidId, "A network send failed while signaling sync. completion." )) != kOkDspRC )
  214. goto errLabel;
  215. }
  216. // create the src connection map
  217. _cmDspSrcConn_t* sp = p->srcConnList;
  218. if( sp != NULL && p->srcConnMapCnt == 0 )
  219. {
  220. if( p->netVerbosity > 0 )
  221. cmRptPrintf(p->err.rpt,"Sync:Creating src map\n");
  222. // get the count of src nodes
  223. unsigned n;
  224. for(n=0; sp != NULL; sp = sp->link )
  225. ++n;
  226. // allocate the srcConnMap
  227. p->srcConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspSrcConn_t*, p->srcConnMap, n );
  228. p->srcConnMapCnt = n;
  229. // load the srcConnMap
  230. for(sp = p->srcConnList; sp != NULL; sp = sp->link )
  231. {
  232. assert( sp->srcId < n );
  233. p->srcConnMap[ sp->srcId ] = sp;
  234. }
  235. }
  236. // create the dst connection map
  237. _cmDspDstConn_t* dp = p->dstConnList;
  238. if( dp != NULL && p->dstConnMapCnt == 0 )
  239. {
  240. if( p->netVerbosity > 0 )
  241. cmRptPrintf(p->err.rpt,"Sync:Creating dst map\n");
  242. unsigned n;
  243. // get the count of dst nodes
  244. for(n=0; dp != NULL; dp = dp->link )
  245. ++n;
  246. // allocate the dstConnMap
  247. p->dstConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspDstConn_t*, p->dstConnMap, n );
  248. p->dstConnMapCnt = n;
  249. // load the dstConnMap
  250. for(dp = p->dstConnList; dp != NULL; dp = dp->link )
  251. {
  252. assert( dp->dstId < n );
  253. p->dstConnMap[ dp->dstId ] = dp;
  254. }
  255. }
  256. p->netDoneSentFl = true;
  257. if( p->netVerbosity > 0 )
  258. cmRptPrintf(p->err.rpt,"Sync: Done Sent\n",i);
  259. errLabel:
  260. if( rc != kOkDspRC )
  261. _cmDspSysNetSendSyncError(p,rc);
  262. return rc;
  263. }
  264. // Sync thread callback function
  265. bool _cmDspSysNetSyncThreadCb( void* param )
  266. {
  267. cmDsp_t* p = (cmDsp_t*)param;
  268. bool connFl = true; //
  269. bool nodeFl = true;
  270. // receive a group of waiting messages from remote nodes
  271. // WARNING: calling cmUdpNetReceive() here means that the audio engine cannot be
  272. // enabled - otherwise this thread and the audio system thread will simultaneously
  273. // attempt to read the UDP port. This will result in unsafe thread conflicts.
  274. if( cmUdpNetReceive(p->netH, NULL ) != kOkUnRC )
  275. {
  276. cmErrMsg(&p->err,kNetFailDspRC,"UDP Net receive failed during sync. mode.");
  277. _cmDspSysNetSendSyncError(p,kNetFailDspRC);
  278. }
  279. // check if all the src connections have been assigned dst id's
  280. connFl = _cmDspSysNet_AreAllDstIdsResolved(p);
  281. // if all the src connections have dst id's then send a 'done' signal
  282. // to all other nodes so that they know this node is ready to leave
  283. // sync mode
  284. if( connFl )
  285. {
  286. if( _cmDspSysNetSendSyncDone(p) != kOkDspRC )
  287. goto errLabel;
  288. }
  289. // prevent the thread from burning too much time
  290. cmSleepUs(p->sendWaitMs*1000);
  291. // check if all nodes have completed transmission to this node
  292. nodeFl = _cmDspSysNetCheckNetNodeStatus(p);
  293. // if the connections have all been setup and all the net nodes have
  294. // received a 'done' signal
  295. if( connFl && nodeFl )
  296. {
  297. // mark the sync as complete
  298. p->syncState = kSyncSuccessDspId;
  299. // the sync. is done - pause this thread
  300. if( cmThreadPause( p->thH, kPauseThFl ) != kOkThRC )
  301. cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the sync. thread upon completion failed.");
  302. if( p->netVerbosity > 0 )
  303. cmRptPrintf(p->err.rpt,"Sync Done!\n");
  304. }
  305. errLabel:
  306. return true;
  307. }
  308. // During DSP network allocation and connection this function is called
  309. // to register remote instance/var targets.
  310. _cmDspSrcConn_t* _cmDspSysNetCreateSrcConn( cmDsp_t* p, unsigned dstNetNodeId, const cmChar_t* dstInstLabel, const cmChar_t* dstVarLabel )
  311. {
  312. if( dstNetNodeId == cmUdpNetLocalNodeId(p->netH) )
  313. {
  314. cmErrMsg(&p->err,kNetFailDspRC,"Cannot connect a network node (node:%s inst:%s label:%s)to itself.",cmStringNullGuard(cmUdpNetNodeIdToLabel(p->netH,dstNetNodeId)),cmStringNullGuard(dstInstLabel),cmStringNullGuard(dstVarLabel));
  315. return NULL;
  316. }
  317. // register the remote node
  318. _cmDspSrcConn_t* rp = cmLhAllocZ( p->ctx.lhH, _cmDspSrcConn_t, 1 );
  319. rp->dstNetNodeId = dstNetNodeId;
  320. rp->dstInstLabel = cmLhAllocStr( p->ctx.lhH, dstInstLabel );
  321. rp->dstVarLabel = cmLhAllocStr( p->ctx.lhH, dstVarLabel );
  322. rp->link = p->srcConnList;
  323. p->srcConnList = rp;
  324. if( p->netVerbosity > 1 )
  325. cmRptPrintf(p->err.rpt,"Sync: create src for %i\n", dstNetNodeId );
  326. return rp;
  327. }
  328. // This function is called in response to receiving a connection request on the
  329. // dst machine. It sends a dstId to match the srcId provided by the src machine.
  330. cmDspRC_t _cmDspSysNetSendDstConnId( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId, unsigned dstId )
  331. {
  332. cmDspRC_t rc = kOkDspRC;
  333. if((rc = _cmDspSysNetSend(p, srcNetNodeId, kNetDstIdSelAsId, srcId, dstId, "A network send failed while sending a dst. id." )) == kOkDspRC )
  334. {
  335. if( p->netVerbosity > 1 )
  336. cmRptPrintf(p->err.rpt,"Sync:Sent dst id to %i\n",srcNetNodeId);
  337. }
  338. return rc;
  339. }
  340. // Handle 'hello' messages
  341. cmDspRC_t _cmDspSysNetReceiveHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl )
  342. {
  343. cmDspRC_t rc = kOkDspRC;
  344. unsigned i;
  345. for(i=0; i<p->netNodeCnt; ++i)
  346. if( p->netNodeArray[i].id == remoteNetNodeId )
  347. {
  348. p->netNodeArray[i].helloFl = true;
  349. // if echo was requested then respond ...
  350. if( echoFl )
  351. {
  352. // ... but the remote node should not respond - because we already know about it
  353. _cmDspSysNetSendHello(p,remoteNetNodeId,false);
  354. }
  355. if( p->netVerbosity > 0 )
  356. cmRptPrintf(p->err.rpt,"Sync:hello from %i\n",remoteNetNodeId);
  357. // send connection requests to the remote net node
  358. if((rc = _cmDspSysNetSendConnRequests(p, remoteNetNodeId )) == kOkDspRC )
  359. {
  360. // send 'req done' msg
  361. rc = _cmDspSysNetSend(p, remoteNetNodeId, kNetDstIdReqDoneAsId, 0, cmInvalidId, "A network send failed while sending 'dst req done'." );
  362. }
  363. return rc;
  364. }
  365. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'hello' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  366. }
  367. cmDspRC_t _cmDspSysNetReceiveReqDone( cmDsp_t* p, unsigned remoteNetNodeId )
  368. {
  369. unsigned i;
  370. for(i=0; i<p->netNodeCnt; ++i)
  371. if( p->netNodeArray[i].id == remoteNetNodeId )
  372. {
  373. p->netNodeArray[i].reqDoneFl = true;
  374. if( p->netVerbosity > 0 )
  375. cmRptPrintf(p->err.rpt,"Sync: Req Done from %i\n",remoteNetNodeId);
  376. return kOkDspRC;
  377. }
  378. return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'req done' message from an unknown remote network node (id=%i).",remoteNetNodeId);
  379. }
  380. // given a srcNetNodeId/srcId return the associated dst connection
  381. _cmDspDstConn_t* _cmDspSysNetSrcIdToDstConn( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId )
  382. {
  383. _cmDspDstConn_t* rp = p->dstConnList;
  384. for(; rp != NULL; rp = rp->link )
  385. if( rp->srcNetNodeId == srcNetNodeId && rp->srcId == srcId )
  386. return rp;
  387. return NULL;
  388. }
  389. // Handle kNetSyncSelAsId messages.
  390. // Called on the dst machine to receive a src connection request
  391. cmDspRC_t _cmDspSysNetReceiveSrcConnRequest( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned srcNetNodeId )
  392. {
  393. cmDspRC_t rc = kOkDspRC;
  394. _cmDspDstConn_t* rp;
  395. // if a dstId has not already been assigned to this src
  396. if((rp = _cmDspSysNetSrcIdToDstConn(p, srcNetNodeId, msg->srcId )) == NULL )
  397. {
  398. if( p->netVerbosity > 1 )
  399. cmRptPrintf(p->err.rpt,"Sync:Rcvd src conn request from %i\n",srcNetNodeId);
  400. cmDspInst_t* instPtr = NULL;
  401. const cmDspVar_t* varPtr;
  402. const cmChar_t* instLabel = (const cmChar_t*)(msg+1);
  403. const cmChar_t* varLabel = instLabel + strlen(instLabel) + 1;
  404. // get the symbols assoc'd with the dst inst/var
  405. unsigned instSymId = cmSymTblId(p->ctx.stH,instLabel);
  406. unsigned varSymId = cmSymTblId(p->ctx.stH,varLabel);
  407. assert( instSymId != cmInvalidId );
  408. // find the dst inst
  409. if((instPtr = _cmDspSysInstSymIdToPtr(p,instSymId)) == NULL )
  410. {
  411. rc = cmErrMsg(&p->err,kInstNotFoundDspRC,"The instance associated with '%s' was not found.",cmStringNullGuard(instLabel));
  412. goto errLabel;
  413. }
  414. // find dst var
  415. if((varPtr = cmDspVarSymbolToPtr(&p->ctx, instPtr, varSymId, kInDsvFl )) == NULL )
  416. {
  417. rc = cmErrMsg(&p->err,kVarNotFoundDspRC,"The variable '%s' on the instance '%s' was not found.",cmStringNullGuard(varLabel),cmStringNullGuard(instLabel));
  418. goto errLabel;
  419. }
  420. // register the new dst connection
  421. rp = cmLhAllocZ( p->ctx.lhH, _cmDspDstConn_t, 1 );
  422. rp->srcNetNodeId = srcNetNodeId;
  423. rp->srcId = msg->srcId;
  424. rp->dstId = p->nextDstId++;
  425. rp->dstInst = instPtr;
  426. rp->dstVarId = varPtr->constId;
  427. rp->link = p->dstConnList;
  428. p->dstConnList = rp;
  429. }
  430. assert( rp != NULL );
  431. // send the dstId associated with this connection back to the source
  432. if((rc = _cmDspSysNetSendDstConnId(p,srcNetNodeId,msg->srcId,rp->dstId)) != kOkDspRC )
  433. goto errLabel;
  434. errLabel:
  435. if( rc != kOkDspRC )
  436. _cmDspSysNetSendSyncError(p,rc);
  437. return rc;
  438. }
  439. cmDspRC_t _cmDspSysNetReceiveDstId(cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId)
  440. {
  441. cmDspRC_t rc;
  442. _cmDspSrcConn_t* sp = p->srcConnList;
  443. for(; sp != NULL; sp = sp->link )
  444. if( msg->srcId == sp->srcId )
  445. {
  446. sp->dstId = msg->dstId;
  447. return kOkDspRC;
  448. }
  449. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"The src id %i associated with the dst id %i could not be found.",msg->srcId,msg->dstId);
  450. _cmDspSysNetSendSyncError(p,rc);
  451. return rc;
  452. }
  453. // Handle kNetDoneSelAsId messages.
  454. cmDspRC_t _cmDspSysNetReceiveRemoteSyncDone( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  455. {
  456. cmDspRC_t rc = kOkDspRC;
  457. unsigned i;
  458. for(i=0; i<p->netNodeCnt; ++i)
  459. if( p->netNodeArray[i].id == remoteNetNodeId )
  460. {
  461. p->netNodeArray[i].doneFl = true;
  462. if( p->netVerbosity > 0 )
  463. cmRptPrintf(p->err.rpt,"Sync: Rcvd done from %i\n",remoteNetNodeId);
  464. break;
  465. }
  466. return rc;
  467. }
  468. // Handle kNetErrSelAsId messages
  469. cmDspRC_t _cmDspSysNetReceiveRemoteSyncError( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId )
  470. {
  471. cmDspRC_t rc = kOkDspRC;
  472. if( p->netVerbosity > 0 )
  473. cmRptPrintf(p->err.rpt,"Sync: Rcvd error from %i\n",remoteNetNodeId);
  474. if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC )
  475. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the thread due to a remote error failed.");
  476. return rc;
  477. }
  478. // Verifify that a net node id is valid.
  479. cmDspRC_t _cmDspSysNetValidateNetNodeId( cmDsp_t* p, unsigned netNodeId )
  480. {
  481. cmDspRC_t rc = kOkDspRC;
  482. unsigned i;
  483. for(i=0; i<p->netNodeCnt; ++i)
  484. if( p->netNodeArray[i].id == netNodeId )
  485. break;
  486. if( i == p->netNodeCnt )
  487. rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"A message arrived from a unknown net node (id=%i).",netNodeId);
  488. return rc;
  489. }
  490. ///============================================================================================
  491. // main hooks to the DSP system
  492. // called by cmDspSysInitialize()
  493. cmDspRC_t _cmDspSysNetAlloc( cmDsp_t* p )
  494. {
  495. if((p->netNodeCnt = cmUdpNetNodeCount(p->netH)) != 0)
  496. p->netNodeArray = cmMemAllocZ(_cmDspNetNode_t, p->netNodeCnt );
  497. if( cmThreadCreate(&p->thH,_cmDspSysNetSyncThreadCb,p, p->err.rpt) != kOkThRC )
  498. return cmErrMsg(&p->err,kThreadFailDspRC,"The network syncronization thread create failed.");
  499. p->sendWaitMs = 10;
  500. p->syncState = kSyncPreDspId;
  501. p->netVerbosity = 1;
  502. return kOkDspRC;
  503. }
  504. // called by cmDspSysFinalize()
  505. cmDspRC_t _cmDspSysNetFree( cmDsp_t* p )
  506. {
  507. cmMemFree(p->netNodeArray);
  508. if( cmThreadDestroy(&p->thH) != kOkThRC )
  509. cmErrMsg(&p->err,kThreadFailDspRC,"The network syncrhonization thread destroy failed.");
  510. return kOkDspRC;
  511. }
  512. // called by cmDspSysLoad()
  513. cmDspRC_t _cmDspSysNetPreLoad( cmDsp_t* p )
  514. {
  515. p->srcConnList = NULL;
  516. return kOkDspRC;
  517. }
  518. // called by cmDspSysUnload()
  519. cmDspRC_t _cmDspSysNetUnload( cmDsp_t* p )
  520. {
  521. if( cmUdpNetEnableListen(p->netH, false ) != kOkUnRC )
  522. return cmErrMsg(&p->err,kNetFailDspRC,"The network failed to exit 'listening' mode.");
  523. p->syncState = kSyncPreDspId;
  524. p->nextDstId = 0;
  525. p->dstConnList = NULL;
  526. p->srcConnMap = NULL;
  527. p->srcConnMapCnt = 0;
  528. p->dstConnMap = NULL;
  529. p->dstConnMapCnt = 0;
  530. p->netDoneSentFl = false;
  531. return kOkDspRC;
  532. }
  533. // Call this function to enter 'sync' mode.
  534. cmDspRC_t _cmDspSysNetSync( cmDsp_t* p )
  535. {
  536. cmDspRC_t rc = kOkDspRC;
  537. unsigned localNodeId = cmUdpNetLocalNodeId(p->netH);
  538. unsigned i;
  539. // if there is no network then there is nothing to do
  540. if( p->netNodeCnt == 0 )
  541. {
  542. p->syncState = kSyncSuccessDspId;
  543. return kOkDspRC;
  544. }
  545. p->syncState = kSyncPendingDspId;
  546. // be sure the sync thread is paused before continuing
  547. if( cmThreadPause(p->thH, kPauseThFl | kWaitThFl ) != kOkThRC )
  548. {
  549. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be paused prior to initialization.");
  550. goto errLabel;
  551. }
  552. // initialize the netNodeArry[]
  553. for(i=0; i<p->netNodeCnt; ++i)
  554. {
  555. p->netNodeArray[i].id = cmUdpNetNodeId(p->netH,i);
  556. p->netNodeArray[i].localFl = p->netNodeArray[i].id == localNodeId;
  557. p->netNodeArray[i].doneFl = p->netNodeArray[i].localFl;
  558. p->netNodeArray[i].helloFl = false;
  559. p->netNodeArray[i].reqDoneFl = false;
  560. p->netNodeArray[i].doneFl = false;
  561. assert( p->netNodeArray[i].id != cmInvalidId );
  562. }
  563. // initialize the src connection array - this array was created when the
  564. // DSP instance network was formed
  565. _cmDspSrcConn_t* rp = p->srcConnList;
  566. for(i=0; rp != NULL; rp = rp->link,++i)
  567. {
  568. rp->srcId = i;
  569. rp->dstId = cmInvalidId;
  570. }
  571. // clear the dst conn records
  572. _cmDspDstConn_t* dp = p->dstConnList;
  573. while( dp!=NULL)
  574. {
  575. _cmDspDstConn_t* np = dp->link;
  576. cmLhFree(p->ctx.lhH,dp);
  577. dp = np;
  578. }
  579. // clear the connection maps
  580. p->srcConnMapCnt = 0;
  581. p->dstConnMapCnt = 0;
  582. p->netDoneSentFl = false;
  583. // enter listening mode
  584. if( cmUdpNetEnableListen(p->netH, true ) != kOkUnRC )
  585. {
  586. rc = cmErrMsg(&p->err,kNetFailDspRC,"The network failed to go into 'listening' mode.");
  587. goto errLabel;
  588. }
  589. // start the sync process
  590. if( cmThreadPause(p->thH,0) != kOkThRC )
  591. {
  592. rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be un-paused prior to begin synchronization.");
  593. goto errLabel;
  594. }
  595. // broadcast 'hello' to all remote listeners and request that they respond with their own 'hello'.
  596. for(i=0; i<p->netNodeCnt; ++i)
  597. if( p->netNodeArray[i].localFl == false )
  598. if((rc = _cmDspSysNetSendHello(p,p->netNodeArray[i].id,true)) != kOkDspRC )
  599. goto errLabel;
  600. if( p->netVerbosity > 0 )
  601. cmRptPrintf(p->err.rpt,"Sync:Entering sync loop\n");
  602. errLabel:
  603. if( rc != kOkDspRC )
  604. p->syncState = kSyncFailDspId;
  605. return rc;
  606. }
  607. cmDspRC_t _cmDspSysNetRecvEvent( cmDsp_t* p, const void* msgPtr, unsigned msgByteCnt )
  608. {
  609. cmDspNetMsg_t* m = (cmDspNetMsg_t*)msgPtr;
  610. cmRC_t rc = cmOkRC;
  611. //bool jsFl = false;
  612. // if the value associated with this msg is a mtx then set
  613. // its mtx data area pointer to just after the msg header.
  614. if( cmDsvIsJson(&m->value) )
  615. {
  616. //rc = cmDsvDeserializeJson(&m->value,p->jsH);
  617. assert(0);
  618. //jsFl = true;
  619. }
  620. else
  621. rc = cmDsvDeserializeInPlace(&m->value,msgByteCnt-sizeof(cmDspNetMsg_t));
  622. if( rc != kOkDsvRC )
  623. rc = cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");
  624. else
  625. {
  626. assert( m->dstId < p->dstConnMapCnt );
  627. // form the event
  628. cmDspEvt_t e;
  629. e.flags = 0;
  630. e.srcInstPtr = NULL;
  631. e.srcVarId = cmInvalidId;
  632. e.valuePtr = &m->value;
  633. e.dstVarId = p->dstConnMap[ m->dstId ]->dstVarId;
  634. e.dstDataPtr = NULL;
  635. cmDspInst_t* instPtr = p->dstConnMap[ m->dstId ]->dstInst;
  636. assert(instPtr != NULL );
  637. // send the event
  638. if( instPtr->recvFunc != NULL )
  639. rc = instPtr->recvFunc(&p->ctx,instPtr,&e);
  640. }
  641. //if( jsFl )
  642. // cmJsonClearTree(p->jsH);
  643. return rc;
  644. }
  645. // Called from cmAudDsp.c:_cmAdUdpNetCallback() to to send an incoming msg to the DSP system.
  646. cmDspRC_t _cmDspSysNetRecv( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned msgByteCnt, unsigned remoteNetNodeId )
  647. {
  648. assert( msg->selId == kNetSyncSelAsId );
  649. cmDspRC_t rc = kOkDspRC;
  650. switch( msg->subSelId )
  651. {
  652. case kNetHelloSelAsId:
  653. rc = _cmDspSysNetReceiveHello(p,remoteNetNodeId,msg->srcId!=0);
  654. break;
  655. case kNetDstIdReqSelAsId:
  656. rc = _cmDspSysNetReceiveSrcConnRequest(p,msg,remoteNetNodeId);
  657. break;
  658. case kNetDstIdReqDoneAsId:
  659. rc = _cmDspSysNetReceiveReqDone(p,remoteNetNodeId);
  660. break;
  661. case kNetDstIdSelAsId:
  662. rc = _cmDspSysNetReceiveDstId(p,msg,remoteNetNodeId);
  663. break;
  664. case kNetDoneSelAsId:
  665. rc = _cmDspSysNetReceiveRemoteSyncDone(p,msg,remoteNetNodeId);
  666. break;
  667. case kNetErrSelAsId:
  668. rc = _cmDspSysNetReceiveRemoteSyncError(p,msg,remoteNetNodeId);
  669. break;
  670. case kNetEvtSelAsId:
  671. rc = _cmDspSysNetRecvEvent(p, msg, msgByteCnt );
  672. break;
  673. default:
  674. cmErrMsg(&p->err,kNetFailDspRC,"Unexpected message type 'selId=%i' received by the network sync. message dispatcher.",msg->selId);
  675. break;
  676. }
  677. return rc;
  678. }
  679. cmDspRC_t _cmDspSysNetSendEvent( cmDspSysH_t h, unsigned dstNetNodeId, unsigned dstId, const cmDspEvt_t* evt )
  680. {
  681. cmDsp_t* p = _cmDspHandleToPtr(h);
  682. unsigned bufByteCnt = sizeof(cmDspNetMsg_t);
  683. unsigned dataByteCnt = 0;
  684. if( evt->valuePtr != NULL )
  685. dataByteCnt = cmDsvSerialDataByteCount(evt->valuePtr);
  686. bufByteCnt += dataByteCnt;
  687. char buf[ bufByteCnt ];
  688. cmDspNetMsg_t* hdr = (cmDspNetMsg_t*)buf;
  689. hdr->asSubIdx = cmDspSys_AsSubIdx_Zero;
  690. hdr->selId = kNetSyncSelAsId;
  691. hdr->subSelId = kNetEvtSelAsId;
  692. hdr->srcId = cmInvalidId;
  693. hdr->dstId = dstId;
  694. if( evt->valuePtr == NULL )
  695. cmDsvSetNull(&hdr->value);
  696. else
  697. {
  698. // this function relies on the 'ne->value' field being the last field in the 'hdr'.
  699. if( cmDsvSerialize( evt->valuePtr, &hdr->value, sizeof(cmDspValue_t) + dataByteCnt) != kOkDsvRC )
  700. return cmErrMsg(&p->err,kSerializeFailMsgRC,"An attempt to serialize a network event msg failed.");
  701. }
  702. if( cmUdpNetSendById(p->netH, dstNetNodeId, &buf, bufByteCnt ) != kOkUnRC )
  703. return cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while sending a DSP event message.");
  704. return kOkDspRC;
  705. }