#include "cwCommon.h" #include "cwLog.h" #include "cwCommonImpl.h" #include "cwTest.h" #include "cwMem.h" #include "cwTime.h" #include "cwObject.h" #include "cwMath.h" #include "cwNbMpScQueue.h" #include "cwMpScNbCircQueue.h" #include "cwMtQueueTester.h" #include "cwThread.h" #include "cwThreadMach.h" #include "cwFile.h" namespace cw { namespace mt_queue_tester { struct shared_str; typedef struct test_str { unsigned id; // thread id unsigned iter; // execution counter unsigned value; // struct shared_str* share; // pointer to global shared data } test_t; typedef test_t cq_data_t; typedef struct mp_sc_nb_circ_queue::cq_str cq_t; typedef struct shared_str { nbmpscq::handle_t qH; cq_t* cq; std::atomic cnt; } test_share_t; bool _nbmpscq_threadFunc( void* arg ) { test_t* t = (test_t*)arg; // get and increment a global shared counter t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); // push the current thread instance record push(t->share->qH,t,sizeof(test_t)); // incrmemen this threads exec. counter t->iter += 1; sleepMs( rand() & 0xf ); return true; } void nbmpscq_main( file::handle_t fH, nbmpscq::handle_t qH ) { nbmpscq::blob_t b = get(qH); if( b.blob != nullptr ) { test_t* t = (test_t*)b.blob; printf(fH,"%i %i %i %i\n",t->id,t->iter,t->value,b.blobByteN); advance(qH); } } bool _cq_threadFunc( void* arg ) { rc_t rc = kOkRC; test_t* t = (test_t*)arg; // get and increment a global shared counter t->value = t->share->cnt.fetch_add(1,std::memory_order_acq_rel); // push the current thread instance record if((rc = mp_sc_nb_circ_queue::push(t->share->cq, *t )) != kOkRC ) { cwLogError(rc,"Circular queue is full."); } // incrmement this threads exec. counter t->iter += 1; sleepUs( rand() & 0xf ); return true; } unsigned fail_N = 0; void cq_main( file::handle_t fH, cq_t* cq ) { cq_data_t t; unsigned res_cnt = cq->res_cnt.load(); if( mp_sc_nb_circ_queue::pop(cq, t ) == kOkRC ) { printf(fH,"%i %i %i %i\n",t.id,t.iter,t.value,res_cnt); fail_N = 0; } else { if( fail_N < 10 ) { //printf(fH,"F: %i %i : %i %i\n",res_idx,res_cnt,cq->tail_idx,value); fail_N += 1; } } } } rc_t _check_results( const char* fname ) { rc_t rc = kOkRC; unsigned lineN = 0; unsigned* valueA = nullptr; char* lineBuf = nullptr; unsigned lineCharN = 0; file::handle_t fH; cwLogInfo("Validation started ..."); if((rc = open(fH,fname,file::kReadFl)) != kOkRC ) { rc = cwLogError(rc,"Result file open failed on '%s'.",fname); goto errLabel; } if((rc = file::lineCount(fH,&lineN)) != kOkRC ) { rc = cwLogError(rc,"Line count could not be deteremined on '%s'.",fname); goto errLabel; } if( lineN == 0 ) { rc = cwLogError(rc,"Empty file detected on '%s'.",fname); goto errLabel; } lineN -= 1; valueA = mem::allocZ(lineN); for(unsigned i=0; igetv("blkN",blkN, "blkByteN",blkByteN, "circQueueEleCnt",cqEleCnt, "circQueueFl",cqFl, "testDurMs",testDurMs, "threadN",threadN, "out_fname",out_fname)) != kOkRC ) { rc = cwLogError(rc,"Test params parse failed."); goto errLabel; } if((rc = file::open(fH,out_fname,file::kWriteFl)) != kOkRC ) { rc = cwLogError(rc,"Error creating the output file:%s",cwStringNullGuard(out_fname)); goto errLabel; } if( threadN == 0 ) { rc = cwLogError(kInvalidArgRC,"The 'threadN' parameter must be greater than 0."); goto errLabel; } // create the thread intance records threadA = mem::allocZ(threadN); // create the cwNbMpScQueue queue if((rc = create( qH, blkN, blkByteN )) != kOkRC ) { rc = cwLogError(rc,"nbmpsc create failed."); goto errLabel; } thread_func = cqFl ? _cq_threadFunc : _nbmpscq_threadFunc; share.cq = mp_sc_nb_circ_queue::create(cqEleCnt); // create the circular queue share.qH = qH; share.cnt.store(0); for(unsigned i=0; i