cwNbMpScQueue.h/cpp :
1. Added peek(). 2. blockL is now made from individual allocated blocks to ease memory alignment. 3. node_t and blob length is now padded out to a multiple of 8 bytes to guarantee memory alignment. 4. advance() now returns the next blob.
This commit is contained in:
parent
9fd9043a2d
commit
99871c5bef
@ -30,12 +30,15 @@ namespace cw
|
|||||||
|
|
||||||
typedef struct node_str
|
typedef struct node_str
|
||||||
{
|
{
|
||||||
std::atomic<struct node_str*> next;
|
std::atomic<struct node_str*> next; // 0
|
||||||
block_t* block;
|
block_t* block; // 8
|
||||||
unsigned blobByteN;
|
unsigned blobByteN; // 16
|
||||||
|
unsigned pad; // 20-24 (mult. of 8)
|
||||||
// blob data follows
|
// blob data follows
|
||||||
} node_t;
|
} node_t;
|
||||||
|
|
||||||
|
static_assert( sizeof(node_t) % 8 == 0 );
|
||||||
|
|
||||||
typedef struct nbmpscq_str
|
typedef struct nbmpscq_str
|
||||||
{
|
{
|
||||||
uint8_t* mem; // Pointer to a single area of memory which holds all blocks.
|
uint8_t* mem; // Pointer to a single area of memory which holds all blocks.
|
||||||
@ -51,6 +54,8 @@ namespace cw
|
|||||||
std::atomic<node_t*> head; // last-in
|
std::atomic<node_t*> head; // last-in
|
||||||
|
|
||||||
node_t* tail; // first-out
|
node_t* tail; // first-out
|
||||||
|
|
||||||
|
node_t* peek;
|
||||||
|
|
||||||
} nbmpscq_t;
|
} nbmpscq_t;
|
||||||
|
|
||||||
@ -62,8 +67,18 @@ namespace cw
|
|||||||
rc_t rc = kOkRC;
|
rc_t rc = kOkRC;
|
||||||
if( p != nullptr )
|
if( p != nullptr )
|
||||||
{
|
{
|
||||||
|
|
||||||
|
block_t* b = p->blockL;
|
||||||
|
while( b != nullptr )
|
||||||
|
{
|
||||||
|
block_t* b0 = b->link;
|
||||||
|
mem::release(b->buf);
|
||||||
|
mem::release(b);
|
||||||
|
b=b0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
mem::release(p->stub);
|
mem::release(p->stub);
|
||||||
mem::release(p->mem);
|
|
||||||
mem::release(p);
|
mem::release(p);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
@ -98,6 +113,22 @@ namespace cw
|
|||||||
p->cleanProcN += 1;
|
p->cleanProcN += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void _init_blob( blob_t& b, node_t* node )
|
||||||
|
{
|
||||||
|
if( node == nullptr )
|
||||||
|
{
|
||||||
|
b.blob = nullptr;
|
||||||
|
b.blobByteN = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
b.blob = (uint8_t*)(node+1);
|
||||||
|
b.blobByteN = node->blobByteN;
|
||||||
|
}
|
||||||
|
|
||||||
|
b.rc = kOkRC;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct shared_str
|
typedef struct shared_str
|
||||||
{
|
{
|
||||||
@ -142,7 +173,6 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt
|
|||||||
{
|
{
|
||||||
rc_t rc = kOkRC;
|
rc_t rc = kOkRC;
|
||||||
nbmpscq_t* p = nullptr;
|
nbmpscq_t* p = nullptr;
|
||||||
unsigned byteN = 0;
|
|
||||||
|
|
||||||
if((rc = destroy(hRef)) != kOkRC )
|
if((rc = destroy(hRef)) != kOkRC )
|
||||||
goto errLabel;
|
goto errLabel;
|
||||||
@ -151,11 +181,31 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt
|
|||||||
|
|
||||||
p->stub = mem::allocZ<node_t>();
|
p->stub = mem::allocZ<node_t>();
|
||||||
p->head = p->stub; // last-in
|
p->head = p->stub; // last-in
|
||||||
p->tail = p->stub; // first-out
|
p->tail = p->stub; // first-out
|
||||||
|
p->peek = nullptr;
|
||||||
p->cleanBlkN = 0;
|
p->cleanBlkN = 0;
|
||||||
|
|
||||||
p->blkN = initBlkN;
|
p->blkN = initBlkN;
|
||||||
p->blkByteN = blkByteN;
|
p->blkByteN = blkByteN;
|
||||||
|
|
||||||
|
for(unsigned i=0; i<initBlkN; ++i)
|
||||||
|
{
|
||||||
|
block_t* b = mem::allocZ<block_t>();
|
||||||
|
b->buf = mem::allocZ<uint8_t>(blkByteN);
|
||||||
|
|
||||||
|
b->bufByteN = blkByteN;
|
||||||
|
|
||||||
|
b->full_flag.store(false);
|
||||||
|
b->index.store(0);
|
||||||
|
b->eleN.store(0);
|
||||||
|
|
||||||
|
b->link = p->blockL;
|
||||||
|
p->blockL = b;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
byteN = initBlkN * (sizeof(block_t) + blkByteN );
|
byteN = initBlkN * (sizeof(block_t) + blkByteN );
|
||||||
p->mem = mem::allocZ<uint8_t>(byteN);
|
p->mem = mem::allocZ<uint8_t>(byteN);
|
||||||
|
|
||||||
@ -172,7 +222,8 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt
|
|||||||
b->link = p->blockL;
|
b->link = p->blockL;
|
||||||
p->blockL = b;
|
p->blockL = b;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
hRef.set(p);
|
hRef.set(p);
|
||||||
|
|
||||||
errLabel:
|
errLabel:
|
||||||
@ -215,6 +266,15 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
|
|||||||
// Note that this case will immediately overflow the queue.
|
// Note that this case will immediately overflow the queue.
|
||||||
|
|
||||||
unsigned nodeByteN = blobByteN + sizeof(node_t);
|
unsigned nodeByteN = blobByteN + sizeof(node_t);
|
||||||
|
|
||||||
|
// force the size of the node to be a multiple of 8
|
||||||
|
nodeByteN = ((nodeByteN-1) & 0xfffffff8) + 8;
|
||||||
|
|
||||||
|
// We will eventually be addressing node_t records stored in pre-allocated blocks
|
||||||
|
// of memory - be sure that they always begin on 8 byte alignment to conform
|
||||||
|
// to Intel standard.
|
||||||
|
assert( nodeByteN % 8 == 0 );
|
||||||
|
|
||||||
|
|
||||||
for(; b!=nullptr; b=b->link)
|
for(; b!=nullptr; b=b->link)
|
||||||
{
|
{
|
||||||
@ -274,33 +334,22 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
|
|||||||
|
|
||||||
cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h )
|
cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h )
|
||||||
{
|
{
|
||||||
blob_t blob;
|
blob_t blob;
|
||||||
nbmpscq_t* p = _handleToPtr(h);
|
nbmpscq_t* p = _handleToPtr(h);
|
||||||
|
node_t* t = p->tail;
|
||||||
node_t* t = p->tail;
|
node_t* node = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
||||||
node_t* n = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
|
||||||
|
_init_blob( blob, node );
|
||||||
if( n == nullptr )
|
|
||||||
{
|
|
||||||
blob.blob = nullptr;
|
|
||||||
blob.blobByteN = 0;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
blob.blob = (uint8_t*)(n+1);
|
|
||||||
blob.blobByteN = n->blobByteN;
|
|
||||||
}
|
|
||||||
|
|
||||||
return blob;
|
|
||||||
|
|
||||||
|
return blob;
|
||||||
}
|
}
|
||||||
|
|
||||||
cw::rc_t cw::nbmpscq::advance( handle_t h )
|
cw::nbmpscq::blob_t cw::nbmpscq::advance( handle_t h )
|
||||||
{
|
{
|
||||||
nbmpscq_t* p = _handleToPtr(h);
|
blob_t blob;
|
||||||
rc_t rc = kOkRC;
|
nbmpscq_t* p = _handleToPtr(h);
|
||||||
node_t* t = p->tail;
|
node_t* t = p->tail;
|
||||||
node_t* next = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
node_t* next = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
||||||
|
|
||||||
if( next != nullptr )
|
if( next != nullptr )
|
||||||
{
|
{
|
||||||
@ -317,7 +366,46 @@ cw::rc_t cw::nbmpscq::advance( handle_t h )
|
|||||||
if( p->cleanBlkN.load(std::memory_order_relaxed) > 0 )
|
if( p->cleanBlkN.load(std::memory_order_relaxed) > 0 )
|
||||||
_clean(p);
|
_clean(p);
|
||||||
|
|
||||||
return rc;
|
|
||||||
|
_init_blob(blob,next);
|
||||||
|
|
||||||
|
return blob;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
cw::nbmpscq::blob_t cw::nbmpscq::peek( handle_t h )
|
||||||
|
{
|
||||||
|
blob_t blob;
|
||||||
|
nbmpscq_t* p = _handleToPtr(h);
|
||||||
|
node_t* n = p->peek;
|
||||||
|
|
||||||
|
// if p->peek is not set ...
|
||||||
|
if( n == nullptr )
|
||||||
|
{
|
||||||
|
// ... then set it to the tail
|
||||||
|
n = p->tail->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
_init_blob(blob,n);
|
||||||
|
|
||||||
|
if( n != nullptr )
|
||||||
|
p->peek = n->next.load(std::memory_order_acquire);
|
||||||
|
|
||||||
|
return blob;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
bool cw::nbmpscq::is_empty( handle_t h )
|
||||||
|
{
|
||||||
|
nbmpscq_t* p = _handleToPtr(h);
|
||||||
|
|
||||||
|
node_t* t = p->tail;
|
||||||
|
node_t* next = t->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
||||||
|
|
||||||
|
return next == nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
cw::rc_t cw::nbmpscq::test( const object_t* cfg )
|
cw::rc_t cw::nbmpscq::test( const object_t* cfg )
|
||||||
|
@ -34,7 +34,7 @@ namespace cw
|
|||||||
namespace nbmpscq
|
namespace nbmpscq
|
||||||
{
|
{
|
||||||
typedef handle<struct nbmpscq_str> handle_t;
|
typedef handle<struct nbmpscq_str> handle_t;
|
||||||
|
|
||||||
rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN );
|
rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN );
|
||||||
|
|
||||||
rc_t destroy( handle_t& hRef );
|
rc_t destroy( handle_t& hRef );
|
||||||
@ -46,18 +46,30 @@ namespace cw
|
|||||||
|
|
||||||
typedef struct blob_str
|
typedef struct blob_str
|
||||||
{
|
{
|
||||||
|
rc_t rc;
|
||||||
const void* blob;
|
const void* blob;
|
||||||
unsigned blobByteN;
|
unsigned blobByteN;
|
||||||
} blob_t;
|
} blob_t;
|
||||||
|
|
||||||
// get() is called by the single consumer thread to access the
|
// get() is called by the single consumer thread to access the
|
||||||
// current blob at the front of the queue. Note that this call
|
// oldest record in the queue. Note that this call
|
||||||
// does not change the state of the queue.
|
// does not change the state of the queue.
|
||||||
blob_t get( handle_t h );
|
blob_t get( handle_t h );
|
||||||
|
|
||||||
// advance() disposes of the blob at the front of the
|
// advance() disposes of the oldest blob in the
|
||||||
// queue and makes the next blob current.
|
// queue and makes the next blob current.
|
||||||
rc_t advance( handle_t h );
|
blob_t advance( handle_t h );
|
||||||
|
|
||||||
|
// The queue maintains a single internal iterator which the consumer
|
||||||
|
// may use to traverse stored records without removing them.
|
||||||
|
// The first call to peek() will return the oldest stored record.
|
||||||
|
// Each subsequent call to peek() will return the next stored record
|
||||||
|
// until no records are available - at which point blob_t.blob will be
|
||||||
|
// set to 'nullptr'. The following call will then revert to returning
|
||||||
|
// the oldest stored record.
|
||||||
|
blob_t peek( handle_t h );
|
||||||
|
|
||||||
|
bool is_empty( handle_t h );
|
||||||
|
|
||||||
rc_t test( const object_t* cfg );
|
rc_t test( const object_t* cfg );
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user