libcm is a C development framework with an emphasis on audio signal processing applications.
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder mit einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

cmTaskMgr.c 23KB


  1. #include "cmGlobal.h"
  2. #include "cmRpt.h"
  3. #include "cmErr.h"
  4. #include "cmCtx.h"
  5. #include "cmMem.h"
  6. #include "cmMallocDebug.h"
  7. #include "cmThread.h"
  8. #include "cmTaskMgr.h"
  9. cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE;
  10. struct cmTmInst_str;
  11. typedef struct cmTmTask_str
  12. {
  13. unsigned taskId;
  14. cmChar_t* label;
  15. cmTaskMgrFunc_t func;
  16. struct cmTmTask_str* link;
  17. } cmTmTask_t;
  18. typedef struct cmTmInst_str
  19. {
  20. unsigned instId;
  21. struct cmTmTask_str* task;
  22. void* funcArg;
  23. unsigned progCnt;
  24. cmStatusTmId_t status;
  25. void* result;
  26. unsigned resultByteCnt;
  27. cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread
  28. bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete
  29. struct cmTmInst_str* link;
  30. } cmTmInst_t;
  31. struct cmTm_str* p;
  32. typedef struct cmTmThread_str
  33. {
  34. struct cmTm_str* p; //
  35. cmThreadH_t thH; //
  36. cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
  37. } cmTmThread_t;
  38. typedef struct cmTm_str
  39. {
  40. cmErr_t err;
  41. cmTmThread_t* thArray; //
  42. unsigned threadCnt; //
  43. cmTaskMgrStatusCb_t statusCb; //
  44. void* statusCbArg; //
  45. unsigned pauseSleepMs;
  46. cmTs1p1cH_t inQueH; // client->mgr
  47. cmTsMp1cH_t outQueH; // mgr->client
  48. cmTmTask_t* tasks; //
  49. cmTmInst_t* insts; //
  50. unsigned nextInstId;
  51. } cmTm_t;
  52. void _cmTaskMgrStatusArgSetup(
  53. cmTaskMgrStatusArg_t* s,
  54. void* arg,
  55. unsigned instId,
  56. cmSelTmId_t selId,
  57. cmStatusTmId_t statusId,
  58. unsigned prog,
  59. const cmChar_t* msg,
  60. void* result,
  61. unsigned resultByteCnt )
  62. {
  63. s->arg = arg;
  64. s->instId = instId;
  65. s->selId = selId;
  66. s->statusId = statusId;
  67. s->prog = prog;
  68. s->msg = msg;
  69. s->result = result;
  70. s->resultByteCnt = resultByteCnt;
  71. }
  72. // WARNING: THIS FUNCTION IS CALLED BY BOTH THE WORKER AND THE MASTER THREAD.
  73. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s )
  74. {
  75. enum { arrayCnt = 3 };
  76. const void* msgPtrArray[arrayCnt];
  77. unsigned msgSizeArray[arrayCnt];
  78. msgPtrArray[0] = s;
  79. msgPtrArray[1] = s->msg==NULL ? "" : s->msg;
  80. msgPtrArray[2] = s->result;
  81. msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t);
  82. msgSizeArray[1] = s->msg==NULL ? 1 : strlen(s->msg)+1;
  83. msgSizeArray[2] = s->resultByteCnt;
  84. if( cmTsMp1cEnqueueSegMsg(p->outQueH, msgPtrArray, msgSizeArray, arrayCnt ) != kOkThRC )
  85. return kQueueFailTmRC;
  86. return kOkTmRC;
  87. }
  88. // This function is called by the worker thread wrapper _cmTmWorkerStatusCb()
  89. // function to enqueue messages being sent back to the client.
  90. cmTmRC_t _cmTmEnqueueStatusMsg1(
  91. cmTm_t* p,
  92. unsigned instId,
  93. cmSelTmId_t selId,
  94. cmStatusTmId_t statusId,
  95. unsigned prog,
  96. const cmChar_t* msg,
  97. void* result,
  98. unsigned resultByteCnt )
  99. {
  100. cmTaskMgrStatusArg_t s;
  101. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,msg,result,resultByteCnt);
  102. return _cmTmEnqueueStatusMsg0(p,&s);
  103. }
  104. // Worker threads call this function to enqueue status messages
  105. // for delivery to the task mgr client.
  106. void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status )
  107. {
  108. cmTmThread_t* trp = (cmTmThread_t*)status->arg;
  109. if( _cmTmEnqueueStatusMsg0( trp->p, status ) != kOkTmRC )
  110. {
  111. /// ??????? HOW DO WE HANDLE ERRORS IN THE WORKER THREAD
  112. /// (set an error code in trp and let the master thread notice it.)
  113. assert(0);
  114. }
  115. }
  116. // This is the wrapper for all worker threads.
  117. bool _cmTmWorkerThreadFunc(void* arg)
  118. {
  119. cmTmThread_t* trp = (cmTmThread_t*)arg;
  120. cmTaskMgrFuncArg_t r;
  121. r.reserved = trp;
  122. r.arg = trp->inst->funcArg;
  123. r.instId = trp->inst->instId;
  124. r.statusCb = _cmTmWorkerStatusCb;
  125. r.statusCbArg = trp;
  126. r.progCnt = trp->inst->progCnt;
  127. r.pauseSleepMs= trp->p->pauseSleepMs;
  128. // if the task was paused or killed while it was queued then
  129. // cmTaskMgrHandleCommand() will do the right thing
  130. if( cmTaskMgrHandleCommand(&r) != kKillTmId )
  131. {
  132. trp->inst->status = kStartedTmId;
  133. // Notify the client that the instance has started.
  134. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  135. // Execute the client provided task function.
  136. trp->inst->task->func(&r);
  137. }
  138. // Notify the client that the instance has completed or been killed
  139. if( trp->inst->ctlId == kKillTmId )
  140. trp->inst->status = kKilledTmId;
  141. else
  142. trp->inst->status = kCompletedTmId;
  143. _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
  144. trp->inst = NULL;
  145. // Force the thread to go into the 'pause' state when it
  146. // returns to it's internal loop. The master thread recognizes paused
  147. // threads as available.
  148. cmThreadPause(trp->thH,kPauseThFl);
  149. return true;
  150. }
  151. // This is the master thread function.
  152. bool _cmTmMasterThreadFunc(void* arg)
  153. {
  154. cmTmThread_t* trp = (cmTmThread_t*)arg;
  155. cmTm_t* p = trp->p;
  156. while( cmTs1p1cMsgWaiting(p->inQueH) )
  157. {
  158. unsigned i;
  159. cmTmInst_t* ip = NULL;
  160. // find an available worker thread
  161. for(i=1; i<p->threadCnt; ++i)
  162. if( cmThreadState(p->thArray[i].thH) == kPausedThId )
  163. break;
  164. // if all worker threads are busy ... give up
  165. if( i==p->threadCnt )
  166. break;
  167. // dequeue a pending task instance pointer from the input queue
  168. if(cmTs1p1cDequeueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC )
  169. {
  170. /// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD
  171. continue;
  172. }
  173. // assign the instance to the available thread
  174. p->thArray[i].inst = ip;
  175. // start the thread and wait for it to enter the running state.
  176. if( cmThreadPause(p->thArray[i].thH,0) != kOkThRC )
  177. {
  178. /// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD
  179. }
  180. }
  181. cmSleepMs(p->pauseSleepMs);
  182. return true;
  183. }
  184. cmTm_t* _cmTmHandleToPtr( cmTaskMgrH_t h )
  185. {
  186. cmTm_t* p = (cmTm_t*)h.h;
  187. assert( p != NULL );
  188. return p;
  189. }
  190. cmTmTask_t* _cmTmTaskFromId( cmTm_t* p, unsigned taskId )
  191. {
  192. cmTmTask_t* tp;
  193. for(tp=p->tasks; tp!=NULL; tp=tp->link)
  194. if( tp->taskId == taskId )
  195. return tp;
  196. return NULL;
  197. }
  198. cmTmInst_t* _cmTmInstFromId( cmTm_t* p, unsigned instId )
  199. {
  200. cmTmInst_t* ip;
  201. for(ip=p->insts; ip!=NULL; ip=ip->link)
  202. if( ip->instId == instId )
  203. return ip;
  204. return NULL;
  205. }
  206. cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId )
  207. {
  208. cmTmInst_t* ip = p->insts;
  209. cmTmInst_t* pp = NULL;
  210. for(; ip!=NULL; ip=ip->link)
  211. {
  212. if( ip->instId == instId )
  213. {
  214. if( pp == NULL )
  215. p->insts = ip->link;
  216. else
  217. pp->link = ip->link;
  218. cmMemFree(ip->result);
  219. cmMemFree(ip);
  220. return kOkTmRC;
  221. }
  222. pp = ip;
  223. }
  224. return cmErrMsg(&p->err,kAssertFailTmRC,"The instance %i could not be found to be deleted.",instId);
  225. }
  226. cmTmRC_t _cmTmDestroy( cmTm_t* p )
  227. {
  228. cmTmRC_t rc = kOkTmRC;
  229. unsigned i;
  230. // stop and destroy all the threads
  231. for(i=0; i<p->threadCnt; ++i)
  232. {
  233. if( cmThreadDestroy(&p->thArray[i].thH) != kOkThRC )
  234. {
  235. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i destroy failed.",i);
  236. goto errLabel;
  237. }
  238. }
  239. cmMemFree(p->thArray);
  240. // release the input queue
  241. if( cmTs1p1cDestroy(&p->inQueH) != kOkThRC )
  242. {
  243. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  244. goto errLabel;
  245. }
  246. // draining the output queue
  247. while( cmTsMp1cMsgWaiting(p->outQueH) )
  248. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  249. cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed while draingin.");
  250. // release the output queue
  251. if( cmTsMp1cDestroy(&p->outQueH) != kOkThRC )
  252. {
  253. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed.");
  254. goto errLabel;
  255. }
  256. // release instance list
  257. while( p->insts != NULL )
  258. _cmTmInstFree(p,p->insts->instId);
  259. // release the task list
  260. cmTmTask_t* tp = p->tasks;
  261. while( tp != NULL )
  262. {
  263. cmTmTask_t* np = tp->link;
  264. cmMemFree(tp->label);
  265. cmMemFree(tp);
  266. tp = np;
  267. }
  268. cmMemFree(p);
  269. errLabel:
  270. return rc;
  271. }
  272. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr );
  273. cmTmRC_t cmTaskMgrCreate(
  274. cmCtx_t* ctx,
  275. cmTaskMgrH_t* hp,
  276. cmTaskMgrStatusCb_t statusCb,
  277. void* statusCbArg,
  278. unsigned threadCnt,
  279. unsigned queueByteCnt,
  280. unsigned pauseSleepMs)
  281. {
  282. cmTmRC_t rc = kOkTmRC;
  283. unsigned i;
  284. if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC )
  285. return rc;
  286. cmTm_t* p = cmMemAllocZ(cmTm_t,1);
  287. cmErrSetup(&p->err,&ctx->rpt,"Task Mgr.");
  288. threadCnt += 1;
  289. p->thArray = cmMemAllocZ(cmTmThread_t,threadCnt);
  290. p->threadCnt = threadCnt;
  291. p->statusCb = statusCb;
  292. p->statusCbArg = statusCbArg;
  293. p->pauseSleepMs = pauseSleepMs;
  294. // create the threads
  295. for(i=0; i<threadCnt; ++i)
  296. {
  297. if( cmThreadCreate(&p->thArray[i].thH, i==0 ? _cmTmMasterThreadFunc : _cmTmWorkerThreadFunc, p->thArray+i, &ctx->rpt ) != kOkThRC )
  298. {
  299. rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.",i);
  300. goto errLabel;
  301. }
  302. p->thArray[i].p = p;
  303. }
  304. // create the input queue
  305. if(cmTs1p1cCreate( &p->inQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC )
  306. {
  307. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue creation failed.");
  308. goto errLabel;
  309. }
  310. // create the output queue
  311. if( cmTsMp1cCreate( &p->outQueH, queueByteCnt, _cmTmMasterOutQueueCb, p, p->err.rpt ) != kOkThRC )
  312. {
  313. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue creation failed.");
  314. goto errLabel;
  315. }
  316. hp->h = p;
  317. errLabel:
  318. return rc;
  319. }
  320. cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp )
  321. {
  322. cmTmRC_t rc = kOkTmRC;
  323. if( hp==NULL || cmTaskMgrIsValid(*hp)==false )
  324. return rc;
  325. cmTm_t* p = _cmTmHandleToPtr(*hp);
  326. if((rc = _cmTmDestroy(p)) != kOkTmRC )
  327. return rc;
  328. hp->h = NULL;
  329. return rc;
  330. }
  331. bool cmTaskMgrIsValid( cmTaskMgrH_t h )
  332. { return h.h != NULL; }
  333. // This function is called by cmTaskMgrIdle() to dispatch
  334. // status updates to the client.
  335. cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgDataPtr )
  336. {
  337. cmTm_t* p = (cmTm_t*)arg;
  338. cmTaskMgrStatusArg_t s;
  339. // This is probably not nesessary since changing the memory
  340. // pointed to by msgDataPtr should be safe even though it is marked as const.
  341. memcpy(&s,msgDataPtr,sizeof(s));
  342. // The 'msg' and 'result' data have been serialized after the status record.
  343. // The 'msg' is guaranteed to at least contain a terminating zero.
  344. s.msg = ((char*)msgDataPtr) + sizeof(s);
  345. // if the 'resultByteCnt' > 0 then there is a result record
  346. if( s.resultByteCnt > 0 )
  347. s.result = ((char*)msgDataPtr) + sizeof(s) + strlen(s.msg) + 1;
  348. else
  349. s.result = NULL;
  350. s.arg = p->statusCbArg;
  351. p->statusCb( &s );
  352. return cmOkRC;
  353. }
  354. cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h )
  355. {
  356. cmTmRC_t rc = kOkTmRC;
  357. cmTm_t* p = _cmTmHandleToPtr(h);
  358. // Transmit any msgs waiting to be sent to the client.
  359. while( cmTsMp1cMsgWaiting(p->outQueH) )
  360. {
  361. // calling this function calls: _cmTmMasterOutQueueCb()
  362. if(cmTsMp1cDequeueMsg(p->outQueH,NULL,0) != kOkThRC )
  363. {
  364. rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed during a dequeue.");
  365. goto errLabel;
  366. }
  367. }
  368. // Step through the instance list and delete instances that are
  369. // completed and also marked for deletion.
  370. cmTmInst_t* ip = p->insts;
  371. while( ip != NULL )
  372. {
  373. cmTmInst_t* np = ip->link;
  374. if( (ip->status==kCompletedTmId || ip->status==kKilledTmId) && ip->deleteOnCompleteFl )
  375. _cmTmInstFree(p,ip->instId);
  376. ip = np;
  377. }
  378. errLabel:
  379. return rc;
  380. }
  381. bool cmTaskMgrIsEnabled( cmTaskMgrH_t h )
  382. {
  383. cmTm_t* p = _cmTmHandleToPtr(h);
  384. return cmThreadState(p->thArray[0].thH) != kPausedThId;
  385. }
  386. cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
  387. {
  388. cmTmRC_t rc = kOkTmRC;
  389. cmTm_t* p = _cmTmHandleToPtr(h);
  390. unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl;
  391. if( cmThreadPause(p->thArray[0].thH, flags ) != kOkThRC )
  392. rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" );
  393. return rc;
  394. }
  395. cmTmRC_t cmTaskMgrInstall( cmTaskMgrH_t h, unsigned taskId, const cmChar_t* label, cmTaskMgrFunc_t func )
  396. {
  397. cmTmRC_t rc = kOkTmRC;
  398. cmTm_t* p = _cmTmHandleToPtr(h);
  399. cmTmTask_t* tp = cmMemAllocZ(cmTmTask_t,1);
  400. if( _cmTmTaskFromId(p,taskId) != NULL )
  401. {
  402. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"The task id %i is already in use.",taskId);
  403. goto errLabel;
  404. }
  405. tp->taskId = taskId;
  406. tp->func = func;
  407. tp->label = cmMemAllocStr(label);
  408. tp->link = p->tasks;
  409. p->tasks = tp;
  410. errLabel:
  411. return rc;
  412. }
  413. cmTmRC_t cmTaskMgrCall(
  414. cmTaskMgrH_t h,
  415. unsigned taskId,
  416. void* funcArg,
  417. unsigned progCnt,
  418. unsigned* retInstIdPtr )
  419. {
  420. cmTmRC_t rc = kOkTmRC;
  421. cmTm_t* p = _cmTmHandleToPtr(h);
  422. cmTmTask_t* tp = NULL;
  423. cmTmInst_t* ip = NULL;
  424. if( retInstIdPtr != NULL )
  425. *retInstIdPtr = cmInvalidId;
  426. // locate the task for this instance
  427. if((tp = _cmTmTaskFromId(p,taskId)) == NULL )
  428. {
  429. rc = cmErrMsg(&p->err,kInvalidArgTmRC,"Task not found for task id=%i.",taskId);
  430. goto errLabel;
  431. }
  432. // allocate a new instance record
  433. ip = cmMemAllocZ(cmTmInst_t,1);
  434. // setupt the instance record
  435. ip->instId = p->nextInstId++;
  436. ip->task = tp;
  437. ip->funcArg = funcArg;
  438. ip->progCnt = progCnt;
  439. ip->status = kQueuedTmId;
  440. // insert the new instance at the end of the instance list
  441. if( p->insts == NULL )
  442. p->insts = ip;
  443. else
  444. {
  445. cmTmInst_t* pp = p->insts;
  446. for(; pp != NULL; pp=pp->link )
  447. if( pp->link == NULL )
  448. {
  449. pp->link = ip;
  450. break;
  451. }
  452. }
  453. // enque the instance ptr in the input queue
  454. if( cmTs1p1cEnqueueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC )
  455. {
  456. rc = cmErrMsg(&p->err,kQueueFailTmRC,"New task instance command enqueue failed.");
  457. goto errLabel;
  458. }
  459. // set the returned instance id
  460. if( retInstIdPtr != NULL )
  461. *retInstIdPtr = ip->instId;
  462. // notify the client that the instance was enqueued
  463. cmTaskMgrStatusArg_t s;
  464. _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,0,NULL,NULL,0);
  465. p->statusCb( &s );
  466. errLabel:
  467. return rc;
  468. }
  469. cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId )
  470. {
  471. cmTmRC_t rc = kOkTmRC;
  472. cmTm_t* p = _cmTmHandleToPtr(h);
  473. cmTmInst_t* ip = NULL;
  474. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  475. {
  476. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  477. goto errLabel;
  478. }
  479. // Once an instance ctlId is set to kKillTmId don't allow it to change.
  480. if( ip->ctlId == kKillTmId )
  481. return rc;
  482. switch(ctlId )
  483. {
  484. case kNoneTmId:
  485. break;
  486. case kStartTmId:
  487. // Acting on a 'start' cmd only makes sense if the previous command was 'pause'
  488. if( ip->ctlId == kPauseTmId )
  489. ip->ctlId = kStartTmId;
  490. break;
  491. case kPauseTmId:
  492. // Acting on a 'pause' command only makes sense if this is the first command
  493. // or the previous command was a 'start'
  494. if( ip->ctlId == kNoneTmId || ip->ctlId == kStartTmId )
  495. ip->ctlId = kPauseTmId;
  496. break;
  497. case kKillTmId:
  498. ip->ctlId = kKillTmId;
  499. break;
  500. }
  501. errLabel:
  502. return rc;
  503. }
  504. cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId )
  505. {
  506. cmTm_t* p = _cmTmHandleToPtr(h);
  507. cmTmInst_t* ip = NULL;
  508. cmStatusTmId_t status = kInvalidTmId;
  509. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  510. {
  511. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  512. goto errLabel;
  513. }
  514. status = ip->status;
  515. errLabel:
  516. return status;
  517. }
  518. const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId )
  519. {
  520. cmTm_t* p = _cmTmHandleToPtr(h);
  521. cmTmInst_t* ip = NULL;
  522. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  523. {
  524. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  525. return NULL;
  526. }
  527. return ip->result;
  528. }
  529. unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId )
  530. {
  531. cmTm_t* p = _cmTmHandleToPtr(h);
  532. cmTmInst_t* ip = NULL;
  533. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  534. {
  535. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  536. return 0;
  537. }
  538. return ip->resultByteCnt;
  539. }
  540. cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId )
  541. {
  542. cmTmRC_t rc = kOkTmRC;
  543. cmTm_t* p = _cmTmHandleToPtr(h);
  544. cmTmInst_t* ip = NULL;
  545. if((ip = _cmTmInstFromId(p,instId)) == NULL )
  546. {
  547. cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId);
  548. return 0;
  549. }
  550. ip->deleteOnCompleteFl = true;
  551. return rc;
  552. }
  553. cmTaskMgrCtlId_t _cmTaskMgrHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId )
  554. {
  555. cmTaskMgrStatusArg_t s;
  556. _cmTaskMgrStatusArgSetup(
  557. &s,
  558. a->statusCbArg,
  559. a->instId,
  560. statusId == kInvalidTmId ? kProgTmId : kStatusTmId,
  561. statusId == kInvalidTmId ? kStartedTmId : statusId,
  562. statusId == kInvalidTmId ? prog : 0,
  563. NULL,NULL,0);
  564. a->statusCb(&s);
  565. return cmTaskMgrHandleCommand(a);
  566. }
  567. cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a )
  568. {
  569. cmTmThread_t* trp = a->reserved;
  570. while( trp->inst->ctlId == kPauseTmId )
  571. {
  572. // change the instance status to 'paused'.
  573. trp->inst->status = kPausedTmId;
  574. // notify the client of the change in state
  575. cmTaskMgrSendStatus(a,kPausedTmId);
  576. // sleep the thread for pauseSleepMs milliseconds
  577. cmSleepMs(a->pauseSleepMs);
  578. // if the task was unpaused while we slept
  579. if( trp->inst->ctlId == kStartTmId )
  580. {
  581. // change the instance status to 'started'.
  582. trp->inst->status = kStartedTmId;
  583. // notify the client of the change in state
  584. cmTaskMgrSendStatus(a,kStartedTmId);
  585. }
  586. }
  587. // if ctlId==kKillTmId then the status update will be handled
  588. // when the task custom function returns in _cmTmWorkerThreadFunc()
  589. return trp->inst->ctlId;
  590. }
  591. cmTaskMgrCtlId_t cmTaskMgrSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
  592. { return _cmTaskMgrHelper(a,0,statusId); }
  593. cmTaskMgrCtlId_t cmTaskMgrSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog )
  594. { return _cmTaskMgrHelper(a,prog,kInvalidTmId); }
  595. //-----------------------------------------------------------------------------
  596. enum { kMaxTestInstCnt = 3 };
  597. typedef struct cmTmTestInst_str
  598. {
  599. unsigned instId;
  600. } cmTmTestInst_t;
  601. typedef struct cmTmTestApp_str
  602. {
  603. cmErr_t* err;
  604. cmTmTestInst_t insts[kMaxTestInstCnt];
  605. } cmTmTestApp_t;
  606. void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s )
  607. {
  608. cmRptPrintf(rpt,"inst:%i ",s->instId );
  609. switch( s->selId )
  610. {
  611. case kStatusTmId:
  612. {
  613. const cmChar_t* label = "<none>";
  614. switch( s->statusId )
  615. {
  616. case kInvalidTmId: label="<Invalid>"; break;
  617. case kQueuedTmId: label="Queued"; break;
  618. case kStartedTmId: label="Started"; break;
  619. case kCompletedTmId: label="Completed"; break;
  620. case kKilledTmId: label="Killed"; break;
  621. default:
  622. { assert(0); }
  623. }
  624. cmRptPrintf(rpt,"status '%s'",label);
  625. }
  626. break;
  627. case kProgTmId:
  628. cmRptPrintf(rpt,"prog %i",s->prog);
  629. break;
  630. case kErrorTmId:
  631. cmRptPrintf(rpt,"error %s",cmStringNullGuard(s->msg));
  632. break;
  633. default:
  634. { assert(0); }
  635. }
  636. cmRptPrintf(rpt,"\n");
  637. }
  638. // Test client status callback function.
  639. void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s )
  640. {
  641. // s.arg set from cmTaskMgrCreate( ..., statusCbArg, ...);
  642. cmTmTestApp_t* app = (cmTmTestApp_t*)s->arg;
  643. unsigned i;
  644. // locate the instance record assoc'd with this callback
  645. for(i=0; i<kMaxTestInstCnt; ++i)
  646. if( app->insts[i].instId == s->instId )
  647. break;
  648. if( i==kMaxTestInstCnt )
  649. cmRptPrintf(app->err->rpt,"instId %i not found.\n",s->instId);
  650. _cmTmTestReportStatus(app->err->rpt,s);
  651. }
  652. // Test worker function.
  653. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg )
  654. {
  655. unsigned prog = 0;
  656. for(; prog<arg->progCnt; ++prog)
  657. {
  658. if( cmTaskMgrHandleCommand(arg) == kKillTmId )
  659. break;
  660. cmSleepMs(1000);
  661. if( cmTaskMgrSendProgress(arg,prog) == kKillTmId )
  662. break;
  663. }
  664. }
  665. cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx)
  666. {
  667. cmTmRC_t rc = kOkTmRC;
  668. cmTaskMgrH_t tmH = cmTaskMgrNullHandle;
  669. unsigned threadCnt = 2;
  670. unsigned queueByteCnt = 1024;
  671. unsigned pauseSleepMs = 50;
  672. unsigned nextInstId = 0;
  673. unsigned taskId = 0;
  674. const cmChar_t* taskLabel = "Task Label";
  675. cmTmTestApp_t app;
  676. char c;
  677. memset(&app,0,sizeof(app));
  678. app.err = &ctx->err;
  679. // create the task mgr
  680. if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC )
  681. {
  682. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr create failed.");
  683. goto errLabel;
  684. }
  685. // install a task
  686. if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc ) != kOkTmRC )
  687. {
  688. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr task install failed.");
  689. goto errLabel;
  690. }
  691. // go into interactive mode
  692. printf("q=quit e=enable c=call i=idle\n");
  693. while((c = getchar()) != 'q')
  694. {
  695. switch(c)
  696. {
  697. case 'i':
  698. cmTaskMgrOnIdle(tmH);
  699. cmRptPrintf(&ctx->rpt,"idled\n");
  700. break;
  701. case 'e':
  702. {
  703. // toggle the enable state of the task mgr.
  704. bool fl = !cmTaskMgrIsEnabled(tmH);
  705. if( cmTaskMgrEnable(tmH,fl) != kOkTmRC )
  706. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test enable failed.");
  707. else
  708. cmRptPrintf(&ctx->rpt,"%s\n", fl ? "enabled" : "disabled" );
  709. }
  710. break;
  711. case 'c':
  712. if( nextInstId < kMaxTestInstCnt )
  713. {
  714. void* funcArg = app.insts + nextInstId;
  715. unsigned progCnt = 5;
  716. if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, &app.insts[nextInstId].instId ) != kOkTmRC )
  717. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed.");
  718. else
  719. {
  720. ++nextInstId;
  721. cmRptPrintf(&ctx->rpt,"called\n");
  722. }
  723. }
  724. }
  725. }
  726. errLabel:
  727. // destroy the task mgr
  728. if( cmTaskMgrDestroy(&tmH) != kOkTmRC )
  729. rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr destroy failed.");
  730. return rc;
  731. }