libcm is a C development framework with an emphasis on audio signal processing applications.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cmVirtNet.c 15KB


  1. #include "cmGlobal.h"
  2. #include "cmRpt.h"
  3. #include "cmErr.h"
  4. #include "cmCtx.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmLinkedHeap.h"
  8. #include "cmJson.h"
  9. #include "cmThread.h"
  10. #include "cmUdpPort.h"
  11. #include "cmUdpNet.h"
  12. #include "cmVirtNet.h"
  13. #include "cmText.h" // use by cmVnTest()
  14. enum { kOwnerVnFl = 0x01, kLocalVnFl = 0x02 };
  15. struct cmVn_str;
  16. typedef struct cmVnNode_str
  17. {
  18. cmChar_t* label;
  19. unsigned id;
  20. unsigned flags;
  21. cmVnH_t vnH; // handle of a local virtual net that is owned by this node
  22. cmTsMp1cH_t qH; // MPSC non-blocking queue to hold incoming msg's from local virt. net's
  23. struct cmVn_str* p; // pointer back to the cmVn_t which owns this NULL
  24. struct cmVnNode_str* link;
  25. } cmVnNode_t;
  26. typedef struct cmVn_str
  27. {
  28. cmErr_t err;
  29. cmUdpNetH_t udpH;
  30. cmVnNode_t* nodes;
  31. cmVnCb_t cbFunc;
  32. void* cbArg;
  33. unsigned ownerNodeId;
  34. } cmVn_t;
  35. cmVnH_t cmVnNullHandle = cmSTATIC_NULL_HANDLE;
  36. cmVn_t* _cmVnHandleToPtr( cmVnH_t h )
  37. {
  38. cmVn_t* p = (cmVn_t*)h.h;
  39. assert(p != NULL);
  40. return p;
  41. }
  42. void _cmVnNodeFree( cmVnNode_t* np )
  43. {
  44. if( np == NULL )
  45. return;
  46. if( cmTsMp1cIsValid(np->qH) )
  47. cmTsMp1cDestroy(&np->qH);
  48. np->vnH = cmVnNullHandle;
  49. cmMemFree(np->label);
  50. cmMemFree(np);
  51. }
  52. cmVnRC_t _cmVnDestroy( cmVn_t* p )
  53. {
  54. cmVnRC_t rc = kOkVnRC;
  55. if( cmUdpNetFree(&p->udpH) != kOkUnRC )
  56. return cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network release failed.");
  57. cmVnNode_t* np = p->nodes;
  58. while( np != NULL )
  59. {
  60. cmVnNode_t* nnp = np->link;
  61. _cmVnNodeFree(np);
  62. np = nnp;
  63. }
  64. cmMemFree(p);
  65. return rc;
  66. }
  67. // This function is called by cmUdpNetReceive() which is called by cmVnReceive().
  68. void _cmVnUdpNetCb( void* cbArg, cmUdpNetH_t h, const char* data, unsigned dataByteCnt, unsigned remoteNodeId )
  69. {
  70. cmVnNode_t* np = (cmVnNode_t*)cbArg;
  71. assert( np->id == remoteNodeId );
  72. np->p->cbFunc(np->p->cbArg,np->id,dataByteCnt,data);
  73. }
  74. cmVnRC_t _cmVnSend( cmVn_t* p, cmVnNode_t* np, unsigned byteCnt, const cmChar_t* buf )
  75. {
  76. cmVnRC_t rc = kOkVnRC;
  77. // if the node is local then send the msg directly
  78. if( cmIsFlag(np->flags,kLocalVnFl) )
  79. {
  80. if( cmVnIsValid(np->vnH) )
  81. rc = cmVnRecvFromLocal(np->vnH,p->ownerNodeId,byteCnt,buf);
  82. }
  83. else
  84. {
  85. // if the node is remote then send the msg via UDP
  86. if( cmUdpNetSendById(p->udpH,np->id,buf,byteCnt) != kOkUnRC )
  87. rc = cmErrMsg( &p->err, kUdpNetFailVnRC, "UDP Net send to remote node '%s' failed.",cmStringNullGuard(np->label));
  88. }
  89. return rc;
  90. }
  91. cmVnNode_t* _cmVnNodeFindById( cmVn_t* p, unsigned id )
  92. {
  93. cmVnNode_t* np = p->nodes;
  94. for(; np!=NULL; np=np->link)
  95. if( np->id == id )
  96. return np;
  97. return NULL;
  98. }
  99. cmVnNode_t* _cmVnNodeFindByLabel( cmVn_t* p, const cmChar_t* label )
  100. {
  101. cmVnNode_t* np = p->nodes;
  102. for(; np!=NULL; np=np->link)
  103. if( strcmp(np->label,label)==0 )
  104. return np;
  105. return NULL;
  106. }
  107. cmVnNode_t* _cmVnNodeFindOwner( cmVn_t* p )
  108. {
  109. cmVnNode_t* np = p->nodes;
  110. for(; np!=NULL; np=np->link)
  111. if( cmIsFlag(np->flags,kOwnerVnFl) )
  112. return np;
  113. return NULL;
  114. }
  115. cmVnRC_t _cmVnCreateNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned flags, cmVnNode_t** npp )
  116. {
  117. cmVnNode_t* np;
  118. if((np = _cmVnNodeFindById(p,nodeId)) != NULL )
  119. return cmErrMsg(&p->err,kDuplicateNodeIdVnRC,"The node id=%i is already in use for '%s'.",cmStringNullGuard(np->label));
  120. if((np = _cmVnNodeFindByLabel(p,label)) != NULL )
  121. return cmErrMsg(&p->err,kDuplicateNodeLabelVnRC,"The node label '%s' is already used by another node.",cmStringNullGuard(label));
  122. np = cmMemAllocZ(cmVnNode_t,1);
  123. np->label = cmMemAllocStr(label);
  124. np->id = nodeId;
  125. np->flags = flags;
  126. np->p = p;
  127. *npp = np;
  128. return kOkVnRC;
  129. }
  130. void _cmVnNodeLink( cmVn_t* p, cmVnNode_t* np )
  131. {
  132. cmVnNode_t* cnp = p->nodes;
  133. if( cnp == NULL )
  134. p->nodes = np;
  135. else
  136. {
  137. while( cnp->link != NULL )
  138. cnp = cnp->link;
  139. cnp->link = np;
  140. }
  141. }
  142. cmVnRC_t _cmVnCreateOwnerNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
  143. {
  144. cmVnRC_t rc = kOkVnRC;
  145. cmVnNode_t* np;
  146. // create a generic node
  147. if((rc = _cmVnCreateNode(p, label,nodeId, kOwnerVnFl, &np )) != kOkVnRC )
  148. goto errLabel;
  149. // initialize the UDP net with the owner node
  150. if( cmUdpNetInit(p->udpH,label,nodeId,ipPort,_cmVnUdpNetCb,np,udpRecvBufByteCnt,udpRecvTimeOutMs) != kOkUnRC )
  151. {
  152. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network initialization failed for node:'%s'.",cmStringNullGuard(label));
  153. goto errLabel;
  154. }
  155. _cmVnNodeLink(p,np);
  156. p->ownerNodeId = nodeId;
  157. errLabel:
  158. if( rc != kOkVnRC )
  159. _cmVnNodeFree(np);
  160. return rc;
  161. }
  162. //------------------------------------------------------------------------------------------------------------
  163. cmVnRC_t cmVnCreate( cmCtx_t* ctx, cmVnH_t* hp, cmVnCb_t cbFunc, void* cbArg, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
  164. {
  165. cmVnRC_t rc;
  166. if((rc = cmVnDestroy(hp)) != kOkVnRC )
  167. return rc;
  168. cmVn_t* p = cmMemAllocZ(cmVn_t,1);
  169. cmErrSetup(&p->err,&ctx->rpt,"cmVirtNet");
  170. if( cmUdpNetAlloc(ctx,&p->udpH) != kOkUnRC )
  171. {
  172. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network allocation failed.");
  173. goto errLabel;
  174. }
  175. // create the owner node
  176. if((rc = _cmVnCreateOwnerNode(p,label,nodeId, udpRecvBufByteCnt, udpRecvTimeOutMs, ipAddr, ipPort )) != kOkVnRC )
  177. goto errLabel;
  178. p->ownerNodeId = nodeId;
  179. p->cbFunc = cbFunc;
  180. p->cbArg = cbArg;
  181. hp->h = p;
  182. errLabel:
  183. if( rc != kOkUdpRC )
  184. _cmVnDestroy(p);
  185. return rc;
  186. }
  187. cmVnRC_t cmVnDestroy( cmVnH_t* hp )
  188. {
  189. cmVnRC_t rc = kOkVnRC;
  190. if( hp == NULL || cmVnIsValid(*hp)==false )
  191. return rc;
  192. cmVn_t* p = _cmVnHandleToPtr(*hp);
  193. if((rc = _cmVnDestroy(p)) != kOkVnRC )
  194. return rc;
  195. hp->h = NULL;
  196. return rc;
  197. }
  198. bool cmVnIsValid( cmVnH_t h )
  199. { return h.h != NULL; }
  200. cmVnRC_t cmVnEnableListen( cmVnH_t h, bool enableFl )
  201. {
  202. cmVn_t* p = _cmVnHandleToPtr(h);
  203. if( cmUdpNetEnableListen( p->udpH, enableFl ) != kOkUnRC )
  204. return cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP listen enable failed.");
  205. return kOkVnRC;
  206. }
  207. // This function is called by cmTsMp1cDequeueMsg() which is called by cmVnReceive().
  208. cmRC_t _cmVnTsQueueCb(void* userCbPtr, unsigned msgByteCnt, const void* msgDataPtr )
  209. {
  210. cmVnNode_t* np = (cmVnNode_t*)userCbPtr;
  211. return np->p->cbFunc(np->p->cbArg,np->id,msgByteCnt,msgDataPtr);
  212. }
  213. cmVnRC_t cmVnCreateLocalNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, cmVnH_t vnH, unsigned queBufByteCnt )
  214. {
  215. cmVnRC_t rc = kOkVnRC;
  216. cmVn_t* p = _cmVnHandleToPtr(h);
  217. cmVnNode_t* np = NULL;
  218. // create a generic node
  219. if((rc = _cmVnCreateNode(p,label,nodeId,kLocalVnFl,&np )) != kOkVnRC )
  220. goto errLabel;
  221. if( cmTsMp1cCreate( &np->qH, queBufByteCnt, _cmVnTsQueueCb, np, p->err.rpt ) != kOkThRC )
  222. {
  223. rc = cmErrMsg(&p->err,kQueueFailVnRC,"The internal thread-safe queue creation failed for local node '%s'.",cmStringNullGuard(label));
  224. goto errLabel;
  225. }
  226. np->vnH = vnH;
  227. _cmVnNodeLink(p,np);
  228. errLabel:
  229. if( rc != kOkVnRC )
  230. _cmVnNodeFree(np);
  231. return rc;
  232. }
  233. cmVnRC_t cmVnCreateRemoteNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, const cmChar_t* ipAddr, unsigned ipPort )
  234. {
  235. cmVnRC_t rc = kOkVnRC;
  236. cmVn_t* p = _cmVnHandleToPtr(h);
  237. cmVnNode_t* np;
  238. // creaet a generic node
  239. if((rc = _cmVnCreateNode(p, label,nodeId, 0, &np )) != kOkVnRC )
  240. goto errLabel;
  241. if( ipAddr!=NULL )
  242. {
  243. if( cmUdpNetRegisterRemote(p->udpH,label,nodeId,ipAddr,ipPort) != kOkUnRC )
  244. {
  245. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network remote registration failed for node:'%s'.",cmStringNullGuard(label));
  246. goto errLabel;
  247. }
  248. }
  249. _cmVnNodeLink(p,np);
  250. errLabel:
  251. if( rc != kOkVnRC )
  252. _cmVnNodeFree(np);
  253. return rc;
  254. }
  255. cmVnRC_t cmVnRecvFromLocal( cmVnH_t h, unsigned srcNodeId, unsigned byteCnt, const cmChar_t* buf )
  256. {
  257. cmVnRC_t rc = kOkVnRC;
  258. cmVnNode_t* np;
  259. cmVn_t* p = _cmVnHandleToPtr(h);
  260. if(( np = _cmVnNodeFindById(p,srcNodeId)) == NULL )
  261. return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",srcNodeId);
  262. if( cmTsMp1cIsValid(np->qH) == false )
  263. return cmErrMsg(&p->err,kQueueFailVnRC,"The internal MPSC queue for the node '%s' is not valid. Is this a local node?",cmStringNullGuard(np->label));
  264. if( cmTsMp1cEnqueueMsg(np->qH,buf,byteCnt) != kOkThRC )
  265. return cmErrMsg(&p->err,kQueueFailVnRC,"Enqueue failed on the internal MPSC queue for the node '%s'.",cmStringNullGuard(np->label));
  266. return rc;
  267. }
  268. cmVnRC_t cmVnSendById( cmVnH_t h, unsigned nodeId, unsigned byteCnt, const cmChar_t* buf )
  269. {
  270. cmVnNode_t* np;
  271. cmVn_t* p = _cmVnHandleToPtr(h);
  272. if(( np = _cmVnNodeFindById(p,nodeId)) == NULL )
  273. return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",nodeId);
  274. return _cmVnSend(p,np,byteCnt,buf);
  275. }
  276. cmVnRC_t cmVnSendByLabel( cmVnH_t h, const cmChar_t* nodeLabel, unsigned byteCnt, const cmChar_t* buf )
  277. {
  278. cmVnNode_t* np;
  279. cmVn_t* p = _cmVnHandleToPtr(h);
  280. if(( np = _cmVnNodeFindByLabel(p,nodeLabel)) == NULL )
  281. return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node named '%s' could not be found.",cmStringNullGuard(nodeLabel));
  282. return _cmVnSend(p,np,byteCnt,buf);
  283. }
  284. cmVnRC_t _cmVnRecvQueueMsgs( cmVn_t* p, cmVnNode_t* np, unsigned *msgCntPtr )
  285. {
  286. unsigned i;
  287. unsigned mn = *msgCntPtr;
  288. *msgCntPtr = 0;
  289. for(i=0; (i<mn || mn==cmInvalidCnt) && cmTsMp1cMsgWaiting(np->qH); ++i)
  290. {
  291. // calling this function results in calls to _cmVnTsQueueCb()
  292. if( cmTsMp1cDequeueMsg( np->qH,NULL,0) != kOkThRC )
  293. return cmErrMsg(&p->err,kQueueFailVnRC,"Msg deque failed for node '%s'.",cmStringNullGuard(np->label));
  294. }
  295. *msgCntPtr = i;
  296. return kOkVnRC;
  297. }
  298. cmVnRC_t cmVnReceive( cmVnH_t h, unsigned* msgCntPtr )
  299. {
  300. cmVnRC_t rc = kOkVnRC;
  301. cmVn_t* p = _cmVnHandleToPtr(h);
  302. cmVnNode_t* np = p->nodes;
  303. unsigned mn = msgCntPtr == NULL ? cmInvalidCnt : *msgCntPtr;
  304. if( msgCntPtr != NULL )
  305. *msgCntPtr = 0;
  306. for(; np!=NULL && rc==kOkVnRC; np=np->link)
  307. {
  308. unsigned msgCnt = mn;
  309. switch( np->flags & (kOwnerVnFl | kLocalVnFl) )
  310. {
  311. case kOwnerVnFl:
  312. break;
  313. case kLocalVnFl:
  314. rc = _cmVnRecvQueueMsgs(p,np,&msgCnt);
  315. break;
  316. default:
  317. if( cmUdpNetReceive(p->udpH,msgCntPtr==NULL?NULL:&msgCnt) != kOkUnRC )
  318. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP net receive failed on node '%s'.",cmStringNullGuard(np->label));
  319. break;
  320. }
  321. if( rc == kOkVnRC && msgCntPtr != NULL )
  322. *msgCntPtr += msgCnt;
  323. }
  324. return rc;
  325. }
  326. //---------------------------------------------------------------------------------------------------
  327. unsigned udpRecvBufByteCnt = 8192;
  328. unsigned udpRecvTimeOutMs = 100;
  329. unsigned queueBufByteCnt = 8192;
  330. void* cbArg = NULL;
  331. typedef struct
  332. {
  333. const cmChar_t* label;
  334. unsigned id;
  335. const cmChar_t* ipAddr;
  336. unsigned ipPort;
  337. cmVnH_t vnH;
  338. } node_t;
  339. cmVnRC_t _cmVnTestCb( void* cbArg, unsigned srcNodeId, unsigned byteCnt, const char* buf )
  340. {
  341. printf("src node:%i bytes:%i %s\n",srcNodeId,byteCnt,buf);
  342. return kOkVnRC;
  343. }
  344. cmVnRC_t _cmVnTestCreateLocalNet( cmCtx_t* ctx, cmErr_t* err, unsigned id, node_t* nodeArray )
  345. {
  346. cmVnRC_t rc = kOkVnRC;
  347. if((rc = cmVnCreate(ctx, &nodeArray[id].vnH, _cmVnTestCb, cbArg, nodeArray[id].label, nodeArray[id].id, udpRecvBufByteCnt, udpRecvTimeOutMs, nodeArray[id].ipAddr, nodeArray[id].ipPort )) != kOkVnRC )
  348. rc = cmErrMsg(err,rc,"Virtual network create failed.");
  349. return rc;
  350. }
  351. cmVnRC_t _cmVnTestCreateVirtNodes( cmErr_t* err, unsigned id, node_t* nodeArray )
  352. {
  353. unsigned rc = kOkVnRC;
  354. unsigned i;
  355. for(i=0; nodeArray[i].label != NULL; ++i)
  356. {
  357. // if this is a local node
  358. if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr) == 0 )
  359. {
  360. // if this is not the owner node
  361. if( i != id )
  362. if((rc = cmVnCreateLocalNode( nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].vnH, queueBufByteCnt )) != kOkVnRC )
  363. {
  364. cmErrMsg(err,rc,"Local node create failed for node:'%s'.",nodeArray[i].label);
  365. goto errLabel;
  366. }
  367. }
  368. else // this must be a remote node
  369. {
  370. if((rc = cmVnCreateRemoteNode(nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].ipAddr, nodeArray[i].ipPort )) != kOkVnRC )
  371. {
  372. cmErrMsg(err,rc,"Remote node create failed for node '%s'.",nodeArray[i].label);
  373. goto errLabel;
  374. }
  375. }
  376. }
  377. errLabel:
  378. return rc;
  379. }
  380. cmVnRC_t _cmVnTestSend( cmErr_t* err, node_t* nodeArray, unsigned id, unsigned i, unsigned n )
  381. {
  382. cmVnRC_t rc = kOkVnRC;
  383. if( id == i )
  384. {
  385. printf("A node cannot send to itself.\n");
  386. return rc;
  387. }
  388. const cmChar_t* buf = cmTsPrintfS("msg-%i",n);
  389. unsigned bufByteCnt = strlen(buf)+1;
  390. printf("Sending from '%s' to '%s'.\n", cmStringNullGuard(nodeArray[id].label), cmStringNullGuard(nodeArray[i].label));
  391. if((rc = cmVnSendById(nodeArray[id].vnH, nodeArray[i].id, bufByteCnt, buf ))!= kOkVnRC )
  392. cmErrMsg(err,rc,"Send from '%s' to '%s' failed.", cmStringNullGuard(nodeArray[id].label), cmStringNullGuard(nodeArray[i].label));
  393. return rc;
  394. }
  395. cmVnRC_t cmVnTest( cmCtx_t* ctx )
  396. {
  397. cmVnRC_t rc = kOkVnRC;
  398. cmErr_t err;
  399. unsigned id = 0;
  400. unsigned i;
  401. unsigned n = 0;
  402. node_t nodeArray[] =
  403. {
  404. { "whirl-0", 10, "192.168.15.109", 5768, cmVnNullHandle },
  405. { "whirl-1", 20, "192.168.15.109", 5767, cmVnNullHandle },
  406. { "thunk-0", 30, "192.168.15.111", 5766, cmVnNullHandle },
  407. { NULL, -1, NULL }
  408. };
  409. cmErrSetup(&err,&ctx->rpt,"cmVnTest");
  410. // create the virt networks for the local nodes
  411. for(i=0; nodeArray[i].label !=NULL; ++i)
  412. if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr ) == 0 )
  413. if((rc = _cmVnTestCreateLocalNet(ctx,&err,i,nodeArray)) != kOkVnRC )
  414. goto errLabel;
  415. // create the virtual nodes for each local virtual network
  416. for(i=0; nodeArray[i].label != NULL; ++i)
  417. if( cmVnIsValid(nodeArray[i].vnH) )
  418. if((rc = _cmVnTestCreateVirtNodes(&err, i, nodeArray )) != kOkVnRC )
  419. break;
  420. char c;
  421. while((c=getchar()) != 'q')
  422. {
  423. bool promptFl = true;
  424. switch( c )
  425. {
  426. case '0':
  427. case '1':
  428. case '2':
  429. _cmVnTestSend(&err, nodeArray, id, (unsigned)c - (unsigned)'0', n );
  430. ++n;
  431. break;
  432. }
  433. for(i=0; nodeArray[i].label!=NULL; ++i)
  434. if( cmVnIsValid(nodeArray[i].vnH) )
  435. cmVnReceive(nodeArray[i].vnH,NULL);
  436. if( promptFl )
  437. cmRptPrintf(&ctx->rpt,"%i> ",n);
  438. }
  439. errLabel:
  440. // destroy the virtual networks
  441. for(i=0; nodeArray[i].label!=NULL; ++i)
  442. if( cmVnIsValid(nodeArray[i].vnH) )
  443. if((rc = cmVnDestroy(&nodeArray[i].vnH)) != kOkVnRC )
  444. cmErrMsg(&err,rc,"Node destroy failed for node '%s'.",cmStringNullGuard(nodeArray[i].label));
  445. return rc;
  446. }