diff --git a/Makefile.am b/Makefile.am index b4a126a..ac568fd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -25,6 +25,9 @@ libcwSRC += src/libcw/cwThread.cpp src/libcw/cwMutex.cpp src/libcw/cwThreadMach libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h src/libcw/cwSpScQueueTmpl.h libcwSRC += src/libcw/cwSpScBuf.cpp src/libcw/cwSpScQueueTmpl.cpp +libcwHDR += src/libcw/cwNbMpScQueue.h +libcwSRC += src/libcw/cwNbMpScQueue.cpp + libcwHDR += src/libcw/cwAudioFile.h src/libcw/cwMidiFile.h libcwSRC += src/libcw/cwAudioFile.cpp src/libcw/cwMidiFile.cpp diff --git a/cwIo.cpp b/cwIo.cpp index 74de76f..9a6d969 100644 --- a/cwIo.cpp +++ b/cwIo.cpp @@ -503,6 +503,7 @@ namespace cw const object_t* port_array = nullptr; unsigned pollPeriodMs = 50; unsigned recvBufByteN = 512; + bool enableFl = false; // get the serial port list node if((cfg = c->find("serial")) == nullptr) @@ -512,7 +513,8 @@ namespace cw } // the serial header values - if((rc = cfg->getv("pollPeriodMs", pollPeriodMs, + if((rc = cfg->getv("enableFl",enableFl, + "pollPeriodMs", pollPeriodMs, "recvBufByteN", recvBufByteN, "array", port_array)) != kOkRC ) { @@ -521,6 +523,13 @@ namespace cw } p->serialN = port_array->child_count(); + + if( enableFl==false || p->serialN == 0 ) + { + cwLogInfo("Serial port sub-system disabled."); + goto errLabel; + } + p->serialA = mem::allocZ(p->serialN); @@ -632,9 +641,10 @@ namespace cw rc_t _midiPortCreate( io_t* p, const object_t* c ) { - rc_t rc = kOkRC; - const object_t* cfg = nullptr; - + rc_t rc = kOkRC; + const object_t* cfg = nullptr; + bool enableFl = false; + // get the MIDI port cfg if((cfg = c->find("midi")) == nullptr) { @@ -642,14 +652,19 @@ namespace cw return kOkRC; } - if((rc = cfg->getv("asyncFl", p->midiAsyncFl )) != kOkRC ) + if((rc = cfg->getv( "enableFl", enableFl, + "asyncFl", p->midiAsyncFl )) != kOkRC ) { rc = cwLogError(kSyntaxErrorRC,"MIDI configuration parse failed."); } - - if((rc = create(p->midiH, _midiCallback, p, cfg)) != kOkRC ) - return rc; + if( !enableFl ) + cwLogInfo("MIDI device system disabled."); + else + { + if((rc = create(p->midiH, _midiCallback, p, cfg)) != kOkRC ) + return rc; + } return rc; } @@ -762,6 +777,7 @@ namespace cw unsigned maxSocketCnt = 10; unsigned recvBufByteCnt = 4096; const object_t* socketL = nullptr; + bool enableFl = false; // get the socket configuration node if((node = cfg->find("socket")) == nullptr ) @@ -772,10 +788,11 @@ namespace cw // get the required socket arguments if(( rc = node->getv( - "maxSocketCnt", maxSocketCnt, - "recvBufByteCnt", recvBufByteCnt, - "threadTimeOutMs", p->sockThreadTimeOutMs, - "socketL", socketL )) != kOkRC ) + "enableFl", enableFl, + "maxSocketCnt", maxSocketCnt, + "recvBufByteCnt", recvBufByteCnt, + "threadTimeOutMs", p->sockThreadTimeOutMs, + "socketL", socketL )) != kOkRC ) { rc = cwLogError(kSyntaxErrorRC,"Unable to parse the 'socket' configuration node."); goto errLabel; @@ -786,6 +803,13 @@ namespace cw // create the socket control array p->sockN = socketL->child_count(); + + if( enableFl == false || p->sockN == 0 ) + { + cwLogInfo("Socket system disabled."); + goto errLabel; + } + p->sockA = mem::allocZ(p->sockN); // create the socket manager @@ -2023,8 +2047,27 @@ namespace cw { rc_t rc = kOkRC; audio::device::driver_t* audioDrv = nullptr; - const object_t* cfg; + const object_t* cfg = nullptr; + bool enableFl = false; + + // get the audio port node + if((cfg = c->find("audio")) == nullptr ) + { + cwLogWarning("No 'audio' configuration node."); + goto errLabel; + } + if((rc = cfg->getv("enableFl",enableFl)) != kOkRC ) + { + rc = cwLogError(rc,"Error reading top level audio cfg."); + goto errLabel; + } + + if( !enableFl ) + { + cwLogInfo("Audio sub-system disabled."); + goto errLabel; + } // initialize the audio device interface if((rc = audio::device::create(p->audioH)) != kOkRC ) @@ -2047,39 +2090,30 @@ namespace cw goto errLabel; } - // get the audio port node - if((cfg = c->find("audio")) == nullptr ) - { - cwLogWarning("No 'audio' configuration node."); - } - else - { - - // create the audio device sub-system - audio device files must be created - // before they can be referenced in _audioParseConfig(). - if((rc = _audioCreateDeviceFiles(p,cfg,audioDrv)) != kOkRC ) + // create the audio device sub-system - audio device files must be created + // before they can be referenced in _audioParseConfig(). + if((rc = _audioCreateDeviceFiles(p,cfg,audioDrv)) != kOkRC ) { rc = cwLogError(rc,"The audio device file creation failed."); goto errLabel; } - // register audio device file driver - if( audioDrv != nullptr ) - if((rc = audio::device::registerDriver( p->audioH, audioDrv )) != kOkRC ) - { - rc = cwLogError(rc,"The audio device file driver registration failed."); - goto errLabel; - } - - // read the configuration information and setup the audio hardware - if((rc = _audioParseConfig( p, cfg )) != kOkRC ) + // register audio device file driver + if( audioDrv != nullptr ) + if((rc = audio::device::registerDriver( p->audioH, audioDrv )) != kOkRC ) { - rc = cwLogError(rc,"Audio device configuration failed."); + rc = cwLogError(rc,"The audio device file driver registration failed."); goto errLabel; } - audio::device::report( p->audioH ); + // read the configuration information and setup the audio hardware + if((rc = _audioParseConfig( p, cfg )) != kOkRC ) + { + rc = cwLogError(rc,"Audio device configuration failed."); + goto errLabel; } + + audio::device::report( p->audioH ); errLabel: return rc; @@ -2113,6 +2147,7 @@ namespace cw const char* uiCfgLabel = "ui"; ui::ws::args_t args = {}; const object_t* ui_cfg = nullptr; + bool enableFl = false; // Duplicate the application id map if( mapN > 0 ) @@ -2131,12 +2166,19 @@ namespace cw if((ui_cfg = c->find(uiCfgLabel)) != nullptr ) { - if((rc = ui_cfg->getv("asyncFl",p->uiAsyncFl)) != kOkRC ) + if((rc = ui_cfg->getv("enableFl",enableFl, + "asyncFl",p->uiAsyncFl)) != kOkRC ) { rc = cwLogError(rc,"UI configuration parse failed."); goto errLabel; } + if( !enableFl ) + { + cwLogInfo("UI sub-system disabled."); + goto errLabel; + } + // parse the ui if((rc = ui::ws::parseArgs( *c, args, uiCfgLabel )) == kOkRC ) { diff --git a/cwNbMpScQueue.cpp b/cwNbMpScQueue.cpp new file mode 100644 index 0000000..defa081 --- /dev/null +++ b/cwNbMpScQueue.cpp @@ -0,0 +1,387 @@ +#include "cwCommon.h" +#include "cwLog.h" +#include "cwCommonImpl.h" +#include "cwMem.h" +#include "cwTime.h" +#include "cwObject.h" + +#include "cwNbMpScQueue.h" + +#include "cwThread.h" +#include "cwThreadMach.h" + +namespace cw +{ + namespace nbmpscq + { + typedef struct block_str + { + uint8_t* buf; // buf[ bufByteN ] + unsigned bufByteN; + + std::atomic full_flag; + std::atomic index; // offset to next avail byte in mem[] + std::atomic eleN; // count of elements in block + + struct block_str* link; + + } block_t; + + typedef struct node_str + { + std::atomic next; + block_t* block; + unsigned blobByteN; + // blob data follows + } node_t; + + typedef struct nbmpscq_str + { + uint8_t* mem; + unsigned blkN; // count of blocks in blockL + unsigned blkByteN; // size of each block_t.mem[] buffer + + block_t* blockL; // linked list of blocks + + std::atomic cleanBlkN; // count of blocks that need to be cleaned + unsigned cleanProcN; // count of times the clear process has been run + + node_t* stub; // dummy node + std::atomic head; // last-in + node_t* tail; // first-out + + } nbmpscq_t; + + nbmpscq_t* _handleToPtr( handle_t h ) + { return handleToPtr(h); } + + rc_t _destroy( nbmpscq_t* p ) + { + rc_t rc = kOkRC; + if( p != nullptr ) + { + mem::release(p->stub); + mem::release(p->mem); + mem::release(p); + } + return rc; + } + + void _clean( nbmpscq_t* p ) + { + block_t* b = p->blockL; + for(; b!=nullptr; b=b->link) + { + if( b->full_flag.load(std::memory_order_acquire) ) + { + if( b->eleN.load(std::memory_order_acquire) <= 0 ) + { + unsigned cc = p->cleanBlkN.fetch_add(-1,std::memory_order_relaxed); + assert(cc>=1); + + // Note: b->full_flag==true and p->eleN==0 so it is safe to reset the block + // because all elements have been removed (eleN==0) and + // no other threads will be accessing it (full_flag==true) + b->eleN.store(0,std::memory_order_relaxed); + b->index.store(0,std::memory_order_relaxed); + b->full_flag.store(false,std::memory_order_release); + } + } + } + p->cleanProcN += 1; + } + + + typedef struct shared_str + { + handle_t qH; + std::atomic cnt; + + } test_share_t; + + typedef struct test_str + { + unsigned id; // thread id + unsigned iter; // execution counter + unsigned value; + test_share_t* share; + } test_t; + + + + bool _threadFunc( void* arg ) + { + test_t* t = (test_t*)arg; + + t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); + + push(t->share->qH,t,sizeof(test_t)); + + t->iter += 1; + + sleepMs( rand() & 0xf ); + + return true; + } + + + } +} + +cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN ) +{ + rc_t rc = kOkRC; + nbmpscq_t* p = nullptr; + unsigned byteN = 0; + + if((rc = destroy(hRef)) != kOkRC ) + goto errLabel; + + p = mem::allocZ(); + + p->stub = mem::allocZ(); + p->head = p->stub; // last-in + p->tail = p->stub; // first-out + p->cleanBlkN = 0; + + p->blkN = initBlkN; + p->blkByteN = blkByteN; + byteN = initBlkN * (sizeof(block_t) + blkByteN ); + p->mem = mem::allocZ(byteN); + + for(unsigned i=0; imem+i); + b->buf = (uint8_t*)(b + 1); + b->bufByteN = blkByteN; + + b->full_flag.store(false); + b->index.store(0); + b->eleN.store(0); + + b->link = p->blockL; + p->blockL = b; + } + + hRef.set(p); + +errLabel: + if(rc != kOkRC ) + { + rc = cwLogError(rc,"NbMpScQueue destroy failed."); + _destroy(p); + } + + return rc; +} + +cw::rc_t cw::nbmpscq::destroy( handle_t& hRef ) +{ + rc_t rc = kOkRC; + if(!hRef.isValid()) + return rc; + + nbmpscq_t* p = _handleToPtr(hRef); + + if((rc = _destroy(p)) != kOkRC ) + goto errLabel; + + hRef.clear(); +errLabel: + + if( rc != kOkRC ) + rc = cwLogError(rc,"NbMpScQueue destroy failed."); + return rc; + +} + +cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN ) +{ + rc_t rc = kOkRC; + nbmpscq_t* p = _handleToPtr(h); + block_t* b = p->blockL; + + unsigned nodeByteN = blobByteN + sizeof(node_t); + + for(; b!=nullptr; b=b->link) + { + if( b->full_flag.load(std::memory_order_acquire) == false ) + { + unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel); + + if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN ) + { + p->cleanBlkN.fetch_add(1,std::memory_order_relaxed); + b->full_flag.store(true,std::memory_order_release); + } + else + { + node_t* n = (node_t*)(b->buf + idx); + n->blobByteN = blobByteN; + n->block = b; + + memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN); + + b->eleN.fetch_add(1,std::memory_order_release); + + n->next.store(nullptr); + // Note that the elements of the queue are only accessed from the end of the queue (tail). + // New nodes can therefore safely be updated in two steps: + + // 1. Atomically set _head to the new node and return 'old-head' + node_t* prev = p->head.exchange(n,std::memory_order_acq_rel); + + // Note that at this point only the new node may have the 'old-head' as it's predecssor. + // Other threads may therefore safely interrupt at this point. + + // 2. Set the old-head next pointer to the new node (thereby adding the new node to the list) + prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer + + break; + } + } + } + + if( b == nullptr ) + rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow. %i %i",p->cleanProcN, p->cleanBlkN.load()); + + return rc; + +} + +cw::nbmpscq::blob_t cw::nbmpscq::next( handle_t h ) +{ + blob_t blob; + nbmpscq_t* p = _handleToPtr(h); + + node_t* t = p->tail; + node_t* n = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer + + if( n == nullptr ) + { + blob.blob = nullptr; + blob.blobByteN = 0; + } + else + { + blob.blob = (uint8_t*)(n+1); + blob.blobByteN = n->blobByteN; + } + + return blob; + +} + +cw::rc_t cw::nbmpscq::advance( handle_t h ) +{ + nbmpscq_t* p = _handleToPtr(h); + rc_t rc = kOkRC; + node_t* t = p->tail; + node_t* next = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer + + if( next != nullptr ) + { + p->tail = next; + + int eleN = next->block->eleN.fetch_add(-1,std::memory_order_acq_rel); + + // next was valid and so eleN must be >= 1 + assert( eleN >= 1 ); + + } + + if( p->cleanBlkN.load(std::memory_order_relaxed) > 0 ) + _clean(p); + + return rc; +} + +cw::rc_t cw::nbmpscq::test( const object_t* cfg ) +{ + rc_t rc=kOkRC,rc0,rc1; + + unsigned testArrayN = 2; + test_t* testArray = nullptr; + unsigned blkN = 2; + unsigned blkByteN = 1024; + time::spec_t t0 = time::current_time(); + unsigned testDurMs = 0; + test_share_t share; + handle_t qH; + thread_mach::handle_t tmH; + + if((rc = cfg->getv("blkN",blkN, + "blkByteN",blkByteN, + "testDurMs",testDurMs, + "threadN",testArrayN)) != kOkRC ) + { + rc = cwLogError(rc,"Test params parse failed."); + goto errLabel; + } + + if( testArrayN == 0 ) + { + rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0."); + goto errLabel; + } + + testArray = mem::allocZ(testArrayN); + + // create the queue + if((rc = create( qH, blkN, blkByteN )) != kOkRC ) + { + rc = cwLogError(rc,"nbmpsc create failed."); + goto errLabel; + } + + share.qH = qH; + share.cnt.store(0); + + for(unsigned i=0; iid,t->iter,t->value,b.blobByteN); + advance(qH); + } + } + + errLabel: + if((rc0 = thread_mach::destroy(tmH)) != kOkRC ) + cwLogError(rc0,"Thread machine destroy failed."); + + if((rc1 = destroy(qH)) != kOkRC ) + cwLogError(rc1,"nbmpsc queue destroy failed."); + + if( testArray != nullptr ) + printf("P:%i %i\n",testArray[0].iter, testArray[1].iter); + + mem::release(testArray); + + return rcSelect(rc,rc0,rc1); + +} + diff --git a/cwNbMpScQueue.h b/cwNbMpScQueue.h new file mode 100644 index 0000000..5bd30f5 --- /dev/null +++ b/cwNbMpScQueue.h @@ -0,0 +1,56 @@ +#ifndef cwNbMpScQueue_h +#define cwNbMpScQueue_h + +/* +Non-blocking, Lock-free Queue: +================================= + +Push +---- +0. Produceers go to next block, if the write-position is valid, +then fetch-add the write-position forward to allocate space. + +1. If after the fetch-add the area is valid then + - atomically incr ele-count, + - copy in ele + - place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue(). + +2. else (the area is invalid) goto 0. + +Pop +---- +1. copy out next ele. +2. decr. block->ele_count +3. if the ele-count is 0 and write-offset is invalid +reset the write-offset to 0. +*/ + + + +namespace cw +{ + namespace nbmpscq + { + typedef handle handle_t; + + rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN ); + rc_t destroy( handle_t& hRef ); + + rc_t push( handle_t h, const void* blob, unsigned blobByteN ); + + typedef struct blob_str + { + const void* blob; + unsigned blobByteN; + } blob_t; + + blob_t next( handle_t h ); + rc_t advance( handle_t h ); + + rc_t test( const object_t* cfg ); + + } +} + + +#endif diff --git a/cwScoreFollowTest.cpp b/cwScoreFollowTest.cpp index 672fc9c..1b00ed0 100644 --- a/cwScoreFollowTest.cpp +++ b/cwScoreFollowTest.cpp @@ -376,7 +376,8 @@ namespace cw mem::release(midi_fname); mem::release(sync_perf_fname); mem::release(meta_fname); - meta_obj->free(); + if( meta_obj != nullptr ) + meta_obj->free(); } mem::release(dirEntryArray); @@ -409,6 +410,7 @@ namespace cw unsigned start_loc = 0; unsigned end_loc = 0; const object_t* perf = nullptr; + filesys::pathPart_t* pathParts = nullptr; midi::file::handle_t mfH; // get the perf. record @@ -434,13 +436,25 @@ namespace cw if( !enable_fl ) goto errLabel; - - // create the output directory + + if((pathParts = filesys::pathParts(midi_fname)) == nullptr ) + { + rc = cwLogError(kOpFailRC,"MIDI file name parse failed on '%s'.",cwStringNullGuard(midi_fname)); + goto errLabel; + } + + /* + // create the output filename if((out_dir = filesys::makeFn(p->out_dir,perf_label,nullptr,nullptr)) == nullptr ) { rc = cwLogError(kOpFailRC,"Directory name formation failed on '%s'.",cwStringNullGuard(out_dir)); goto errLabel; } + */ + + out_dir = mem::duplStr(pathParts->dirStr); + + mem::release(pathParts); // create the output directory if((rc = filesys::makeDir(out_dir)) != kOkRC ) @@ -567,6 +581,7 @@ namespace cw errLabel: + mem::release(pathParts); mem::release(out_dir); mem::release(fname); close(mfH); diff --git a/cwScoreFollower.cpp b/cwScoreFollower.cpp index 6207509..2f27677 100644 --- a/cwScoreFollower.cpp +++ b/cwScoreFollower.cpp @@ -394,6 +394,7 @@ cw::rc_t cw::score_follower::exec( handle_t h, newMatchFlRef = false; + // This call results in a callback to: _score_follower_cb() // Note: pass p->perf_idx as 'muid' to the score follower rc = exec( p->trackH, sec, smpIdx, p->perf_idx, status, d0, d1, &scLocIdx ); diff --git a/cwScoreFollower.h b/cwScoreFollower.h index 1d7e6ba..b9294df 100644 --- a/cwScoreFollower.h +++ b/cwScoreFollower.h @@ -74,7 +74,7 @@ namespace cw // Write the score to 'out_fname'. void score_report( handle_t h, const char* out_fname ); - // Use the stored MIDI data received since the last call to reset to generate a report + // Use the stored MIDI data, received since the last call to reset(), to generate a report // using midi_state::report_events(). rc_t midi_state_rt_report( handle_t h, const char* out_fname ); diff --git a/cwSfTrack.h b/cwSfTrack.h index 153c0eb..70e0c10 100644 --- a/cwSfTrack.h +++ b/cwSfTrack.h @@ -74,8 +74,8 @@ namespace cw // Step forward/back by p->stepCnt from p->eli. // p->eli must therefore be valid prior to calling this function. // If more than p->maxMissCnt consecutive MIDI events are - // missed then automatically run cmScAlignScan(). - // Return cmEofRC if the end of the score is encountered. + // missed then automatically run _scan (). + // Return kEofRC if the end of the score is encountered. // Return cmSubSysFailRC if an internal scan resync. failed. //rc_t _step( handle_t h );