libcm is a C development framework with an emphasis on audio signal processing applications.
Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

cmTaskMgr.c 38KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436
  1. #include "cmGlobal.h"
  2. #include "cmRpt.h"
  3. #include "cmErr.h"
  4. #include "cmCtx.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmThread.h"
  8. #include "cmTime.h"
  9. #include "cmTaskMgr.h"
  10. #include "cmLinkedHeap.h"
  11. #include "cmText.h"
  12. cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE;
  13. struct cmTmInst_str;
  14. typedef struct cmTmTask_str
  15. {
  16. unsigned taskId;
  17. cmChar_t* label;
  18. cmTaskMgrFunc_t func;
  19. cmTaskMgrRecv_t recv;
  20. struct cmTmTask_str* link;
  21. } cmTmTask_t;
  22. typedef struct cmTmInst_str
  23. {
  24. unsigned instId; // Task instance id.
  25. struct cmTmTask_str* task; // Pointer to task record for this task instance.
  26. void* funcArg; // Client supplied pointer to cmTaskMgrFuncArg_t.arg;
  27. unsigned progCnt; // Maximum expected progress (cmTaskMgrStatusArg_t.prog) to be used by this task instance.
  28. cmChar_t* label; // Optional instance label.
  29. cmStatusTmId_t status; // Current instance status (See cmStatusTmId_t)
  30. void* result; // Task instance result pointer.
  31. unsigned resultByteCnt; // Size of the task instance result pointer in bytes.
  32. cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread
  33. cmTs1p1cH_t msgQueH; // client->inst 'msg' communication queue
  34. bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete
  35. struct cmTmInst_str* link;
  36. } cmTmInst_t;
  37. struct cmTm_str* p;
  38. typedef struct cmTmThread_str
  39. {
  40. struct cmTm_str* p; // Pointer to task mgr.
  41. cmThreadH_t thH; // Thread handle.
  42. cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
  43. double durSecs; // Duration of the instance currently assigned to this thread in seconds.
  44. cmTimeSpec_t t0; // Task start time.
  45. cmTimeSpec_t t1; // Time of last review by the master thread.
  46. bool deactivateFl; // True if this instance has been deactivated by the system.
  47. cmTaskMgrFuncArg_t procArg; //
  48. cmChar_t* text; // Temporary text buffer
  49. struct cmTmThread_str* link; // p->threads link.
  50. } cmTmThread_t;
  51. typedef struct cmTm_str
  52. {
  53. cmErr_t err; // Task manager error object.
  54. cmThreadH_t mstrThH; // Master thread handle.
  55. cmTmThread_t* threads; // Thread record list.
  56. unsigned threadRecdCnt; // Current count of records in 'threads' list.
  57. unsigned maxActiveTaskCnt; // Max. number of active tasks.
  58. cmTaskMgrStatusCb_t statusCb; // Client task status callback.
  59. void* statusCbArg; // Client task status callback argument.
  60. unsigned pauseSleepMs; //
  61. cmTs1p1cH_t callQueH; // client->mgr 'inst' communication queue
  62. cmTsMp1cH_t outQueH; // mgr->client communication queue
  63. cmTmTask_t* tasks; // Task list.
  64. cmTmInst_t* insts; // Task instance list.
  65. unsigned nextInstId; // Next available task instance id.
  66. unsigned activeTaskCnt; // Current active task count.
  67. } cmTm_t;
  68. void _cmTaskMgrStatusArgSetup(
  69. cmTaskMgrStatusArg_t* s,
  70. void* arg,
  71. unsigned instId,
  72. cmSelTmId_t selId,
  73. cmStatusTmId_t statusId,
  74. unsigned prog,
  75. const cmChar_t* text,
  76. const void* msg,
  77. unsigned msgByteCnt )
  78. {
  79. s->arg = arg;
  80. s->instId = instId;
  81. s->selId = selId;
  82. s->statusId = statusId;
  83. s->prog = prog;
  84. s->text = text;
  85. s->msg = msg;
  86. s->msgByteCnt = msgByteCnt;
  87. }
  88. // Called by MASTER and WORKER.
  89. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s )
  90. {
  91. enum { arrayCnt = 3 };
  92. const void* msgPtrArray[arrayCnt];
  93. unsigned msgSizeArray[arrayCnt];
  94. msgPtrArray[0] = s;
  95. msgPtrArray[1] = s->text==NULL ? "" : s->text;
  96. msgPtrArray[2] = s->msg;
  97. msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t);
  98. msgSizeArray[1] = s->text==NULL ? 1 : strlen(s->text)+1;
  99. msgSizeArray[2] = s->msgByteCnt;
  100. if( cmTsMp1cEnqueueSegMsg(p->outQueH, msgPtrArray, msgSizeArray, arrayCnt ) != kOkThRC )
  101. return kQueueFailTmRC;
  102. return kOkTmRC;
  103. }
  104. // Called by MASTER and WORKER.
  105. // This function is called by the worker thread wrapper _cmTmWorkerStatusCb()
  106. // function to enqueue messages being sent back to the client.
  107. cmTmRC_t _cmTmEnqueueStatusMsg1(
  108. cmTm_t* p,
  109. unsigned instId,
  110. cmSelTmId_t selId,
  111. cmStatusTmId_t statusId,
  112. unsigned prog,
  113. const cmChar_t* text,
  114. const void* msg,
  115. unsigned msgByteCnt )
  116. {
  117. cmTaskMgrStatusArg_t s;
  118. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,text,msg,msgByteCnt);
  119. return _cmTmEnqueueStatusMsg0(p,&s);
  120. }
  121. // Called by WORKER.
  122. // Worker threads call this function to enqueue status messages
  123. // for delivery to the task mgr client.
  124. void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status )
  125. {
  126. cmTmThread_t* trp = (cmTmThread_t*)status->arg;
  127. if( _cmTmEnqueueStatusMsg0( trp->p, status ) != kOkTmRC )
  128. {
  129. /// ??????? HOW DO WE HANDLE ERRORS IN THE WORKER THREAD
  130. /// (set an error code in trp and let the master thread notice it.)
  131. assert(0);
  132. }
  133. }
  134. // Called by WORKER.
  135. // This function is called in the worker thread by
  136. // cmTs1p1cDequeueMsg() from within cmTaskMgrWorkerHandleCommand()
  137. // to transfer msg's waiting in the worker's incoming msg queue
  138. // (cmTmInst_t.msgQueH) to the instance recv function (cmTmInst_t.recv).
  139. cmRC_t _cmTmWorkerRecvCb( void* arg, unsigned msgByteCnt, const void* msg )
  140. {
  141. cmTmThread_t* trp = (cmTmThread_t*)arg;
  142. assert(trp->inst->task->recv);
  143. trp->inst->task->recv(&trp->procArg,msg,msgByteCnt);
  144. return cmOkRC;
  145. }
  146. // Called by WORKER.
  147. // This is the wrapper for all worker threads.
  148. bool _cmTmWorkerThreadFunc(void* arg)
  149. {
  150. cmTmThread_t* trp = (cmTmThread_t*)arg;
  151. trp->procArg.reserved = trp;
  152. trp->procArg.arg = trp->inst->funcArg;
  153. trp->procArg.instId = trp->inst->instId;
  154. trp->procArg.statusCb = _cmTmWorkerStatusCb;
  155. trp->procArg.statusCbArg = trp;
  156. trp->procArg.progCnt = trp->inst->progCnt;
  157. trp->procArg.pauseSleepMs= trp->p->pauseSleepMs;
  158. // if the task was paused or killed while it was queued then
  159. // cmTaskMgrHandleCommand() will do the right thing
  160. if( cmTaskMgrWorkerHandleCommand(&trp->procArg) != kStopTmwRC )
  161. {
  162. trp->inst->status = kStartedTmId;
  163. // Notify the client that the instance has started.
  164. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  165. // Execute the client provided task function.
  166. trp->inst->task->func(&trp->procArg);
  167. }
  168. // Notify the client if the instance was killed
  169. if( trp->inst->ctlId == kKillTmId )
  170. {
  171. trp->inst->status = kKilledTmId;
  172. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  173. }
  174. // Notify the client that the instance is completed
  175. // (but don't actually set the status yet)
  176. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kCompletedTmId,0,NULL,NULL,0);
  177. trp->inst->status = kCompletedTmId;
  178. // Force the thread to go into the 'pause' state when it
  179. // returns to it's internal loop. The master thread recognizes paused
  180. // threads as available for reuse.
  181. cmThreadPause(trp->thH,kPauseThFl);
  182. return true;
  183. }
  184. void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg )
  185. {
  186. _cmTmEnqueueStatusMsg1(p,cmInvalidId,kErrorTmId,kInvalidTmId,rc,msg,NULL,0);
  187. }
  188. int _cmTmSortThreadByDur( const void* t0, const void* t1 )
  189. {
  190. double d = ((cmTmThread_t*)t0)->durSecs - ((cmTmThread_t*)t1)->durSecs;
  191. return d== 0 ? 0 : (d<0 ? -1 : 1);
  192. }
  193. // This is the master thread function.
  194. bool _cmTmMasterThreadFunc(void* arg)
  195. {
  196. cmTm_t* p = (cmTm_t*)arg;
  197. unsigned activeThreadCnt = 0;
  198. unsigned activeTaskCnt = 0;
  199. cmTmThread_t* trp = p->threads;
  200. if( p->threadRecdCnt > 0 )
  201. {
  202. cmTmThread_t* thArray[p->threadRecdCnt];
  203. unsigned deactivatedCnt = 0;
  204. //
  205. // Determine the number of active threads and tasks
  206. //
  207. // for each thread record
  208. for(trp=p->threads; trp!=NULL; trp=trp->link)
  209. {
  210. cmThStateId_t thState;
  211. // if this thread is active ...
  212. if( (thState = cmThreadState(trp->thH)) != kPausedThId )
  213. {
  214. assert(trp->inst!=NULL);
  215. thArray[activeThreadCnt] = trp;
  216. ++activeThreadCnt;
  217. // if the task assigned to this thread is started then the task is active
  218. if( trp->inst->status == kStartedTmId )
  219. ++activeTaskCnt;
  220. // if the deactivatedFl is set then this thread has been deactivated by the system
  221. if( trp->deactivateFl )
  222. ++deactivatedCnt;
  223. // update the task lifetime duration
  224. if( trp->inst->status != kCompletedTmId )
  225. {
  226. cmTimeSpec_t t2;
  227. cmTimeGet(&t2);
  228. trp->durSecs += (double)cmTimeElapsedMicros(&trp->t1,&t2) / 1000000.0;
  229. trp->t1 = t2;
  230. }
  231. }
  232. }
  233. //
  234. // thArray[activeThreadCnt] now holds pointers to the
  235. // cmTmThread_t records of the active threads
  236. //
  237. // if more tasks are active than should be - then deactive the youngest
  238. if( activeTaskCnt > p->maxActiveTaskCnt )
  239. {
  240. // sort the active tasks in increasing order of lifetime
  241. qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  242. // determine the number of threads that need to be deactivated
  243. int n = activeTaskCnt - p->maxActiveTaskCnt;
  244. int i,j;
  245. // pause the active threads with the lowest lifetime
  246. for(i=0,j=0; i<activeThreadCnt && j<n; ++i)
  247. if( thArray[i]->deactivateFl == false )
  248. {
  249. thArray[i]->deactivateFl = true;
  250. ++deactivatedCnt;
  251. ++j;
  252. }
  253. }
  254. //
  255. // if there are deactivated tasks and the max thread count has not been reached
  256. // then re-activate some of the deactivated tasks.
  257. //
  258. if( activeTaskCnt < p->maxActiveTaskCnt && deactivatedCnt > 0 )
  259. {
  260. // sort the active tasks in increasing order of lifetime
  261. qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  262. int n = cmMin(p->maxActiveTaskCnt - activeTaskCnt, deactivatedCnt );
  263. int i;
  264. // re-activate the oldest deactivated tasks first
  265. for(i=activeThreadCnt-1; i>=0 && n>0; --i)
  266. if( thArray[i]->deactivateFl )
  267. {
  268. thArray[i]->deactivateFl = false;
  269. --n;
  270. ++activeTaskCnt;
  271. }
  272. }
  273. }
  274. // If the number of activeTaskCnt is less than the limit and a queued task exists
  275. while( activeTaskCnt < p->maxActiveTaskCnt && cmTs1p1cMsgWaiting(p->callQueH) )
  276. {
  277. cmTmInst_t* ip = NULL;
  278. cmTmThread_t* atrp = NULL;
  279. // Find a worker thread that is in the 'paused' state.
  280. // This is the definitive indication that the thread
  281. // does not have an assigned instance and the thread recd can be reused.
  282. for(trp=p->threads; trp!=NULL; trp=trp->link)
  283. if( cmThreadState(trp->thH) == kPausedThId )
  284. {
  285. atrp = trp;
  286. break;
  287. }
  288. // If all the existing worker threads are in use ...
  289. if( atrp==NULL )
  290. {
  291. // ... then create a new worker thread recd
  292. atrp = cmMemAllocZ(cmTmThread_t,1);
  293. // ... create the new worker thread
  294. if( cmThreadCreate(&atrp->thH,_cmTmWorkerThreadFunc,atrp,p->err.rpt) != kOkThRC )
  295. {
  296. cmMemFree(atrp);
  297. atrp = NULL;
  298. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread create failed.");
  299. break;
  300. }
  301. else
  302. {
  303. // ... setup the new thread record
  304. atrp->p = p;
  305. atrp->link = p->threads;
  306. p->threads = atrp;
  307. p->threadRecdCnt += 1;
  308. }
  309. }
  310. // if the thread creation failed
  311. if( atrp == NULL )
  312. break;
  313. // dequeue a pending task instance pointer from the input queue
  314. if(cmTs1p1cDequeueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC )
  315. {
  316. _cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue.");
  317. break;
  318. }
  319. // if the task has a msg recv callback then assign it here
  320. if( ip->task->recv != NULL )
  321. if( cmTs1p1cSetCallback( ip->msgQueH, _cmTmWorkerRecvCb, atrp ) != kOkThRC )
  322. {
  323. _cmTmMasterRptError(p,kQueueFailTmRC,"Worker thread msg queue callback assignment failed.");
  324. break;
  325. }
  326. // setup the thread record associated with the new task
  327. atrp->inst = ip;
  328. atrp->durSecs = 0;
  329. atrp->deactivateFl = false;
  330. cmTimeGet(&atrp->t0);
  331. atrp->t1 = atrp->t0;
  332. // start the worker thread
  333. if( cmThreadPause(atrp->thH,kWaitThFl) != kOkThRC )
  334. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed.");
  335. ++activeTaskCnt;
  336. }
  337. cmSleepMs(p->pauseSleepMs);
  338. p->activeTaskCnt = activeTaskCnt;
  339. return true;
  340. }
  341. cmTm_t* _cmTmHandleToPtr( cmTaskMgrH_t h )
  342. {
  343. cmTm_t* p = (cmTm_t*)h.h;
  344. assert( p != NULL );
  345. return p;
  346. }
  347. cmTmTask_t* _cmTmTaskFromId( cmTm_t* p, unsigned taskId )
  348. {
  349. cmTmTask_t* tp;
  350. for(tp=p->tasks; tp!=NULL; tp=tp->link)
  351. if( tp->taskId == taskId )
  352. return tp;
  353. return NULL;
  354. }
  355. cmTmInst_t* _cmTmInstFromId( cmTm_t* p, unsigned instId )
  356. {
  357. cmTmInst_t* ip;
  358. for(ip=p->insts; ip!=NULL; ip=ip->link)
  359. if( ip->instId == instId )
  360. return ip;
  361. return NULL;
  362. }
  363. cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId )
  364. {
  365. cmTmInst_t* ip = p->insts;
  366. cmTmInst_t* pp = NULL;
  367. for(; ip!=NULL; ip=ip->link)
  368. {
  369. if( ip->instId == instId )
  370. {
  371. if( pp == NULL )
  372. p->insts = ip->link;
  373. else
  374. pp->link = ip->link;
  375. if( cmTs1p1cDestroy(&ip->msgQueH) != kOkThRC )
  376. return cmErrMsg(&p->err,kQueueFailTmRC,"The 'msg' input queue destroy failed.");
  377. cmMemFree(ip->label);
  378. cmMemFree(ip->result);
  379. cmMemFree(ip);
  380. return kOkTmRC;
  381. }
  382. pp = ip;
  383. }
  384. return cmErrMsg(&p->err,kAssertFailTmRC,"The instance %i could not be found to be deleted.",instId);
  385. }
  386. cmTmRC_t _cmTmDestroy( cmTm_t* p )
  387. {
  388. cmTmRC_t rc = kOkTmRC;
  389. unsigned i;
  390. // stop and destroy the master thread
  391. if( cmThreadDestroy(&p->mstrThH) != kOkThRC )
  392. {
  393. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Master thread destroy failed.");
  394. goto errLabel;
  395. }
  396. // stop and destroy all the worker threads
  397. for(i=0; p->threads != NULL; ++i )
  398. {
  399. if( cmThreadDestroy(&p->threads->thH) != kOkThRC )
  400. {
  401. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread destruction failed for the worker thread at index %i.",i);
  402. goto errLabel;
  403. }
  404. cmTmThread_t* trp = p->threads;
  405. p->threads = p->threads->link;
  406. cmMemFree(trp->text);
  407. cmMemFree(trp);
  408. }
  409. // release the call input queue
  410. if( cmTs1p1cDestroy(&p->callQueH) != kOkThRC )
  411. {
  412. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The 'call' input queue destroy failed.");
  413. goto errLabel;
  414. }
  415. // draining the output queue
  416. while( cmTsMp1cMsgWaiting(p->outQueH) )
  417. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  418. cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed while draingin.");
  419. // release the output queue
  420. if( cmTsMp1cDestroy(&p->outQueH) != kOkThRC )
  421. {
  422. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  423. goto errLabel;
  424. }
  425. // release instance list
  426. while( p->insts != NULL )
  427. _cmTmInstFree(p,p->insts->instId);
  428. // release the task list
  429. cmTmTask_t* tp = p->tasks;
  430. while( tp != NULL )
  431. {
  432. cmTmTask_t* np = tp->link;
  433. cmMemFree(tp->label);
  434. cmMemFree(tp);
  435. tp = np;
  436. }
  437. cmMemFree(p);
  438. errLabel:
  439. return rc;
  440. }
  441. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr );
  442. cmTmRC_t cmTaskMgrCreate(
  443. cmCtx_t* ctx,
  444. cmTaskMgrH_t* hp,
  445. cmTaskMgrStatusCb_t statusCb,
  446. void* statusCbArg,
  447. unsigned maxActiveTaskCnt,
  448. unsigned queueByteCnt,
  449. unsigned pauseSleepMs)
  450. {
  451. cmTmRC_t rc = kOkTmRC;
  452. if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC )
  453. return rc;
  454. cmTm_t* p = cmMemAllocZ(cmTm_t,1);
  455. cmErrSetup(&p->err,&ctx->rpt,"Task Mgr.");
  456. p->maxActiveTaskCnt = maxActiveTaskCnt;
  457. p->statusCb = statusCb;
  458. p->statusCbArg = statusCbArg;
  459. p->pauseSleepMs = pauseSleepMs;
  460. // create the master thread
  461. if( cmThreadCreate(&p->mstrThH, _cmTmMasterThreadFunc,p,&ctx->rpt) != kOkThRC )
  462. {
  463. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.");
  464. goto errLabel;
  465. }
  466. // create the call input queue
  467. if(cmTs1p1cCreate( &p->callQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  468. {
  469. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The call input queue creation failed.");
  470. goto errLabel;
  471. }
  472. // create the output queue
  473. if( cmTsMp1cCreate( &p->outQueH, queueByteCnt, _cmTmMasterOutQueueCb, p, p->err.rpt ) != kOkThRC )
  474. {
  475. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue creation failed.");
  476. goto errLabel;
  477. }
  478. hp->h = p;
  479. errLabel:
  480. return rc;
  481. }
  482. cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp )
  483. {
  484. cmTmRC_t rc = kOkTmRC;
  485. if( hp==NULL || cmTaskMgrIsValid(*hp)==false )
  486. return rc;
  487. cmTm_t* p = _cmTmHandleToPtr(*hp);
  488. if((rc = _cmTmDestroy(p)) != kOkTmRC )
  489. return rc;
  490. hp->h = NULL;
  491. return rc;
  492. }
  493. void _cmTmWaitForCompletion( cmTm_t* p, unsigned timeOutMs )
  494. {
  495. unsigned durMs = 0;
  496. cmTimeSpec_t t0,t1;
  497. cmTimeGet(&t0);
  498. // Go into timeout loop - waiting for all instances to finish
  499. while( timeOutMs==0 || durMs < timeOutMs )
  500. {
  501. cmTimeGet(&t1);
  502. durMs += cmTimeElapsedMicros(&t0,&t1) / 1000;
  503. t0 = t1;
  504. cmSleepMs(p->pauseSleepMs);
  505. if( p->activeTaskCnt == 0 )
  506. break;
  507. }
  508. }
  509. cmTmRC_t cmTaskMgrClose( cmTaskMgrH_t h, unsigned flags, unsigned timeOutMs )
  510. {
  511. cmTmRC_t rc = kOkTmRC;
  512. cmTm_t* p = _cmTmHandleToPtr(h);
  513. bool fl = false;
  514. // if requested kill any queued tasks
  515. if( cmIsFlag(flags,kKillQueuedTmFl) )
  516. {
  517. cmTmInst_t* ip = p->insts;
  518. for(; ip!=NULL; ip=ip->link)
  519. if( ip->status == kQueuedTmId )
  520. ip->ctlId = kKillTmId;
  521. }
  522. // wait for any existing or queued tasks to complete
  523. _cmTmWaitForCompletion(p,timeOutMs);
  524. // force any queued msgs for the client to be sent
  525. cmTaskMgrOnIdle(h);
  526. // if the 'kill on timeout' flag is set then kill any remaining active tasks
  527. if( cmIsFlag(flags,kTimeOutKillTmFl) )
  528. {
  529. cmTmInst_t* ip = p->insts;
  530. for(; ip!=NULL; ip=ip->link)
  531. if( ip->status != kCompletedTmId )
  532. {
  533. ip->ctlId = kKillTmId;
  534. fl = true;
  535. }
  536. }
  537. // wait for the remaining tasks to complete
  538. if( fl )
  539. _cmTmWaitForCompletion(p,timeOutMs);
  540. // force any queued msgs for the client to be sent
  541. cmTaskMgrOnIdle(h);
  542. return rc;
  543. }
  544. unsigned cmTaskMgrActiveTaskCount( cmTaskMgrH_t h )
  545. {
  546. cmTm_t* p = _cmTmHandleToPtr(h);
  547. return p->activeTaskCnt;
  548. }
  549. bool cmTaskMgrIsValid( cmTaskMgrH_t h )
  550. { return h.h != NULL; }
  551. const cmChar_t* cmTaskMgrStatusIdToLabel( cmStatusTmId_t statusId )
  552. {
  553. typedef struct map_str
  554. {
  555. cmStatusTmId_t id;
  556. const cmChar_t* label;
  557. } map_t;
  558. map_t a[] =
  559. {
  560. { kQueuedTmId, "Queued" },
  561. { kStartedTmId, "Started" },
  562. { kPausedTmId, "Paused" },
  563. { kDeactivatedTmId, "Deactivated" },
  564. { kCompletedTmId, "Completed" },
  565. { kKilledTmId, "Killed" },
  566. { kInvalidTmId, "<Invalid>" },
  567. };
  568. int i;
  569. for(i=0; a[i].id!=kInvalidTmId; ++i)
  570. if( a[i].id == statusId )
  571. return a[i].label;
  572. return "<Unknown>";
  573. }
  574. // This function is called by cmTaskMgrIdle() to dispatch
  575. // status updates to the client.
  576. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr )
  577. {
  578. cmTm_t* p = (cmTm_t*)arg;
  579. cmTaskMgrStatusArg_t s;
  580. // This is probably not nesessary since changing the memory
  581. // pointed to by msgDataPtr should be safe even though it is marked as const.
  582. memcpy(&s,msgDataPtr,sizeof(s));
  583. // The 'text' and 'msg' data have been serialized after the status record.
  584. // The 'text' is guaranteed to at least contain a terminating zero.
  585. s.text = ((char*)msgDataPtr) + sizeof(s);
  586. // if the 'resultByteCnt' > 0 then there is a result record
  587. if( s.msgByteCnt > 0 )
  588. s.msg = ((char*)msgDataPtr) + sizeof(s) + strlen(s.text) + 1;
  589. else
  590. s.msg = NULL;
  591. s.arg = p->statusCbArg;
  592. p->statusCb( &s );
  593. return cmOkRC;
  594. }
  595. cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h )
  596. {
  597. cmTmRC_t rc = kOkTmRC;
  598. cmTm_t* p = _cmTmHandleToPtr(h);
  599. // Transmit any msgs waiting to be sent to the client.
  600. while( cmTsMp1cMsgWaiting(p->outQueH) )
  601. {
  602. // calling this function calls: _cmTmMasterOutQueueCb()
  603. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  604. {
  605. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed during a dequeue.");
  606. goto errLabel;
  607. }
  608. }
  609. // Step through the instance list and delete instances that are
  610. // completed and also marked for deletion.
  611. cmTmInst_t* ip = p->insts;
  612. while( ip != NULL )
  613. {
  614. cmTmInst_t* np = ip->link;
  615. if( ip->status==kCompletedTmId && ip->deleteOnCompleteFl )
  616. _cmTmInstFree(p,ip->instId);
  617. ip = np;
  618. }
  619. errLabel:
  620. return rc;
  621. }
  622. bool cmTaskMgrIsEnabled( cmTaskMgrH_t h )
  623. {
  624. cmTm_t* p = _cmTmHandleToPtr(h);
  625. return cmThreadState(p->mstrThH) != kPausedThId;
  626. }
  627. cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
  628. {
  629. cmTmRC_t rc = kOkTmRC;
  630. cmTm_t* p = _cmTmHandleToPtr(h);
  631. unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl;
  632. if( cmThreadPause(p->mstrThH, flags ) != kOkThRC )
  633. rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" );
  634. return rc;
  635. }
  636. cmTmRC_t cmTaskMgrInstall(
  637. cmTaskMgrH_t h,
  638. unsigned taskId,
  639. const cmChar_t* label,
  640. cmTaskMgrFunc_t func,
  641. cmTaskMgrRecv_t recv)
  642. {
  643. cmTmRC_t rc = kOkTmRC;
  644. cmTm_t* p = _cmTmHandleToPtr(h);
  645. cmTmTask_t* tp = cmMemAllocZ(cmTmTask_t,1);
  646. if( _cmTmTaskFromId(p,taskId) != NULL )
  647. {
  648. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"The task id %i is already in use.",taskId);
  649. goto errLabel;
  650. }
  651. tp->taskId = taskId;
  652. tp->func = func;
  653. tp->recv = recv;
  654. tp->label = cmMemAllocStr(label);
  655. tp->link = p->tasks;
  656. p->tasks = tp;
  657. errLabel:
  658. return rc;
  659. }
  660. cmTmRC_t cmTaskMgrCall(
  661. cmTaskMgrH_t h,
  662. unsigned taskId,
  663. void* funcArg,
  664. unsigned progCnt,
  665. unsigned queueByteCnt,
  666. const cmChar_t* label,
  667. unsigned* retInstIdPtr )
  668. {
  669. cmTmRC_t rc = kOkTmRC;
  670. cmTm_t* p = _cmTmHandleToPtr(h);
  671. cmTmTask_t* tp = NULL;
  672. cmTmInst_t* ip = NULL;
  673. if( retInstIdPtr != NULL )
  674. *retInstIdPtr = cmInvalidId;
  675. // locate the task for this instance
  676. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  677. {
  678. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"Task not found for task id=%i.",taskId);
  679. goto errLabel;
  680. }
  681. // allocate a new instance record
  682. ip = cmMemAllocZ(cmTmInst_t,1);
  683. // setup the instance record
  684. ip->instId = p->nextInstId++;
  685. ip->task = tp;
  686. ip->funcArg = funcArg;
  687. ip->progCnt = progCnt;
  688. ip->label = label==NULL ? NULL : cmMemAllocStr(label);
  689. ip->status = kQueuedTmId;
  690. ip->ctlId = kStartTmId;
  691. // create the msg input queue
  692. if(cmTs1p1cCreate( &ip->msgQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  693. {
  694. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The msg input queue creation failed.");
  695. goto errLabel;
  696. }
  697. // insert the new instance at the end of the instance list
  698. if( p->insts == NULL )
  699. p->insts = ip;
  700. else
  701. {
  702. cmTmInst_t* pp = p->insts;
  703. for(; pp != NULL; pp=pp->link )
  704. if( pp->link == NULL )
  705. {
  706. pp->link = ip;
  707. break;
  708. }
  709. }
  710. // enqueue the instance ptr in the input queue
  711. if( cmTs1p1cEnqueueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC )
  712. {
  713. rc = cmErrMsg(&p->err,kQueueFailTmRC,"New task instance command enqueue failed.");
  714. goto errLabel;
  715. }
  716. // set the returned instance id
  717. if( retInstIdPtr != NULL )
  718. *retInstIdPtr = ip->instId;
  719. // notify the client that the instance was enqueued
  720. cmTaskMgrStatusArg_t s;
  721. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,progCnt,NULL,NULL,0);
  722. p->statusCb( &s );
  723. errLabel:
  724. return rc;
  725. }
  726. cmTmRC_t cmTaskMgrCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId )
  727. {
  728. cmTmRC_t rc = kOkTmRC;
  729. cmTm_t* p = _cmTmHandleToPtr(h);
  730. cmTmInst_t* ip = NULL;
  731. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  732. {
  733. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  734. goto errLabel;
  735. }
  736. // Once an instance ctlId is set to kKillTmId don't allow it to change.
  737. if( ip->ctlId == kKillTmId )
  738. return rc;
  739. switch(ctlId )
  740. {
  741. case kStartTmId:
  742. // Acting on a 'start' cmd only makes sense if the previous command was 'pause'
  743. if( ip->ctlId == kPauseTmId )
  744. ip->ctlId = kStartTmId;
  745. break;
  746. case kPauseTmId:
  747. // Acting on a 'pause' command only makes sense if the previous command was a 'start'
  748. if( ip->ctlId == kStartTmId )
  749. ip->ctlId = kPauseTmId;
  750. break;
  751. case kKillTmId:
  752. ip->ctlId = kKillTmId;
  753. break;
  754. }
  755. errLabel:
  756. return rc;
  757. }
  758. cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId )
  759. {
  760. cmTm_t* p = _cmTmHandleToPtr(h);
  761. cmTmInst_t* ip = NULL;
  762. cmStatusTmId_t status = kInvalidTmId;
  763. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  764. {
  765. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  766. goto errLabel;
  767. }
  768. status = ip->status;
  769. errLabel:
  770. return status;
  771. }
  772. cmTmRC_t cmTaskMgrSendMsg( cmTaskMgrH_t h, unsigned instId, const void* msg, unsigned msgByteCnt )
  773. {
  774. cmTm_t* p = _cmTmHandleToPtr(h);
  775. cmTmRC_t rc = kOkTmRC;
  776. cmTmInst_t* ip = NULL;
  777. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  778. return cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  779. if( cmTs1p1cEnqueueMsg(ip->msgQueH, msg, msgByteCnt ) != kOkThRC )
  780. rc = cmErrMsg(&p->err,kQueueFailTmRC,"Task msg enqueue failed.");
  781. return rc;
  782. }
  783. const cmChar_t* cmTaskMgrTaskIdToLabel( cmTaskMgrH_t h, unsigned taskId )
  784. {
  785. cmTm_t* p = _cmTmHandleToPtr(h);
  786. cmTmTask_t* tp;
  787. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  788. return NULL;
  789. return tp->label;
  790. }
  791. const cmChar_t* cmTaskMgrInstIdToLabel( cmTaskMgrH_t h, unsigned instId )
  792. {
  793. cmTm_t* p = _cmTmHandleToPtr(h);
  794. cmTmInst_t* ip;
  795. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  796. return NULL;
  797. return ip->label;
  798. }
  799. const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId )
  800. {
  801. cmTm_t* p = _cmTmHandleToPtr(h);
  802. cmTmInst_t* ip = NULL;
  803. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  804. {
  805. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  806. goto errLabel;
  807. }
  808. if( ip->status != kCompletedTmId )
  809. {
  810. cmErrMsg(&p->err,kOpFailTmRC,"The result of a running task (id:%i) may not be accessed.",instId);
  811. goto errLabel;
  812. }
  813. return ip->result;
  814. errLabel:
  815. return NULL;
  816. }
  817. unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId )
  818. {
  819. cmTm_t* p = _cmTmHandleToPtr(h);
  820. cmTmInst_t* ip = NULL;
  821. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  822. {
  823. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  824. goto errLabel;
  825. }
  826. if( ip->status != kCompletedTmId )
  827. {
  828. cmErrMsg(&p->err,kOpFailTmRC,"The result byte count of a running task (id:%i) may not be accessed.",instId);
  829. goto errLabel;
  830. }
  831. return ip->resultByteCnt;
  832. errLabel:
  833. return 0;
  834. }
  835. void* cmTaskMgrFuncArg( cmTaskMgrH_t h, unsigned instId )
  836. {
  837. cmTm_t* p = _cmTmHandleToPtr(h);
  838. cmTmInst_t* ip = NULL;
  839. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  840. {
  841. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  842. goto errLabel;
  843. }
  844. if( ip->status != kCompletedTmId )
  845. {
  846. cmErrMsg(&p->err,kOpFailTmRC,"The function argument of a running task (id:%i) may not be accessed.",instId);
  847. goto errLabel;
  848. }
  849. return ip->funcArg;
  850. errLabel:
  851. return NULL;
  852. }
  853. cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId )
  854. {
  855. cmTmRC_t rc = kOkTmRC;
  856. cmTm_t* p = _cmTmHandleToPtr(h);
  857. cmTmInst_t* ip = NULL;
  858. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  859. {
  860. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  861. return 0;
  862. }
  863. ip->deleteOnCompleteFl = true;
  864. return rc;
  865. }
  866. void _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, cmSelTmId_t selId, cmStatusTmId_t statusId, unsigned prog, const cmChar_t* text )
  867. {
  868. cmTaskMgrStatusArg_t s;
  869. _cmTaskMgrStatusArgSetup(
  870. &s,
  871. a->statusCbArg,
  872. a->instId,
  873. statusId == kInvalidTmId ? kProgTmId : kStatusTmId,
  874. statusId == kInvalidTmId ? kStartedTmId : statusId,
  875. statusId == kInvalidTmId ? prog : 0,
  876. text,NULL,0);
  877. a->statusCb(&s);
  878. }
  879. cmTmWorkerRC_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a )
  880. {
  881. cmTmThread_t* trp = a->reserved;
  882. // Check if we should go into the paused or deactivated state.
  883. if( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true )
  884. {
  885. cmStatusTmId_t prvStatus = kInvalidTmId;
  886. do
  887. {
  888. // Note that it is possible that the state of the task switch from
  889. // paused <-> deactivated during the course of this loop.
  890. // In either case we continue looping but should report the change
  891. // to the client via a status callback.
  892. // change the instance status to reflect the true status
  893. trp->inst->status = trp->deactivateFl ? kDeactivatedTmId : kPausedTmId;
  894. // if the status actually changed then notify the client
  895. if( trp->inst->status != prvStatus )
  896. {
  897. _cmTaskMgrWorkerHelper(a,kStatusTmId,trp->inst->status,0,NULL);
  898. prvStatus = trp->inst->status;
  899. }
  900. // sleep the thread for pauseSleepMs milliseconds
  901. cmSleepMs(a->pauseSleepMs);
  902. // if the task was unpaused while we slept
  903. }while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl==true );
  904. // we are leaving the paused state because we were restarted or killed.
  905. switch( trp->inst->ctlId )
  906. {
  907. case kStartTmId:
  908. // change the instance status to 'started'.
  909. trp->inst->status = kStartedTmId;
  910. // notify the client of the change in state
  911. _cmTaskMgrWorkerHelper(a,kStatusTmId,kStartedTmId,0,NULL);
  912. break;
  913. case kKillTmId:
  914. // if killed the client will be notified in the worker thread wrapper
  915. // function: _cmTmWorkerThreadFunc()
  916. break;
  917. default:
  918. { assert(0); }
  919. }
  920. }
  921. else // There was no command to handle so check for incoming msg's.
  922. {
  923. if( cmTs1p1cMsgWaiting(trp->inst->msgQueH) )
  924. {
  925. // if the task registered a msg receive callback
  926. if( trp->inst->task->recv != NULL )
  927. {
  928. if( cmTs1p1cDequeueMsg(trp->inst->msgQueH, NULL, 0 ) != kOkThRC )
  929. {
  930. // ?????
  931. // ????? how do we send error messages back to the client
  932. // ??????
  933. return kOkTmwRC;
  934. }
  935. }
  936. else
  937. {
  938. }
  939. return kRecvTmwRC;
  940. }
  941. }
  942. // if ctlId==kKillTmId then the status update will be handled
  943. // when the task custom function returns in _cmTmWorkerThreadFunc()
  944. return trp->inst->ctlId == kKillTmId ? kStopTmwRC : kOkTmwRC;
  945. }
  946. cmTmRC_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
  947. {
  948. _cmTaskMgrWorkerHelper(a,kStatusTmId,statusId,0,NULL);
  949. return kOkTmRC;
  950. }
  951. cmTmRC_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* text )
  952. {
  953. _cmTaskMgrWorkerHelper(a,kProgTmId,kInvalidTmId,prog,text);
  954. return kOkTmRC;
  955. }
  956. cmTmRC_t cmTaskMgrWorkerSendProgressV( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, va_list vl )
  957. {
  958. cmTmThread_t* trp = a->reserved;
  959. cmTsVPrintfP(trp->text,fmt,vl);
  960. return cmTaskMgrWorkerSendProgress(a,prog,trp->text);
  961. }
  962. cmTmRC_t cmTaskMgrWorkerSendProgressF( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, ... )
  963. {
  964. va_list vl;
  965. va_start(vl,fmt);
  966. cmTmRC_t rc = cmTaskMgrWorkerSendProgressV(a,prog,fmt,vl);
  967. va_end(vl);
  968. return rc;
  969. }
  970. cmTmRC_t cmTaskMgrWorkerError( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* text )
  971. {
  972. _cmTaskMgrWorkerHelper(a, kErrorTmId, kInvalidTmId, rc, text);
  973. return rc;
  974. }
  975. cmTmRC_t cmTaskMgrWorkerErrorV( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, va_list vl )
  976. {
  977. cmTmThread_t* trp = a->reserved;
  978. cmTsVPrintfP(trp->text,fmt,vl);
  979. return cmTaskMgrWorkerError(a,rc,trp->text);
  980. }
  981. cmTmRC_t cmTaskMgrWorkerErrorF( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, ... )
  982. {
  983. va_list vl;
  984. va_start(vl,fmt);
  985. cmTmRC_t rc0 = cmTaskMgrWorkerErrorV(a,rc,fmt,vl);
  986. va_end(vl);
  987. return rc0;
  988. }
  989. cmTmRC_t cmTaskMgrWorkerSetResult( cmTaskMgrFuncArg_t* a, void* result, unsigned resultByteCnt )
  990. {
  991. cmTmThread_t* trp = a->reserved;
  992. trp->inst->result = result;
  993. trp->inst->resultByteCnt = resultByteCnt;
  994. return kOkTmRC;
  995. }
  996. unsigned cmTaskMgrWorkerMsgByteCount( cmTaskMgrFuncArg_t* a )
  997. {
  998. cmTmThread_t* trp = a->reserved;
  999. return cmTs1p1cDequeueMsgByteCount(trp->inst->msgQueH);
  1000. }
  1001. unsigned cmTaskMgrWorkerMsgRecv( cmTaskMgrFuncArg_t* a, void* buf, unsigned bufByteCnt )
  1002. {
  1003. cmTmThread_t* trp = a->reserved;
  1004. unsigned retVal = bufByteCnt;
  1005. switch( cmTs1p1cDequeueMsg(trp->inst->msgQueH, buf, bufByteCnt ) )
  1006. {
  1007. case kOkThRC:
  1008. break;
  1009. case kBufEmptyThRC:
  1010. retVal = 0;
  1011. break;
  1012. case kBufTooSmallThRC:
  1013. retVal = cmInvalidCnt;
  1014. break;
  1015. default:
  1016. { assert(0); }
  1017. }
  1018. return retVal;
  1019. }
  1020. cmTmRC_t cmTaskMgrWorkerMsgSend( cmTaskMgrFuncArg_t* a, const void* buf, unsigned bufByteCnt )
  1021. {
  1022. cmTmThread_t* trp = a->reserved;
  1023. return _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kMsgTmId,trp->inst->status,0,NULL,buf,bufByteCnt);
  1024. }
  1025. //-----------------------------------------------------------------------------
  1026. enum { kMaxTestInstCnt = 3 };
  1027. typedef struct cmTmTestInst_str
  1028. {
  1029. unsigned instId;
  1030. } cmTmTestInst_t;
  1031. typedef struct cmTmTestApp_str
  1032. {
  1033. cmErr_t* err;
  1034. cmTmTestInst_t insts[kMaxTestInstCnt];
  1035. } cmTmTestApp_t;
  1036. void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s )
  1037. {
  1038. cmRptPrintf(rpt,"inst:%i ",s->instId );
  1039. switch( s->selId )
  1040. {
  1041. case kStatusTmId:
  1042. {
  1043. const cmChar_t* label = cmTaskMgrStatusIdToLabel(s->statusId);
  1044. cmRptPrintf(rpt,"status '%s'",label);
  1045. }
  1046. break;
  1047. case kProgTmId:
  1048. cmRptPrintf(rpt,"prog %i",s->prog);
  1049. break;
  1050. case kErrorTmId:
  1051. cmRptPrintf(rpt,"error %s",cmStringNullGuard(s->msg));
  1052. break;
  1053. default:
  1054. { assert(0); }
  1055. }
  1056. cmRptPrintf(rpt,"\n");
  1057. }
  1058. // Test client status callback function.
  1059. void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s )
  1060. {
  1061. // s.arg set from cmTaskMgrCreate( ..., statusCbArg, ...);
  1062. cmTmTestApp_t* app = (cmTmTestApp_t*)s->arg;
  1063. unsigned i;
  1064. // locate the instance record assoc'd with this callback
  1065. for(i=0; i<kMaxTestInstCnt; ++i)
  1066. if( app->insts[i].instId == s->instId )
  1067. break;
  1068. if( i==kMaxTestInstCnt )
  1069. cmRptPrintf(app->err->rpt,"instId %i not found.\n",s->instId);
  1070. _cmTmTestReportStatus(app->err->rpt,s);
  1071. }
  1072. // Test worker function.
  1073. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg )
  1074. {
  1075. if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC )
  1076. return;
  1077. unsigned prog = 0;
  1078. for(; prog<arg->progCnt; ++prog)
  1079. {
  1080. if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC )
  1081. break;
  1082. cmSleepMs(1000);
  1083. if( cmTaskMgrWorkerSendProgress(arg,prog,NULL) == kStopTmwRC )
  1084. break;
  1085. }
  1086. }
  1087. cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx)
  1088. {
  1089. cmTmRC_t rc = kOkTmRC;
  1090. cmTaskMgrH_t tmH = cmTaskMgrNullHandle;
  1091. unsigned threadCnt = 2;
  1092. unsigned queueByteCnt = 1024;
  1093. unsigned pauseSleepMs = 50;
  1094. unsigned nextInstId = 0;
  1095. unsigned taskId = 0;
  1096. const cmChar_t* taskLabel = "Task Label";
  1097. cmTmTestApp_t app;
  1098. char c;
  1099. memset(&app,0,sizeof(app));
  1100. app.err = &ctx->err;
  1101. // create the task mgr
  1102. if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC )
  1103. {
  1104. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr create failed.");
  1105. goto errLabel;
  1106. }
  1107. // install a task
  1108. if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc, NULL ) != kOkTmRC )
  1109. {
  1110. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr task install failed.");
  1111. goto errLabel;
  1112. }
  1113. // go into interactive mode
  1114. printf("q=quit e=enable c=call i=idle\n");
  1115. while((c = getchar()) != 'q')
  1116. {
  1117. switch(c)
  1118. {
  1119. case 'i':
  1120. cmTaskMgrOnIdle(tmH);
  1121. cmRptPrintf(&ctx->rpt,"idled\n");
  1122. break;
  1123. case 'e':
  1124. {
  1125. // toggle the enable state of the task mgr.
  1126. bool fl = !cmTaskMgrIsEnabled(tmH);
  1127. if( cmTaskMgrEnable(tmH,fl) != kOkTmRC )
  1128. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test enable failed.");
  1129. else
  1130. cmRptPrintf(&ctx->rpt,"%s\n", fl ? "enabled" : "disabled" );
  1131. }
  1132. break;
  1133. case 'c':
  1134. if( nextInstId < kMaxTestInstCnt )
  1135. {
  1136. void* funcArg = app.insts + nextInstId;
  1137. unsigned progCnt = 5;
  1138. if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, queueByteCnt, "My Inst", &app.insts[nextInstId].instId ) != kOkTmRC )
  1139. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed.");
  1140. else
  1141. {
  1142. ++nextInstId;
  1143. cmRptPrintf(&ctx->rpt,"called\n");
  1144. }
  1145. }
  1146. }
  1147. }
  1148. errLabel:
  1149. // destroy the task mgr
  1150. if( cmTaskMgrDestroy(&tmH) != kOkTmRC )
  1151. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr destroy failed.");
  1152. return rc;
  1153. }