libcm is a C development framework with an emphasis on audio signal processing applications.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

cmTaskMgr.c 38KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438
  1. //| Copyright: (C) 2009-2020 Kevin Larke <contact AT larke DOT org>
  2. //| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
  3. #include "cmGlobal.h"
  4. #include "cmRpt.h"
  5. #include "cmErr.h"
  6. #include "cmCtx.h"
  7. #include "cmMem.h"
  8. #include "cmMallocDebug.h"
  9. #include "cmThread.h"
  10. #include "cmTime.h"
  11. #include "cmTaskMgr.h"
  12. #include "cmLinkedHeap.h"
  13. #include "cmText.h"
  // Global task manager handle initialized with cmSTATIC_NULL_HANDLE
  // (presumably an invalid/NULL handle value -- confirm against its definition).
  14. cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE;
  // Forward declaration of the task instance record defined below.
  15. struct cmTmInst_str;
  16. typedef struct cmTmTask_str
  17. {
  18. unsigned taskId;
  19. cmChar_t* label;
  20. cmTaskMgrFunc_t func;
  21. cmTaskMgrRecv_t recv;
  22. struct cmTmTask_str* link;
  23. } cmTmTask_t;
  24. typedef struct cmTmInst_str
  25. {
  26. unsigned instId; // Task instance id.
  27. struct cmTmTask_str* task; // Pointer to task record for this task instance.
  28. void* funcArg; // Client supplied pointer to cmTaskMgrFuncArg_t.arg;
  29. unsigned progCnt; // Maximum expected progress (cmTaskMgrStatusArg_t.prog) to be used by this task instance.
  30. cmChar_t* label; // Optional instance label.
  31. cmStatusTmId_t status; // Current instance status (See cmStatusTmId_t)
  32. void* result; // Task instance result pointer.
  33. unsigned resultByteCnt; // Size of the task instance result pointer in bytes.
  34. cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread
  35. cmTs1p1cH_t msgQueH; // client->inst 'msg' communication queue
  36. bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete
  37. struct cmTmInst_str* link;
  38. } cmTmInst_t;
  39. struct cmTm_str* p;
  40. typedef struct cmTmThread_str
  41. {
  42. struct cmTm_str* p; // Pointer to task mgr.
  43. cmThreadH_t thH; // Thread handle.
  44. cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
  45. double durSecs; // Duration of the instance currently assigned to this thread in seconds.
  46. cmTimeSpec_t t0; // Task start time.
  47. cmTimeSpec_t t1; // Time of last review by the master thread.
  48. bool deactivateFl; // True if this instance has been deactivated by the system.
  49. cmTaskMgrFuncArg_t procArg; //
  50. cmChar_t* text; // Temporary text buffer
  51. struct cmTmThread_str* link; // p->threads link.
  52. } cmTmThread_t;
  53. typedef struct cmTm_str
  54. {
  55. cmErr_t err; // Task manager error object.
  56. cmThreadH_t mstrThH; // Master thread handle.
  57. cmTmThread_t* threads; // Thread record list.
  58. unsigned threadRecdCnt; // Current count of records in 'threads' list.
  59. unsigned maxActiveTaskCnt; // Max. number of active tasks.
  60. cmTaskMgrStatusCb_t statusCb; // Client task status callback.
  61. void* statusCbArg; // Client task status callback argument.
  62. unsigned pauseSleepMs; //
  63. cmTs1p1cH_t callQueH; // client->mgr 'inst' communication queue
  64. cmTsMp1cH_t outQueH; // mgr->client communication queue
  65. cmTmTask_t* tasks; // Task list.
  66. cmTmInst_t* insts; // Task instance list.
  67. unsigned nextInstId; // Next available task instance id.
  68. unsigned activeTaskCnt; // Current active task count.
  69. } cmTm_t;
  70. void _cmTaskMgrStatusArgSetup(
  71. cmTaskMgrStatusArg_t* s,
  72. void* arg,
  73. unsigned instId,
  74. cmSelTmId_t selId,
  75. cmStatusTmId_t statusId,
  76. unsigned prog,
  77. const cmChar_t* text,
  78. const void* msg,
  79. unsigned msgByteCnt )
  80. {
  81. s->arg = arg;
  82. s->instId = instId;
  83. s->selId = selId;
  84. s->statusId = statusId;
  85. s->prog = prog;
  86. s->text = text;
  87. s->msg = msg;
  88. s->msgByteCnt = msgByteCnt;
  89. }
  90. // Called by MASTER and WORKER.
  91. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s )
  92. {
  93. enum { arrayCnt = 3 };
  94. const void* msgPtrArray[arrayCnt];
  95. unsigned msgSizeArray[arrayCnt];
  96. msgPtrArray[0] = s;
  97. msgPtrArray[1] = s->text==NULL ? "" : s->text;
  98. msgPtrArray[2] = s->msg;
  99. msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t);
  100. msgSizeArray[1] = s->text==NULL ? 1 : strlen(s->text)+1;
  101. msgSizeArray[2] = s->msgByteCnt;
  102. if( cmTsMp1cEnqueueSegMsg(p->outQueH, msgPtrArray, msgSizeArray, arrayCnt ) != kOkThRC )
  103. return kQueueFailTmRC;
  104. return kOkTmRC;
  105. }
  106. // Called by MASTER and WORKER.
  107. // This function is called by the worker thread wrapper _cmTmWorkerStatusCb()
  108. // function to enqueue messages being sent back to the client.
  109. cmTmRC_t _cmTmEnqueueStatusMsg1(
  110. cmTm_t* p,
  111. unsigned instId,
  112. cmSelTmId_t selId,
  113. cmStatusTmId_t statusId,
  114. unsigned prog,
  115. const cmChar_t* text,
  116. const void* msg,
  117. unsigned msgByteCnt )
  118. {
  119. cmTaskMgrStatusArg_t s;
  120. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,text,msg,msgByteCnt);
  121. return _cmTmEnqueueStatusMsg0(p,&s);
  122. }
  123. // Called by WORKER.
  124. // Worker threads call this function to enqueue status messages
  125. // for delivery to the task mgr client.
  126. void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status )
  127. {
  128. cmTmThread_t* trp = (cmTmThread_t*)status->arg;
  129. if( _cmTmEnqueueStatusMsg0( trp->p, status ) != kOkTmRC )
  130. {
  131. /// ??????? HOW DO WE HANDLE ERRORS IN THE WORKER THREAD
  132. /// (set an error code in trp and let the master thread notice it.)
  133. assert(0);
  134. }
  135. }
  136. // Called by WORKER.
  137. // This function is called in the worker thread by
  138. // cmTs1p1cDequeueMsg() from within cmTaskMgrWorkerHandleCommand()
  139. // to transfer msg's waiting in the worker's incoming msg queue
  140. // (cmTmInst_t.msgQueH) to the instance recv function (cmTmInst_t.recv).
  141. cmRC_t _cmTmWorkerRecvCb( void* arg, unsigned msgByteCnt, const void* msg )
  142. {
  143. cmTmThread_t* trp = (cmTmThread_t*)arg;
  144. assert(trp->inst->task->recv);
  145. trp->inst->task->recv(&trp->procArg,msg,msgByteCnt);
  146. return cmOkRC;
  147. }
  148. // Called by WORKER.
  149. // This is the wrapper for all worker threads.
  150. bool _cmTmWorkerThreadFunc(void* arg)
  151. {
  152. cmTmThread_t* trp = (cmTmThread_t*)arg;
  153. trp->procArg.reserved = trp;
  154. trp->procArg.arg = trp->inst->funcArg;
  155. trp->procArg.instId = trp->inst->instId;
  156. trp->procArg.statusCb = _cmTmWorkerStatusCb;
  157. trp->procArg.statusCbArg = trp;
  158. trp->procArg.progCnt = trp->inst->progCnt;
  159. trp->procArg.pauseSleepMs= trp->p->pauseSleepMs;
  160. // if the task was paused or killed while it was queued then
  161. // cmTaskMgrHandleCommand() will do the right thing
  162. if( cmTaskMgrWorkerHandleCommand(&trp->procArg) != kStopTmwRC )
  163. {
  164. trp->inst->status = kStartedTmId;
  165. // Notify the client that the instance has started.
  166. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  167. // Execute the client provided task function.
  168. trp->inst->task->func(&trp->procArg);
  169. }
  170. // Notify the client if the instance was killed
  171. if( trp->inst->ctlId == kKillTmId )
  172. {
  173. trp->inst->status = kKilledTmId;
  174. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  175. }
  176. // Notify the client that the instance is completed
  177. // (but don't actually set the status yet)
  178. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kCompletedTmId,0,NULL,NULL,0);
  179. trp->inst->status = kCompletedTmId;
  180. // Force the thread to go into the 'pause' state when it
  181. // returns to it's internal loop. The master thread recognizes paused
  182. // threads as available for reuse.
  183. cmThreadPause(trp->thH,kPauseThFl);
  184. return true;
  185. }
  186. void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg )
  187. {
  188. _cmTmEnqueueStatusMsg1(p,cmInvalidId,kErrorTmId,kInvalidTmId,rc,msg,NULL,0);
  189. }
  190. int _cmTmSortThreadByDur( const void* t0, const void* t1 )
  191. {
  192. double d = ((cmTmThread_t*)t0)->durSecs - ((cmTmThread_t*)t1)->durSecs;
  193. return d== 0 ? 0 : (d<0 ? -1 : 1);
  194. }
  195. // This is the master thread function.
  //
  // One scheduling pass of the task manager. 'arg' is the cmTm_t record.
  // The pass:
  //   1) walks the worker thread list counting the threads that are not
  //      paused and the instances whose status is kStartedTmId, and updates
  //      the accumulated duration of each unfinished instance;
  //   2) if more tasks are active than maxActiveTaskCnt, sets deactivateFl on
  //      the threads with the shortest duration;
  //   3) if fewer tasks are active than maxActiveTaskCnt, clears deactivateFl
  //      on the deactivated threads with the longest duration;
  //   4) while capacity remains, dequeues pending instances from callQueH and
  //      assigns each to a paused worker thread, creating a new worker thread
  //      when none is paused;
  //   5) sleeps for pauseSleepMs and publishes the active task count in
  //      p->activeTaskCnt.
  // Always returns true.
  // NOTE(review): presumably invoked repeatedly by the cmThread run loop -- confirm.
  196. bool _cmTmMasterThreadFunc(void* arg)
  197. {
  198. cmTm_t* p = (cmTm_t*)arg;
  199. unsigned activeThreadCnt = 0;
  200. unsigned activeTaskCnt = 0;
  201. cmTmThread_t* trp = p->threads;
  202. if( p->threadRecdCnt > 0 )
  203. {
  // Scratch array (variable length) holding pointers to the active thread records.
  204. cmTmThread_t* thArray[p->threadRecdCnt];
  205. unsigned deactivatedCnt = 0;
  206. //
  207. // Determine the number of active threads and tasks
  208. //
  209. // for each thread record
  210. for(trp=p->threads; trp!=NULL; trp=trp->link)
  211. {
  212. cmThStateId_t thState;
  213. // if this thread is active ...
  214. if( (thState = cmThreadState(trp->thH)) != kPausedThId )
  215. {
  216. assert(trp->inst!=NULL);
  217. thArray[activeThreadCnt] = trp;
  218. ++activeThreadCnt;
  219. // if the task assigned to this thread is started then the task is active
  220. if( trp->inst->status == kStartedTmId )
  221. ++activeTaskCnt;
  222. // if the deactivatedFl is set then this thread has been deactivated by the system
  223. if( trp->deactivateFl )
  224. ++deactivatedCnt;
  225. // update the task lifetime duration
  226. if( trp->inst->status != kCompletedTmId )
  227. {
  228. cmTimeSpec_t t2;
  229. cmTimeGet(&t2);
  // accumulate the time elapsed since the previous review (microseconds -> seconds)
  230. trp->durSecs += (double)cmTimeElapsedMicros(&trp->t1,&t2) / 1000000.0;
  231. trp->t1 = t2;
  232. }
  233. }
  234. }
  235. //
  236. // thArray[activeThreadCnt] now holds pointers to the
  237. // cmTmThread_t records of the active threads
  238. //
  239. // if more tasks are active than should be - then deactive the youngest
  240. if( activeTaskCnt > p->maxActiveTaskCnt )
  241. {
  242. // sort the active tasks in increasing order of lifetime
  // (the elements being sorted are 'cmTmThread_t*' pointers)
  243. qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  244. // determine the number of threads that need to be deactivated
  245. int n = activeTaskCnt - p->maxActiveTaskCnt;
  246. int i,j;
  247. // pause the active threads with the lowest lifetime
  // (only the deactivateFl flag is set here; this loop does not call cmThreadPause())
  248. for(i=0,j=0; i<activeThreadCnt && j<n; ++i)
  249. if( thArray[i]->deactivateFl == false )
  250. {
  251. thArray[i]->deactivateFl = true;
  252. ++deactivatedCnt;
  253. ++j;
  254. }
  255. }
  256. //
  257. // if there are deactivated tasks and the max thread count has not been reached
  258. // then re-activate some of the deactivated tasks.
  259. //
  260. if( activeTaskCnt < p->maxActiveTaskCnt && deactivatedCnt > 0 )
  261. {
  262. // sort the active tasks in increasing order of lifetime
  263. qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  // re-activate at most as many tasks as there is spare capacity
  264. int n = cmMin(p->maxActiveTaskCnt - activeTaskCnt, deactivatedCnt );
  265. int i;
  266. // re-activate the oldest deactivated tasks first
  267. for(i=activeThreadCnt-1; i>=0 && n>0; --i)
  268. if( thArray[i]->deactivateFl )
  269. {
  270. thArray[i]->deactivateFl = false;
  271. --n;
  272. ++activeTaskCnt;
  273. }
  274. }
  275. }
  276. // If the number of activeTaskCnt is less than the limit and a queued task exists
  277. while( activeTaskCnt < p->maxActiveTaskCnt && cmTs1p1cMsgWaiting(p->callQueH) )
  278. {
  279. cmTmInst_t* ip = NULL;
  280. cmTmThread_t* atrp = NULL;
  281. // Find a worker thread that is in the 'paused' state.
  282. // This is the definitive indication that the thread
  283. // does not have an assigned instance and the thread recd can be reused.
  284. for(trp=p->threads; trp!=NULL; trp=trp->link)
  285. if( cmThreadState(trp->thH) == kPausedThId )
  286. {
  287. atrp = trp;
  288. break;
  289. }
  290. // If all the existing worker threads are in use ...
  291. if( atrp==NULL )
  292. {
  293. // ... then create a new worker thread recd
  294. atrp = cmMemAllocZ(cmTmThread_t,1);
  295. // ... create the new worker thread
  296. if( cmThreadCreate(&atrp->thH,_cmTmWorkerThreadFunc,atrp,p->err.rpt) != kOkThRC )
  297. {
  298. cmMemFree(atrp);
  299. atrp = NULL;
  300. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread create failed.");
  301. break;
  302. }
  303. else
  304. {
  305. // ... setup the new thread record
  // (the new record is pushed onto the front of the p->threads list)
  306. atrp->p = p;
  307. atrp->link = p->threads;
  308. p->threads = atrp;
  309. p->threadRecdCnt += 1;
  310. }
  311. }
  312. // if the thread creation failed
  313. if( atrp == NULL )
  314. break;
  315. // dequeue a pending task instance pointer from the input queue
  316. if(cmTs1p1cDequeueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC )
  317. {
  318. _cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue.");
  319. break;
  320. }
  321. // if the task has a msg recv callback then assign it here
  322. if( ip->task->recv != NULL )
  323. if( cmTs1p1cSetCallback( ip->msgQueH, _cmTmWorkerRecvCb, atrp ) != kOkThRC )
  324. {
  325. _cmTmMasterRptError(p,kQueueFailTmRC,"Worker thread msg queue callback assignment failed.");
  326. break;
  327. }
  328. // setup the thread record associated with the new task
  329. atrp->inst = ip;
  330. atrp->durSecs = 0;
  331. atrp->deactivateFl = false;
  332. cmTimeGet(&atrp->t0);
  333. atrp->t1 = atrp->t0;
  334. // start the worker thread
  // (a start failure is reported to the client but the task is still counted as active)
  335. if( cmThreadPause(atrp->thH,kWaitThFl) != kOkThRC )
  336. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed.");
  337. ++activeTaskCnt;
  338. }
  339. cmSleepMs(p->pauseSleepMs);
  // publish the count read by cmTaskMgrActiveTaskCount() and _cmTmWaitForCompletion()
  340. p->activeTaskCnt = activeTaskCnt;
  341. return true;
  342. }
  343. cmTm_t* _cmTmHandleToPtr( cmTaskMgrH_t h )
  344. {
  345. cmTm_t* p = (cmTm_t*)h.h;
  346. assert( p != NULL );
  347. return p;
  348. }
  349. cmTmTask_t* _cmTmTaskFromId( cmTm_t* p, unsigned taskId )
  350. {
  351. cmTmTask_t* tp;
  352. for(tp=p->tasks; tp!=NULL; tp=tp->link)
  353. if( tp->taskId == taskId )
  354. return tp;
  355. return NULL;
  356. }
  357. cmTmInst_t* _cmTmInstFromId( cmTm_t* p, unsigned instId )
  358. {
  359. cmTmInst_t* ip;
  360. for(ip=p->insts; ip!=NULL; ip=ip->link)
  361. if( ip->instId == instId )
  362. return ip;
  363. return NULL;
  364. }
  365. cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId )
  366. {
  367. cmTmInst_t* ip = p->insts;
  368. cmTmInst_t* pp = NULL;
  369. for(; ip!=NULL; ip=ip->link)
  370. {
  371. if( ip->instId == instId )
  372. {
  373. if( pp == NULL )
  374. p->insts = ip->link;
  375. else
  376. pp->link = ip->link;
  377. if( cmTs1p1cDestroy(&ip->msgQueH) != kOkThRC )
  378. return cmErrMsg(&p->err,kQueueFailTmRC,"The 'msg' input queue destroy failed.");
  379. cmMemFree(ip->label);
  380. cmMemFree(ip->result);
  381. cmMemFree(ip);
  382. return kOkTmRC;
  383. }
  384. pp = ip;
  385. }
  386. return cmErrMsg(&p->err,kAssertFailTmRC,"The instance %i could not be found to be deleted.",instId);
  387. }
  388. cmTmRC_t _cmTmDestroy( cmTm_t* p )
  389. {
  390. cmTmRC_t rc = kOkTmRC;
  391. unsigned i;
  392. // stop and destroy the master thread
  393. if( cmThreadDestroy(&p->mstrThH) != kOkThRC )
  394. {
  395. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Master thread destroy failed.");
  396. goto errLabel;
  397. }
  398. // stop and destroy all the worker threads
  399. for(i=0; p->threads != NULL; ++i )
  400. {
  401. if( cmThreadDestroy(&p->threads->thH) != kOkThRC )
  402. {
  403. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread destruction failed for the worker thread at index %i.",i);
  404. goto errLabel;
  405. }
  406. cmTmThread_t* trp = p->threads;
  407. p->threads = p->threads->link;
  408. cmMemFree(trp->text);
  409. cmMemFree(trp);
  410. }
  411. // release the call input queue
  412. if( cmTs1p1cDestroy(&p->callQueH) != kOkThRC )
  413. {
  414. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The 'call' input queue destroy failed.");
  415. goto errLabel;
  416. }
  417. // draining the output queue
  418. while( cmTsMp1cMsgWaiting(p->outQueH) )
  419. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  420. cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed while draingin.");
  421. // release the output queue
  422. if( cmTsMp1cDestroy(&p->outQueH) != kOkThRC )
  423. {
  424. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  425. goto errLabel;
  426. }
  427. // release instance list
  428. while( p->insts != NULL )
  429. _cmTmInstFree(p,p->insts->instId);
  430. // release the task list
  431. cmTmTask_t* tp = p->tasks;
  432. while( tp != NULL )
  433. {
  434. cmTmTask_t* np = tp->link;
  435. cmMemFree(tp->label);
  436. cmMemFree(tp);
  437. tp = np;
  438. }
  439. cmMemFree(p);
  440. errLabel:
  441. return rc;
  442. }
  443. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr );
  444. cmTmRC_t cmTaskMgrCreate(
  445. cmCtx_t* ctx,
  446. cmTaskMgrH_t* hp,
  447. cmTaskMgrStatusCb_t statusCb,
  448. void* statusCbArg,
  449. unsigned maxActiveTaskCnt,
  450. unsigned queueByteCnt,
  451. unsigned pauseSleepMs)
  452. {
  453. cmTmRC_t rc = kOkTmRC;
  454. if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC )
  455. return rc;
  456. cmTm_t* p = cmMemAllocZ(cmTm_t,1);
  457. cmErrSetup(&p->err,&ctx->rpt,"Task Mgr.");
  458. p->maxActiveTaskCnt = maxActiveTaskCnt;
  459. p->statusCb = statusCb;
  460. p->statusCbArg = statusCbArg;
  461. p->pauseSleepMs = pauseSleepMs;
  462. // create the master thread
  463. if( cmThreadCreate(&p->mstrThH, _cmTmMasterThreadFunc,p,&ctx->rpt) != kOkThRC )
  464. {
  465. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.");
  466. goto errLabel;
  467. }
  468. // create the call input queue
  469. if(cmTs1p1cCreate( &p->callQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  470. {
  471. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The call input queue creation failed.");
  472. goto errLabel;
  473. }
  474. // create the output queue
  475. if( cmTsMp1cCreate( &p->outQueH, queueByteCnt, _cmTmMasterOutQueueCb, p, p->err.rpt ) != kOkThRC )
  476. {
  477. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue creation failed.");
  478. goto errLabel;
  479. }
  480. hp->h = p;
  481. errLabel:
  482. return rc;
  483. }
  484. cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp )
  485. {
  486. cmTmRC_t rc = kOkTmRC;
  487. if( hp==NULL || cmTaskMgrIsValid(*hp)==false )
  488. return rc;
  489. cmTm_t* p = _cmTmHandleToPtr(*hp);
  490. if((rc = _cmTmDestroy(p)) != kOkTmRC )
  491. return rc;
  492. hp->h = NULL;
  493. return rc;
  494. }
  495. void _cmTmWaitForCompletion( cmTm_t* p, unsigned timeOutMs )
  496. {
  497. unsigned durMs = 0;
  498. cmTimeSpec_t t0,t1;
  499. cmTimeGet(&t0);
  500. // Go into timeout loop - waiting for all instances to finish
  501. while( timeOutMs==0 || durMs < timeOutMs )
  502. {
  503. cmTimeGet(&t1);
  504. durMs += cmTimeElapsedMicros(&t0,&t1) / 1000;
  505. t0 = t1;
  506. cmSleepMs(p->pauseSleepMs);
  507. if( p->activeTaskCnt == 0 )
  508. break;
  509. }
  510. }
  511. cmTmRC_t cmTaskMgrClose( cmTaskMgrH_t h, unsigned flags, unsigned timeOutMs )
  512. {
  513. cmTmRC_t rc = kOkTmRC;
  514. cmTm_t* p = _cmTmHandleToPtr(h);
  515. bool fl = false;
  516. // if requested kill any queued tasks
  517. if( cmIsFlag(flags,kKillQueuedTmFl) )
  518. {
  519. cmTmInst_t* ip = p->insts;
  520. for(; ip!=NULL; ip=ip->link)
  521. if( ip->status == kQueuedTmId )
  522. ip->ctlId = kKillTmId;
  523. }
  524. // wait for any existing or queued tasks to complete
  525. _cmTmWaitForCompletion(p,timeOutMs);
  526. // force any queued msgs for the client to be sent
  527. cmTaskMgrOnIdle(h);
  528. // if the 'kill on timeout' flag is set then kill any remaining active tasks
  529. if( cmIsFlag(flags,kTimeOutKillTmFl) )
  530. {
  531. cmTmInst_t* ip = p->insts;
  532. for(; ip!=NULL; ip=ip->link)
  533. if( ip->status != kCompletedTmId )
  534. {
  535. ip->ctlId = kKillTmId;
  536. fl = true;
  537. }
  538. }
  539. // wait for the remaining tasks to complete
  540. if( fl )
  541. _cmTmWaitForCompletion(p,timeOutMs);
  542. // force any queued msgs for the client to be sent
  543. cmTaskMgrOnIdle(h);
  544. return rc;
  545. }
  546. unsigned cmTaskMgrActiveTaskCount( cmTaskMgrH_t h )
  547. {
  548. cmTm_t* p = _cmTmHandleToPtr(h);
  549. return p->activeTaskCnt;
  550. }
  551. bool cmTaskMgrIsValid( cmTaskMgrH_t h )
  552. { return h.h != NULL; }
  553. const cmChar_t* cmTaskMgrStatusIdToLabel( cmStatusTmId_t statusId )
  554. {
  555. typedef struct map_str
  556. {
  557. cmStatusTmId_t id;
  558. const cmChar_t* label;
  559. } map_t;
  560. map_t a[] =
  561. {
  562. { kQueuedTmId, "Queued" },
  563. { kStartedTmId, "Started" },
  564. { kPausedTmId, "Paused" },
  565. { kDeactivatedTmId, "Deactivated" },
  566. { kCompletedTmId, "Completed" },
  567. { kKilledTmId, "Killed" },
  568. { kInvalidTmId, "<Invalid>" },
  569. };
  570. int i;
  571. for(i=0; a[i].id!=kInvalidTmId; ++i)
  572. if( a[i].id == statusId )
  573. return a[i].label;
  574. return "<Unknown>";
  575. }
  576. // This function is called by cmTaskMgrIdle() to dispatch
  577. // status updates to the client.
  578. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr )
  579. {
  580. cmTm_t* p = (cmTm_t*)arg;
  581. cmTaskMgrStatusArg_t s;
  582. // This is probably not nesessary since changing the memory
  583. // pointed to by msgDataPtr should be safe even though it is marked as const.
  584. memcpy(&s,msgDataPtr,sizeof(s));
  585. // The 'text' and 'msg' data have been serialized after the status record.
  586. // The 'text' is guaranteed to at least contain a terminating zero.
  587. s.text = ((char*)msgDataPtr) + sizeof(s);
  588. // if the 'resultByteCnt' > 0 then there is a result record
  589. if( s.msgByteCnt > 0 )
  590. s.msg = ((char*)msgDataPtr) + sizeof(s) + strlen(s.text) + 1;
  591. else
  592. s.msg = NULL;
  593. s.arg = p->statusCbArg;
  594. p->statusCb( &s );
  595. return cmOkRC;
  596. }
  597. cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h )
  598. {
  599. cmTmRC_t rc = kOkTmRC;
  600. cmTm_t* p = _cmTmHandleToPtr(h);
  601. // Transmit any msgs waiting to be sent to the client.
  602. while( cmTsMp1cMsgWaiting(p->outQueH) )
  603. {
  604. // calling this function calls: _cmTmMasterOutQueueCb()
  605. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  606. {
  607. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed during a dequeue.");
  608. goto errLabel;
  609. }
  610. }
  611. // Step through the instance list and delete instances that are
  612. // completed and also marked for deletion.
  613. cmTmInst_t* ip = p->insts;
  614. while( ip != NULL )
  615. {
  616. cmTmInst_t* np = ip->link;
  617. if( ip->status==kCompletedTmId && ip->deleteOnCompleteFl )
  618. _cmTmInstFree(p,ip->instId);
  619. ip = np;
  620. }
  621. errLabel:
  622. return rc;
  623. }
  624. bool cmTaskMgrIsEnabled( cmTaskMgrH_t h )
  625. {
  626. cmTm_t* p = _cmTmHandleToPtr(h);
  627. return cmThreadState(p->mstrThH) != kPausedThId;
  628. }
  629. cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
  630. {
  631. cmTmRC_t rc = kOkTmRC;
  632. cmTm_t* p = _cmTmHandleToPtr(h);
  633. unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl;
  634. if( cmThreadPause(p->mstrThH, flags ) != kOkThRC )
  635. rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" );
  636. return rc;
  637. }
  638. cmTmRC_t cmTaskMgrInstall(
  639. cmTaskMgrH_t h,
  640. unsigned taskId,
  641. const cmChar_t* label,
  642. cmTaskMgrFunc_t func,
  643. cmTaskMgrRecv_t recv)
  644. {
  645. cmTmRC_t rc = kOkTmRC;
  646. cmTm_t* p = _cmTmHandleToPtr(h);
  647. cmTmTask_t* tp = cmMemAllocZ(cmTmTask_t,1);
  648. if( _cmTmTaskFromId(p,taskId) != NULL )
  649. {
  650. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"The task id %i is already in use.",taskId);
  651. goto errLabel;
  652. }
  653. tp->taskId = taskId;
  654. tp->func = func;
  655. tp->recv = recv;
  656. tp->label = cmMemAllocStr(label);
  657. tp->link = p->tasks;
  658. p->tasks = tp;
  659. errLabel:
  660. return rc;
  661. }
  662. cmTmRC_t cmTaskMgrCall(
  663. cmTaskMgrH_t h,
  664. unsigned taskId,
  665. void* funcArg,
  666. unsigned progCnt,
  667. unsigned queueByteCnt,
  668. const cmChar_t* label,
  669. unsigned* retInstIdPtr )
  670. {
  671. cmTmRC_t rc = kOkTmRC;
  672. cmTm_t* p = _cmTmHandleToPtr(h);
  673. cmTmTask_t* tp = NULL;
  674. cmTmInst_t* ip = NULL;
  675. if( retInstIdPtr != NULL )
  676. *retInstIdPtr = cmInvalidId;
  677. // locate the task for this instance
  678. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  679. {
  680. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"Task not found for task id=%i.",taskId);
  681. goto errLabel;
  682. }
  683. // allocate a new instance record
  684. ip = cmMemAllocZ(cmTmInst_t,1);
  685. // setup the instance record
  686. ip->instId = p->nextInstId++;
  687. ip->task = tp;
  688. ip->funcArg = funcArg;
  689. ip->progCnt = progCnt;
  690. ip->label = label==NULL ? NULL : cmMemAllocStr(label);
  691. ip->status = kQueuedTmId;
  692. ip->ctlId = kStartTmId;
  693. // create the msg input queue
  694. if(cmTs1p1cCreate( &ip->msgQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  695. {
  696. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The msg input queue creation failed.");
  697. goto errLabel;
  698. }
  699. // insert the new instance at the end of the instance list
  700. if( p->insts == NULL )
  701. p->insts = ip;
  702. else
  703. {
  704. cmTmInst_t* pp = p->insts;
  705. for(; pp != NULL; pp=pp->link )
  706. if( pp->link == NULL )
  707. {
  708. pp->link = ip;
  709. break;
  710. }
  711. }
  712. // enqueue the instance ptr in the input queue
  713. if( cmTs1p1cEnqueueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC )
  714. {
  715. rc = cmErrMsg(&p->err,kQueueFailTmRC,"New task instance command enqueue failed.");
  716. goto errLabel;
  717. }
  718. // set the returned instance id
  719. if( retInstIdPtr != NULL )
  720. *retInstIdPtr = ip->instId;
  721. // notify the client that the instance was enqueued
  722. cmTaskMgrStatusArg_t s;
  723. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,progCnt,NULL,NULL,0);
  724. p->statusCb( &s );
  725. errLabel:
  726. return rc;
  727. }
  728. cmTmRC_t cmTaskMgrCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId )
  729. {
  730. cmTmRC_t rc = kOkTmRC;
  731. cmTm_t* p = _cmTmHandleToPtr(h);
  732. cmTmInst_t* ip = NULL;
  733. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  734. {
  735. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  736. goto errLabel;
  737. }
  738. // Once an instance ctlId is set to kKillTmId don't allow it to change.
  739. if( ip->ctlId == kKillTmId )
  740. return rc;
  741. switch(ctlId )
  742. {
  743. case kStartTmId:
  744. // Acting on a 'start' cmd only makes sense if the previous command was 'pause'
  745. if( ip->ctlId == kPauseTmId )
  746. ip->ctlId = kStartTmId;
  747. break;
  748. case kPauseTmId:
  749. // Acting on a 'pause' command only makes sense if the previous command was a 'start'
  750. if( ip->ctlId == kStartTmId )
  751. ip->ctlId = kPauseTmId;
  752. break;
  753. case kKillTmId:
  754. ip->ctlId = kKillTmId;
  755. break;
  756. }
  757. errLabel:
  758. return rc;
  759. }
  760. cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId )
  761. {
  762. cmTm_t* p = _cmTmHandleToPtr(h);
  763. cmTmInst_t* ip = NULL;
  764. cmStatusTmId_t status = kInvalidTmId;
  765. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  766. {
  767. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  768. goto errLabel;
  769. }
  770. status = ip->status;
  771. errLabel:
  772. return status;
  773. }
  774. cmTmRC_t cmTaskMgrSendMsg( cmTaskMgrH_t h, unsigned instId, const void* msg, unsigned msgByteCnt )
  775. {
  776. cmTm_t* p = _cmTmHandleToPtr(h);
  777. cmTmRC_t rc = kOkTmRC;
  778. cmTmInst_t* ip = NULL;
  779. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  780. return cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  781. if( cmTs1p1cEnqueueMsg(ip->msgQueH, msg, msgByteCnt ) != kOkThRC )
  782. rc = cmErrMsg(&p->err,kQueueFailTmRC,"Task msg enqueue failed.");
  783. return rc;
  784. }
  785. const cmChar_t* cmTaskMgrTaskIdToLabel( cmTaskMgrH_t h, unsigned taskId )
  786. {
  787. cmTm_t* p = _cmTmHandleToPtr(h);
  788. cmTmTask_t* tp;
  789. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  790. return NULL;
  791. return tp->label;
  792. }
  793. const cmChar_t* cmTaskMgrInstIdToLabel( cmTaskMgrH_t h, unsigned instId )
  794. {
  795. cmTm_t* p = _cmTmHandleToPtr(h);
  796. cmTmInst_t* ip;
  797. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  798. return NULL;
  799. return ip->label;
  800. }
  801. const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId )
  802. {
  803. cmTm_t* p = _cmTmHandleToPtr(h);
  804. cmTmInst_t* ip = NULL;
  805. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  806. {
  807. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  808. goto errLabel;
  809. }
  810. if( ip->status != kCompletedTmId )
  811. {
  812. cmErrMsg(&p->err,kOpFailTmRC,"The result of a running task (id:%i) may not be accessed.",instId);
  813. goto errLabel;
  814. }
  815. return ip->result;
  816. errLabel:
  817. return NULL;
  818. }
  819. unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId )
  820. {
  821. cmTm_t* p = _cmTmHandleToPtr(h);
  822. cmTmInst_t* ip = NULL;
  823. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  824. {
  825. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  826. goto errLabel;
  827. }
  828. if( ip->status != kCompletedTmId )
  829. {
  830. cmErrMsg(&p->err,kOpFailTmRC,"The result byte count of a running task (id:%i) may not be accessed.",instId);
  831. goto errLabel;
  832. }
  833. return ip->resultByteCnt;
  834. errLabel:
  835. return 0;
  836. }
  837. void* cmTaskMgrFuncArg( cmTaskMgrH_t h, unsigned instId )
  838. {
  839. cmTm_t* p = _cmTmHandleToPtr(h);
  840. cmTmInst_t* ip = NULL;
  841. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  842. {
  843. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  844. goto errLabel;
  845. }
  846. if( ip->status != kCompletedTmId )
  847. {
  848. cmErrMsg(&p->err,kOpFailTmRC,"The function argument of a running task (id:%i) may not be accessed.",instId);
  849. goto errLabel;
  850. }
  851. return ip->funcArg;
  852. errLabel:
  853. return NULL;
  854. }
  855. cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId )
  856. {
  857. cmTmRC_t rc = kOkTmRC;
  858. cmTm_t* p = _cmTmHandleToPtr(h);
  859. cmTmInst_t* ip = NULL;
  860. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  861. {
  862. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  863. return 0;
  864. }
  865. ip->deleteOnCompleteFl = true;
  866. return rc;
  867. }
  868. void _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, cmSelTmId_t selId, cmStatusTmId_t statusId, unsigned prog, const cmChar_t* text )
  869. {
  870. cmTaskMgrStatusArg_t s;
  871. _cmTaskMgrStatusArgSetup(
  872. &s,
  873. a->statusCbArg,
  874. a->instId,
  875. statusId == kInvalidTmId ? kProgTmId : kStatusTmId,
  876. statusId == kInvalidTmId ? kStartedTmId : statusId,
  877. statusId == kInvalidTmId ? prog : 0,
  878. text,NULL,0);
  879. a->statusCb(&s);
  880. }
