cwNbMpScQueue.h/cpp : Added use of 'cleanBlkN' and 'cleanProcN'.
Updates to test function.
This commit is contained in:
parent
a052f2623c
commit
68b92212c2
@ -37,11 +37,14 @@ namespace cw
|
|||||||
|
|
||||||
typedef struct nbmpscq_str
|
typedef struct nbmpscq_str
|
||||||
{
|
{
|
||||||
|
uint8_t* mem;
|
||||||
unsigned blkN; // count of blocks in blockL
|
unsigned blkN; // count of blocks in blockL
|
||||||
unsigned blkByteN; // size of each block_t.mem[] buffer
|
unsigned blkByteN; // size of each block_t.mem[] buffer
|
||||||
|
|
||||||
block_t* blockL; // linked list of blocks
|
block_t* blockL; // linked list of blocks
|
||||||
std::atomic<int> clean_cnt; // count of blocks that need to be cleaned
|
|
||||||
|
std::atomic<int> cleanBlkN; // count of blocks that need to be cleaned
|
||||||
|
unsigned cleanProcN; // count of times the clear process has been run
|
||||||
|
|
||||||
node_t* stub; // dummy node
|
node_t* stub; // dummy node
|
||||||
std::atomic<node_t*> head; // last-in
|
std::atomic<node_t*> head; // last-in
|
||||||
@ -58,7 +61,7 @@ namespace cw
|
|||||||
if( p != nullptr )
|
if( p != nullptr )
|
||||||
{
|
{
|
||||||
mem::release(p->stub);
|
mem::release(p->stub);
|
||||||
mem::release(p->blockL);
|
mem::release(p->mem);
|
||||||
mem::release(p);
|
mem::release(p);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
@ -73,7 +76,7 @@ namespace cw
|
|||||||
{
|
{
|
||||||
if( b->eleN.load(std::memory_order_acquire) <= 0 )
|
if( b->eleN.load(std::memory_order_acquire) <= 0 )
|
||||||
{
|
{
|
||||||
unsigned cc = p->clean_cnt.fetch_add(-1,std::memory_order_relaxed);
|
unsigned cc = p->cleanBlkN.fetch_add(-1,std::memory_order_relaxed);
|
||||||
assert(cc>=1);
|
assert(cc>=1);
|
||||||
|
|
||||||
// Note: b->full_flag==true and p->eleN==0 so it is safe to reset the block
|
// Note: b->full_flag==true and p->eleN==0 so it is safe to reset the block
|
||||||
@ -85,6 +88,7 @@ namespace cw
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
p->cleanProcN += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -111,7 +115,7 @@ namespace cw
|
|||||||
|
|
||||||
t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel);
|
t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel);
|
||||||
|
|
||||||
push(t->share->qH,t,sizeof(t));
|
push(t->share->qH,t,sizeof(test_t));
|
||||||
|
|
||||||
t->iter += 1;
|
t->iter += 1;
|
||||||
|
|
||||||
@ -129,7 +133,6 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt
|
|||||||
rc_t rc = kOkRC;
|
rc_t rc = kOkRC;
|
||||||
nbmpscq_t* p = nullptr;
|
nbmpscq_t* p = nullptr;
|
||||||
unsigned byteN = 0;
|
unsigned byteN = 0;
|
||||||
uint8_t* mem = nullptr;
|
|
||||||
|
|
||||||
if((rc = destroy(hRef)) != kOkRC )
|
if((rc = destroy(hRef)) != kOkRC )
|
||||||
goto errLabel;
|
goto errLabel;
|
||||||
@ -139,16 +142,16 @@ cw::rc_t cw::nbmpscq::create( handle_t& hRef, unsigned initBlkN, unsigned blkByt
|
|||||||
p->stub = mem::allocZ<node_t>();
|
p->stub = mem::allocZ<node_t>();
|
||||||
p->head = p->stub; // last-in
|
p->head = p->stub; // last-in
|
||||||
p->tail = p->stub; // first-out
|
p->tail = p->stub; // first-out
|
||||||
p->clean_cnt = 0;
|
p->cleanBlkN = 0;
|
||||||
|
|
||||||
p->blkN = initBlkN;
|
p->blkN = initBlkN;
|
||||||
p->blkByteN = blkByteN;
|
p->blkByteN = blkByteN;
|
||||||
byteN = initBlkN * (sizeof(block_t) + blkByteN );
|
byteN = initBlkN * (sizeof(block_t) + blkByteN );
|
||||||
mem = mem::allocZ<uint8_t>(byteN);
|
p->mem = mem::allocZ<uint8_t>(byteN);
|
||||||
|
|
||||||
for(unsigned i=0; i<byteN; i+=(sizeof(block_t) + blkByteN))
|
for(unsigned i=0; i<byteN; i+=(sizeof(block_t) + blkByteN))
|
||||||
{
|
{
|
||||||
block_t* b = (block_t*)(mem+i);
|
block_t* b = (block_t*)(p->mem+i);
|
||||||
b->buf = (uint8_t*)(b + 1);
|
b->buf = (uint8_t*)(b + 1);
|
||||||
b->bufByteN = blkByteN;
|
b->bufByteN = blkByteN;
|
||||||
|
|
||||||
@ -202,13 +205,13 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
|
|||||||
|
|
||||||
for(; b!=nullptr; b=b->link)
|
for(; b!=nullptr; b=b->link)
|
||||||
{
|
{
|
||||||
if( !b->full_flag.load(std::memory_order_acquire) )
|
if( b->full_flag.load(std::memory_order_acquire) == false )
|
||||||
{
|
{
|
||||||
unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel);
|
unsigned idx = b->index.fetch_add(nodeByteN, std::memory_order_acq_rel);
|
||||||
|
|
||||||
if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN )
|
if( idx >= b->bufByteN || idx+nodeByteN > b->bufByteN )
|
||||||
{
|
{
|
||||||
p->clean_cnt.fetch_add(1,std::memory_order_relaxed);
|
p->cleanBlkN.fetch_add(1,std::memory_order_relaxed);
|
||||||
b->full_flag.store(true,std::memory_order_release);
|
b->full_flag.store(true,std::memory_order_release);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -217,10 +220,10 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
|
|||||||
n->blobByteN = blobByteN;
|
n->blobByteN = blobByteN;
|
||||||
n->block = b;
|
n->block = b;
|
||||||
|
|
||||||
b->eleN.fetch_add(1,std::memory_order_release);
|
|
||||||
|
|
||||||
memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN);
|
memcpy(b->buf+idx+sizeof(node_t),blob,blobByteN);
|
||||||
|
|
||||||
|
b->eleN.fetch_add(1,std::memory_order_release);
|
||||||
|
|
||||||
n->next.store(nullptr);
|
n->next.store(nullptr);
|
||||||
// Note that the elements of the queue are only accessed from the end of the queue (tail).
|
// Note that the elements of the queue are only accessed from the end of the queue (tail).
|
||||||
// New nodes can therefore safely be updated in two steps:
|
// New nodes can therefore safely be updated in two steps:
|
||||||
@ -234,12 +237,13 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
|
|||||||
// 2. Set the old-head next pointer to the new node (thereby adding the new node to the list)
|
// 2. Set the old-head next pointer to the new node (thereby adding the new node to the list)
|
||||||
prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer
|
prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if( b == nullptr )
|
if( b == nullptr )
|
||||||
rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow.");
|
rc = cwLogError(kBufTooSmallRC,"NbMpScQueue overflow. %i %i",p->cleanProcN, p->cleanBlkN.load());
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
@ -286,26 +290,42 @@ cw::rc_t cw::nbmpscq::advance( handle_t h )
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if( p->clean_cnt.load(std::memory_order_relaxed) > 0 )
|
if( p->cleanBlkN.load(std::memory_order_relaxed) > 0 )
|
||||||
_clean(p);
|
_clean(p);
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
cw::rc_t cw::nbmpscq::test( object_t* cfg )
|
cw::rc_t cw::nbmpscq::test( const object_t* cfg )
|
||||||
{
|
{
|
||||||
rc_t rc=kOkRC,rc0,rc1;
|
rc_t rc=kOkRC,rc0,rc1;
|
||||||
|
|
||||||
const int testArrayN = 2;
|
unsigned testArrayN = 2;
|
||||||
test_t testArray[testArrayN];
|
test_t* testArray = nullptr;
|
||||||
const unsigned blkN = 2;
|
unsigned blkN = 2;
|
||||||
const unsigned blkByteN = 1024;
|
unsigned blkByteN = 1024;
|
||||||
time::spec_t t0 = time::current_time();
|
time::spec_t t0 = time::current_time();
|
||||||
|
unsigned testDurMs = 0;
|
||||||
test_share_t share;
|
test_share_t share;
|
||||||
handle_t qH;
|
handle_t qH;
|
||||||
thread_mach::handle_t tmH;
|
thread_mach::handle_t tmH;
|
||||||
|
|
||||||
memset(&testArray,0,sizeof(testArray));
|
if((rc = cfg->getv("blkN",blkN,
|
||||||
|
"blkByteN",blkByteN,
|
||||||
|
"testDurMs",testDurMs,
|
||||||
|
"threadN",testArrayN)) != kOkRC )
|
||||||
|
{
|
||||||
|
rc = cwLogError(rc,"Test params parse failed.");
|
||||||
|
goto errLabel;
|
||||||
|
}
|
||||||
|
|
||||||
|
if( testArrayN == 0 )
|
||||||
|
{
|
||||||
|
rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0.");
|
||||||
|
goto errLabel;
|
||||||
|
}
|
||||||
|
|
||||||
|
testArray = mem::allocZ<test_t>(testArrayN);
|
||||||
|
|
||||||
// create the queue
|
// create the queue
|
||||||
if((rc = create( qH, blkN, blkByteN )) != kOkRC )
|
if((rc = create( qH, blkN, blkByteN )) != kOkRC )
|
||||||
@ -315,7 +335,7 @@ cw::rc_t cw::nbmpscq::test( object_t* cfg )
|
|||||||
}
|
}
|
||||||
|
|
||||||
share.qH = qH;
|
share.qH = qH;
|
||||||
share.cnt = 0;
|
share.cnt.store(0);
|
||||||
|
|
||||||
for(unsigned i=0; i<testArrayN; ++i)
|
for(unsigned i=0; i<testArrayN; ++i)
|
||||||
{
|
{
|
||||||
@ -338,13 +358,13 @@ cw::rc_t cw::nbmpscq::test( object_t* cfg )
|
|||||||
goto errLabel;
|
goto errLabel;
|
||||||
}
|
}
|
||||||
|
|
||||||
while( time::elapsedMs(t0) < 1000*10 )
|
while( time::elapsedMs(t0) < testDurMs )
|
||||||
{
|
{
|
||||||
blob_t b = next(qH);
|
blob_t b = next(qH);
|
||||||
if( b.blob != nullptr )
|
if( b.blob != nullptr )
|
||||||
{
|
{
|
||||||
test_t* t = (test_t*)b.blob;
|
test_t* t = (test_t*)b.blob;
|
||||||
printf("%i %i %i\n",t->id,t->iter,t->value);
|
printf("%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN);
|
||||||
advance(qH);
|
advance(qH);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -356,7 +376,10 @@ cw::rc_t cw::nbmpscq::test( object_t* cfg )
|
|||||||
if((rc1 = destroy(qH)) != kOkRC )
|
if((rc1 = destroy(qH)) != kOkRC )
|
||||||
cwLogError(rc1,"nbmpsc queue destroy failed.");
|
cwLogError(rc1,"nbmpsc queue destroy failed.");
|
||||||
|
|
||||||
printf("P:%i %i\n",testArray[0].iter, testArray[1].iter);
|
if( testArray != nullptr )
|
||||||
|
printf("P:%i %i\n",testArray[0].iter, testArray[1].iter);
|
||||||
|
|
||||||
|
mem::release(testArray);
|
||||||
|
|
||||||
return rcSelect(rc,rc0,rc1);
|
return rcSelect(rc,rc0,rc1);
|
||||||
|
|
||||||
|
@ -1,6 +1,32 @@
|
|||||||
#ifndef cwNbMpScQueue_h
|
#ifndef cwNbMpScQueue_h
|
||||||
#define cwNbMpScQueue_h
|
#define cwNbMpScQueue_h
|
||||||
|
|
||||||
|
/*
|
||||||
|
Non-blocking, Lock-free Queue:
|
||||||
|
=================================
|
||||||
|
|
||||||
|
Push
|
||||||
|
----
|
||||||
|
0. Produceers go to next block, if the write-position is valid,
|
||||||
|
then fetch-add the write-position forward to allocate space.
|
||||||
|
|
||||||
|
1. If after the fetch-add the area is valid then
|
||||||
|
- atomically incr ele-count,
|
||||||
|
- copy in ele
|
||||||
|
- place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue().
|
||||||
|
|
||||||
|
2. else (the area is invalid) goto 0.
|
||||||
|
|
||||||
|
Pop
|
||||||
|
----
|
||||||
|
1. copy out next ele.
|
||||||
|
2. decr. block->ele_count
|
||||||
|
3. if the ele-count is 0 and write-offset is invalid
|
||||||
|
reset the write-offset to 0.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
namespace cw
|
namespace cw
|
||||||
{
|
{
|
||||||
namespace nbmpscq
|
namespace nbmpscq
|
||||||
@ -21,7 +47,7 @@ namespace cw
|
|||||||
blob_t next( handle_t h );
|
blob_t next( handle_t h );
|
||||||
rc_t advance( handle_t h );
|
rc_t advance( handle_t h );
|
||||||
|
|
||||||
rc_t test( object_t* cfg );
|
rc_t test( const object_t* cfg );
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user