diff --git a/cwNbMpScQueue.cpp b/cwNbMpScQueue.cpp index 34ef85a..3ebce33 100644 --- a/cwNbMpScQueue.cpp +++ b/cwNbMpScQueue.cpp @@ -10,9 +10,6 @@ #include "cwNbMpScQueue.h" -#include "cwThread.h" -#include "cwThreadMach.h" -#include "cwFile.h" namespace cw { @@ -132,42 +129,7 @@ namespace cw b.rc = kOkRC; } - - typedef struct shared_str - { - handle_t qH; - std::atomic cnt; - - } test_share_t; - typedef struct test_str - { - unsigned id; // thread id - unsigned iter; // execution counter - unsigned value; // - test_share_t* share; // pointer to global shared data - } test_t; - - - - bool _test_threadFunc( void* arg ) - { - test_t* t = (test_t*)arg; - - // get and increment a global shared counter - t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); - - // push the current thread instance record - push(t->share->qH,t,sizeof(test_t)); - - // incrmemen this threads exec. counter - t->iter += 1; - - sleepMs( rand() & 0xf ); - - return true; - } - void _block_report( nbmpscq_t* p ) { block_t* b = p->blockL; @@ -301,14 +263,20 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN ) b->eleN.fetch_add(1,std::memory_order_release); n->next.store(nullptr); - // Note that the elements of the queue are only accessed from the end of the queue (tail). - // New nodes can therefore safely be updated in two steps: + + // Note that the elements of the queue are only accessed from the front of the queue (tail). + // New nodes are added to the end of the list (head). + // New node will therefore always have it's next ptr set to null. // 1. Atomically set _head to the new node and return 'old-head' + // We use acq_release to prevent code movement above or below this instruction. node_t* prev = p->head.exchange(n,std::memory_order_acq_rel); // Note that at this point only the new node may have the 'old-head' as it's predecssor. - // Other threads may therefore safely interrupt at this point. + // Other threads may therefore safely interrupt at this point - they will + // have the new node as their predecessor. Note that none of these nodes are accessible + // yet because __tail next__ pointer is still pointing to the 'old-head' - whose next pointer + // is still null. // 2. Set the old-head next pointer to the new node (thereby adding the new node to the list) prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer @@ -341,7 +309,9 @@ cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h ) blob_t blob; nbmpscq_t* p = _handleToPtr(h); - // We always access the tail element through tail->next. + // We always access the tail element through tail->next. Always accessing the + // next tail element via the tail->next pointer is critical to correctness. + // See note in push(). node_t* node = p->tail->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer _init_blob( blob, node ); @@ -436,107 +406,3 @@ unsigned cw::nbmpscq::count( handle_t h ) return eleN; } -cw::rc_t cw::nbmpscq::test( const object_t* cfg ) -{ - rc_t rc=kOkRC,rc0,rc1; - - unsigned testArrayN = 2; - test_t* testArray = nullptr; - unsigned blkN = 2; - unsigned blkByteN = 1024; - const char* out_fname = nullptr; - time::spec_t t0 = time::current_time(); - unsigned testDurMs = 0; - test_share_t share; - handle_t qH; - thread_mach::handle_t tmH; - file::handle_t fH; - - if((rc = cfg->getv("blkN",blkN, - "blkByteN",blkByteN, - "testDurMs",testDurMs, - "threadN",testArrayN, - "out_fname",out_fname)) != kOkRC ) - { - rc = cwLogError(rc,"Test params parse failed."); - goto errLabel; - } - - if((rc = file::open(fH,out_fname,file::kWriteFl)) != kOkRC ) - { - rc = cwLogError(rc,"Error creating the output file:%s",cwStringNullGuard(out_fname)); - goto errLabel; - } - - if( testArrayN == 0 ) - { - rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0."); - goto errLabel; - } - - // create the thread intance records - testArray = mem::allocZ(testArrayN); - - // create the queue - if((rc = create( qH, blkN, blkByteN )) != kOkRC ) - { - rc = cwLogError(rc,"nbmpsc create failed."); - goto errLabel; - } - - share.qH = qH; - share.cnt.store(0); - - for(unsigned i=0; iid,t->iter,t->value,b.blobByteN); - advance(qH); - } - } - - errLabel: - file::close(fH); - - if((rc0 = thread_mach::destroy(tmH)) != kOkRC ) - cwLogError(rc0,"Thread machine destroy failed."); - - if((rc1 = destroy(qH)) != kOkRC ) - cwLogError(rc1,"nbmpsc queue destroy failed."); - - if( testArray != nullptr ) - printf("P:%i %i\n",testArray[0].iter, testArray[1].iter); - - // TODO: read back the file and verify that none of the - // global incrment values were dropped. - - mem::release(testArray); - - return rcSelect(rc,rc0,rc1); - -} - diff --git a/cwNbMpScQueue.h b/cwNbMpScQueue.h index 8b5f0bb..f779820 100644 --- a/cwNbMpScQueue.h +++ b/cwNbMpScQueue.h @@ -27,6 +27,10 @@ Pop 2. decr. block->ele_count 3. if the ele-count is 0 and write-offset is invalid reset the write-offset to 0. + + +This code is tested in cwMtQueueTester.h/cpp. + */ @@ -89,8 +93,6 @@ namespace cw // Count of elements in the queue. unsigned count( handle_t h ); - rc_t test( const object_t* cfg ); - } }