libcm is a C development framework with an emphasis on audio signal processing applications.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

cmTaskMgr.c 38KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438
  1. //| Copyright: (C) 2009-2020 Kevin Larke <contact AT larke DOT org>
  2. //| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
  3. #include "cmGlobal.h"
  4. #include "cmRpt.h"
  5. #include "cmErr.h"
  6. #include "cmCtx.h"
  7. #include "cmMem.h"
  8. #include "cmMallocDebug.h"
  9. #include "cmThread.h"
  10. #include "cmTime.h"
  11. #include "cmTaskMgr.h"
  12. #include "cmLinkedHeap.h"
  13. #include "cmText.h"
  14. cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE;
  15. struct cmTmInst_str;
  16. typedef struct cmTmTask_str
  17. {
  18. unsigned taskId;
  19. cmChar_t* label;
  20. cmTaskMgrFunc_t func;
  21. cmTaskMgrRecv_t recv;
  22. struct cmTmTask_str* link;
  23. } cmTmTask_t;
  24. typedef struct cmTmInst_str
  25. {
  26. unsigned instId; // Task instance id.
  27. struct cmTmTask_str* task; // Pointer to task record for this task instance.
  28. void* funcArg; // Client supplied pointer to cmTaskMgrFuncArg_t.arg;
  29. unsigned progCnt; // Maximum expected progress (cmTaskMgrStatusArg_t.prog) to be used by this task instance.
  30. cmChar_t* label; // Optional instance label.
  31. cmStatusTmId_t status; // Current instance status (See cmStatusTmId_t)
  32. void* result; // Task instance result pointer.
  33. unsigned resultByteCnt; // Size of the task instance result pointer in bytes.
  34. cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread
  35. cmTs1p1cH_t msgQueH; // client->inst 'msg' communication queue
  36. bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete
  37. struct cmTmInst_str* link;
  38. } cmTmInst_t;
  39. struct cmTm_str* p;
  40. typedef struct cmTmThread_str
  41. {
  42. struct cmTm_str* p; // Pointer to task mgr.
  43. cmThreadH_t thH; // Thread handle.
  44. cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
  45. double durSecs; // Duration of the instance currently assigned to this thread in seconds.
  46. cmTimeSpec_t t0; // Task start time.
  47. cmTimeSpec_t t1; // Time of last review by the master thread.
  48. bool deactivateFl; // True if this instance has been deactivated by the system.
  49. cmTaskMgrFuncArg_t procArg; //
  50. cmChar_t* text; // Temporary text buffer
  51. struct cmTmThread_str* link; // p->threads link.
  52. } cmTmThread_t;
  53. typedef struct cmTm_str
  54. {
  55. cmErr_t err; // Task manager error object.
  56. cmThreadH_t mstrThH; // Master thread handle.
  57. cmTmThread_t* threads; // Thread record list.
  58. unsigned threadRecdCnt; // Current count of records in 'threads' list.
  59. unsigned maxActiveTaskCnt; // Max. number of active tasks.
  60. cmTaskMgrStatusCb_t statusCb; // Client task status callback.
  61. void* statusCbArg; // Client task status callback argument.
  62. unsigned pauseSleepMs; //
  63. cmTs1p1cH_t callQueH; // client->mgr 'inst' communication queue
  64. cmTsMp1cH_t outQueH; // mgr->client communication queue
  65. cmTmTask_t* tasks; // Task list.
  66. cmTmInst_t* insts; // Task instance list.
  67. unsigned nextInstId; // Next available task instance id.
  68. unsigned activeTaskCnt; // Current active task count.
  69. } cmTm_t;
  70. void _cmTaskMgrStatusArgSetup(
  71. cmTaskMgrStatusArg_t* s,
  72. void* arg,
  73. unsigned instId,
  74. cmSelTmId_t selId,
  75. cmStatusTmId_t statusId,
  76. unsigned prog,
  77. const cmChar_t* text,
  78. const void* msg,
  79. unsigned msgByteCnt )
  80. {
  81. s->arg = arg;
  82. s->instId = instId;
  83. s->selId = selId;
  84. s->statusId = statusId;
  85. s->prog = prog;
  86. s->text = text;
  87. s->msg = msg;
  88. s->msgByteCnt = msgByteCnt;
  89. }
  90. // Called by MASTER and WORKER.
  91. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s )
  92. {
  93. enum { arrayCnt = 3 };
  94. const void* msgPtrArray[arrayCnt];
  95. unsigned msgSizeArray[arrayCnt];
  96. msgPtrArray[0] = s;
  97. msgPtrArray[1] = s->text==NULL ? "" : s->text;
  98. msgPtrArray[2] = s->msg;
  99. msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t);
  100. msgSizeArray[1] = s->text==NULL ? 1 : strlen(s->text)+1;
  101. msgSizeArray[2] = s->msgByteCnt;
  102. if( cmTsMp1cEnqueueSegMsg(p->outQueH, msgPtrArray, msgSizeArray, arrayCnt ) != kOkThRC )
  103. return kQueueFailTmRC;
  104. return kOkTmRC;
  105. }
  106. // Called by MASTER and WORKER.
  107. // This function is called by the worker thread wrapper _cmTmWorkerStatusCb()
  108. // function to enqueue messages being sent back to the client.
  109. cmTmRC_t _cmTmEnqueueStatusMsg1(
  110. cmTm_t* p,
  111. unsigned instId,
  112. cmSelTmId_t selId,
  113. cmStatusTmId_t statusId,
  114. unsigned prog,
  115. const cmChar_t* text,
  116. const void* msg,
  117. unsigned msgByteCnt )
  118. {
  119. cmTaskMgrStatusArg_t s;
  120. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,text,msg,msgByteCnt);
  121. return _cmTmEnqueueStatusMsg0(p,&s);
  122. }
  123. // Called by WORKER.
  124. // Worker threads call this function to enqueue status messages
  125. // for delivery to the task mgr client.
  126. void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status )
  127. {
  128. cmTmThread_t* trp = (cmTmThread_t*)status->arg;
  129. if( _cmTmEnqueueStatusMsg0( trp->p, status ) != kOkTmRC )
  130. {
  131. /// ??????? HOW DO WE HANDLE ERRORS IN THE WORKER THREAD
  132. /// (set an error code in trp and let the master thread notice it.)
  133. assert(0);
  134. }
  135. }
  136. // Called by WORKER.
  137. // This function is called in the worker thread by
  138. // cmTs1p1cDequeueMsg() from within cmTaskMgrWorkerHandleCommand()
  139. // to transfer msg's waiting in the worker's incoming msg queue
  140. // (cmTmInst_t.msgQueH) to the instance recv function (cmTmInst_t.recv).
  141. cmRC_t _cmTmWorkerRecvCb( void* arg, unsigned msgByteCnt, const void* msg )
  142. {
  143. cmTmThread_t* trp = (cmTmThread_t*)arg;
  144. assert(trp->inst->task->recv);
  145. trp->inst->task->recv(&trp->procArg,msg,msgByteCnt);
  146. return cmOkRC;
  147. }
  148. // Called by WORKER.
  149. // This is the wrapper for all worker threads.
  150. bool _cmTmWorkerThreadFunc(void* arg)
  151. {
  152. cmTmThread_t* trp = (cmTmThread_t*)arg;
  153. trp->procArg.reserved = trp;
  154. trp->procArg.arg = trp->inst->funcArg;
  155. trp->procArg.instId = trp->inst->instId;
  156. trp->procArg.statusCb = _cmTmWorkerStatusCb;
  157. trp->procArg.statusCbArg = trp;
  158. trp->procArg.progCnt = trp->inst->progCnt;
  159. trp->procArg.pauseSleepMs= trp->p->pauseSleepMs;
  160. // if the task was paused or killed while it was queued then
  161. // cmTaskMgrHandleCommand() will do the right thing
  162. if( cmTaskMgrWorkerHandleCommand(&trp->procArg) != kStopTmwRC )
  163. {
  164. trp->inst->status = kStartedTmId;
  165. // Notify the client that the instance has started.
  166. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  167. // Execute the client provided task function.
  168. trp->inst->task->func(&trp->procArg);
  169. }
  170. // Notify the client if the instance was killed
  171. if( trp->inst->ctlId == kKillTmId )
  172. {
  173. trp->inst->status = kKilledTmId;
  174. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  175. }
  176. // Notify the client that the instance is completed
  177. // (but don't actually set the status yet)
  178. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kCompletedTmId,0,NULL,NULL,0);
  179. trp->inst->status = kCompletedTmId;
  180. // Force the thread to go into the 'pause' state when it
  181. // returns to it's internal loop. The master thread recognizes paused
  182. // threads as available for reuse.
  183. cmThreadPause(trp->thH,kPauseThFl);
  184. return true;
  185. }
  186. void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg )
  187. {
  188. _cmTmEnqueueStatusMsg1(p,cmInvalidId,kErrorTmId,kInvalidTmId,rc,msg,NULL,0);
  189. }
  190. int _cmTmSortThreadByDur( const void* t0, const void* t1 )
  191. {
  192. double d = ((cmTmThread_t*)t0)->durSecs - ((cmTmThread_t*)t1)->durSecs;
  193. return d== 0 ? 0 : (d<0 ? -1 : 1);
  194. }
  195. // This is the master thread function.
  196. bool _cmTmMasterThreadFunc(void* arg)
  197. {
  198. cmTm_t* p = (cmTm_t*)arg;
  199. unsigned activeThreadCnt = 0;
  200. unsigned activeTaskCnt = 0;
  201. cmTmThread_t* trp = p->threads;
  202. if( p->threadRecdCnt > 0 )
  203. {
  204. cmTmThread_t* thArray[p->threadRecdCnt];
  205. unsigned deactivatedCnt = 0;
  206. //
  207. // Determine the number of active threads and tasks
  208. //
  209. // for each thread record
  210. for(trp=p->threads; trp!=NULL; trp=trp->link)
  211. {
  212. cmThStateId_t thState;
  213. // if this thread is active ...
  214. if( (thState = cmThreadState(trp->thH)) != kPausedThId )
  215. {
  216. assert(trp->inst!=NULL);
  217. thArray[activeThreadCnt] = trp;
  218. ++activeThreadCnt;
  219. // if the task assigned to this thread is started then the task is active
  220. if( trp->inst->status == kStartedTmId )
  221. ++activeTaskCnt;
  222. // if the deactivatedFl is set then this thread has been deactivated by the system
  223. if( trp->deactivateFl )
  224. ++deactivatedCnt;
  225. // update the task lifetime duration
  226. if( trp->inst->status != kCompletedTmId )
  227. {
  228. cmTimeSpec_t t2;
  229. cmTimeGet(&t2);
  230. trp->durSecs += (double)cmTimeElapsedMicros(&trp->t1,&t2) / 1000000.0;
  231. trp->t1 = t2;
  232. }
  233. }
  234. }
  235. //
  236. // thArray[activeThreadCnt] now holds pointers to the
  237. // cmTmThread_t records of the active threads
  238. //
  239. // if more tasks are active than should be - then deactive the youngest
  240. if( activeTaskCnt > p->maxActiveTaskCnt )
  241. {
  242. // sort the active tasks in increasing order of lifetime
  243. qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  244. // determine the number of threads that need to be deactivated
  245. int n = activeTaskCnt - p->maxActiveTaskCnt;
  246. int i,j;
  247. // pause the active threads with the lowest lifetime
  248. for(i=0,j=0; i<activeThreadCnt && j<n; ++i)
  249. if( thArray[i]->deactivateFl == false )
  250. {
  251. thArray[i]->deactivateFl = true;
  252. ++deactivatedCnt;
  253. ++j;
  254. }
  255. }
  256. //
  257. // if there are deactivated tasks and the max thread count has not been reached
  258. // then re-activate some of the deactivated tasks.
  259. //
  260. if( activeTaskCnt < p->maxActiveTaskCnt && deactivatedCnt > 0 )
  261. {
  262. // sort the active tasks in increasing order of lifetime
  263. qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  264. int n = cmMin(p->maxActiveTaskCnt - activeTaskCnt, deactivatedCnt );
  265. int i;
  266. // re-activate the oldest deactivated tasks first
  267. for(i=activeThreadCnt-1; i>=0 && n>0; --i)
  268. if( thArray[i]->deactivateFl )
  269. {
  270. thArray[i]->deactivateFl = false;
  271. --n;
  272. ++activeTaskCnt;
  273. }
  274. }
  275. }
  276. // If the number of activeTaskCnt is less than the limit and a queued task exists
  277. while( activeTaskCnt < p->maxActiveTaskCnt && cmTs1p1cMsgWaiting(p->callQueH) )
  278. {
  279. cmTmInst_t* ip = NULL;
  280. cmTmThread_t* atrp = NULL;
  281. // Find a worker thread that is in the 'paused' state.
  282. // This is the definitive indication that the thread
  283. // does not have an assigned instance and the thread recd can be reused.
  284. for(trp=p->threads; trp!=NULL; trp=trp->link)
  285. if( cmThreadState(trp->thH) == kPausedThId )
  286. {
  287. atrp = trp;
  288. break;
  289. }
  290. // If all the existing worker threads are in use ...
  291. if( atrp==NULL )
  292. {
  293. // ... then create a new worker thread recd
  294. atrp = cmMemAllocZ(cmTmThread_t,1);
  295. // ... create the new worker thread
  296. if( cmThreadCreate(&atrp->thH,_cmTmWorkerThreadFunc,atrp,p->err.rpt) != kOkThRC )
  297. {
  298. cmMemFree(atrp);
  299. atrp = NULL;
  300. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread create failed.");
  301. break;
  302. }
  303. else
  304. {
  305. // ... setup the new thread record
  306. atrp->p = p;
  307. atrp->link = p->threads;
  308. p->threads = atrp;
  309. p->threadRecdCnt += 1;
  310. }
  311. }
  312. // if the thread creation failed
  313. if( atrp == NULL )
  314. break;
  315. // dequeue a pending task instance pointer from the input queue
  316. if(cmTs1p1cDequeueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC )
  317. {
  318. _cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue.");
  319. break;
  320. }
  321. // if the task has a msg recv callback then assign it here
  322. if( ip->task->recv != NULL )
  323. if( cmTs1p1cSetCallback( ip->msgQueH, _cmTmWorkerRecvCb, atrp ) != kOkThRC )
  324. {
  325. _cmTmMasterRptError(p,kQueueFailTmRC,"Worker thread msg queue callback assignment failed.");
  326. break;
  327. }
  328. // setup the thread record associated with the new task
  329. atrp->inst = ip;
  330. atrp->durSecs = 0;
  331. atrp->deactivateFl = false;
  332. cmTimeGet(&atrp->t0);
  333. atrp->t1 = atrp->t0;
  334. // start the worker thread
  335. if( cmThreadPause(atrp->thH,kWaitThFl) != kOkThRC )
  336. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed.");
  337. ++activeTaskCnt;
  338. }
  339. cmSleepMs(p->pauseSleepMs);
  340. p->activeTaskCnt = activeTaskCnt;
  341. return true;
  342. }
  343. cmTm_t* _cmTmHandleToPtr( cmTaskMgrH_t h )
  344. {
  345. cmTm_t* p = (cmTm_t*)h.h;
  346. assert( p != NULL );
  347. return p;
  348. }
  349. cmTmTask_t* _cmTmTaskFromId( cmTm_t* p, unsigned taskId )
  350. {
  351. cmTmTask_t* tp;
  352. for(tp=p->tasks; tp!=NULL; tp=tp->link)
  353. if( tp->taskId == taskId )
  354. return tp;
  355. return NULL;
  356. }
  357. cmTmInst_t* _cmTmInstFromId( cmTm_t* p, unsigned instId )
  358. {
  359. cmTmInst_t* ip;
  360. for(ip=p->insts; ip!=NULL; ip=ip->link)
  361. if( ip->instId == instId )
  362. return ip;
  363. return NULL;
  364. }
  365. cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId )
  366. {
  367. cmTmInst_t* ip = p->insts;
  368. cmTmInst_t* pp = NULL;
  369. for(; ip!=NULL; ip=ip->link)
  370. {
  371. if( ip->instId == instId )
  372. {
  373. if( pp == NULL )
  374. p->insts = ip->link;
  375. else
  376. pp->link = ip->link;
  377. if( cmTs1p1cDestroy(&ip->msgQueH) != kOkThRC )
  378. return cmErrMsg(&p->err,kQueueFailTmRC,"The 'msg' input queue destroy failed.");
  379. cmMemFree(ip->label);
  380. cmMemFree(ip->result);
  381. cmMemFree(ip);
  382. return kOkTmRC;
  383. }
  384. pp = ip;
  385. }
  386. return cmErrMsg(&p->err,kAssertFailTmRC,"The instance %i could not be found to be deleted.",instId);
  387. }
  388. cmTmRC_t _cmTmDestroy( cmTm_t* p )
  389. {
  390. cmTmRC_t rc = kOkTmRC;
  391. unsigned i;
  392. // stop and destroy the master thread
  393. if( cmThreadDestroy(&p->mstrThH) != kOkThRC )
  394. {
  395. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Master thread destroy failed.");
  396. goto errLabel;
  397. }
  398. // stop and destroy all the worker threads
  399. for(i=0; p->threads != NULL; ++i )
  400. {
  401. if( cmThreadDestroy(&p->threads->thH) != kOkThRC )
  402. {
  403. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread destruction failed for the worker thread at index %i.",i);
  404. goto errLabel;
  405. }
  406. cmTmThread_t* trp = p->threads;
  407. p->threads = p->threads->link;
  408. cmMemFree(trp->text);
  409. cmMemFree(trp);
  410. }
  411. // release the call input queue
  412. if( cmTs1p1cDestroy(&p->callQueH) != kOkThRC )
  413. {
  414. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The 'call' input queue destroy failed.");
  415. goto errLabel;
  416. }
  417. // draining the output queue
  418. while( cmTsMp1cMsgWaiting(p->outQueH) )
  419. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  420. cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed while draingin.");
  421. // release the output queue
  422. if( cmTsMp1cDestroy(&p->outQueH) != kOkThRC )
  423. {
  424. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  425. goto errLabel;
  426. }
  427. // release instance list
  428. while( p->insts != NULL )
  429. _cmTmInstFree(p,p->insts->instId);
  430. // release the task list
  431. cmTmTask_t* tp = p->tasks;
  432. while( tp != NULL )
  433. {
  434. cmTmTask_t* np = tp->link;
  435. cmMemFree(tp->label);
  436. cmMemFree(tp);
  437. tp = np;
  438. }
  439. cmMemFree(p);
  440. errLabel:
  441. return rc;
  442. }
  443. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr );
  444. cmTmRC_t cmTaskMgrCreate(
  445. cmCtx_t* ctx,
  446. cmTaskMgrH_t* hp,
  447. cmTaskMgrStatusCb_t statusCb,
  448. void* statusCbArg,
  449. unsigned maxActiveTaskCnt,
  450. unsigned queueByteCnt,
  451. unsigned pauseSleepMs)
  452. {
  453. cmTmRC_t rc = kOkTmRC;
  454. if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC )
  455. return rc;
  456. cmTm_t* p = cmMemAllocZ(cmTm_t,1);
  457. cmErrSetup(&p->err,&ctx->rpt,"Task Mgr.");
  458. p->maxActiveTaskCnt = maxActiveTaskCnt;
  459. p->statusCb = statusCb;
  460. p->statusCbArg = statusCbArg;
  461. p->pauseSleepMs = pauseSleepMs;
  462. // create the master thread
  463. if( cmThreadCreate(&p->mstrThH, _cmTmMasterThreadFunc,p,&ctx->rpt) != kOkThRC )
  464. {
  465. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.");
  466. goto errLabel;
  467. }
  468. // create the call input queue
  469. if(cmTs1p1cCreate( &p->callQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  470. {
  471. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The call input queue creation failed.");
  472. goto errLabel;
  473. }
  474. // create the output queue
  475. if( cmTsMp1cCreate( &p->outQueH, queueByteCnt, _cmTmMasterOutQueueCb, p, p->err.rpt ) != kOkThRC )
  476. {
  477. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue creation failed.");
  478. goto errLabel;
  479. }
  480. hp->h = p;
  481. errLabel:
  482. return rc;
  483. }
  484. cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp )
  485. {
  486. cmTmRC_t rc = kOkTmRC;
  487. if( hp==NULL || cmTaskMgrIsValid(*hp)==false )
  488. return rc;
  489. cmTm_t* p = _cmTmHandleToPtr(*hp);
  490. if((rc = _cmTmDestroy(p)) != kOkTmRC )
  491. return rc;
  492. hp->h = NULL;
  493. return rc;
  494. }
  495. void _cmTmWaitForCompletion( cmTm_t* p, unsigned timeOutMs )
  496. {
  497. unsigned durMs = 0;
  498. cmTimeSpec_t t0,t1;
  499. cmTimeGet(&t0);
  500. // Go into timeout loop - waiting for all instances to finish
  501. while( timeOutMs==0 || durMs < timeOutMs )
  502. {
  503. cmTimeGet(&t1);
  504. durMs += cmTimeElapsedMicros(&t0,&t1) / 1000;
  505. t0 = t1;
  506. cmSleepMs(p->pauseSleepMs);
  507. if( p->activeTaskCnt == 0 )
  508. break;
  509. }
  510. }
  511. cmTmRC_t cmTaskMgrClose( cmTaskMgrH_t h, unsigned flags, unsigned timeOutMs )
  512. {
  513. cmTmRC_t rc = kOkTmRC;
  514. cmTm_t* p = _cmTmHandleToPtr(h);
  515. bool fl = false;
  516. // if requested kill any queued tasks
  517. if( cmIsFlag(flags,kKillQueuedTmFl) )
  518. {
  519. cmTmInst_t* ip = p->insts;
  520. for(; ip!=NULL; ip=ip->link)
  521. if( ip->status == kQueuedTmId )
  522. ip->ctlId = kKillTmId;
  523. }
  524. // wait for any existing or queued tasks to complete
  525. _cmTmWaitForCompletion(p,timeOutMs);
  526. // force any queued msgs for the client to be sent
  527. cmTaskMgrOnIdle(h);
  528. // if the 'kill on timeout' flag is set then kill any remaining active tasks
  529. if( cmIsFlag(flags,kTimeOutKillTmFl) )
  530. {
  531. cmTmInst_t* ip = p->insts;
  532. for(; ip!=NULL; ip=ip->link)
  533. if( ip->status != kCompletedTmId )
  534. {
  535. ip->ctlId = kKillTmId;
  536. fl = true;
  537. }
  538. }
  539. // wait for the remaining tasks to complete
  540. if( fl )
  541. _cmTmWaitForCompletion(p,timeOutMs);
  542. // force any queued msgs for the client to be sent
  543. cmTaskMgrOnIdle(h);
  544. return rc;
  545. }
  546. unsigned cmTaskMgrActiveTaskCount( cmTaskMgrH_t h )
  547. {
  548. cmTm_t* p = _cmTmHandleToPtr(h);
  549. return p->activeTaskCnt;
  550. }
  551. bool cmTaskMgrIsValid( cmTaskMgrH_t h )
  552. { return h.h != NULL; }
  553. const cmChar_t* cmTaskMgrStatusIdToLabel( cmStatusTmId_t statusId )
  554. {
  555. typedef struct map_str
  556. {
  557. cmStatusTmId_t id;
  558. const cmChar_t* label;
  559. } map_t;
  560. map_t a[] =
  561. {
  562. { kQueuedTmId, "Queued" },
  563. { kStartedTmId, "Started" },
  564. { kPausedTmId, "Paused" },
  565. { kDeactivatedTmId, "Deactivated" },
  566. { kCompletedTmId, "Completed" },
  567. { kKilledTmId, "Killed" },
  568. { kInvalidTmId, "<Invalid>" },
  569. };
  570. int i;
  571. for(i=0; a[i].id!=kInvalidTmId; ++i)
  572. if( a[i].id == statusId )
  573. return a[i].label;
  574. return "<Unknown>";
  575. }
  576. // This function is called by cmTaskMgrIdle() to dispatch
  577. // status updates to the client.
  578. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr )
  579. {
  580. cmTm_t* p = (cmTm_t*)arg;
  581. cmTaskMgrStatusArg_t s;
  582. // This is probably not nesessary since changing the memory
  583. // pointed to by msgDataPtr should be safe even though it is marked as const.
  584. memcpy(&s,msgDataPtr,sizeof(s));
  585. // The 'text' and 'msg' data have been serialized after the status record.
  586. // The 'text' is guaranteed to at least contain a terminating zero.
  587. s.text = ((char*)msgDataPtr) + sizeof(s);
  588. // if the 'resultByteCnt' > 0 then there is a result record
  589. if( s.msgByteCnt > 0 )
  590. s.msg = ((char*)msgDataPtr) + sizeof(s) + strlen(s.text) + 1;
  591. else
  592. s.msg = NULL;
  593. s.arg = p->statusCbArg;
  594. p->statusCb( &s );
  595. return cmOkRC;
  596. }
  597. cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h )
  598. {
  599. cmTmRC_t rc = kOkTmRC;
  600. cmTm_t* p = _cmTmHandleToPtr(h);
  601. // Transmit any msgs waiting to be sent to the client.
  602. while( cmTsMp1cMsgWaiting(p->outQueH) )
  603. {
  604. // calling this function calls: _cmTmMasterOutQueueCb()
  605. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  606. {
  607. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed during a dequeue.");
  608. goto errLabel;
  609. }
  610. }
  611. // Step through the instance list and delete instances that are
  612. // completed and also marked for deletion.
  613. cmTmInst_t* ip = p->insts;
  614. while( ip != NULL )
  615. {
  616. cmTmInst_t* np = ip->link;
  617. if( ip->status==kCompletedTmId && ip->deleteOnCompleteFl )
  618. _cmTmInstFree(p,ip->instId);
  619. ip = np;
  620. }
  621. errLabel:
  622. return rc;
  623. }
  624. bool cmTaskMgrIsEnabled( cmTaskMgrH_t h )
  625. {
  626. cmTm_t* p = _cmTmHandleToPtr(h);
  627. return cmThreadState(p->mstrThH) != kPausedThId;
  628. }
  629. cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
  630. {
  631. cmTmRC_t rc = kOkTmRC;
  632. cmTm_t* p = _cmTmHandleToPtr(h);
  633. unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl;
  634. if( cmThreadPause(p->mstrThH, flags ) != kOkThRC )
  635. rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" );
  636. return rc;
  637. }
  638. cmTmRC_t cmTaskMgrInstall(
  639. cmTaskMgrH_t h,
  640. unsigned taskId,
  641. const cmChar_t* label,
  642. cmTaskMgrFunc_t func,
  643. cmTaskMgrRecv_t recv)
  644. {
  645. cmTmRC_t rc = kOkTmRC;
  646. cmTm_t* p = _cmTmHandleToPtr(h);
  647. cmTmTask_t* tp = cmMemAllocZ(cmTmTask_t,1);
  648. if( _cmTmTaskFromId(p,taskId) != NULL )
  649. {
  650. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"The task id %i is already in use.",taskId);
  651. goto errLabel;
  652. }
  653. tp->taskId = taskId;
  654. tp->func = func;
  655. tp->recv = recv;
  656. tp->label = cmMemAllocStr(label);
  657. tp->link = p->tasks;
  658. p->tasks = tp;
  659. errLabel:
  660. return rc;
  661. }
  662. cmTmRC_t cmTaskMgrCall(
  663. cmTaskMgrH_t h,
  664. unsigned taskId,
  665. void* funcArg,
  666. unsigned progCnt,
  667. unsigned queueByteCnt,
  668. const cmChar_t* label,
  669. unsigned* retInstIdPtr )
  670. {
  671. cmTmRC_t rc = kOkTmRC;
  672. cmTm_t* p = _cmTmHandleToPtr(h);
  673. cmTmTask_t* tp = NULL;
  674. cmTmInst_t* ip = NULL;
  675. if( retInstIdPtr != NULL )
  676. *retInstIdPtr = cmInvalidId;
  677. // locate the task for this instance
  678. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  679. {
  680. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"Task not found for task id=%i.",taskId);
  681. goto errLabel;
  682. }
  683. // allocate a new instance record
  684. ip = cmMemAllocZ(cmTmInst_t,1);
  685. // setup the instance record
  686. ip->instId = p->nextInstId++;
  687. ip->task = tp;
  688. ip->funcArg = funcArg;
  689. ip->progCnt = progCnt;
  690. ip->label = label==NULL ? NULL : cmMemAllocStr(label);
  691. ip->status = kQueuedTmId;
  692. ip->ctlId = kStartTmId;
  693. // create the msg input queue
  694. if(cmTs1p1cCreate( &ip->msgQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  695. {
  696. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The msg input queue creation failed.");
  697. goto errLabel;
  698. }
  699. // insert the new instance at the end of the instance list
  700. if( p->insts == NULL )
  701. p->insts = ip;
  702. else
  703. {
  704. cmTmInst_t* pp = p->insts;
  705. for(; pp != NULL; pp=pp->link )
  706. if( pp->link == NULL )
  707. {
  708. pp->link = ip;
  709. break;
  710. }
  711. }
  712. // enqueue the instance ptr in the input queue
  713. if( cmTs1p1cEnqueueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC )
  714. {
  715. rc = cmErrMsg(&p->err,kQueueFailTmRC,"New task instance command enqueue failed.");
  716. goto errLabel;
  717. }
  718. // set the returned instance id
  719. if( retInstIdPtr != NULL )
  720. *retInstIdPtr = ip->instId;
  721. // notify the client that the instance was enqueued
  722. cmTaskMgrStatusArg_t s;
  723. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,progCnt,NULL,NULL,0);
  724. p->statusCb( &s );
  725. errLabel:
  726. return rc;
  727. }
  728. cmTmRC_t cmTaskMgrCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId )
  729. {
  730. cmTmRC_t rc = kOkTmRC;
  731. cmTm_t* p = _cmTmHandleToPtr(h);
  732. cmTmInst_t* ip = NULL;
  733. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  734. {
  735. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  736. goto errLabel;
  737. }
  738. // Once an instance ctlId is set to kKillTmId don't allow it to change.
  739. if( ip->ctlId == kKillTmId )
  740. return rc;
  741. switch(ctlId )
  742. {
  743. case kStartTmId:
  744. // Acting on a 'start' cmd only makes sense if the previous command was 'pause'
  745. if( ip->ctlId == kPauseTmId )
  746. ip->ctlId = kStartTmId;
  747. break;
  748. case kPauseTmId:
  749. // Acting on a 'pause' command only makes sense if the previous command was a 'start'
  750. if( ip->ctlId == kStartTmId )
  751. ip->ctlId = kPauseTmId;
  752. break;
  753. case kKillTmId:
  754. ip->ctlId = kKillTmId;
  755. break;
  756. }
  757. errLabel:
  758. return rc;
  759. }
  760. cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId )
  761. {
  762. cmTm_t* p = _cmTmHandleToPtr(h);
  763. cmTmInst_t* ip = NULL;
  764. cmStatusTmId_t status = kInvalidTmId;
  765. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  766. {
  767. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  768. goto errLabel;
  769. }
  770. status = ip->status;
  771. errLabel:
  772. return status;
  773. }
  774. cmTmRC_t cmTaskMgrSendMsg( cmTaskMgrH_t h, unsigned instId, const void* msg, unsigned msgByteCnt )
  775. {
  776. cmTm_t* p = _cmTmHandleToPtr(h);
  777. cmTmRC_t rc = kOkTmRC;
  778. cmTmInst_t* ip = NULL;
  779. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  780. return cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  781. if( cmTs1p1cEnqueueMsg(ip->msgQueH, msg, msgByteCnt ) != kOkThRC )
  782. rc = cmErrMsg(&p->err,kQueueFailTmRC,"Task msg enqueue failed.");
  783. return rc;
  784. }
  785. const cmChar_t* cmTaskMgrTaskIdToLabel( cmTaskMgrH_t h, unsigned taskId )
  786. {
  787. cmTm_t* p = _cmTmHandleToPtr(h);
  788. cmTmTask_t* tp;
  789. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  790. return NULL;
  791. return tp->label;
  792. }
  793. const cmChar_t* cmTaskMgrInstIdToLabel( cmTaskMgrH_t h, unsigned instId )
  794. {
  795. cmTm_t* p = _cmTmHandleToPtr(h);
  796. cmTmInst_t* ip;
  797. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  798. return NULL;
  799. return ip->label;
  800. }
  801. const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId )
  802. {
  803. cmTm_t* p = _cmTmHandleToPtr(h);
  804. cmTmInst_t* ip = NULL;
  805. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  806. {
  807. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  808. goto errLabel;
  809. }
  810. if( ip->status != kCompletedTmId )
  811. {
  812. cmErrMsg(&p->err,kOpFailTmRC,"The result of a running task (id:%i) may not be accessed.",instId);
  813. goto errLabel;
  814. }
  815. return ip->result;
  816. errLabel:
  817. return NULL;
  818. }
  819. unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId )
  820. {
  821. cmTm_t* p = _cmTmHandleToPtr(h);
  822. cmTmInst_t* ip = NULL;
  823. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  824. {
  825. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  826. goto errLabel;
  827. }
  828. if( ip->status != kCompletedTmId )
  829. {
  830. cmErrMsg(&p->err,kOpFailTmRC,"The result byte count of a running task (id:%i) may not be accessed.",instId);
  831. goto errLabel;
  832. }
  833. return ip->resultByteCnt;
  834. errLabel:
  835. return 0;
  836. }
  837. void* cmTaskMgrFuncArg( cmTaskMgrH_t h, unsigned instId )
  838. {
  839. cmTm_t* p = _cmTmHandleToPtr(h);
  840. cmTmInst_t* ip = NULL;
  841. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  842. {
  843. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  844. goto errLabel;
  845. }
  846. if( ip->status != kCompletedTmId )
  847. {
  848. cmErrMsg(&p->err,kOpFailTmRC,"The function argument of a running task (id:%i) may not be accessed.",instId);
  849. goto errLabel;
  850. }
  851. return ip->funcArg;
  852. errLabel:
  853. return NULL;
  854. }
  855. cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId )
  856. {
  857. cmTmRC_t rc = kOkTmRC;
  858. cmTm_t* p = _cmTmHandleToPtr(h);
  859. cmTmInst_t* ip = NULL;
  860. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  861. {
  862. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  863. return 0;
  864. }
  865. ip->deleteOnCompleteFl = true;
  866. return rc;
  867. }
  868. void _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, cmSelTmId_t selId, cmStatusTmId_t statusId, unsigned prog, const cmChar_t* text )
  869. {
  870. cmTaskMgrStatusArg_t s;
  871. _cmTaskMgrStatusArgSetup(
  872. &s,
  873. a->statusCbArg,
  874. a->instId,
  875. statusId == kInvalidTmId ? kProgTmId : kStatusTmId,
  876. statusId == kInvalidTmId ? kStartedTmId : statusId,
  877. statusId == kInvalidTmId ? prog : 0,
  878. text,NULL,0);
  879. a->statusCb(&s);
  880. }
  881. cmTmWorkerRC_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a )
  882. {
  883. cmTmThread_t* trp = a->reserved;
  884. // Check if we should go into the paused or deactivated state.
  885. if( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true )
  886. {
  887. cmStatusTmId_t prvStatus = kInvalidTmId;
  888. do
  889. {
  890. // Note that it is possible that the state of the task switch from
  891. // paused <-> deactivated during the course of this loop.
  892. // In either case we continue looping but should report the change
  893. // to the client via a status callback.
  894. // change the instance status to reflect the true status
  895. trp->inst->status = trp->deactivateFl ? kDeactivatedTmId : kPausedTmId;
  896. // if the status actually changed then notify the client
  897. if( trp->inst->status != prvStatus )
  898. {
  899. _cmTaskMgrWorkerHelper(a,kStatusTmId,trp->inst->status,0,NULL);
  900. prvStatus = trp->inst->status;
  901. }
  902. // sleep the thread for pauseSleepMs milliseconds
  903. cmSleepMs(a->pauseSleepMs);
  904. // if the task was unpaused while we slept
  905. }while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl==true );
  906. // we are leaving the paused state because we were restarted or killed.
  907. switch( trp->inst->ctlId )
  908. {
  909. case kStartTmId:
  910. // change the instance status to 'started'.
  911. trp->inst->status = kStartedTmId;
  912. // notify the client of the change in state
  913. _cmTaskMgrWorkerHelper(a,kStatusTmId,kStartedTmId,0,NULL);
  914. break;
  915. case kKillTmId:
  916. // if killed the client will be notified in the worker thread wrapper
  917. // function: _cmTmWorkerThreadFunc()
  918. break;
  919. default:
  920. { assert(0); }
  921. }
  922. }
  923. else // There was no command to handle so check for incoming msg's.
  924. {
  925. if( cmTs1p1cMsgWaiting(trp->inst->msgQueH) )
  926. {
  927. // if the task registered a msg receive callback
  928. if( trp->inst->task->recv != NULL )
  929. {
  930. if( cmTs1p1cDequeueMsg(trp->inst->msgQueH, NULL, 0 ) != kOkThRC )
  931. {
  932. // ?????
  933. // ????? how do we send error messages back to the client
  934. // ??????
  935. return kOkTmwRC;
  936. }
  937. }
  938. else
  939. {
  940. }
  941. return kRecvTmwRC;
  942. }
  943. }
  944. // if ctlId==kKillTmId then the status update will be handled
  945. // when the task custom function returns in _cmTmWorkerThreadFunc()
  946. return trp->inst->ctlId == kKillTmId ? kStopTmwRC : kOkTmwRC;
  947. }
  948. cmTmRC_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
  949. {
  950. _cmTaskMgrWorkerHelper(a,kStatusTmId,statusId,0,NULL);
  951. return kOkTmRC;
  952. }
  953. cmTmRC_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* text )
  954. {
  955. _cmTaskMgrWorkerHelper(a,kProgTmId,kInvalidTmId,prog,text);
  956. return kOkTmRC;
  957. }
  958. cmTmRC_t cmTaskMgrWorkerSendProgressV( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, va_list vl )
  959. {
  960. cmTmThread_t* trp = a->reserved;
  961. cmTsVPrintfP(trp->text,fmt,vl);
  962. return cmTaskMgrWorkerSendProgress(a,prog,trp->text);
  963. }
  964. cmTmRC_t cmTaskMgrWorkerSendProgressF( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, ... )
  965. {
  966. va_list vl;
  967. va_start(vl,fmt);
  968. cmTmRC_t rc = cmTaskMgrWorkerSendProgressV(a,prog,fmt,vl);
  969. va_end(vl);
  970. return rc;
  971. }
  972. cmTmRC_t cmTaskMgrWorkerError( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* text )
  973. {
  974. _cmTaskMgrWorkerHelper(a, kErrorTmId, kInvalidTmId, rc, text);
  975. return rc;
  976. }
  977. cmTmRC_t cmTaskMgrWorkerErrorV( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, va_list vl )
  978. {
  979. cmTmThread_t* trp = a->reserved;
  980. cmTsVPrintfP(trp->text,fmt,vl);
  981. return cmTaskMgrWorkerError(a,rc,trp->text);
  982. }
  983. cmTmRC_t cmTaskMgrWorkerErrorF( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, ... )
  984. {
  985. va_list vl;
  986. va_start(vl,fmt);
  987. cmTmRC_t rc0 = cmTaskMgrWorkerErrorV(a,rc,fmt,vl);
  988. va_end(vl);
  989. return rc0;
  990. }
  991. cmTmRC_t cmTaskMgrWorkerSetResult( cmTaskMgrFuncArg_t* a, void* result, unsigned resultByteCnt )
  992. {
  993. cmTmThread_t* trp = a->reserved;
  994. trp->inst->result = result;
  995. trp->inst->resultByteCnt = resultByteCnt;
  996. return kOkTmRC;
  997. }
  998. unsigned cmTaskMgrWorkerMsgByteCount( cmTaskMgrFuncArg_t* a )
  999. {
  1000. cmTmThread_t* trp = a->reserved;
  1001. return cmTs1p1cDequeueMsgByteCount(trp->inst->msgQueH);
  1002. }
  1003. unsigned cmTaskMgrWorkerMsgRecv( cmTaskMgrFuncArg_t* a, void* buf, unsigned bufByteCnt )
  1004. {
  1005. cmTmThread_t* trp = a->reserved;
  1006. unsigned retVal = bufByteCnt;
  1007. switch( cmTs1p1cDequeueMsg(trp->inst->msgQueH, buf, bufByteCnt ) )
  1008. {
  1009. case kOkThRC:
  1010. break;
  1011. case kBufEmptyThRC:
  1012. retVal = 0;
  1013. break;
  1014. case kBufTooSmallThRC:
  1015. retVal = cmInvalidCnt;
  1016. break;
  1017. default:
  1018. { assert(0); }
  1019. }
  1020. return retVal;
  1021. }
  1022. cmTmRC_t cmTaskMgrWorkerMsgSend( cmTaskMgrFuncArg_t* a, const void* buf, unsigned bufByteCnt )
  1023. {
  1024. cmTmThread_t* trp = a->reserved;
  1025. return _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kMsgTmId,trp->inst->status,0,NULL,buf,bufByteCnt);
  1026. }
  1027. //-----------------------------------------------------------------------------
  1028. enum { kMaxTestInstCnt = 3 };
  1029. typedef struct cmTmTestInst_str
  1030. {
  1031. unsigned instId;
  1032. } cmTmTestInst_t;
  1033. typedef struct cmTmTestApp_str
  1034. {
  1035. cmErr_t* err;
  1036. cmTmTestInst_t insts[kMaxTestInstCnt];
  1037. } cmTmTestApp_t;
  1038. void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s )
  1039. {
  1040. cmRptPrintf(rpt,"inst:%i ",s->instId );
  1041. switch( s->selId )
  1042. {
  1043. case kStatusTmId:
  1044. {
  1045. const cmChar_t* label = cmTaskMgrStatusIdToLabel(s->statusId);
  1046. cmRptPrintf(rpt,"status '%s'",label);
  1047. }
  1048. break;
  1049. case kProgTmId:
  1050. cmRptPrintf(rpt,"prog %i",s->prog);
  1051. break;
  1052. case kErrorTmId:
  1053. cmRptPrintf(rpt,"error %s",cmStringNullGuard(s->msg));
  1054. break;
  1055. default:
  1056. { assert(0); }
  1057. }
  1058. cmRptPrintf(rpt,"\n");
  1059. }
  1060. // Test client status callback function.
  1061. void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s )
  1062. {
  1063. // s.arg set from cmTaskMgrCreate( ..., statusCbArg, ...);
  1064. cmTmTestApp_t* app = (cmTmTestApp_t*)s->arg;
  1065. unsigned i;
  1066. // locate the instance record assoc'd with this callback
  1067. for(i=0; i<kMaxTestInstCnt; ++i)
  1068. if( app->insts[i].instId == s->instId )
  1069. break;
  1070. if( i==kMaxTestInstCnt )
  1071. cmRptPrintf(app->err->rpt,"instId %i not found.\n",s->instId);
  1072. _cmTmTestReportStatus(app->err->rpt,s);
  1073. }
  1074. // Test worker function.
  1075. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg )
  1076. {
  1077. if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC )
  1078. return;
  1079. unsigned prog = 0;
  1080. for(; prog<arg->progCnt; ++prog)
  1081. {
  1082. if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC )
  1083. break;
  1084. cmSleepMs(1000);
  1085. if( cmTaskMgrWorkerSendProgress(arg,prog,NULL) == kStopTmwRC )
  1086. break;
  1087. }
  1088. }
  1089. cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx)
  1090. {
  1091. cmTmRC_t rc = kOkTmRC;
  1092. cmTaskMgrH_t tmH = cmTaskMgrNullHandle;
  1093. unsigned threadCnt = 2;
  1094. unsigned queueByteCnt = 1024;
  1095. unsigned pauseSleepMs = 50;
  1096. unsigned nextInstId = 0;
  1097. unsigned taskId = 0;
  1098. const cmChar_t* taskLabel = "Task Label";
  1099. cmTmTestApp_t app;
  1100. char c;
  1101. memset(&app,0,sizeof(app));
  1102. app.err = &ctx->err;
  1103. // create the task mgr
  1104. if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC )
  1105. {
  1106. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr create failed.");
  1107. goto errLabel;
  1108. }
  1109. // install a task
  1110. if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc, NULL ) != kOkTmRC )
  1111. {
  1112. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr task install failed.");
  1113. goto errLabel;
  1114. }
  1115. // go into interactive mode
  1116. printf("q=quit e=enable c=call i=idle\n");
  1117. while((c = getchar()) != 'q')
  1118. {
  1119. switch(c)
  1120. {
  1121. case 'i':
  1122. cmTaskMgrOnIdle(tmH);
  1123. cmRptPrintf(&ctx->rpt,"idled\n");
  1124. break;
  1125. case 'e':
  1126. {
  1127. // toggle the enable state of the task mgr.
  1128. bool fl = !cmTaskMgrIsEnabled(tmH);
  1129. if( cmTaskMgrEnable(tmH,fl) != kOkTmRC )
  1130. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test enable failed.");
  1131. else
  1132. cmRptPrintf(&ctx->rpt,"%s\n", fl ? "enabled" : "disabled" );
  1133. }
  1134. break;
  1135. case 'c':
  1136. if( nextInstId < kMaxTestInstCnt )
  1137. {
  1138. void* funcArg = app.insts + nextInstId;
  1139. unsigned progCnt = 5;
  1140. if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, queueByteCnt, "My Inst", &app.insts[nextInstId].instId ) != kOkTmRC )
  1141. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed.");
  1142. else
  1143. {
  1144. ++nextInstId;
  1145. cmRptPrintf(&ctx->rpt,"called\n");
  1146. }
  1147. }
  1148. }
  1149. }
  1150. errLabel:
  1151. // destroy the task mgr
  1152. if( cmTaskMgrDestroy(&tmH) != kOkTmRC )
  1153. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr destroy failed.");
  1154. return rc;
  1155. }