diff --git a/cmTaskMgr.c b/cmTaskMgr.c index 7d10aa1..39f4df3 100644 --- a/cmTaskMgr.c +++ b/cmTaskMgr.c @@ -5,6 +5,7 @@ #include "cmMem.h" #include "cmMallocDebug.h" #include "cmThread.h" +#include "cmTime.h" #include "cmTaskMgr.h" cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE; @@ -37,19 +38,25 @@ struct cmTm_str* p; typedef struct cmTmThread_str { - struct cmTm_str* p; // - cmThreadH_t thH; // - cmTmInst_t* inst; // Ptr to the task instance this thread is executing. + struct cmTm_str* p; // + cmThreadH_t thH; // + cmTmInst_t* inst; // Ptr to the task instance this thread is executing. + double durSecs; + cmTimeSpec_t t0; + bool deactivateFl; + struct cmTmThread_str* link; } cmTmThread_t; typedef struct cmTm_str { cmErr_t err; - cmTmThread_t* thArray; // - unsigned threadCnt; // + cmThreadH_t mstrThH; // + cmTmThread_t* threads; // + unsigned maxActiveThreadCnt; // + unsigned threadRecdCnt; cmTaskMgrStatusCb_t statusCb; // void* statusCbArg; // - unsigned pauseSleepMs; + unsigned pauseSleepMs; // cmTs1p1cH_t inQueH; // client->mgr cmTsMp1cH_t outQueH; // mgr->client cmTmTask_t* tasks; // @@ -151,7 +158,7 @@ bool _cmTmWorkerThreadFunc(void* arg) // if the task was paused or killed while it was queued then // cmTaskMgrHandleCommand() will do the right thing - if( cmTaskMgrHandleCommand(&r) != kKillTmId ) + if( cmTaskMgrWorkerHandleCommand(&r) != kKillTmId ) { trp->inst->status = kStartedTmId; @@ -171,56 +178,185 @@ bool _cmTmWorkerThreadFunc(void* arg) _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0); - trp->inst = NULL; - // Force the thread to go into the 'pause' state when it // returns to it's internal loop. The master thread recognizes paused - // threads as available. + // threads as available for reuse. cmThreadPause(trp->thH,kPauseThFl); return true; } +void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg ) +{ + assert(0); +} + +int _cmTmSortThreadByDur( const void* t0, const void* t1 ) +{ + double d = ((cmTmThread_t*)t0)->durSecs - ((cmTmThread_t*)t1)->durSecs; + + return d== 0 ? 0 : (d<0 ? -1 : 1); +} + // This is the master thread function. bool _cmTmMasterThreadFunc(void* arg) { - cmTmThread_t* trp = (cmTmThread_t*)arg; - cmTm_t* p = trp->p; + cmTm_t* p = (cmTm_t*)arg; + unsigned activeCnt = 0; + cmTmThread_t* trp = p->threads; + + if( p->threadRecdCnt > 0 ) + { + cmTmThread_t* thArray[p->threadRecdCnt]; + unsigned deactivatedCnt = 0; + + // for each thread record + for(trp=p->threads; trp!=NULL; trp=trp->link) + { + cmThStateId_t thState; + + // if this thread is active ... + if( (thState = cmThreadState(trp->thH)) != kPausedThId ) + { + // update the task lifetime duration + cmTimeSpec_t t1; + cmTimeGet(&t1); + trp->durSecs += (double)cmTimeElapsedMicros(&trp->t0,&t1) / 1000000.0; + trp->t0 = t1; + + assert(trp->inst!=NULL); + + // if the task assoc'd with this thread is running + if( trp->inst->status == kStartedTmId ) + { + thArray[activeCnt] = trp; + ++activeCnt; + } + + // count the number of deactivated threads + if( trp->deactivateFl ) + ++deactivatedCnt; + } + } + + // The first 'activeCnt' elements of thArray[] now point to + // cmTmThread_t records of the active tasks. + + // if more tasks are active than should be - then deactive the youngest + if( activeCnt > p->maxActiveThreadCnt ) + { + // sort the active tasks in increasing order of lifetime + qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur); + + // determine the number of threads that need to be paused + int n = activeCnt - p->maxActiveThreadCnt; + int i; + + // pause the active threads with the lowest lifetime + for(i=0; ideactivateFl == false ) + { + thArray[i]->deactivateFl = true; + ++deactivatedCnt; + } + } + + // if there are deactivated tasks and the max thread count has not been reached + // then re-activate some of the deactivated tasks. + if( activeCnt < p->maxActiveThreadCnt && deactivatedCnt > 0 ) + { + // sort the active tasks in increasing order of lifetime + qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur); + + int n = cmMin(p->maxActiveThreadCnt - activeCnt, deactivatedCnt ); + int i; + + // re-activate the oldest deactivated tasks + for(i=activeCnt-1; i>=0 && n>0; --i) + if( thArray[i]->deactivateFl ) + { + thArray[i]->deactivateFl = false; + --n; + ++activeCnt; + } + + } + } + + // if a queued task exists while( cmTs1p1cMsgWaiting(p->inQueH) ) { - unsigned i; - cmTmInst_t* ip = NULL; + cmTmInst_t* ip = NULL; + cmTmThread_t* atrp = NULL; - // find an available worker thread - for(i=1; ithreadCnt; ++i) - if( cmThreadState(p->thArray[i].thH) == kPausedThId ) + activeCnt = 0; + + // Find a worker thread that is in the 'paused' state. + // This is the definitive indication that the thread + // does not have an assigned instance + for(trp=p->threads; trp!=NULL; trp=trp->link) + { + if( cmThreadState(trp->thH) == kPausedThId ) + atrp = trp; + else + ++activeCnt; + } + + // If the maximum number of active threads already exists then we cannot start a new task + if( activeCnt >= p->maxActiveThreadCnt ) + break; + + // If all the existing worker threads are busy + // but the maximum number of threads has not yet been allocated ... + if( atrp==NULL && p->threadRecdCnt < p->maxActiveThreadCnt) + { + // ... then create a new worker thread recd + atrp = cmMemAllocZ(cmTmThread_t,1); + + // ... create the new worker thread + if( cmThreadCreate(&atrp->thH,_cmTmWorkerThreadFunc,atrp,p->err.rpt) != kOkThRC ) + { + cmMemFree(atrp); + atrp = NULL; + _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread create failed."); break; + } + else + { + // ... setup the new thread record + atrp->p = p; + atrp->link = p->threads; + p->threads = atrp; - // if all worker threads are busy ... give up - if( i==p->threadCnt ) + p->threadRecdCnt += 1; + } + } + + // if there are no available threads then give up + if( atrp == NULL ) break; // dequeue a pending task instance pointer from the input queue if(cmTs1p1cDequeueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC ) { - /// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD - continue; + _cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue."); + break; } - // assign the instance to the available thread - p->thArray[i].inst = ip; - - // start the thread and wait for it to enter the running state. - - if( cmThreadPause(p->thArray[i].thH,0) != kOkThRC ) - { - /// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD - } + // setup the thread record associated with the new task + atrp->inst = ip; + atrp->durSecs = 0; + atrp->deactivateFl = false; + + // start the worker thread + if( cmThreadPause(atrp->thH,0) != kOkThRC ) + _cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed."); } + cmSleepMs(p->pauseSleepMs); return true; @@ -283,17 +419,26 @@ cmTmRC_t _cmTmDestroy( cmTm_t* p ) cmTmRC_t rc = kOkTmRC; unsigned i; - // stop and destroy all the threads - for(i=0; ithreadCnt; ++i) + // stop and destroy the master thread + if( cmThreadDestroy(&p->mstrThH) != kOkThRC ) { - if( cmThreadDestroy(&p->thArray[i].thH) != kOkThRC ) - { - rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i destroy failed.",i); - goto errLabel; - } + rc = cmErrMsg(&p->err,kThreadFailTmRC,"Master thread destroy failed."); + goto errLabel; } - cmMemFree(p->thArray); + // stop and destroy all the worker threads + for(i=0; p->threads != NULL; ++i ) + { + if( cmThreadDestroy(&p->threads->thH) != kOkThRC ) + { + rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread destruction failed for the worker thread at index %i.",i); + goto errLabel; + } + + cmTmThread_t* trp = p->threads; + p->threads = p->threads->link; + cmMemFree(trp); + } // release the input queue if( cmTs1p1cDestroy(&p->inQueH) != kOkThRC ) @@ -341,12 +486,11 @@ cmTmRC_t cmTaskMgrCreate( cmTaskMgrH_t* hp, cmTaskMgrStatusCb_t statusCb, void* statusCbArg, - unsigned threadCnt, + unsigned maxActiveThreadCnt, unsigned queueByteCnt, unsigned pauseSleepMs) { cmTmRC_t rc = kOkTmRC; - unsigned i; if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC ) return rc; @@ -355,24 +499,16 @@ cmTmRC_t cmTaskMgrCreate( cmErrSetup(&p->err,&ctx->rpt,"Task Mgr."); - threadCnt += 1; - p->thArray = cmMemAllocZ(cmTmThread_t,threadCnt); - p->threadCnt = threadCnt; + p->maxActiveThreadCnt = maxActiveThreadCnt; p->statusCb = statusCb; p->statusCbArg = statusCbArg; p->pauseSleepMs = pauseSleepMs; - // create the threads - for(i=0; imstrThH, _cmTmMasterThreadFunc,p,&ctx->rpt) != kOkThRC ) { - if( cmThreadCreate(&p->thArray[i].thH, i==0 ? _cmTmMasterThreadFunc : _cmTmWorkerThreadFunc, p->thArray+i, &ctx->rpt ) != kOkThRC ) - { - rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.",i); - goto errLabel; - } - - p->thArray[i].p = p; - + rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed."); + goto errLabel; } // create the input queue @@ -480,7 +616,7 @@ bool cmTaskMgrIsEnabled( cmTaskMgrH_t h ) { cmTm_t* p = _cmTmHandleToPtr(h); - return cmThreadState(p->thArray[0].thH) != kPausedThId; + return cmThreadState(p->mstrThH) != kPausedThId; } cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl ) @@ -489,7 +625,7 @@ cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl ) cmTm_t* p = _cmTmHandleToPtr(h); unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl; - if( cmThreadPause(p->thArray[0].thH, flags ) != kOkThRC ) + if( cmThreadPause(p->mstrThH, flags ) != kOkThRC ) rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" ); return rc; @@ -696,7 +832,7 @@ cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId ) } -cmTaskMgrCtlId_t _cmTaskMgrHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId ) +cmTaskMgrCtlId_t _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId ) { cmTaskMgrStatusArg_t s; @@ -711,32 +847,32 @@ cmTaskMgrCtlId_t _cmTaskMgrHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatu a->statusCb(&s); - return cmTaskMgrHandleCommand(a); + return cmTaskMgrWorkerHandleCommand(a); } -cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a ) +cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a ) { cmTmThread_t* trp = a->reserved; - while( trp->inst->ctlId == kPauseTmId ) + while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true ) { // change the instance status to 'paused'. trp->inst->status = kPausedTmId; // notify the client of the change in state - cmTaskMgrSendStatus(a,kPausedTmId); + cmTaskMgrWorkerSendStatus(a,kPausedTmId); // sleep the thread for pauseSleepMs milliseconds cmSleepMs(a->pauseSleepMs); // if the task was unpaused while we slept - if( trp->inst->ctlId == kStartTmId ) + if( trp->inst->ctlId == kStartTmId && trp->deactivateFl == false ) { // change the instance status to 'started'. trp->inst->status = kStartedTmId; // notify the client of the change in state - cmTaskMgrSendStatus(a,kStartedTmId); + cmTaskMgrWorkerSendStatus(a,kStartedTmId); } } @@ -746,11 +882,11 @@ cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a ) return trp->inst->ctlId; } -cmTaskMgrCtlId_t cmTaskMgrSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ) -{ return _cmTaskMgrHelper(a,0,statusId); } +cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ) +{ return _cmTaskMgrWorkerHelper(a,0,statusId); } -cmTaskMgrCtlId_t cmTaskMgrSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ) -{ return _cmTaskMgrHelper(a,prog,kInvalidTmId); } +cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ) +{ return _cmTaskMgrWorkerHelper(a,prog,kInvalidTmId); } //----------------------------------------------------------------------------- @@ -829,17 +965,19 @@ void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s ) // Test worker function. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg ) { + if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId ) + return; unsigned prog = 0; for(; progprogCnt; ++prog) { - if( cmTaskMgrHandleCommand(arg) == kKillTmId ) + if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId ) break; cmSleepMs(1000); - if( cmTaskMgrSendProgress(arg,prog) == kKillTmId ) + if( cmTaskMgrWorkerSendProgress(arg,prog) == kKillTmId ) break; } diff --git a/cmTaskMgr.h b/cmTaskMgr.h index 63b2fb8..2cfc7ed 100644 --- a/cmTaskMgr.h +++ b/cmTaskMgr.h @@ -36,7 +36,7 @@ extern "C" { { kStatusTmId, // Task status updates. These are automatically sent by the system when the task instance changes state. kProgTmId, // Task progress update. The user function should increment the 'prog' toward 'progCnt'. - kErrorTmId // + kErrorTmId // Error message } cmSelTmId_t; typedef enum @@ -81,7 +81,7 @@ extern "C" { cmTaskMgrH_t* hp, cmTaskMgrStatusCb_t statusCb, void* statusCbArg, - unsigned threadCnt, + unsigned maxActiveThreadCnt, unsigned queueByteCnt, unsigned pauseSleepMs ); @@ -137,9 +137,9 @@ extern "C" { // ----------------------------------------------------------------------------------- // Worker thread helper functions. - cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a ); - cmTaskMgrCtlId_t cmTaskMgrSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ); - cmTaskMgrCtlId_t cmTaskMgrSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ); + cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a ); + cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ); + cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ); cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx);