diff --git a/cwIo.cpp b/cwIo.cpp index 3ad95ad..1c45287 100644 --- a/cwIo.cpp +++ b/cwIo.cpp @@ -42,6 +42,7 @@ namespace cw { unsigned id; void* arg; + bool asyncFl; struct io_str* p; struct thread_str* link; } thread_t; @@ -54,12 +55,14 @@ namespace cw char* label; unsigned id; unsigned periodMicroSec; + bool asyncFl; } timer_t; typedef struct serialPort_str { char* label; unsigned userId; + bool asyncFl; char* device; unsigned baudRate; unsigned flags; @@ -68,7 +71,8 @@ namespace cw typedef struct audioGroup_str { - bool enableFl; + bool enableFl; + bool asyncFl; audio_msg_t msg; mutex::handle_t mutexH; unsigned threadTimeOutMs; @@ -91,6 +95,7 @@ namespace cw typedef struct socket_str { bool enableFl; + bool asyncFl; char* label; unsigned sockA_index; unsigned userId; @@ -104,6 +109,9 @@ namespace cw cbFunc_t cbFunc; void* cbArg; + + mutex::handle_t cbMutexH; + unsigned cbMutexTimeOutMs; thread_mach::handle_t threadMachH; @@ -119,6 +127,7 @@ namespace cw serialPortSrv::handle_t serialPortSrvH; midi::device::handle_t midiH; + bool midiAsyncFl; socket_t* sockA; unsigned sockN; @@ -143,14 +152,83 @@ namespace cw ui::ws::handle_t wsUiH; // ui::ws handle (invalid if no UI was specified) ui::appIdMap_t* uiMapA; // Application supplied id's for the UI resource supplied with the cfg script via 'uiCfgFn'. unsigned uiMapN; // + bool uiAsyncFl; } io_t; + //---------------------------------------------------------------------------------------------------------- + // + // io + // + io_t* _handleToPtr( handle_t h ) { return handleToPtr(h); } + // All callbacks to the application occur through this function + rc_t _ioCallback( io_t* p, bool asyncFl, const msg_t* m ) + { + rc_t rc = kOkRC; + rc_t rc_app = kOkRC; + bool unlockFl = false; + + // if this is not an async callback then lock the mutex + if( !asyncFl ) + { + switch(rc = mutex::lock(p->cbMutexH,p->cbMutexTimeOutMs)) + { + case kOkRC: + unlockFl = true; + break; + + case kTimeOutRC: + rc = cwLogError(rc,"io mutex callback mutex lock timed out."); + break; + + default: + rc = cwLogError(rc,"io mutex callback mutex lock failed."); + } + + } + + // make the callback to the client + if( rc == kOkRC ) + rc_app = p->cbFunc( p->cbArg, m ); + + + // if the mutex is locked + if( unlockFl ) + { + if((rc = mutex::unlock(p->cbMutexH)) != kOkRC ) + { + rc = cwLogError(rc,"io mutex callback mutex unlock failed."); + } + } + + return rc_app; + } + + rc_t _ioParse( io_t* p, const object_t* cfg ) + { + rc_t rc; + const object_t* ioCfg; + if((ioCfg = cfg->find("io")) == nullptr ) + { + cwLogError(kInvalidArgRC,"The 'io' configuration block could not be found."); + goto errLabel; + } + + if((rc = ioCfg->getv("callbackMutexTimeOutMs",p->cbMutexTimeOutMs)) != kOkRC ) + { + cwLogError(rc,"Parsing of 'io' block configuration failed."); + goto errLabel; + } + + errLabel: + return rc; + } + //---------------------------------------------------------------------------------------------------------- // // Thread @@ -164,7 +242,7 @@ namespace cw m.tid = kThreadTId; m.u.thread = &tm; - t->p->cbFunc( t->p->cbArg, &m ); + _ioCallback( t->p, t->asyncFl, &m ); return true; } @@ -198,13 +276,13 @@ namespace cw tm.id = t->id; m.tid = kTimerTId; m.u.timer = &tm; - t->io->cbFunc( t->io->cbArg, &m ); + _ioCallback( t->io, t->asyncFl, &m ); } return !t->deletedFl; } - rc_t _timerCreate( io_t* p, const char* label, unsigned id, unsigned periodMicroSec ) + rc_t _timerCreate( io_t* p, const char* label, unsigned id, unsigned periodMicroSec, bool asyncFl ) { rc_t rc = kOkRC; timer_t* t = nullptr; @@ -239,6 +317,7 @@ namespace cw t->io = p; t->label = mem::duplStr(label); t->id = id; + t->asyncFl = asyncFl; t->periodMicroSec = periodMicroSec; if((rc = thread_mach::add(p->threadMachH,_timerThreadCb,t)) != kOkRC ) @@ -281,7 +360,7 @@ namespace cw void _serialPortCb( void* arg, unsigned serialCfgIdx, const void* byteA, unsigned byteN ) { - const io_t* p = (const io_t*)arg; + io_t* p = (io_t*)arg; if( serialCfgIdx > p->serialN ) cwLogError(kAssertFailRC,"The serial cfg index %i is out of range %i in serial port callback.", serialCfgIdx, p->serialN ); @@ -298,7 +377,7 @@ namespace cw m.tid = kSerialTId; m.u.serial = &sm; - p->cbFunc( p->cbArg, &m ); + _ioCallback( p, sp->asyncFl, &m ); } } @@ -320,7 +399,8 @@ namespace cw }; if((rc = e.getv( - "enable_flag", enableFlRef, + "enableFl", enableFlRef, + "asyncFl", port->asyncFl, "label", port->label, "device", port->device, "baud", port->baudRate, @@ -484,7 +564,7 @@ namespace cw m.tid = kMidiTId; m.u.midi = &mm; - p->cbFunc( p->cbArg, &m ); + _ioCallback( p, p->midiAsyncFl, &m ); /* for(unsigned j=0; jmsgCnt; ++j) @@ -509,8 +589,8 @@ namespace cw return kOkRC; } - if((rc = cfg->getv( - "parserBufByteN", parserBufByteN )) != kOkRC ) + if((rc = cfg->getv("parserBufByteN", parserBufByteN, + "asyncFl", p->midiAsyncFl )) != kOkRC ) { rc = cwLogError(kSyntaxErrorRC,"MIDI configuration parse failed."); } @@ -572,7 +652,7 @@ namespace cw m.tid = kSockTId; m.u.sock = &sm; - p->cbFunc( p->cbArg, &m ); + _ioCallback( p, p->sockA[ sockArray_index ].asyncFl, &m ); } } @@ -676,6 +756,7 @@ namespace cw // parse the required arguments if(( rc = node->getv( "enableFl", p->sockA[i].enableFl, + "asyncFl", p->sockA[i].asyncFl, "label", p->sockA[i].label, "port", port, "timeOutMs", timeOutMs, @@ -851,10 +932,10 @@ namespace cw } - rc_t _audioGroupDeviceProcessMeter( io_t* p, audio_group_dev_t* agd, unsigned audioBufFlags ) + rc_t _audioGroupDeviceProcessMeter( io_t* p, audio_group_dev_t* agd, audioGroup_t* ag, unsigned audioBufFlags ) { rc_t rc = kOkRC; - if( agd != nullptr && cwIsFlag(agd->flags,kMeterFl)) + if( agd != nullptr && ag != nullptr && cwIsFlag(agd->flags,kMeterFl)) { msg_t m; m.tid = kAudioMeterTId; @@ -864,7 +945,7 @@ namespace cw audio::buf::meter(p->audioBufH, agd->devIdx, audioBufFlags, agd->meterA, agd->chCnt ); // callback the application with the current meter values - rc = p->cbFunc(p->cbArg,&m); + rc = _ioCallback( p, ag->asyncFl, &m); } @@ -885,11 +966,11 @@ namespace cw rc_t rc0; // update the input meters - if((rc0 = _audioGroupDeviceProcessMeter( p, ad->iagd, audio::buf::kInFl )) != kOkRC ) + if((rc0 = _audioGroupDeviceProcessMeter( p, ad->iagd, ad->iGroup, audio::buf::kInFl )) != kOkRC ) rc = rc0; // update the output meters - if((rc0 = _audioGroupDeviceProcessMeter( p, ad->oagd, audio::buf::kOutFl )) != kOkRC ) + if((rc0 = _audioGroupDeviceProcessMeter( p, ad->oagd, ad->oGroup, audio::buf::kOutFl )) != kOkRC ) rc = rc0; } @@ -974,7 +1055,7 @@ namespace cw _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupGetBuf, true ); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupGetBuf, false ); - ag->p->cbFunc(ag->p->cbArg,&msg); + _ioCallback( ag->p, ag->asyncFl, &msg); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupAdvBuf, true ); _audioGroupProcSampleBufs( ag->p, ag, kAudioGroupAdvBuf, false ); @@ -1610,15 +1691,15 @@ namespace cw r.tid = kUiTId; r.u.ui = { .opId=opId, .wsSessId=wsSessId, .parentAppId=parentAppId, .uuId=uuId, .appId=appId, .chanId=chanId, .value=v }; - return p->cbFunc(p->cbArg,&r); + return _ioCallback( p, p->uiAsyncFl, &r ); } rc_t _uiConfig( io_t* p, const object_t* c, const ui::appIdMap_t* mapA, unsigned mapN ) { - rc_t rc = kOkRC; - const char* uiCfgLabel = "ui"; - ui::ws::args_t args = {}; - + rc_t rc = kOkRC; + const char* uiCfgLabel = "ui"; + ui::ws::args_t args = {}; + const object_t* ui_cfg = nullptr; // Duplicate the application id map if( mapN > 0 ) @@ -1634,9 +1715,15 @@ namespace cw // if a UI cfg record was given - if( c->find(uiCfgLabel) != nullptr ) + if((ui_cfg = c->find(uiCfgLabel)) != nullptr ) { + if((rc = ui_cfg->getv("asyncFl",p->uiAsyncFl)) != kOkRC ) + { + rc = cwLogError(rc,"UI configuration parse failed."); + goto errLabel; + } + // parse the ui if((rc = ui::ws::parseArgs( *c, args, uiCfgLabel )) == kOkRC ) { @@ -1646,6 +1733,8 @@ namespace cw } } + + errLabel: return rc; } @@ -1783,6 +1872,14 @@ cw::rc_t cw::io::create( p->cbFunc = cbFunc; p->cbArg = cbArg; + // parse the 'io' configuration block + if((rc = _ioParse(p,o)) != kOkRC ) + goto errLabel; + + // create the callback mutex + if((rc = mutex::create( p->cbMutexH )) != kOkRC ) + goto errLabel; + // create the the thread machine if((rc = thread_mach::create( p->threadMachH )) != kOkRC ) goto errLabel; @@ -1865,18 +1962,23 @@ cw::rc_t cw::io::stop( handle_t h ) return kOkRC; } -cw::rc_t cw::io::exec( handle_t h ) +cw::rc_t cw::io::exec( handle_t h, void* execCbArg ) { rc_t rc = kOkRC; io_t* p = _handleToPtr(h); if( p->wsUiH.isValid() ) - rc = ui::ws::exec(p->wsUiH ); + rc = ui::ws::exec( p->wsUiH ); time::get(p->t0); if( p->audioMeterDevEnabledN ) _audioDeviceProcessMeters(p); + + msg_t m; + m.tid = kExecTId; + m.u.exec.execArg = execCbArg; + _ioCallback(p,false,&m); return rc; } @@ -1916,13 +2018,14 @@ void cw::io::report( handle_t h ) // Thread // -cw::rc_t cw::io::threadCreate( handle_t h, unsigned id, void* arg ) +cw::rc_t cw::io::threadCreate( handle_t h, unsigned id, bool asyncFl, void* arg ) { rc_t rc = kOkRC; io_t* p = _handleToPtr(h); thread_t* t = mem::allocZ(1); t->id = id; + t->asyncFl = asyncFl; t->arg = arg; t->p = p; t->link = p->threadL; @@ -1939,10 +2042,10 @@ cw::rc_t cw::io::threadCreate( handle_t h, unsigned id, void* arg ) // Timer // -cw::rc_t cw::io::timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec ) +cw::rc_t cw::io::timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec, bool asyncFl ) { io_t* p = _handleToPtr(h); - return _timerCreate(p, label, id, periodMicroSec ); + return _timerCreate(p, label, id, periodMicroSec, asyncFl ); } diff --git a/cwIo.h b/cwIo.h index d1b645b..b658049 100644 --- a/cwIo.h +++ b/cwIo.h @@ -35,7 +35,8 @@ namespace cw kAudioMeterTId, kSockTId, kWebSockTId, - kUiTId + kUiTId, + kExecTId }; typedef struct thread_msg_str @@ -123,9 +124,15 @@ namespace cw const ui::value_t* value; } ui_msg_t; + typedef struct exec_msg_str + { + void* execArg; + } exec_msg_t; + typedef struct msg_str { unsigned tid; + bool asyncFl; union { thread_msg_t* thread; @@ -136,6 +143,7 @@ namespace cw audio_group_dev_t* audioGroupDev; // audioMeterTId socket_msg_t* sock; ui_msg_t ui; + exec_msg_t exec; } u; } msg_t; @@ -155,7 +163,7 @@ namespace cw rc_t start( handle_t h ); rc_t pause( handle_t h ); rc_t stop( handle_t h ); - rc_t exec( handle_t h ); + rc_t exec( handle_t h, void* execCbArg=nullptr ); bool isShuttingDown( handle_t h ); void report( handle_t h ); @@ -163,14 +171,14 @@ namespace cw // // Thread // - rc_t threadCreate( handle_t h, unsigned id, void* arg ); + rc_t threadCreate( handle_t h, unsigned id, bool asyncFl, void* arg ); //---------------------------------------------------------------------------------------------------------- // // Timer // - rc_t timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec ); + rc_t timerCreate( handle_t h, const char* label, unsigned id, unsigned periodMicroSec, bool asyncFl ); rc_t timerDestroy( handle_t h, unsigned timerIdx ); unsigned timerCount( handle_t h );