// Called from within a task's worker function to act on any pending control
// command for its instance and to check for incoming messages.
//
// If the instance has been paused (ctlId == kPauseTmId) or its thread has
// been deactivated (deactivateFl) this function does not return until
// neither condition holds, sleeping a->pauseSleepMs milliseconds between
// checks and reporting each status change through _cmTaskMgrWorkerHelper().
//
// Returns:
//   kStopTmwRC - the instance ctlId is kKillTmId at the final check.
//   kRecvTmwRC - no pause/deactivate was pending and a message was waiting
//                on the instance's input queue (and, if the task has a
//                'recv' callback, the dequeue call succeeded).
//   kOkTmwRC   - otherwise, including when the dequeue call fails.
cmTmWorkerRC_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a )
{
  // 'reserved' carries the thread record associated with this worker
  cmTmThread_t* trp = a->reserved;

  // Check if we should go into the paused or deactivated state.
  if( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true )
  {
    // kInvalidTmId guarantees that the first pass through the loop
    // below reports the paused/deactivated status to the client
    cmStatusTmId_t prvStatus = kInvalidTmId;

    do
    {
      // Note that it is possible that the state of the task switch from
      // paused <-> deactivated during the course of this loop.
      // In either case we continue looping but should report the change
      // to the client via a status callback.

      // change the instance status to reflect the true status
      trp->inst->status = trp->deactivateFl ? kDeactivatedTmId : kPausedTmId;

      // if the status actually changed then notify the client
      if( trp->inst->status != prvStatus )
      {
        _cmTaskMgrWorkerHelper(a,kStatusTmId,trp->inst->status,0,NULL);
        prvStatus = trp->inst->status;
      }

      // sleep the thread for pauseSleepMs milliseconds
      cmSleepMs(a->pauseSleepMs);

      // keep looping for as long as the task remains paused or deactivated
    }while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl==true );

    // we are leaving the paused state because we were restarted or killed.
    switch( trp->inst->ctlId )
    {
      case kStartTmId:
        // change the instance status to 'started'.
        trp->inst->status = kStartedTmId;

        // notify the client of the change in state
        _cmTaskMgrWorkerHelper(a,kStatusTmId,kStartedTmId,0,NULL);
        break;

      case kKillTmId:
        // if killed the client will be notified in the worker thread wrapper
        // function: _cmTmWorkerThreadFunc()
        break;

      default:
        // the loop above only exits when ctlId is no longer kPauseTmId,
        // so any other value is unexpected here
        { assert(0); }
    }
  }
  else // There was no command to handle so check for incoming msg's.
  {
    if( cmTs1p1cMsgWaiting(trp->inst->msgQueH) )
    {
      // if the task registered a msg receive callback
      if( trp->inst->task->recv != NULL )
      {
        // NOTE(review): the dequeue is issued with a NULL buffer; presumably
        // the queue delivers the message through a callback in this mode -
        // confirm against the cmTs1p1c API.
        if( cmTs1p1cDequeueMsg(trp->inst->msgQueH, NULL, 0 ) != kOkThRC )
        {
          // TODO: there is currently no mechanism for sending this
          // error back to the client, so the failure is dropped.
          return kOkTmwRC;
        }
      }
      else
      {
        // no receive callback: the message is left on the queue
      }

      // Note that this return bypasses the kill check below.
      return kRecvTmwRC;
    }
  }

  // if ctlId==kKillTmId then the status update will be handled
  // when the task custom function returns in _cmTmWorkerThreadFunc()
  return trp->inst->ctlId == kKillTmId ? kStopTmwRC : kOkTmwRC;
}
  948. cmTmRC_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
  949. {
  950. _cmTaskMgrWorkerHelper(a,kStatusTmId,statusId,0,NULL);
  951. return kOkTmRC;
  952. }
  953. cmTmRC_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* text )
  954. {
  955. _cmTaskMgrWorkerHelper(a,kProgTmId,kInvalidTmId,prog,text);
  956. return kOkTmRC;
  957. }
  958. cmTmRC_t cmTaskMgrWorkerSendProgressV( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, va_list vl )
  959. {
  960. cmTmThread_t* trp = a->reserved;
  961. cmTsVPrintfP(trp->text,fmt,vl);
  962. return cmTaskMgrWorkerSendProgress(a,prog,trp->text);
  963. }
  964. cmTmRC_t cmTaskMgrWorkerSendProgressF( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, ... )
  965. {
  966. va_list vl;
  967. va_start(vl,fmt);
  968. cmTmRC_t rc = cmTaskMgrWorkerSendProgressV(a,prog,fmt,vl);
  969. va_end(vl);
  970. return rc;
  971. }
  972. cmTmRC_t cmTaskMgrWorkerError( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* text )
  973. {
  974. _cmTaskMgrWorkerHelper(a, kErrorTmId, kInvalidTmId, rc, text);
  975. return rc;
  976. }
  977. cmTmRC_t cmTaskMgrWorkerErrorV( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, va_list vl )
  978. {
  979. cmTmThread_t* trp = a->reserved;
  980. cmTsVPrintfP(trp->text,fmt,vl);
  981. return cmTaskMgrWorkerError(a,rc,trp->text);
  982. }
  983. cmTmRC_t cmTaskMgrWorkerErrorF( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, ... )
  984. {
  985. va_list vl;
  986. va_start(vl,fmt);
  987. cmTmRC_t rc0 = cmTaskMgrWorkerErrorV(a,rc,fmt,vl);
  988. va_end(vl);
  989. return rc0;
  990. }
  991. cmTmRC_t cmTaskMgrWorkerSetResult( cmTaskMgrFuncArg_t* a, void* result, unsigned resultByteCnt )
  992. {
  993. cmTmThread_t* trp = a->reserved;
  994. trp->inst->result = result;
  995. trp->inst->resultByteCnt = resultByteCnt;
  996. return kOkTmRC;
  997. }
  998. unsigned cmTaskMgrWorkerMsgByteCount( cmTaskMgrFuncArg_t* a )
  999. {
  1000. cmTmThread_t* trp = a->reserved;
  1001. return cmTs1p1cDequeueMsgByteCount(trp->inst->msgQueH);
  1002. }
  1003. unsigned cmTaskMgrWorkerMsgRecv( cmTaskMgrFuncArg_t* a, void* buf, unsigned bufByteCnt )
  1004. {
  1005. cmTmThread_t* trp = a->reserved;
  1006. unsigned retVal = bufByteCnt;
  1007. switch( cmTs1p1cDequeueMsg(trp->inst->msgQueH, buf, bufByteCnt ) )
  1008. {
  1009. case kOkThRC:
  1010. break;
  1011. case kBufEmptyThRC:
  1012. retVal = 0;
  1013. break;
  1014. case kBufTooSmallThRC:
  1015. retVal = cmInvalidCnt;
  1016. break;
  1017. default:
  1018. { assert(0); }
  1019. }
  1020. return retVal;
  1021. }
  1022. cmTmRC_t cmTaskMgrWorkerMsgSend( cmTaskMgrFuncArg_t* a, const void* buf, unsigned bufByteCnt )
  1023. {
  1024. cmTmThread_t* trp = a->reserved;
  1025. return _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kMsgTmId,trp->inst->status,0,NULL,buf,bufByteCnt);
  1026. }
  1027. //-----------------------------------------------------------------------------
