Merge branch 'poly' of gitea.larke.org:kevin/libcw into poly

This commit is contained in:
kevin 2024-09-13 15:33:06 -04:00
commit 07b736daae
3 changed files with 124 additions and 38 deletions

View File

@ -15,6 +15,8 @@ namespace cw
rc_t lock( handle_t h ); rc_t lock( handle_t h );
rc_t unlock( handle_t h ); rc_t unlock( handle_t h );
// Set timeOutMs to 0 to wait forever.
//
// Set 'lockThenWaitFl' if the function should lock the mutex prior to waiting. // Set 'lockThenWaitFl' if the function should lock the mutex prior to waiting.
// If 'lockThenWaitFl' is false then the function assumes the mutex is already locked // If 'lockThenWaitFl' is false then the function assumes the mutex is already locked
// and directly waits. If 'lockThenWaitFl' is set and the mutex is not already locked // and directly waits. If 'lockThenWaitFl' is set and the mutex is not already locked

View File

@ -3,6 +3,9 @@
#include "cwCommonImpl.h" #include "cwCommonImpl.h"
#include "cwMem.h" #include "cwMem.h"
#include "cwThread.h" #include "cwThread.h"
#include "cwMutex.h"
#include "cwTest.h"
#include "cwTime.h"
#include <pthread.h> #include <pthread.h>
@ -25,13 +28,15 @@ namespace cw
std::atomic<stateId_t> stateId; std::atomic<stateId_t> stateId;
std::atomic<unsigned> doFlags; std::atomic<unsigned> doFlags;
cbFunc_t func; cbFunc_t func;
void* funcArg; void* funcArg;
unsigned stateMicros; unsigned stateMicros;
unsigned pauseMicros; unsigned pauseMicros;
unsigned sleepMicros; unsigned waitMicros;
pthread_attr_t attr; pthread_attr_t attr;
char* label; char* label;
mutex::handle_t mutexH;
} thread_t; } thread_t;
@ -50,9 +55,9 @@ namespace cw
if(curStateId == stateId ) if(curStateId == stateId )
break; break;
sleepUs( p->sleepMicros ); sleepUs( p->waitMicros );
waitTimeMicroSecs += p->sleepMicros; waitTimeMicroSecs += p->waitMicros;
}while( waitTimeMicroSecs < p->stateMicros ); }while( waitTimeMicroSecs < p->stateMicros );
@ -83,15 +88,42 @@ namespace cw
// if we are in the pause state // if we are in the pause state
if( curStateId == kPausedThId ) if( curStateId == kPausedThId )
{ {
// unlock mutex and block on cond. var. for pauseMicros or until signaled
sleepUs( p->pauseMicros ); rc_t rc = waitOnCondVar(p->mutexH, false, p->pauseMicros/1000 );
switch(rc)
{
case kTimeOutRC:
// the mutex is not locked
break;
case kOkRC:
// the cond. var. was signaled and the mutex is locked
break;
default:
// mutex is not locked
rc = cwLogError(rc,"Condition variable wait failed.");
}
curDoFlags = p->doFlags.load(std::memory_order_acquire); curDoFlags = p->doFlags.load(std::memory_order_acquire);
// check if we have been requested to leave the pause state // if exit was requested - and the mutex is unlocked
if( cwIsFlag(curDoFlags,kDoRunThFl) ) if( cwIsFlag(curDoFlags,kDoExitThFl) && rc != kOkRC )
{ {
p->stateId.store(kRunningThId,std::memory_order_release); // this will cause the waitOnCondVar() to
// immediately return at the top of the loop
signalCondVar(p->mutexH);
//mutex::lock(p->mutexH);
}
else
{
// check if we have been requested to leave the pause state
if( cwIsFlag(curDoFlags,kDoRunThFl) )
{
p->stateId.store(kRunningThId,std::memory_order_release);
}
} }
} }
else // ... we are in running state else // ... we are in running state
@ -125,6 +157,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
{ {
rc_t rc; rc_t rc;
int sysRC; int sysRC;
bool mutex_is_locked_fl = false;
if((rc = destroy(hRef)) != kOkRC ) if((rc = destroy(hRef)) != kOkRC )
return rc; return rc;
@ -136,7 +169,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
p->stateMicros = stateMicros; p->stateMicros = stateMicros;
p->pauseMicros = pauseMicros; p->pauseMicros = pauseMicros;
p->stateId = kPausedThId; p->stateId = kPausedThId;
p->sleepMicros = 15000; p->waitMicros = 15000;
p->label = mem::duplStr(label); p->label = mem::duplStr(label);
if((sysRC = pthread_attr_init(&p->attr)) != 0) if((sysRC = pthread_attr_init(&p->attr)) != 0)
@ -146,6 +179,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
} }
else else
{ {
/* /*
// Creating the thread in a detached state should prevent it from leaking memory when // Creating the thread in a detached state should prevent it from leaking memory when
@ -158,11 +192,29 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
} }
else else
*/ */
if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 )
{ // Create the cond. var mutex
p->stateId = kNotInitThId; if((rc = mutex::create(p->mutexH )) != kOkRC )
rc = cwLogSysError(kOpFailRC,sysRC,"Thread create failed."); {
} rc = cwLogError(rc,"Thread signal condition mutex create failed.");
goto errLabel;
}
// Lock the mutex so that it is already locked prior to the first call to waitOnCondVar()
if((rc = mutex::lock(p->mutexH)) != kOkRC )
{
rc = cwLogError(rc,"Thread signal condition mutex lock failed.");
goto errLabel;
}
mutex_is_locked_fl = true;
// create the thread - in paused state
if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 )
{
p->stateId = kNotInitThId;
rc = cwLogSysError(kOpFailRC,sysRC,"Thread create failed.");
}
} }
if( label != nullptr ) if( label != nullptr )
@ -174,6 +226,16 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
cwLogInfo("Thread %s id:%p created.",cwStringNullGuard(label), p->pThreadH); cwLogInfo("Thread %s id:%p created.",cwStringNullGuard(label), p->pThreadH);
errLabel:
if( rc != kOkRC && p->mutexH.isValid() )
{
if( mutex_is_locked_fl )
mutex::unlock(p->mutexH);
mutex::destroy(p->mutexH);
}
return rc; return rc;
} }
@ -200,7 +262,12 @@ cw::rc_t cw::thread::destroy( handle_t& hRef )
//if( pthread_attr_destroy(&p->attr) != 0 ) //if( pthread_attr_destroy(&p->attr) != 0 )
// rc = cwLogError(kOpFailRC,"Thread attribute destroy failed."); // rc = cwLogError(kOpFailRC,"Thread attribute destroy failed.");
if( p->mutexH.isValid() )
{
mutex::unlock(p->mutexH);
mutex::destroy(p->mutexH);
}
mem::release(p->label); mem::release(p->label);
mem::release(p); mem::release(p);
hRef.clear(); hRef.clear();
@ -231,13 +298,19 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags )
{ {
p->doFlags.store(kDoRunThFl,std::memory_order_release); p->doFlags.store(kDoRunThFl,std::memory_order_release);
waitId = kRunningThId; waitId = kRunningThId;
if((rc = signalCondVar(p->mutexH)) != kOkRC )
{
cwLogError(rc,"Cond. var. signalling failed.");
goto errLabel;
}
} }
if( waitFl ) if( waitFl )
rc = _waitForState(p,waitId); rc = _waitForState(p,waitId);
errLabel:
if( rc != kOkRC ) if( rc != kOkRC )
cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i sleepMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->sleepMicros); cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i waitMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->waitMicros);
return rc; return rc;
@ -290,9 +363,21 @@ unsigned cw::thread::pauseMicros( handle_t h )
namespace cw namespace cw
{ {
time::spec_t g_t0{};
time_t g_micros = 0;
unsigned g_n = 0;
bool _threadTestCb( void* p ) bool _threadTestCb( void* p )
{ {
unsigned* ip = (unsigned*)p; if( g_t0.tv_nsec != 0 )
{
time::spec_t t1;
time::get(t1);
g_micros += time::elapsedMicros(g_t0,t1);
g_n += 1;
g_t0.tv_nsec = 0;
}
unsigned* ip = (unsigned*)p;
ip[0]++; ip[0]++;
return true; return true;
} }
@ -304,10 +389,12 @@ cw::rc_t cw::threadTest()
unsigned val = 0; unsigned val = 0;
rc_t rc; rc_t rc;
char c = 0; char c = 0;
// create the thread
if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC ) if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC )
return rc; return rc;
// start the thread
if((rc = thread::pause(h,0)) != kOkRC ) if((rc = thread::pause(h,0)) != kOkRC )
goto errLabel; goto errLabel;
@ -333,7 +420,10 @@ cw::rc_t cw::threadTest()
case 'p': case 'p':
{ {
if( thread::state(h) == thread::kPausedThId ) if( thread::state(h) == thread::kPausedThId )
{
time::get(g_t0);
rc = thread::pause(h,thread::kWaitFl); rc = thread::pause(h,thread::kWaitFl);
}
else else
rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl); rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl);
@ -349,6 +439,7 @@ cw::rc_t cw::threadTest()
break; break;
case 'q': case 'q':
printf("wakeup micros:%li cnt:%i avg:%li\n",g_micros,g_n,g_micros/g_n);
break; break;
//default: //default:

