cmTaskMgr.h/c : Added dynamic thread allocation and task balancing.

This commit is contained in:
kpl 2013-10-15 10:41:24 -07:00
parent 9b9b49bf3c
commit fe29450106
2 changed files with 211 additions and 73 deletions

View File

@ -5,6 +5,7 @@
#include "cmMem.h"
#include "cmMallocDebug.h"
#include "cmThread.h"
#include "cmTime.h"
#include "cmTaskMgr.h"
cmTaskMgrH_t cmTaskMgrNullHandle = cmSTATIC_NULL_HANDLE;
@ -37,19 +38,25 @@ struct cmTm_str* p;
typedef struct cmTmThread_str
{
struct cmTm_str* p; //
cmThreadH_t thH; //
cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
struct cmTm_str* p; //
cmThreadH_t thH; //
cmTmInst_t* inst; // Ptr to the task instance this thread is executing.
double durSecs;
cmTimeSpec_t t0;
bool deactivateFl;
struct cmTmThread_str* link;
} cmTmThread_t;
typedef struct cmTm_str
{
cmErr_t err;
cmTmThread_t* thArray; //
unsigned threadCnt; //
cmThreadH_t mstrThH; //
cmTmThread_t* threads; //
unsigned maxActiveThreadCnt; //
unsigned threadRecdCnt;
cmTaskMgrStatusCb_t statusCb; //
void* statusCbArg; //
unsigned pauseSleepMs;
unsigned pauseSleepMs; //
cmTs1p1cH_t inQueH; // client->mgr
cmTsMp1cH_t outQueH; // mgr->client
cmTmTask_t* tasks; //
@ -151,7 +158,7 @@ bool _cmTmWorkerThreadFunc(void* arg)
// if the task was paused or killed while it was queued then
// cmTaskMgrHandleCommand() will do the right thing
if( cmTaskMgrHandleCommand(&r) != kKillTmId )
if( cmTaskMgrWorkerHandleCommand(&r) != kKillTmId )
{
trp->inst->status = kStartedTmId;
@ -171,56 +178,185 @@ bool _cmTmWorkerThreadFunc(void* arg)
_cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0);
trp->inst = NULL;
// Force the thread to go into the 'pause' state when it
// returns to it's internal loop. The master thread recognizes paused
// threads as available.
// threads as available for reuse.
cmThreadPause(trp->thH,kPauseThFl);
return true;
}
void _cmTmMasterRptError( cmTm_t* p, unsigned rc, const cmChar_t* msg )
{
assert(0);
}
int _cmTmSortThreadByDur( const void* t0, const void* t1 )
{
double d = ((cmTmThread_t*)t0)->durSecs - ((cmTmThread_t*)t1)->durSecs;
return d== 0 ? 0 : (d<0 ? -1 : 1);
}
// This is the master thread function.
bool _cmTmMasterThreadFunc(void* arg)
{
cmTmThread_t* trp = (cmTmThread_t*)arg;
cmTm_t* p = trp->p;
cmTm_t* p = (cmTm_t*)arg;
unsigned activeCnt = 0;
cmTmThread_t* trp = p->threads;
if( p->threadRecdCnt > 0 )
{
cmTmThread_t* thArray[p->threadRecdCnt];
unsigned deactivatedCnt = 0;
// for each thread record
for(trp=p->threads; trp!=NULL; trp=trp->link)
{
cmThStateId_t thState;
// if this thread is active ...
if( (thState = cmThreadState(trp->thH)) != kPausedThId )
{
// update the task lifetime duration
cmTimeSpec_t t1;
cmTimeGet(&t1);
trp->durSecs += (double)cmTimeElapsedMicros(&trp->t0,&t1) / 1000000.0;
trp->t0 = t1;
assert(trp->inst!=NULL);
// if the task assoc'd with this thread is running
if( trp->inst->status == kStartedTmId )
{
thArray[activeCnt] = trp;
++activeCnt;
}
// count the number of deactivated threads
if( trp->deactivateFl )
++deactivatedCnt;
}
}
// The first 'activeCnt' elements of thArray[] now point to
// cmTmThread_t records of the active tasks.
// if more tasks are active than should be - then deactive the youngest
if( activeCnt > p->maxActiveThreadCnt )
{
// sort the active tasks in increasing order of lifetime
qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
// determine the number of threads that need to be paused
int n = activeCnt - p->maxActiveThreadCnt;
int i;
// pause the active threads with the lowest lifetime
for(i=0; i<n ; ++i)
if( thArray[i]->deactivateFl == false )
{
thArray[i]->deactivateFl = true;
++deactivatedCnt;
}
}
// if there are deactivated tasks and the max thread count has not been reached
// then re-activate some of the deactivated tasks.
if( activeCnt < p->maxActiveThreadCnt && deactivatedCnt > 0 )
{
// sort the active tasks in increasing order of lifetime
qsort(&thArray[0],activeCnt,sizeof(thArray[0]),_cmTmSortThreadByDur);
int n = cmMin(p->maxActiveThreadCnt - activeCnt, deactivatedCnt );
int i;
// re-activate the oldest deactivated tasks
for(i=activeCnt-1; i>=0 && n>0; --i)
if( thArray[i]->deactivateFl )
{
thArray[i]->deactivateFl = false;
--n;
++activeCnt;
}
}
}
// if a queued task exists
while( cmTs1p1cMsgWaiting(p->inQueH) )
{
unsigned i;
cmTmInst_t* ip = NULL;
cmTmInst_t* ip = NULL;
cmTmThread_t* atrp = NULL;
// find an available worker thread
for(i=1; i<p->threadCnt; ++i)
if( cmThreadState(p->thArray[i].thH) == kPausedThId )
activeCnt = 0;
// Find a worker thread that is in the 'paused' state.
// This is the definitive indication that the thread
// does not have an assigned instance
for(trp=p->threads; trp!=NULL; trp=trp->link)
{
if( cmThreadState(trp->thH) == kPausedThId )
atrp = trp;
else
++activeCnt;
}
// If the maximum number of active threads already exists then we cannot start a new task
if( activeCnt >= p->maxActiveThreadCnt )
break;
// If all the existing worker threads are busy
// but the maximum number of threads has not yet been allocated ...
if( atrp==NULL && p->threadRecdCnt < p->maxActiveThreadCnt)
{
// ... then create a new worker thread recd
atrp = cmMemAllocZ(cmTmThread_t,1);
// ... create the new worker thread
if( cmThreadCreate(&atrp->thH,_cmTmWorkerThreadFunc,atrp,p->err.rpt) != kOkThRC )
{
cmMemFree(atrp);
atrp = NULL;
_cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread create failed.");
break;
}
else
{
// ... setup the new thread record
atrp->p = p;
atrp->link = p->threads;
p->threads = atrp;
// if all worker threads are busy ... give up
if( i==p->threadCnt )
p->threadRecdCnt += 1;
}
}
// if there are no available threads then give up
if( atrp == NULL )
break;
// dequeue a pending task instance pointer from the input queue
if(cmTs1p1cDequeueMsg(p->inQueH,&ip,sizeof(ip)) != kOkThRC )
{
/// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD
continue;
_cmTmMasterRptError(p,kQueueFailTmRC,"Dequeue failed on incoming task instance queue.");
break;
}
// assign the instance to the available thread
p->thArray[i].inst = ip;
// setup the thread record associated with the new task
atrp->inst = ip;
atrp->durSecs = 0;
atrp->deactivateFl = false;
// start the thread and wait for it to enter the running state.
if( cmThreadPause(p->thArray[i].thH,0) != kOkThRC )
{
/// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD
}
// start the worker thread
if( cmThreadPause(atrp->thH,0) != kOkThRC )
_cmTmMasterRptError(p,kThreadFailTmRC,"Worker thread start failed.");
}
cmSleepMs(p->pauseSleepMs);
return true;
@ -283,17 +419,26 @@ cmTmRC_t _cmTmDestroy( cmTm_t* p )
cmTmRC_t rc = kOkTmRC;
unsigned i;
// stop and destroy all the threads
for(i=0; i<p->threadCnt; ++i)
// stop and destroy the master thread
if( cmThreadDestroy(&p->mstrThH) != kOkThRC )
{
if( cmThreadDestroy(&p->thArray[i].thH) != kOkThRC )
{
rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i destroy failed.",i);
goto errLabel;
}
rc = cmErrMsg(&p->err,kThreadFailTmRC,"Master thread destroy failed.");
goto errLabel;
}
cmMemFree(p->thArray);
// stop and destroy all the worker threads
for(i=0; p->threads != NULL; ++i )
{
if( cmThreadDestroy(&p->threads->thH) != kOkThRC )
{
rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread destruction failed for the worker thread at index %i.",i);
goto errLabel;
}
cmTmThread_t* trp = p->threads;
p->threads = p->threads->link;
cmMemFree(trp);
}
// release the input queue
if( cmTs1p1cDestroy(&p->inQueH) != kOkThRC )
@ -341,12 +486,11 @@ cmTmRC_t cmTaskMgrCreate(
cmTaskMgrH_t* hp,
cmTaskMgrStatusCb_t statusCb,
void* statusCbArg,
unsigned threadCnt,
unsigned maxActiveThreadCnt,
unsigned queueByteCnt,
unsigned pauseSleepMs)
{
cmTmRC_t rc = kOkTmRC;
unsigned i;
if((rc = cmTaskMgrDestroy(hp)) != kOkTmRC )
return rc;
@ -355,24 +499,16 @@ cmTmRC_t cmTaskMgrCreate(
cmErrSetup(&p->err,&ctx->rpt,"Task Mgr.");
threadCnt += 1;
p->thArray = cmMemAllocZ(cmTmThread_t,threadCnt);
p->threadCnt = threadCnt;
p->maxActiveThreadCnt = maxActiveThreadCnt;
p->statusCb = statusCb;
p->statusCbArg = statusCbArg;
p->pauseSleepMs = pauseSleepMs;
// create the threads
for(i=0; i<threadCnt; ++i)
// create the master thread
if( cmThreadCreate(&p->mstrThH, _cmTmMasterThreadFunc,p,&ctx->rpt) != kOkThRC )
{
if( cmThreadCreate(&p->thArray[i].thH, i==0 ? _cmTmMasterThreadFunc : _cmTmWorkerThreadFunc, p->thArray+i, &ctx->rpt ) != kOkThRC )
{
rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.",i);
goto errLabel;
}
p->thArray[i].p = p;
rc = cmErrMsg(&p->err,kThreadFailTmRC,"Thread index %i create failed.");
goto errLabel;
}
// create the input queue
@ -480,7 +616,7 @@ bool cmTaskMgrIsEnabled( cmTaskMgrH_t h )
{
cmTm_t* p = _cmTmHandleToPtr(h);
return cmThreadState(p->thArray[0].thH) != kPausedThId;
return cmThreadState(p->mstrThH) != kPausedThId;
}
cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
@ -489,7 +625,7 @@ cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl )
cmTm_t* p = _cmTmHandleToPtr(h);
unsigned flags = (enableFl ? 0 : kPauseThFl) | kWaitThFl;
if( cmThreadPause(p->thArray[0].thH, flags ) != kOkThRC )
if( cmThreadPause(p->mstrThH, flags ) != kOkThRC )
rc = cmErrMsg(&p->err,kThreadFailTmRC,"The master thread failed to %s.",enableFl ? "enable" : "disable" );
return rc;
@ -696,7 +832,7 @@ cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId )
}
cmTaskMgrCtlId_t _cmTaskMgrHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId )
cmTaskMgrCtlId_t _cmTaskMgrWorkerHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId )
{
cmTaskMgrStatusArg_t s;
@ -711,32 +847,32 @@ cmTaskMgrCtlId_t _cmTaskMgrHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatu
a->statusCb(&s);
return cmTaskMgrHandleCommand(a);
return cmTaskMgrWorkerHandleCommand(a);
}
cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a )
cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a )
{
cmTmThread_t* trp = a->reserved;
while( trp->inst->ctlId == kPauseTmId )
while( trp->inst->ctlId == kPauseTmId || trp->deactivateFl == true )
{
// change the instance status to 'paused'.
trp->inst->status = kPausedTmId;
// notify the client of the change in state
cmTaskMgrSendStatus(a,kPausedTmId);
cmTaskMgrWorkerSendStatus(a,kPausedTmId);
// sleep the thread for pauseSleepMs milliseconds
cmSleepMs(a->pauseSleepMs);
// if the task was unpaused while we slept
if( trp->inst->ctlId == kStartTmId )
if( trp->inst->ctlId == kStartTmId && trp->deactivateFl == false )
{
// change the instance status to 'started'.
trp->inst->status = kStartedTmId;
// notify the client of the change in state
cmTaskMgrSendStatus(a,kStartedTmId);
cmTaskMgrWorkerSendStatus(a,kStartedTmId);
}
}
@ -746,11 +882,11 @@ cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a )
return trp->inst->ctlId;
}
cmTaskMgrCtlId_t cmTaskMgrSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
{ return _cmTaskMgrHelper(a,0,statusId); }
cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId )
{ return _cmTaskMgrWorkerHelper(a,0,statusId); }
cmTaskMgrCtlId_t cmTaskMgrSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog )
{ return _cmTaskMgrHelper(a,prog,kInvalidTmId); }
cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog )
{ return _cmTaskMgrWorkerHelper(a,prog,kInvalidTmId); }
//-----------------------------------------------------------------------------
@ -829,17 +965,19 @@ void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s )
// Test worker function.
void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg )
{
if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId )
return;
unsigned prog = 0;
for(; prog<arg->progCnt; ++prog)
{
if( cmTaskMgrHandleCommand(arg) == kKillTmId )
if( cmTaskMgrWorkerHandleCommand(arg) == kKillTmId )
break;
cmSleepMs(1000);
if( cmTaskMgrSendProgress(arg,prog) == kKillTmId )
if( cmTaskMgrWorkerSendProgress(arg,prog) == kKillTmId )
break;
}

View File

@ -36,7 +36,7 @@ extern "C" {
{
kStatusTmId, // Task status updates. These are automatically sent by the system when the task instance changes state.
kProgTmId, // Task progress update. The user function should increment the 'prog' toward 'progCnt'.
kErrorTmId //
kErrorTmId // Error message
} cmSelTmId_t;
typedef enum
@ -81,7 +81,7 @@ extern "C" {
cmTaskMgrH_t* hp,
cmTaskMgrStatusCb_t statusCb,
void* statusCbArg,
unsigned threadCnt,
unsigned maxActiveThreadCnt,
unsigned queueByteCnt,
unsigned pauseSleepMs );
@ -137,9 +137,9 @@ extern "C" {
// -----------------------------------------------------------------------------------
// Worker thread helper functions.
cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a );
cmTaskMgrCtlId_t cmTaskMgrSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId );
cmTaskMgrCtlId_t cmTaskMgrSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog );
cmTaskMgrCtlId_t cmTaskMgrWorkerHandleCommand( cmTaskMgrFuncArg_t* a );
cmTaskMgrCtlId_t cmTaskMgrWorkerSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId );
cmTaskMgrCtlId_t cmTaskMgrWorkerSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog );
cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx);