diff --git a/cwNbMpScQueue.cpp b/cwNbMpScQueue.cpp
index defa081..6708965 100644
--- a/cwNbMpScQueue.cpp
+++ b/cwNbMpScQueue.cpp
@@ -9,6 +9,7 @@
 #include "cwThread.h"
 #include "cwThreadMach.h"
+#include "cwFile.h"
 
 namespace cw
 {
@@ -17,13 +18,13 @@ namespace cw
     typedef struct block_str
     {
       uint8_t* buf;                    // buf[ bufByteN ]
-      unsigned bufByteN;
+      unsigned bufByteN;
 
-      std::atomic<bool>     full_flag;
-      std::atomic<unsigned> index;     // offset to next avail byte in mem[]
-      std::atomic<int>      eleN;      // count of elements in block
+      std::atomic<bool>     full_flag; // Set if this block is full (i.e. index >= bufByteN)
+      std::atomic<unsigned> index;     // Offset to next avail byte in buf[]
+      std::atomic<int>      eleN;      // Current count of elements stored in buf[]
 
-      struct block_str* link;
+      struct block_str* link;          //
 
     } block_t;
 
@@ -37,17 +38,18 @@ namespace cw
     typedef struct nbmpscq_str
     {
-      uint8_t* mem;
-      unsigned blkN;     // count of blocks in blockL
-      unsigned blkByteN; // size of each block_t.mem[] buffer
+      uint8_t* mem;      // Pointer to a single area of memory which holds all blocks.
+      unsigned blkN;     // Count of blocks in blockL
+      unsigned blkByteN; // Size of each block_t.mem[] buffer
 
-      block_t* blockL;   // linked list of blocks
+      block_t* blockL;   // Linked list of blocks
 
-      std::atomic<unsigned> cleanBlkN; // count of blocks that need to be cleaned
+      std::atomic<unsigned> cleanBlkN; // count of blocks that need to be cleaned
       unsigned cleanProcN;             // count of times the clear process has been run
 
       node_t*              stub;  // dummy node
       std::atomic<node_t*> head;  // last-in
+
       node_t*              tail;  // first-out
 
     } nbmpscq_t;
 
@@ -67,15 +69,20 @@ namespace cw
       return rc;
     }
 
+    // _clean() is run by the consumer thread to make empty blocks available.
     void _clean( nbmpscq_t* p )
     {
       block_t* b = p->blockL;
+      // for each block
       for(; b!=nullptr; b=b->link)
       {
+        // if this block is full ...
         if( b->full_flag.load(std::memory_order_acquire) )
         {
+          // ... and there are no more elements to be read from the block
           if( b->eleN.load(std::memory_order_acquire) <= 0 )
           {
+            // decr. the cleanBlkN count
             unsigned cc = p->cleanBlkN.fetch_add(-1,std::memory_order_relaxed);
             assert(cc>=1);
 
@@ -103,20 +110,23 @@ namespace cw
     {
       unsigned      id;    // thread id
       unsigned      iter;  // execution counter
-      unsigned      value;
-      test_share_t* share;
+      unsigned      value; //
+      test_share_t* share; // pointer to global shared data
     } test_t;
 
-    bool _threadFunc( void* arg )
+    bool _test_threadFunc( void* arg )
     {
       test_t* t = (test_t*)arg;
 
+      // get and increment a global shared counter
       t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel);
 
+      // push the current thread instance record
       push(t->share->qH,t,sizeof(test_t));
-      
+
+      // increment this thread's exec. counter
       t->iter += 1;
 
       sleepMs( rand() & 0xf );
 
@@ -201,27 +211,35 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
   nbmpscq_t* p = _handleToPtr(h);
   block_t*   b = p->blockL;
 
+  // TODO: handle case where blobByteN is greater than p->blkByteN
+  // Note that this case will immediately overflow the queue.
+
   unsigned nodeByteN = blobByteN + sizeof(node_t);
 
   for(; b!=nullptr; b=b->link)
   {
     if( b->full_flag.load(std::memory_order_acquire) == false )
     {
+      // attempt to allocate nodeByteN bytes starting at b->index
       unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel);
 
+      // if the allocation was not valid
       if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN )
       {
+        // incr the 'clean count' and mark the block as full
         p->cleanBlkN.fetch_add(1,std::memory_order_relaxed);
         b->full_flag.store(true,std::memory_order_release);
       }
       else
       {
+        // otherwise this thread owns the allocated area
         node_t* n = (node_t*)(b->buf + idx);
         n->blobByteN = blobByteN;
         n->block     = b;
 
         memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN);
 
+        // incr the block element count
         b->eleN.fetch_add(1,std::memory_order_release);
 
         n->next.store(nullptr);
 
@@ -241,15 +259,20 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
       }
     }
   }
-  
-  if( b == nullptr )
-    rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow. %i %i",p->cleanProcN, p->cleanBlkN.load());
+
+  // if there is no space left in the queue to store the incoming blob
+  if( b == nullptr )
+  {
+    // TODO: continue to iterate through the blocks waiting for the consumer
+    // to make more space available.
+    rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow.");
+  }
 
   return rc;
 
 }
 
