cwNbMpScQueue.h/cpp : Moved testing into cwMtQueueTester.h/cpp. No functional changes.
This commit is contained in:
parent
82010db96b
commit
43e9c654a1
@ -10,9 +10,6 @@
|
|||||||
|
|
||||||
#include "cwNbMpScQueue.h"
|
#include "cwNbMpScQueue.h"
|
||||||
|
|
||||||
#include "cwThread.h"
|
|
||||||
#include "cwThreadMach.h"
|
|
||||||
#include "cwFile.h"
|
|
||||||
|
|
||||||
namespace cw
|
namespace cw
|
||||||
{
|
{
|
||||||
@ -132,42 +129,7 @@ namespace cw
|
|||||||
b.rc = kOkRC;
|
b.rc = kOkRC;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct shared_str
|
|
||||||
{
|
|
||||||
handle_t qH;
|
|
||||||
std::atomic<unsigned> cnt;
|
|
||||||
|
|
||||||
} test_share_t;
|
|
||||||
|
|
||||||
typedef struct test_str
|
|
||||||
{
|
|
||||||
unsigned id; // thread id
|
|
||||||
unsigned iter; // execution counter
|
|
||||||
unsigned value; //
|
|
||||||
test_share_t* share; // pointer to global shared data
|
|
||||||
} test_t;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
bool _test_threadFunc( void* arg )
|
|
||||||
{
|
|
||||||
test_t* t = (test_t*)arg;
|
|
||||||
|
|
||||||
// get and increment a global shared counter
|
|
||||||
t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel);
|
|
||||||
|
|
||||||
// push the current thread instance record
|
|
||||||
push(t->share->qH,t,sizeof(test_t));
|
|
||||||
|
|
||||||
// incrmemen this threads exec. counter
|
|
||||||
t->iter += 1;
|
|
||||||
|
|
||||||
sleepMs( rand() & 0xf );
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void _block_report( nbmpscq_t* p )
|
void _block_report( nbmpscq_t* p )
|
||||||
{
|
{
|
||||||
block_t* b = p->blockL;
|
block_t* b = p->blockL;
|
||||||
@ -301,14 +263,20 @@ cw::rc_t cw::nbmpscq::push( handle_t h, const void* blob, unsigned blobByteN )
|
|||||||
b->eleN.fetch_add(1,std::memory_order_release);
|
b->eleN.fetch_add(1,std::memory_order_release);
|
||||||
|
|
||||||
n->next.store(nullptr);
|
n->next.store(nullptr);
|
||||||
// Note that the elements of the queue are only accessed from the end of the queue (tail).
|
|
||||||
// New nodes can therefore safely be updated in two steps:
|
// Note that the elements of the queue are only accessed from the front of the queue (tail).
|
||||||
|
// New nodes are added to the end of the list (head).
|
||||||
|
// New node will therefore always have it's next ptr set to null.
|
||||||
|
|
||||||
// 1. Atomically set _head to the new node and return 'old-head'
|
// 1. Atomically set _head to the new node and return 'old-head'
|
||||||
|
// We use acq_release to prevent code movement above or below this instruction.
|
||||||
node_t* prev = p->head.exchange(n,std::memory_order_acq_rel);
|
node_t* prev = p->head.exchange(n,std::memory_order_acq_rel);
|
||||||
|
|
||||||
// Note that at this point only the new node may have the 'old-head' as it's predecssor.
|
// Note that at this point only the new node may have the 'old-head' as it's predecssor.
|
||||||
// Other threads may therefore safely interrupt at this point.
|
// Other threads may therefore safely interrupt at this point - they will
|
||||||
|
// have the new node as their predecessor. Note that none of these nodes are accessible
|
||||||
|
// yet because __tail next__ pointer is still pointing to the 'old-head' - whose next pointer
|
||||||
|
// is still null.
|
||||||
|
|
||||||
// 2. Set the old-head next pointer to the new node (thereby adding the new node to the list)
|
// 2. Set the old-head next pointer to the new node (thereby adding the new node to the list)
|
||||||
prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer
|
prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer
|
||||||
@ -341,7 +309,9 @@ cw::nbmpscq::blob_t cw::nbmpscq::get( handle_t h )
|
|||||||
blob_t blob;
|
blob_t blob;
|
||||||
nbmpscq_t* p = _handleToPtr(h);
|
nbmpscq_t* p = _handleToPtr(h);
|
||||||
|
|
||||||
// We always access the tail element through tail->next.
|
// We always access the tail element through tail->next. Always accessing the
|
||||||
|
// next tail element via the tail->next pointer is critical to correctness.
|
||||||
|
// See note in push().
|
||||||
node_t* node = p->tail->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
node_t* node = p->tail->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
|
||||||
|
|
||||||
_init_blob( blob, node );
|
_init_blob( blob, node );
|
||||||
@ -436,107 +406,3 @@ unsigned cw::nbmpscq::count( handle_t h )
|
|||||||
return eleN;
|
return eleN;
|
||||||
}
|
}
|
||||||
|
|
||||||
cw::rc_t cw::nbmpscq::test( const object_t* cfg )
|
|
||||||
{
|
|
||||||
rc_t rc=kOkRC,rc0,rc1;
|
|
||||||
|
|
||||||
unsigned testArrayN = 2;
|
|
||||||
test_t* testArray = nullptr;
|
|
||||||
unsigned blkN = 2;
|
|
||||||
unsigned blkByteN = 1024;
|
|
||||||
const char* out_fname = nullptr;
|
|
||||||
time::spec_t t0 = time::current_time();
|
|
||||||
unsigned testDurMs = 0;
|
|
||||||
test_share_t share;
|
|
||||||
handle_t qH;
|
|
||||||
thread_mach::handle_t tmH;
|
|
||||||
file::handle_t fH;
|
|
||||||
|
|
||||||
if((rc = cfg->getv("blkN",blkN,
|
|
||||||
"blkByteN",blkByteN,
|
|
||||||
"testDurMs",testDurMs,
|
|
||||||
"threadN",testArrayN,
|
|
||||||
"out_fname",out_fname)) != kOkRC )
|
|
||||||
{
|
|
||||||
rc = cwLogError(rc,"Test params parse failed.");
|
|
||||||
goto errLabel;
|
|
||||||
}
|
|
||||||
|
|
||||||
if((rc = file::open(fH,out_fname,file::kWriteFl)) != kOkRC )
|
|
||||||
{
|
|
||||||
rc = cwLogError(rc,"Error creating the output file:%s",cwStringNullGuard(out_fname));
|
|
||||||
goto errLabel;
|
|
||||||
}
|
|
||||||
|
|
||||||
if( testArrayN == 0 )
|
|
||||||
{
|
|
||||||
rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0.");
|
|
||||||
goto errLabel;
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the thread intance records
|
|
||||||
testArray = mem::allocZ<test_t>(testArrayN);
|
|
||||||
|
|
||||||
// create the queue
|
|
||||||
if((rc = create( qH, blkN, blkByteN )) != kOkRC )
|
|
||||||
{
|
|
||||||
rc = cwLogError(rc,"nbmpsc create failed.");
|
|
||||||
goto errLabel;
|
|
||||||
}
|
|
||||||
|
|
||||||
share.qH = qH;
|
|
||||||
share.cnt.store(0);
|
|
||||||
|
|
||||||
for(unsigned i=0; i<testArrayN; ++i)
|
|
||||||
{
|
|
||||||
testArray[i].id = i;
|
|
||||||
testArray[i].share = &share;
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the thread machine
|
|
||||||
if((rc = thread_mach::create( tmH, _test_threadFunc, testArray, sizeof(test_t), testArrayN )) != kOkRC )
|
|
||||||
{
|
|
||||||
rc = cwLogError(rc,"Thread machine create failed.");
|
|
||||||
goto errLabel;
|
|
||||||
}
|
|
||||||
|
|
||||||
// start the thread machine
|
|
||||||
if((rc = thread_mach::start(tmH)) != kOkRC )
|
|
||||||
{
|
|
||||||
cwLogError(rc,"Thread machine start failed.");
|
|
||||||
goto errLabel;
|
|
||||||
}
|
|
||||||
|
|
||||||
// run the test for 'testDurMs' milliseconds
|
|
||||||
while( time::elapsedMs(t0) < testDurMs )
|
|
||||||
{
|
|
||||||
blob_t b = get(qH);
|
|
||||||
if( b.blob != nullptr )
|
|
||||||
{
|
|
||||||
test_t* t = (test_t*)b.blob;
|
|
||||||
printf(fH,"%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN);
|
|
||||||
advance(qH);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
errLabel:
|
|
||||||
file::close(fH);
|
|
||||||
|
|
||||||
if((rc0 = thread_mach::destroy(tmH)) != kOkRC )
|
|
||||||
cwLogError(rc0,"Thread machine destroy failed.");
|
|
||||||
|
|
||||||
if((rc1 = destroy(qH)) != kOkRC )
|
|
||||||
cwLogError(rc1,"nbmpsc queue destroy failed.");
|
|
||||||
|
|
||||||
if( testArray != nullptr )
|
|
||||||
printf("P:%i %i\n",testArray[0].iter, testArray[1].iter);
|
|
||||||
|
|
||||||
// TODO: read back the file and verify that none of the
|
|
||||||
// global incrment values were dropped.
|
|
||||||
|
|
||||||
mem::release(testArray);
|
|
||||||
|
|
||||||
return rcSelect(rc,rc0,rc1);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
@ -27,6 +27,10 @@ Pop
|
|||||||
2. decr. block->ele_count
|
2. decr. block->ele_count
|
||||||
3. if the ele-count is 0 and write-offset is invalid
|
3. if the ele-count is 0 and write-offset is invalid
|
||||||
reset the write-offset to 0.
|
reset the write-offset to 0.
|
||||||
|
|
||||||
|
|
||||||
|
This code is tested in cwMtQueueTester.h/cpp.
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
@ -89,8 +93,6 @@ namespace cw
|
|||||||
// Count of elements in the queue.
|
// Count of elements in the queue.
|
||||||
unsigned count( handle_t h );
|
unsigned count( handle_t h );
|
||||||
|
|
||||||
rc_t test( const object_t* cfg );
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user