libcm is a C development framework with an emphasis on audio signal processing applications.
Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

cmThread.c 49KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089
  1. #include "cmPrefix.h"
  2. #include "cmGlobal.h"
  3. #include "cmRpt.h"
  4. #include "cmErr.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmThread.h"
  8. #include <pthread.h>
  9. #include <unistd.h> // usleep
  10. #ifdef OS_OSX
  11. #include <libkern/OSAtomic.h>
  12. #endif
  // Thread handle initialized to {NULL}; usable as the "no thread" value
  // (cmThreadIsValid() tests the handle's 'h' member against NULL).
  13. cmThreadH_t cmThreadNullHandle = {NULL};
  // Request bits stored in cmThThread_t.doFlags. They are set by the
  // controlling code (cmThreadDestroy()/cmThreadPause()) and examined by the
  // worker loop in _cmThThreadCallback().
  14. enum
  15. {
  16. kDoExitThFl = 0x01, // ask the worker loop to terminate
  17. kDoPauseThFl = 0x02, // ask a running worker to enter the paused state
  18. kDoRunThFl = 0x04 // ask a paused worker to resume running
  19. };
  // Per-thread record allocated by cmThreadCreate() and stored in the 'h'
  // member of the public thread handle.
  20. typedef struct
  21. {
  22. cmErr_t err; // error reporting context (set up with the label "Thread")
  23. cmThreadFunc_t funcPtr; // client function called repeatedly while running
  24. pthread_t pthreadH; // id filled in by pthread_create()
  25. cmThStateId_t state; // current state; written by the worker, polled by callers
  26. void* funcParam; // argument passed to funcPtr on every call
  27. unsigned doFlags; // pending kDoXXXThFl requests
  28. unsigned pauseMicroSecs; // sleep period of the worker loop while paused
  29. unsigned waitMicroSecs; // max time _cmThWaitForState() waits for a state change
  30. } cmThThread_t;
  31. cmThRC_t _cmThError( cmErr_t* err, cmThRC_t rc, int sysErr, const char* fmt, ... )
  32. {
  33. va_list vl;
  34. va_start(vl,fmt);
  35. cmErrVSysMsg(err,rc,sysErr,fmt,vl);
  36. va_end(vl);
  37. return rc;
  38. }
  39. void _cmThThreadCleanUpCallback(void* t)
  40. {
  41. ((cmThThread_t*)t)->state = kExitedThId;
  42. }
  // Thread entry point passed to pthread_create() by cmThreadCreate().
  // 'param' is the cmThThread_t record of the thread. The loop runs until
  // kDoExitThFl is set in doFlags or the client function returns false.
  // NOTE(review): 'state' and 'doFlags' are shared with the controlling
  // thread without any lock or atomic access visible in this file.
  43. void* _cmThThreadCallback(void* param)
  44. {
  45. cmThThread_t* t = (cmThThread_t*)param;
  46. // set a clean up handler - this will be called when the
  47. // thread terminates unexpectedly or pthread_cleanup_pop() is called.
  48. pthread_cleanup_push(_cmThThreadCleanUpCallback,t);
  49. while( cmIsFlag(t->doFlags,kDoExitThFl) == false )
  50. {
  51. if( t->state == kPausedThId )
  52. {
  // paused: sleep, then check whether a 'run' request arrived
  53. cmSleepUs( t->pauseMicroSecs );
  54. if( cmIsFlag(t->doFlags,kDoRunThFl) )
  55. {
  // acknowledge the request by clearing it, then switch to running
  56. t->doFlags = cmClrFlag(t->doFlags,kDoRunThFl);
  57. t->state = kRunningThId;
  58. }
  59. }
  60. else
  61. {
  // running: call the client function; a false return ends the thread
  62. if( t->funcPtr(t->funcParam)==false )
  63. break;
  // a pause request is only honored after the client function returns
  64. if( cmIsFlag(t->doFlags,kDoPauseThFl) )
  65. {
  66. t->doFlags = cmClrFlag(t->doFlags,kDoPauseThFl);
  67. t->state = kPausedThId;
  68. }
  69. }
  70. }
  // pop and execute the clean-up handler, which sets state to kExitedThId
  71. pthread_cleanup_pop(1);
  72. pthread_exit(NULL);
  // pthread_exit() does not return; this statement only satisfies the signature
  73. return t;
  74. }
  75. cmThThread_t* _cmThThreadFromHandle( cmThreadH_t h )
  76. {
  77. cmThThread_t* tp = (cmThThread_t*)h.h;
  78. assert(tp != NULL);
  79. return tp->state==kNotInitThId ? NULL : tp;
  80. }
  81. cmThRC_t _cmThWaitForState( cmThThread_t* t, unsigned stateId )
  82. {
  83. unsigned waitTimeMicroSecs = 0;
  84. while( t->state != stateId && waitTimeMicroSecs < t->waitMicroSecs )
  85. {
  86. //cmSleepUs( t->waitMicroSecs );
  87. cmSleepUs( 15000 );
  88. waitTimeMicroSecs += 15000; //t->waitMicroSecs;
  89. }
  90. return t->state==stateId ? kOkThRC : kTimeOutThRC;
  91. }
  92. cmThRC_t cmThreadCreate( cmThreadH_t* hPtr, cmThreadFunc_t funcPtr, void* funcParam, cmRpt_t* rpt )
  93. {
  94. //pthread_attr_t attr;
  95. cmThRC_t rc = kOkThRC;
  96. cmThThread_t* tp = cmMemAllocZ( cmThThread_t, 1 );
  97. int sysErr;
  98. cmErrSetup(&tp->err,rpt,"Thread");
  99. tp->funcPtr = funcPtr;
  100. tp->funcParam = funcParam;
  101. tp->state = kPausedThId;
  102. tp->doFlags = 0;
  103. tp->pauseMicroSecs = 50000;
  104. tp->waitMicroSecs = 1000000;
  105. if((sysErr = pthread_create(&tp->pthreadH,NULL,_cmThThreadCallback, (void*)tp )) != 0 )
  106. {
  107. tp->state = kNotInitThId;
  108. rc = _cmThError(&tp->err,kCreateFailThRC,sysErr,"Thread create failed.");
  109. }
  110. hPtr->h = tp;
  111. return rc;
  112. }
  113. cmThRC_t cmThreadDestroy( cmThreadH_t* hPtr )
  114. {
  115. cmThRC_t rc = kOkThRC;
  116. if( hPtr==NULL || cmThreadIsValid(*hPtr)==false )
  117. return rc;
  118. cmThThread_t* t = _cmThThreadFromHandle(*hPtr );
  119. if( t == NULL )
  120. return kInvalidHandleThRC;
  121. // tell the thread to exit
  122. t->doFlags = cmSetFlag(t->doFlags,kDoExitThFl);
  123. // wait for the thread to exit and then deallocate the thread object
  124. if((rc = _cmThWaitForState(t,kExitedThId)) == kOkThRC )
  125. {
  126. cmMemFree(t);
  127. hPtr->h = NULL;
  128. }
  129. else
  130. {
  131. rc = _cmThError(&t->err,rc,0,"Thread timed out waiting for destroy.");
  132. }
  133. return rc;
  134. }
  135. cmThRC_t cmThreadPause( cmThreadH_t h, unsigned cmdFlags )
  136. {
  137. cmThRC_t rc = kOkThRC;
  138. bool pauseFl = cmIsFlag(cmdFlags,kPauseThFl);
  139. bool waitFl = cmIsFlag(cmdFlags,kWaitThFl);
  140. cmThThread_t* t = _cmThThreadFromHandle(h);
  141. unsigned waitId;
  142. if( t == NULL )
  143. return kInvalidHandleThRC;
  144. bool isPausedFl = t->state == kPausedThId;
  145. if( isPausedFl == pauseFl )
  146. return kOkThRC;
  147. if( pauseFl )
  148. {
  149. t->doFlags = cmSetFlag(t->doFlags,kDoPauseThFl);
  150. waitId = kPausedThId;
  151. }
  152. else
  153. {
  154. t->doFlags = cmSetFlag(t->doFlags,kDoRunThFl);
  155. waitId = kRunningThId;
  156. }
  157. if( waitFl )
  158. rc = _cmThWaitForState(t,waitId);
  159. if( rc != kOkThRC )
  160. _cmThError(&t->err,rc,0,"Thread timed out waiting for '%s'.", pauseFl ? "pause" : "un-pause");
  161. return rc;
  162. }
  163. cmThStateId_t cmThreadState( cmThreadH_t h )
  164. {
  165. cmThThread_t* tp = _cmThThreadFromHandle(h);
  166. if( tp == NULL )
  167. return kNotInitThId;
  168. return tp->state;
  169. }
  170. bool cmThreadIsValid( cmThreadH_t h )
  171. { return h.h != NULL; }
  172. unsigned cmThreadPauseTimeOutMicros( cmThreadH_t h )
  173. {
  174. cmThThread_t* tp = _cmThThreadFromHandle(h);
  175. return tp->pauseMicroSecs;
  176. }
  177. void cmThreadSetPauseTimeOutMicros( cmThreadH_t h, unsigned usecs )
  178. {
  179. cmThThread_t* tp = _cmThThreadFromHandle(h);
  180. tp->pauseMicroSecs = usecs;
  181. }
  182. unsigned cmThreadWaitTimeOutMicros( cmThreadH_t h )
  183. {
  184. cmThThread_t* tp = _cmThThreadFromHandle(h);
  185. return tp->waitMicroSecs;
  186. }
  187. void cmThreadSetWaitTimeOutMicros( cmThreadH_t h, unsigned usecs )
  188. {
  189. cmThThread_t* tp = _cmThThreadFromHandle(h);
  190. tp->waitMicroSecs = usecs;
  191. }
  // Test thread function: 'p' points to an unsigned counter which is
  // incremented on every call. Always returns true to keep the thread alive.
  bool _cmThreadTestCb( void* p )
  {
    unsigned* counter = (unsigned*)p;

    *counter += 1;

    return true;
  }
  198. void cmThreadTest(cmRpt_t* rpt)
  199. {
  200. cmThreadH_t th0;
  201. unsigned val = 0;
  202. if( cmThreadCreate(&th0,_cmThreadTestCb,&val,rpt) == kOkThRC )
  203. {
  204. if( cmThreadPause(th0,0) != kOkThRC )
  205. {
  206. cmRptPrintf(rpt,"Thread start failed.\n");
  207. return;
  208. }
  209. char c = 0;
  210. cmRptPrintf(rpt,"o=print p=pause s=state q=quit\n");
  211. while( c != 'q' )
  212. {
  213. c = (char)fgetc(stdin);
  214. fflush(stdin);
  215. switch(c)
  216. {
  217. case 'o':
  218. cmRptPrintf(rpt,"val: 0x%x\n",val);
  219. break;
  220. case 's':
  221. cmRptPrintf(rpt,"state=%i\n",cmThreadState(th0));
  222. break;
  223. case 'p':
  224. {
  225. cmRC_t rc;
  226. if( cmThreadState(th0) == kPausedThId )
  227. rc = cmThreadPause(th0,kWaitThFl);
  228. else
  229. rc = cmThreadPause(th0,kPauseThFl|kWaitThFl);
  230. if( rc == kOkThRC )
  231. cmRptPrintf(rpt,"new state:%i\n", cmThreadState(th0));
  232. else
  233. cmRptPrintf(rpt,"cmThreadPause() failed.");
  234. }
  235. break;
  236. case 'q':
  237. break;
  238. //default:
  239. //cmRptPrintf(rpt,"Unknown:%c\n",c);
  240. }
  241. }
  242. if( cmThreadDestroy(&th0) != kOkThRC )
  243. cmRptPrintf(rpt,"Thread destroy failed.\n");
  244. }
  245. }
  246. //-----------------------------------------------------------------------------
  247. //-----------------------------------------------------------------------------
  248. //-----------------------------------------------------------------------------
  // Internal record behind a cmThreadMutexH_t handle: a pthread mutex
  // paired with a condition variable.
  249. typedef struct
  250. {
  251. cmErr_t err; // error reporting context (set up with the label "Thread Mutex")
  252. pthread_mutex_t mutex; // mutex used by the lock/try-lock/unlock functions
  253. pthread_cond_t cvar; // condition variable waited on together with 'mutex'
  254. } cmThreadMutex_t;
  // Mutex handle initialized to {NULL}; usable as the "no mutex" value
  // (cmThreadMutexIsValid() tests the handle's 'h' member against NULL).
  255. cmThreadMutexH_t kThreadMutexNULL = {NULL};
  256. cmThreadMutex_t* _cmThreadMutexFromHandle( cmThreadMutexH_t h )
  257. {
  258. cmThreadMutex_t* p = (cmThreadMutex_t*)h.h;
  259. assert(p != NULL);
  260. return p;
  261. }
  262. cmThRC_t cmThreadMutexCreate( cmThreadMutexH_t* hPtr, cmRpt_t* rpt )
  263. {
  264. int sysErr;
  265. cmThreadMutex_t* p = cmMemAllocZ( cmThreadMutex_t, 1 );
  266. cmErrSetup(&p->err,rpt,"Thread Mutex");
  267. if((sysErr = pthread_mutex_init(&p->mutex,NULL)) != 0 )
  268. return _cmThError(&p->err,kCreateFailThRC,sysErr,"Thread mutex create failed.");
  269. if((sysErr = pthread_cond_init(&p->cvar,NULL)) != 0 )
  270. return _cmThError(&p->err,kCreateFailThRC,sysErr,"Thread Condition var. create failed.");
  271. hPtr->h = p;
  272. return kOkThRC;
  273. }
  274. cmThRC_t cmThreadMutexDestroy( cmThreadMutexH_t* hPtr )
  275. {
  276. int sysErr;
  277. cmThreadMutex_t* p = _cmThreadMutexFromHandle(*hPtr);
  278. if( p == NULL )
  279. return kInvalidHandleThRC;
  280. if((sysErr = pthread_cond_destroy(&p->cvar)) != 0)
  281. return _cmThError(&p->err,kDestroyFailThRC,sysErr,"Thread condition var. destroy failed.");
  282. if((sysErr = pthread_mutex_destroy(&p->mutex)) != 0)
  283. return _cmThError(&p->err,kDestroyFailThRC,sysErr,"Thread mutex destroy failed.");
  284. cmMemFree(p);
  285. hPtr->h = NULL;
  286. return kOkThRC;
  287. }
  288. cmThRC_t cmThreadMutexTryLock( cmThreadMutexH_t h, bool* lockFlPtr )
  289. {
  290. cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
  291. if( p == NULL )
  292. return kInvalidHandleThRC;
  293. int sysErr = pthread_mutex_trylock(&p->mutex);
  294. switch(sysErr)
  295. {
  296. case EBUSY:
  297. *lockFlPtr = false;
  298. break;
  299. case 0:
  300. *lockFlPtr = true;
  301. break;
  302. default:
  303. return _cmThError(&p->err,kLockFailThRC,sysErr,"Thread mutex try-lock failed.");;
  304. }
  305. return kOkThRC;
  306. }
  307. cmThRC_t cmThreadMutexLock( cmThreadMutexH_t h )
  308. {
  309. cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
  310. if( p == NULL )
  311. return kInvalidHandleThRC;
  312. int sysErr = pthread_mutex_lock(&p->mutex);
  313. if( sysErr == 0 )
  314. return kOkThRC;
  315. return _cmThError(&p->err,kLockFailThRC,sysErr,"Thread mutex lock failed.");
  316. }
  317. cmThRC_t cmThreadMutexUnlock( cmThreadMutexH_t h )
  318. {
  319. cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
  320. if( p == NULL )
  321. return kInvalidHandleThRC;
  322. int sysErr = pthread_mutex_unlock(&p->mutex);
  323. if( sysErr == 0 )
  324. return kOkThRC;
  325. return _cmThError(&p->err,kUnlockFailThRC,sysErr,"Thread mutex unlock failed.");
  326. }
  327. bool cmThreadMutexIsValid( cmThreadMutexH_t h )
  328. { return h.h != NULL; }
  329. cmThRC_t cmThreadMutexWaitOnCondVar( cmThreadMutexH_t h, bool lockFl )
  330. {
  331. cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
  332. if( p == NULL )
  333. return kInvalidHandleThRC;
  334. int sysErr;
  335. if( lockFl )
  336. if( (sysErr=pthread_mutex_lock(&p->mutex)) != 0 )
  337. _cmThError(&p->err,kLockFailThRC,sysErr,"Thread lock failed on cond. var. wait.");
  338. if((sysErr = pthread_cond_wait(&p->cvar,&p->mutex)) != 0 )
  339. _cmThError(&p->err,kCVarWaitFailThRC,sysErr,"Thread cond. var. wait failed.");
  340. return kOkThRC;
  341. }
  342. cmThRC_t cmThreadMutexSignalCondVar( cmThreadMutexH_t h )
  343. {
  344. int sysErr;
  345. cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
  346. if( p == NULL )
  347. return kInvalidHandleThRC;
  348. if((sysErr = pthread_cond_signal(&p->cvar)) != 0 )
  349. return _cmThError(&p->err,kCVarSignalFailThRC,sysErr,"Thread cond. var. signal failed.");
  350. return kOkThRC;
  351. }
  352. //-----------------------------------------------------------------------------
  353. //-----------------------------------------------------------------------------
  354. //-----------------------------------------------------------------------------
  // Queue handle initialized to { NULL }; usable as the "no queue" value
  // (cmTsQueueIsValid() tests the handle's 'h' member against NULL).
  355. cmTsQueueH_t cmTsQueueNullHandle = { NULL };
  // Count of buffers per queue: one is used for input and one for output
  // (see inBufIdx/outBufIdx in cmTsQueue_t).
  356. enum { cmTsQueueBufCnt = 2 };
  // One of the two message buffers of a queue. Each message is stored as an
  // 'unsigned' byte count followed directly by the message bytes.
  357. typedef struct
  358. {
  359. unsigned allocCnt; // count of bytes allocated for the buffer
  360. unsigned fullCnt; // count of bytes used in the buffer
  361. char* basePtr; // base of buffer memory
  362. unsigned* msgPtr; // pointer to first msg
  363. unsigned msgCnt; // count of msgs currently stored in the buffer
  364. } cmTsQueueBuf;
  // Internal record behind a cmTsQueueH_t handle.
  365. typedef struct
  366. {
  367. cmThreadMutexH_t mutexH; // locked while enqueueing and while swapping buffers
  368. cmTsQueueBuf bufArray[cmTsQueueBufCnt]; // the input and the output buffer
  369. unsigned inBufIdx; // index into bufArray[] of the buffer being filled
  370. unsigned outBufIdx; // index into bufArray[] of the buffer being drained
  371. char* memPtr; // single allocation holding the memory of all buffers
  372. cmTsQueueCb_t cbFunc; // optional callback used to deliver dequeued msgs
  373. void* userCbPtr; // first argument passed to cbFunc
  374. } cmTsQueue_t;
  375. cmTsQueue_t* _cmTsQueueFromHandle( cmTsQueueH_t h )
  376. {
  377. cmTsQueue_t* p = h.h;
  378. assert(p != NULL);
  379. return p;
  380. }
  381. cmThRC_t _cmTsQueueDestroy( cmTsQueue_t* p )
  382. {
  383. cmThRC_t rc;
  384. if( p == NULL )
  385. return kInvalidHandleThRC;
  386. if( p->mutexH.h != NULL )
  387. if((rc = cmThreadMutexDestroy(&p->mutexH)) != kOkThRC )
  388. return rc;
  389. if( p->memPtr != NULL )
  390. cmMemPtrFree(&p->memPtr);
  391. cmMemPtrFree(&p);
  392. return kOkThRC;
  393. }
  394. cmThRC_t cmTsQueueCreate( cmTsQueueH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* userCbPtr, cmRpt_t* rpt )
  395. {
  396. cmTsQueue_t* p = cmMemAllocZ( cmTsQueue_t, 1 );
  397. unsigned i;
  398. if( cmThreadMutexCreate(&p->mutexH,rpt) != kOkThRC )
  399. goto errLabel;
  400. p->memPtr = cmMemAllocZ( char, bufByteCnt*cmTsQueueBufCnt );
  401. p->outBufIdx = 0;
  402. p->inBufIdx = 1;
  403. p->cbFunc = cbFunc;
  404. p->userCbPtr = userCbPtr;
  405. for(i=0; i<cmTsQueueBufCnt; ++i)
  406. {
  407. p->bufArray[i].allocCnt = bufByteCnt;
  408. p->bufArray[i].fullCnt = 0;
  409. p->bufArray[i].basePtr = p->memPtr + (i*bufByteCnt);
  410. p->bufArray[i].msgPtr = NULL;
  411. p->bufArray[i].msgCnt = 0;
  412. }
  413. hPtr->h = p;
  414. return kOkThRC;
  415. errLabel:
  416. _cmTsQueueDestroy(p);
  417. return kCreateFailThRC;
  418. }
  419. cmThRC_t cmTsQueueDestroy( cmTsQueueH_t* hPtr )
  420. {
  421. cmThRC_t rc = kOkThRC;
  422. if( (hPtr != NULL) && cmTsQueueIsValid(*hPtr))
  423. if((rc = _cmTsQueueDestroy(_cmTsQueueFromHandle(*hPtr))) == kOkThRC )
  424. hPtr->h = NULL;
  425. return rc;
  426. }
  427. cmThRC_t cmTsQueueSetCallback( cmTsQueueH_t h, cmTsQueueCb_t cbFunc, void* cbArg )
  428. {
  429. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  430. p->cbFunc = cbFunc;
  431. p->userCbPtr = cbArg;
  432. return kOkThRC;
  433. }
  434. unsigned cmTsQueueAllocByteCount( cmTsQueueH_t h )
  435. {
  436. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  437. unsigned n = 0;
  438. if( cmThreadMutexLock(p->mutexH) == kOkThRC )
  439. {
  440. n = p->bufArray[ p->inBufIdx ].allocCnt;
  441. cmThreadMutexUnlock(p->mutexH);
  442. }
  443. return n;
  444. }
  445. unsigned cmTsQueueAvailByteCount( cmTsQueueH_t h )
  446. {
  447. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  448. unsigned n = 0;
  449. if(cmThreadMutexLock(p->mutexH) == kOkThRC )
  450. {
  451. n = p->bufArray[ p->inBufIdx ].allocCnt - p->bufArray[ p->inBufIdx].fullCnt;
  452. cmThreadMutexUnlock(p->mutexH);
  453. }
  454. return n;
  455. }
  456. cmThRC_t _cmTsQueueEnqueueMsg( cmTsQueueH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
  457. {
  458. cmThRC_t rc;
  459. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  460. if( p == NULL )
  461. return kInvalidHandleThRC;
  462. // lock the mutex
  463. if((rc = cmThreadMutexLock(p->mutexH)) == kOkThRC )
  464. {
  465. cmTsQueueBuf* b = p->bufArray + p->inBufIdx; // ptr to buf recd
  466. const char* ep = b->basePtr + b->allocCnt; // end of buf data space
  467. unsigned *mp = (unsigned*)(b->basePtr + b->fullCnt); // ptr to size of new msg space
  468. char* dp = (char*)(mp+1); // ptr to data area of new msg space
  469. unsigned ttlByteCnt = 0; // track size of msg data
  470. unsigned i = 0;
  471. // get the total size of the msg
  472. for(i=0; i<arrayCnt; ++i)
  473. ttlByteCnt += msgByteCntArray[i];
  474. // if the msg is too big for the queue buf
  475. if( dp + ttlByteCnt > ep )
  476. rc = kBufFullThRC;
  477. else
  478. {
  479. // for each segment of the incoming msg
  480. for(i=0; i<arrayCnt; ++i)
  481. {
  482. // get the size of the segment
  483. unsigned n = msgByteCntArray[i];
  484. // copy in the segment
  485. memcpy(dp,msgPtrArray[i],n);
  486. dp += n; //
  487. }
  488. assert(dp <= ep );
  489. // write the size ofthe msg into the buffer
  490. *mp = ttlByteCnt;
  491. // update the pointer to the first msg
  492. if( b->msgPtr == NULL )
  493. b->msgPtr = mp;
  494. // track the count of msgs in this buffer
  495. ++b->msgCnt;
  496. // update fullCnt last since dequeue uses fullCnt to
  497. // notice that a msg may be waiting
  498. b->fullCnt += sizeof(unsigned) + ttlByteCnt;
  499. }
  500. cmThreadMutexUnlock(p->mutexH);
  501. }
  502. return rc;
  503. }
  504. cmThRC_t cmTsQueueEnqueueSegMsg( cmTsQueueH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
  505. { return _cmTsQueueEnqueueMsg(h,msgPtrArray,msgByteCntArray,arrayCnt); }
  506. cmThRC_t cmTsQueueEnqueueMsg( cmTsQueueH_t h, const void* dataPtr, unsigned byteCnt )
  507. {
  508. const void* msgPtrArray[] = { dataPtr };
  509. unsigned msgByteCntArray[] = { byteCnt };
  510. return _cmTsQueueEnqueueMsg(h,msgPtrArray,msgByteCntArray,1);
  511. }
  512. cmThRC_t cmTsQueueEnqueueIdMsg( cmTsQueueH_t h, unsigned id, const void* dataPtr, unsigned byteCnt )
  513. {
  514. const void* msgPtrArray[] = { &id, dataPtr };
  515. unsigned msgByteCntArray[] = { sizeof(id), byteCnt };
  516. return _cmTsQueueEnqueueMsg(h,msgPtrArray,msgByteCntArray,2);
  517. }
  518. cmThRC_t _cmTsQueueDequeueMsg( cmTsQueue_t* p, void* retBuf, unsigned refBufByteCnt )
  519. {
  520. cmTsQueueBuf* b = p->bufArray + p->outBufIdx;
  521. // if the output buffer is empty - there is nothing to do
  522. if( b->fullCnt == 0 )
  523. return kBufEmptyThRC;
  524. assert( b->msgPtr != NULL );
  525. // get the output msg size and data
  526. unsigned msgByteCnt = *b->msgPtr;
  527. char* msgDataPtr = (char*)(b->msgPtr + 1);
  528. // transmit the msg via a callback
  529. if( retBuf == NULL && p->cbFunc != NULL )
  530. p->cbFunc(p->userCbPtr,msgByteCnt,msgDataPtr);
  531. else
  532. {
  533. // retBuf may be NULL if the func is being used by cmTsQueueDequeueByteCount()
  534. if( retBuf == NULL || msgByteCnt > refBufByteCnt )
  535. return kBufTooSmallThRC;
  536. // copy the msg to a buffer
  537. if( retBuf != NULL )
  538. memcpy(retBuf,msgDataPtr,msgByteCnt);
  539. }
  540. // update the buffer
  541. b->fullCnt -= sizeof(unsigned) + msgByteCnt;
  542. b->msgPtr = (unsigned*)(msgDataPtr + msgByteCnt);
  543. --(b->msgCnt);
  544. if( b->fullCnt == 0 )
  545. {
  546. assert(b->msgCnt == 0);
  547. b->msgPtr = NULL;
  548. }
  549. return kOkThRC;
  550. }
  551. cmThRC_t cmTsQueueDequeueMsg( cmTsQueueH_t h, void* retBuf, unsigned refBufByteCnt )
  552. {
  553. cmThRC_t rc;
  554. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  555. if( p == NULL )
  556. return kInvalidHandleThRC;
  557. // dequeue the next msg from the current output buffer
  558. if((rc =_cmTsQueueDequeueMsg( p, retBuf, refBufByteCnt )) != kBufEmptyThRC )
  559. return rc;
  560. // the current output buffer was empty
  561. cmTsQueueBuf* b = p->bufArray + p->inBufIdx;
  562. // if the input buffer has msg's ...
  563. if( b->fullCnt > 0 )
  564. {
  565. bool lockFl = false;
  566. // ...attempt to lock the mutex ...
  567. if( (cmThreadMutexTryLock(p->mutexH,&lockFl) == kOkThRC) && lockFl )
  568. {
  569. // ... swap the input and the output buffers ...
  570. unsigned tmp = p->inBufIdx;
  571. p->inBufIdx = p->outBufIdx;
  572. p->outBufIdx = tmp;
  573. // .. unlock the mutex
  574. cmThreadMutexUnlock(p->mutexH);
  575. // ... and dequeue the first msg from the new output buffer
  576. rc = _cmTsQueueDequeueMsg( p, retBuf, refBufByteCnt );
  577. }
  578. }
  579. return rc;
  580. }
  581. bool cmTsQueueMsgWaiting( cmTsQueueH_t h )
  582. {
  583. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  584. if( p == NULL )
  585. return false;
  586. if( p->bufArray[p->outBufIdx].fullCnt )
  587. return true;
  588. return p->bufArray[p->inBufIdx].fullCnt > 0;
  589. }
  590. unsigned cmTsQueueDequeueMsgByteCount( cmTsQueueH_t h )
  591. {
  592. cmTsQueue_t* p = _cmTsQueueFromHandle(h);
  593. if( p == NULL )
  594. return 0;
  595. // if output msgs are available then the msgPtr points to the size of the msg
  596. if( p->bufArray[p->outBufIdx].fullCnt )
  597. return *(p->bufArray[p->outBufIdx].msgPtr);
  598. // no msgs are waiting in the output buffer
  599. // force the buffers to swap - returns kBufEmptyThRC if there are
  600. // still no msgs waiting after the swap (the input buf was also empty)
  601. if( cmTsQueueDequeueMsg(h,NULL,0) == kBufTooSmallThRC )
  602. {
  603. // the buffers swapped so there must be msg waiting
  604. assert( p->bufArray[p->outBufIdx].fullCnt );
  605. return *(p->bufArray[p->outBufIdx].msgPtr);
  606. }
  607. return 0;
  608. }
  609. bool cmTsQueueIsValid( cmTsQueueH_t h )
  610. { return h.h != NULL; }
  611. //--------------------------------------------------------------------------------------------------
  612. //--------------------------------------------------------------------------------------------------
  613. //--------------------------------------------------------------------------------------------------
  614. #ifdef NOT_DEF
  // Count of buffers in a cmTs1p1c_t queue (one input, one output).
  615. enum { kThBufCnt=2 };
  // One buffer of a cmTs1p1c_t queue. cmTs1p1cCreate() sets up every buffer
  // with ii==0 and oi==buffer size.
  616. typedef struct
  617. {
  618. char* buf; // buffer memory
  619. volatile unsigned ii; // byte offset in buf[] where the next msg is written
  620. volatile unsigned oi; // byte offset in buf[] where the next msg is read
  621. } cmThBuf_t;
  // Internal record behind a cmTs1p1cH_t handle.
  622. typedef struct
  623. {
  624. cmErr_t err; // error reporting context (set up with the label "TS 1p1c Queue")
  625. cmThBuf_t a[kThBufCnt]; // the input and the output buffer
  626. volatile unsigned ibi; // index into a[] of the current input buffer
  627. unsigned bn; // size in bytes of each buffer
  628. cmTsQueueCb_t cbFunc; // callback stored by cmTs1p1cCreate()
  629. void* cbArg; // callback argument stored by cmTs1p1cCreate()
  630. } cmTs1p1c_t;
  631. cmTs1p1c_t* _cmTs1p1cHandleToPtr( cmTs1p1cH_t h )
  632. {
  633. cmTs1p1c_t* p = (cmTs1p1c_t*)h.h;
  634. assert( p != NULL );
  635. return p;
  636. }
  637. cmThRC_t _cmTs1p1cDestroy( cmTs1p1c_t* p )
  638. {
  639. unsigned i;
  640. for(i=0; i<kThBufCnt; ++i)
  641. cmMemFree(p->a[i].buf);
  642. cmMemFree(p);
  643. return kOkThRC;
  644. }
  645. cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
  646. {
  647. cmThRC_t rc;
  648. if((rc = cmTs1p1cDestroy(hPtr)) != kOkThRC )
  649. return rc;
  650. unsigned i;
  651. cmTs1p1c_t* p = cmMemAllocZ(cmTs1p1c_t,1);
  652. cmErrSetup(&p->err,rpt,"TS 1p1c Queue");
  653. for(i=0; i<kThBufCnt; ++i)
  654. {
  655. p->a[i].buf = cmMemAllocZ(char,bufByteCnt);
  656. p->a[i].ii = 0;
  657. p->a[i].oi = bufByteCnt;
  658. }
  659. p->ibi = 0;
  660. p->bn = bufByteCnt;
  661. p->cbFunc = cbFunc;
  662. p->cbArg = cbArg;
  663. hPtr->h = p;
  664. return rc;
  665. }
  666. cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hp )
  667. {
  668. cmThRC_t rc = kOkThRC;
  669. if( hp == NULL || cmTs1p1cIsValid(*hp)==false )
  670. return kOkThRC;
  671. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(*hp);
  672. if(( rc = _cmTs1p1cDestroy(p)) != kOkThRC )
  673. return rc;
  674. hp->h = NULL;
  675. return rc;
  676. }
  // Append one message, given as an array of segments, to the current input
  // buffer of the queue. The msg is stored as an 'unsigned' byte count
  // followed by the concatenated segment bytes. No mutex is used here; the
  // function relies on the order in which 'ii', 'oi' and 'ibi' are updated.
  //   h               - queue handle
  //   msgPtrArray     - arrayCnt pointers to the segments
  //   msgByteCntArray - arrayCnt segment sizes in bytes
  //   arrayCnt        - count of segments
  // Returns kOkThRC, or kBufFullThRC (via cmErrMsg()) if the msg can never
  // fit or if the other buffer has not yet been emptied.
  // NOTE(review): presumably intended for exactly one producer and one
  // consumer thread (per the '1p1c' name) - confirm against the header.
  677. cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
  678. {
  679. cmThRC_t rc = kOkThRC;
  680. unsigned mn = 0;
  681. unsigned i;
  682. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  683. cmThBuf_t* ib = p->a + p->ibi;
  684. // get the total count of bytes for this msg
  685. for(i=0; i<arrayCnt; ++i)
  686. mn += msgByteCntArray[i];
  // dn = msg bytes plus the size field stored in front of the msg
  687. unsigned dn = mn + sizeof(unsigned);
  688. // if the message is too big for even an empty buffer
  689. if( dn > p->bn )
  690. return cmErrMsg(&p->err,kBufFullThRC,"A msg containing %i bytes will never be able to fit in a queue with an empty size of %i bytes.",dn,p->bn);
  691. // if the msg won't fit in the current input buffer then try swapping buffers.
  692. if( ib->ii + dn > p->bn )
  693. {
  694. // get the current output buffer
  695. cmThBuf_t* ob = p->a + (p->ibi==0 ? 1 : 0);
  696. // Empty buffers will be set such that: oi==bn and ii==0.
  697. //
  698. // Note that setting ii to 0 in an output buffer is the last operation
  699. // performed on an empty output buffer. ii==0 is therefore the
  700. // signal that an output buffer can be reused for input.
  701. // if the output buffer is not empty - then an overflow occurred
  702. if( ob->ii != 0 )
  703. return cmErrMsg(&p->err,kBufFullThRC,"The msq queue cannot accept a %i byte msg into %i bytes.",dn, p->bn - ib->ii);
  704. // setup the initial output location of the new output buffer
  705. ib->oi = 0;
  706. // swap buffers
  707. p->ibi = (p->ibi + 1) % kThBufCnt;
  708. // get the new input buffer
  709. ib = ob;
  710. }
  711. // get a pointer to the base of the write location
  712. char* dp = ib->buf + ib->ii;
  713. // write the length of the message
  714. *(unsigned*)dp = mn;
  715. dp += sizeof(unsigned);
  716. // write the body of the message
  717. for(i=0; i<arrayCnt; ++i)
  718. {
  719. memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
  720. dp += msgByteCntArray[i];
  721. }
  722. // this MUST be executed last - we'll use 'dp' in the calculation
  723. // (even though ib->ii += dn would be more straight forward way
  724. // to accomplish the same thing) to prevent the optimizer from
  725. // moving the assignment prior to the for loop.
  726. ib->ii += dp - (ib->buf + ib->ii);
  727. return rc;
  728. }
  729. cmThRC_t cmTs1p1cEnqueueMsg( cmTsQueueH_t h, const void* dataPtr, unsigned byteCnt )
  730. { return cmTs1p1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
  731. unsigned cmTs1p1cAllocByteCount( cmTs1p1cH_t h )
  732. {
  733. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  734. return p->bn;
  735. }
  736. unsigned cmTs1p1cAvailByteCount( cmTs1p1cH_t h )
  737. {
  738. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  739. return p->bn - p->a[ p->ibi ].ii;
  740. }
  741. cmThRC_t cmTs1p1cDequeueMsg( cmTs1p1cH_t h, void* dataPtr, unsigned byteCnt )
  742. {
  743. cmThRC_t rc = kOkThRC;
  744. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  745. cmThBuf_t* ob = p->a + (p->ibi == 0 ? 1 : 0);
  746. // empty buffers always are set to: oi==bn && ii==0
  747. if( ob->oi >= ob->ii )
  748. return kBufEmptyThRC;
  749. // get the size of the msg
  750. unsigned mn = *(unsigned*)(ob->buf + ob->oi);
  751. // increment the current output location to the msg body
  752. ob->oi += sizeof(unsigned);
  753. // copy or send the msg
  754. if( dataPtr != NULL )
  755. {
  756. if( byteCnt < mn )
  757. return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,mn);
  758. memcpy(dataPtr, ob->buf + ob->oi, mn);
  759. }
  760. else
  761. {
  762. p->cbFunc(p->cbArg, mn, ob->buf + ob->oi );
  763. }
  764. ob->oi += mn;
  765. // if we are reading correctly ob->oi should land
  766. // exactly on ob->ii when the buffer is empty
  767. assert( ob->oi <= ob->ii );
  768. // if the buffer is empty
  769. if( ob->oi == ob->ii )
  770. {
  771. ob->oi = p->bn; // mark the buffer as empty
  772. ob->ii = 0; //
  773. }
  774. return rc;
  775. }
  776. unsigned cmTs1p1cDequeueMsgByteCount( cmTsQueueH_t h )
  777. {
  778. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  779. cmThBuf_t* ob = p->a + (p->ibi == 0 ? 1 : 0);
  780. // empty buffers always are set to: oi==bn && ii==0
  781. if( ob->oi >= ob->ii )
  782. return 0;
  783. // get the size of the msg
  784. return *(unsigned*)(ob->buf + ob->oi);
  785. }
  786. bool cmTs1p1cMsgWaiting( cmTsQueueH_t h )
  787. { return cmTs1p1cDequeueMsgByteCount(h) > 0; }
  788. bool cmTs1p1cIsValid( cmTs1p1cH_t h )
  789. { return h.h != NULL; }
  790. #endif
  791. //--------------------------------------------------------------------------------------------------
  792. //--------------------------------------------------------------------------------------------------
  793. //--------------------------------------------------------------------------------------------------
  794. typedef struct
  795. {
  796. volatile unsigned ii;
  797. cmErr_t err;
  798. char* buf;
  799. unsigned bn;
  800. cmTsQueueCb_t cbFunc;
  801. void* cbArg;
  802. volatile unsigned oi;
  803. } cmTs1p1c_t;
  804. cmTs1p1c_t* _cmTs1p1cHandleToPtr( cmTs1p1cH_t h )
  805. {
  806. cmTs1p1c_t* p = (cmTs1p1c_t*)h.h;
  807. assert( p != NULL );
  808. return p;
  809. }
  810. cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
  811. {
  812. cmThRC_t rc;
  813. if((rc = cmTs1p1cDestroy(hPtr)) != kOkThRC )
  814. return rc;
  815. cmTs1p1c_t* p = cmMemAllocZ(cmTs1p1c_t,1);
  816. cmErrSetup(&p->err,rpt,"1p1c Queue");
  817. p->buf = cmMemAllocZ(char,bufByteCnt+sizeof(unsigned));
  818. p->ii = 0;
  819. p->oi = 0;
  820. p->bn = bufByteCnt;
  821. p->cbFunc = cbFunc;
  822. p->cbArg = cbArg;
  823. hPtr->h = p;
  824. return rc;
  825. }
  826. cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hp )
  827. {
  828. cmThRC_t rc = kOkThRC;
  829. if( hp == NULL || cmTs1p1cIsValid(*hp)==false )
  830. return kOkThRC;
  831. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(*hp);
  832. cmMemFree(p->buf);
  833. cmMemFree(p);
  834. hp->h = NULL;
  835. return rc;
  836. }
  837. cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
  838. {
  839. cmThRC_t rc = kOkThRC;
  840. unsigned mn = 0;
  841. unsigned i;
  842. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  843. // get the total count of bytes for this msg
  844. for(i=0; i<arrayCnt; ++i)
  845. mn += msgByteCntArray[i];
  846. int dn = mn + sizeof(unsigned);
  847. int oi = p->oi;
  848. int bi = p->ii; // 'bi' is the idx of the leftmost cell which can be written
  849. int en = p->bn; // 'en' is the idx of the cell just to the right of the rightmost cell that can be written
  850. // note: If 'oi' marks the rightmost location then 'en' must be set
  851. // one cell to the left of 'oi', because 'ii' can never be allowed to
  852. // advance onto 'oi' - because 'oi'=='ii' marks an empty (NOT a full)
  853. // queue.
  854. //
  855. // If 'bn' marks the rightmost location then 'ii' can advance onto 'bn'
  856. // beause the true queue length is bn+1.
  857. // if we need to wrap
  858. if( en-bi < dn && oi<=bi )
  859. {
  860. bi = 0;
  861. en = oi - 1; // note if oi==0 then en is negative - see note above re: oi==ii
  862. assert( p->ii>=0 && p->ii <= p->bn );
  863. *(unsigned*)(p->buf + p->ii) = cmInvalidIdx; // mark the wrap location
  864. }
  865. // if oi is between ii and bn
  866. if( oi > bi )
  867. en = oi - 1; // never allow ii to advance onto oi - see note above
  868. // if the msg won't fit
  869. if( en - bi < dn )
  870. return cmErrMsg(&p->err,kBufFullThRC,"%i consecutive bytes is not available in the queue.",dn);
  871. // set the msg byte count - the msg byte cnt precedes the msg body
  872. char* dp = p->buf + bi;
  873. *(unsigned*)dp = dn - sizeof(unsigned);
  874. dp += sizeof(unsigned);
  875. // copy the msg into the buffer
  876. for(i=0,dn=0; i<arrayCnt; ++i)
  877. {
  878. memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
  879. dp += msgByteCntArray[i];
  880. dn += msgByteCntArray[i];
  881. }
  882. // incrementing p->ii must occur last - the unnecessary accumulation
  883. // of dn in the above loop is intended to prevent this line from
  884. // begin moved before the copy loop.
  885. p->ii = bi + dn + sizeof(unsigned);
  886. assert( p->ii >= 0 && p->ii <= p->bn);
  887. return rc;
  888. }
  889. cmThRC_t cmTs1p1cEnqueueMsg( cmTs1p1cH_t h, const void* dataPtr, unsigned byteCnt )
  890. { return cmTs1p1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
  891. unsigned cmTs1p1cAllocByteCount( cmTs1p1cH_t h )
  892. {
  893. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  894. return p->bn;
  895. }
  896. unsigned cmTs1p1cAvailByteCount( cmTs1p1cH_t h )
  897. {
  898. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  899. unsigned oi = p->oi;
  900. unsigned ii = p->ii;
  901. return oi < ii ? p->bn - ii + oi : oi - ii;
  902. }
  903. unsigned _cmTs1p1cDequeueMsgByteCount( cmTs1p1c_t* p )
  904. {
  905. // if the buffer is empty
  906. if( p->ii == p->oi )
  907. return 0;
  908. // get the length of the next msg
  909. unsigned mn = *(unsigned*)(p->buf + p->oi);
  910. // if the msg length is cmInvalidIdx ...
  911. if( mn == cmInvalidIdx )
  912. {
  913. p->oi = 0; // ... wrap to buf begin and try again
  914. return _cmTs1p1cDequeueMsgByteCount(p);
  915. }
  916. return mn;
  917. }
  918. cmThRC_t cmTs1p1cDequeueMsg( cmTs1p1cH_t h, void* dataPtr, unsigned byteCnt )
  919. {
  920. cmThRC_t rc = kOkThRC;
  921. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  922. unsigned mn;
  923. if((mn = _cmTs1p1cDequeueMsgByteCount(p)) == 0 )
  924. return kBufEmptyThRC;
  925. void* mp = p->buf + p->oi + sizeof(unsigned);
  926. if( dataPtr != NULL )
  927. {
  928. if( byteCnt < mn )
  929. return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,mn);
  930. memcpy(dataPtr,mp,mn);
  931. }
  932. else
  933. {
  934. p->cbFunc(p->cbArg,mn,mp);
  935. }
  936. p->oi += mn + sizeof(unsigned);
  937. return rc;
  938. }
  939. unsigned cmTs1p1cDequeueMsgByteCount( cmTs1p1cH_t h )
  940. {
  941. cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
  942. return _cmTs1p1cDequeueMsgByteCount(p);
  943. }
  944. bool cmTs1p1cMsgWaiting( cmTs1p1cH_t h )
  945. { return cmTs1p1cDequeueMsgByteCount(h) > 0; }
  946. bool cmTs1p1cIsValid( cmTs1p1cH_t h )
  947. { return h.h != NULL; }
  948. //============================================================================================================================
  // Atomic compare-and-swap on an int.
  // If *addr equals 'old' then 'new' is stored in *addr and true is returned,
  // otherwise *addr is left unchanged and false is returned.
  //
  // Exactly one implementation is selected at compile time.  Platforms without
  // a known implementation fail to build rather than silently compiling a
  // function which returns no value.
  bool cmThIntCAS( int* addr, int old, int new )
  {
  #if defined(OS_OSX)
    return OSAtomicCompareAndSwap32Barrier(old,new,addr);
  #elif defined(OS_LINUX) || defined(__GNUC__)
    return __sync_bool_compare_and_swap(addr,old,new);
  #else
  #error "cmThIntCAS(): no atomic compare-and-swap implementation for this platform."
  #endif
  }
  // Atomic compare-and-swap on an unsigned int.
  // If *addr equals 'old' then 'new' is stored in *addr and true is returned,
  // otherwise *addr is left unchanged and false is returned.
  //
  // Exactly one implementation is selected at compile time.  Platforms without
  // a known implementation fail to build rather than silently compiling a
  // function which returns no value.
  bool cmThUIntCAS( unsigned* addr, unsigned old, unsigned new )
  {
  #if defined(OS_OSX)
    return OSAtomicCompareAndSwap32Barrier((int)old,(int)new,(int*)addr);
  #elif defined(OS_LINUX) || defined(__GNUC__)
    return __sync_bool_compare_and_swap(addr,old,new);
  #else
  #error "cmThUIntCAS(): no atomic compare-and-swap implementation for this platform."
  #endif
  }
  // Atomic compare-and-swap on a float.
  // The comparison is made on the bit pattern of the values (not with the
  // floating point '==' operator).  If the bits of *addr match the bits of
  // 'old' then 'new' is stored in *addr and true is returned, otherwise *addr
  // is left unchanged and false is returned.
  //
  // The float arguments are converted to their integer bit patterns through a
  // union, which (unlike dereferencing a cast pointer to the float) does not
  // violate the strict aliasing rules.
  bool cmThFloatCAS( float* addr, float old, float new )
  {
    union
    {
      float    f;
      int      i;
      unsigned u;
    } expected, desired;

    expected.f = old;
    desired.f  = new;

  #if defined(OS_OSX)
    return OSAtomicCompareAndSwap32Barrier(expected.i,desired.i,(int*)addr );
  #elif defined(OS_LINUX) || defined(__GNUC__)
    return __sync_bool_compare_and_swap((unsigned*)addr,expected.u,desired.u);
  #else
  #error "cmThFloatCAS(): no atomic compare-and-swap implementation for this platform."
  #endif
  }
  // Atomic compare-and-swap on a pointer.
  // 'addr' is the address of the pointer variable to update.  If the pointer
  // stored there equals 'old' it is replaced with 'neww' and true is returned,
  // otherwise it is left unchanged and false is returned.
  //
  // Platforms without a known implementation fail to build rather than
  // silently compiling a function which returns no value.
  bool cmThPtrCAS( void* addr, void* old, void* neww )
  {
  #if defined(OS_OSX)

    // REMOVE THIS HACK AND USE OSAtomicCompareAndSwapPtrBarrier() WHEN
    // A 64 BIT BUILD IS POSSIBLE ON OS-X.

    // overlay the pointer with an integer of the width selected by OS_64
    typedef struct
    {
      union
      {
        void*   addr;
  #ifdef OS_64
        int64_t val;
  #else
        int     val;
  #endif
      } u;
    } s_t;

    s_t ov,nv;

    ov.u.addr = old;
    nv.u.addr = neww;

  #ifdef OS_64
    int rv = OSAtomicCompareAndSwap64Barrier(ov.u.val,nv.u.val,(int64_t*)addr);
  #else
    int rv = OSAtomicCompareAndSwap32Barrier(ov.u.val,nv.u.val,(int*)addr);
  #endif

    return rv;

  #elif defined(OS_LINUX) || defined(__GNUC__)

    // The builtin operates directly on pointer types, so the full pointer is
    // always compared and swapped whatever its width - there is no reliance on
    // OS_64 having been defined to match the target.
    return __sync_bool_compare_and_swap((void**)addr,old,neww);

  #else
  #error "cmThPtrCAS(): no atomic compare-and-swap implementation for this platform."
  #endif
  }
  // Atomically add 'incr' to *addr.
  //
  // Platforms without a known implementation fail to build rather than
  // silently compiling a function which does nothing.
  void cmThIntIncr( int* addr, int incr )
  {
  #if defined(OS_OSX)
    OSAtomicAdd32Barrier(incr,addr);
  #elif defined(OS_LINUX) || defined(__GNUC__)
    // ... could also use __sync_add_and_fetch() ...
    __sync_fetch_and_add(addr,incr);
  #else
  #error "cmThIntIncr(): no atomic add implementation for this platform."
  #endif
  }
  // Atomically add 'incr' to *addr.
  //
  // Platforms without a known implementation fail to build rather than
  // silently compiling a function which does nothing.
  void cmThUIntIncr( unsigned* addr, unsigned incr )
  {
  #if defined(OS_OSX)
    OSAtomicAdd32Barrier((int)incr,(int*)addr);
  #elif defined(OS_LINUX) || defined(__GNUC__)
    __sync_fetch_and_add(addr,incr);
  #else
  #error "cmThUIntIncr(): no atomic add implementation for this platform."
  #endif
  }
  // Add 'incr' to *addr using a compare-and-swap retry loop: the sum is
  // computed from the value last read and only stored if *addr still holds
  // that value, otherwise the read and sum are repeated.
  void cmThFloatIncr(float* addr, float incr )
  {
    for(;;)
    {
      float seen = *addr;
      float next = seen + incr;

      if( cmThFloatCAS(addr,seen,next) )
        break;
    }
  }
  // Atomically subtract 'decr' from *addr.
  //
  // Platforms without a known implementation fail to build rather than
  // silently compiling a function which does nothing.
  void cmThIntDecr( int* addr, int decr )
  {
  #if defined(OS_OSX)
    OSAtomicAdd32Barrier(-decr,addr);
  #elif defined(OS_LINUX) || defined(__GNUC__)
    __sync_fetch_and_sub(addr,decr);
  #else
  #error "cmThIntDecr(): no atomic subtract implementation for this platform."
  #endif
  }
  // Atomically subtract 'decr' from *addr.
  //
  // Platforms without a known implementation fail to build rather than
  // silently compiling a function which does nothing.
  void cmThUIntDecr( unsigned* addr, unsigned decr )
  {
  #if defined(OS_OSX)
    OSAtomicAdd32Barrier(-((int)decr),(int*)addr);
  #elif defined(OS_LINUX) || defined(__GNUC__)
    __sync_fetch_and_sub(addr,decr);
  #else
  #error "cmThUIntDecr(): no atomic subtract implementation for this platform."
  #endif
  }
  // Subtract 'decr' from *addr using a compare-and-swap retry loop: the
  // difference is computed from the value last read and only stored if *addr
  // still holds that value, otherwise the read and subtraction are repeated.
  void cmThFloatDecr(float* addr, float decr )
  {
    for(;;)
    {
      float seen = *addr;
      float next = seen - decr;

      if( cmThFloatCAS(addr,seen,next) )
        break;
    }
  }
  1067. //============================================================================================================================
  1068. //
  1069. //
  // Thread identifier type used to associate a buffer with its producer thread.
  typedef pthread_t cmThreadId_t;

  // Per producer thread ring buffer.
  typedef struct
  {
    cmThreadId_t id;  // id of the owning thread, as returned by pthread_self()
    char*        buf; // buf[bn] msg storage
    int          ii;  // input index
    int          oi;  // output index (oi==ii means the buffer is empty)
  } cmTsBuf_t;
  // Msg header record.  Note that the header is stored in the buffer directly
  // AFTER the body of the msg it describes.
  typedef struct cmTsHdr_str
  {
    int                 mn;   // byte count of the msg body (set to 0 once the msg has been dequeued)
    int                 ai;   // index into cmTsMp1c_t.a[] of the buffer holding this msg
    struct cmTsHdr_str* link; // header of the next msg in the queue (NULL until one is linked)
  } cmTsHdr_t;
  1085. typedef struct
  1086. {
  1087. cmErr_t err;
  1088. int bn; // bytes per thread buffer
  1089. cmTsBuf_t* a; // a[an] buffer array
  1090. unsigned an; // length of a[] - one buffer per thread
  1091. cmTsQueueCb_t cbFunc;
  1092. void* cbArg;
  1093. cmTsHdr_t* ilp; // prev msg hdr record
  1094. cmTsHdr_t* olp; // prev msg hdr record (wait for olp->link to be set to go to next record)
  1095. } cmTsMp1c_t;
  1096. cmTsMp1cH_t cmTsMp1cNullHandle = cmSTATIC_NULL_HANDLE;
  1097. void _cmTsMp1cPrint( cmTsMp1c_t* p )
  1098. {
  1099. unsigned i;
  1100. for(i=0; i<p->an; ++i)
  1101. printf("%2i ii:%3i oi:%3i\n",i,p->a[i].ii,p->a[i].oi);
  1102. }
  1103. cmTsMp1c_t* _cmTsMp1cHandleToPtr( cmTsMp1cH_t h )
  1104. {
  1105. cmTsMp1c_t* p = (cmTsMp1c_t*)h.h;
  1106. assert(p != NULL);
  1107. return p;
  1108. }
  1109. unsigned _cmTsMp1cBufIndex( cmTsMp1c_t* p, cmThreadId_t id )
  1110. {
  1111. unsigned i;
  1112. for(i=0; i<p->an; ++i)
  1113. if( p->a[i].id == id )
  1114. return i;
  1115. p->an = i+1;
  1116. p->a = cmMemResizePZ(cmTsBuf_t,p->a,p->an);
  1117. p->a[i].buf = cmMemAllocZ(char,p->bn);
  1118. p->a[i].id = id;
  1119. return i;
  1120. }
  1121. cmThRC_t cmTsMp1cDestroy( cmTsMp1cH_t* hp )
  1122. {
  1123. if( hp == NULL || cmTsMp1cIsValid(*hp) == false )
  1124. return kOkThRC;
  1125. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(*hp);
  1126. unsigned i;
  1127. for(i=0; i<p->an; ++i)
  1128. cmMemFree(p->a[i].buf);
  1129. cmMemPtrFree(&p->a);
  1130. cmMemFree(p);
  1131. hp->h = NULL;
  1132. return kOkThRC;
  1133. }
  1134. cmThRC_t cmTsMp1cCreate( cmTsMp1cH_t* hp, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
  1135. {
  1136. cmThRC_t rc;
  1137. if((rc = cmTsMp1cDestroy(hp)) != kOkThRC )
  1138. return rc;
  1139. cmTsMp1c_t* p = cmMemAllocZ(cmTsMp1c_t,1);
  1140. cmErrSetup(&p->err,rpt,"TsMp1c Queue");
  1141. p->a = NULL;
  1142. p->an = 0;
  1143. p->bn = bufByteCnt;
  1144. p->cbFunc = cbFunc;
  1145. p->cbArg = cbArg;
  1146. p->ilp = NULL;
  1147. p->olp = NULL;
  1148. hp->h = p;
  1149. return rc;
  1150. }
  1151. void cmTsMp1cSetCbFunc( cmTsMp1cH_t h, cmTsQueueCb_t cbFunc, void* cbArg )
  1152. {
  1153. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1154. p->cbFunc = cbFunc;
  1155. p->cbArg = cbArg;
  1156. }
  1157. cmTsQueueCb_t cmTsMp1cCbFunc( cmTsMp1cH_t h )
  1158. {
  1159. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1160. return p->cbFunc;
  1161. }
  1162. void* cmTsMp1cCbArg( cmTsMp1cH_t h )
  1163. {
  1164. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1165. return p->cbArg;
  1166. }
  1167. //#define CAS(addr,old,new) __sync_bool_compare_and_swap(addr,old,new)
  1168. //#define CAS(addr,old,neww) cmThPtrCAS(addr,old,neww)
  1169. cmThRC_t cmTsMp1cEnqueueSegMsg( cmTsMp1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
  1170. {
  1171. cmThRC_t rc = kOkThRC;
  1172. unsigned mn = 0;
  1173. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1174. unsigned ai = _cmTsMp1cBufIndex( p, pthread_self() );
  1175. cmTsBuf_t* b = p->a + ai;
  1176. int i,bi,ei;
  1177. cmTsHdr_t hdr;
  1178. // Use a stored oi for the duration of this function.
  1179. // b->oi may be changed by the dequeue thread but storing it here
  1180. // at least prevents it from changing during the course of the this function.
  1181. // Note: b->oi is only used to check for buffer full. Even if it changes
  1182. // it would only mean that more bytes were available than calculated based
  1183. // on the stored value. A low estimate of the actual bytes available is
  1184. // never unsafe.
  1185. volatile int oi = b->oi;
  1186. // get the total count of bytes for this msg
  1187. for(i=0; i<arrayCnt; ++i)
  1188. mn += msgByteCntArray[i];
  1189. // dn = count of msg bytes + count of header bytes
  1190. int dn = mn + sizeof(hdr);
  1191. // if oi is ahead of ii in the queue then we must write
  1192. // in the area between ii and oi
  1193. if( oi > b->ii )
  1194. {
  1195. ei = oi-1; // (never allow ii to equal oi (that's the empty condition))
  1196. bi = b->ii;
  1197. }
  1198. else // otherwise oi is same or before ii in the queue and we have the option to wrap
  1199. {
  1200. // if the new msg will not fit at the end of the queue ....
  1201. if( b->ii + dn > p->bn )
  1202. {
  1203. bi = 0; // ... then wrap to the beginning
  1204. ei = oi-1; // (never allow ii to equal oi (that's the empty condition))
  1205. }
  1206. else
  1207. {
  1208. ei = p->bn; // otherwise write at the current location
  1209. bi = b->ii;
  1210. }
  1211. }
  1212. if( bi + dn > ei )
  1213. return cmErrMsg(&p->err,kBufFullThRC,"%i consecutive bytes is not available in the queue.",dn);
  1214. char* dp = b->buf + bi;
  1215. // write the msg
  1216. for(i=0; i<arrayCnt; ++i)
  1217. {
  1218. memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
  1219. dp += msgByteCntArray[i];
  1220. }
  1221. // setup the msg header
  1222. hdr.ai = ai;
  1223. hdr.mn = mn;
  1224. hdr.link = NULL;
  1225. // write the msg header (following the msg body in memory)
  1226. cmTsHdr_t* hp = (cmTsHdr_t*)dp;
  1227. memcpy(hp,&hdr,sizeof(hdr));
  1228. // increment the buffers input index
  1229. b->ii = bi + dn;
  1230. // update the link list head to point to this msg hdr
  1231. cmTsHdr_t* old_hp, *new_hp;
  1232. do
  1233. {
  1234. old_hp = p->ilp;
  1235. new_hp = hp;
  1236. }while(!cmThPtrCAS(&p->ilp,old_hp,new_hp));
  1237. // link the prev recd to this recd
  1238. if( old_hp != NULL )
  1239. old_hp->link = hp;
  1240. // if this is the first record written by this queue then prime the output list
  1241. do
  1242. {
  1243. old_hp = p->olp;
  1244. new_hp = hp;
  1245. if( old_hp != NULL )
  1246. break;
  1247. }while(!cmThPtrCAS(&p->olp,old_hp,new_hp));
  1248. //printf("%p %p %i\n",p->ilp,p->olp,p->olp->mn);
  1249. return rc;
  1250. }
  1251. cmThRC_t cmTsMp1cEnqueueMsg( cmTsMp1cH_t h, const void* dataPtr, unsigned byteCnt )
  1252. { return cmTsMp1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
  1253. unsigned cmTsMp1cAllocByteCount( cmTsMp1cH_t h )
  1254. {
  1255. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1256. return p->bn;
  1257. }
  1258. unsigned cmTsMp1cAvailByteCount( cmTsMp1cH_t h )
  1259. {
  1260. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1261. unsigned ai = _cmTsMp1cBufIndex(p,pthread_self());
  1262. const cmTsBuf_t* b = p->a + ai;
  1263. if( b->oi > b->ii )
  1264. return b->oi - b->ii - 1;
  1265. return (p->bn - b->ii) + b->oi - 1;
  1266. }
  1267. unsigned _cmTsMp1cNextMsgByteCnt( cmTsMp1c_t* p )
  1268. {
  1269. if( p->olp == NULL )
  1270. return 0;
  1271. // if the current msg has not yet been read
  1272. if( p->olp->mn != 0 )
  1273. return p->olp->mn;
  1274. // if the current msg has been read but a new next msg has been linked
  1275. if( p->olp->mn == 0 && p->olp->link != NULL )
  1276. {
  1277. // advance the buffer output msg past the prev msg header
  1278. char* hp = (char*)(p->olp + 1);
  1279. p->a[p->olp->ai].oi = hp - p->a[p->olp->ai].buf;
  1280. // advance msg pointer to point to the new msg header
  1281. p->olp = p->olp->link;
  1282. // return the size of the new msg
  1283. return p->olp->mn;
  1284. }
  1285. return 0;
  1286. }
  1287. cmThRC_t cmTsMp1cDequeueMsg( cmTsMp1cH_t h, void* dataPtr, unsigned byteCnt )
  1288. {
  1289. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1290. // if there are no messages waiting
  1291. if( _cmTsMp1cNextMsgByteCnt(p) == 0 )
  1292. return kBufEmptyThRC;
  1293. char* hp = (char*)p->olp;
  1294. char* dp = hp - p->olp->mn; // the msg body is before the msg hdr
  1295. if( dataPtr == NULL )
  1296. {
  1297. p->cbFunc(p->cbArg,p->olp->mn,dp);
  1298. }
  1299. else
  1300. {
  1301. if( p->olp->mn > byteCnt )
  1302. return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,p->olp->mn);
  1303. memcpy(dataPtr,dp,p->olp->mn);
  1304. }
  1305. // advance the buffers output index past the msg body
  1306. p->a[p->olp->ai].oi = hp - p->a[p->olp->ai].buf;
  1307. // mark the msg as read
  1308. p->olp->mn = 0;
  1309. return kOkThRC;
  1310. }
  1311. bool cmTsMp1cMsgWaiting( cmTsMp1cH_t h )
  1312. {
  1313. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1314. return _cmTsMp1cNextMsgByteCnt(p) != 0;
  1315. }
  1316. unsigned cmTsMp1cDequeueMsgByteCount( cmTsMp1cH_t h )
  1317. {
  1318. cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
  1319. return _cmTsMp1cNextMsgByteCnt(p);
  1320. }
  1321. bool cmTsMp1cIsValid( cmTsMp1cH_t h )
  1322. { return h.h != NULL; }
  1323. //============================================================================================================================/
  1324. //
  1325. // cmTsQueueTest()
  1326. //
  1327. // param recd for use by _cmTsQueueCb0() and
  1328. // the msg record passed between the sender
  1329. // threads and the receiver thread
  1330. typedef struct
  1331. {
  1332. unsigned id;
  1333. cmTsQueueH_t qH;
  1334. int val;
  1335. } _cmTsQCbParam_t;
  1336. // Generate a random number and put it in a TS queue
  1337. bool _cmTsQueueCb0(void* param)
  1338. {
  1339. _cmTsQCbParam_t* p = (_cmTsQCbParam_t*)param;
  1340. p->val = rand(); // generate a random number
  1341. // send the msg
  1342. if( cmTsQueueEnqueueMsg( p->qH, p, sizeof(_cmTsQCbParam_t)) == kOkThRC )
  1343. printf("in:%i %i\n",p->id,p->val);
  1344. else
  1345. printf("in error %i\n",p->id);
  1346. cmSleepUs(100*1000);
  1347. return true;
  1348. }
  1349. // Monitor a TS queue for incoming messages from _cmTsQueueCb1()
  1350. bool _cmTsQueueCb1(void* param)
  1351. {
  1352. // the thread param is a ptr to the TS queue to monitor.
  1353. cmTsQueueH_t* qp = (cmTsQueueH_t*)param;
  1354. cmThRC_t rc;
  1355. _cmTsQCbParam_t msg;
  1356. // dequeue any waiting messages
  1357. if((rc = cmTsQueueDequeueMsg( *qp, &msg, sizeof(msg))) == kOkThRC )
  1358. printf("out:%i %i\n",msg.id,msg.val);
  1359. else
  1360. {
  1361. if( rc != kBufEmptyThRC )
  1362. printf("out error:%i\n", rc);
  1363. }
  1364. return true;
  1365. }
  1366. // Test the TS queue by starting sender threads (threads 0 & 1)
  1367. // and a receiver thread (thread 2) and sending messages
  1368. // from the sender to the receiver.
  1369. void cmTsQueueTest( cmRpt_t* rpt )
  1370. {
  1371. cmThreadH_t th0=cmThreadNullHandle,th1=cmThreadNullHandle,th2=cmThreadNullHandle;
  1372. cmTsQueueH_t q=cmTsQueueNullHandle;
  1373. _cmTsQCbParam_t param0, param1;
  1374. // create a TS Queue
  1375. if( cmTsQueueCreate(&q,100,NULL,NULL,rpt) != kOkThRC )
  1376. goto errLabel;
  1377. // create thread 0
  1378. param0.id = 0;
  1379. param0.qH = q;
  1380. if( cmThreadCreate(&th0,_cmTsQueueCb0,&param0,rpt) != kOkThRC )
  1381. goto errLabel;
  1382. // create thread 1
  1383. param1.id = 1;
  1384. param1.qH = q;
  1385. if( cmThreadCreate(&th1,_cmTsQueueCb0,&param1,rpt) != kOkThRC )
  1386. goto errLabel;
  1387. // create thread 2
  1388. if( cmThreadCreate(&th2,_cmTsQueueCb1,&q,rpt) != kOkThRC )
  1389. goto errLabel;
  1390. // start thread 0
  1391. if( cmThreadPause(th0,0) != kOkThRC )
  1392. goto errLabel;
  1393. // start thread 1
  1394. if( cmThreadPause(th1,0) != kOkThRC )
  1395. goto errLabel;
  1396. // start thread 2
  1397. if( cmThreadPause(th2,0) != kOkThRC )
  1398. goto errLabel;
  1399. printf("any key to quit.");
  1400. getchar();
  1401. errLabel:
  1402. if( cmThreadIsValid(th0) )
  1403. if( cmThreadDestroy(&th0) != kOkThRC )
  1404. printf("Error destroying thread 0\n");
  1405. if( cmThreadIsValid(th1) )
  1406. if( cmThreadDestroy(&th1) != kOkThRC )
  1407. printf("Error destroying thread 1\n");
  1408. if( cmThreadIsValid(th2) )
  1409. if( cmThreadDestroy(&th2) != kOkThRC )
  1410. printf("Error destroying thread 1\n");
  1411. if( cmTsQueueIsValid(q) )
  1412. if( cmTsQueueDestroy(&q) != kOkThRC )
  1413. printf("Error destroying queue\n");
  1414. }
  1415. //============================================================================================================================/
  1416. //
  1417. // cmTs1p1cTest()
  1418. //
  1419. // param recd for use by _cmTsQueueCb0() and
  1420. // the msg record passed between the sender
  1421. // threads and the receiver thread
  1422. typedef struct
  1423. {
  1424. unsigned id;
  1425. cmTs1p1cH_t qH;
  1426. int val;
  1427. } _cmTs1p1cCbParam_t;
  1428. cmTs1p1cH_t cmTs1p1cNullHandle = cmSTATIC_NULL_HANDLE;
  1429. // Generate a random number and put it in a TS queue
  1430. bool _cmTs1p1cCb0(void* param)
  1431. {
  1432. _cmTs1p1cCbParam_t* p = (_cmTs1p1cCbParam_t*)param;
  1433. p->val = rand(); // generate a random number
  1434. // send the msg
  1435. if( cmTs1p1cEnqueueMsg( p->qH, p, sizeof(_cmTs1p1cCbParam_t)) == kOkThRC )
  1436. printf("in:%i %i\n",p->id,p->val);
  1437. else
  1438. printf("in error %i\n",p->id);
  1439. ++p->id;
  1440. cmSleepUs(100*1000);
  1441. return true;
  1442. }
  1443. // Monitor a TS queue for incoming messages from _cmTs1p1cCb1()
  1444. bool _cmTs1p1cCb1(void* param)
  1445. {
  1446. // the thread param is a ptr to the TS queue to monitor.
  1447. cmTs1p1cH_t* qp = (cmTs1p1cH_t*)param;
  1448. cmThRC_t rc;
  1449. _cmTs1p1cCbParam_t msg;
  1450. // dequeue any waiting messages
  1451. if((rc = cmTs1p1cDequeueMsg( *qp, &msg, sizeof(msg))) == kOkThRC )
  1452. printf("out:%i %i\n",msg.id,msg.val);
  1453. else
  1454. {
  1455. if( rc != kBufEmptyThRC )
  1456. printf("out error:%i\n", rc);
  1457. }
  1458. return true;
  1459. }
  1460. // Test the TS queue by starting sender threads (threads 0 & 1)
  1461. // and a receiver thread (thread 2) and sending messages
  1462. // from the sender to the receiver.
  1463. void cmTs1p1cTest( cmRpt_t* rpt )
  1464. {
  1465. cmThreadH_t th0=cmThreadNullHandle,th1=cmThreadNullHandle,th2=cmThreadNullHandle;
  1466. cmTs1p1cH_t q=cmTs1p1cNullHandle;
  1467. _cmTs1p1cCbParam_t param1;
  1468. // create a TS Queue
  1469. if( cmTs1p1cCreate(&q,28*2,NULL,NULL,rpt) != kOkThRC )
  1470. goto errLabel;
  1471. // create thread 1
  1472. param1.id = 0;
  1473. param1.qH = q;
  1474. if( cmThreadCreate(&th1,_cmTs1p1cCb0,&param1,rpt) != kOkThRC )
  1475. goto errLabel;
  1476. // create thread 2
  1477. if( cmThreadCreate(&th2,_cmTs1p1cCb1,&q,rpt) != kOkThRC )
  1478. goto errLabel;
  1479. // start thread 1
  1480. if( cmThreadPause(th1,0) != kOkThRC )
  1481. goto errLabel;
  1482. // start thread 2
  1483. if( cmThreadPause(th2,0) != kOkThRC )
  1484. goto errLabel;
  1485. printf("any key to quit.");
  1486. getchar();
  1487. errLabel:
  1488. if( cmThreadIsValid(th0) )
  1489. if( cmThreadDestroy(&th0) != kOkThRC )
  1490. printf("Error destroying thread 0\n");
  1491. if( cmThreadIsValid(th1) )
  1492. if( cmThreadDestroy(&th1) != kOkThRC )
  1493. printf("Error destroying thread 1\n");
  1494. if( cmThreadIsValid(th2) )
  1495. if( cmThreadDestroy(&th2) != kOkThRC )
  1496. printf("Error destroying thread 1\n");
  1497. if( cmTs1p1cIsValid(q) )
  1498. if( cmTs1p1cDestroy(&q) != kOkThRC )
  1499. printf("Error destroying queue\n");
  1500. }
  1501. //============================================================================================================================/
  1502. //
  1503. // cmTsMp1cTest()
  1504. //
  1505. // param recd for use by _cmTsQueueCb0() and
  1506. // the msg record passed between the sender
  1507. // threads and the receiver thread
  1508. typedef struct
  1509. {
  1510. unsigned id;
  1511. cmTsMp1cH_t qH;
  1512. int val;
  1513. } _cmTsMp1cCbParam_t;
  // Counter incremented by the producer threads to generate the msg payloads.
  unsigned _cmTsMp1cVal = 0;
  1515. // Incr the global value _cmTsMp1cVal and put it in a TS queue
  1516. bool _cmTsMp1cCb0(void* param)
  1517. {
  1518. _cmTsMp1cCbParam_t* p = (_cmTsMp1cCbParam_t*)param;
  1519. //p->val = __sync_fetch_and_add(&_cmTsMp1cVal,1);
  1520. cmThUIntIncr(&_cmTsMp1cVal,1);
  1521. p->val = _cmTsMp1cVal;
  1522. // send the msg
  1523. if( cmTsMp1cEnqueueMsg( p->qH, p, sizeof(_cmTsMp1cCbParam_t)) == kOkThRC )
  1524. printf("in:%i %i\n",p->id,p->val);
  1525. else
  1526. printf("in error %i\n",p->id);
  1527. cmSleepUs(100*1000);
  1528. return true;
  1529. }
  1530. // Monitor a TS queue for incoming messages from _cmTsMp1cCb1()
  1531. bool _cmTsMp1cCb1(void* param)
  1532. {
  1533. // the thread param is a ptr to the TS queue to monitor.
  1534. cmTsMp1cH_t* qp = (cmTsMp1cH_t*)param;
  1535. cmThRC_t rc;
  1536. _cmTsMp1cCbParam_t msg;
  1537. // dequeue any waiting messages
  1538. if((rc = cmTsMp1cDequeueMsg( *qp, &msg, sizeof(msg))) == kOkThRC )
  1539. printf("out - cons id:%i val:%i\n",msg.id,msg.val);
  1540. else
  1541. {
  1542. if( rc != kBufEmptyThRC )
  1543. printf("out error:%i\n", rc);
  1544. }
  1545. return true;
  1546. }
  1547. // Test the TS queue by starting sender threads (threads 0 & 1)
  1548. // and a receiver thread (thread 2) and sending messages
  1549. // from the sender to the receiver.
  1550. void cmTsMp1cTest( cmRpt_t* rpt )
  1551. {
  1552. cmThreadH_t th0=cmThreadNullHandle,th1=cmThreadNullHandle,th2=cmThreadNullHandle;
  1553. cmTsMp1cH_t q=cmTsMp1cNullHandle;
  1554. _cmTsMp1cCbParam_t param0, param1;
  1555. // create a TS Queue
  1556. if( cmTsMp1cCreate(&q,1000,NULL,NULL,rpt) != kOkThRC )
  1557. goto errLabel;
  1558. // create thread 0 - producer 0
  1559. param0.id = 0;
  1560. param0.qH = q;
  1561. if( cmThreadCreate(&th0,_cmTsMp1cCb0,&param0,rpt) != kOkThRC )
  1562. goto errLabel;
  1563. // create thread 1 - producer 1
  1564. param1.id = 1;
  1565. param1.qH = q;
  1566. if( cmThreadCreate(&th1,_cmTsMp1cCb0,&param1,rpt) != kOkThRC )
  1567. goto errLabel;
  1568. // create thread 2 - consumer 0
  1569. if( cmThreadCreate(&th2,_cmTsMp1cCb1,&q,rpt) != kOkThRC )
  1570. goto errLabel;
  1571. // start thread 0
  1572. if( cmThreadPause(th0,0) != kOkThRC )
  1573. goto errLabel;
  1574. // start thread 1
  1575. if( cmThreadPause(th1,0) != kOkThRC )
  1576. goto errLabel;
  1577. // start thread 2
  1578. if( cmThreadPause(th2,0) != kOkThRC )
  1579. goto errLabel;
  1580. printf("any key to quit.");
  1581. getchar();
  1582. errLabel:
  1583. if( cmThreadIsValid(th0) )
  1584. if( cmThreadDestroy(&th0) != kOkThRC )
  1585. printf("Error destroying thread 0\n");
  1586. if( cmThreadIsValid(th1) )
  1587. if( cmThreadDestroy(&th1) != kOkThRC )
  1588. printf("Error destroying thread 1\n");
  1589. if( cmThreadIsValid(th2) )
  1590. if( cmThreadDestroy(&th2) != kOkThRC )
  1591. printf("Error destroying thread 1\n");
  1592. if( cmTsMp1cIsValid(q) )
  1593. if( cmTsMp1cDestroy(&q) != kOkThRC )
  1594. printf("Error destroying queue\n");
  1595. }
  // Suspend the calling thread by passing 'microseconds' to usleep().
  void cmSleepUs( unsigned microseconds )
  {
    usleep(microseconds);
  }
  // Suspend the calling thread for 'milliseconds' by converting the interval
  // to microseconds and calling cmSleepUs().
  void cmSleepMs( unsigned milliseconds )
  {
    cmSleepUs(milliseconds*1000);
  }