libcm is a C development framework with an emphasis on audio signal processing applications.
Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

cmTaskMgr.c 27KB


  1. #include "cmGlobal.h"
  2. #include "cmRpt.h"
  3. #include "cmErr.h"
  4. #include "cmCtx.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmThread.h"
  8. #include "cmTime.h"
  9. #include "cmTaskMgr.h"
  10. cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE;
  11. struct cmTmInst_str;
  12. typedef struct cmTmTask_str
  13. {
  14. unsigned taskId;
  15. cmChar_t* label;
  16. cmTaskMgrFunc_t func;
  17. struct cmTmTask_str* link;
  18. } cmTmTask_t;
  19. typedef struct cmTmInst_str
  20. {
  21. unsigned instId;
  22. struct cmTmTask_str* task;
  23. void* funcArg;
  24. unsigned progCnt;
  25. cmStatusTmId_t status;
  26. void* result;
  27. unsigned resultByteCnt;
  28. cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread
  29. bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete
  30. struct cmTmInst_str* link;
  31. } cmTmInst_t;
  32. struct cmTm_str* p;
  33. typedef struct cmTmThread_str
  34. {
  35. struct cmTm_str* p; //
  36. cmThreadH_t thH; //
  37. cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
  38. double durSecs;
  39. cmTimeSpec_t t0;
  40. bool deactivateFl;
  41. struct cmTmThread_str* link;
  42. } cmTmThread_t;
  43. typedef struct cmTm_str
  44. {
  45. cmErr_t err;
  46. cmThreadH_t mstrThH; //
  47. cmTmThread_t* threads; //
  48. unsigned maxActiveThreadCnt; //
  49. unsigned threadRecdCnt;
  50. cmTaskMgrStatusCb_t statusCb; //
  51. void* statusCbArg; //
  52. unsigned pauseSleepMs; //
  53. cmTs1p1cH_t inQueH; // client->mgr
  54. cmTsMp1cH_t outQueH; // mgr->client
  55. cmTmTask_t* tasks; //
  56. cmTmInst_t* insts; //
  57. unsigned nextInstId;
  58. } cmTm_t;
  59. void _cmTaskMgrStatusArgSetup(
  60. cmTaskMgrStatusArg_t* s,
  61. void* arg,
  62. unsigned instId,
  63. cmSelTmId_t selId,
  64. cmStatusTmId_t statusId,
  65. unsigned prog,
  66. const cmChar_t* msg,
  67. void* result,
  68. unsigned resultByteCnt )
  69. {
  70. s->arg = arg;
  71. s->instId = instId;
  72. s->selId = selId;
  73. s->statusId = statusId;
  74. s->prog = prog;
  75. s->msg = msg;
  76. s->result = result;
  77. s->resultByteCnt = resultByteCnt;
  78. }
  79. // WARNING: THIS FUNCTION IS CALLED BY BOTH THE WORKER AND THE MASTER THREAD.
  80. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s )
  81. {
  82. enum { arrayCnt = 3 };
  83. const void* msgPtrArray[arrayCnt];
  84. unsigned msgSizeArray[arrayCnt];
  85. msgPtrArray[0] = s;
  86. msgPtrArray[1] = s->msg==NULL ? "" : s->msg;
  87. msgPtrArray[2] = s->result;
  88. msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t);
  89. msgSizeArray[1] = s->msg==NULL ? 1 : strlen(s->msg)+1;
  90. msgSizeArray[2] = s->resultByteCnt;
  91. if( cmTsMp1cEnqueueSegMsg(p->outQueH, msgPtrArray, msgSizeArray, arrayCnt ) != kOkThRC )
  92. return kQueueFailTmRC;
  93. return kOkTmRC;
  94. }
  95. // This function is called by the worker thread wrapper _cmTmWorkerStatusCb()
  96. // function to enqueue messages being sent back to the client.
  97. cmTmRC_t _cmTmEnqueueStatusMsg1(
  98. cmTm_t* p,
  99. unsigned instId,
  100. cmSelTmId_t selId,
  101. cmStatusTmId_t statusId,
  102. unsigned prog,
  103. const cmChar_t* msg,
  104. void* result,
  105. unsigned resultByteCnt )
  106. {
  107. cmTaskMgrStatusArg_t s;
  108. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,msg,result,resultByteCnt);
  109. return _cmTmEnqueueStatusMsg0(p,&s);
  110. }
  111. // Worker threads call this function to enqueue status messages
  112. // for delivery to the task mgr client.
  113. void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status )
  114. {
  115. cmTmThread_t* trp = (cmTmThread_t*)status->arg;
  116. if( _cmTmEnqueueStatusMsg0( trp->p, status ) != kOkTmRC )
  117. {
  118. /// ??????? HOW DO WE HANDLE ERRORS IN THE WORKER THREAD
  119. /// (set an error code in trp and let the master thread notice it.)
  120. assert(0);
  121. }
  122. }
  123. // This is the wrapper for all worker threads.
  124. bool _cmTmWorkerThreadFunc(void* arg)
  125. {
  126. cmTmThread_t* trp = (cmTmThread_t*)arg;
  127. cmTaskMgrFuncArg_t r;
  128. r.reserved = trp;
  129. r.arg = trp->inst->funcArg;
  130. r.instId = trp->inst->instId;
  131. r.statusCb = _cmTmWorkerStatusCb;
  132. r.statusCbArg = trp;
  133. r.progCnt = trp->inst->progCnt;
  134. r.pauseSleepMs= trp->p->pauseSleepMs;
  135. // if the task was paused or killed while it was queued then
  136. // cmTaskMgrHandleCommand() will do the right thing
  137. if( cmTaskMgrWorkerHandleCommand(&r) != kKillTmId )
  138. {
  139. trp->inst->status = kStartedTmId;
  140. // Notify the client that the instance has started.
  141. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  142. // Execute the client provided task function.
  143. trp->inst->task->func(&r);
  144. }
  145. // Notify the client that the instance has completed or been killed
  146. if( trp->inst->ctlId == kKillTmId )
  147. trp->inst->status = kKilledTmId;
  148. else
  149. trp->inst->status = kCompletedTmId;
  150. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  151. // Force the thread to go into the 'pause' state when it
  152. // returns to it's internal loop. The master thread recognizes paused
  153. // threads as available for reuse.
  154. cmThreadPause(trp->thH,kPauseThFl);
  155. return true;
  156. }
  157. void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg )
  158. {
  159. assert(0);
  160. }
  161. int _cmTmSortThreadByDur( const void* t0, const void* t1 )
  162. {
  163. double d = ((cmTmThread_t*)t0)->durSecs - ((cmTmThread_t*)t1)->durSecs;
  164. return d== 0 ? 0 : (d<0 ? -1 : 1);
  165. }
  166. // This is the master thread function.
  167. bool _cmTmMasterThreadFunc(void* arg)
  168. {
  169. cmTm_t* p = (cmTm_t*)arg;
  170. unsigned activeCnt = 0;
  171. cmTmThread_t* trp = p->threads;
  172. if( p->threadRecdCnt > 0 )
  173. {
  174. cmTmThread_t* thArray[p->threadRecdCnt];
  175. unsigned deactivatedCnt = 0;
  176. // for each thread record
  177. for(trp=p->threads; trp!=NULL; trp=trp->link)
  178. {
  179. cmThStateId_t thState;
  180. // if this thread is active ...
  181. if( (thState = cmThreadState(trp->thH)) != kPausedThId )
  182. {
  183. // update the task lifetime duration
  184. cmTimeSpec_t t1;
  185. cmTimeGet(&t1);
  186. trp->durSecs += (double)cmTimeElapsedMicros(&trp->t0,&t1) / 1000000.0;
  187. trp->t0 = t1;
  188. assert(trp->inst!=NULL);
  189. // if the task assoc'd with this thread is running
  190. if( trp->inst->status == kStartedTmId )
  191. {
  192. thArray[activeCnt] = trp;
  193. ++activeCnt;
  194. }
  195. // count the number of deactivated threads
  196. if( trp->deactivateFl )
  197. ++deactivatedCnt;
  198. }
  199. }
  200. // The first 'activeCnt' elements of thArray[] now point to
  201. // cmTmThread_t records of the active tasks.
  202. // if more tasks are active than should be - then deactive the youngest
  203. if( activeCnt > p->maxActiveThreadCnt )
  204. {
  205. // sort the active tasks in increasing order of lifetime
  206. qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  207. // determine the number of threads that need to be paused
  208. int n = activeCnt - p->maxActiveThreadCnt;
  209. int i;
  210. // pause the active threads with the lowest lifetime
  211. for(i=0; i<n ; ++i)
  212. if( thArray[i]->deactivateFl == false )
  213. {
  214. thArray[i]->deactivateFl = true;
  215. ++deactivatedCnt;
  216. }
  217. }
  218. // if there are deactivated tasks and the max thread count has not been reached
  219. // then re-activate some of the deactivated tasks.
  220. if( activeCnt < p->maxActiveThreadCnt && deactivatedCnt > 0 )
  221. {
  222. // sort the active tasks in increasing order of lifetime
  223. qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
  224. int n = cmMin(p->maxActiveThreadCnt - activeCnt, deactivatedCnt );
  225. int i;
  226. // re-activate the oldest deactivated tasks
  227. for(i=activeCnt-1; i>=0 && n>0; --i)
  228. if( thArray[i]->deactivateFl )
  229. {
  230. thArray[i]->deactivateFl = false;
  231. --n;
  232. ++activeCnt;
  233. }
  234. }
  235. }
  236. // if a queued task exists
  237. while( cmTs1p1cMsgWaiting(p->inQueH) )
  238. {
  239. cmTmInst_t* ip = NULL;
  240. cmTmThread_t* atrp = NULL;
  241. activeCnt = 0;
  242. // Find a worker thread that is in the 'paused' state.
  243. // This is the definitive indication that the thread
  244. // does not have an assigned instance
  245. for(trp=p->threads; trp!=NULL; trp=trp->link)
  246. {
  247. if( cmThreadState(trp->thH) == kPausedThId )
  248. atrp = trp;
  249. else
  250. ++activeCnt;
  251. }
  252. // If the maximum number of active threads already exists then we cannot start a new task
  253. if( activeCnt >= p->maxActiveThreadCnt )
  254. break;
  255. // If all the existing worker threads are busy
  256. // but the maximum number of threads has not yet been allocated ...
  257. if( atrp==NULL && p->threadRecdCnt < p->maxActiveThreadCnt)
  258. {
  259. // ... then create a new worker thread recd
  260. atrp = cmMemAllocZ(cmTmThread_t,1);
  261. // ... create the new worker thread
  262. if( cmThreadCreate(&atrp->thH,_cmTmWorkerThreadFunc,atrp,p->err.rpt) != kOkThRC )
  263. {
  264. cmMemFree(atrp);
  265. atrp = NULL;
  266. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread create failed.");
  267. break;
  268. }
  269. else
  270. {
  271. // ... setup the new thread record
  272. atrp->p = p;
  273. atrp->link = p->threads;
  274. p->threads = atrp;
  275. p->threadRecdCnt += 1;
  276. }
  277. }
  278. // if there are no available threads then give up
  279. if( atrp == NULL )
  280. break;
  281. // dequeue a pending task instance pointer from the input queue
  282. if(cmTs1p1cDequeueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC )
  283. {
  284. _cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue.");
  285. break;
  286. }
  287. // setup the thread record associated with the new task
  288. atrp->inst = ip;
  289. atrp->durSecs = 0;
  290. atrp->deactivateFl = false;
  291. // start the worker thread
  292. if( cmThreadPause(atrp->thH,0) != kOkThRC )
  293. _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed.");
  294. }
  295. cmSleepMs(p->pauseSleepMs);
  296. return true;
  297. }
  298. cmTm_t* _cmTmHandleToPtr( cmTaskMgrH_t h )
  299. {
  300. cmTm_t* p = (cmTm_t*)h.h;
  301. assert( p != NULL );
  302. return p;
  303. }
  304. cmTmTask_t* _cmTmTaskFromId( cmTm_t* p, unsigned taskId )
  305. {
  306. cmTmTask_t* tp;
  307. for(tp=p->tasks; tp!=NULL; tp=tp->link)
  308. if( tp->taskId == taskId )
  309. return tp;
  310. return NULL;
  311. }
  312. cmTmInst_t* _cmTmInstFromId( cmTm_t* p, unsigned instId )
  313. {
  314. cmTmInst_t* ip;
  315. for(ip=p->insts; ip!=NULL; ip=ip->link)
  316. if( ip->instId == instId )
  317. return ip;
  318. return NULL;
  319. }
  320. cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId )
  321. {
  322. cmTmInst_t* ip = p->insts;
  323. cmTmInst_t* pp = NULL;
  324. for(; ip!=NULL; ip=ip->link)
  325. {
  326. if( ip->instId == instId )
  327. {
  328. if( pp == NULL )
  329. p->insts = ip->link;
  330. else
  331. pp->link = ip->link;
  332. cmMemFree(ip->result);
  333. cmMemFree(ip);
  334. return kOkTmRC;
  335. }
  336. pp = ip;
  337. }
  338. return cmErrMsg(&p->err,kAssertFailTmRC,"The instance %i could not be found to be deleted.",instId);
  339. }
  340. cmTmRC_t _cmTmDestroy( cmTm_t* p )
  341. {
  342. cmTmRC_t rc = kOkTmRC;
  343. unsigned i;
  344. // stop and destroy the master thread
  345. if( cmThreadDestroy(&p->mstrThH) != kOkThRC )
  346. {
  347. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Master thread destroy failed.");
  348. goto errLabel;
  349. }
  350. // stop and destroy all the worker threads
  351. for(i=0; p->threads != NULL; ++i )
  352. {
  353. if( cmThreadDestroy(&p->threads->thH) != kOkThRC )
  354. {
  355. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread destruction failed for the worker thread at index %i.",i);
  356. goto errLabel;
  357. }
  358. cmTmThread_t* trp = p->threads;
  359. p->threads = p->threads->link;
  360. cmMemFree(trp);
  361. }
  362. // release the input queue
  363. if( cmTs1p1cDestroy(&p->inQueH) != kOkThRC )
  364. {
  365. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  366. goto errLabel;
  367. }
  368. // draining the output queue
  369. while( cmTsMp1cMsgWaiting(p->outQueH) )
  370. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  371. cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed while draingin.");
  372. // release the output queue
  373. if( cmTsMp1cDestroy(&p->outQueH) != kOkThRC )
  374. {
  375. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  376. goto errLabel;
  377. }
  378. // release instance list
  379. while( p->insts != NULL )
  380. _cmTmInstFree(p,p->insts->instId);
  381. // release the task list
  382. cmTmTask_t* tp = p->tasks;
  383. while( tp != NULL )
  384. {
  385. cmTmTask_t* np = tp->link;
  386. cmMemFree(tp->label);
  387. cmMemFree(tp);
  388. tp = np;
  389. }
  390. cmMemFree(p);
  391. errLabel:
  392. return rc;
  393. }
  394. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr );
  395. cmTmRC_t cmTaskMgrCreate(
  396. cmCtx_t* ctx,
  397. cmTaskMgrH_t* hp,
  398. cmTaskMgrStatusCb_t statusCb,
  399. void* statusCbArg,
  400. unsigned maxActiveThreadCnt,
  401. unsigned queueByteCnt,
  402. unsigned pauseSleepMs)
  403. {
  404. cmTmRC_t rc = kOkTmRC;
  405. if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC )
  406. return rc;
  407. cmTm_t* p = cmMemAllocZ(cmTm_t,1);
  408. cmErrSetup(&p->err,&ctx->rpt,"Task Mgr.");
  409. p->maxActiveThreadCnt = maxActiveThreadCnt;
  410. p->statusCb = statusCb;
  411. p->statusCbArg = statusCbArg;
  412. p->pauseSleepMs = pauseSleepMs;
  413. // create the master thread
  414. if( cmThreadCreate(&p->mstrThH, _cmTmMasterThreadFunc,p,&ctx->rpt) != kOkThRC )
  415. {
  416. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.");
  417. goto errLabel;
  418. }
  419. // create the input queue
  420. if(cmTs1p1cCreate( &p->inQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  421. {
  422. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue creation failed.");
  423. goto errLabel;
  424. }
  425. // create the output queue
  426. if( cmTsMp1cCreate( &p->outQueH, queueByteCnt, _cmTmMasterOutQueueCb, p, p->err.rpt ) != kOkThRC )
  427. {
  428. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue creation failed.");
  429. goto errLabel;
  430. }
  431. hp->h = p;
  432. errLabel:
  433. return rc;
  434. }
  435. cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp )
  436. {
  437. cmTmRC_t rc = kOkTmRC;
  438. if( hp==NULL || cmTaskMgrIsValid(*hp)==false )
  439. return rc;
  440. cmTm_t* p = _cmTmHandleToPtr(*hp);
  441. if((rc = _cmTmDestroy(p)) != kOkTmRC )
  442. return rc;
  443. hp->h = NULL;
  444. return rc;
  445. }
  446. bool cmTaskMgrIsValid( cmTaskMgrH_t h )
  447. { return h.h != NULL; }
  448. // This function is called by cmTaskMgrIdle() to dispatch
  449. // status updates to the client.
  450. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr )
  451. {
  452. cmTm_t* p = (cmTm_t*)arg;
  453. cmTaskMgrStatusArg_t s;
  454. // This is probably not nesessary since changing the memory
  455. // pointed to by msgDataPtr should be safe even though it is marked as const.
  456. memcpy(&s,msgDataPtr,sizeof(s));
  457. // The 'msg' and 'result' data have been serialized after the status record.
  458. // The 'msg' is guaranteed to at least contain a terminating zero.
  459. s.msg = ((char*)msgDataPtr) + sizeof(s);
  460. // if the 'resultByteCnt' > 0 then there is a result record
  461. if( s.resultByteCnt > 0 )
  462. s.result = ((char*)msgDataPtr) + sizeof(s) + strlen(s.msg) + 1;
  463. else
  464. s.result = NULL;
  465. s.arg = p->statusCbArg;
  466. p->statusCb( &s );
  467. return cmOkRC;
  468. }
  469. cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h )
  470. {
  471. cmTmRC_t rc = kOkTmRC;
  472. cmTm_t* p = _cmTmHandleToPtr(h);
  473. // Transmit any msgs waiting to be sent to the client.
  474. while( cmTsMp1cMsgWaiting(p->outQueH) )
  475. {
  476. // calling this function calls: _cmTmMasterOutQueueCb()
  477. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  478. {
  479. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed during a dequeue.");
  480. goto errLabel;
  481. }
  482. }
  483. // Step through the instance list and delete instances that are
  484. // completed and also marked for deletion.
  485. cmTmInst_t* ip = p->insts;
  486. while( ip != NULL )
  487. {
  488. cmTmInst_t* np = ip->link;
  489. if( (ip->status==kCompletedTmId || ip->status==kKilledTmId) && ip->deleteOnCompleteFl )
  490. _cmTmInstFree(p,ip->instId);
  491. ip = np;
  492. }
  493. errLabel:
  494. return rc;
  495. }
  496. bool cmTaskMgrIsEnabled( cmTaskMgrH_t h )
  497. {
  498. cmTm_t* p = _cmTmHandleToPtr(h);
  499. return cmThreadState(p->mstrThH) != kPausedThId;
  500. }
  501. cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
  502. {
  503. cmTmRC_t rc = kOkTmRC;
  504. cmTm_t* p = _cmTmHandleToPtr(h);
  505. unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl;
  506. if( cmThreadPause(p->mstrThH, flags ) != kOkThRC )
  507. rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" );
  508. return rc;
  509. }
  510. cmTmRC_t cmTaskMgrInstall( cmTaskMgrH_t h, unsigned taskId, const cmChar_t* label, cmTaskMgrFunc_t func )
  511. {
  512. cmTmRC_t rc = kOkTmRC;
  513. cmTm_t* p = _cmTmHandleToPtr(h);
  514. cmTmTask_t* tp = cmMemAllocZ(cmTmTask_t,1);
  515. if( _cmTmTaskFromId(p,taskId) != NULL )
  516. {
  517. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"The task id %i is already in use.",taskId);
  518. goto errLabel;
  519. }
  520. tp->taskId = taskId;
  521. tp->func = func;
  522. tp->label = cmMemAllocStr(label);
  523. tp->link = p->tasks;
  524. p->tasks = tp;
  525. errLabel:
  526. return rc;
  527. }
  528. cmTmRC_t cmTaskMgrCall(
  529. cmTaskMgrH_t h,
  530. unsigned taskId,
  531. void* funcArg,
  532. unsigned progCnt,
  533. unsigned* retInstIdPtr )
  534. {
  535. cmTmRC_t rc = kOkTmRC;
  536. cmTm_t* p = _cmTmHandleToPtr(h);
  537. cmTmTask_t* tp = NULL;
  538. cmTmInst_t* ip = NULL;
  539. if( retInstIdPtr != NULL )
  540. *retInstIdPtr = cmInvalidId;
  541. // locate the task for this instance
  542. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  543. {
  544. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"Task not found for task id=%i.",taskId);
  545. goto errLabel;
  546. }
  547. // allocate a new instance record
  548. ip = cmMemAllocZ(cmTmInst_t,1);
  549. // setupt the instance record
  550. ip->instId = p->nextInstId++;
  551. ip->task = tp;
  552. ip->funcArg = funcArg;
  553. ip->progCnt = progCnt;
  554. ip->status = kQueuedTmId;
  555. // insert the new instance at the end of the instance list
  556. if( p->insts == NULL )
  557. p->insts = ip;
  558. else
  559. {
  560. cmTmInst_t* pp = p->insts;
  561. for(; pp != NULL; pp=pp->link )
  562. if( pp->link == NULL )
  563. {
  564. pp->link = ip;
  565. break;
  566. }
  567. }
  568. // enque the instance ptr in the input queue
  569. if( cmTs1p1cEnqueueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC )
  570. {
  571. rc = cmErrMsg(&p->err,kQueueFailTmRC,"New task instance command enqueue failed.");
  572. goto errLabel;
  573. }
  574. // set the returned instance id
  575. if( retInstIdPtr != NULL )
  576. *retInstIdPtr = ip->instId;
  577. // notify the client that the instance was enqueued
  578. cmTaskMgrStatusArg_t s;
  579. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,0,NULL,NULL,0);
  580. p->statusCb( &s );
  581. errLabel:
  582. return rc;
  583. }
  584. cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId )
  585. {
  586. cmTmRC_t rc = kOkTmRC;
  587. cmTm_t* p = _cmTmHandleToPtr(h);
  588. cmTmInst_t* ip = NULL;
  589. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  590. {
  591. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  592. goto errLabel;
  593. }
  594. // Once an instance ctlId is set to kKillTmId don't allow it to change.
  595. if( ip->ctlId == kKillTmId )
  596. return rc;
  597. switch(ctlId )
  598. {
  599. case kNoneTmId:
  600. break;
  601. case kStartTmId:
  602. // Acting on a 'start' cmd only makes sense if the previous command was 'pause'
  603. if( ip->ctlId == kPauseTmId )
  604. ip->ctlId = kStartTmId;
  605. break;
  606. case kPauseTmId:
  607. // Acting on a 'pause' command only makes sense if this is the first command
  608. // or the previous command was a 'start'
  609. if( ip->ctlId == kNoneTmId || ip->ctlId == kStartTmId )
  610. ip->ctlId = kPauseTmId;
  611. break;
  612. case kKillTmId:
  613. ip->ctlId = kKillTmId;
  614. break;
  615. }
  616. errLabel:
  617. return rc;
  618. }
  619. cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId )
  620. {
  621. cmTm_t* p = _cmTmHandleToPtr(h);
  622. cmTmInst_t* ip = NULL;
  623. cmStatusTmId_t status = kInvalidTmId;
  624. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  625. {
  626. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  627. goto errLabel;
  628. }
  629. status = ip->status;
  630. errLabel:
  631. return status;
  632. }
  633. const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId )
  634. {
  635. cmTm_t* p = _cmTmHandleToPtr(h);
  636. cmTmInst_t* ip = NULL;
  637. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  638. {
  639. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  640. return NULL;
  641. }
  642. return ip->result;
  643. }
  644. unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId )
  645. {
  646. cmTm_t* p = _cmTmHandleToPtr(h);
  647. cmTmInst_t* ip = NULL;
  648. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  649. {
  650. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  651. return 0;
  652. }
  653. return ip->resultByteCnt;
  654. }
  655. cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId )
  656. {
  657. cmTmRC_t rc = kOkTmRC;
  658. cmTm_t* p = _cmTmHandleToPtr(h);
  659. cmTmInst_t* ip = NULL;
  660. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  661. {
  662. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  663. return 0;
  664. }
  665. ip->deleteOnCompleteFl = true;
  666. return rc;
  667. }
  668. cmTaskMgrCtlId_t _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId )
  669. {
  670. cmTaskMgrStatusArg_t s;
  671. _cmTaskMgrStatusArgSetup(
  672. &s,
  673. a->statusCbArg,
  674. a->instId,
  675. statusId == kInvalidTmId ? kProgTmId : kStatusTmId,
  676. statusId == kInvalidTmId ? kStartedTmId : statusId,
  677. statusId == kInvalidTmId ? prog : 0,
  678. NULL,NULL,0);
  679. a->statusCb(&s);
  680. return cmTaskMgrWorkerHandleCommand(a);
  681. }
  682. cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a )
  683. {
  684. cmTmThread_t* trp = a->reserved;
  685. while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true )
  686. {
  687. // change the instance status to 'paused'.
  688. trp->inst->status = kPausedTmId;
  689. // notify the client of the change in state
  690. cmTaskMgrWorkerSendStatus(a,kPausedTmId);
  691. // sleep the thread for pauseSleepMs milliseconds
  692. cmSleepMs(a->pauseSleepMs);
  693. // if the task was unpaused while we slept
  694. if( trp->inst->ctlId == kStartTmId && trp->deactivateFl == false )
  695. {
  696. // change the instance status to 'started'.
  697. trp->inst->status = kStartedTmId;
  698. // notify the client of the change in state
  699. cmTaskMgrWorkerSendStatus(a,kStartedTmId);
  700. }
  701. }
  702. // if ctlId==kKillTmId then the status update will be handled
  703. // when the task custom function returns in _cmTmWorkerThreadFunc()
  704. return trp->inst->ctlId;
  705. }
  706. cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
  707. { return _cmTaskMgrWorkerHelper(a,0,statusId); }
  708. cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog )
  709. { return _cmTaskMgrWorkerHelper(a,prog,kInvalidTmId); }
  710. //-----------------------------------------------------------------------------
  711. enum { kMaxTestInstCnt = 3 };
  712. typedef struct cmTmTestInst_str
  713. {
  714. unsigned instId;
  715. } cmTmTestInst_t;
  716. typedef struct cmTmTestApp_str
  717. {
  718. cmErr_t* err;
  719. cmTmTestInst_t insts[kMaxTestInstCnt];
  720. } cmTmTestApp_t;
  721. void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s )
  722. {
  723. cmRptPrintf(rpt,"inst:%i ",s->instId );
  724. switch( s->selId )
  725. {
  726. case kStatusTmId:
  727. {
  728. const cmChar_t* label = "<none>";
  729. switch( s->statusId )
  730. {
  731. case kInvalidTmId: label="<Invalid>"; break;
  732. case kQueuedTmId: label="Queued"; break;
  733. case kStartedTmId: label="Started"; break;
  734. case kCompletedTmId: label="Completed"; break;
  735. case kKilledTmId: label="Killed"; break;
  736. default:
  737. { assert(0); }
  738. }
  739. cmRptPrintf(rpt,"status '%s'",label);
  740. }
  741. break;
  742. case kProgTmId:
  743. cmRptPrintf(rpt,"prog %i",s->prog);
  744. break;
  745. case kErrorTmId:
  746. cmRptPrintf(rpt,"error %s",cmStringNullGuard(s->msg));
  747. break;
  748. default:
  749. { assert(0); }
  750. }
  751. cmRptPrintf(rpt,"\n");
  752. }
  753. // Test client status callback function.
  754. void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s )
  755. {
  756. // s.arg set from cmTaskMgrCreate( ..., statusCbArg, ...);
  757. cmTmTestApp_t* app = (cmTmTestApp_t*)s->arg;
  758. unsigned i;
  759. // locate the instance record assoc'd with this callback
  760. for(i=0; i<kMaxTestInstCnt; ++i)
  761. if( app->insts[i].instId == s->instId )
  762. break;
  763. if( i==kMaxTestInstCnt )
  764. cmRptPrintf(app->err->rpt,"instId %i not found.\n",s->instId);
  765. _cmTmTestReportStatus(app->err->rpt,s);
  766. }
  767. // Test worker function.
  768. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg )
  769. {
  770. if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId )
  771. return;
  772. unsigned prog = 0;
  773. for(; prog<arg->progCnt; ++prog)
  774. {
  775. if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId )
  776. break;
  777. cmSleepMs(1000);
  778. if( cmTaskMgrWorkerSendProgress(arg,prog) == kKillTmId )
  779. break;
  780. }
  781. }
  782. cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx)
  783. {
  784. cmTmRC_t rc = kOkTmRC;
  785. cmTaskMgrH_t tmH = cmTaskMgrNullHandle;
  786. unsigned threadCnt = 2;
  787. unsigned queueByteCnt = 1024;
  788. unsigned pauseSleepMs = 50;
  789. unsigned nextInstId = 0;
  790. unsigned taskId = 0;
  791. const cmChar_t* taskLabel = "Task Label";
  792. cmTmTestApp_t app;
  793. char c;
  794. memset(&app,0,sizeof(app));
  795. app.err = &ctx->err;
  796. // create the task mgr
  797. if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC )
  798. {
  799. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr create failed.");
  800. goto errLabel;
  801. }
  802. // install a task
  803. if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc ) != kOkTmRC )
  804. {
  805. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr task install failed.");
  806. goto errLabel;
  807. }
  808. // go into interactive mode
  809. printf("q=quit e=enable c=call i=idle\n");
  810. while((c = getchar()) != 'q')
  811. {
  812. switch(c)
  813. {
  814. case 'i':
  815. cmTaskMgrOnIdle(tmH);
  816. cmRptPrintf(&ctx->rpt,"idled\n");
  817. break;
  818. case 'e':
  819. {
  820. // toggle the enable state of the task mgr.
  821. bool fl = !cmTaskMgrIsEnabled(tmH);
  822. if( cmTaskMgrEnable(tmH,fl) != kOkTmRC )
  823. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test enable failed.");
  824. else
  825. cmRptPrintf(&ctx->rpt,"%s\n", fl ? "enabled" : "disabled" );
  826. }
  827. break;
  828. case 'c':
  829. if( nextInstId < kMaxTestInstCnt )
  830. {
  831. void* funcArg = app.insts + nextInstId;
  832. unsigned progCnt = 5;
  833. if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, &app.insts[nextInstId].instId ) != kOkTmRC )
  834. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed.");
  835. else
  836. {
  837. ++nextInstId;
  838. cmRptPrintf(&ctx->rpt,"called\n");
  839. }
  840. }
  841. }
  842. }
  843. errLabel:
  844. // destroy the task mgr
  845. if( cmTaskMgrDestroy(&tmH) != kOkTmRC )
  846. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr destroy failed.");
  847. return rc;
  848. }