465 lines
11 KiB
C++
465 lines
11 KiB
C++
#include "cwCommon.h"
|
|
#include "cwLog.h"
|
|
#include "cwCommonImpl.h"
|
|
#include "cwMem.h"
|
|
#include "cwThread.h"
|
|
#include "cwMutex.h"
|
|
#include "cwTest.h"
|
|
#include "cwTime.h"
|
|
|
|
#include <pthread.h>
|
|
|
|
namespace cw
|
|
{
|
|
|
|
namespace thread
|
|
{
|
|
enum
|
|
{
|
|
kDoExitThFl = 0x01,
|
|
kDoPauseThFl = 0x02,
|
|
kDoRunThFl = 0x04
|
|
};
|
|
|
|
typedef struct thread_str
|
|
{
|
|
pthread_t pThreadH;
|
|
|
|
std::atomic<stateId_t> stateId;
|
|
std::atomic<unsigned> doFlags;
|
|
|
|
cbFunc_t func;
|
|
void* funcArg;
|
|
unsigned stateMicros;
|
|
unsigned pauseMicros;
|
|
unsigned waitMicros;
|
|
pthread_attr_t attr;
|
|
char* label;
|
|
|
|
mutex::handle_t mutexH;
|
|
unsigned cycleIdx;
|
|
unsigned cycleCnt;
|
|
|
|
} thread_t;
|
|
|
|
inline thread_t* _handleToPtr(handle_t h) { return handleToPtr<handle_t,thread_t>(h); }
|
|
|
|
// Called from client thread to wait for the internal thread to transition to a specified state.
|
|
rc_t _waitForState( thread_t* p, stateId_t stateId )
|
|
{
|
|
unsigned waitTimeMicroSecs = 0;
|
|
stateId_t curStateId;
|
|
|
|
do
|
|
{
|
|
curStateId = p->stateId.load(std::memory_order_acquire);
|
|
|
|
if(curStateId == stateId )
|
|
break;
|
|
|
|
sleepUs( p->waitMicros );
|
|
|
|
waitTimeMicroSecs += p->waitMicros;
|
|
|
|
}while( waitTimeMicroSecs < p->stateMicros );
|
|
|
|
return curStateId==stateId ? kOkRC : kTimeOutRC;
|
|
}
|
|
|
|
void _threadCleanUpCallback(void* p)
|
|
{
|
|
((thread_t*)p)->stateId.store(kExitedThId,std::memory_order_release);
|
|
}
|
|
|
|
|
|
void* _threadCallback(void* param)
|
|
{
|
|
thread_t* p = (thread_t*)param;
|
|
|
|
// set a clean up handler - this will be called when the
|
|
// thread terminates unexpectedly or pthread_cleanup_pop() is called.
|
|
pthread_cleanup_push(_threadCleanUpCallback,p);
|
|
|
|
unsigned curDoFlags = 0;
|
|
|
|
do
|
|
{
|
|
// get the current thread state (running or paused)
|
|
stateId_t curStateId = p->stateId.load(std::memory_order_relaxed);
|
|
|
|
// if we are in the pause state
|
|
if( curStateId == kPausedThId )
|
|
{
|
|
// unlock mutex and block on cond. var. for pauseMicros or until signaled
|
|
rc_t rc = waitOnCondVar(p->mutexH, false, p->pauseMicros/1000 );
|
|
|
|
switch(rc)
|
|
{
|
|
case kTimeOutRC:
|
|
// the mutex is not locked
|
|
break;
|
|
|
|
case kOkRC:
|
|
// the cond. var. was signaled and the mutex is locked
|
|
break;
|
|
|
|
default:
|
|
// mutex is not locked
|
|
rc = cwLogError(rc,"Condition variable wait failed.");
|
|
}
|
|
|
|
|
|
curDoFlags = p->doFlags.load(std::memory_order_acquire);
|
|
|
|
// if exit was requested - and the mutex is unlocked
|
|
if( cwIsFlag(curDoFlags,kDoExitThFl) && rc != kOkRC )
|
|
{
|
|
// this will cause the waitOnCondVar() to
|
|
// immediately return at the top of the loop
|
|
signalCondVar(p->mutexH);
|
|
//mutex::lock(p->mutexH);
|
|
}
|
|
else
|
|
{
|
|
// check if we have been requested to leave the pause state
|
|
if( cwIsFlag(curDoFlags,kDoRunThFl) )
|
|
{
|
|
p->cycleIdx = 0;
|
|
p->stateId.store(kRunningThId,std::memory_order_release);
|
|
}
|
|
}
|
|
}
|
|
else // ... we are in running state
|
|
{
|
|
// call the user-defined function
|
|
if( p->func(p->funcArg)==false )
|
|
break;
|
|
|
|
p->cycleIdx += 1;
|
|
|
|
// if a cycle limit was set then check if the limit was reached
|
|
bool cycles_done_fl = p->cycleCnt > 0 && p->cycleIdx >= p->cycleCnt;
|
|
|
|
curDoFlags = p->doFlags.load(std::memory_order_acquire);
|
|
|
|
// check if we have been requested to enter the pause state
|
|
if( cwIsNotFlag(curDoFlags,kDoExitThFl) && (cwIsFlag(curDoFlags,kDoPauseThFl) || cycles_done_fl) )
|
|
{
|
|
p->stateId.store(kPausedThId,std::memory_order_release);
|
|
}
|
|
}
|
|
|
|
}while( cwIsFlag(curDoFlags,kDoExitThFl) == false );
|
|
|
|
pthread_cleanup_pop(1);
|
|
|
|
pthread_exit(NULL);
|
|
|
|
return p;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const char* label, int stateMicros, int pauseMicros )
|
|
{
|
|
rc_t rc;
|
|
int sysRC;
|
|
bool mutex_is_locked_fl = false;
|
|
|
|
if((rc = destroy(hRef)) != kOkRC )
|
|
return rc;
|
|
|
|
thread_t* p = mem::allocZ<thread_t>();
|
|
|
|
p->func = func;
|
|
p->funcArg = funcArg;
|
|
p->stateMicros = stateMicros;
|
|
p->pauseMicros = pauseMicros;
|
|
p->stateId = kPausedThId;
|
|
p->waitMicros = 15000;
|
|
p->label = mem::duplStr(label);
|
|
|
|
if((sysRC = pthread_attr_init(&p->attr)) != 0)
|
|
{
|
|
p->stateId = kNotInitThId;
|
|
rc = cwLogSysError(kOpFailRC,sysRC,"Thread attribute init failed.");
|
|
}
|
|
else
|
|
{
|
|
|
|
/*
|
|
|
|
// Creating the thread in a detached state should prevent it from leaking memory when
|
|
// the thread is closed and pthread_join() is not called but it doesn't seem to work anymore ????
|
|
|
|
if ((sysRC = pthread_attr_setdetachstate(&p->attr, PTHREAD_CREATE_DETACHED)) != 0)
|
|
{
|
|
p->stateId = kNotInitThId;
|
|
rc = cwLogSysError(kOpFailRC,sysRC,"Thread set detach attribute failed.");
|
|
}
|
|
else
|
|
*/
|
|
|
|
// Create the cond. var mutex
|
|
if((rc = mutex::create(p->mutexH )) != kOkRC )
|
|
{
|
|
rc = cwLogError(rc,"Thread signal condition mutex create failed.");
|
|
goto errLabel;
|
|
}
|
|
|
|
// Lock the mutex so that it is already locked prior to the first call to waitOnCondVar()
|
|
if((rc = mutex::lock(p->mutexH)) != kOkRC )
|
|
{
|
|
rc = cwLogError(rc,"Thread signal condition mutex lock failed.");
|
|
goto errLabel;
|
|
}
|
|
|
|
mutex_is_locked_fl = true;
|
|
|
|
// create the thread - in paused state
|
|
if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 )
|
|
{
|
|
p->stateId = kNotInitThId;
|
|
rc = cwLogSysError(kOpFailRC,sysRC,"Thread create failed.");
|
|
}
|
|
}
|
|
|
|
if( label != nullptr )
|
|
pthread_setname_np(p->pThreadH, label);
|
|
|
|
|
|
hRef.set(p);
|
|
|
|
|
|
cwLogInfo("Thread %s id:%p created.",cwStringNullGuard(label), p->pThreadH);
|
|
|
|
errLabel:
|
|
|
|
if( rc != kOkRC && p->mutexH.isValid() )
|
|
{
|
|
if( mutex_is_locked_fl )
|
|
mutex::unlock(p->mutexH);
|
|
|
|
mutex::destroy(p->mutexH);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
cw::rc_t cw::thread::destroy( handle_t& hRef )
|
|
{
|
|
rc_t rc = kOkRC;
|
|
int sysRC;
|
|
|
|
if( !hRef.isValid() )
|
|
return rc;
|
|
|
|
thread_t* p = _handleToPtr(hRef);
|
|
|
|
// tell the thread to exit
|
|
p->doFlags.store(kDoExitThFl,std::memory_order_release);
|
|
|
|
// wait for the thread to exit and then deallocate the thread object
|
|
if((rc = _waitForState(p,kExitedThId)) != kOkRC )
|
|
return cwLogError(rc,"Thread '%s' timed out waiting for destroy.",p->label);
|
|
|
|
// Block until the thread is actually fully cleaned up
|
|
if((sysRC = pthread_join(p->pThreadH,NULL)) != 0)
|
|
rc = cwLogSysError(kOpFailRC,sysRC,"Thread '%s' join failed.",p->label);
|
|
|
|
//if( pthread_attr_destroy(&p->attr) != 0 )
|
|
// rc = cwLogError(kOpFailRC,"Thread attribute destroy failed.");
|
|
|
|
if( p->mutexH.isValid() )
|
|
{
|
|
mutex::unlock(p->mutexH);
|
|
mutex::destroy(p->mutexH);
|
|
}
|
|
mem::release(p->label);
|
|
mem::release(p);
|
|
hRef.clear();
|
|
|
|
return rc;
|
|
}
|
|
|
|
|
|
cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags, unsigned cycleCnt )
|
|
{
|
|
rc_t rc = kOkRC;
|
|
bool pauseFl = cwIsFlag(cmdFlags,kPauseFl);
|
|
bool waitFl = cwIsFlag(cmdFlags,kWaitFl);
|
|
thread_t* p = _handleToPtr(h);
|
|
stateId_t curStateId = p->stateId.load(std::memory_order_acquire);
|
|
bool isPausedFl = curStateId == kPausedThId;
|
|
stateId_t waitId;
|
|
|
|
if( isPausedFl == pauseFl )
|
|
return kOkRC;
|
|
|
|
if( pauseFl )
|
|
{
|
|
p->doFlags.store(kDoPauseThFl,std::memory_order_release);
|
|
waitId = kPausedThId;
|
|
}
|
|
else
|
|
{
|
|
p->cycleCnt = cycleCnt;
|
|
p->doFlags.store(kDoRunThFl,std::memory_order_release);
|
|
waitId = kRunningThId;
|
|
if((rc = signalCondVar(p->mutexH)) != kOkRC )
|
|
{
|
|
cwLogError(rc,"Cond. var. signalling failed.");
|
|
goto errLabel;
|
|
}
|
|
}
|
|
|
|
if( waitFl )
|
|
rc = _waitForState(p,waitId);
|
|
|
|
errLabel:
|
|
if( rc != kOkRC )
|
|
cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i waitMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->waitMicros);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
cw::rc_t cw::thread::unpause( handle_t h, unsigned cycleCnt )
|
|
{ return pause( h, kWaitFl, cycleCnt); }
|
|
|
|
cw::thread::stateId_t cw::thread::state( handle_t h )
|
|
{
|
|
thread_t* p = _handleToPtr(h);
|
|
return p->stateId.load(std::memory_order_acquire);
|
|
}
|
|
|
|
cw::thread::thread_id_t cw::thread::id()
|
|
{
|
|
typedef struct
|
|
{
|
|
union
|
|
{
|
|
thread_id_t id;
|
|
pthread_t pthread_id;
|
|
} u;
|
|
} id_t;
|
|
|
|
id_t id;
|
|
id.u.pthread_id = pthread_self();
|
|
return id.u.id;
|
|
}
|
|
|
|
const char* cw::thread::label( handle_t h )
|
|
{
|
|
thread_t* p = _handleToPtr(h);
|
|
return p->label==nullptr ? "<no_thread_label>" : p->label;
|
|
}
|
|
|
|
|
|
unsigned cw::thread::stateTimeOutMicros( handle_t h)
|
|
{
|
|
thread_t* p = _handleToPtr(h);
|
|
return p->stateMicros;
|
|
}
|
|
|
|
unsigned cw::thread::pauseMicros( handle_t h )
|
|
{
|
|
thread_t* p = _handleToPtr(h);
|
|
return p->pauseMicros;
|
|
}
|
|
|
|
|
|
namespace cw
|
|
{
|
|
time::spec_t g_t0{};
|
|
time_t g_micros = 0;
|
|
unsigned g_n = 0;
|
|
bool _threadTestCb( void* p )
|
|
{
|
|
if( g_t0.tv_nsec != 0 )
|
|
{
|
|
time::spec_t t1;
|
|
time::get(t1);
|
|
g_micros += time::elapsedMicros(g_t0,t1);
|
|
g_n += 1;
|
|
g_t0.tv_nsec = 0;
|
|
}
|
|
|
|
unsigned* ip = (unsigned*)p;
|
|
ip[0]++;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
cw::rc_t cw::threadTest()
|
|
{
|
|
thread::handle_t h;
|
|
unsigned val = 0;
|
|
rc_t rc;
|
|
char c = 0;
|
|
|
|
// create the thread
|
|
if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC )
|
|
return rc;
|
|
|
|
// start the thread
|
|
if((rc = thread::pause(h,0)) != kOkRC )
|
|
goto errLabel;
|
|
|
|
|
|
cwLogInfo("o=print p=pause s=state q=quit\n");
|
|
|
|
while( c != 'q' )
|
|
{
|
|
|
|
c = (char)fgetc(stdin);
|
|
fflush(stdin);
|
|
|
|
switch(c)
|
|
{
|
|
case 'o':
|
|
cwLogInfo("val: 0x%x\n",val);
|
|
break;
|
|
|
|
case 's':
|
|
cwLogInfo("state=%i\n",thread::state(h));
|
|
break;
|
|
|
|
case 'p':
|
|
{
|
|
if( thread::state(h) == thread::kPausedThId )
|
|
{
|
|
time::get(g_t0);
|
|
rc = thread::pause(h,thread::kWaitFl);
|
|
}
|
|
else
|
|
rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl);
|
|
|
|
if( rc == kOkRC )
|
|
cwLogInfo("new state:%i\n", thread::state(h));
|
|
else
|
|
{
|
|
cwLogError(rc,"threadPause() test failed.");
|
|
goto errLabel;
|
|
}
|
|
|
|
}
|
|
break;
|
|
|
|
case 'q':
|
|
printf("wakeup micros:%li cnt:%i avg:%li\n",g_micros,g_n,g_micros/g_n);
|
|
break;
|
|
|
|
//default:
|
|
//cwLogInfo("Unknown:%c\n",c);
|
|
|
|
}
|
|
}
|
|
|
|
errLabel:
|
|
rc_t rc0 = rc = thread::destroy(h);
|
|
|
|
return rc == kOkRC ? rc0 : rc;
|
|
}
|