cwThread.cpp : Made stateId and doFlags atomic.

This commit is contained in:
kevin.larke 2020-04-19 13:01:47 -04:00
parent b577d476d3
commit 48ebcc89c7

View File

@ -21,10 +21,12 @@ namespace cw
typedef struct thread_str typedef struct thread_str
{ {
pthread_t pThreadH; pthread_t pThreadH;
stateId_t stateId;
std::atomic<stateId_t> stateId;
std::atomic<unsigned> doFlags;
cbFunc_t func; cbFunc_t func;
void* funcArg; void* funcArg;
unsigned doFlags;
unsigned stateMicros; unsigned stateMicros;
unsigned pauseMicros; unsigned pauseMicros;
unsigned sleepMicros = 15000; unsigned sleepMicros = 15000;
@ -34,22 +36,29 @@ namespace cw
inline thread_t* _handleToPtr(handle_t h) { return handleToPtr<handle_t,thread_t>(h); } inline thread_t* _handleToPtr(handle_t h) { return handleToPtr<handle_t,thread_t>(h); }
// Called from client thread to wait for the internal thread to transition to a specified state.
rc_t _waitForState( thread_t* p, unsigned stateId ) rc_t _waitForState( thread_t* p, unsigned stateId )
{ {
unsigned waitTimeMicroSecs = 0; unsigned waitTimeMicroSecs = 0;
stateId_t curStateId;
while( p->stateId != stateId && waitTimeMicroSecs < p->stateMicros ) do
{ {
sleepUs( p->sleepMicros ); curStateId = p->stateId.load(std::memory_order_acquire);
waitTimeMicroSecs += p->sleepMicros;
}
return p->stateId==stateId ? kOkRC : kTimeOutRC; if(curStateId == stateId )
break;
sleepUs( p->sleepMicros );
}while( waitTimeMicroSecs < p->stateMicros );
return curStateId==stateId ? kOkRC : kTimeOutRC;
} }
void _threadCleanUpCallback(void* p) void _threadCleanUpCallback(void* p)
{ {
((thread_t*)p)->stateId = kExitedThId; ((thread_t*)p)->stateId.store(kExitedThId,std::memory_order_release);
} }
@ -61,37 +70,44 @@ namespace cw
// thread terminates unexpectedly or pthread_cleanup_pop() is called. // thread terminates unexpectedly or pthread_cleanup_pop() is called.
pthread_cleanup_push(_threadCleanUpCallback,p); pthread_cleanup_push(_threadCleanUpCallback,p);
while( cwIsFlag(p->doFlags,kDoExitThFl) == false ) unsigned curDoFlags = 0;
do
{ {
// get the current thread state (running or paused)
stateId_t curStateId = p->stateId.load(std::memory_order_relaxed);
// if we are in the pause state // if we are in the pause state
if( p->stateId == kPausedThId ) if( curStateId == kPausedThId )
{ {
sleepUs( p->pauseMicros ); sleepUs( p->pauseMicros );
curDoFlags = p->doFlags.load(std::memory_order_acquire);
// check if we have been requested to leave the pause state // check if we have been requested to leave the pause state
if( cwIsFlag(p->doFlags,kDoRunThFl) ) if( cwIsFlag(curDoFlags,kDoRunThFl) )
{ {
p->doFlags = cwClrFlag(p->doFlags,kDoRunThFl); p->stateId.store(kRunningThId,std::memory_order_release);
p->stateId = kRunningThId;
} }
} }
else else // ... we are in running state
{ {
// call the user-defined function // call the user-defined function
if( p->func(p->funcArg)==false ) if( p->func(p->funcArg)==false )
break; break;
curDoFlags = p->doFlags.load(std::memory_order_acquire);
// check if we have been requested to enter the pause state // check if we have been requested to enter the pause state
if( cwIsFlag(p->doFlags,kDoPauseThFl) ) if( cwIsFlag(curDoFlags,kDoPauseThFl) )
{ {
p->doFlags = cwClrFlag(p->doFlags,kDoPauseThFl); p->stateId.store(kPausedThId,std::memory_order_release);
p->stateId = kPausedThId;
}
} }
} }
}while( cwIsFlag(curDoFlags,kDoExitThFl) == false );
pthread_cleanup_pop(1); pthread_cleanup_pop(1);
pthread_exit(NULL); pthread_exit(NULL);
@ -157,7 +173,7 @@ cw::rc_t cw::thread::destroy( handle_t& hRef )
thread_t* p = _handleToPtr(hRef); thread_t* p = _handleToPtr(hRef);
// tell the thread to exit // tell the thread to exit
p->doFlags = cwSetFlag(p->doFlags,kDoExitThFl); p->doFlags.store(kDoExitThFl,std::memory_order_release);
// wait for the thread to exit and then deallocate the thread object // wait for the thread to exit and then deallocate the thread object
if((rc = _waitForState(p,kExitedThId)) != kOkRC ) if((rc = _waitForState(p,kExitedThId)) != kOkRC )
@ -184,7 +200,8 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags )
bool pauseFl = cwIsFlag(cmdFlags,kPauseFl); bool pauseFl = cwIsFlag(cmdFlags,kPauseFl);
bool waitFl = cwIsFlag(cmdFlags,kWaitFl); bool waitFl = cwIsFlag(cmdFlags,kWaitFl);
thread_t* p = _handleToPtr(h); thread_t* p = _handleToPtr(h);
bool isPausedFl = p->stateId == kPausedThId; stateId_t curStateId = p->stateId.load(std::memory_order_acquire);
bool isPausedFl = curStateId == kPausedThId;
unsigned waitId; unsigned waitId;
if( isPausedFl == pauseFl ) if( isPausedFl == pauseFl )
@ -192,12 +209,12 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags )
if( pauseFl ) if( pauseFl )
{ {
p->doFlags = cwSetFlag(p->doFlags,kDoPauseThFl); p->doFlags.store(kDoPauseThFl,std::memory_order_release);
waitId = kPausedThId; waitId = kPausedThId;
} }
else else
{ {
p->doFlags = cwSetFlag(p->doFlags,kDoRunThFl); p->doFlags.store(kDoRunThFl,std::memory_order_release);
waitId = kRunningThId; waitId = kRunningThId;
} }
@ -217,7 +234,7 @@ cw::rc_t cw::thread::unpause( handle_t h )
cw::thread::stateId_t cw::thread::state( handle_t h ) cw::thread::stateId_t cw::thread::state( handle_t h )
{ {
thread_t* p = _handleToPtr(h); thread_t* p = _handleToPtr(h);
return p->stateId; return p->stateId.load(std::memory_order_acquire);
} }
cw::thread::thread_id_t cw::thread::id() cw::thread::thread_id_t cw::thread::id()