libcm is a C development framework with an emphasis on audio signal processing applications.
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

cmVirtNet.c 15KB


  1. #include "cmGlobal.h"
  2. #include "cmRpt.h"
  3. #include "cmErr.h"
  4. #include "cmCtx.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmLinkedHeap.h"
  8. #include "cmJson.h"
  9. #include "cmThread.h"
  10. #include "cmUdpPort.h"
  11. #include "cmUdpNet.h"
  12. #include "cmVirtNet.h"
  13. #include "cmText.h" // use by cmVnTest()
  // Role flags stored in cmVnNode_t.flags.
  enum
  {
    kOwnerVnFl = 0x01,  // passed by _cmVnCreateOwnerNode()
    kLocalVnFl = 0x02   // passed by cmVnCreateLocalNode()
  };

  // Forward declaration so that cmVnNode_t can point back at its network record.
  struct cmVn_str;
  16. typedef struct cmVnNode_str
  17. {
  18. cmChar_t* label;
  19. unsigned id;
  20. unsigned flags;
  21. cmVnH_t vnH; // handle of a local virtual net that is owned by this node
  22. cmTsMp1cH_t qH; // MPSC non-blocking queue to hold incoming msg's from local virt. net's
  23. struct cmVn_str* p; // pointer back to the cmVn_t which owns this NULL
  24. struct cmVnNode_str* link;
  25. } cmVnNode_t;
  26. typedef struct cmVn_str
  27. {
  28. cmErr_t err;
  29. cmUdpNetH_t udpH;
  30. cmVnNode_t* nodes;
  31. cmVnCb_t cbFunc;
  32. void* cbArg;
  33. unsigned ownerNodeId;
  34. } cmVn_t;
  35. cmVnH_t cmVnNullHandle = cmSTATIC_NULL_HANDLE;
  36. cmVn_t* _cmVnHandleToPtr( cmVnH_t h )
  37. {
  38. cmVn_t* p = (cmVn_t*)h.h;
  39. assert(p != NULL);
  40. return p;
  41. }
  42. void _cmVnNodeFree( cmVnNode_t* np )
  43. {
  44. if( np == NULL )
  45. return;
  46. if( cmTsMp1cIsValid(np->qH) )
  47. cmTsMp1cDestroy(&np->qH);
  48. np->vnH = cmVnNullHandle;
  49. cmMemFree(np->label);
  50. cmMemFree(np);
  51. }
  52. cmVnRC_t _cmVnDestroy( cmVn_t* p )
  53. {
  54. cmVnRC_t rc = kOkVnRC;
  55. if( cmUdpNetFree(&p->udpH) != kOkUnRC )
  56. return cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network release failed.");
  57. cmVnNode_t* np = p->nodes;
  58. while( np != NULL )
  59. {
  60. cmVnNode_t* nnp = np->link;
  61. _cmVnNodeFree(np);
  62. np = nnp;
  63. }
  64. cmMemFree(p);
  65. return rc;
  66. }
  67. // This function is called by cmUdpNetReceive() which is called by cmVnReceive().
  68. void _cmVnUdpNetCb( void* cbArg, cmUdpNetH_t h, const char* data, unsigned dataByteCnt, unsigned remoteNodeId )
  69. {
  70. cmVnNode_t* np = (cmVnNode_t*)cbArg;
  71. assert( np->id == remoteNodeId );
  72. np->p->cbFunc(np->p->cbArg,np->id,dataByteCnt,data);
  73. }
  74. cmVnRC_t _cmVnSend( cmVn_t* p, cmVnNode_t* np, unsigned byteCnt, const cmChar_t* buf )
  75. {
  76. cmVnRC_t rc = kOkVnRC;
  77. // if the node is local then send the msg directly
  78. if( cmIsFlag(np->flags,kLocalVnFl) )
  79. {
  80. if( cmVnIsValid(np->vnH) )
  81. rc = cmVnRecvFromLocal(np->vnH,p->ownerNodeId,byteCnt,buf);
  82. }
  83. else
  84. {
  85. // if the node is remote then send the msg via UDP
  86. if( cmUdpNetSendById(p->udpH,np->id,buf,byteCnt) != kOkUnRC )
  87. rc = cmErrMsg( &p->err, kUdpNetFailVnRC, "UDP Net send to remote node '%s' failed.",cmStringNullGuard(np->label));
  88. }
  89. return rc;
  90. }
  91. cmVnNode_t* _cmVnNodeFindById( cmVn_t* p, unsigned id )
  92. {
  93. cmVnNode_t* np = p->nodes;
  94. for(; np!=NULL; np=np->link)
  95. if( np->id == id )
  96. return np;
  97. return NULL;
  98. }
  99. cmVnNode_t* _cmVnNodeFindByLabel( cmVn_t* p, const cmChar_t* label )
  100. {
  101. cmVnNode_t* np = p->nodes;
  102. for(; np!=NULL; np=np->link)
  103. if( strcmp(np->label,label)==0 )
  104. return np;
  105. return NULL;
  106. }
  107. cmVnNode_t* _cmVnNodeFindOwner( cmVn_t* p )
  108. {
  109. cmVnNode_t* np = p->nodes;
  110. for(; np!=NULL; np=np->link)
  111. if( cmIsFlag(np->flags,kOwnerVnFl) )
  112. return np;
  113. return NULL;
  114. }
  115. cmVnRC_t _cmVnCreateNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned flags, cmVnNode_t** npp )
  116. {
  117. cmVnNode_t* np;
  118. if((np = _cmVnNodeFindById(p,nodeId)) != NULL )
  119. return cmErrMsg(&p->err,kDuplicateNodeIdVnRC,"The node id=%i is already in use for '%s'.",cmStringNullGuard(np->label));
  120. if((np = _cmVnNodeFindByLabel(p,label)) != NULL )
  121. return cmErrMsg(&p->err,kDuplicateNodeLabelVnRC,"The node label '%s' is already used by another node.",cmStringNullGuard(label));
  122. np = cmMemAllocZ(cmVnNode_t,1);
  123. np->label = cmMemAllocStr(label);
  124. np->id = nodeId;
  125. np->flags = flags;
  126. np->p = p;
  127. *npp = np;
  128. return kOkVnRC;
  129. }
  130. void _cmVnNodeLink( cmVn_t* p, cmVnNode_t* np )
  131. {
  132. cmVnNode_t* cnp = p->nodes;
  133. if( cnp == NULL )
  134. p->nodes = np;
  135. else
  136. {
  137. while( cnp->link != NULL )
  138. cnp = cnp->link;
  139. cnp->link = np;
  140. }
  141. }
  142. cmVnRC_t _cmVnCreateOwnerNode( cmVn_t* p, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
  143. {
  144. cmVnRC_t rc = kOkVnRC;
  145. cmVnNode_t* np;
  146. // create a generic node
  147. if((rc = _cmVnCreateNode(p, label,nodeId, kOwnerVnFl, &np )) != kOkVnRC )
  148. goto errLabel;
  149. // initialize the UDP net with the owner node
  150. if( cmUdpNetInit(p->udpH,label,nodeId,ipPort,_cmVnUdpNetCb,np,udpRecvBufByteCnt,udpRecvTimeOutMs) != kOkUnRC )
  151. {
  152. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network initialization failed for node:'%s'.",cmStringNullGuard(label));
  153. goto errLabel;
  154. }
  155. _cmVnNodeLink(p,np);
  156. p->ownerNodeId = nodeId;
  157. errLabel:
  158. if( rc != kOkVnRC )
  159. _cmVnNodeFree(np);
  160. return rc;
  161. }
  162. //------------------------------------------------------------------------------------------------------------
  163. cmVnRC_t cmVnCreate( cmCtx_t* ctx, cmVnH_t* hp, cmVnCb_t cbFunc, void* cbArg, const cmChar_t* label, unsigned nodeId, unsigned udpRecvBufByteCnt, unsigned udpRecvTimeOutMs, const cmChar_t* ipAddr, unsigned ipPort )
  164. {
  165. cmVnRC_t rc;
  166. if((rc = cmVnDestroy(hp)) != kOkVnRC )
  167. return rc;
  168. cmVn_t* p = cmMemAllocZ(cmVn_t,1);
  169. cmErrSetup(&p->err,&ctx->rpt,"cmVirtNet");
  170. if( cmUdpNetAlloc(ctx,&p->udpH) != kOkUnRC )
  171. {
  172. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP network allocation failed.");
  173. goto errLabel;
  174. }
  175. // create the owner node
  176. if((rc = _cmVnCreateOwnerNode(p,label,nodeId, udpRecvBufByteCnt, udpRecvTimeOutMs, ipAddr, ipPort )) != kOkVnRC )
  177. goto errLabel;
  178. p->ownerNodeId = nodeId;
  179. p->cbFunc = cbFunc;
  180. p->cbArg = cbArg;
  181. hp->h = p;
  182. errLabel:
  183. if( rc != kOkUdpRC )
  184. _cmVnDestroy(p);
  185. return rc;
  186. }
  187. cmVnRC_t cmVnDestroy( cmVnH_t* hp )
  188. {
  189. cmVnRC_t rc = kOkVnRC;
  190. if( hp == NULL || cmVnIsValid(*hp)==false )
  191. return rc;
  192. cmVn_t* p = _cmVnHandleToPtr(*hp);
  193. if((rc = _cmVnDestroy(p)) != kOkVnRC )
  194. return rc;
  195. hp->h = NULL;
  196. return rc;
  197. }
  198. bool cmVnIsValid( cmVnH_t h )
  199. { return h.h != NULL; }
  200. cmVnRC_t cmVnEnableListen( cmVnH_t h, bool enableFl )
  201. {
  202. cmVn_t* p = _cmVnHandleToPtr(h);
  203. if( cmUdpNetEnableListen( p->udpH, enableFl ) != kOkUnRC )
  204. return cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP listen enable failed.");
  205. return kOkVnRC;
  206. }
  207. // This function is called by cmTsMp1cDequeueMsg() which is called by cmVnReceive().
  208. cmRC_t _cmVnTsQueueCb(void* userCbPtr, unsigned msgByteCnt, const void* msgDataPtr )
  209. {
  210. cmVnNode_t* np = (cmVnNode_t*)userCbPtr;
  211. return np->p->cbFunc(np->p->cbArg,np->id,msgByteCnt,msgDataPtr);
  212. }
  213. cmVnRC_t cmVnCreateLocalNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, cmVnH_t vnH, unsigned queBufByteCnt )
  214. {
  215. cmVnRC_t rc = kOkVnRC;
  216. cmVn_t* p = _cmVnHandleToPtr(h);
  217. cmVnNode_t* np = NULL;
  218. // create a generic node
  219. if((rc = _cmVnCreateNode(p,label,nodeId,kLocalVnFl,&np )) != kOkVnRC )
  220. goto errLabel;
  221. if( cmTsMp1cCreate( &np->qH, queBufByteCnt, _cmVnTsQueueCb, np, p->err.rpt ) != kOkThRC )
  222. {
  223. rc = cmErrMsg(&p->err,kQueueFailVnRC,"The internal thread-safe queue creation failed for local node '%s'.",cmStringNullGuard(label));
  224. goto errLabel;
  225. }
  226. np->vnH = vnH;
  227. _cmVnNodeLink(p,np);
  228. errLabel:
  229. if( rc != kOkVnRC )
  230. _cmVnNodeFree(np);
  231. return rc;
  232. }
  233. cmVnRC_t cmVnCreateRemoteNode( cmVnH_t h, const cmChar_t* label, unsigned nodeId, const cmChar_t* ipAddr, unsigned ipPort )
  234. {
  235. cmVnRC_t rc = kOkVnRC;
  236. cmVn_t* p = _cmVnHandleToPtr(h);
  237. cmVnNode_t* np;
  238. // creaet a generic node
  239. if((rc = _cmVnCreateNode(p, label,nodeId, 0, &np )) != kOkVnRC )
  240. goto errLabel;
  241. if( ipAddr!=NULL )
  242. {
  243. if( cmUdpNetRegisterRemote(p->udpH,label,nodeId,ipAddr,ipPort) != kOkUnRC )
  244. {
  245. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"UDP network remote registration failed for node:'%s'.",cmStringNullGuard(label));
  246. goto errLabel;
  247. }
  248. }
  249. _cmVnNodeLink(p,np);
  250. errLabel:
  251. if( rc != kOkVnRC )
  252. _cmVnNodeFree(np);
  253. return rc;
  254. }
  255. cmVnRC_t cmVnRecvFromLocal( cmVnH_t h, unsigned srcNodeId, unsigned byteCnt, const cmChar_t* buf )
  256. {
  257. cmVnRC_t rc = kOkVnRC;
  258. cmVnNode_t* np;
  259. cmVn_t* p = _cmVnHandleToPtr(h);
  260. if(( np = _cmVnNodeFindById(p,srcNodeId)) == NULL )
  261. return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",srcNodeId);
  262. if( cmTsMp1cIsValid(np->qH) == false )
  263. return cmErrMsg(&p->err,kQueueFailVnRC,"The internal MPSC queue for the node '%s' is not valid. Is this a local node?",cmStringNullGuard(np->label));
  264. if( cmTsMp1cEnqueueMsg(np->qH,buf,byteCnt) != kOkThRC )
  265. return cmErrMsg(&p->err,kQueueFailVnRC,"Enqueue failed on the internal MPSC queue for the node '%s'.",cmStringNullGuard(np->label));
  266. return rc;
  267. }
  268. cmVnRC_t cmVnSendById( cmVnH_t h, unsigned nodeId, unsigned byteCnt, const cmChar_t* buf )
  269. {
  270. cmVnNode_t* np;
  271. cmVn_t* p = _cmVnHandleToPtr(h);
  272. if(( np = _cmVnNodeFindById(p,nodeId)) == NULL )
  273. return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node with id=%i could not be found.",nodeId);
  274. return _cmVnSend(p,np,byteCnt,buf);
  275. }
  276. cmVnRC_t cmVnSendByLabel( cmVnH_t h, const cmChar_t* nodeLabel, unsigned byteCnt, const cmChar_t* buf )
  277. {
  278. cmVnNode_t* np;
  279. cmVn_t* p = _cmVnHandleToPtr(h);
  280. if(( np = _cmVnNodeFindByLabel(p,nodeLabel)) == NULL )
  281. return cmErrMsg(&p->err,kNodeNotFoundVnRC,"The node named '%s' could not be found.",cmStringNullGuard(nodeLabel));
  282. return _cmVnSend(p,np,byteCnt,buf);
  283. }
  284. cmVnRC_t _cmVnRecvQueueMsgs( cmVn_t* p, cmVnNode_t* np, unsigned *msgCntPtr )
  285. {
  286. unsigned i;
  287. unsigned mn = *msgCntPtr;
  288. *msgCntPtr = 0;
  289. for(i=0; (i<mn || mn==cmInvalidCnt) && cmTsMp1cMsgWaiting(np->qH); ++i)
  290. {
  291. // calling this function results in calls to _cmVnTsQueueCb()
  292. if( cmTsMp1cDequeueMsg( np->qH,NULL,0) != kOkThRC )
  293. return cmErrMsg(&p->err,kQueueFailVnRC,"Msg deque failed for node '%s'.",cmStringNullGuard(np->label));
  294. }
  295. *msgCntPtr = i;
  296. return kOkVnRC;
  297. }
  298. cmVnRC_t cmVnReceive( cmVnH_t h, unsigned* msgCntPtr )
  299. {
  300. cmVnRC_t rc = kOkVnRC;
  301. cmVn_t* p = _cmVnHandleToPtr(h);
  302. cmVnNode_t* np = p->nodes;
  303. unsigned mn = msgCntPtr == NULL ? cmInvalidCnt : *msgCntPtr;
  304. if( msgCntPtr != NULL )
  305. *msgCntPtr = 0;
  306. for(; np!=NULL && rc==kOkVnRC; np=np->link)
  307. {
  308. unsigned msgCnt = mn;
  309. switch( np->flags & (kOwnerVnFl | kLocalVnFl) )
  310. {
  311. case kOwnerVnFl:
  312. break;
  313. case kLocalVnFl:
  314. rc = _cmVnRecvQueueMsgs(p,np,&msgCnt);
  315. break;
  316. default:
  317. if( cmUdpNetReceive(p->udpH,msgCntPtr==NULL?NULL:&msgCnt) != kOkUnRC )
  318. rc = cmErrMsg(&p->err,kUdpNetFailVnRC,"The UDP net receive failed on node '%s'.",cmStringNullGuard(np->label));
  319. break;
  320. }
  321. if( rc == kOkVnRC && msgCntPtr != NULL )
  322. *msgCntPtr += msgCnt;
  323. }
  324. return rc;
  325. }
  326. //---------------------------------------------------------------------------------------------------
  // Parameters shared by the cmVnTest() helper functions below.
  unsigned udpRecvBufByteCnt = 8192;  // passed to cmVnCreate()
  unsigned udpRecvTimeOutMs  = 100;   // passed to cmVnCreate()
  unsigned queueBufByteCnt   = 8192;  // passed to cmVnCreateLocalNode()
  void*    cbArg             = NULL;  // callback argument passed to cmVnCreate()
  331. typedef struct
  332. {
  333. const cmChar_t* label;
  334. unsigned id;
  335. const cmChar_t* ipAddr;
  336. unsigned ipPort;
  337. cmVnH_t vnH;
  338. } node_t;
  339. cmVnRC_t _cmVnTestCb( void* cbArg, unsigned srcNodeId, unsigned byteCnt, const char* buf )
  340. {
  341. printf("src node:%i bytes:%i %s\n",srcNodeId,byteCnt,buf);
  342. return kOkVnRC;
  343. }
  344. cmVnRC_t _cmVnTestCreateLocalNet( cmCtx_t* ctx, cmErr_t* err, unsigned id, node_t* nodeArray )
  345. {
  346. cmVnRC_t rc = kOkVnRC;
  347. if((rc = cmVnCreate(ctx, &nodeArray[id].vnH, _cmVnTestCb, cbArg, nodeArray[id].label, nodeArray[id].id, udpRecvBufByteCnt, udpRecvTimeOutMs, nodeArray[id].ipAddr, nodeArray[id].ipPort )) != kOkVnRC )
  348. rc = cmErrMsg(err,rc,"Virtual network create failed.");
  349. return rc;
  350. }
  351. cmVnRC_t _cmVnTestCreateVirtNodes( cmErr_t* err, unsigned id, node_t* nodeArray )
  352. {
  353. unsigned rc = kOkVnRC;
  354. unsigned i;
  355. for(i=0; nodeArray[i].label != NULL; ++i)
  356. {
  357. // if this is a local node
  358. if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr) == 0 )
  359. {
  360. // if this is not the owner node
  361. if( i != id )
  362. if((rc = cmVnCreateLocalNode( nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].vnH, queueBufByteCnt )) != kOkVnRC )
  363. {
  364. cmErrMsg(err,rc,"Local node create failed for node:'%s'.",nodeArray[i].label);
  365. goto errLabel;
  366. }
  367. }
  368. else // this must be a remote node
  369. {
  370. if((rc = cmVnCreateRemoteNode(nodeArray[id].vnH, nodeArray[i].label, nodeArray[i].id, nodeArray[i].ipAddr, nodeArray[i].ipPort )) != kOkVnRC )
  371. {
  372. cmErrMsg(err,rc,"Remote node create failed for node '%s'.",nodeArray[i].label);
  373. goto errLabel;
  374. }
  375. }
  376. }
  377. errLabel:
  378. return rc;
  379. }
  380. cmVnRC_t _cmVnTestSend( cmErr_t* err, node_t* nodeArray, unsigned id, unsigned i, unsigned n )
  381. {
  382. cmVnRC_t rc = kOkVnRC;
  383. if( id == i )
  384. {
  385. printf("A node cannot send to itself.\n");
  386. return rc;
  387. }
  388. const cmChar_t* buf = cmTsPrintfS("msg-%i",n);
  389. unsigned bufByteCnt = strlen(buf)+1;
  390. printf("Sending from '%s' to '%s'.\n", cmStringNullGuard(nodeArray[id].label), cmStringNullGuard(nodeArray[i].label));
  391. if((rc = cmVnSendById(nodeArray[id].vnH, nodeArray[i].id, bufByteCnt, buf ))!= kOkVnRC )
  392. cmErrMsg(err,rc,"Send from '%s' to '%s' failed.", cmStringNullGuard(nodeArray[id].label), cmStringNullGuard(nodeArray[i].label));
  393. return rc;
  394. }
  395. cmVnRC_t cmVnTest( cmCtx_t* ctx )
  396. {
  397. cmVnRC_t rc = kOkVnRC;
  398. cmErr_t err;
  399. unsigned id = 0;
  400. unsigned i;
  401. unsigned n = 0;
  402. node_t nodeArray[] =
  403. {
  404. { "whirl-0", 10, "192.168.15.109", 5768, cmVnNullHandle },
  405. { "whirl-1", 20, "192.168.15.109", 5767, cmVnNullHandle },
  406. { "thunk-0", 30, "192.168.15.111", 5766, cmVnNullHandle },
  407. { NULL, -1, NULL }
  408. };
  409. cmErrSetup(&err,&ctx->rpt,"cmVnTest");
  410. // create the virt networks for the local nodes
  411. for(i=0; nodeArray[i].label !=NULL; ++i)
  412. if( strcmp(nodeArray[i].ipAddr,nodeArray[id].ipAddr ) == 0 )
  413. if((rc = _cmVnTestCreateLocalNet(ctx,&err,i,nodeArray)) != kOkVnRC )
  414. goto errLabel;
  415. // create the virtual nodes for each local virtual network
  416. for(i=0; nodeArray[i].label != NULL; ++i)
  417. if( cmVnIsValid(nodeArray[i].vnH) )
  418. if((rc = _cmVnTestCreateVirtNodes(&err, i, nodeArray )) != kOkVnRC )
  419. break;
  420. char c;
  421. while((c=getchar()) != 'q')
  422. {
  423. bool promptFl = true;
  424. switch( c )
  425. {
  426. case '0':
  427. case '1':
  428. case '2':
  429. _cmVnTestSend(&err, nodeArray, id, (unsigned)c - (unsigned)'0', n );
  430. ++n;
  431. break;
  432. }
  433. for(i=0; nodeArray[i].label!=NULL; ++i)
  434. if( cmVnIsValid(nodeArray[i].vnH) )
  435. cmVnReceive(nodeArray[i].vnH,NULL);
  436. if( promptFl )
  437. cmRptPrintf(&ctx->rpt,"%i> ",n);
  438. }
  439. errLabel:
  440. // destroy the virtual networks
  441. for(i=0; nodeArray[i].label!=NULL; ++i)
  442. if( cmVnIsValid(nodeArray[i].vnH) )
  443. if((rc = cmVnDestroy(&nodeArray[i].vnH)) != kOkVnRC )
  444. cmErrMsg(&err,rc,"Node destroy failed for node '%s'.",cmStringNullGuard(nodeArray[i].label));
  445. return rc;
  446. }