Merge branch 'threadsafe_io'

This commit is contained in:
kevin 2022-12-12 12:29:58 -05:00
commit 277323d1f8
7 changed files with 179 additions and 36 deletions

151
cwIo.cpp
View File

@ -42,6 +42,7 @@ namespace cw
{ {
unsigned id; unsigned id;
void* arg; void* arg;
bool asyncFl;
struct io_str* p; struct io_str* p;
struct thread_str* link; struct thread_str* link;
} thread_t; } thread_t;
@ -54,12 +55,14 @@ namespace cw
char* label; char* label;
unsigned id; unsigned id;
unsigned periodMicroSec; unsigned periodMicroSec;
bool asyncFl;
} timer_t; } timer_t;
typedef struct serialPort_str typedef struct serialPort_str
{ {
char* label; char* label;
unsigned userId; unsigned userId;
bool asyncFl;
char* device; char* device;
unsigned baudRate; unsigned baudRate;
unsigned flags; unsigned flags;
@ -69,6 +72,7 @@ namespace cw
typedef struct audioGroup_str typedef struct audioGroup_str
{ {
bool enableFl; bool enableFl;
bool asyncFl;
audio_msg_t msg; audio_msg_t msg;
mutex::handle_t mutexH; mutex::handle_t mutexH;
unsigned threadTimeOutMs; unsigned threadTimeOutMs;
@ -91,6 +95,7 @@ namespace cw
typedef struct socket_str typedef struct socket_str
{ {
bool enableFl; bool enableFl;
bool asyncFl;
char* label; char* label;
unsigned sockA_index; unsigned sockA_index;
unsigned userId; unsigned userId;
@ -105,6 +110,9 @@ namespace cw
cbFunc_t cbFunc; cbFunc_t cbFunc;
void* cbArg; void* cbArg;
mutex::handle_t cbMutexH;
unsigned cbMutexTimeOutMs;
thread_mach::handle_t threadMachH; thread_mach::handle_t threadMachH;
object_t* cfg; object_t* cfg;
@ -119,6 +127,7 @@ namespace cw
serialPortSrv::handle_t serialPortSrvH; serialPortSrv::handle_t serialPortSrvH;
midi::device::handle_t midiH; midi::device::handle_t midiH;
bool midiAsyncFl;
socket_t* sockA; socket_t* sockA;
unsigned sockN; unsigned sockN;
@ -143,14 +152,83 @@ namespace cw
ui::ws::handle_t wsUiH; // ui::ws handle (invalid if no UI was specified) ui::ws::handle_t wsUiH; // ui::ws handle (invalid if no UI was specified)
ui::appIdMap_t* uiMapA; // Application supplied id's for the UI resource supplied with the cfg script via 'uiCfgFn'. ui::appIdMap_t* uiMapA; // Application supplied id's for the UI resource supplied with the cfg script via 'uiCfgFn'.
unsigned uiMapN; // unsigned uiMapN; //
bool uiAsyncFl;
} io_t; } io_t;
//----------------------------------------------------------------------------------------------------------
//
// io
//
io_t* _handleToPtr( handle_t h ) io_t* _handleToPtr( handle_t h )
{ return handleToPtr<handle_t,io_t>(h); } { return handleToPtr<handle_t,io_t>(h); }
// All callbacks to the application occur through this function
rc_t _ioCallback( io_t* p, bool asyncFl, const msg_t* m )
{
rc_t rc = kOkRC;
rc_t rc_app = kOkRC;
bool unlockFl = false;
// if this is not an async callback then lock the mutex
if( !asyncFl )
{
switch(rc = mutex::lock(p->cbMutexH,p->cbMutexTimeOutMs))
{
case kOkRC:
unlockFl = true;
break;
case kTimeOutRC:
rc = cwLogError(rc,"io mutex callback mutex lock timed out.");
break;
default:
rc = cwLogError(rc,"io mutex callback mutex lock failed.");
}
}
// make the callback to the client
if( rc == kOkRC )
rc_app = p->cbFunc( p->cbArg, m );
// if the mutex is locked
if( unlockFl )
{
if((rc = mutex::unlock(p->cbMutexH)) != kOkRC )
{
rc = cwLogError(rc,"io mutex callback mutex unlock failed.");
}
}
return rc_app;
}
rc_t _ioParse( io_t* p, const object_t* cfg )
{
rc_t rc;
const object_t* ioCfg;
if((ioCfg = cfg->find("io")) == nullptr )
{
cwLogError(kInvalidArgRC,"The 'io' configuration block could not be found.");
goto errLabel;
}
if((rc = ioCfg->getv("callbackMutexTimeOutMs",p->cbMutexTimeOutMs)) != kOkRC )
{
cwLogError(rc,"Parsing of 'io' block configuration failed.");
goto errLabel;
}
errLabel:
return rc;
}
//---------------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------------
// //
// Thread // Thread
@ -164,7 +242,7 @@ namespace cw
m.tid = kThreadTId; m.tid = kThreadTId;
m.u.thread = &tm; m.u.thread = &tm;
t->p->cbFunc( t->p->cbArg, &m ); _ioCallback( t->p, t->asyncFl, &m );
return true; return true;
} }
@ -198,13 +276,13 @@ namespace cw
tm.id = t->id; tm.id = t->id;
m.tid = kTimerTId; m.tid = kTimerTId;
m.u.timer = &tm; m.u.timer = &tm;
t->io->cbFunc( t->io->cbArg, &m ); _ioCallback( t->io, t->asyncFl, &m );
} }
return !t->deletedFl; return !t->deletedFl;
} }
rc_t _timerCreate( io_t* p, const char* label, unsigned id, unsigned periodMicroSec ) rc_t _timerCreate( io_t* p, const char* label, unsigned id, unsigned periodMicroSec, bool asyncFl )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
timer_t* t = nullptr; timer_t* t = nullptr;
@ -239,6 +317,7 @@ namespace cw
t->io = p; t->io = p;
t->label = mem::duplStr(label); t->label = mem::duplStr(label);
t->id = id; t->id = id;
t->asyncFl = asyncFl;
t->periodMicroSec = periodMicroSec; t->periodMicroSec = periodMicroSec;
if((rc = thread_mach::add(p->threadMachH,_timerThreadCb,t)) != kOkRC ) if((rc = thread_mach::add(p->threadMachH,_timerThreadCb,t)) != kOkRC )
@ -281,7 +360,7 @@ namespace cw
void _serialPortCb( void* arg, unsigned serialCfgIdx, const void* byteA, unsigned byteN ) void _serialPortCb( void* arg, unsigned serialCfgIdx, const void* byteA, unsigned byteN )
{ {
const io_t* p = (const io_t*)arg; io_t* p = (io_t*)arg;
if( serialCfgIdx > p->serialN ) if( serialCfgIdx > p->serialN )
cwLogError(kAssertFailRC,"The serial cfg index %i is out of range %i in serial port callback.", serialCfgIdx, p->serialN ); cwLogError(kAssertFailRC,"The serial cfg index %i is out of range %i in serial port callback.", serialCfgIdx, p->serialN );
@ -298,7 +377,7 @@ namespace cw
m.tid = kSerialTId; m.tid = kSerialTId;
m.u.serial = &sm; m.u.serial = &sm;
p->cbFunc( p->cbArg, &m ); _ioCallback( p, sp->asyncFl, &m );
} }
} }
@ -320,7 +399,8 @@ namespace cw
}; };
if((rc = e.getv( if((rc = e.getv(
"enable_flag", enableFlRef, "enableFl", enableFlRef,
"asyncFl", port->asyncFl,
"label", port->label, "label", port->label,
"device", port->device, "device", port->device,
"baud", port->baudRate, "baud", port->baudRate,
@ -484,7 +564,7 @@ namespace cw
m.tid = kMidiTId; m.tid = kMidiTId;
m.u.midi = &mm; m.u.midi = &mm;
p->cbFunc( p->cbArg, &m ); _ioCallback( p, p->midiAsyncFl, &m );
/* /*
for(unsigned j=0; j<pkt->msgCnt; ++j) for(unsigned j=0; j<pkt->msgCnt; ++j)
@ -509,8 +589,8 @@ namespace cw
return kOkRC; return kOkRC;
} }
if((rc = cfg->getv( if((rc = cfg->getv("parserBufByteN", parserBufByteN,
"parserBufByteN", parserBufByteN )) != kOkRC ) "asyncFl", p->midiAsyncFl )) != kOkRC )
{ {
rc = cwLogError(kSyntaxErrorRC,"MIDI configuration parse failed."); rc = cwLogError(kSyntaxErrorRC,"MIDI configuration parse failed.");
} }
@ -572,7 +652,7 @@ namespace cw
m.tid = kSockTId; m.tid = kSockTId;
m.u.sock = &sm; m.u.sock = &sm;
p->cbFunc( p->cbArg, &m ); _ioCallback( p, p->sockA[ sockArray_index ].asyncFl, &m );
} }
} }
@ -676,6 +756,7 @@ namespace cw
// parse the required arguments // parse the required arguments
if(( rc = node->getv( if(( rc = node->getv(
"enableFl", p->sockA[i].enableFl, "enableFl", p->sockA[i].enableFl,
"asyncFl", p->sockA[i].asyncFl,
"label", p->sockA[i].label, "label", p->sockA[i].label,
"port", port, "port", port,
"timeOutMs", timeOutMs, "timeOutMs", timeOutMs,
@ -851,10 +932,10 @@ namespace cw
} }
rc_t _audioGroupDeviceProcessMeter( io_t* p, audio_group_dev_t* agd, unsigned audioBufFlags ) rc_t _audioGroupDeviceProcessMeter( io_t* p, audio_group_dev_t* agd, audioGroup_t* ag, unsigned audioBufFlags )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
if( agd != nullptr && cwIsFlag(agd->flags,kMeterFl)) if( agd != nullptr && ag != nullptr && cwIsFlag(agd->flags,kMeterFl))
{ {
msg_t m; msg_t m;
m.tid = kAudioMeterTId; m.tid = kAudioMeterTId;
@ -864,7 +945,7 @@ namespace cw
audio::buf::meter(p->audioBufH, agd->devIdx, audioBufFlags, agd->meterA, agd->chCnt ); audio::buf::meter(p->audioBufH, agd->devIdx, audioBufFlags, agd->meterA, agd->chCnt );
// callback the application with the current meter values // callback the application with the current meter values
rc = p->cbFunc(p->cbArg,&m); rc = _ioCallback( p, ag->asyncFl, &m);
} }
@ -885,11 +966,11 @@ namespace cw
rc_t rc0; rc_t rc0;
// update the input meters // update the input meters
if((rc0 = _audioGroupDeviceProcessMeter( p, ad->iagd, audio::buf::kInFl )) != kOkRC ) if((rc0 = _audioGroupDeviceProcessMeter( p, ad->iagd, ad->iGroup, audio::buf::kInFl )) != kOkRC )
rc = rc0; rc = rc0;
// update the output meters // update the output meters
if((rc0 = _audioGroupDeviceProcessMeter( p, ad->oagd, audio::buf::kOutFl )) != kOkRC ) if((rc0 = _audioGroupDeviceProcessMeter( p, ad->oagd, ad->oGroup, audio::buf::kOutFl )) != kOkRC )
rc = rc0; rc = rc0;
} }
@ -974,7 +1055,7 @@ namespace cw
_audioGroupProcSampleBufs( ag->p, ag, kAudioGroupGetBuf, true ); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupGetBuf, true );
_audioGroupProcSampleBufs( ag->p, ag, kAudioGroupGetBuf, false ); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupGetBuf, false );
ag->p->cbFunc(ag->p->cbArg,&msg); _ioCallback( ag->p, ag->asyncFl, &msg);
_audioGroupProcSampleBufs( ag->p, ag, kAudioGroupAdvBuf, true ); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupAdvBuf, true );
_audioGroupProcSampleBufs( ag->p, ag, kAudioGroupAdvBuf, false ); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupAdvBuf, false );
@ -1610,7 +1691,7 @@ namespace cw
r.tid = kUiTId; r.tid = kUiTId;
r.u.ui = { .opId=opId, .wsSessId=wsSessId, .parentAppId=parentAppId, .uuId=uuId, .appId=appId, .chanId=chanId, .value=v }; r.u.ui = { .opId=opId, .wsSessId=wsSessId, .parentAppId=parentAppId, .uuId=uuId, .appId=appId, .chanId=chanId, .value=v };
return p->cbFunc(p->cbArg,&r); return _ioCallback( p, p->uiAsyncFl, &r );
} }
rc_t _uiConfig( io_t* p, const object_t* c, const ui::appIdMap_t* mapA, unsigned mapN ) rc_t _uiConfig( io_t* p, const object_t* c, const ui::appIdMap_t* mapA, unsigned mapN )
@ -1618,7 +1699,7 @@ namespace cw
rc_t rc = kOkRC; rc_t rc = kOkRC;
const char* uiCfgLabel = "ui"; const char* uiCfgLabel = "ui";
ui::ws::args_t args = {}; ui::ws::args_t args = {};
const object_t* ui_cfg = nullptr;
// Duplicate the application id map // Duplicate the application id map
if( mapN > 0 ) if( mapN > 0 )
@ -1634,9 +1715,15 @@ namespace cw
// if a UI cfg record was given // if a UI cfg record was given
if( c->find(uiCfgLabel) != nullptr ) if((ui_cfg = c->find(uiCfgLabel)) != nullptr )
{ {
if((rc = ui_cfg->getv("asyncFl",p->uiAsyncFl)) != kOkRC )
{
rc = cwLogError(rc,"UI configuration parse failed.");
goto errLabel;
}
// parse the ui // parse the ui
if((rc = ui::ws::parseArgs( *c, args, uiCfgLabel )) == kOkRC ) if((rc = ui::ws::parseArgs( *c, args, uiCfgLabel )) == kOkRC )
{ {
@ -1646,6 +1733,8 @@ namespace cw
} }
} }
errLabel:
return rc; return rc;
} }
@ -1783,6 +1872,14 @@ cw::rc_t cw::io::create(
p->cbFunc = cbFunc; p->cbFunc = cbFunc;
p->cbArg = cbArg; p->cbArg = cbArg;
// parse the 'io' configuration block
if((rc = _ioParse(p,o)) != kOkRC )
goto errLabel;
// create the callback mutex
if((rc = mutex::create( p->cbMutexH )) != kOkRC )
goto errLabel;
// create the the thread machine // create the the thread machine
if((rc = thread_mach::create( p->threadMachH )) != kOkRC ) if((rc = thread_mach::create( p->threadMachH )) != kOkRC )
goto errLabel; goto errLabel;
@ -1865,19 +1962,24 @@ cw::rc_t cw::io::stop( handle_t h )
return kOkRC; return kOkRC;
} }
cw::rc_t cw::io::exec( handle_t h ) cw::rc_t cw::io::exec( handle_t h, void* execCbArg )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
io_t* p = _handleToPtr(h); io_t* p = _handleToPtr(h);
if( p->wsUiH.isValid() ) if( p->wsUiH.isValid() )
rc = ui::ws::exec(p->wsUiH ); rc = ui::ws::exec( p->wsUiH );
time::get(p->t0); time::get(p->t0);
if( p->audioMeterDevEnabledN ) if( p->audioMeterDevEnabledN )
_audioDeviceProcessMeters(p); _audioDeviceProcessMeters(p);
msg_t m;
m.tid = kExecTId;
m.u.exec.execArg = execCbArg;
_ioCallback(p,false,&m);
return rc; return rc;
} }
@ -1916,13 +2018,14 @@ void cw::io::report( handle_t h )
// Thread // Thread
// //
cw::rc_t cw::io::threadCreate( handle_t h, unsigned id, void* arg ) cw::rc_t cw::io::threadCreate( handle_t h, unsigned id, bool asyncFl, void* arg )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
io_t* p = _handleToPtr(h); io_t* p = _handleToPtr(h);
thread_t* t = mem::allocZ<thread_t>(1); thread_t* t = mem::allocZ<thread_t>(1);
t->id = id; t->id = id;
t->asyncFl = asyncFl;
t->arg = arg; t->arg = arg;
t->p = p; t->p = p;
t->link = p->threadL; t->link = p->threadL;
@ -1939,10 +2042,10 @@ cw::rc_t cw::io::threadCreate( handle_t h, unsigned id, void* arg )
// Timer // Timer
// //
cw::rc_t cw::io::timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec ) cw::rc_t cw::io::timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec, bool asyncFl )
{ {
io_t* p = _handleToPtr(h); io_t* p = _handleToPtr(h);
return _timerCreate(p, label, id, periodMicroSec ); return _timerCreate(p, label, id, periodMicroSec, asyncFl );
} }

16
cwIo.h
View File

@ -35,7 +35,8 @@ namespace cw
kAudioMeterTId, kAudioMeterTId,
kSockTId, kSockTId,
kWebSockTId, kWebSockTId,
kUiTId kUiTId,
kExecTId
}; };
typedef struct thread_msg_str typedef struct thread_msg_str
@ -123,9 +124,15 @@ namespace cw
const ui::value_t* value; const ui::value_t* value;
} ui_msg_t; } ui_msg_t;
typedef struct exec_msg_str
{
void* execArg;
} exec_msg_t;
typedef struct msg_str typedef struct msg_str
{ {
unsigned tid; unsigned tid;
bool asyncFl;
union union
{ {
thread_msg_t* thread; thread_msg_t* thread;
@ -136,6 +143,7 @@ namespace cw
audio_group_dev_t* audioGroupDev; // audioMeterTId audio_group_dev_t* audioGroupDev; // audioMeterTId
socket_msg_t* sock; socket_msg_t* sock;
ui_msg_t ui; ui_msg_t ui;
exec_msg_t exec;
} u; } u;
} msg_t; } msg_t;
@ -155,7 +163,7 @@ namespace cw
rc_t start( handle_t h ); rc_t start( handle_t h );
rc_t pause( handle_t h ); rc_t pause( handle_t h );
rc_t stop( handle_t h ); rc_t stop( handle_t h );
rc_t exec( handle_t h ); rc_t exec( handle_t h, void* execCbArg=nullptr );
bool isShuttingDown( handle_t h ); bool isShuttingDown( handle_t h );
void report( handle_t h ); void report( handle_t h );
@ -163,14 +171,14 @@ namespace cw
// //
// Thread // Thread
// //
rc_t threadCreate( handle_t h, unsigned id, void* arg ); rc_t threadCreate( handle_t h, unsigned id, bool asyncFl, void* arg );
//---------------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------------
// //
// Timer // Timer
// //
rc_t timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec ); rc_t timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec, bool asyncFl );
rc_t timerDestroy( handle_t h, unsigned timerIdx ); rc_t timerDestroy( handle_t h, unsigned timerIdx );
unsigned timerCount( handle_t h ); unsigned timerCount( handle_t h );

View File

@ -978,6 +978,7 @@ cw::rc_t cw::io::audio_midi::main( const object_t* cfg )
rc_t rc; rc_t rc;
app_t app = {}; app_t app = {};
bool asyncFl = true;
if((rc = _parseCfg(&app,cfg)) != kOkRC ) if((rc = _parseCfg(&app,cfg)) != kOkRC )
goto errLabel; goto errLabel;
@ -987,7 +988,7 @@ cw::rc_t cw::io::audio_midi::main( const object_t* cfg )
return rc; return rc;
// create the MIDI playback timer // create the MIDI playback timer
if((rc = timerCreate( app.ioH, "am_timer", kAmMidiTimerId, app.midi_timer_period_micro_sec)) != kOkRC ) if((rc = timerCreate( app.ioH, "am_timer", kAmMidiTimerId, app.midi_timer_period_micro_sec, asyncFl)) != kOkRC )
{ {
cwLogError(rc,"Audio-MIDI timer create failed."); cwLogError(rc,"Audio-MIDI timer create failed.");
goto errLabel; goto errLabel;

View File

@ -1065,6 +1065,7 @@ namespace cw
cw::rc_t cw::midi_record_play::create( handle_t& hRef, io::handle_t ioH, const object_t& cfg, event_callback_t cb, void* cb_arg ) cw::rc_t cw::midi_record_play::create( handle_t& hRef, io::handle_t ioH, const object_t& cfg, event_callback_t cb, void* cb_arg )
{ {
bool asyncFl = true;
midi_record_play_t* p = nullptr; midi_record_play_t* p = nullptr;
rc_t rc; rc_t rc;
@ -1113,7 +1114,7 @@ cw::rc_t cw::midi_record_play::create( handle_t& hRef, io::handle_t ioH, const o
} }
// create the MIDI playback timer // create the MIDI playback timer
if((rc = timerCreate( p->ioH, TIMER_LABEL, kMidiRecordPlayTimerId, p->midi_timer_period_micro_sec)) != kOkRC ) if((rc = timerCreate( p->ioH, TIMER_LABEL, kMidiRecordPlayTimerId, p->midi_timer_period_micro_sec, asyncFl)) != kOkRC )
{ {
cwLogError(rc,"Audio-MIDI timer create failed."); cwLogError(rc,"Audio-MIDI timer create failed.");
goto errLabel; goto errLabel;

View File

@ -88,18 +88,19 @@ cw::rc_t cw::min_test( const object_t* cfg )
rc_t rc; rc_t rc;
app_t app = {}; app_t app = {};
bool asyncFl = true;
// create the io framework instance // create the io framework instance
if((rc = create(app.ioH,cfg,minTestCb,&app)) != kOkRC ) if((rc = create(app.ioH,cfg,minTestCb,&app)) != kOkRC )
return rc; return rc;
if((rc = threadCreate( app.ioH, kThread0Id, &app )) != kOkRC ) if((rc = threadCreate( app.ioH, kThread0Id, asyncFl, &app )) != kOkRC )
{ {
rc = cwLogError(rc,"Thread 0 create failed."); rc = cwLogError(rc,"Thread 0 create failed.");
goto errLabel; goto errLabel;
} }
if((rc = threadCreate( app.ioH, kThread1Id, &app )) != kOkRC ) if((rc = threadCreate( app.ioH, kThread1Id, asyncFl, &app )) != kOkRC )
{ {
rc = cwLogError(rc,"Thread 1 create failed."); rc = cwLogError(rc,"Thread 1 create failed.");
goto errLabel; goto errLabel;

View File

@ -97,6 +97,34 @@ cw::rc_t cw::mutex::tryLock( handle_t h, bool& lockFlRef )
return rc; return rc;
} }
cw::rc_t cw::mutex::lock( handle_t h, unsigned timeout_milliseconds )
{
rc_t rc = kOkRC;
mutex_t* p = _handleToPtr(h);
int sysRc;
time::spec_t ts;
time::get(ts);
time::advanceMs(ts,timeout_milliseconds);
switch(sysRc = pthread_mutex_timedlock(&p->mutex,&ts) )
{
case 0:
rc = kOkRC;
break;
case ETIMEDOUT:
rc = kTimeOutRC;
default:
rc = cwLogSysError(kInvalidOpRC,sysRc,"Lock failed.");
}
return rc;
}
cw::rc_t cw::mutex::lock( handle_t h ) cw::rc_t cw::mutex::lock( handle_t h )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;

View File

@ -11,6 +11,7 @@ namespace cw
rc_t destroy( handle_t& h ); rc_t destroy( handle_t& h );
rc_t tryLock( handle_t h, bool& lockFlRef ); rc_t tryLock( handle_t h, bool& lockFlRef );
rc_t lock( handle_t h, unsigned timeout_milliseconds );
rc_t lock( handle_t h ); rc_t lock( handle_t h );
rc_t unlock( handle_t h ); rc_t unlock( handle_t h );