2019-12-19 03:24:12 +00:00
# include "cwCommon.h"
# include "cwLog.h"
# include "cwCommonImpl.h"
# include "cwMem.h"
# include "cwThread.h"
2024-09-13 19:14:28 +00:00
# include "cwMutex.h"
# include "cwTest.h"
# include "cwTime.h"
2019-12-19 03:24:12 +00:00
# include <pthread.h>
namespace cw
{
2019-12-24 15:05:24 +00:00
namespace thread
2019-12-19 03:24:12 +00:00
{
2019-12-24 15:05:24 +00:00
enum
2019-12-19 03:24:12 +00:00
{
2019-12-24 15:05:24 +00:00
kDoExitThFl = 0x01 ,
kDoPauseThFl = 0x02 ,
kDoRunThFl = 0x04
} ;
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
typedef struct thread_str
{
pthread_t pThreadH ;
2020-04-19 17:01:47 +00:00
std : : atomic < stateId_t > stateId ;
std : : atomic < unsigned > doFlags ;
2024-09-13 19:14:28 +00:00
cbFunc_t func ;
void * funcArg ;
unsigned stateMicros ;
unsigned pauseMicros ;
unsigned waitMicros ;
2020-02-12 18:26:18 +00:00
pthread_attr_t attr ;
2024-09-13 19:14:28 +00:00
char * label ;
mutex : : handle_t mutexH ;
2024-09-15 18:53:18 +00:00
unsigned cycleIdx ; // current cycle phase
unsigned cycleCnt ; // cycle phase limit
unsigned execCnt ;
2020-02-12 18:26:18 +00:00
2019-12-24 15:05:24 +00:00
} thread_t ;
inline thread_t * _handleToPtr ( handle_t h ) { return handleToPtr < handle_t , thread_t > ( h ) ; }
2020-04-19 17:01:47 +00:00
// Called from client thread to wait for the internal thread to transition to a specified state.
2024-02-14 16:30:02 +00:00
rc_t _waitForState ( thread_t * p , stateId_t stateId )
2019-12-24 15:05:24 +00:00
{
unsigned waitTimeMicroSecs = 0 ;
2020-04-19 17:01:47 +00:00
stateId_t curStateId ;
do
2019-12-24 15:05:24 +00:00
{
2020-04-19 17:01:47 +00:00
curStateId = p - > stateId . load ( std : : memory_order_acquire ) ;
if ( curStateId = = stateId )
break ;
2024-09-13 19:14:28 +00:00
sleepUs ( p - > waitMicros ) ;
2020-12-11 20:57:35 +00:00
2024-09-13 19:14:28 +00:00
waitTimeMicroSecs + = p - > waitMicros ;
2019-12-19 03:24:12 +00:00
2020-04-19 17:01:47 +00:00
} while ( waitTimeMicroSecs < p - > stateMicros ) ;
2024-02-14 16:30:02 +00:00
2020-04-19 17:01:47 +00:00
return curStateId = = stateId ? kOkRC : kTimeOutRC ;
2019-12-24 15:05:24 +00:00
}
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
void _threadCleanUpCallback ( void * p )
2019-12-19 03:24:12 +00:00
{
2020-04-19 17:01:47 +00:00
( ( thread_t * ) p ) - > stateId . store ( kExitedThId , std : : memory_order_release ) ;
2019-12-24 15:05:24 +00:00
}
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
void * _threadCallback ( void * param )
{
thread_t * p = ( thread_t * ) param ;
// set a clean up handler - this will be called when the
// thread terminates unexpectedly or pthread_cleanup_pop() is called.
pthread_cleanup_push ( _threadCleanUpCallback , p ) ;
2020-04-19 17:01:47 +00:00
unsigned curDoFlags = 0 ;
do
2019-12-19 03:24:12 +00:00
{
2020-04-19 17:01:47 +00:00
// get the current thread state (running or paused)
stateId_t curStateId = p - > stateId . load ( std : : memory_order_relaxed ) ;
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
// if we are in the pause state
2020-04-19 17:01:47 +00:00
if ( curStateId = = kPausedThId )
2019-12-19 03:24:12 +00:00
{
2024-09-13 19:14:28 +00:00
// unlock mutex and block on cond. var. for pauseMicros or until signaled
rc_t rc = waitOnCondVar ( p - > mutexH , false , p - > pauseMicros / 1000 ) ;
switch ( rc )
{
case kTimeOutRC :
// the mutex is not locked
break ;
case kOkRC :
// the cond. var. was signaled and the mutex is locked
break ;
default :
// mutex is not locked
rc = cwLogError ( rc , " Condition variable wait failed. " ) ;
}
2019-12-19 03:24:12 +00:00
2020-04-19 17:01:47 +00:00
curDoFlags = p - > doFlags . load ( std : : memory_order_acquire ) ;
2024-09-13 19:14:28 +00:00
// if exit was requested - and the mutex is unlocked
if ( cwIsFlag ( curDoFlags , kDoExitThFl ) & & rc ! = kOkRC )
2019-12-24 15:05:24 +00:00
{
2024-09-13 19:14:28 +00:00
// this will cause the waitOnCondVar() to
// immediately return at the top of the loop
signalCondVar ( p - > mutexH ) ;
//mutex::lock(p->mutexH);
}
else
{
// check if we have been requested to leave the pause state
if ( cwIsFlag ( curDoFlags , kDoRunThFl ) )
{
2024-09-13 21:21:21 +00:00
p - > cycleIdx = 0 ;
2024-09-13 19:14:28 +00:00
p - > stateId . store ( kRunningThId , std : : memory_order_release ) ;
}
2019-12-24 15:05:24 +00:00
}
}
2020-04-19 17:01:47 +00:00
else // ... we are in running state
2019-12-19 03:24:12 +00:00
{
2019-12-24 15:05:24 +00:00
// call the user-defined function
if ( p - > func ( p - > funcArg ) = = false )
break ;
2020-04-19 17:01:47 +00:00
curDoFlags = p - > doFlags . load ( std : : memory_order_acquire ) ;
2024-09-15 18:53:18 +00:00
if ( cwIsNotFlag ( curDoFlags , kDoExitThFl ) )
2019-12-24 15:05:24 +00:00
{
2024-09-15 18:53:18 +00:00
p - > cycleIdx + = 1 ;
// if a cycle limit was set then check if the limit was reached
bool cycles_done_fl = p - > cycleCnt > 0 & & p - > cycleIdx > = p - > cycleCnt ;
// check if we have been requested to enter the pause state
if ( ( cwIsFlag ( curDoFlags , kDoPauseThFl ) | | cycles_done_fl ) )
{
p - > stateId . store ( kPausedThId , std : : memory_order_release ) ;
p - > doFlags . store ( 0 , std : : memory_order_release ) ;
}
2019-12-24 15:05:24 +00:00
}
2024-09-15 18:53:18 +00:00
2019-12-19 03:24:12 +00:00
}
2020-04-19 17:01:47 +00:00
} while ( cwIsFlag ( curDoFlags , kDoExitThFl ) = = false ) ;
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
pthread_cleanup_pop ( 1 ) ;
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
pthread_exit ( NULL ) ;
2019-12-19 03:24:12 +00:00
2019-12-24 15:05:24 +00:00
return p ;
}
}
2019-12-19 03:24:12 +00:00
}
2024-02-18 13:37:33 +00:00
cw : : rc_t cw : : thread : : create ( handle_t & hRef , cbFunc_t func , void * funcArg , const char * label , int stateMicros , int pauseMicros )
2019-12-19 03:24:12 +00:00
{
rc_t rc ;
int sysRC ;
2024-09-13 19:14:28 +00:00
bool mutex_is_locked_fl = false ;
2019-12-24 15:05:24 +00:00
if ( ( rc = destroy ( hRef ) ) ! = kOkRC )
2019-12-19 03:24:12 +00:00
return rc ;
2019-12-28 02:51:28 +00:00
thread_t * p = mem : : allocZ < thread_t > ( ) ;
2019-12-19 03:24:12 +00:00
p - > func = func ;
p - > funcArg = funcArg ;
p - > stateMicros = stateMicros ;
p - > pauseMicros = pauseMicros ;
p - > stateId = kPausedThId ;
2024-09-13 19:14:28 +00:00
p - > waitMicros = 15000 ;
2024-02-18 13:37:33 +00:00
p - > label = mem : : duplStr ( label ) ;
2019-12-24 15:05:24 +00:00
2020-02-12 18:26:18 +00:00
if ( ( sysRC = pthread_attr_init ( & p - > attr ) ) ! = 0 )
2019-12-19 03:24:12 +00:00
{
p - > stateId = kNotInitThId ;
2019-12-24 15:05:24 +00:00
rc = cwLogSysError ( kOpFailRC , sysRC , " Thread attribute init failed. " ) ;
2019-12-19 03:24:12 +00:00
}
2019-12-24 15:05:24 +00:00
else
2024-02-14 16:30:02 +00:00
{
2024-09-13 19:14:28 +00:00
2020-03-18 23:05:48 +00:00
/*
// Creating the thread in a detached state should prevent it from leaking memory when
// the thread is closed and pthread_join() is not called but it doesn't seem to work anymore ????
2020-02-12 18:26:18 +00:00
if ( ( sysRC = pthread_attr_setdetachstate ( & p - > attr , PTHREAD_CREATE_DETACHED ) ) ! = 0 )
2019-12-24 15:05:24 +00:00
{
p - > stateId = kNotInitThId ;
rc = cwLogSysError ( kOpFailRC , sysRC , " Thread set detach attribute failed. " ) ;
}
else
2020-03-18 23:05:48 +00:00
*/
2024-09-13 19:14:28 +00:00
// Create the cond. var mutex
if ( ( rc = mutex : : create ( p - > mutexH ) ) ! = kOkRC )
{
rc = cwLogError ( rc , " Thread signal condition mutex create failed. " ) ;
goto errLabel ;
}
// Lock the mutex so that it is already locked prior to the first call to waitOnCondVar()
if ( ( rc = mutex : : lock ( p - > mutexH ) ) ! = kOkRC )
{
rc = cwLogError ( rc , " Thread signal condition mutex lock failed. " ) ;
goto errLabel ;
}
mutex_is_locked_fl = true ;
// create the thread - in paused state
if ( ( sysRC = pthread_create ( & p - > pThreadH , & p - > attr , _threadCallback , ( void * ) p ) ) ! = 0 )
{
p - > stateId = kNotInitThId ;
rc = cwLogSysError ( kOpFailRC , sysRC , " Thread create failed. " ) ;
}
2024-02-14 16:30:02 +00:00
}
2024-02-18 13:37:33 +00:00
if ( label ! = nullptr )
pthread_setname_np ( p - > pThreadH , label ) ;
2019-12-19 03:24:12 +00:00
hRef . set ( p ) ;
2024-02-18 13:37:33 +00:00
cwLogInfo ( " Thread %s id:%p created. " , cwStringNullGuard ( label ) , p - > pThreadH ) ;
2024-09-13 19:14:28 +00:00
errLabel :
if ( rc ! = kOkRC & & p - > mutexH . isValid ( ) )
{
if ( mutex_is_locked_fl )
mutex : : unlock ( p - > mutexH ) ;
mutex : : destroy ( p - > mutexH ) ;
}
2019-12-19 03:24:12 +00:00
return rc ;
}
2019-12-24 15:05:24 +00:00
cw : : rc_t cw : : thread : : destroy ( handle_t & hRef )
2019-12-19 03:24:12 +00:00
{
rc_t rc = kOkRC ;
2020-03-18 23:05:48 +00:00
int sysRC ;
2019-12-19 03:24:12 +00:00
if ( ! hRef . isValid ( ) )
return rc ;
2019-12-24 15:05:24 +00:00
thread_t * p = _handleToPtr ( hRef ) ;
2019-12-19 03:24:12 +00:00
// tell the thread to exit
2020-04-19 17:01:47 +00:00
p - > doFlags . store ( kDoExitThFl , std : : memory_order_release ) ;
2019-12-19 03:24:12 +00:00
// wait for the thread to exit and then deallocate the thread object
if ( ( rc = _waitForState ( p , kExitedThId ) ) ! = kOkRC )
2024-02-18 13:37:33 +00:00
return cwLogError ( rc , " Thread '%s' timed out waiting for destroy. " , p - > label ) ;
2019-12-19 03:24:12 +00:00
2020-03-18 23:05:48 +00:00
// Block until the thread is actually fully cleaned up
if ( ( sysRC = pthread_join ( p - > pThreadH , NULL ) ) ! = 0 )
2024-02-18 13:37:33 +00:00
rc = cwLogSysError ( kOpFailRC , sysRC , " Thread '%s' join failed. " , p - > label ) ;
2020-03-18 23:05:48 +00:00
//if( pthread_attr_destroy(&p->attr) != 0 )
// rc = cwLogError(kOpFailRC,"Thread attribute destroy failed.");
2024-09-13 19:14:28 +00:00
if ( p - > mutexH . isValid ( ) )
{
mutex : : unlock ( p - > mutexH ) ;
mutex : : destroy ( p - > mutexH ) ;
}
2024-02-18 13:37:33 +00:00
mem : : release ( p - > label ) ;
2019-12-28 02:51:28 +00:00
mem : : release ( p ) ;
2019-12-24 15:05:24 +00:00
hRef . clear ( ) ;
2019-12-19 03:24:12 +00:00
return rc ;
}
2024-09-13 21:21:21 +00:00
cw : : rc_t cw : : thread : : pause ( handle_t h , unsigned cmdFlags , unsigned cycleCnt )
2019-12-19 03:24:12 +00:00
{
rc_t rc = kOkRC ;
2019-12-24 15:05:24 +00:00
bool pauseFl = cwIsFlag ( cmdFlags , kPauseFl ) ;
bool waitFl = cwIsFlag ( cmdFlags , kWaitFl ) ;
thread_t * p = _handleToPtr ( h ) ;
2020-04-19 17:01:47 +00:00
stateId_t curStateId = p - > stateId . load ( std : : memory_order_acquire ) ;
bool isPausedFl = curStateId = = kPausedThId ;
2024-02-14 16:30:02 +00:00
stateId_t waitId ;
2024-09-15 18:53:18 +00:00
p - > cycleCnt = cycleCnt ;
2019-12-19 03:24:12 +00:00
if ( isPausedFl = = pauseFl )
return kOkRC ;
if ( pauseFl )
{
2020-04-19 17:01:47 +00:00
p - > doFlags . store ( kDoPauseThFl , std : : memory_order_release ) ;
2019-12-19 03:24:12 +00:00
waitId = kPausedThId ;
}
else
{
2020-04-19 17:01:47 +00:00
p - > doFlags . store ( kDoRunThFl , std : : memory_order_release ) ;
2019-12-19 03:24:12 +00:00
waitId = kRunningThId ;
2024-09-13 19:14:28 +00:00
if ( ( rc = signalCondVar ( p - > mutexH ) ) ! = kOkRC )
{
cwLogError ( rc , " Cond. var. signalling failed. " ) ;
goto errLabel ;
}
2019-12-19 03:24:12 +00:00
}
if ( waitFl )
rc = _waitForState ( p , waitId ) ;
2024-09-13 19:14:28 +00:00
errLabel :
2019-12-19 03:24:12 +00:00
if ( rc ! = kOkRC )
2024-09-13 19:14:28 +00:00
cwLogError ( rc , " Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i waitMicros:%i " , p - > label , pauseFl ? " pause " : " un-pause " , p - > pauseMicros , p - > stateMicros , p - > waitMicros ) ;
2019-12-19 03:24:12 +00:00
return rc ;
}
2024-09-13 21:21:21 +00:00
cw : : rc_t cw : : thread : : unpause ( handle_t h , unsigned cycleCnt )
{ return pause ( h , kWaitFl , cycleCnt ) ; }
2019-12-24 15:05:24 +00:00
cw : : thread : : stateId_t cw : : thread : : state ( handle_t h )
2019-12-19 03:24:12 +00:00
{
2019-12-24 15:05:24 +00:00
thread_t * p = _handleToPtr ( h ) ;
2020-04-19 17:01:47 +00:00
return p - > stateId . load ( std : : memory_order_acquire ) ;
2019-12-19 03:24:12 +00:00
}
2020-01-27 22:50:57 +00:00
cw : : thread : : thread_id_t cw : : thread : : id ( )
{
typedef struct
{
union
{
thread_id_t id ;
pthread_t pthread_id ;
} u ;
} id_t ;
id_t id ;
id . u . pthread_id = pthread_self ( ) ;
return id . u . id ;
}
2019-12-19 03:24:12 +00:00
2024-02-18 13:37:33 +00:00
const char * cw : : thread : : label ( handle_t h )
{
thread_t * p = _handleToPtr ( h ) ;
return p - > label = = nullptr ? " <no_thread_label> " : p - > label ;
}
2024-02-14 16:30:02 +00:00
unsigned cw : : thread : : stateTimeOutMicros ( handle_t h )
{
thread_t * p = _handleToPtr ( h ) ;
return p - > stateMicros ;
}
unsigned cw : : thread : : pauseMicros ( handle_t h )
{
thread_t * p = _handleToPtr ( h ) ;
return p - > pauseMicros ;
}
2019-12-19 03:24:12 +00:00
namespace cw
{
2024-09-15 18:53:18 +00:00
time : : spec_t g_t0 = { 0 , 0 } ;
time_t g_micros = 0 ;
unsigned g_n = 0 ;
2019-12-19 03:24:12 +00:00
bool _threadTestCb ( void * p )
{
2024-09-13 19:14:28 +00:00
if ( g_t0 . tv_nsec ! = 0 )
{
time : : spec_t t1 ;
time : : get ( t1 ) ;
g_micros + = time : : elapsedMicros ( g_t0 , t1 ) ;
g_n + = 1 ;
g_t0 . tv_nsec = 0 ;
}
unsigned * ip = ( unsigned * ) p ;
2019-12-19 03:24:12 +00:00
ip [ 0 ] + + ;
return true ;
}
}
cw : : rc_t cw : : threadTest ( )
{
2019-12-24 15:05:24 +00:00
thread : : handle_t h ;
unsigned val = 0 ;
rc_t rc ;
char c = 0 ;
2024-09-15 18:53:18 +00:00
unsigned cycleCnt = 0 ;
2024-09-13 19:14:28 +00:00
// create the thread
2024-02-18 13:37:33 +00:00
if ( ( rc = thread : : create ( h , _threadTestCb , & val , " thread_test " ) ) ! = kOkRC )
2019-12-19 03:24:12 +00:00
return rc ;
2024-09-13 19:14:28 +00:00
// start the thread
2024-09-15 18:53:18 +00:00
if ( ( rc = thread : : pause ( h , 0 , cycleCnt ) ) ! = kOkRC )
2019-12-19 03:24:12 +00:00
goto errLabel ;
cwLogInfo ( " o=print p=pause s=state q=quit \n " ) ;
while ( c ! = ' q ' )
{
c = ( char ) fgetc ( stdin ) ;
fflush ( stdin ) ;
switch ( c )
{
case ' o ' :
2024-09-15 18:53:18 +00:00
cwLogInfo ( " val: 0x%x %i \n " , val , val ) ;
2019-12-19 03:24:12 +00:00
break ;
case ' s ' :
2019-12-24 15:05:24 +00:00
cwLogInfo ( " state=%i \n " , thread : : state ( h ) ) ;
2019-12-19 03:24:12 +00:00
break ;
case ' p ' :
{
2019-12-24 15:05:24 +00:00
if ( thread : : state ( h ) = = thread : : kPausedThId )
2024-09-13 19:14:28 +00:00
{
time : : get ( g_t0 ) ;
2024-09-15 18:53:18 +00:00
// We don't set kWaitFl w/ cycleCnt>0 because we are running very
// few cycles - the cycles will run and the
// state of the thread will return to 'paused'
// before _waitForState() can notice the 'running' state.
rc = thread : : pause ( h , cycleCnt = = 0 ? thread : : kWaitFl : 0 , cycleCnt ) ;
2024-09-13 19:14:28 +00:00
}
2019-12-19 03:24:12 +00:00
else
2019-12-24 15:05:24 +00:00
rc = thread : : pause ( h , thread : : kPauseFl | thread : : kWaitFl ) ;
2019-12-19 03:24:12 +00:00
if ( rc = = kOkRC )
2019-12-24 15:05:24 +00:00
cwLogInfo ( " new state:%i \n " , thread : : state ( h ) ) ;
2019-12-19 03:24:12 +00:00
else
{
cwLogError ( rc , " threadPause() test failed. " ) ;
goto errLabel ;
}
}
break ;
case ' q ' :
2024-09-15 18:53:18 +00:00
printf ( " wakeup micros:%li cnt:%i avg:%li \n " , g_micros , g_n , g_n > 0 ? g_micros / g_n : 0 ) ;
2019-12-19 03:24:12 +00:00
break ;
//default:
//cwLogInfo("Unknown:%c\n",c);
}
}
errLabel :
2019-12-24 15:05:24 +00:00
rc_t rc0 = rc = thread : : destroy ( h ) ;
2019-12-19 03:24:12 +00:00
return rc = = kOkRC ? rc0 : rc ;
}