// Maximum number of task instances the test application keeps track of
// (the length of cmTmTestApp_t.insts[]).
enum { kMaxTestInstCnt = 3 };
// Test application record for a single task instance.
typedef struct cmTmTestInst_str
{
  unsigned instId;  // instance id filled in by cmTaskMgrCall()
} cmTmTestInst_t;
// Test application state; passed to the task manager as the status
// callback argument.
typedef struct cmTmTestApp_str
{
  cmErr_t*       err;                     // error object whose 'rpt' is used for test output
  cmTmTestInst_t insts[kMaxTestInstCnt];  // one record per instance started by the test
} cmTmTestApp_t;
  1038. void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s )
  1039. {
  1040. cmRptPrintf(rpt,"inst:%i ",s->instId );
  1041. switch( s->selId )
  1042. {
  1043. case kStatusTmId:
  1044. {
  1045. const cmChar_t* label = cmTaskMgrStatusIdToLabel(s->statusId);
  1046. cmRptPrintf(rpt,"status '%s'",label);
  1047. }
  1048. break;
  1049. case kProgTmId:
  1050. cmRptPrintf(rpt,"prog %i",s->prog);
  1051. break;
  1052. case kErrorTmId:
  1053. cmRptPrintf(rpt,"error %s",cmStringNullGuard(s->msg));
  1054. break;
  1055. default:
  1056. { assert(0); }
  1057. }
  1058. cmRptPrintf(rpt,"\n");
  1059. }
  1060. // Test client status callback function.
  1061. void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s )
  1062. {
  1063. // s.arg set from cmTaskMgrCreate( ..., statusCbArg, ...);
  1064. cmTmTestApp_t* app = (cmTmTestApp_t*)s->arg;
  1065. unsigned i;
  1066. // locate the instance record assoc'd with this callback
  1067. for(i=0; i<kMaxTestInstCnt; ++i)
  1068. if( app->insts[i].instId == s->instId )
  1069. break;
  1070. if( i==kMaxTestInstCnt )
  1071. cmRptPrintf(app->err->rpt,"instId %i not found.\n",s->instId);
  1072. _cmTmTestReportStatus(app->err->rpt,s);
  1073. }
  1074. // Test worker function.
  1075. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg )
  1076. {
  1077. if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC )
  1078. return;
  1079. unsigned prog = 0;
  1080. for(; prog<arg->progCnt; ++prog)
  1081. {
  1082. if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC )
  1083. break;
  1084. cmSleepMs(1000);
  1085. if( cmTaskMgrWorkerSendProgress(arg,prog,NULL) == kStopTmwRC )
  1086. break;
  1087. }
  1088. }
  1089. cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx)
  1090. {
  1091. cmTmRC_t rc = kOkTmRC;
  1092. cmTaskMgrH_t tmH = cmTaskMgrNullHandle;
  1093. unsigned threadCnt = 2;
  1094. unsigned queueByteCnt = 1024;
  1095. unsigned pauseSleepMs = 50;
  1096. unsigned nextInstId = 0;
  1097. unsigned taskId = 0;
  1098. const cmChar_t* taskLabel = "Task Label";
  1099. cmTmTestApp_t app;
  1100. char c;
  1101. memset(&app,0,sizeof(app));
  1102. app.err = &ctx->err;
  1103. // create the task mgr
  1104. if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC )
  1105. {
  1106. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr create failed.");
  1107. goto errLabel;
  1108. }
  1109. // install a task
  1110. if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc, NULL ) != kOkTmRC )
  1111. {
  1112. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr task install failed.");
  1113. goto errLabel;
  1114. }
  1115. // go into interactive mode
  1116. printf("q=quit e=enable c=call i=idle\n");
  1117. while((c = getchar()) != 'q')
  1118. {
  1119. switch(c)
  1120. {
  1121. case 'i':
  1122. cmTaskMgrOnIdle(tmH);
  1123. cmRptPrintf(&ctx->rpt,"idled\n");
  1124. break;
  1125. case 'e':
  1126. {
  1127. // toggle the enable state of the task mgr.
  1128. bool fl = !cmTaskMgrIsEnabled(tmH);
  1129. if( cmTaskMgrEnable(tmH,fl) != kOkTmRC )
  1130. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test enable failed.");
  1131. else
  1132. cmRptPrintf(&ctx->rpt,"%s\n", fl ? "enabled" : "disabled" );
  1133. }
  1134. break;
  1135. case 'c':
  1136. if( nextInstId < kMaxTestInstCnt )
  1137. {
  1138. void* funcArg = app.insts + nextInstId;
  1139. unsigned progCnt = 5;
  1140. if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, queueByteCnt, "My Inst", &app.insts[nextInstId].instId ) != kOkTmRC )
  1141. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed.");
  1142. else
  1143. {
  1144. ++nextInstId;
  1145. cmRptPrintf(&ctx->rpt,"called\n");
  1146. }
  1147. }
  1148. }
  1149. }
  1150. errLabel:
  1151. // destroy the task mgr
  1152. if( cmTaskMgrDestroy(&tmH) != kOkTmRC )
  1153. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr destroy failed.");
  1154. return rc;
  1155. }