cwNbMpScQueue.h/c : Fixed bug where the tail was not correctly updated in advance().

Added peek_reset().
This commit is contained in:
kevin 2024-03-28 19:43:40 -04:00
parent 73b433feae
commit 475cf5e541
2 changed files with 57 additions and 30 deletions

View File

@ -164,6 +164,19 @@ namespace cw
return true;
}
void _block_report( nbmpscq_t* p )
{
block_t* b = p->blockL;
for(; b!=nullptr; b=b->link)
{
bool full_fl = b->full_flag.load(std::memory_order_acquire);
unsigned index = b->index.load(std::memory_order_acquire);
int eleN = b->eleN.load(std::memory_order_acquire);
printf("full:%i idx:%i eleN:%i\n",full_fl,index,eleN);
}
}
}
@ -204,26 +217,6 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt
}
/*
byteN = initBlkN * (sizeof(block_t) + blkByteN );
p->mem = mem::allocZ<uint8_t>(byteN);
for(unsigned i=0; i<byteN; i+=(sizeof(block_t) + blkByteN))
{
block_t* b = (block_t*)(p->mem+i);
b->buf = (uint8_t*)(b + 1);
b->bufByteN = blkByteN;
b->full_flag.store(false);
b->index.store(0);
b->eleN.store(0);
b->link = p->blockL;
p->blockL = b;
}
*/
hRef.set(p);
errLabel:
@ -275,6 +268,8 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
// to Intel standard.
assert( nodeByteN % 8 == 0 );
if( nodeByteN > p->blkByteN )
return cwLogError(kInvalidArgRC,"The blob size is too large:%i > %i.",nodeByteN,p->blkByteN);
for(; b!=nullptr; b=b->link)
{
@ -325,7 +320,13 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
{
// TODO: continue to iterate through the blocks waiting for the consumer
// to make more space available.
//_block_report(p);
// BEWARE: BUG BUG BUG: Since the cwLog makes calls to cwWebSocket
// this error message, and subsequent error messages,
// will result in a recursive loop which will crash the program.
rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow.");
}
return rc;
@ -335,9 +336,10 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h )
{
blob_t blob;
nbmpscq_t* p = _handleToPtr(h);
node_t* t = p->tail;
node_t* node = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
nbmpscq_t* p = _handleToPtr(h);
// We always access the tail element through tail->next.
node_t* node = p->tail->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
_init_blob( blob, node );
@ -349,17 +351,23 @@ cw::nbmpscq::blob_t cw::nbmpscq::advance( handle_t h )
blob_t blob;
nbmpscq_t* p = _handleToPtr(h);
node_t* t = p->tail;
node_t* next = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
// We always access the tail element through tail->next.
node_t* next = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
// We always leave the last element on the queue to act as 'stub'.
if( next != nullptr )
{
p->tail = next;
block_t* b = next->block;
int eleN = b->eleN.fetch_add(-1,std::memory_order_acq_rel);
// first 'stub' will not have a valid block pointer
if( t->block != nullptr )
{
int eleN = t->block->eleN.fetch_add(-1,std::memory_order_acq_rel);
// next was valid and so eleN must be >= 1
assert( eleN >= 1 );
// next was valid and so eleN must be >= 1
assert( eleN >= 1 );
}
}
@ -393,7 +401,12 @@ cw::nbmpscq::blob_t cw::nbmpscq::peek( handle_t h )
p->peek = n->next.load(std::memory_order_acquire);
return blob;
}
void ::cw::nbmpscq::peek_reset(handle_t h)
{
nbmpscq_t* p = _handleToPtr(h);
p->peek = nullptr;
}

View File

@ -39,11 +39,20 @@ namespace cw
rc_t destroy( handle_t& hRef );
//
// Producer Function
//
// push() is called by multiple producer threads to insert
// an element in the queue. Note that the 'blob' is copied into
// the queue and therefore can be released by the caller.
rc_t push( handle_t h, const void* blob, unsigned blobByteN );
//
// Consumer Functions
//
typedef struct blob_str
{
rc_t rc;
@ -68,9 +77,14 @@ namespace cw
// set to 'nullptr'. The following call will then revert to returning
// the oldest stored record.
blob_t peek( handle_t h );
// Reset peek to point to the oldest stored record.
void peek_reset( handle_t h );
// Return true if the queue is empty.
bool is_empty( handle_t h );
// Count of elements in the queue.
unsigned count( handle_t h );
rc_t test( const object_t* cfg );