cwNbMpScQueue.h/cpp : Verified with -fsanitize=thread. Added comments.

This commit is contained in:
kevin 2024-03-11 19:21:03 -04:00
parent e63abddb5d
commit 6f2444b8d7
2 changed files with 81 additions and 30 deletions

View File

@ -9,6 +9,7 @@
#include "cwThread.h" #include "cwThread.h"
#include "cwThreadMach.h" #include "cwThreadMach.h"
#include "cwFile.h"
namespace cw namespace cw
{ {
@ -17,13 +18,13 @@ namespace cw
typedef struct block_str typedef struct block_str
{ {
uint8_t* buf; // buf[ bufByteN ] uint8_t* buf; // buf[ bufByteN ]
unsigned bufByteN; unsigned bufByteN;
std::atomic<bool> full_flag; std::atomic<bool> full_flag; // Set if this block is full (i.e. index >= bufByteN)
std::atomic<unsigned> index; // offset to next avail byte in mem[] std::atomic<unsigned> index; // Offset to next avail byte in buf[]
std::atomic<int> eleN; // count of elements in block std::atomic<int> eleN; // Current count of elements stored in buf[]
struct block_str* link; struct block_str* link; //
} block_t; } block_t;
@ -37,17 +38,18 @@ namespace cw
typedef struct nbmpscq_str typedef struct nbmpscq_str
{ {
uint8_t* mem; uint8_t* mem; // Pointer to a single area of memory which holds all blocks.
unsigned blkN; // count of blocks in blockL unsigned blkN; // Count of blocks in blockL
unsigned blkByteN; // size of each block_t.mem[] buffer unsigned blkByteN; // Size of each block_t.buf[] buffer
block_t* blockL; // linked list of blocks block_t* blockL; // Linked list of blocks
std::atomic<int> cleanBlkN; // count of blocks that need to be cleaned std::atomic<int> cleanBlkN; // count of blocks that need to be cleaned
unsigned cleanProcN; // count of times the clear process has been run unsigned cleanProcN; // count of times the clear process has been run
node_t* stub; // dummy node node_t* stub; // dummy node
std::atomic<node_t*> head; // last-in std::atomic<node_t*> head; // last-in
node_t* tail; // first-out node_t* tail; // first-out
} nbmpscq_t; } nbmpscq_t;
@ -67,15 +69,20 @@ namespace cw
return rc; return rc;
} }
// _clean() is run by the consumer thread to make empty blocks available.
void _clean( nbmpscq_t* p ) void _clean( nbmpscq_t* p )
{ {
block_t* b = p->blockL; block_t* b = p->blockL;
// for each block
for(; b!=nullptr; b=b->link) for(; b!=nullptr; b=b->link)
{ {
// if this block is full ...
if( b->full_flag.load(std::memory_order_acquire) ) if( b->full_flag.load(std::memory_order_acquire) )
{ {
// ... and there are no more elements to be read from the block
if( b->eleN.load(std::memory_order_acquire) <= 0 ) if( b->eleN.load(std::memory_order_acquire) <= 0 )
{ {
// decr. the cleanBlkN count
unsigned cc = p->cleanBlkN.fetch_add(-1,std::memory_order_relaxed); unsigned cc = p->cleanBlkN.fetch_add(-1,std::memory_order_relaxed);
assert(cc>=1); assert(cc>=1);
@ -103,20 +110,23 @@ namespace cw
{ {
unsigned id; // thread id unsigned id; // thread id
unsigned iter; // execution counter unsigned iter; // execution counter
unsigned value; unsigned value; //
test_share_t* share; test_share_t* share; // pointer to global shared data
} test_t; } test_t;
bool _threadFunc( void* arg ) bool _test_threadFunc( void* arg )
{ {
test_t* t = (test_t*)arg; test_t* t = (test_t*)arg;
// get and increment a global shared counter
t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel);
// push the current thread instance record
push(t->share->qH,t,sizeof(test_t)); push(t->share->qH,t,sizeof(test_t));
// increment this thread's exec. counter
t->iter += 1; t->iter += 1;
sleepMs( rand() & 0xf ); sleepMs( rand() & 0xf );
@ -201,27 +211,35 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
nbmpscq_t* p = _handleToPtr(h); nbmpscq_t* p = _handleToPtr(h);
block_t* b = p->blockL; block_t* b = p->blockL;
// TODO: handle case where blobByteN is greater than p->blkByteN
// Note that this case will immediately overflow the queue.
unsigned nodeByteN = blobByteN + sizeof(node_t); unsigned nodeByteN = blobByteN + sizeof(node_t);
for(; b!=nullptr; b=b->link) for(; b!=nullptr; b=b->link)
{ {
if( b->full_flag.load(std::memory_order_acquire) == false ) if( b->full_flag.load(std::memory_order_acquire) == false )
{ {
// attempt to allocate nodeByteN bytes starting at b->index
unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel); unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel);
// if the allocation was not valid
if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN ) if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN )
{ {
// incr the 'clean count' and mark the block as full
p->cleanBlkN.fetch_add(1,std::memory_order_relaxed); p->cleanBlkN.fetch_add(1,std::memory_order_relaxed);
b->full_flag.store(true,std::memory_order_release); b->full_flag.store(true,std::memory_order_release);
} }
else else
{ {
// otherwise this thread owns the allocated block
node_t* n = (node_t*)(b->buf + idx); node_t* n = (node_t*)(b->buf + idx);
n->blobByteN = blobByteN; n->blobByteN = blobByteN;
n->block = b; n->block = b;
memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN); memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN);
// incr the block element count
b->eleN.fetch_add(1,std::memory_order_release); b->eleN.fetch_add(1,std::memory_order_release);
n->next.store(nullptr); n->next.store(nullptr);
@ -241,15 +259,20 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
} }
} }
} }
if( b == nullptr ) // if there is no space left in the queue to store the incoming blob
rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow. %i %i",p->cleanProcN, p->cleanBlkN.load()); if( b == nullptr )
{
// TODO: continue to iterate through the blocks waiting for the consumer
// to make more space available.
rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow.");
}
return rc; return rc;
} }
cw::nbmpscq::blob_t cw::nbmpscq::next( handle_t h ) cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h )
{ {
blob_t blob; blob_t blob;
nbmpscq_t* p = _handleToPtr(h); nbmpscq_t* p = _handleToPtr(h);
@ -283,7 +306,8 @@ cw::rc_t cw::nbmpscq::advance( handle_t h )
{ {
p->tail = next; p->tail = next;
int eleN = next->block->eleN.fetch_add(-1,std::memory_order_acq_rel); block_t* b = next->block;
int eleN = b->eleN.fetch_add(-1,std::memory_order_acq_rel);
// next was valid and so eleN must be >= 1 // next was valid and so eleN must be >= 1
assert( eleN >= 1 ); assert( eleN >= 1 );
@ -304,27 +328,37 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
test_t* testArray = nullptr; test_t* testArray = nullptr;
unsigned blkN = 2; unsigned blkN = 2;
unsigned blkByteN = 1024; unsigned blkByteN = 1024;
const char* out_fname = nullptr;
time::spec_t t0 = time::current_time(); time::spec_t t0 = time::current_time();
unsigned testDurMs = 0; unsigned testDurMs = 0;
test_share_t share; test_share_t share;
handle_t qH; handle_t qH;
thread_mach::handle_t tmH; thread_mach::handle_t tmH;
file::handle_t fH;
if((rc = cfg->getv("blkN",blkN, if((rc = cfg->getv("blkN",blkN,
"blkByteN",blkByteN, "blkByteN",blkByteN,
"testDurMs",testDurMs, "testDurMs",testDurMs,
"threadN",testArrayN)) != kOkRC ) "threadN",testArrayN,
"out_fname",out_fname)) != kOkRC )
{ {
rc = cwLogError(rc,"Test params parse failed."); rc = cwLogError(rc,"Test params parse failed.");
goto errLabel; goto errLabel;
} }
if((rc = file::open(fH,out_fname,file::kWriteFl)) != kOkRC )
{
rc = cwLogError(rc,"Error creating the output file:%s",cwStringNullGuard(out_fname));
goto errLabel;
}
if( testArrayN == 0 ) if( testArrayN == 0 )
{ {
rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0."); rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0.");
goto errLabel; goto errLabel;
} }
// create the thread instance records
testArray = mem::allocZ<test_t>(testArrayN); testArray = mem::allocZ<test_t>(testArrayN);
// create the queue // create the queue
@ -342,10 +376,9 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
testArray[i].id = i; testArray[i].id = i;
testArray[i].share = &share; testArray[i].share = &share;
} }
// create the thread machine // create the thread machine
if((rc = thread_mach::create( tmH, _threadFunc, testArray, sizeof(test_t), testArrayN )) != kOkRC ) if((rc = thread_mach::create( tmH, _test_threadFunc, testArray, sizeof(test_t), testArrayN )) != kOkRC )
{ {
rc = cwLogError(rc,"Thread machine create failed."); rc = cwLogError(rc,"Thread machine create failed.");
goto errLabel; goto errLabel;
@ -358,18 +391,21 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
goto errLabel; goto errLabel;
} }
// run the test for 'testDurMs' milliseconds
while( time::elapsedMs(t0) < testDurMs ) while( time::elapsedMs(t0) < testDurMs )
{ {
blob_t b = next(qH); blob_t b = get(qH);
if( b.blob != nullptr ) if( b.blob != nullptr )
{ {
test_t* t = (test_t*)b.blob; test_t* t = (test_t*)b.blob;
printf("%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN); printf(fH,"%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN);
advance(qH); advance(qH);
} }
} }
errLabel: errLabel:
file::close(fH);
if((rc0 = thread_mach::destroy(tmH)) != kOkRC ) if((rc0 = thread_mach::destroy(tmH)) != kOkRC )
cwLogError(rc0,"Thread machine destroy failed."); cwLogError(rc0,"Thread machine destroy failed.");
@ -379,6 +415,9 @@ cw::rc_t cw::nbmpscq::test( const object_t* cfg )
if( testArray != nullptr ) if( testArray != nullptr )
printf("P:%i %i\n",testArray[0].iter, testArray[1].iter); printf("P:%i %i\n",testArray[0].iter, testArray[1].iter);
// TODO: read back the file and verify that none of the
// global increment values were dropped.
mem::release(testArray); mem::release(testArray);
return rcSelect(rc,rc0,rc1); return rcSelect(rc,rc0,rc1);

View File

@ -7,10 +7,12 @@ Non-blocking, Lock-free Queue:
Push Push
---- ----
0. Produceers go to next block, if the write-position is valid, 0. Producers go down the list of blocks (nbmpscq.blockL).
then fetch-add the write-position forward to allocate space. If a block is not already full then the producer atomically
fetch-adds block->write_idx by the size of the element
to be inserted.
1. If after the fetch-add the area is valid then 1. If after the fetch-add the write_idx is <= block->byteN then
- atomically incr ele-count, - atomically incr ele-count,
- copy in ele - copy in ele
- place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue(). - place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue().
@ -34,8 +36,12 @@ namespace cw
typedef handle<struct nbmpscq_str> handle_t; typedef handle<struct nbmpscq_str> handle_t;
rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN ); rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN );
rc_t destroy( handle_t& hRef ); rc_t destroy( handle_t& hRef );
// push() is called by multiple producer threads to insert
// an element in the queue. Note that the 'blob' is copied into
// the queue and therefore can be released by the caller.
rc_t push( handle_t h, const void* blob, unsigned blobByteN ); rc_t push( handle_t h, const void* blob, unsigned blobByteN );
typedef struct blob_str typedef struct blob_str
@ -43,8 +49,14 @@ namespace cw
const void* blob; const void* blob;
unsigned blobByteN; unsigned blobByteN;
} blob_t; } blob_t;
blob_t next( handle_t h ); // get() is called by the single consumer thread to access the
// current blob at the front of the queue. Note that this call
// does not change the state of the queue.
blob_t get( handle_t h );
// advance() disposes of the blob at the front of the
// queue and makes the next blob current.
rc_t advance( handle_t h ); rc_t advance( handle_t h );
rc_t test( const object_t* cfg ); rc_t test( const object_t* cfg );