diff --git a/cwMtQueueTester.cpp b/cwMtQueueTester.cpp new file mode 100644 index 0000000..1288cc6 --- /dev/null +++ b/cwMtQueueTester.cpp @@ -0,0 +1,316 @@ +#include "cwCommon.h" +#include "cwLog.h" +#include "cwCommonImpl.h" +#include "cwTest.h" +#include "cwMem.h" +#include "cwTime.h" +#include "cwObject.h" +#include "cwMath.h" + +#include "cwNbMpScQueue.h" +#include "cwMpScNbCircQueue.h" +#include "cwMtQueueTester.h" + +#include "cwThread.h" +#include "cwThreadMach.h" +#include "cwFile.h" + + +namespace cw +{ + namespace mt_queue_tester + { + struct shared_str; + + typedef struct test_str + { + unsigned id; // thread id + unsigned iter; // execution counter + unsigned value; // + struct shared_str* share; // pointer to global shared data + } test_t; + + typedef test_t cq_data_t; + + typedef struct mp_sc_nb_circ_queue::cq_str cq_t; + + typedef struct shared_str + { + nbmpscq::handle_t qH; + cq_t* cq; + std::atomic cnt; + + } test_share_t; + + + + bool _nbmpscq_threadFunc( void* arg ) + { + test_t* t = (test_t*)arg; + + // get and increment a global shared counter + t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); + + // push the current thread instance record + push(t->share->qH,t,sizeof(test_t)); + + // incrmemen this threads exec. counter + t->iter += 1; + + sleepMs( rand() & 0xf ); + + return true; + } + + void nbmpscq_main( file::handle_t fH, nbmpscq::handle_t qH ) + { + nbmpscq::blob_t b = get(qH); + if( b.blob != nullptr ) + { + test_t* t = (test_t*)b.blob; + printf(fH,"%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN); + advance(qH); + } + + } + + bool _cq_threadFunc( void* arg ) + { + rc_t rc = kOkRC; + test_t* t = (test_t*)arg; + + // get and increment a global shared counter + t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); + + // push the current thread instance record + if((rc = mp_sc_nb_circ_queue::push(t->share->cq, *t )) != kOkRC ) + { + cwLogError(rc,"Circular queue is full."); + } + + // incrmement this threads exec. counter + t->iter += 1; + + sleepUs( rand() & 0xf ); + + return true; + } + + unsigned fail_N = 0; + void cq_main( file::handle_t fH, cq_t* cq ) + { + + cq_data_t t; + + unsigned res_cnt = cq->res_cnt.load(); + + if( mp_sc_nb_circ_queue::pop(cq, t ) == kOkRC ) + { + printf(fH,"%i %i %i %i\n",t.id,t.iter,t.value,res_cnt); + + fail_N = 0; + } + else + { + if( fail_N < 10 ) + { + //printf(fH,"F: %i %i : %i %i\n",res_idx,res_cnt,cq->tail_idx,value); + fail_N += 1; + } + } + } + } + + rc_t _check_results( const char* fname ) + { + rc_t rc = kOkRC; + unsigned lineN = 0; + unsigned* valueA = nullptr; + char* lineBuf = nullptr; + unsigned lineCharN = 0; + file::handle_t fH; + + cwLogInfo("Validation started ..."); + + if((rc = open(fH,fname,file::kReadFl)) != kOkRC ) + { + rc = cwLogError(rc,"Result file open failed on '%s'.",fname); + goto errLabel; + } + + if((rc = file::lineCount(fH,&lineN)) != kOkRC ) + { + rc = cwLogError(rc,"Line count could not be deteremined on '%s'.",fname); + goto errLabel; + } + + if( lineN == 0 ) + { + rc = cwLogError(rc,"Empty file detected on '%s'.",fname); + goto errLabel; + } + + lineN -= 1; + + valueA = mem::allocZ(lineN); + + for(unsigned i=0; igetv("blkN",blkN, + "blkByteN",blkByteN, + "circQueueEleCnt",cqEleCnt, + "circQueueFl",cqFl, + "testDurMs",testDurMs, + "threadN",threadN, + "out_fname",out_fname)) != kOkRC ) + { + rc = cwLogError(rc,"Test params parse failed."); + goto errLabel; + } + + if((rc = file::open(fH,out_fname,file::kWriteFl)) != kOkRC ) + { + rc = cwLogError(rc,"Error creating the output file:%s",cwStringNullGuard(out_fname)); + goto errLabel; + } + + if( threadN == 0 ) + { + rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0."); + goto errLabel; + } + + // create the thread intance records + threadA = mem::allocZ(threadN); + + // create the cwNbMpScQueue queue + if((rc = create( qH, blkN, blkByteN )) != kOkRC ) + { + rc = cwLogError(rc,"nbmpsc create failed."); + goto errLabel; + } + + thread_func = cqFl ? _cq_threadFunc : _nbmpscq_threadFunc; + share.cq = mp_sc_nb_circ_queue::create(cqEleCnt); // create the circular queue + share.qH = qH; + share.cnt.store(0); + + for(unsigned i=0; i