diff --git a/cwSpScBuf.cpp b/cwSpScBuf.cpp index a7a0992..dca6dd4 100644 --- a/cwSpScBuf.cpp +++ b/cwSpScBuf.cpp @@ -1,6 +1,7 @@ #include "cwCommon.h" #include "cwLog.h" #include "cwCommonImpl.h" +#include "cwTest.h" #include "cwMem.h" #include "cwSpScBuf.h" #include "cwThread.h" diff --git a/cwSpScQueueTmpl.cpp b/cwSpScQueueTmpl.cpp index c11b2df..62bf385 100644 --- a/cwSpScQueueTmpl.cpp +++ b/cwSpScQueueTmpl.cpp @@ -1,6 +1,7 @@ #include "cwCommon.h" #include "cwLog.h" #include "cwCommonImpl.h" +#include "cwTest.h" #include "cwMem.h" #include "cwThread.h" #include "cwThreadMach.h" diff --git a/cwTest.cpp b/cwTest.cpp index 5ba4058..c5c7706 100644 --- a/cwTest.cpp +++ b/cwTest.cpp @@ -30,6 +30,8 @@ #include "cwFlowTest.h" +#include "cwThread.h" +#include "cwThreadMach.h" namespace cw { @@ -54,6 +56,7 @@ namespace cw { "/wt_bank", wt_bank::test }, { "/audio_transform", dsp::test }, { "/wt_note", wt_note::test }, + { "/thread_tasks", thread_tasks::test }, { nullptr, nullptr }, }; diff --git a/cwThread.cpp b/cwThread.cpp index cc098a4..8582c04 100644 --- a/cwThread.cpp +++ b/cwThread.cpp @@ -37,8 +37,9 @@ namespace cw char* label; mutex::handle_t mutexH; - unsigned cycleIdx; - unsigned cycleCnt; + unsigned cycleIdx; // current cycle phase + unsigned cycleCnt; // cycle phase limit + unsigned execCnt; } thread_t; @@ -135,18 +136,23 @@ namespace cw if( p->func(p->funcArg)==false ) break; - p->cycleIdx += 1; - - // if a cycle limit was set then check if the limit was reached - bool cycles_done_fl = p->cycleCnt > 0 && p->cycleIdx >= p->cycleCnt; - curDoFlags = p->doFlags.load(std::memory_order_acquire); - - // check if we have been requested to enter the pause state - if( cwIsNotFlag(curDoFlags,kDoExitThFl) && (cwIsFlag(curDoFlags,kDoPauseThFl) || cycles_done_fl) ) + + if( cwIsNotFlag(curDoFlags,kDoExitThFl) ) { - p->stateId.store(kPausedThId,std::memory_order_release); + p->cycleIdx += 1; + + // if a cycle limit was set then check if the limit was reached + bool cycles_done_fl = p->cycleCnt > 0 && p->cycleIdx >= p->cycleCnt; + + // check if we have been requested to enter the pause state + if( (cwIsFlag(curDoFlags,kDoPauseThFl) || cycles_done_fl) ) + { + p->stateId.store(kPausedThId,std::memory_order_release); + p->doFlags.store(0,std::memory_order_release); + } } + } }while( cwIsFlag(curDoFlags,kDoExitThFl) == false ); @@ -293,7 +299,9 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags, unsigned cycleCnt ) stateId_t curStateId = p->stateId.load(std::memory_order_acquire); bool isPausedFl = curStateId == kPausedThId; stateId_t waitId; - + + p->cycleCnt = cycleCnt; + if( isPausedFl == pauseFl ) return kOkRC; @@ -304,7 +312,6 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags, unsigned cycleCnt ) } else { - p->cycleCnt = cycleCnt; p->doFlags.store(kDoRunThFl,std::memory_order_release); waitId = kRunningThId; if((rc = signalCondVar(p->mutexH)) != kOkRC ) @@ -372,9 +379,10 @@ unsigned cw::thread::pauseMicros( handle_t h ) namespace cw { - time::spec_t g_t0{}; - time_t g_micros = 0; - unsigned g_n = 0; + time::spec_t g_t0 = {0,0}; + time_t g_micros = 0; + unsigned g_n = 0; + bool _threadTestCb( void* p ) { if( g_t0.tv_nsec != 0 ) @@ -398,13 +406,14 @@ cw::rc_t cw::threadTest() unsigned val = 0; rc_t rc; char c = 0; - + unsigned cycleCnt = 0; + // create the thread if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC ) return rc; // start the thread - if((rc = thread::pause(h,0)) != kOkRC ) + if((rc = thread::pause(h,0,cycleCnt)) != kOkRC ) goto errLabel; @@ -419,7 +428,7 @@ cw::rc_t cw::threadTest() switch(c) { case 'o': - cwLogInfo("val: 0x%x\n",val); + cwLogInfo("val: 0x%x %i\n",val,val); break; case 's': @@ -431,7 +440,11 @@ cw::rc_t cw::threadTest() if( thread::state(h) == thread::kPausedThId ) { time::get(g_t0); - rc = thread::pause(h,thread::kWaitFl); + // We don't set kWaitFl w/ cycleCnt>0 because we are running very + // few cycles - the cycles will run and the + // state of the thread will return to 'paused' + // before _waitForState() can notice the 'running' state. + rc = thread::pause(h, cycleCnt==0 ? thread::kWaitFl : 0,cycleCnt); } else rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl); @@ -448,7 +461,7 @@ cw::rc_t cw::threadTest() break; case 'q': - printf("wakeup micros:%li cnt:%i avg:%li\n",g_micros,g_n,g_micros/g_n); + printf("wakeup micros:%li cnt:%i avg:%li\n",g_micros,g_n,g_n>0 ? g_micros/g_n : 0); break; //default: diff --git a/cwThreadMach.cpp b/cwThreadMach.cpp index b35401f..47b974f 100644 --- a/cwThreadMach.cpp +++ b/cwThreadMach.cpp @@ -2,9 +2,12 @@ #include "cwLog.h" #include "cwCommonImpl.h" #include "cwMem.h" +#include "cwMutex.h" #include "cwThread.h" +#include "cwTest.h" #include "cwThreadMach.h" + namespace cw { namespace thread_mach @@ -159,3 +162,356 @@ bool cw::thread_mach::is_shutdown( handle_t h ) return true; } + +//--------------------------------------------------------------------------------------------------- +// thread_tasks +// + + +namespace cw +{ + namespace thread_tasks + { + struct thread_tasks_str; + + typedef struct task_thread_str + { + thread::handle_t threadH; + struct thread_tasks_str* owner; + unsigned threadId; + } task_thread_t; + + typedef struct thread_tasks_str + { + task_thread_t* threadA; // threadA[ threadN ] - arg. records for call to _threadFunc + unsigned threadN; + + task_t* taskA; // taskA[ taskN ] - list of user provided callbacks set by run + unsigned taskN; + + mutex::handle_t mutexH; + bool mutexLockFl; + + std::atomic next_task_idx; + std::atomic done_cnt; + + } thread_tasks_t; + + thread_tasks_t* _handleToPtr( handle_t h ) + { return handleToPtr(h); } + + rc_t _destroy( thread_tasks_t* p ) + { + rc_t rc = kOkRC; + if( p->threadN > 0 && p->threadA != nullptr ) + { + for(unsigned i=0; ithreadN; ++i) + { + rc_t rc0; + if((rc0 = thread::destroy(p->threadA[i].threadH)) != kOkRC ) + { + cwLogError(rc0,"Task thread %i destroy failed.",i); + } + } + + if( p->mutexLockFl ) + { + if((rc = mutex::unlock(p->mutexH)) != kOkRC ) + rc = cwLogError(rc,"Mutex unlock on thread tasks destroy failed."); + else + p->mutexLockFl = false; + } + + if((rc = mutex::destroy(p->mutexH)) != kOkRC ) + rc = cwLogError(rc,"Thread tasks mutex destroy failed."); + } + + mem::release(p->threadA); + mem::release(p); + return rc; + } + + bool _threadFunc( void* arg ) + { + rc_t rc = kOkRC; + task_thread_t* task_thread = (task_thread_t*)arg; + thread_tasks_t* p = task_thread->owner; + + // get the next available task + unsigned nti = p->next_task_idx.fetch_add(1, std::memory_order_acq_rel); + + // if nti is a valid task index ... + if( nti < p->taskN ) + { + // ... then execute the task + task_t* task = p->taskA + nti; + task->rc = task->func( task->arg ); + + // + unsigned done_cnt = p->done_cnt.fetch_add(1, std::memory_order_acq_rel); + + // if the last task is done + if( done_cnt + 1 == p->taskN ) + { + // By taking the lock here we guarantee that the the main thread is + // waiting on the cond. var.. Without doing this we might get here + // before the cond. var. is setup and the main thread will miss the signal. + if((rc = mutex::lock(p->mutexH)) != kOkRC ) + cwLogError(rc,"Last task mutex lock failed."); + else + { + mutex::unlock(p->mutexH); + + // signal the main thread that all tasks are done + if((rc = signalCondVar( p->mutexH )) != kOkRC ) + rc = cwLogError(rc,"Thread tasks signal cond var failed."); + } + + // all tasks are done - pause this thread + thread::pause(task_thread->threadH,thread::kPauseFl); + } + + } + else // ... otherwise pause the thread + { + // you are in the thread callback and so you can't wait - just signal the thread to pause + thread::pause(task_thread->threadH,thread::kPauseFl); + + } + + return true; + } + } +} + + + +cw::rc_t cw::thread_tasks::create( handle_t& hRef, unsigned threadN ) +{ + rc_t rc; + if((rc = destroy(hRef)) != kOkRC ) + return rc; + + thread_tasks_t* p = mem::allocZ(); + const unsigned labelCharN = 255; + char label[ labelCharN + 1 ]; + + p->threadN = threadN; + p->threadA = mem::allocZ(threadN); + + // Create a mutex for the run() blocking cond. var + if((rc = mutex::create( p->mutexH )) != kOkRC ) + { + rc = cwLogError(rc,"Thread tasks mutex failed."); + goto errLabel; + } + + // Lock the mutex so that it is locked on the first call to waitOnCondVar() + if((rc = mutex::lock(p->mutexH)) != kOkRC ) + { + rc = cwLogError(rc,"Thread tasks initial mutex lock failed."); + goto errLabel; + } + + p->mutexLockFl = true; + + for(unsigned i=0; ithreadA[i].owner = p; + p->threadA[i].threadId = i; + + // Threads are create in 'paused' mode + if((rc = thread::create( p->threadA[i].threadH, _threadFunc, p->threadA + i, label )) != kOkRC ) + { + rc = cwLogError(rc,"Task thread create %i failed.",i); + goto errLabel; + } + } + + hRef.set(p); + +errLabel: + if( rc != kOkRC ) + _destroy(p); + + return rc; +} + +cw::rc_t cw::thread_tasks::destroy( handle_t& hRef ) +{ + rc_t rc = kOkRC; + if( !hRef.isValid() ) + return rc; + + thread_tasks_t* p = _handleToPtr(hRef); + + if((rc = _destroy(p)) != kOkRC ) + return rc; + + hRef.clear(); + + return rc; +} + +cw::rc_t cw::thread_tasks::run( handle_t h, task_t* taskA, unsigned taskN, unsigned timeOutMs ) +{ + rc_t rc = kOkRC; + thread_tasks_t* p = _handleToPtr(h); + unsigned activeThreadN = std::min(p->threadN,taskN); + p->taskA = taskA; + p->taskN = taskN; + + p->done_cnt.store(0,std::memory_order_release); + p->next_task_idx.store(0,std::memory_order_release); + + for(unsigned i=0; ithreadA[i].threadH,0)) != kOkRC ) + { + rc = cwLogError(rc,"Task thread %i start failed.",i); + goto errLabel; + } + } + + /* + This spinlock works and is very simple - but it uses up a core which + may be assigned to one of the worker threads. + If threads were given core affinities to avoid this scenario then + the spin lock might be a better solution then the cond. var signaling. + + // block waiting for tasks to complete + while(1) + { + // spin on done_cnt + unsigned done_cnt = p->done_cnt.load(std::memory_order_acquire); + if( done_cnt >= taskN ) + { + //printf("DONE\n"); + break; + } + } + */ + + // block waiting for the the tasks to complete + rc = waitOnCondVar(p->mutexH, false, timeOutMs ); + + switch(rc) + { + case kOkRC: + // mutex is locked + p->mutexLockFl = true; + break; + + case kTimeOutRC: + // mutex is unlocked + p->mutexLockFl = false; + cwLogWarning("Thread tasks timed out."); + break; + + default: + // mutex is unlocked + p->mutexLockFl = false; + rc = cwLogError(rc,"Thread tasks run error."); + } + + /* + // pause all the threads + for(unsigned i=0; ithreadA[i].threadH,thread::kPauseFl | thread::kWaitFl)) != kOkRC ) + { + rc = cwLogError(rc,"Task thread %i post run pause failed.",i); + goto errLabel; + } + } + */ + + // if run failed then pause to threads + if( rc != kOkRC ) + { + // lock the mutex (if it isn't already) + if( !p->mutexLockFl ) + { + if((rc = mutex::lock(p->mutexH)) != kOkRC ) + { + rc = cwLogError(rc,"Thread task lock mutex on error cleanup failed."); + goto errLabel; + } + p->mutexLockFl = true; + } + } + +errLabel: + return rc; +} + + +namespace cw +{ + namespace thread_tasks + { + typedef struct test_task_str + { + std::atomic cnt; + } test_task_t; + + rc_t testThreadFunc( void* arg ) + { + test_task_t* t = (test_task_t*)arg; + + t->cnt.fetch_add(1,std::memory_order_relaxed); + return kOkRC; + } + + } +} + +cw::rc_t cw::thread_tasks::test( const test::test_args_t& args ) +{ + rc_t rc = kOkRC; + const unsigned threadN = 15; + const unsigned taskN = 10; + const unsigned execN = 10; + handle_t ttH; + + test_task_t* test_taskA = mem::allocZ(taskN); + task_t* taskA = mem::allocZ(taskN); + + for(unsigned i=0; i handle_t; + typedef thread::cbFunc_t threadFunc_t; + + // Create a thread tasks machine with threadN records + rc_t create( handle_t& hRef, unsigned threadN ); + rc_t destroy( handle_t& hRef ); + + typedef struct task_str + { + rc_t (*func)(void* arg); + void* arg; + rc_t rc; + } task_t; + + // timeOutMs is the count of milliseconds run will block while waiting + // for all the tasks to complete. + rc_t run( handle_t h, task_t* taskA, unsigned taskN, unsigned timeOutMs=100 ); + + rc_t test( const test::test_args_t& args ); + + } } #endif