-cw::nbmpscq::blob_t cw::nbmpscq::next( handle_t h )
+cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h )
 {
   blob_t     blob;
   nbmpscq_t* p = _handleToPtr(h);
 
@@ -283,7 +306,8 @@ cw::rc_t cw::nbmpscq::advance( handle_t h )
   {
     p->tail = next;
 
-    int eleN = next->block->eleN.fetch_add(-1,std::memory_order_acq_rel);
+    block_t* b    = next->block;
+    int      eleN = b->eleN.fetch_add(-1,std::memory_order_acq_rel);
 
     // next was valid and so eleN must be >= 1
     assert( eleN >= 1 );
 
@@ -304,27 +328,37 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
   test_t*  testArray = nullptr;
   unsigned blkN      = 2;
   unsigned blkByteN  = 1024;
+  const char* out_fname = nullptr;
   time::spec_t t0 = time::current_time();
 
   unsigned testDurMs = 0;
 
   test_share_t share;
 
   handle_t              qH;
   thread_mach::handle_t tmH;
+  file::handle_t        fH;
 
   if((rc = cfg->getv("blkN",blkN,
                      "blkByteN",blkByteN,
                      "testDurMs",testDurMs,
-                     "threadN",testArrayN)) != kOkRC )
+                     "threadN",testArrayN,
+                     "out_fname",out_fname)) != kOkRC )
   {
     rc = cwLogError(rc,"Test params parse failed.");
     goto errLabel;
   }
 
+  if((rc = file::open(fH,out_fname,file::kWriteFl)) != kOkRC )
+  {
+    rc = cwLogError(rc,"Error creating the output file:%s",cwStringNullGuard(out_fname));
+    goto errLabel;
+  }
+
   if( testArrayN == 0 )
   {
     rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0.");
     goto errLabel;
   }
-  
+
+  // create the thread instance records
   testArray = mem::allocZ<test_t>(testArrayN);
 
   // create the queue
 
@@ -342,10 +376,9 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
     testArray[i].id    = i;
     testArray[i].share = &share;
   }
-  
   // create the thread machine
-  if((rc = thread_mach::create( tmH, _threadFunc, testArray, sizeof(test_t), testArrayN )) != kOkRC )
+  if((rc = thread_mach::create( tmH, _test_threadFunc, testArray, sizeof(test_t), testArrayN )) != kOkRC )
   {
     rc = cwLogError(rc,"Thread machine create failed.");
     goto errLabel;
   }
 
@@ -358,18 +391,21 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
     goto errLabel;
   }
 
+  // run the test for 'testDurMs' milliseconds
   while( time::elapsedMs(t0) < testDurMs )
   {
-    blob_t b = next(qH);
+    blob_t b = get(qH);
 
     if( b.blob != nullptr )
     {
       test_t* t = (test_t*)b.blob;
-      printf("%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN);
printf(fH,"%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN); advance(qH); } } errLabel: + file::close(fH); + if((rc0 = thread_mach::destroy(tmH)) != kOkRC ) cwLogError(rc0,"Thread machine destroy failed."); @@ -379,6 +415,9 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg ) if( testArray != nullptr ) printf("P:%i %i\n",testArray[0].iter, testArray[1].iter); + // TODO: read back the file and verify that none of the + // global incrment values were dropped. + mem::release(testArray); return rcSelect(rc,rc0,rc1); diff --git a/cwNbMpScQueue.h b/cwNbMpScQueue.h index 5bd30f5..2e775da 100644 --- a/cwNbMpScQueue.h +++ b/cwNbMpScQueue.h @@ -7,10 +7,12 @@ Non-blocking, Lock-free Queue: Push ---- -0. Produceers go to next block, if the write-position is valid, -then fetch-add the write-position forward to allocate space. +0. Produceers go down the a list of blocks (nbmpscq.blockL) +if a block is not already full then it atomically +fetch-add's block->write_idx by the size of the the element +to be inserted. -1. If after the fetch-add the area is valid then +1. If after the fetch-add the write_idx is <= block->byteN then - atomically incr ele-count, - copy in ele - place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue(). @@ -34,8 +36,12 @@ namespace cw typedef handle handle_t; rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN ); + rc_t destroy( handle_t& hRef ); + // push() is called by multiple producer threads to insert + // an element in the queue. Note that the 'blob' is copied into + // the queue and therefore can be released by the caller. rc_t push( handle_t h, const void* blob, unsigned blobByteN ); typedef struct blob_str @@ -43,8 +49,14 @@ namespace cw const void* blob; unsigned blobByteN; } blob_t; - - blob_t next( handle_t h ); + + // get() is called by the single consumer thread to access the + // current blob at the front of the queue. Note that this call + // does not change the state of the queue. + blob_t get( handle_t h ); + + // advance() disposes of the blob at the front of the + // queue and makes the next blob current. rc_t advance( handle_t h ); rc_t test( const object_t* cfg );