diff --git a/cwNbMpScQueue.cpp b/cwNbMpScQueue.cpp index 2d1ff3a..defa081 100644 --- a/cwNbMpScQueue.cpp +++ b/cwNbMpScQueue.cpp @@ -37,11 +37,14 @@ namespace cw typedef struct nbmpscq_str { + uint8_t* mem; unsigned blkN; // count of blocks in blockL unsigned blkByteN; // size of each block_t.mem[] buffer block_t* blockL; // linked list of blocks - std::atomic clean_cnt; // count of blocks that need to be cleaned + + std::atomic cleanBlkN; // count of blocks that need to be cleaned + unsigned cleanProcN; // count of times the clear process has been run node_t* stub; // dummy node std::atomic head; // last-in @@ -58,7 +61,7 @@ namespace cw if( p != nullptr ) { mem::release(p->stub); - mem::release(p->blockL); + mem::release(p->mem); mem::release(p); } return rc; @@ -73,7 +76,7 @@ namespace cw { if( b->eleN.load(std::memory_order_acquire) <= 0 ) { - unsigned cc = p->clean_cnt.fetch_add(-1,std::memory_order_relaxed); + unsigned cc = p->cleanBlkN.fetch_add(-1,std::memory_order_relaxed); assert(cc>=1); // Note: b->full_flag==true and p->eleN==0 so it is safe to reset the block @@ -85,6 +88,7 @@ namespace cw } } } + p->cleanProcN += 1; } @@ -99,7 +103,7 @@ namespace cw { unsigned id; // thread id unsigned iter; // execution counter - unsigned value; + unsigned value; test_share_t* share; } test_t; @@ -111,7 +115,7 @@ namespace cw t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); - push(t->share->qH,t,sizeof(t)); + push(t->share->qH,t,sizeof(test_t)); t->iter += 1; @@ -129,7 +133,6 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt rc_t rc = kOkRC; nbmpscq_t* p = nullptr; unsigned byteN = 0; - uint8_t* mem = nullptr; if((rc = destroy(hRef)) != kOkRC ) goto errLabel; @@ -139,16 +142,16 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt p->stub = mem::allocZ(); p->head = p->stub; // last-in p->tail = p->stub; // first-out - p->clean_cnt = 0; + p->cleanBlkN = 0; p->blkN = initBlkN; p->blkByteN = 
blkByteN; byteN = initBlkN * (sizeof(block_t) + blkByteN ); - mem = mem::allocZ(byteN); + p->mem = mem::allocZ(byteN); for(unsigned i=0; imem+i); b->buf = (uint8_t*)(b + 1); b->bufByteN = blkByteN; @@ -202,13 +205,13 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN ) for(; b!=nullptr; b=b->link) { - if( !b->full_flag.load(std::memory_order_acquire) ) + if( b->full_flag.load(std::memory_order_acquire) == false ) { unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel); if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN ) { - p->clean_cnt.fetch_add(1,std::memory_order_relaxed); + p->cleanBlkN.fetch_add(1,std::memory_order_relaxed); b->full_flag.store(true,std::memory_order_release); } else @@ -216,11 +219,11 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN ) node_t* n = (node_t*)(b->buf + idx); n->blobByteN = blobByteN; n->block = b; - - b->eleN.fetch_add(1,std::memory_order_release); memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN); + b->eleN.fetch_add(1,std::memory_order_release); + n->next.store(nullptr); // Note that the elements of the queue are only accessed from the end of the queue (tail). // New nodes can therefore safely be updated in two steps: @@ -233,13 +236,14 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN ) // 2. Set the old-head next pointer to the new node (thereby adding the new node to the list) prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer - + + break; } } } if( b == nullptr ) - rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow."); + rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow. 
%i %i",p->cleanProcN, p->cleanBlkN.load()); return rc; @@ -286,26 +290,42 @@ cw::rc_t cw::nbmpscq::advance( handle_t h ) } - if( p->clean_cnt.load(std::memory_order_relaxed) > 0 ) + if( p->cleanBlkN.load(std::memory_order_relaxed) > 0 ) _clean(p); return rc; } -cw::rc_t cw::nbmpscq::test( object_t* cfg ) +cw::rc_t cw::nbmpscq::test( const object_t* cfg ) { rc_t rc=kOkRC,rc0,rc1; - const int testArrayN = 2; - test_t testArray[testArrayN]; - const unsigned blkN = 2; - const unsigned blkByteN = 1024; + unsigned testArrayN = 2; + test_t* testArray = nullptr; + unsigned blkN = 2; + unsigned blkByteN = 1024; time::spec_t t0 = time::current_time(); + unsigned testDurMs = 0; test_share_t share; handle_t qH; thread_mach::handle_t tmH; + + if((rc = cfg->getv("blkN",blkN, + "blkByteN",blkByteN, + "testDurMs",testDurMs, + "threadN",testArrayN)) != kOkRC ) + { + rc = cwLogError(rc,"Test params parse failed."); + goto errLabel; + } + + if( testArrayN == 0 ) + { + rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0."); + goto errLabel; + } - memset(&testArray,0,sizeof(testArray)); + testArray = mem::allocZ(testArrayN); // create the queue if((rc = create( qH, blkN, blkByteN )) != kOkRC ) @@ -315,7 +335,7 @@ cw::rc_t cw::nbmpscq::test( object_t* cfg ) } share.qH = qH; - share.cnt = 0; + share.cnt.store(0); for(unsigned i=0; iid,t->iter,t->value); + printf("%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN); advance(qH); } } @@ -356,8 +376,11 @@ cw::rc_t cw::nbmpscq::test( object_t* cfg ) if((rc1 = destroy(qH)) != kOkRC ) cwLogError(rc1,"nbmpsc queue destroy failed."); - printf("P:%i %i\n",testArray[0].iter, testArray[1].iter); + if( testArray != nullptr ) + printf("P:%i %i\n",testArray[0].iter, testArray[1].iter); + mem::release(testArray); + return rcSelect(rc,rc0,rc1); } diff --git a/cwNbMpScQueue.h b/cwNbMpScQueue.h index 96d4790..5bd30f5 100644 --- a/cwNbMpScQueue.h +++ b/cwNbMpScQueue.h @@ -1,6 +1,32 @@ #ifndef cwNbMpScQueue_h #define cwNbMpScQueue_h 
+/* +Non-blocking, Lock-free Queue: +================================= + +Push +---- +0. Producers go to next block, if the write-position is valid, +then fetch-add the write-position forward to allocate space. + +1. If after the fetch-add the area is valid then + - copy in ele, + - atomically incr ele-count + - place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue(). + +2. else (the area is invalid) goto 0. + +Pop +---- +1. copy out next ele. +2. decr. block->ele_count +3. if the ele-count is 0 and write-offset is invalid +reset the write-offset to 0. +*/ + + + namespace cw { namespace nbmpscq @@ -21,7 +47,7 @@ namespace cw blob_t next( handle_t h ); rc_t advance( handle_t h ); - rc_t test( object_t* cfg ); + rc_t test( const object_t* cfg ); } }