diff --git a/cmTaskMgr.c b/cmTaskMgr.c index 39f4df3..109bd63 100644 --- a/cmTaskMgr.c +++ b/cmTaskMgr.c @@ -7,6 +7,8 @@ #include "cmThread.h" #include "cmTime.h" #include "cmTaskMgr.h" +#include "cmLinkedHeap.h" +#include "cmText.h" cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE; @@ -17,19 +19,22 @@ typedef struct cmTmTask_str unsigned taskId; cmChar_t* label; cmTaskMgrFunc_t func; + cmTaskMgrRecv_t recv; struct cmTmTask_str* link; } cmTmTask_t; typedef struct cmTmInst_str { - unsigned instId; - struct cmTmTask_str* task; - void* funcArg; - unsigned progCnt; - cmStatusTmId_t status; - void* result; - unsigned resultByteCnt; - cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread + unsigned instId; // Task instance id. + struct cmTmTask_str* task; // Pointer to task record for this task instance. + void* funcArg; // Client supplied pointer to cmTaskMgrFuncArg_t.arg; + unsigned progCnt; // Maximum expected progress (cmTaskMgrStatusArg_t.prog) to be used by this task instance. + cmChar_t* label; // Optional instance label. + cmStatusTmId_t status; // Current instance status (See cmStatusTmId_t) + void* result; // Task instance result pointer. + unsigned resultByteCnt; // Size of the task instance result pointer in bytes. + cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread + cmTs1p1cH_t msgQueH; // client->inst 'msg' communication queue bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete struct cmTmInst_str* link; } cmTmInst_t; @@ -38,30 +43,34 @@ struct cmTm_str* p; typedef struct cmTmThread_str { - struct cmTm_str* p; // - cmThreadH_t thH; // - cmTmInst_t* inst; // Ptr to the task instance this thread is executing. - double durSecs; - cmTimeSpec_t t0; - bool deactivateFl; - struct cmTmThread_str* link; + struct cmTm_str* p; // Pointer to task mgr. + cmThreadH_t thH; // Thread handle. + cmTmInst_t* inst; // Ptr to the task instance this thread is executing. + double durSecs; // Duration of the instance currently assigned to this thread in seconds. + cmTimeSpec_t t0; // Task start time. + cmTimeSpec_t t1; // Time of last review by the master thread. + bool deactivateFl; // True if this instance has been deactivated by the system. + cmTaskMgrFuncArg_t procArg; // + cmChar_t* text; // Temporary text buffer + struct cmTmThread_str* link; // p->threads link. } cmTmThread_t; typedef struct cmTm_str { - cmErr_t err; - cmThreadH_t mstrThH; // - cmTmThread_t* threads; // - unsigned maxActiveThreadCnt; // - unsigned threadRecdCnt; - cmTaskMgrStatusCb_t statusCb; // - void* statusCbArg; // - unsigned pauseSleepMs; // - cmTs1p1cH_t inQueH; // client->mgr - cmTsMp1cH_t outQueH; // mgr->client - cmTmTask_t* tasks; // - cmTmInst_t* insts; // - unsigned nextInstId; + cmErr_t err; // Task manager error object. + cmThreadH_t mstrThH; // Master thread handle. + cmTmThread_t* threads; // Thread record list. + unsigned threadRecdCnt; // Current count of records in 'threads' list. + unsigned maxActiveTaskCnt; // Max. number of active tasks. + cmTaskMgrStatusCb_t statusCb; // Client task status callback. + void* statusCbArg; // Client task status callback argument. + unsigned pauseSleepMs; // + cmTs1p1cH_t callQueH; // client->mgr 'inst' communication queue + cmTsMp1cH_t outQueH; // mgr->client communication queue + cmTmTask_t* tasks; // Task list. + cmTmInst_t* insts; // Task instance list. + unsigned nextInstId; // Next available task instance id. + unsigned activeTaskCnt; // Current active task count. } cmTm_t; @@ -72,21 +81,22 @@ void _cmTaskMgrStatusArgSetup( cmSelTmId_t selId, cmStatusTmId_t statusId, unsigned prog, - const cmChar_t* msg, - void* result, - unsigned resultByteCnt ) + const cmChar_t* text, + const void* msg, + unsigned msgByteCnt ) { s->arg = arg; s->instId = instId; s->selId = selId; s->statusId = statusId; s->prog = prog; + s->text = text; s->msg = msg; - s->result = result; - s->resultByteCnt = resultByteCnt; + s->msgByteCnt = msgByteCnt; } -// WARNING: THIS FUNCTION IS CALLED BY BOTH THE WORKER AND THE MASTER THREAD. + +// Called by MASTER and WORKER. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s ) { enum { arrayCnt = 3 }; @@ -94,12 +104,12 @@ cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s ) unsigned msgSizeArray[arrayCnt]; msgPtrArray[0] = s; - msgPtrArray[1] = s->msg==NULL ? "" : s->msg; - msgPtrArray[2] = s->result; + msgPtrArray[1] = s->text==NULL ? "" : s->text; + msgPtrArray[2] = s->msg; msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t); - msgSizeArray[1] = s->msg==NULL ? 1 : strlen(s->msg)+1; - msgSizeArray[2] = s->resultByteCnt; + msgSizeArray[1] = s->text==NULL ? 1 : strlen(s->text)+1; + msgSizeArray[2] = s->msgByteCnt; if( cmTsMp1cEnqueueSegMsg(p->outQueH, msgPtrArray, msgSizeArray, arrayCnt ) != kOkThRC ) @@ -109,6 +119,7 @@ cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s ) } +// Called by MASTER and WORKER. // This function is called by the worker thread wrapper _cmTmWorkerStatusCb() // function to enqueue messages being sent back to the client. cmTmRC_t _cmTmEnqueueStatusMsg1( @@ -117,15 +128,16 @@ cmTmRC_t _cmTmEnqueueStatusMsg1( cmSelTmId_t selId, cmStatusTmId_t statusId, unsigned prog, - const cmChar_t* msg, - void* result, - unsigned resultByteCnt ) + const cmChar_t* text, + const void* msg, + unsigned msgByteCnt ) { cmTaskMgrStatusArg_t s; - _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,msg,result,resultByteCnt); + _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,text,msg,msgByteCnt); return _cmTmEnqueueStatusMsg0(p,&s); } +// Called by WORKER. // Worker threads call this function to enqueue status messages // for delivery to the task mgr client. void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status ) @@ -141,24 +153,36 @@ void _cmTmWorkerStatusCb( const cmTaskMgrStatusArg_t* status ) } +// Called by WORKER. +// This function is called in the worker thread by +// cmTs1p1cDequeueMsg() from within cmTaskMgrWorkerHandleCommand() +// to transfer msg's waiting in the worker's incoming msg queue +// (cmTmInst_t.msgQueH) to the instance recv function (cmTmInst_t.recv). +cmRC_t _cmTmWorkerRecvCb( void* arg, unsigned msgByteCnt, const void* msg ) +{ + cmTmThread_t* trp = (cmTmThread_t*)arg; + assert(trp->inst->task->recv); + trp->inst->task->recv(&trp->procArg,msg,msgByteCnt); + return cmOkRC; +} +// Called by WORKER. // This is the wrapper for all worker threads. bool _cmTmWorkerThreadFunc(void* arg) { cmTmThread_t* trp = (cmTmThread_t*)arg; - cmTaskMgrFuncArg_t r; - - r.reserved = trp; - r.arg = trp->inst->funcArg; - r.instId = trp->inst->instId; - r.statusCb = _cmTmWorkerStatusCb; - r.statusCbArg = trp; - r.progCnt = trp->inst->progCnt; - r.pauseSleepMs= trp->p->pauseSleepMs; + + trp->procArg.reserved = trp; + trp->procArg.arg = trp->inst->funcArg; + trp->procArg.instId = trp->inst->instId; + trp->procArg.statusCb = _cmTmWorkerStatusCb; + trp->procArg.statusCbArg = trp; + trp->procArg.progCnt = trp->inst->progCnt; + trp->procArg.pauseSleepMs= trp->p->pauseSleepMs; // if the task was paused or killed while it was queued then // cmTaskMgrHandleCommand() will do the right thing - if( cmTaskMgrWorkerHandleCommand(&r) != kKillTmId ) + if( cmTaskMgrWorkerHandleCommand(&trp->procArg) != kStopTmwRC ) { trp->inst->status = kStartedTmId; @@ -166,18 +190,22 @@ bool _cmTmWorkerThreadFunc(void* arg) _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0); // Execute the client provided task function. - trp->inst->task->func(&r); + trp->inst->task->func(&trp->procArg); } - // Notify the client that the instance has completed or been killed + // Notify the client if the instance was killed if( trp->inst->ctlId == kKillTmId ) + { trp->inst->status = kKilledTmId; - else - trp->inst->status = kCompletedTmId; + _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0); + } - _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0); + // Notify the client that the instance is completed + // (but don't actually set the status yet) + _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kCompletedTmId,0,NULL,NULL,0); + + trp->inst->status = kCompletedTmId; - // Force the thread to go into the 'pause' state when it // returns to it's internal loop. The master thread recognizes paused // threads as available for reuse. @@ -188,7 +216,7 @@ bool _cmTmWorkerThreadFunc(void* arg) void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg ) { - assert(0); + _cmTmEnqueueStatusMsg1(p,cmInvalidId,kErrorTmId,kInvalidTmId,rc,msg,NULL,0); } int _cmTmSortThreadByDur( const void* t0, const void* t1 ) @@ -203,14 +231,19 @@ int _cmTmSortThreadByDur( const void* t0, const void* t1 ) bool _cmTmMasterThreadFunc(void* arg) { cmTm_t* p = (cmTm_t*)arg; - unsigned activeCnt = 0; + unsigned activeThreadCnt = 0; + unsigned activeTaskCnt = 0; cmTmThread_t* trp = p->threads; - + if( p->threadRecdCnt > 0 ) { cmTmThread_t* thArray[p->threadRecdCnt]; unsigned deactivatedCnt = 0; + // + // Determine the number of active threads and tasks + // + // for each thread record for(trp=p->threads; trp!=NULL; trp=trp->link) { @@ -218,99 +251,103 @@ bool _cmTmMasterThreadFunc(void* arg) // if this thread is active ... if( (thState = cmThreadState(trp->thH)) != kPausedThId ) - { - // update the task lifetime duration - cmTimeSpec_t t1; - cmTimeGet(&t1); - trp->durSecs += (double)cmTimeElapsedMicros(&trp->t0,&t1) / 1000000.0; - trp->t0 = t1; - + { assert(trp->inst!=NULL); - // if the task assoc'd with this thread is running - if( trp->inst->status == kStartedTmId ) - { - thArray[activeCnt] = trp; - ++activeCnt; - } + thArray[activeThreadCnt] = trp; + ++activeThreadCnt; - // count the number of deactivated threads + // if the task assigned to this thread is started then the task is active + if( trp->inst->status == kStartedTmId ) + ++activeTaskCnt; + + // if the deactivatedFl is set then this thread has been deactivated by the system if( trp->deactivateFl ) ++deactivatedCnt; + + // update the task lifetime duration + if( trp->inst->status != kCompletedTmId ) + { + cmTimeSpec_t t2; + cmTimeGet(&t2); + trp->durSecs += (double)cmTimeElapsedMicros(&trp->t1,&t2) / 1000000.0; + trp->t1 = t2; + } + } } - // The first 'activeCnt' elements of thArray[] now point to - // cmTmThread_t records of the active tasks. + // + // thArray[activeThreadCnt] now holds pointers to the + // cmTmThread_t records of the active threads + // + // if more tasks are active than should be - then deactive the youngest - if( activeCnt > p->maxActiveThreadCnt ) + if( activeTaskCnt > p->maxActiveTaskCnt ) { // sort the active tasks in increasing order of lifetime - qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur); + qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur); - // determine the number of threads that need to be paused - int n = activeCnt - p->maxActiveThreadCnt; - int i; + // determine the number of threads that need to be deactivated + int n = activeTaskCnt - p->maxActiveTaskCnt; + int i,j; // pause the active threads with the lowest lifetime - for(i=0; ideactivateFl == false ) { thArray[i]->deactivateFl = true; ++deactivatedCnt; + ++j; } } - + + // // if there are deactivated tasks and the max thread count has not been reached // then re-activate some of the deactivated tasks. - if( activeCnt < p->maxActiveThreadCnt && deactivatedCnt > 0 ) + // + + if( activeTaskCnt < p->maxActiveTaskCnt && deactivatedCnt > 0 ) { // sort the active tasks in increasing order of lifetime - qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur); + qsort(&thArray[0],activeThreadCnt,sizeof(thArray[0]),_cmTmSortThreadByDur); - int n = cmMin(p->maxActiveThreadCnt - activeCnt, deactivatedCnt ); + int n = cmMin(p->maxActiveTaskCnt - activeTaskCnt, deactivatedCnt ); int i; - // re-activate the oldest deactivated tasks - for(i=activeCnt-1; i>=0 && n>0; --i) + // re-activate the oldest deactivated tasks first + for(i=activeThreadCnt-1; i>=0 && n>0; --i) if( thArray[i]->deactivateFl ) { thArray[i]->deactivateFl = false; --n; - ++activeCnt; + ++activeTaskCnt; } } } - // if a queued task exists - while( cmTs1p1cMsgWaiting(p->inQueH) ) + // If the number of activeTaskCnt is less than the limit and a queued task exists + while( activeTaskCnt < p->maxActiveTaskCnt && cmTs1p1cMsgWaiting(p->callQueH) ) { cmTmInst_t* ip = NULL; cmTmThread_t* atrp = NULL; - activeCnt = 0; // Find a worker thread that is in the 'paused' state. // This is the definitive indication that the thread - // does not have an assigned instance + // does not have an assigned instance and the thread recd can be reused. for(trp=p->threads; trp!=NULL; trp=trp->link) - { if( cmThreadState(trp->thH) == kPausedThId ) + { atrp = trp; - else - ++activeCnt; - } + break; + } - // If the maximum number of active threads already exists then we cannot start a new task - if( activeCnt >= p->maxActiveThreadCnt ) - break; - - // If all the existing worker threads are busy - // but the maximum number of threads has not yet been allocated ... - if( atrp==NULL && p->threadRecdCnt < p->maxActiveThreadCnt) + // If all the existing worker threads are in use ... + if( atrp==NULL ) { // ... then create a new worker thread recd atrp = cmMemAllocZ(cmTmThread_t,1); @@ -326,39 +363,52 @@ bool _cmTmMasterThreadFunc(void* arg) else { // ... setup the new thread record - atrp->p = p; - atrp->link = p->threads; - p->threads = atrp; - + atrp->p = p; + atrp->link = p->threads; + p->threads = atrp; p->threadRecdCnt += 1; } } - // if there are no available threads then give up + // if the thread creation failed if( atrp == NULL ) break; // dequeue a pending task instance pointer from the input queue - if(cmTs1p1cDequeueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC ) + if(cmTs1p1cDequeueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC ) { _cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue."); break; } + // if the task has a msg recv callback then assign it here + if( ip->task->recv != NULL ) + if( cmTs1p1cSetCallback( ip->msgQueH, _cmTmWorkerRecvCb, atrp ) != kOkThRC ) + { + _cmTmMasterRptError(p,kQueueFailTmRC,"Worker thread msg queue callback assignment failed."); + break; + } + // setup the thread record associated with the new task atrp->inst = ip; atrp->durSecs = 0; atrp->deactivateFl = false; + cmTimeGet(&atrp->t0); + atrp->t1 = atrp->t0; + // start the worker thread - if( cmThreadPause(atrp->thH,0) != kOkThRC ) + if( cmThreadPause(atrp->thH,kWaitThFl) != kOkThRC ) _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed."); + ++activeTaskCnt; } cmSleepMs(p->pauseSleepMs); + p->activeTaskCnt = activeTaskCnt; + return true; } @@ -403,6 +453,10 @@ cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId ) else pp->link = ip->link; + if( cmTs1p1cDestroy(&ip->msgQueH) != kOkThRC ) + return cmErrMsg(&p->err,kQueueFailTmRC,"The 'msg' input queue destroy failed."); + + cmMemFree(ip->label); cmMemFree(ip->result); cmMemFree(ip); return kOkTmRC; @@ -437,13 +491,14 @@ cmTmRC_t _cmTmDestroy( cmTm_t* p ) cmTmThread_t* trp = p->threads; p->threads = p->threads->link; + cmMemFree(trp->text); cmMemFree(trp); } - // release the input queue - if( cmTs1p1cDestroy(&p->inQueH) != kOkThRC ) + // release the call input queue + if( cmTs1p1cDestroy(&p->callQueH) != kOkThRC ) { - rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue destroy failed."); + rc = cmErrMsg(&p->err,kQueueFailTmRC,"The 'call' input queue destroy failed."); goto errLabel; } @@ -486,7 +541,7 @@ cmTmRC_t cmTaskMgrCreate( cmTaskMgrH_t* hp, cmTaskMgrStatusCb_t statusCb, void* statusCbArg, - unsigned maxActiveThreadCnt, + unsigned maxActiveTaskCnt, unsigned queueByteCnt, unsigned pauseSleepMs) { @@ -499,7 +554,7 @@ cmTmRC_t cmTaskMgrCreate( cmErrSetup(&p->err,&ctx->rpt,"Task Mgr."); - p->maxActiveThreadCnt = maxActiveThreadCnt; + p->maxActiveTaskCnt = maxActiveTaskCnt; p->statusCb = statusCb; p->statusCbArg = statusCbArg; p->pauseSleepMs = pauseSleepMs; @@ -511,10 +566,10 @@ cmTmRC_t cmTaskMgrCreate( goto errLabel; } - // create the input queue - if(cmTs1p1cCreate( &p->inQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC ) + // create the call input queue + if(cmTs1p1cCreate( &p->callQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC ) { - rc = cmErrMsg(&p->err,kQueueFailTmRC,"The input queue creation failed."); + rc = cmErrMsg(&p->err,kQueueFailTmRC,"The call input queue creation failed."); goto errLabel; } @@ -547,9 +602,108 @@ cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp ) return rc; } +void _cmTmWaitForCompletion( cmTm_t* p, unsigned timeOutMs ) +{ + unsigned durMs = 0; + cmTimeSpec_t t0,t1; + + cmTimeGet(&t0); + + // Go into timeout loop - waiting for all instances to finish + while( timeOutMs==0 || durMs < timeOutMs ) + { + cmTimeGet(&t1); + durMs += cmTimeElapsedMicros(&t0,&t1) / 1000; + t0 = t1; + + cmSleepMs(p->pauseSleepMs); + + if( p->activeTaskCnt == 0 ) + break; + } + +} + + +cmTmRC_t cmTaskMgrClose( cmTaskMgrH_t h, unsigned flags, unsigned timeOutMs ) +{ + cmTmRC_t rc = kOkTmRC; + cmTm_t* p = _cmTmHandleToPtr(h); + bool fl = false; + + // if requested kill any queued tasks + if( cmIsFlag(flags,kKillQueuedTmFl) ) + { + cmTmInst_t* ip = p->insts; + for(; ip!=NULL; ip=ip->link) + if( ip->status == kQueuedTmId ) + ip->ctlId = kKillTmId; + } + + // wait for any existing or queued tasks to complete + _cmTmWaitForCompletion(p,timeOutMs); + + // force any queued msgs for the client to be sent + cmTaskMgrOnIdle(h); + + // if the 'kill on timeout' flag is set then kill any remaining active tasks + if( cmIsFlag(flags,kTimeOutKillTmFl) ) + { + cmTmInst_t* ip = p->insts; + for(; ip!=NULL; ip=ip->link) + if( ip->status != kCompletedTmId ) + { + ip->ctlId = kKillTmId; + fl = true; + } + } + + // wait for the remaining tasks to complete + if( fl ) + _cmTmWaitForCompletion(p,timeOutMs); + + // force any queued msgs for the client to be sent + cmTaskMgrOnIdle(h); + + return rc; +} + +unsigned cmTaskMgrActiveTaskCount( cmTaskMgrH_t h ) +{ + cmTm_t* p = _cmTmHandleToPtr(h); + return p->activeTaskCnt; +} + + bool cmTaskMgrIsValid( cmTaskMgrH_t h ) { return h.h != NULL; } +const cmChar_t* cmTaskMgrStatusIdToLabel( cmStatusTmId_t statusId ) +{ + typedef struct map_str + { + cmStatusTmId_t id; + const cmChar_t* label; + } map_t; + + map_t a[] = + { + { kQueuedTmId, "Queued" }, + { kStartedTmId, "Started" }, + { kPausedTmId, "Paused" }, + { kDeactivatedTmId, "Deactivated" }, + { kCompletedTmId, "Completed" }, + { kKilledTmId, "Killed" }, + { kInvalidTmId, "" }, + }; + + int i; + for(i=0; a[i].id!=kInvalidTmId; ++i) + if( a[i].id == statusId ) + return a[i].label; + + return ""; +} // This function is called by cmTaskMgrIdle() to dispatch // status updates to the client. @@ -562,15 +716,15 @@ cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgData // pointed to by msgDataPtr should be safe even though it is marked as const. memcpy(&s,msgDataPtr,sizeof(s)); - // The 'msg' and 'result' data have been serialized after the status record. - // The 'msg' is guaranteed to at least contain a terminating zero. - s.msg = ((char*)msgDataPtr) + sizeof(s); + // The 'text' and 'msg' data have been serialized after the status record. + // The 'text' is guaranteed to at least contain a terminating zero. + s.text = ((char*)msgDataPtr) + sizeof(s); // if the 'resultByteCnt' > 0 then there is a result record - if( s.resultByteCnt > 0 ) - s.result = ((char*)msgDataPtr) + sizeof(s) + strlen(s.msg) + 1; + if( s.msgByteCnt > 0 ) + s.msg = ((char*)msgDataPtr) + sizeof(s) + strlen(s.text) + 1; else - s.result = NULL; + s.msg = NULL; s.arg = p->statusCbArg; @@ -602,7 +756,7 @@ cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h ) { cmTmInst_t* np = ip->link; - if( (ip->status==kCompletedTmId || ip->status==kKilledTmId) && ip->deleteOnCompleteFl ) + if( ip->status==kCompletedTmId && ip->deleteOnCompleteFl ) _cmTmInstFree(p,ip->instId); ip = np; @@ -631,7 +785,12 @@ cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl ) return rc; } -cmTmRC_t cmTaskMgrInstall( cmTaskMgrH_t h, unsigned taskId, const cmChar_t* label, cmTaskMgrFunc_t func ) +cmTmRC_t cmTaskMgrInstall( + cmTaskMgrH_t h, + unsigned taskId, + const cmChar_t* label, + cmTaskMgrFunc_t func, + cmTaskMgrRecv_t recv) { cmTmRC_t rc = kOkTmRC; cmTm_t* p = _cmTmHandleToPtr(h); @@ -645,6 +804,7 @@ cmTmRC_t cmTaskMgrInstall( cmTaskMgrH_t h, unsigned taskId, const cmChar_t* labe tp->taskId = taskId; tp->func = func; + tp->recv = recv; tp->label = cmMemAllocStr(label); tp->link = p->tasks; @@ -654,12 +814,15 @@ cmTmRC_t cmTaskMgrInstall( cmTaskMgrH_t h, unsigned taskId, const cmChar_t* labe return rc; } + cmTmRC_t cmTaskMgrCall( - cmTaskMgrH_t h, - unsigned taskId, - void* funcArg, - unsigned progCnt, - unsigned* retInstIdPtr ) + cmTaskMgrH_t h, + unsigned taskId, + void* funcArg, + unsigned progCnt, + unsigned queueByteCnt, + const cmChar_t* label, + unsigned* retInstIdPtr ) { cmTmRC_t rc = kOkTmRC; cmTm_t* p = _cmTmHandleToPtr(h); @@ -679,12 +842,21 @@ cmTmRC_t cmTaskMgrCall( // allocate a new instance record ip = cmMemAllocZ(cmTmInst_t,1); - // setupt the instance record + // setup the instance record ip->instId = p->nextInstId++; ip->task = tp; ip->funcArg = funcArg; ip->progCnt = progCnt; + ip->label = label==NULL ? NULL : cmMemAllocStr(label); ip->status = kQueuedTmId; + ip->ctlId = kStartTmId; + + // create the msg input queue + if(cmTs1p1cCreate( &ip->msgQueH, queueByteCnt, NULL, NULL, p->err.rpt ) != kOkThRC ) + { + rc = cmErrMsg(&p->err,kQueueFailTmRC,"The msg input queue creation failed."); + goto errLabel; + } // insert the new instance at the end of the instance list if( p->insts == NULL ) @@ -700,9 +872,8 @@ cmTmRC_t cmTaskMgrCall( } } - - // enque the instance ptr in the input queue - if( cmTs1p1cEnqueueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC ) + // enqueue the instance ptr in the input queue + if( cmTs1p1cEnqueueMsg(p->callQueH,&ip,sizeof(ip)) != kOkThRC ) { rc = cmErrMsg(&p->err,kQueueFailTmRC,"New task instance command enqueue failed."); goto errLabel; @@ -714,7 +885,7 @@ cmTmRC_t cmTaskMgrCall( // notify the client that the instance was enqueued cmTaskMgrStatusArg_t s; - _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,0,NULL,NULL,0); + _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,progCnt,NULL,NULL,0); p->statusCb( &s ); @@ -722,11 +893,11 @@ cmTmRC_t cmTaskMgrCall( return rc; } -cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId ) +cmTmRC_t cmTaskMgrCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId ) { - cmTmRC_t rc = kOkTmRC; - cmTm_t* p = _cmTmHandleToPtr(h); - cmTmInst_t* ip = NULL; + cmTmRC_t rc = kOkTmRC; + cmTm_t* p = _cmTmHandleToPtr(h); + cmTmInst_t* ip = NULL; if((ip = _cmTmInstFromId(p,instId)) == NULL ) { @@ -740,9 +911,6 @@ cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctl switch(ctlId ) { - case kNoneTmId: - break; - case kStartTmId: // Acting on a 'start' cmd only makes sense if the previous command was 'pause' if( ip->ctlId == kPauseTmId ) @@ -750,10 +918,8 @@ cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctl break; case kPauseTmId: - - // Acting on a 'pause' command only makes sense if this is the first command - // or the previous command was a 'start' - if( ip->ctlId == kNoneTmId || ip->ctlId == kStartTmId ) + // Acting on a 'pause' command only makes sense if the previous command was a 'start' + if( ip->ctlId == kStartTmId ) ip->ctlId = kPauseTmId; break; @@ -784,6 +950,39 @@ cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId ) errLabel: return status; } + +cmTmRC_t cmTaskMgrSendMsg( cmTaskMgrH_t h, unsigned instId, const void* msg, unsigned msgByteCnt ) +{ + cmTm_t* p = _cmTmHandleToPtr(h); + cmTmRC_t rc = kOkTmRC; + cmTmInst_t* ip = NULL; + + if((ip = _cmTmInstFromId(p,instId)) == NULL ) + return cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId); + + if( cmTs1p1cEnqueueMsg(ip->msgQueH, msg, msgByteCnt ) != kOkThRC ) + rc = cmErrMsg(&p->err,kQueueFailTmRC,"Task msg enqueue failed."); + + return rc; +} + +const cmChar_t* cmTaskMgrTaskIdToLabel( cmTaskMgrH_t h, unsigned taskId ) +{ + cmTm_t* p = _cmTmHandleToPtr(h); + cmTmTask_t* tp; + if((tp = _cmTmTaskFromId(p,taskId)) == NULL ) + return NULL; + return tp->label; +} + +const cmChar_t* cmTaskMgrInstIdToLabel( cmTaskMgrH_t h, unsigned instId ) +{ + cmTm_t* p = _cmTmHandleToPtr(h); + cmTmInst_t* ip; + if((ip = _cmTmInstFromId(p,instId)) == NULL ) + return NULL; + return ip->label; +} const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId ) { @@ -793,11 +992,19 @@ const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId ) if((ip = _cmTmInstFromId(p,instId)) == NULL ) { cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId); - return NULL; + goto errLabel; + } + + if( ip->status != kCompletedTmId ) + { + cmErrMsg(&p->err,kOpFailTmRC,"The result of a running task (id:%i) may not be accessed.",instId); + goto errLabel; } return ip->result; + errLabel: + return NULL; } unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId ) @@ -808,16 +1015,49 @@ unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId ) if((ip = _cmTmInstFromId(p,instId)) == NULL ) { cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId); - return 0; + goto errLabel; + } + + if( ip->status != kCompletedTmId ) + { + cmErrMsg(&p->err,kOpFailTmRC,"The result byte count of a running task (id:%i) may not be accessed.",instId); + goto errLabel; } return ip->resultByteCnt; + + errLabel: + return 0; } +void* cmTaskMgrFuncArg( cmTaskMgrH_t h, unsigned instId ) +{ + cmTm_t* p = _cmTmHandleToPtr(h); + cmTmInst_t* ip = NULL; + + if((ip = _cmTmInstFromId(p,instId)) == NULL ) + { + cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId); + goto errLabel; + } + + if( ip->status != kCompletedTmId ) + { + cmErrMsg(&p->err,kOpFailTmRC,"The function argument of a running task (id:%i) may not be accessed.",instId); + goto errLabel; + } + + return ip->funcArg; + + errLabel: + return NULL; +} + + cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId ) { - cmTmRC_t rc = kOkTmRC; - cmTm_t* p = _cmTmHandleToPtr(h); + cmTmRC_t rc = kOkTmRC; + cmTm_t* p = _cmTmHandleToPtr(h); cmTmInst_t* ip = NULL; if((ip = _cmTmInstFromId(p,instId)) == NULL ) @@ -831,11 +1071,10 @@ cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId ) return rc; } - -cmTaskMgrCtlId_t _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId ) +void _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, cmSelTmId_t selId, cmStatusTmId_t statusId, unsigned prog, const cmChar_t* text ) { cmTaskMgrStatusArg_t s; - + _cmTaskMgrStatusArgSetup( &s, a->statusCbArg, @@ -843,50 +1082,193 @@ cmTaskMgrCtlId_t _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, unsigned prog, c statusId == kInvalidTmId ? kProgTmId : kStatusTmId, statusId == kInvalidTmId ? kStartedTmId : statusId, statusId == kInvalidTmId ? prog : 0, - NULL,NULL,0); + text,NULL,0); a->statusCb(&s); - - return cmTaskMgrWorkerHandleCommand(a); } -cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a ) +cmTmWorkerRC_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a ) { - cmTmThread_t* trp = a->reserved; + cmTmThread_t* trp = a->reserved; - while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true ) + // Check if we should go into the paused or deactivated state. + if( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true ) { - // change the instance status to 'paused'. - trp->inst->status = kPausedTmId; + cmStatusTmId_t prvStatus = kInvalidTmId; - // notify the client of the change in state - cmTaskMgrWorkerSendStatus(a,kPausedTmId); - - // sleep the thread for pauseSleepMs milliseconds - cmSleepMs(a->pauseSleepMs); - - // if the task was unpaused while we slept - if( trp->inst->ctlId == kStartTmId && trp->deactivateFl == false ) + do { - // change the instance status to 'started'. - trp->inst->status = kStartedTmId; + // Note that it is possible that the state of the task switch from + // paused <-> deactivated during the course of this loop. + // In either case we continue looping but should report the change + // to the client via a status callback. - // notify the client of the change in state - cmTaskMgrWorkerSendStatus(a,kStartedTmId); + // change the instance status to reflect the true status + trp->inst->status = trp->deactivateFl ? kDeactivatedTmId : kPausedTmId; + + // if the status actually changed then notify the client + if( trp->inst->status != prvStatus ) + { + _cmTaskMgrWorkerHelper(a,kStatusTmId,trp->inst->status,0,NULL); + prvStatus = trp->inst->status; + } + + // sleep the thread for pauseSleepMs milliseconds + cmSleepMs(a->pauseSleepMs); + + // if the task was unpaused while we slept + }while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl==true ); + + // we are leaving the paused state because we were restarted or killed. + switch( trp->inst->ctlId ) + { + case kStartTmId: + // change the instance status to 'started'. + trp->inst->status = kStartedTmId; + + // notify the client of the change in state + _cmTaskMgrWorkerHelper(a,kStatusTmId,kStartedTmId,0,NULL); + break; + + case kKillTmId: + // if killed the client will be notified in the worker thread wrapper + // function: _cmTmWorkerThreadFunc() + break; + + default: + { assert(0); } + } + } + else // There was no command to handle so check for incoming msg's. + { + + if( cmTs1p1cMsgWaiting(trp->inst->msgQueH) ) + { + // if the task registered a msg receive callback + if( trp->inst->task->recv != NULL ) + { + + if( cmTs1p1cDequeueMsg(trp->inst->msgQueH, NULL, 0 ) != kOkThRC ) + { + + // ????? + // ????? how do we send error messages back to the client + // ?????? + return kOkTmwRC; + } + + + } + else + { + } + + return kRecvTmwRC; } } // if ctlId==kKillTmId then the status update will be handled // when the task custom function returns in _cmTmWorkerThreadFunc() - return trp->inst->ctlId; + return trp->inst->ctlId == kKillTmId ? kStopTmwRC : kOkTmwRC; } -cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ) -{ return _cmTaskMgrWorkerHelper(a,0,statusId); } -cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ) -{ return _cmTaskMgrWorkerHelper(a,prog,kInvalidTmId); } + +cmTmRC_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ) +{ + _cmTaskMgrWorkerHelper(a,kStatusTmId,statusId,0,NULL); + return kOkTmRC; +} + +cmTmRC_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* text ) +{ + _cmTaskMgrWorkerHelper(a,kProgTmId,kInvalidTmId,prog,text); + return kOkTmRC; +} + +cmTmRC_t cmTaskMgrWorkerSendProgressV( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, va_list vl ) +{ + cmTmThread_t* trp = a->reserved; + cmTsVPrintfP(trp->text,fmt,vl); + return cmTaskMgrWorkerSendProgress(a,prog,trp->text); +} + +cmTmRC_t cmTaskMgrWorkerSendProgressF( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, ... ) +{ + va_list vl; + va_start(vl,fmt); + cmTmRC_t rc = cmTaskMgrWorkerSendProgressV(a,prog,fmt,vl); + va_end(vl); + return rc; +} + +cmTmRC_t cmTaskMgrWorkerError( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* text ) +{ + _cmTaskMgrWorkerHelper(a, kErrorTmId, kInvalidTmId, rc, text); + return rc; +} + +cmTmRC_t cmTaskMgrWorkerErrorV( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, va_list vl ) +{ + cmTmThread_t* trp = a->reserved; + cmTsVPrintfP(trp->text,fmt,vl); + return cmTaskMgrWorkerError(a,rc,trp->text); +} + +cmTmRC_t cmTaskMgrWorkerErrorF( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, ... ) +{ + va_list vl; + va_start(vl,fmt); + cmTmRC_t rc0 = cmTaskMgrWorkerErrorV(a,rc,fmt,vl); + va_end(vl); + return rc0; +} + +cmTmRC_t cmTaskMgrWorkerSetResult( cmTaskMgrFuncArg_t* a, void* result, unsigned resultByteCnt ) +{ + cmTmThread_t* trp = a->reserved; + trp->inst->result = result; + trp->inst->resultByteCnt = resultByteCnt; + return kOkTmRC; +} + +unsigned cmTaskMgrWorkerMsgByteCount( cmTaskMgrFuncArg_t* a ) +{ + cmTmThread_t* trp = a->reserved; + return cmTs1p1cDequeueMsgByteCount(trp->inst->msgQueH); +} + +unsigned cmTaskMgrWorkerMsgRecv( cmTaskMgrFuncArg_t* a, void* buf, unsigned bufByteCnt ) +{ + cmTmThread_t* trp = a->reserved; + unsigned retVal = bufByteCnt; + + switch( cmTs1p1cDequeueMsg(trp->inst->msgQueH, buf, bufByteCnt ) ) + { + case kOkThRC: + break; + + case kBufEmptyThRC: + retVal = 0; + break; + + case kBufTooSmallThRC: + retVal = cmInvalidCnt; + break; + + default: + { assert(0); } + } + + return retVal; +} + +cmTmRC_t cmTaskMgrWorkerMsgSend( cmTaskMgrFuncArg_t* a, const void* buf, unsigned bufByteCnt ) +{ + cmTmThread_t* trp = a->reserved; + return _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kMsgTmId,trp->inst->status,0,NULL,buf,bufByteCnt); +} //----------------------------------------------------------------------------- @@ -912,17 +1294,7 @@ void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s ) { case kStatusTmId: { - const cmChar_t* label = ""; - switch( s->statusId ) - { - case kInvalidTmId: label=""; break; - case kQueuedTmId: label="Queued"; break; - case kStartedTmId: label="Started"; break; - case kCompletedTmId: label="Completed"; break; - case kKilledTmId: label="Killed"; break; - default: - { assert(0); } - } + const cmChar_t* label = cmTaskMgrStatusIdToLabel(s->statusId); cmRptPrintf(rpt,"status '%s'",label); } break; @@ -965,19 +1337,19 @@ void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s ) // Test worker function. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg ) { - if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId ) + if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC ) return; unsigned prog = 0; for(; progprogCnt; ++prog) { - if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId ) + if( cmTaskMgrWorkerHandleCommand(arg) == kStopTmwRC ) break; cmSleepMs(1000); - if( cmTaskMgrWorkerSendProgress(arg,prog) == kKillTmId ) + if( cmTaskMgrWorkerSendProgress(arg,prog,NULL) == kStopTmwRC ) break; } @@ -1008,7 +1380,7 @@ cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx) } // install a task - if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc ) != kOkTmRC ) + if( cmTaskMgrInstall(tmH, taskId, taskLabel, _cmTmTestFunc, NULL ) != kOkTmRC ) { rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr task install failed."); goto errLabel; @@ -1042,7 +1414,7 @@ cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx) { void* funcArg = app.insts + nextInstId; unsigned progCnt = 5; - if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, &app.insts[nextInstId].instId ) != kOkTmRC ) + if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, queueByteCnt, "My Inst", &app.insts[nextInstId].instId ) != kOkTmRC ) rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed."); else { diff --git a/cmTaskMgr.h b/cmTaskMgr.h index 2cfc7ed..2c150cb 100644 --- a/cmTaskMgr.h +++ b/cmTaskMgr.h @@ -5,6 +5,72 @@ extern "C" { #endif + /* + Usage: + 1) Use cmTaskMgrInstall() to register a worker function + (cmTaskMgrFunc_t) with the task manager. + + 2) Use cmTaskMgrCall() to queue a new instance of a + task to run. + + 3) An internal scheduling program will start the task + when there are less than 'maxActiveTaskCnt' running. + This will occur when active tasks complete or are + paused. + + When 'maxActiveTaskCnt' tasks are active and + a previously paused task is unpaused the unpaused + task will take precedence over tasks started after + it and one of these new tasks will be deactivated. + + As tasks are pause/unpaused and activated/deactivated + the number of active tasks may briefly exceed + 'maxActiveTaskCnt'. + + 4) Once a task is instantiated the task manager + will keep the client notified of the task status + via callbacks to the cmTaskMgrStatusCb_t function. + + Status: + kQueuedTmId - The task instance is waiting to be + exectued in the task queue. + + kStartedTmId - The task has been made active and + is running. + + kPausedTmId - The client sent a kPauseTmId to the + task via cmTaskMgrCtl() and the task + has been paused. The only way to + restart the task is to send kStartTmId + command. + + kDeactivatedTmId - The task was previously started but has + now been deactivated by the system + in an effort to keep the number of + activate tasks at 'maxActiveTaskCnt'. + + kKilledTmId - The task was killed either by the client + (via a cmTaskMgrCtl(kKillTmId)) or + or by the system within cmTaskMgrClose() + or cmTaskMgrDestroy(). + + kCompletedTmId - The task has completed. Note that any + task that gets queued is guaranteed to + generate kCompletedTmId status callback. + + 5) Closing the task manager should follow a two step + process. First all active tasks should be ended and then + cmTaskMgrDestroy() should be called. + + Ending all active tasks can be accomplished by sending + kill signals via cmTaskMgrCtl(), use of cmTaskMgrClose() + or if all tasks are guaranteed to eventually end - waiting + for the task count to go to zero (See cmTaskMgrActiveTaskCount()). + Note that waiting for the task count to go to zero may be + error prone unless one can guarantee that no tasks are + queued or that the last task started has completed. + */ + enum { kOkTmRC, @@ -20,7 +86,7 @@ extern "C" { typedef cmHandle_t cmTaskMgrH_t; - extern cmTaskMgrH_t cmTaskMgrNullHangle; + extern cmTaskMgrH_t cmTaskMgrNullHandle; typedef enum { @@ -28,6 +94,7 @@ extern "C" { kQueuedTmId, // The task is waiting in the queue. kStartedTmId, // The task is running. kPausedTmId, // The task is paused. + kDeactivatedTmId, // The task was temporarily deactivated by the system kCompletedTmId, // The task successfully completed. kKilledTmId // The task was killed by the client. } cmStatusTmId_t; @@ -36,12 +103,12 @@ extern "C" { { kStatusTmId, // Task status updates. These are automatically sent by the system when the task instance changes state. kProgTmId, // Task progress update. The user function should increment the 'prog' toward 'progCnt'. - kErrorTmId // Error message + kErrorTmId, // Error message ('cmTaskMgrStatusArg_t.prog' has error result code) + kMsgTmId // Msg from a task instance in cmTaskMgrStatusArg_t.msg[msgByteCnt]. } cmSelTmId_t; typedef enum { - kNoneTmId, kStartTmId, kPauseTmId, kKillTmId @@ -50,14 +117,14 @@ extern "C" { typedef struct cmTaskMgrStatusArg_str { - void* arg; - unsigned instId; - cmSelTmId_t selId; - cmStatusTmId_t statusId; - unsigned prog; - const cmChar_t* msg; - void* result; - unsigned resultByteCnt; + void* arg; // Client status arg. as passed to cmTaskMgrCreate(). + unsigned instId; // Task instance id of the task which generated the callback. + cmSelTmId_t selId; // See cmSelTmId_t. + cmStatusTmId_t statusId; // See cmStatusTmId_t. + unsigned prog; // selId==kProgTmId (0<=prog<=cmTaskMgrFuncArg_t.progCnt) selId=kErrorTmid prog=error result code. + const cmChar_t* text; // Used by kErrorTmId. + const void* msg; // Used by kMsgTmId. msg[msgByteCnt] + unsigned msgByteCnt; // Count of bytes in msg[]. } cmTaskMgrStatusArg_t; typedef void (*cmTaskMgrStatusCb_t)( const cmTaskMgrStatusArg_t* status ); @@ -74,25 +141,50 @@ extern "C" { unsigned pauseSleepMs; // Length of time to sleep if the task receives a pause command. } cmTaskMgrFuncArg_t; + // Task process function. typedef void (*cmTaskMgrFunc_t)(cmTaskMgrFuncArg_t* arg ); + // Task message receive function. + typedef void (*cmTaskMgrRecv_t)(cmTaskMgrFuncArg_t* arg, const void* msg, unsigned msgByteCnt ); + + // Allocate the task manager. cmTmRC_t cmTaskMgrCreate( - cmCtx_t* ctx, - cmTaskMgrH_t* hp, - cmTaskMgrStatusCb_t statusCb, - void* statusCbArg, - unsigned maxActiveThreadCnt, - unsigned queueByteCnt, - unsigned pauseSleepMs ); + cmCtx_t* ctx, // + cmTaskMgrH_t* hp, // + cmTaskMgrStatusCb_t statusCb, // Task status callbacks. + void* statusCbArg, // Status callback arg + unsigned maxActiveTaskCnt, // Max. number of active tasks (see Usage notes above.) + unsigned queueByteCnt, // Size of task client->taskMgr and taskMgr->client msg queues. + unsigned pauseSleepMs ); // Scheduler sleep time. (20-50ms) // Calling cmTaskMgrDestroy() will send a 'kill' control message // to any existing worker threads. The threads will shutdown // gracefully but the task they were computing will not be completed. cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp ); + enum + { + kKillQueuedTmFl = 0x01, // Kill any queued (otherwise queued tasks will be started) + kTimeOutKillTmFl = 0x02 // KIll any instance not completed before returning + }; + + // Wait for the current task instances to complete. + // Set 'timeOutMs' to the number of milliseconds to wait for the current tasks to complete. + // Set kKillQueueTmFl to kill any queued tasks otherwise queued tasks will run as usual. + // Set kRefuseCallTmFl to refuse to queue any new tasks following the return from this function. + // Set kTimeOutKillTmFl to kill any tasks that are still running after timeOutMs has expired + // otherwise these tasks will still be running when the function returns. + cmTmRC_t cmTaskMgrClose( cmTaskMgrH_t h, unsigned flags, unsigned timeOutMs ); + + // Return the current number of active tasks. + unsigned cmTaskMgrActiveTaskCount( cmTaskMgrH_t h ); + // Return true if the task manager handle is valid. bool cmTaskMgrIsValid( cmTaskMgrH_t h ); + // Given a statusId return a the associated label. + const cmChar_t* cmTaskMgrStatusIdToLabel( cmStatusTmId_t statusId ); + // Called by the client to give the task mgr an opportunity to execute // period functions from within the client thread. Note that 'statusCb()' // (as passed to cmTaskMgrCreate()) is only called within this function @@ -110,36 +202,155 @@ extern "C" { cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl ); // Install a task function and associate it with a label and unique id. - cmTmRC_t cmTaskMgrInstall( cmTaskMgrH_t h, unsigned taskId, const cmChar_t* label, cmTaskMgrFunc_t func ); + cmTmRC_t cmTaskMgrInstall( + cmTaskMgrH_t h, + unsigned taskId, // Unique task id. + const cmChar_t* label, // (optional) Task label + cmTaskMgrFunc_t func, // Task worker function. + cmTaskMgrRecv_t recv); // (optional) Task message receive function. // Queue a new task instance. // The 'queued' status callback occurs from inside this call. + // If this function completes successfully then the client is + // guaranteed to get both a 'kQueuedTmId' status update and + // and a 'kCompletedTmId' update. Any per task instance cleanup + // can therefore be triggered by the 'kCompleteTmId' status callback. cmTmRC_t cmTaskMgrCall( - cmTaskMgrH_t h, - unsigned taskId, - void* funcArg, - unsigned progCnt, - unsigned* retInstIdPtr ); + cmTaskMgrH_t h, + unsigned taskId, // Task id of a task previously registered by cmTaskMgrInstall(). + void* funcArg, // The value assigned to cmTaskMgrFuncArg_t.arg for this instance. + unsigned progCnt, // Max. expected progress value to be eventually reported by this task instances 'kProgTmId' progress updates. + unsigned queueByteCnt, // Size of the client->task message buffer. + const cmChar_t* label, // (optional) Instance label. + unsigned* retInstIdPtr ); // (optional) Unique id assigned to this instance. // Start,pause, or kill a task instance. + // // If a queued task is paused then it will remain at the front - // of the queue and tasks behdind it in the queue will be executed. - cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId ); + // of the queue and tasks behind it in the queue may be executed. + // See the usage note above regarding the interaction between + // pausing/unpausing, activating/deactivating and the + // maxActiveTaskCnt parameter. + // + // Note that killing a task causes it to terminate quickly + // but this does not imply that the task ends in an uncontrolled + // manner. Even killed tasks properly release any resource they + // may hold prior to ending. For long running tasks which do not + // have a natural stopping point prior to the end of the client + // process using the kill signal to end the task is both convenient + // and efficient. + // + // Once a task is paused it is safe to directly interact with + // the data space it has access to (e.g. cmTaskMgrFuncArg_t.funcArg). + // This is true because the client has control over when the task + // may start again via the cmStartTmId signal. Note that this is + // not the case for deactivated signals. Deactivated signals may + // be restarted at any time. Note however that it is possible to + // pause a deactivated task in which case it's data space may + // be accessed as soon as the client is notified that the task + // has switched to the paused state. + cmTmRC_t cmTaskMgrCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctlId ); // Get the status of a task. cmStatusTmId_t cmTaskMgrStatus( cmTaskMgrH_t h, unsigned instId ); - // + // Send a thread-safe msg to a task instance. + cmTmRC_t cmTaskMgrSendMsg( cmTaskMgrH_t h, unsigned instId, const void* msg, unsigned msgByteCnt ); + + const cmChar_t* cmTaskMgrTaskIdToLabel( cmTaskMgrH_t h, unsigned taskId ); + const cmChar_t* cmTaskMgrInstIdToLabel( cmTaskMgrH_t h, unsigned instId ); + + // The following functions are only valid when the task has completed and + // has a status of either kCompletedTmId. const void* cmTaskMgrResult( cmTaskMgrH_t h, unsigned instId ); unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId ); + void* cmTaskMgrFuncArg( cmTaskMgrH_t h, unsigned instId ); cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId ); // ----------------------------------------------------------------------------------- // Worker thread helper functions. - cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a ); - cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ); - cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ); + // + // These functions should only be called from inside the client supplied + // worker function (cmTaskMgrFuncArg_t). + // + + // There are two thread-safe methods the worker thread can use to receive + // raw byte messages from the client. + // + // 1. Assign a cmTaskMgrRecv_t function at task installation + // (See cmTaskMgrInstall().) This callback will be invoked from inside + // cmTaskMgrWorkerHandleCommand() + // if a client message is found to be waiting. When this occurs the + // worker helper function will return kRecvTmwRC. + // + // 2. If the task was not assigned a cmTaskMgrRect_t function then + // the worker should notice when + // cmTaskMgrWorkerHandleCommand() returns cmRecvTmwRC. When this occurs + // it should call cmTaskMgrMsgRecv() to copy the message into a + // locally allocated buffer. cmTaskMgrWorkerMsgByteCount() can + // be called to find out the size of the message and therefore + // the minimum size of the copy destination buffer. + + typedef enum + { + kOkTmwRC, + kStopTmwRC, + kRecvTmwRC + } cmTmWorkerRC_t; + + // The instance function cmTaskMgrFunc_t must poll this + // function to respond to incoming commands and messages. + // The polling rate should be determined by the application to + // trade-off the cost of the poll versus the command execution + // latency. Checking every 20 to 100 milliseconcds is probably + // about right. + // If the function returns 'kStopTmwRC' then the function has received + // a kKillCtlId and should release any resources and return immediately. + // If the function returns 'kRecvTmwRC' and has not registered a + // cmTaskMgrRecv_t function then it should call cmTaskMgrWorkerRecv() + // to pick up an incoming message from the client. + // If the function returns 'kRecvTmwRC' and does have a registered + // cmTaskMgrRecv_t function then a new message was received by the + // cmTaskMGrRecv_t function. + cmTmWorkerRC_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a ); + + // Send a task status update to the client. + //cmTmRC_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ); + + // Send a progress update to the client. + cmTmRC_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* text ); + cmTmRC_t cmTaskMgrWorkerSendProgressV( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, va_list vl ); + cmTmRC_t cmTaskMgrWorkerSendProgressF( cmTaskMgrFuncArg_t* a, unsigned prog, const cmChar_t* fmt, ... ); + + // Send an error message to the client. The result code is application + // dependent. These function return 'rc' as does cmErrMsg(). + cmTmRC_t cmTaskMgrWorkerError( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* text ); + cmTmRC_t cmTaskMgrWorkerErrorV( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, va_list vl ); + cmTmRC_t cmTaskMgrWorkerErrorF( cmTaskMgrFuncArg_t* a, unsigned rc, const cmChar_t* fmt, ... ); + + // Set the internal result buffer for this instance. + // This is a convenient way to assign a final result to a instance + // which can eventually be picked up by cmTaskMgrResult() + // after the worker has officially completed. Alternatively the + // result of the worker computation can be returned using + // cmTaskMgrWorkerMsgSend(). + cmTmRC_t cmTaskMgrWorkerSetResult( cmTaskMgrFuncArg_t* a, void* result, unsigned resultByteCnt ); + + // Get the size of an incoming message sent to this task instance + // from the client. Use cmTaskMgrWorkerMsgRecv() to get + // the msg. contents. + unsigned cmTaskMgrWorkerMsgByteCount( cmTaskMgrFuncArg_t* a ); + + // Copy a msg from the client into buf[bufByteCnt] and + // return the count of bytes copied into buf[bufByteCnt] or cmInvalidCnt + // if the buf[] was too small. This function is only used when + // the task did notregister a cmTaskMgrRecv_t with cmTaskMgrInstall(). + // cmTaskMgrWorkerMsgByteCount() returns the minimum required size of buf[]. + unsigned cmTaskMgrWorkerMsgRecv( cmTaskMgrFuncArg_t* a, void* buf, unsigned bufByteCnt ); + + // Send a generic msg to the client. + cmTmWorkerRC_t cmTaskMgrWorkerMsgSend( cmTaskMgrFuncArg_t* a, const void* buf, unsigned bufByteCnt ); cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx);