diff --git a/cwThread.cpp b/cwThread.cpp index ff5a29f..8812bcf 100644 --- a/cwThread.cpp +++ b/cwThread.cpp @@ -21,10 +21,12 @@ namespace cw typedef struct thread_str { pthread_t pThreadH; - stateId_t stateId; + + std::atomic stateId; + std::atomic doFlags; + cbFunc_t func; void* funcArg; - unsigned doFlags; unsigned stateMicros; unsigned pauseMicros; unsigned sleepMicros = 15000; @@ -34,22 +36,29 @@ namespace cw inline thread_t* _handleToPtr(handle_t h) { return handleToPtr(h); } + // Called from client thread to wait for the internal thread to transition to a specified state. rc_t _waitForState( thread_t* p, unsigned stateId ) { unsigned waitTimeMicroSecs = 0; - - while( p->stateId != stateId && waitTimeMicroSecs < p->stateMicros ) + stateId_t curStateId; + + do { - sleepUs( p->sleepMicros ); - waitTimeMicroSecs += p->sleepMicros; - } + curStateId = p->stateId.load(std::memory_order_acquire); - return p->stateId==stateId ? kOkRC : kTimeOutRC; + if(curStateId == stateId ) + break; + + sleepUs( p->sleepMicros ); + + }while( waitTimeMicroSecs < p->stateMicros ); + + return curStateId==stateId ? kOkRC : kTimeOutRC; } void _threadCleanUpCallback(void* p) { - ((thread_t*)p)->stateId = kExitedThId; + ((thread_t*)p)->stateId.store(kExitedThId,std::memory_order_release); } @@ -61,36 +70,43 @@ namespace cw // thread terminates unexpectedly or pthread_cleanup_pop() is called. pthread_cleanup_push(_threadCleanUpCallback,p); - while( cwIsFlag(p->doFlags,kDoExitThFl) == false ) + unsigned curDoFlags = 0; + + do { + // get the current thread state (running or paused) + stateId_t curStateId = p->stateId.load(std::memory_order_relaxed); // if we are in the pause state - if( p->stateId == kPausedThId ) + if( curStateId == kPausedThId ) { sleepUs( p->pauseMicros ); + curDoFlags = p->doFlags.load(std::memory_order_acquire); + // check if we have been requested to leave the pause state - if( cwIsFlag(p->doFlags,kDoRunThFl) ) + if( cwIsFlag(curDoFlags,kDoRunThFl) ) { - p->doFlags = cwClrFlag(p->doFlags,kDoRunThFl); - p->stateId = kRunningThId; + p->stateId.store(kRunningThId,std::memory_order_release); } } - else + else // ... we are in running state { // call the user-defined function if( p->func(p->funcArg)==false ) break; + curDoFlags = p->doFlags.load(std::memory_order_acquire); + // check if we have been requested to enter the pause state - if( cwIsFlag(p->doFlags,kDoPauseThFl) ) + if( cwIsFlag(curDoFlags,kDoPauseThFl) ) { - p->doFlags = cwClrFlag(p->doFlags,kDoPauseThFl); - p->stateId = kPausedThId; + p->stateId.store(kPausedThId,std::memory_order_release); } } - } + + }while( cwIsFlag(curDoFlags,kDoExitThFl) == false ); pthread_cleanup_pop(1); @@ -157,7 +173,7 @@ cw::rc_t cw::thread::destroy( handle_t& hRef ) thread_t* p = _handleToPtr(hRef); // tell the thread to exit - p->doFlags = cwSetFlag(p->doFlags,kDoExitThFl); + p->doFlags.store(kDoExitThFl,std::memory_order_release); // wait for the thread to exit and then deallocate the thread object if((rc = _waitForState(p,kExitedThId)) != kOkRC ) @@ -184,7 +200,8 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags ) bool pauseFl = cwIsFlag(cmdFlags,kPauseFl); bool waitFl = cwIsFlag(cmdFlags,kWaitFl); thread_t* p = _handleToPtr(h); - bool isPausedFl = p->stateId == kPausedThId; + stateId_t curStateId = p->stateId.load(std::memory_order_acquire); + bool isPausedFl = curStateId == kPausedThId; unsigned waitId; if( isPausedFl == pauseFl ) @@ -192,12 +209,12 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags ) if( pauseFl ) { - p->doFlags = cwSetFlag(p->doFlags,kDoPauseThFl); + p->doFlags.store(kDoPauseThFl,std::memory_order_release); waitId = kPausedThId; } else { - p->doFlags = cwSetFlag(p->doFlags,kDoRunThFl); + p->doFlags.store(kDoRunThFl,std::memory_order_release); waitId = kRunningThId; } @@ -217,7 +234,7 @@ cw::rc_t cw::thread::unpause( handle_t h ) cw::thread::stateId_t cw::thread::state( handle_t h ) { thread_t* p = _handleToPtr(h); - return p->stateId; + return p->stateId.load(std::memory_order_acquire); } cw::thread::thread_id_t cw::thread::id()