//| Copyright: (C) 2009-2020 Kevin Larke //| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file. #include "cmPrefix.h" #include "cmGlobal.h" #include "cmFloatTypes.h" #include "cmRpt.h" #include "cmErr.h" #include "cmCtx.h" #include "cmMem.h" #include "cmMallocDebug.h" #include "cmLinkedHeap.h" #include "cmFileSys.h" #include "cmSymTbl.h" #include "cmJson.h" #include "cmPrefs.h" #include "cmDspValue.h" #include "cmMsgProtocol.h" #include "cmThread.h" #include "cmUdpPort.h" #include "cmUdpNet.h" #include "cmSerialPort.h" #include "cmTime.h" #include "cmAudioSys.h" #include "cmProcObj.h" #include "cmDspCtx.h" #include "cmDspClass.h" #include "cmDspStore.h" #include "cmDspSys.h" #include "cmDspPreset.h" #include "cmDspNet.h" ///============================================================================================ ///============================================================================================ /* cmDspSysAlloc() - create the p->netNodeArray[] - create the p->thH network thread but leave it paused cmDspSysLoad() - _cmDspSysNetPreload() - set srcConnList to NULL - load the progam - this result in calls to _cmDspSysNetCreateSrcConn() which creates the p->srcConnList. - _cmDspSysNetSync() - just before exiting the load process go into sync mode: - initialize the p->netNodeArray[]. - set the srcId of each record in the p->srcConnList. 
- start the sync thread while(1) { 1) send connection requests to dst machines 2) if( connFl = all src conn's have dst id's ) send 'done' to all remote nodes 3) nodeFl = if have recv'd 'done' from all remote nodes 4) if connFl && nodeFl p->syncState = kSyncSuccessDspId; break; } Enable Audio: - Call reset on all instances cmDspSysUnload() - p->syncState = kSyncPreDspId cmDspSysFree() - delete p->netNodeArray[] - delete p->thH network thread //-------------------------------------------------------- */ ///============================================================================================ // implemented in cmDspSys.c cmDspInst_t* _cmDspSysInstSymIdToPtr( cmDsp_t* p, unsigned instSymId ); cmDsp_t* _cmDspHandleToPtr( cmDspSysH_t h ); cmDspRC_t _cmDspSysNetSend( cmDsp_t* p, unsigned remoteNetNodeId, unsigned subSelId, unsigned srcId, unsigned dstId, const cmChar_t* errMsg ) { cmDspRC_t rc = kOkDspRC; cmDspNetMsg_t m; memset(&m,0,sizeof(m)); // we should never be sending to ourselves assert( remoteNetNodeId != cmUdpNetLocalNodeId(p->netH)); // form the error msg m.asSubIdx = cmDspSys_AsSubIdx_Zero; m.selId = kNetSyncSelAsId; m.subSelId = subSelId; m.srcId = srcId; m.dstId = dstId; if( cmUdpNetSendById(p->netH, remoteNetNodeId, &m, sizeof(m) ) == kOkUnRC ) { //cmSleepUs(p->sendWaitMs*1000); } else { rc = kNetFailDspRC; if( errMsg != NULL ) cmErrMsg(&p->err,rc,errMsg); } return rc; } // set echoFl if the remote node should respond with it's own 'hello' msg cmDspRC_t _cmDspSysNetSendHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl ) { return _cmDspSysNetSend(p, remoteNetNodeId, kNetHelloSelAsId, echoFl ? 1 : 0, cmInvalidId, "A network send failed while sending 'hello'." 
); } cmDspRC_t _cmDspSysNetSendSyncError( cmDsp_t* p, cmDspRC_t errRc ) { cmDspRC_t rc = kOkDspRC; unsigned i; // for each non-local node for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].localFl == false) { unsigned remoteNetNodeId = p->netNodeArray[i].id; cmDspRC_t rc0; // send the error code in the dstId if((rc0 = _cmDspSysNetSend(p, remoteNetNodeId, kNetErrSelAsId, cmUdpNetLocalNodeId(p->netH), errRc, "A network send failed while signaling an error." )) != kOkDspRC ) rc = rc0; } if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC ) rc = cmErrMsg(&p->err,kThreadFailDspRC,"An attempt to pause the sync. thread failed after signaling an error."); if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync:Done - Fail\n"); return rc; } bool _cmDspSysNetIsNodeAwake( cmDsp_t* p, unsigned netNodeId ) { unsigned i; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].id == netNodeId ) return p->netNodeArray[i].helloFl; assert(0); // unknown net node id return false; } bool _cmDspSysNet_AreAllDstIdsResolved( cmDsp_t* p ) { // check that we have received all dst id req's from each remote node unsigned i; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].localFl == false && p->netNodeArray[i].reqDoneFl==false) return false; _cmDspSrcConn_t* rp = p->srcConnList; // for each src connection which does not yet have a destination id. for(; rp != NULL; rp = rp->link) if( rp->dstId == cmInvalidId ) return false; return true; } // send connection requests to the specified remote node cmDspRC_t _cmDspSysNetSendConnRequests( cmDsp_t* p, unsigned dstNetNodeId ) { cmDspRC_t rc = kOkDspRC; _cmDspSrcConn_t* rp = p->srcConnList; // for each src connection which does not yet have a destination id. 
for(; rp != NULL; rp = rp->link) { // if this src conn has not been assigned a dstId and it's source node is awake if( rp->dstId == cmInvalidId && rp->dstNetNodeId==dstNetNodeId && _cmDspSysNetIsNodeAwake(p,rp->dstNetNodeId) ) { // calc the msg size unsigned sn0 = strlen(rp->dstInstLabel) + 1; unsigned sn1 = strlen(rp->dstVarLabel) + 1; unsigned byteCnt = sizeof(cmDspNetMsg_t) + sn0 + sn1; // create msg buffer char buf[ byteCnt ]; memset(buf,0,byteCnt); // fill in the msg cmDspNetMsg_t* cp = (cmDspNetMsg_t*)buf; cp->asSubIdx = cmDspSys_AsSubIdx_Zero; cp->selId = kNetSyncSelAsId; cp->subSelId = kNetDstIdReqSelAsId; cp->srcId = rp->srcId; cp->dstId = cmInvalidId; char* dp = buf + sizeof(*cp); memcpy(dp,rp->dstInstLabel,sn0); dp += sn0; memcpy(dp,rp->dstVarLabel,sn1); dp += sn1; assert(dp == buf + byteCnt ); // send the msg if( cmUdpNetSendById(p->netH, rp->dstNetNodeId, buf, byteCnt ) != kOkUnRC ) { rc = cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while registering remote nodes."); goto errLabel; } if( p->netVerbosity > 1 ) cmRptPrintf(p->err.rpt,"Sync: send req to %i\n",rp->dstNetNodeId); //cmSleepUs(p->sendWaitMs*1000); // wait between transmissions } } errLabel: if( rc != kOkDspRC ) _cmDspSysNetSendSyncError(p,rc); return rc; } // Return true when the 'doneFl' on all nodes has been set. // The doneFl is cleared on the beginning of the sync. process. // The doneFl is set as each remote node signals that it has // all of the dstId's that it needs by sending kNetDoneSelAsId // messages. bool _cmDspSysNetCheckNetNodeStatus( cmDsp_t* p ) { unsigned i; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].doneFl == false ) return false; return true; } // Send kNetDoneSelAsId msgs to all remote nodes to indicate that // this node has all of it's dstId's. 
cmDspRC_t _cmDspSysNetSendSyncDone( cmDsp_t* p ) { cmDspRC_t rc = kOkDspRC; unsigned i; // broadcast the sync 'done' msg to each non-local node if( p->netDoneSentFl ) return rc; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].localFl == false ) { if((rc = _cmDspSysNetSend(p, p->netNodeArray[i].id, kNetDoneSelAsId, cmInvalidId, cmInvalidId, "A network send failed while signaling sync. completion." )) != kOkDspRC ) goto errLabel; } // create the src connection map _cmDspSrcConn_t* sp = p->srcConnList; if( sp != NULL && p->srcConnMapCnt == 0 ) { if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync:Creating src map\n"); // get the count of src nodes unsigned n; for(n=0; sp != NULL; sp = sp->link ) ++n; // allocate the srcConnMap p->srcConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspSrcConn_t*, p->srcConnMap, n ); p->srcConnMapCnt = n; // load the srcConnMap for(sp = p->srcConnList; sp != NULL; sp = sp->link ) { assert( sp->srcId < n ); p->srcConnMap[ sp->srcId ] = sp; } } // create the dst connection map _cmDspDstConn_t* dp = p->dstConnList; if( dp != NULL && p->dstConnMapCnt == 0 ) { if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync:Creating dst map\n"); unsigned n; // get the count of dst nodes for(n=0; dp != NULL; dp = dp->link ) ++n; // allocate the dstConnMap p->dstConnMap = cmLhResizeNZ( p->ctx.lhH, _cmDspDstConn_t*, p->dstConnMap, n ); p->dstConnMapCnt = n; // load the dstConnMap for(dp = p->dstConnList; dp != NULL; dp = dp->link ) { assert( dp->dstId < n ); p->dstConnMap[ dp->dstId ] = dp; } } p->netDoneSentFl = true; if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync: Done Sent\n",i); errLabel: if( rc != kOkDspRC ) _cmDspSysNetSendSyncError(p,rc); return rc; } // Sync thread callback function bool _cmDspSysNetSyncThreadCb( void* param ) { cmDsp_t* p = (cmDsp_t*)param; bool connFl = true; // bool nodeFl = true; // receive a group of waiting messages from remote nodes // WARNING: calling cmUdpNetReceive() here means that the audio engine cannot be // 
enabled - otherwise this thread and the audio system thread will simultaneously // attempt to read the UDP port. This will result in unsafe thread conflicts. if( cmUdpNetReceive(p->netH, NULL ) != kOkUnRC ) { cmErrMsg(&p->err,kNetFailDspRC,"UDP Net receive failed during sync. mode."); _cmDspSysNetSendSyncError(p,kNetFailDspRC); } // check if all the src connections have been assigned dst id's connFl = _cmDspSysNet_AreAllDstIdsResolved(p); // if all the src connections have dst id's then send a 'done' signal // to all other nodes so that they know this node is ready to leave // sync mode if( connFl ) { if( _cmDspSysNetSendSyncDone(p) != kOkDspRC ) goto errLabel; } // prevent the thread from burning too much time cmSleepUs(p->sendWaitMs*1000); // check if all nodes have completed transmission to this node nodeFl = _cmDspSysNetCheckNetNodeStatus(p); // if the connections have all been setup and all the net nodes have // received a 'done' signal if( connFl && nodeFl ) { // mark the sync as complete p->syncState = kSyncSuccessDspId; // the sync. is done - pause this thread if( cmThreadPause( p->thH, kPauseThFl ) != kOkThRC ) cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the sync. thread upon completion failed."); if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync Done!\n"); } errLabel: return true; } // During DSP network allocation and connection this function is called // to register remote instance/var targets. 
_cmDspSrcConn_t* _cmDspSysNetCreateSrcConn( cmDsp_t* p, unsigned dstNetNodeId, const cmChar_t* dstInstLabel, const cmChar_t* dstVarLabel ) { if( dstNetNodeId == cmUdpNetLocalNodeId(p->netH) ) { cmErrMsg(&p->err,kNetFailDspRC,"Cannot connect a network node (node:%s inst:%s label:%s)to itself.",cmStringNullGuard(cmUdpNetNodeIdToLabel(p->netH,dstNetNodeId)),cmStringNullGuard(dstInstLabel),cmStringNullGuard(dstVarLabel)); return NULL; } // register the remote node _cmDspSrcConn_t* rp = cmLhAllocZ( p->ctx.lhH, _cmDspSrcConn_t, 1 ); rp->dstNetNodeId = dstNetNodeId; rp->dstInstLabel = cmLhAllocStr( p->ctx.lhH, dstInstLabel ); rp->dstVarLabel = cmLhAllocStr( p->ctx.lhH, dstVarLabel ); rp->link = p->srcConnList; p->srcConnList = rp; if( p->netVerbosity > 1 ) cmRptPrintf(p->err.rpt,"Sync: create src for %i\n", dstNetNodeId ); return rp; } // This function is called in response to receiving a connection request on the // dst machine. It sends a dstId to match the srcId provided by the src machine. cmDspRC_t _cmDspSysNetSendDstConnId( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId, unsigned dstId ) { cmDspRC_t rc = kOkDspRC; if((rc = _cmDspSysNetSend(p, srcNetNodeId, kNetDstIdSelAsId, srcId, dstId, "A network send failed while sending a dst. id." )) == kOkDspRC ) { if( p->netVerbosity > 1 ) cmRptPrintf(p->err.rpt,"Sync:Sent dst id to %i\n",srcNetNodeId); } return rc; } // Handle 'hello' messages cmDspRC_t _cmDspSysNetReceiveHello( cmDsp_t* p, unsigned remoteNetNodeId, bool echoFl ) { cmDspRC_t rc = kOkDspRC; unsigned i; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].id == remoteNetNodeId ) { p->netNodeArray[i].helloFl = true; // if echo was requested then respond ... if( echoFl ) { // ... 
but the remote node should not respond - because we already know about it _cmDspSysNetSendHello(p,remoteNetNodeId,false); } if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync:hello from %i\n",remoteNetNodeId); // send connection requests to the remote net node if((rc = _cmDspSysNetSendConnRequests(p, remoteNetNodeId )) == kOkDspRC ) { // send 'req done' msg rc = _cmDspSysNetSend(p, remoteNetNodeId, kNetDstIdReqDoneAsId, 0, cmInvalidId, "A network send failed while sending 'dst req done'." ); } return rc; } return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'hello' message from an unknown remote network node (id=%i).",remoteNetNodeId); } cmDspRC_t _cmDspSysNetReceiveReqDone( cmDsp_t* p, unsigned remoteNetNodeId ) { unsigned i; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].id == remoteNetNodeId ) { p->netNodeArray[i].reqDoneFl = true; if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync: Req Done from %i\n",remoteNetNodeId); return kOkDspRC; } return cmErrMsg(&p->err,kNetFailDspRC,"Received a 'req done' message from an unknown remote network node (id=%i).",remoteNetNodeId); } // given a srcNetNodeId/srcId return the associated dst connection _cmDspDstConn_t* _cmDspSysNetSrcIdToDstConn( cmDsp_t* p, unsigned srcNetNodeId, unsigned srcId ) { _cmDspDstConn_t* rp = p->dstConnList; for(; rp != NULL; rp = rp->link ) if( rp->srcNetNodeId == srcNetNodeId && rp->srcId == srcId ) return rp; return NULL; } // Handle kNetSyncSelAsId messages. 
// Called on the dst machine to receive a src connection request cmDspRC_t _cmDspSysNetReceiveSrcConnRequest( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned srcNetNodeId ) { cmDspRC_t rc = kOkDspRC; _cmDspDstConn_t* rp; // if a dstId has not already been assigned to this src if((rp = _cmDspSysNetSrcIdToDstConn(p, srcNetNodeId, msg->srcId )) == NULL ) { if( p->netVerbosity > 1 ) cmRptPrintf(p->err.rpt,"Sync:Rcvd src conn request from %i\n",srcNetNodeId); cmDspInst_t* instPtr = NULL; const cmDspVar_t* varPtr; const cmChar_t* instLabel = (const cmChar_t*)(msg+1); const cmChar_t* varLabel = instLabel + strlen(instLabel) + 1; // get the symbols assoc'd with the dst inst/var unsigned instSymId = cmSymTblId(p->ctx.stH,instLabel); unsigned varSymId = cmSymTblId(p->ctx.stH,varLabel); assert( instSymId != cmInvalidId ); // find the dst inst if((instPtr = _cmDspSysInstSymIdToPtr(p,instSymId)) == NULL ) { rc = cmErrMsg(&p->err,kInstNotFoundDspRC,"The instance associated with '%s' was not found.",cmStringNullGuard(instLabel)); goto errLabel; } // find dst var if((varPtr = cmDspVarSymbolToPtr(&p->ctx, instPtr, varSymId, kInDsvFl )) == NULL ) { rc = cmErrMsg(&p->err,kVarNotFoundDspRC,"The variable '%s' on the instance '%s' was not found.",cmStringNullGuard(varLabel),cmStringNullGuard(instLabel)); goto errLabel; } // register the new dst connection rp = cmLhAllocZ( p->ctx.lhH, _cmDspDstConn_t, 1 ); rp->srcNetNodeId = srcNetNodeId; rp->srcId = msg->srcId; rp->dstId = p->nextDstId++; rp->dstInst = instPtr; rp->dstVarId = varPtr->constId; rp->link = p->dstConnList; p->dstConnList = rp; } assert( rp != NULL ); // send the dstId associated with this connection back to the source if((rc = _cmDspSysNetSendDstConnId(p,srcNetNodeId,msg->srcId,rp->dstId)) != kOkDspRC ) goto errLabel; errLabel: if( rc != kOkDspRC ) _cmDspSysNetSendSyncError(p,rc); return rc; } cmDspRC_t _cmDspSysNetReceiveDstId(cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId) { cmDspRC_t rc; _cmDspSrcConn_t* 
sp = p->srcConnList; for(; sp != NULL; sp = sp->link ) if( msg->srcId == sp->srcId ) { sp->dstId = msg->dstId; return kOkDspRC; } rc = cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"The src id %i associated with the dst id %i could not be found.",msg->srcId,msg->dstId); _cmDspSysNetSendSyncError(p,rc); return rc; } // Handle kNetDoneSelAsId messages. cmDspRC_t _cmDspSysNetReceiveRemoteSyncDone( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId ) { cmDspRC_t rc = kOkDspRC; unsigned i; for(i=0; inetNodeCnt; ++i) if( p->netNodeArray[i].id == remoteNetNodeId ) { p->netNodeArray[i].doneFl = true; if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync: Rcvd done from %i\n",remoteNetNodeId); break; } return rc; } // Handle kNetErrSelAsId messages cmDspRC_t _cmDspSysNetReceiveRemoteSyncError( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned remoteNetNodeId ) { cmDspRC_t rc = kOkDspRC; if( p->netVerbosity > 0 ) cmRptPrintf(p->err.rpt,"Sync: Rcvd error from %i\n",remoteNetNodeId); if( cmThreadPause(p->thH,kPauseThFl) != kOkThRC ) rc = cmErrMsg(&p->err,kThreadFailDspRC,"The attempt to pause the thread due to a remote error failed."); return rc; } // Verifify that a net node id is valid. 
// Returns kOkDspRC if 'netNodeId' is found in p->netNodeArray[], otherwise
// reports and returns kNetNodeNotFoundDspRC.
cmDspRC_t _cmDspSysNetValidateNetNodeId( cmDsp_t* p, unsigned netNodeId )
{
  unsigned i;

  for(i=0; i<p->netNodeCnt; ++i)
    if( p->netNodeArray[i].id == netNodeId )
      return kOkDspRC;

  return cmErrMsg(&p->err,kNetNodeNotFoundDspRC,"A message arrived from a unknown net node (id=%i).",netNodeId);
}

