diff --git a/cwMutex.h b/cwMutex.h index 15a4143..d05e147 100644 --- a/cwMutex.h +++ b/cwMutex.h @@ -15,6 +15,8 @@ namespace cw rc_t lock( handle_t h ); rc_t unlock( handle_t h ); + // Set timeOutMs to 0 to wait forever. + // // Set 'lockThenWaitFl' if the function should lock the mutex prior to waiting. // If 'lockThenWaitFl' is false then the function assumes the mutex is already locked // and directly waits. If 'lockThenWaitFl' is set and the mutex is not already locked diff --git a/cwThread.cpp b/cwThread.cpp index f900e2a..fa56eab 100644 --- a/cwThread.cpp +++ b/cwThread.cpp @@ -3,6 +3,9 @@ #include "cwCommonImpl.h" #include "cwMem.h" #include "cwThread.h" +#include "cwMutex.h" +#include "cwTest.h" +#include "cwTime.h" #include @@ -25,13 +28,15 @@ namespace cw std::atomic stateId; std::atomic doFlags; - cbFunc_t func; - void* funcArg; - unsigned stateMicros; - unsigned pauseMicros; - unsigned sleepMicros; + cbFunc_t func; + void* funcArg; + unsigned stateMicros; + unsigned pauseMicros; + unsigned waitMicros; pthread_attr_t attr; - char* label; + char* label; + + mutex::handle_t mutexH; } thread_t; @@ -50,9 +55,9 @@ namespace cw if(curStateId == stateId ) break; - sleepUs( p->sleepMicros ); + sleepUs( p->waitMicros ); - waitTimeMicroSecs += p->sleepMicros; + waitTimeMicroSecs += p->waitMicros; }while( waitTimeMicroSecs < p->stateMicros ); @@ -83,15 +88,42 @@ namespace cw // if we are in the pause state if( curStateId == kPausedThId ) { - - sleepUs( p->pauseMicros ); + // unlock mutex and block on cond. var. for pauseMicros or until signaled + rc_t rc = waitOnCondVar(p->mutexH, false, p->pauseMicros/1000 ); + + switch(rc) + { + case kTimeOutRC: + // the mutex is not locked + break; + + case kOkRC: + // the cond. var. was signaled and the mutex is locked + break; + + default: + // mutex is not locked + rc = cwLogError(rc,"Condition variable wait failed."); + } + curDoFlags = p->doFlags.load(std::memory_order_acquire); - // check if we have been requested to leave the pause state - if( cwIsFlag(curDoFlags,kDoRunThFl) ) + // if exit was requested - and the mutex is unlocked + if( cwIsFlag(curDoFlags,kDoExitThFl) && rc != kOkRC ) { - p->stateId.store(kRunningThId,std::memory_order_release); + // this will cause the waitOnCondVar() to + // immediately return at the top of the loop + signalCondVar(p->mutexH); + //mutex::lock(p->mutexH); + } + else + { + // check if we have been requested to leave the pause state + if( cwIsFlag(curDoFlags,kDoRunThFl) ) + { + p->stateId.store(kRunningThId,std::memory_order_release); + } } } else // ... we are in running state @@ -125,6 +157,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const { rc_t rc; int sysRC; + bool mutex_is_locked_fl = false; if((rc = destroy(hRef)) != kOkRC ) return rc; @@ -136,7 +169,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const p->stateMicros = stateMicros; p->pauseMicros = pauseMicros; p->stateId = kPausedThId; - p->sleepMicros = 15000; + p->waitMicros = 15000; p->label = mem::duplStr(label); if((sysRC = pthread_attr_init(&p->attr)) != 0) @@ -146,6 +179,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const } else { + /* // Creating the thread in a detached state should prevent it from leaking memory when @@ -158,11 +192,29 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const } else */ - if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 ) - { - p->stateId = kNotInitThId; - rc = cwLogSysError(kOpFailRC,sysRC,"Thread create failed."); - } + + // Create the cond. var mutex + if((rc = mutex::create(p->mutexH )) != kOkRC ) + { + rc = cwLogError(rc,"Thread signal condition mutex create failed."); + goto errLabel; + } + + // Lock the mutex so that it is already locked prior to the first call to waitOnCondVar() + if((rc = mutex::lock(p->mutexH)) != kOkRC ) + { + rc = cwLogError(rc,"Thread signal condition mutex lock failed."); + goto errLabel; + } + + mutex_is_locked_fl = true; + + // create the thread - in paused state + if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 ) + { + p->stateId = kNotInitThId; + rc = cwLogSysError(kOpFailRC,sysRC,"Thread create failed."); + } } if( label != nullptr ) @@ -174,6 +226,16 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const cwLogInfo("Thread %s id:%p created.",cwStringNullGuard(label), p->pThreadH); +errLabel: + + if( rc != kOkRC && p->mutexH.isValid() ) + { + if( mutex_is_locked_fl ) + mutex::unlock(p->mutexH); + + mutex::destroy(p->mutexH); + } + return rc; } @@ -200,7 +262,12 @@ cw::rc_t cw::thread::destroy( handle_t& hRef ) //if( pthread_attr_destroy(&p->attr) != 0 ) // rc = cwLogError(kOpFailRC,"Thread attribute destroy failed."); - + + if( p->mutexH.isValid() ) + { + mutex::unlock(p->mutexH); + mutex::destroy(p->mutexH); + } mem::release(p->label); mem::release(p); hRef.clear(); @@ -231,13 +298,19 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags ) { p->doFlags.store(kDoRunThFl,std::memory_order_release); waitId = kRunningThId; + if((rc = signalCondVar(p->mutexH)) != kOkRC ) + { + cwLogError(rc,"Cond. var. signalling failed."); + goto errLabel; + } } if( waitFl ) rc = _waitForState(p,waitId); +errLabel: if( rc != kOkRC ) - cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i sleepMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->sleepMicros); + cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i waitMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->waitMicros); return rc; @@ -290,9 +363,21 @@ unsigned cw::thread::pauseMicros( handle_t h ) namespace cw { + time::spec_t g_t0{}; + time_t g_micros = 0; + unsigned g_n = 0; bool _threadTestCb( void* p ) { - unsigned* ip = (unsigned*)p; + if( g_t0.tv_nsec != 0 ) + { + time::spec_t t1; + time::get(t1); + g_micros += time::elapsedMicros(g_t0,t1); + g_n += 1; + g_t0.tv_nsec = 0; + } + + unsigned* ip = (unsigned*)p; ip[0]++; return true; } @@ -304,10 +389,12 @@ cw::rc_t cw::threadTest() unsigned val = 0; rc_t rc; char c = 0; - + + // create the thread if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC ) return rc; - + + // start the thread if((rc = thread::pause(h,0)) != kOkRC ) goto errLabel; @@ -333,7 +420,10 @@ cw::rc_t cw::threadTest() case 'p': { if( thread::state(h) == thread::kPausedThId ) + { + time::get(g_t0); rc = thread::pause(h,thread::kWaitFl); + } else rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl); @@ -349,6 +439,7 @@ cw::rc_t cw::threadTest() break; case 'q': + printf("wakeup micros:%li cnt:%i avg:%li\n",g_micros,g_n,g_micros/g_n); break; //default: diff --git a/cwTime.cpp b/cwTime.cpp index 7fd5658..c5c9cbc 100644 --- a/cwTime.cpp +++ b/cwTime.cpp @@ -262,15 +262,12 @@ void cw::time::subtractMicros( spec_t& ts, unsigned micros ) void cw::time::advanceMicros( spec_t& ts, unsigned us ) { - const unsigned us_per_sec = 1000000; const unsigned ns_per_sec = 1000000000; - - unsigned sec = us / us_per_sec; - ts.tv_sec += sec; - ts.tv_nsec += (us - sec*us_per_sec)*1000; + ts.tv_nsec += us * 1000; // convert us to nano's - sec = ts.tv_nsec / ns_per_sec; + // check if nano's now have more than ns_pser_sec + time_t sec = ts.tv_nsec / ns_per_sec; ts.tv_sec += sec; ts.tv_nsec -= sec * ns_per_sec; @@ -281,18 +278,14 @@ void cw::time::advanceMicros( spec_t& ts, unsigned us ) void cw::time::advanceMs( spec_t& ts, unsigned ms ) { - const unsigned ms_per_sec = 1000; const unsigned ns_per_sec = 1000000000; - - unsigned sec = ms / ms_per_sec; - - ts.tv_sec += sec; - ts.tv_nsec += (ms - (sec*ms_per_sec)) * 1000000; - - sec = ts.tv_nsec / ns_per_sec; + + ts.tv_nsec += ms * 1000000; + time_t sec = ts.tv_nsec / ns_per_sec; ts.tv_sec += sec; ts.tv_nsec -= sec * ns_per_sec; + } cw::rc_t cw::time::futureMs( spec_t& ts, unsigned ms )