cwThread.cpp : The thread now blocks on a condition variable rather than using a time sleep when in pause mode.

This commit is contained in:
kevin 2024-09-13 15:14:28 -04:00
parent 9199aa81e0
commit 1223b55f2b

View File

@ -3,6 +3,9 @@
#include "cwCommonImpl.h" #include "cwCommonImpl.h"
#include "cwMem.h" #include "cwMem.h"
#include "cwThread.h" #include "cwThread.h"
#include "cwMutex.h"
#include "cwTest.h"
#include "cwTime.h"
#include <pthread.h> #include <pthread.h>
@ -29,10 +32,12 @@ namespace cw
void* funcArg; void* funcArg;
unsigned stateMicros; unsigned stateMicros;
unsigned pauseMicros; unsigned pauseMicros;
unsigned sleepMicros; unsigned waitMicros;
pthread_attr_t attr; pthread_attr_t attr;
char* label; char* label;
mutex::handle_t mutexH;
} thread_t; } thread_t;
inline thread_t* _handleToPtr(handle_t h) { return handleToPtr<handle_t,thread_t>(h); } inline thread_t* _handleToPtr(handle_t h) { return handleToPtr<handle_t,thread_t>(h); }
@ -50,9 +55,9 @@ namespace cw
if(curStateId == stateId ) if(curStateId == stateId )
break; break;
sleepUs( p->sleepMicros ); sleepUs( p->waitMicros );
waitTimeMicroSecs += p->sleepMicros; waitTimeMicroSecs += p->waitMicros;
}while( waitTimeMicroSecs < p->stateMicros ); }while( waitTimeMicroSecs < p->stateMicros );
@ -83,17 +88,44 @@ namespace cw
// if we are in the pause state // if we are in the pause state
if( curStateId == kPausedThId ) if( curStateId == kPausedThId )
{ {
// unlock mutex and block on cond. var. for pauseMicros or until signaled
rc_t rc = waitOnCondVar(p->mutexH, false, p->pauseMicros/1000 );
switch(rc)
{
case kTimeOutRC:
// the mutex is not locked
break;
case kOkRC:
// the cond. var. was signaled and the mutex is locked
break;
default:
// mutex is not locked
rc = cwLogError(rc,"Condition variable wait failed.");
}
sleepUs( p->pauseMicros );
curDoFlags = p->doFlags.load(std::memory_order_acquire); curDoFlags = p->doFlags.load(std::memory_order_acquire);
// if exit was requested - and the mutex is unlocked
if( cwIsFlag(curDoFlags,kDoExitThFl) && rc != kOkRC )
{
// this will cause the waitOnCondVar() to
// immediately return at the top of the loop
signalCondVar(p->mutexH);
//mutex::lock(p->mutexH);
}
else
{
// check if we have been requested to leave the pause state // check if we have been requested to leave the pause state
if( cwIsFlag(curDoFlags,kDoRunThFl) ) if( cwIsFlag(curDoFlags,kDoRunThFl) )
{ {
p->stateId.store(kRunningThId,std::memory_order_release); p->stateId.store(kRunningThId,std::memory_order_release);
} }
} }
}
else // ... we are in running state else // ... we are in running state
{ {
// call the user-defined function // call the user-defined function
@ -125,6 +157,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
{ {
rc_t rc; rc_t rc;
int sysRC; int sysRC;
bool mutex_is_locked_fl = false;
if((rc = destroy(hRef)) != kOkRC ) if((rc = destroy(hRef)) != kOkRC )
return rc; return rc;
@ -136,7 +169,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
p->stateMicros = stateMicros; p->stateMicros = stateMicros;
p->pauseMicros = pauseMicros; p->pauseMicros = pauseMicros;
p->stateId = kPausedThId; p->stateId = kPausedThId;
p->sleepMicros = 15000; p->waitMicros = 15000;
p->label = mem::duplStr(label); p->label = mem::duplStr(label);
if((sysRC = pthread_attr_init(&p->attr)) != 0) if((sysRC = pthread_attr_init(&p->attr)) != 0)
@ -146,6 +179,7 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
} }
else else
{ {
/* /*
// Creating the thread in a detached state should prevent it from leaking memory when // Creating the thread in a detached state should prevent it from leaking memory when
@ -158,6 +192,24 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
} }
else else
*/ */
// Create the cond. var mutex
if((rc = mutex::create(p->mutexH )) != kOkRC )
{
rc = cwLogError(rc,"Thread signal condition mutex create failed.");
goto errLabel;
}
// Lock the mutex so that it is already locked prior to the first call to waitOnCondVar()
if((rc = mutex::lock(p->mutexH)) != kOkRC )
{
rc = cwLogError(rc,"Thread signal condition mutex lock failed.");
goto errLabel;
}
mutex_is_locked_fl = true;
// create the thread - in paused state
if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 ) if((sysRC = pthread_create(&p->pThreadH, &p->attr, _threadCallback, (void*)p )) != 0 )
{ {
p->stateId = kNotInitThId; p->stateId = kNotInitThId;
@ -174,6 +226,16 @@ cw::rc_t cw::thread::create( handle_t& hRef, cbFunc_t func, void* funcArg, const
cwLogInfo("Thread %s id:%p created.",cwStringNullGuard(label), p->pThreadH); cwLogInfo("Thread %s id:%p created.",cwStringNullGuard(label), p->pThreadH);
errLabel:
if( rc != kOkRC && p->mutexH.isValid() )
{
if( mutex_is_locked_fl )
mutex::unlock(p->mutexH);
mutex::destroy(p->mutexH);
}
return rc; return rc;
} }
@ -201,6 +263,11 @@ cw::rc_t cw::thread::destroy( handle_t& hRef )
//if( pthread_attr_destroy(&p->attr) != 0 ) //if( pthread_attr_destroy(&p->attr) != 0 )
// rc = cwLogError(kOpFailRC,"Thread attribute destroy failed."); // rc = cwLogError(kOpFailRC,"Thread attribute destroy failed.");
if( p->mutexH.isValid() )
{
mutex::unlock(p->mutexH);
mutex::destroy(p->mutexH);
}
mem::release(p->label); mem::release(p->label);
mem::release(p); mem::release(p);
hRef.clear(); hRef.clear();
@ -231,13 +298,19 @@ cw::rc_t cw::thread::pause( handle_t h, unsigned cmdFlags )
{ {
p->doFlags.store(kDoRunThFl,std::memory_order_release); p->doFlags.store(kDoRunThFl,std::memory_order_release);
waitId = kRunningThId; waitId = kRunningThId;
if((rc = signalCondVar(p->mutexH)) != kOkRC )
{
cwLogError(rc,"Cond. var. signalling failed.");
goto errLabel;
}
} }
if( waitFl ) if( waitFl )
rc = _waitForState(p,waitId); rc = _waitForState(p,waitId);
errLabel:
if( rc != kOkRC ) if( rc != kOkRC )
cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i sleepMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->sleepMicros); cwLogError(rc,"Thread '%s' timed out waiting for '%s'. pauseMicros:%i stateMicros:%i waitMicros:%i", p->label, pauseFl ? "pause" : "un-pause",p->pauseMicros,p->stateMicros,p->waitMicros);
return rc; return rc;
@ -290,8 +363,20 @@ unsigned cw::thread::pauseMicros( handle_t h )
namespace cw namespace cw
{ {
time::spec_t g_t0{};
time_t g_micros = 0;
unsigned g_n = 0;
bool _threadTestCb( void* p ) bool _threadTestCb( void* p )
{ {
if( g_t0.tv_nsec != 0 )
{
time::spec_t t1;
time::get(t1);
g_micros += time::elapsedMicros(g_t0,t1);
g_n += 1;
g_t0.tv_nsec = 0;
}
unsigned* ip = (unsigned*)p; unsigned* ip = (unsigned*)p;
ip[0]++; ip[0]++;
return true; return true;
@ -305,9 +390,11 @@ cw::rc_t cw::threadTest()
rc_t rc; rc_t rc;
char c = 0; char c = 0;
// create the thread
if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC ) if((rc = thread::create(h,_threadTestCb,&val,"thread_test")) != kOkRC )
return rc; return rc;
// start the thread
if((rc = thread::pause(h,0)) != kOkRC ) if((rc = thread::pause(h,0)) != kOkRC )
goto errLabel; goto errLabel;
@ -333,7 +420,10 @@ cw::rc_t cw::threadTest()
case 'p': case 'p':
{ {
if( thread::state(h) == thread::kPausedThId ) if( thread::state(h) == thread::kPausedThId )
{
time::get(g_t0);
rc = thread::pause(h,thread::kWaitFl); rc = thread::pause(h,thread::kWaitFl);
}
else else
rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl); rc = thread::pause(h,thread::kPauseFl|thread::kWaitFl);
@ -349,6 +439,7 @@ cw::rc_t cw::threadTest()
break; break;
case 'q': case 'q':
printf("wakeup micros:%li cnt:%i avg:%li\n",g_micros,g_n,g_micros/g_n);
break; break;
//default: //default: