diff --git a/cmTaskMgr.c b/cmTaskMgr.c index 16652d2..7d10aa1 100644 --- a/cmTaskMgr.c +++ b/cmTaskMgr.c @@ -28,7 +28,8 @@ typedef struct cmTmInst_str cmStatusTmId_t status; void* result; unsigned resultByteCnt; - cmTaskMgrCtlId_t ctlId; + cmTaskMgrCtlId_t ctlId; // ctlId must only be written from the client thread + bool deleteOnCompleteFl; // delete this instance when its status indicates that it is killed or complete struct cmTmInst_str* link; } cmTmInst_t; @@ -45,7 +46,7 @@ typedef struct cmTm_str { cmErr_t err; cmTmThread_t* thArray; // - unsigned threadCnt; // + unsigned threadCnt; // cmTaskMgrStatusCb_t statusCb; // void* statusCbArg; // unsigned pauseSleepMs; @@ -56,6 +57,28 @@ typedef struct cmTm_str unsigned nextInstId; } cmTm_t; + +void _cmTaskMgrStatusArgSetup( + cmTaskMgrStatusArg_t* s, + void* arg, + unsigned instId, + cmSelTmId_t selId, + cmStatusTmId_t statusId, + unsigned prog, + const cmChar_t* msg, + void* result, + unsigned resultByteCnt ) +{ + s->arg = arg; + s->instId = instId; + s->selId = selId; + s->statusId = statusId; + s->prog = prog; + s->msg = msg; + s->result = result; + s->resultByteCnt = resultByteCnt; +} + // WARNING: THIS FUNCTION IS CALLED BY BOTH THE WORKER AND THE MASTER THREAD. cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s ) { @@ -67,7 +90,7 @@ cmTmRC_t _cmTmEnqueueStatusMsg0( cmTm_t* p, const cmTaskMgrStatusArg_t* s ) msgPtrArray[1] = s->msg==NULL ? "" : s->msg; msgPtrArray[2] = s->result; - msgSizeArray[0] = sizeof(*s); + msgSizeArray[0] = sizeof(cmTaskMgrStatusArg_t); msgSizeArray[1] = s->msg==NULL ? 1 : strlen(s->msg)+1; msgSizeArray[2] = s->resultByteCnt; @@ -92,13 +115,7 @@ cmTmRC_t _cmTmEnqueueStatusMsg1( unsigned resultByteCnt ) { cmTaskMgrStatusArg_t s; - s.arg = p->statusCbArg; - s.instId = instId; - s.selId = selId; - s.statusId = statusId; - s.msg = msg; - s.result = result; - s.resultByteCnt = resultByteCnt; + _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,instId,selId,statusId,prog,msg,result,resultByteCnt); return _cmTmEnqueueStatusMsg0(p,&s); } @@ -124,25 +141,37 @@ bool _cmTmWorkerThreadFunc(void* arg) cmTmThread_t* trp = (cmTmThread_t*)arg; cmTaskMgrFuncArg_t r; + r.reserved = trp; r.arg = trp->inst->funcArg; r.instId = trp->inst->instId; r.statusCb = _cmTmWorkerStatusCb; r.statusCbArg = trp; r.progCnt = trp->inst->progCnt; - r.cmdIdPtr = &trp->inst->ctlId; r.pauseSleepMs= trp->p->pauseSleepMs; + + // if the task was paused or killed while it was queued then + // cmTaskMgrHandleCommand() will do the right thing + if( cmTaskMgrHandleCommand(&r) != kKillTmId ) + { + trp->inst->status = kStartedTmId; - // Notify the client that the instance has started. - _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kStartedTmId,0,NULL,NULL,0); + // Notify the client that the instance has started. + _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0); - // Execute the client provided task function. - trp->inst->task->func(&r); + // Execute the client provided task function. + trp->inst->task->func(&r); + } // Notify the client that the instance has completed or been killed if( trp->inst->ctlId == kKillTmId ) - _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kKilledTmId,0,NULL,NULL,0); + trp->inst->status = kKilledTmId; else - _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,kCompletedTmId,0,NULL,NULL,0); + trp->inst->status = kCompletedTmId; + + _cmTmEnqueueStatusMsg1(trp->p,trp->inst->instId,kStatusTmId,trp->inst->status,0,NULL,NULL,0); + + + trp->inst = NULL; // Force the thread to go into the 'pause' state when it // returns to it's internal loop. The master thread recognizes paused @@ -158,7 +187,7 @@ bool _cmTmMasterThreadFunc(void* arg) { cmTmThread_t* trp = (cmTmThread_t*)arg; cmTm_t* p = trp->p; - + while( cmTs1p1cMsgWaiting(p->inQueH) ) { unsigned i; @@ -185,13 +214,15 @@ bool _cmTmMasterThreadFunc(void* arg) // start the thread and wait for it to enter the running state. - if( cmThreadPause(p->thArray[i].thH,kWaitThFl) != kOkThRC ) + if( cmThreadPause(p->thArray[i].thH,0) != kOkThRC ) { /// ??????? HOW DO WE HANDLE ERRORS IN THE MASTER THREAD } } + cmSleepMs(p->pauseSleepMs); + return true; } @@ -224,7 +255,6 @@ cmTmInst_t* _cmTmInstFromId( cmTm_t* p, unsigned instId ) cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId ) { - cmTmRC_t rc = kOkTmRC; cmTmInst_t* ip = p->insts; cmTmInst_t* pp = NULL; @@ -239,12 +269,12 @@ cmTmRC_t _cmTmInstFree( cmTm_t* p, unsigned instId ) cmMemFree(ip->result); cmMemFree(ip); - break; + return kOkTmRC; } pp = ip; } - return rc; + return cmErrMsg(&p->err,kAssertFailTmRC,"The instance %i could not be found to be deleted.",instId); } @@ -359,6 +389,8 @@ cmTmRC_t cmTaskMgrCreate( goto errLabel; } + hp->h = p; + errLabel: return rc; } @@ -366,7 +398,7 @@ cmTmRC_t cmTaskMgrCreate( cmTmRC_t cmTaskMgrDestroy( cmTaskMgrH_t* hp ) { cmTmRC_t rc = kOkTmRC; - if( hp!=NULL || cmTaskMgrIsValid(*hp)==false ) + if( hp==NULL || cmTaskMgrIsValid(*hp)==false ) return rc; cmTm_t* p = _cmTmHandleToPtr(*hp); @@ -392,7 +424,7 @@ cmRC_t _cmTmMasterOutQueueCb(void* arg, unsigned msgByteCnt, const void* msgData // This is probably not nesessary since changing the memory // pointed to by msgDataPtr should be safe even though it is marked as const. - memcpy(&s,&msgDataPtr,sizeof(s)); + memcpy(&s,msgDataPtr,sizeof(s)); // The 'msg' and 'result' data have been serialized after the status record. // The 'msg' is guaranteed to at least contain a terminating zero. @@ -416,6 +448,7 @@ cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h ) cmTmRC_t rc = kOkTmRC; cmTm_t* p = _cmTmHandleToPtr(h); + // Transmit any msgs waiting to be sent to the client. while( cmTsMp1cMsgWaiting(p->outQueH) ) { // calling this function calls: _cmTmMasterOutQueueCb() @@ -424,8 +457,19 @@ cmTmRC_t cmTaskMgrOnIdle( cmTaskMgrH_t h ) rc = cmErrMsg(&p->err,kQueueFailTmRC,"The output queue failed during a dequeue."); goto errLabel; } + } + // Step through the instance list and delete instances that are + // completed and also marked for deletion. + cmTmInst_t* ip = p->insts; + while( ip != NULL ) + { + cmTmInst_t* np = ip->link; + + if( (ip->status==kCompletedTmId || ip->status==kKilledTmId) && ip->deleteOnCompleteFl ) + _cmTmInstFree(p,ip->instId); + ip = np; } errLabel: @@ -436,10 +480,7 @@ bool cmTaskMgrIsEnabled( cmTaskMgrH_t h ) { cmTm_t* p = _cmTmHandleToPtr(h); - if( cmThreadState(p->thArray[0].thH) != kPausedThId ) - return false; - - return true; + return cmThreadState(p->thArray[0].thH) != kPausedThId; } cmTmRC_t cmTaskMgrEnable( cmTaskMgrH_t h, bool enableFl ) @@ -489,28 +530,38 @@ cmTmRC_t cmTaskMgrCall( cmTmTask_t* tp = NULL; cmTmInst_t* ip = NULL; - if((tp = _cmTmTaskFromId(p,taskId)) != NULL ) + if( retInstIdPtr != NULL ) + *retInstIdPtr = cmInvalidId; + + // locate the task for this instance + if((tp = _cmTmTaskFromId(p,taskId)) == NULL ) { rc = cmErrMsg(&p->err,kInvalidArgTmRC,"Task not found for task id=%i.",taskId); goto errLabel; } + // allocate a new instance record ip = cmMemAllocZ(cmTmInst_t,1); + // setupt the instance record ip->instId = p->nextInstId++; ip->task = tp; ip->funcArg = funcArg; ip->progCnt = progCnt; ip->status = kQueuedTmId; + // insert the new instance at the end of the instance list if( p->insts == NULL ) p->insts = ip; else { cmTmInst_t* pp = p->insts; - while( pp != NULL ) + for(; pp != NULL; pp=pp->link ) if( pp->link == NULL ) + { pp->link = ip; + break; + } } @@ -521,17 +572,15 @@ cmTmRC_t cmTaskMgrCall( goto errLabel; } - // notify the client that the instance was enqueued - // ??????????????? - // (is this ok??? - we are inserting into p->outQueH from the client thread) - // it would be safe to simply callback directly - // ??????????????? - if( _cmTmEnqueueStatusMsg1(p,ip->instId,kStatusTmId,kQueuedTmId,0,NULL,NULL,0) != kOkTmRC ) - { - cmErrMsg(&p->err,kQueueFailTmRC,"The 'queued' status update message failed to enqueue."); - goto errLabel; - } + // set the returned instance id + if( retInstIdPtr != NULL ) + *retInstIdPtr = ip->instId; + // notify the client that the instance was enqueued + cmTaskMgrStatusArg_t s; + _cmTaskMgrStatusArgSetup(&s,p->statusCbArg,ip->instId,kStatusTmId,kQueuedTmId,0,NULL,NULL,0); + + p->statusCb( &s ); errLabel: return rc; @@ -549,9 +598,34 @@ cmTmRC_t cmTaskMgrTaskCtl( cmTaskMgrH_t h, unsigned instId, cmTaskMgrCtlId_t ctl goto errLabel; } - // once the ctl id is set to kKillTmId don't allow it to change - if( ip->ctlId != kKillTmId ) - ip->ctlId = ctlId; + // Once an instance ctlId is set to kKillTmId don't allow it to change. + if( ip->ctlId == kKillTmId ) + return rc; + + switch(ctlId ) + { + case kNoneTmId: + break; + + case kStartTmId: + // Acting on a 'start' cmd only makes sense if the previous command was 'pause' + if( ip->ctlId == kPauseTmId ) + ip->ctlId = kStartTmId; + break; + + case kPauseTmId: + + // Acting on a 'pause' command only makes sense if this is the first command + // or the previous command was a 'start' + if( ip->ctlId == kNoneTmId || ip->ctlId == kStartTmId ) + ip->ctlId = kPauseTmId; + break; + + case kKillTmId: + ip->ctlId = kKillTmId; + break; + } + errLabel: return rc; @@ -604,32 +678,81 @@ unsigned cmTaskMgrResultByteCount( cmTaskMgrH_t h, unsigned instId ) return ip->resultByteCnt; } -cmTmRC_t cmTaskMgrResultDelete( cmTaskMgrH_t h, unsigned instId ) +cmTmRC_t cmTaskMgrInstDelete( cmTaskMgrH_t h, unsigned instId ) { cmTmRC_t rc = kOkTmRC; cmTm_t* p = _cmTmHandleToPtr(h); + cmTmInst_t* ip = NULL; - if((rc = _cmTmInstFree(p,instId)) != kOkTmRC ) - rc = cmErrMsg(&p->err,kOpFailTmRC,"The instace delete failed on instance id %i.",instId); + if((ip = _cmTmInstFromId(p,instId)) == NULL ) + { + cmErrMsg(&p->err,kInvalidArgTmRC,"The task instance associated with id %i could not be found.",instId); + return 0; + } + + ip->deleteOnCompleteFl = true; return rc; } +cmTaskMgrCtlId_t _cmTaskMgrHelper( cmTaskMgrFuncArg_t* a, unsigned prog, cmStatusTmId_t statusId ) +{ + cmTaskMgrStatusArg_t s; + + _cmTaskMgrStatusArgSetup( + &s, + a->statusCbArg, + a->instId, + statusId == kInvalidTmId ? kProgTmId : kStatusTmId, + statusId == kInvalidTmId ? kStartedTmId : statusId, + statusId == kInvalidTmId ? prog : 0, + NULL,NULL,0); + + a->statusCb(&s); + + return cmTaskMgrHandleCommand(a); +} + cmTaskMgrCtlId_t cmTaskMgrHandleCommand( cmTaskMgrFuncArg_t* a ) { + cmTmThread_t* trp = a->reserved; - while( *(a->cmdIdPtr) == kPauseTmId ) + while( trp->inst->ctlId == kPauseTmId ) { - // ???????? - // maybe we should send a status code to notify the client that the instance has paused. - /// + // change the instance status to 'paused'. + trp->inst->status = kPausedTmId; + + // notify the client of the change in state + cmTaskMgrSendStatus(a,kPausedTmId); + + // sleep the thread for pauseSleepMs milliseconds cmSleepMs(a->pauseSleepMs); + + // if the task was unpaused while we slept + if( trp->inst->ctlId == kStartTmId ) + { + // change the instance status to 'started'. + trp->inst->status = kStartedTmId; + + // notify the client of the change in state + cmTaskMgrSendStatus(a,kStartedTmId); + } } - return *(a->cmdIdPtr); + // if ctlId==kKillTmId then the status update will be handled + // when the task custom function returns in _cmTmWorkerThreadFunc() + + return trp->inst->ctlId; } +cmTaskMgrCtlId_t cmTaskMgrSendStatus( cmTaskMgrFuncArg_t* a, cmStatusTmId_t statusId ) +{ return _cmTaskMgrHelper(a,0,statusId); } + +cmTaskMgrCtlId_t cmTaskMgrSendProgress( cmTaskMgrFuncArg_t* a, unsigned prog ) +{ return _cmTaskMgrHelper(a,prog,kInvalidTmId); } + + //----------------------------------------------------------------------------- enum { kMaxTestInstCnt = 3 }; @@ -647,7 +770,7 @@ typedef struct cmTmTestApp_str void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s ) { - cmRptPrintf(rpt,"%i ",s->instId ); + cmRptPrintf(rpt,"inst:%i ",s->instId ); switch( s->selId ) { @@ -658,7 +781,6 @@ void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s ) { case kInvalidTmId: label=""; break; case kQueuedTmId: label="Queued"; break; - case kQueuedPausedTmId: label="Queued-Paused."; break; case kStartedTmId: label="Started"; break; case kCompletedTmId: label="Completed"; break; case kKilledTmId: label="Killed"; break; @@ -685,6 +807,7 @@ void _cmTmTestReportStatus( cmRpt_t* rpt, const cmTaskMgrStatusArg_t* s ) } +// Test client status callback function. void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s ) { // s.arg set from cmTaskMgrCreate( ..., statusCbArg, ...); @@ -703,26 +826,21 @@ void _cmTmTestStatusCb( const cmTaskMgrStatusArg_t* s ) } +// Test worker function. void _cmTmTestFunc(cmTaskMgrFuncArg_t* arg ) { - cmTaskMgrStatusArg_t s; - memset(&s,0,sizeof(s)); - s.arg = arg->statusCbArg; - s.instId = arg->instId; - s.selId = kProgTmId; - s.statusId = kStartedTmId; - s.prog = 0; - s.msg = NULL; - s.result = NULL; - s.resultByteCnt = 0; - for(; s.progprogCnt; ++s.prog) + unsigned prog = 0; + + for(; progprogCnt; ++prog) { if( cmTaskMgrHandleCommand(arg) == kKillTmId ) break; cmSleepMs(1000); - arg->statusCb(&s); + + if( cmTaskMgrSendProgress(arg,prog) == kKillTmId ) + break; } } @@ -745,7 +863,7 @@ cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx) app.err = &ctx->err; // create the task mgr - if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC ) + if( cmTaskMgrCreate( ctx,&tmH,_cmTmTestStatusCb,&app,threadCnt,queueByteCnt,pauseSleepMs) != kOkTmRC ) { rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Task mgr create failed."); goto errLabel; @@ -760,16 +878,24 @@ cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx) // go into interactive mode + printf("q=quit e=enable c=call i=idle\n"); while((c = getchar()) != 'q') { switch(c) { + case 'i': + cmTaskMgrOnIdle(tmH); + cmRptPrintf(&ctx->rpt,"idled\n"); + break; + case 'e': { // toggle the enable state of the task mgr. - bool fl = cmTaskMgrIsEnabled(tmH); - if( cmTaskMgrEnable(tmH,!fl) != kOkTmRC ) + bool fl = !cmTaskMgrIsEnabled(tmH); + if( cmTaskMgrEnable(tmH,fl) != kOkTmRC ) rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test enable failed."); + else + cmRptPrintf(&ctx->rpt,"%s\n", fl ? "enabled" : "disabled" ); } break; @@ -781,7 +907,10 @@ cmTmRC_t cmTaskMgrTest(cmCtx_t* ctx) if( cmTaskMgrCall( tmH, taskId, funcArg, progCnt, &app.insts[nextInstId].instId ) != kOkTmRC ) rc = cmErrMsg(&ctx->err,kTestFailTmRC,"Test call failed."); else + { ++nextInstId; + cmRptPrintf(&ctx->rpt,"called\n"); + } } } }