///============================================================================================
// main hooks to the DSP system

// called by cmDspSysInitialize()
// Allocates one node record per UDP net node, creates the sync thread and
// sets the default send wait time, sync state and verbosity.
cmDspRC_t _cmDspSysNetAlloc( cmDsp_t* p )
{
  if((p->netNodeCnt = cmUdpNetNodeCount(p->netH)) != 0)
    p->netNodeArray = cmMemAllocZ(_cmDspNetNode_t, p->netNodeCnt );

  if( cmThreadCreate(&p->thH,_cmDspSysNetSyncThreadCb,p, p->err.rpt) != kOkThRC )
    return cmErrMsg(&p->err,kThreadFailDspRC,"The network syncronization thread create failed.");

  p->sendWaitMs   = 10;
  p->syncState    = kSyncPreDspId;
  p->netVerbosity = 1;

  return kOkDspRC;
}

// called by cmDspSysFinalize()
// Releases the node array and destroys the sync thread.
// A thread destroy failure is reported but kOkDspRC is still returned.
cmDspRC_t _cmDspSysNetFree( cmDsp_t* p )
{
  cmMemFree(p->netNodeArray);

  // do not leave a dangling array pointer or a stale count behind
  p->netNodeArray = NULL;
  p->netNodeCnt   = 0;

  if( cmThreadDestroy(&p->thH) != kOkThRC )
    cmErrMsg(&p->err,kThreadFailDspRC,"The network syncrhonization thread destroy failed.");

  return kOkDspRC;
}

// called by cmDspSysLoad()
// Empties the src connection list prior to loading a program.
cmDspRC_t _cmDspSysNetPreLoad( cmDsp_t* p )
{
  p->srcConnList = NULL;
  return kOkDspRC;
}

// called by cmDspSysUnload()
// Leaves network 'listening' mode and resets the sync state, the dst connection
// list, both connection maps and the 'done sent' flag.
cmDspRC_t _cmDspSysNetUnload( cmDsp_t* p )
{
  if( cmUdpNetEnableListen(p->netH, false ) != kOkUnRC )
    return cmErrMsg(&p->err,kNetFailDspRC,"The network failed to exit 'listening' mode.");

  p->syncState     = kSyncPreDspId;
  p->nextDstId     = 0;
  p->dstConnList   = NULL;
  p->srcConnMap    = NULL;
  p->srcConnMapCnt = 0;
  p->dstConnMap    = NULL;
  p->dstConnMapCnt = 0;
  p->netDoneSentFl = false;

  return kOkDspRC;
}

// Call this function to enter 'sync' mode.
// The node array, src connection ids and dst connection list are reset, the network
// is put in 'listening' mode, the sync thread is started and a 'hello' msg (with an
// echo request) is sent to every non-local node.
// On failure p->syncState is set to kSyncFailDspId.
cmDspRC_t _cmDspSysNetSync( cmDsp_t* p )
{
  cmDspRC_t rc          = kOkDspRC;
  unsigned  localNodeId = cmUdpNetLocalNodeId(p->netH);
  unsigned  i;

  // if there is no network then there is nothing to do
  if( p->netNodeCnt == 0 )
  {
    p->syncState = kSyncSuccessDspId;
    return kOkDspRC;
  }

  p->syncState = kSyncPendingDspId;

  // be sure the sync thread is paused before continuing
  if( cmThreadPause(p->thH, kPauseThFl | kWaitThFl ) != kOkThRC )
  {
    rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be paused prior to initialization.");
    goto errLabel;
  }

  // initialize the netNodeArray[]
  for(i=0; i<p->netNodeCnt; ++i)
  {
    p->netNodeArray[i].id        = cmUdpNetNodeId(p->netH,i);
    p->netNodeArray[i].localFl   = p->netNodeArray[i].id == localNodeId;

    // The local node never receives a 'done' msg from itself so it begins in the
    // 'done' state - otherwise _cmDspSysNetCheckNetNodeStatus(), which tests every
    // record, could never succeed.  Remote nodes begin in the 'not done' state.
    p->netNodeArray[i].doneFl    = p->netNodeArray[i].localFl;
    p->netNodeArray[i].helloFl   = false;
    p->netNodeArray[i].reqDoneFl = false;

    assert( p->netNodeArray[i].id != cmInvalidId );
  }

  // initialize the src connection array - this array was created when the
  // DSP instance network was formed
  _cmDspSrcConn_t* rp = p->srcConnList;
  for(i=0; rp != NULL; rp = rp->link,++i)
  {
    rp->srcId = i;
    rp->dstId = cmInvalidId;
  }

  // clear the dst conn records
  _cmDspDstConn_t* dp = p->dstConnList;
  while( dp != NULL )
  {
    _cmDspDstConn_t* np = dp->link;
    cmLhFree(p->ctx.lhH,dp);
    dp = np;
  }

  // the records were just released - do not leave the list head pointing at them
  // and restart the dst id sequence so that new dst ids index the dst map from zero
  p->dstConnList = NULL;
  p->nextDstId   = 0;

  // clear the connection maps
  p->srcConnMapCnt = 0;
  p->dstConnMapCnt = 0;
  p->netDoneSentFl = false;

  // enter listening mode
  if( cmUdpNetEnableListen(p->netH, true ) != kOkUnRC )
  {
    rc = cmErrMsg(&p->err,kNetFailDspRC,"The network failed to go into 'listening' mode.");
    goto errLabel;
  }

  // start the sync process
  if( cmThreadPause(p->thH,0) != kOkThRC )
  {
    rc = cmErrMsg(&p->err,kThreadFailDspRC,"The network sync thread could not be un-paused prior to begin synchronization.");
    goto errLabel;
  }

  // broadcast 'hello' to all remote listeners and request that they respond with their own 'hello'.
  for(i=0; i<p->netNodeCnt; ++i)
    if( p->netNodeArray[i].localFl == false )
      if((rc = _cmDspSysNetSendHello(p,p->netNodeArray[i].id,true)) != kOkDspRC )
        goto errLabel;

  if( p->netVerbosity > 0 )
    cmRptPrintf(p->err.rpt,"Sync:Entering sync loop\n");

 errLabel:
  if( rc != kOkDspRC )
    p->syncState = kSyncFailDspId;

  return rc;
}

// Deliver a network event msg to the DSP instance registered for msg->dstId.
//   msgPtr     - points to a cmDspNetMsg_t header followed by the serialized value data.
//                The value is deserialized in place, so the buffer is modified.
//   msgByteCnt - total size of the msg in bytes (header + value data).
// The msg arrives from the network, so its size and dstId are validated at run time
// rather than with asserts.
// Returns the result of the instance's recvFunc, kOkDspRC if the instance has no
// recvFunc, or kInvalidStateDspRC if the msg could not be decoded or routed.
cmDspRC_t _cmDspSysNetRecvEvent( cmDsp_t* p, const void* msgPtr, unsigned msgByteCnt )
{
  cmDspNetMsg_t* m = (cmDspNetMsg_t*)msgPtr;

  // a msg shorter than its header would underflow the data byte count below
  if( msgByteCnt < sizeof(cmDspNetMsg_t) )
    return cmErrMsg(&p->err,kInvalidStateDspRC,"A network event message was shorter than the message header.");

  // JSON values are not supported over the network
  if( cmDsvIsJson(&m->value) )
  {
    //rc = cmDsvDeserializeJson(&m->value,p->jsH);
    assert(0);
    return cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");
  }

  // if the value associated with this msg is a mtx then set
  // its mtx data area pointer to just after the msg header.
  if( cmDsvDeserializeInPlace(&m->value,msgByteCnt-sizeof(cmDspNetMsg_t)) != kOkDsvRC )
    return cmErrMsg(&p->err,kInvalidStateDspRC,"Deserialize failed on network event message.");

  // the dst id indexes p->dstConnMap[] - reject ids which have no map record
  if( m->dstId >= p->dstConnMapCnt || p->dstConnMap[ m->dstId ] == NULL || p->dstConnMap[ m->dstId ]->dstInst == NULL )
    return cmErrMsg(&p->err,kInvalidStateDspRC,"A network event message referenced an unknown dst id (%i).",m->dstId);

  const _cmDspDstConn_t* dc      = p->dstConnMap[ m->dstId ];
  cmDspInst_t*           instPtr = dc->dstInst;

  // form the event
  cmDspEvt_t e;
  e.flags      = 0;
  e.srcInstPtr = NULL;
  e.srcVarId   = cmInvalidId;
  e.valuePtr   = &m->value;
  e.dstVarId   = dc->dstVarId;
  e.dstDataPtr = NULL;

  // send the event
  if( instPtr->recvFunc != NULL )
    return instPtr->recvFunc(&p->ctx,instPtr,&e);

  return kOkDspRC;
}