View File

@ -262,15 +262,12 @@ void cw::time::subtractMicros( spec_t& ts, unsigned micros )
void cw::time::advanceMicros( spec_t& ts, unsigned us ) void cw::time::advanceMicros( spec_t& ts, unsigned us )
{ {
const unsigned us_per_sec = 1000000;
const unsigned ns_per_sec = 1000000000; const unsigned ns_per_sec = 1000000000;
unsigned sec = us / us_per_sec;
ts.tv_sec += sec; ts.tv_nsec += us * 1000; // convert us to nano's
ts.tv_nsec += (us - sec*us_per_sec)*1000;
sec = ts.tv_nsec / ns_per_sec; // check if nano's now have more than ns_pser_sec
time_t sec = ts.tv_nsec / ns_per_sec;
ts.tv_sec += sec; ts.tv_sec += sec;
ts.tv_nsec -= sec * ns_per_sec; ts.tv_nsec -= sec * ns_per_sec;
@ -281,18 +278,14 @@ void cw::time::advanceMicros( spec_t& ts, unsigned us )
void cw::time::advanceMs( spec_t& ts, unsigned ms ) void cw::time::advanceMs( spec_t& ts, unsigned ms )
{ {
const unsigned ms_per_sec = 1000;
const unsigned ns_per_sec = 1000000000; const unsigned ns_per_sec = 1000000000;
unsigned sec = ms / ms_per_sec; ts.tv_nsec += ms * 1000000;
time_t sec = ts.tv_nsec / ns_per_sec;
ts.tv_sec += sec;
ts.tv_nsec += (ms - (sec*ms_per_sec)) * 1000000;
sec = ts.tv_nsec / ns_per_sec;
ts.tv_sec += sec; ts.tv_sec += sec;
ts.tv_nsec -= sec * ns_per_sec; ts.tv_nsec -= sec * ns_per_sec;
} }
cw::rc_t cw::time::futureMs( spec_t& ts, unsigned ms ) cw::rc_t cw::time::futureMs( spec_t& ts, unsigned ms )