// Called from cmAudDsp.c:_cmAdUdpNetCallback() to send an incoming msg to the DSP system.
cmDspRC_t _cmDspSysNetRecv( cmDsp_t* p, const cmDspNetMsg_t* msg, unsigned msgByteCnt, unsigned remoteNetNodeId ) { assert( msg->selId == kNetSyncSelAsId ); cmDspRC_t rc = kOkDspRC; switch( msg->subSelId ) { case kNetHelloSelAsId: rc = _cmDspSysNetReceiveHello(p,remoteNetNodeId,msg->srcId!=0); break; case kNetDstIdReqSelAsId: rc = _cmDspSysNetReceiveSrcConnRequest(p,msg,remoteNetNodeId); break; case kNetDstIdReqDoneAsId: rc = _cmDspSysNetReceiveReqDone(p,remoteNetNodeId); break; case kNetDstIdSelAsId: rc = _cmDspSysNetReceiveDstId(p,msg,remoteNetNodeId); break; case kNetDoneSelAsId: rc = _cmDspSysNetReceiveRemoteSyncDone(p,msg,remoteNetNodeId); break; case kNetErrSelAsId: rc = _cmDspSysNetReceiveRemoteSyncError(p,msg,remoteNetNodeId); break; case kNetEvtSelAsId: rc = _cmDspSysNetRecvEvent(p, msg, msgByteCnt ); break; default: cmErrMsg(&p->err,kNetFailDspRC,"Unexpected message type 'selId=%i' received by the network sync. message dispatcher.",msg->selId); break; } return rc; } cmDspRC_t _cmDspSysNetSendEvent( cmDspSysH_t h, unsigned dstNetNodeId, unsigned dstId, const cmDspEvt_t* evt ) { cmDsp_t* p = _cmDspHandleToPtr(h); unsigned bufByteCnt = sizeof(cmDspNetMsg_t); unsigned dataByteCnt = 0; if( evt->valuePtr != NULL ) dataByteCnt = cmDsvSerialDataByteCount(evt->valuePtr); bufByteCnt += dataByteCnt; char buf[ bufByteCnt ]; cmDspNetMsg_t* hdr = (cmDspNetMsg_t*)buf; hdr->asSubIdx = cmDspSys_AsSubIdx_Zero; hdr->selId = kNetSyncSelAsId; hdr->subSelId = kNetEvtSelAsId; hdr->srcId = cmInvalidId; hdr->dstId = dstId; if( evt->valuePtr == NULL ) cmDsvSetNull(&hdr->value); else { // this function relies on the 'ne->value' field being the last field in the 'hdr'. 
if( cmDsvSerialize( evt->valuePtr, &hdr->value, sizeof(cmDspValue_t) + dataByteCnt) != kOkDsvRC ) return cmErrMsg(&p->err,kSerializeFailMsgRC,"An attempt to serialize a network event msg failed."); } if( cmUdpNetSendById(p->netH, dstNetNodeId, &buf, bufByteCnt ) != kOkUnRC ) return cmErrMsg(&p->err,kNetFailDspRC,"A network send failed while sending a DSP event message."); return kOkDspRC; }