From 8c7d72298c2d5ce1e5737580a23506e17f7029bd Mon Sep 17 00:00:00 2001 From: "kevin.larke" Date: Sat, 18 Apr 2020 21:24:42 -0400 Subject: [PATCH] cwSpScQueueTmpl.h : Initial commit. --- Makefile.am | 4 +- cwSpScQueueTmpl.cpp | 167 ++++++++++++++++++++++++++++++++++++++++++++ cwSpScQueueTmpl.h | 95 +++++++++++++++++++++++++ 3 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 cwSpScQueueTmpl.cpp create mode 100644 cwSpScQueueTmpl.h diff --git a/Makefile.am b/Makefile.am index 05a31d8..85bbbd3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -13,8 +13,8 @@ libcwSRC += src/libcw/cwObject.cpp src/libcw/cwText libcwHDR += src/libcw/cwThread.h src/libcw/cwMutex.h src/libcw/cwThreadMach.h libcwSRC += src/libcw/cwThread.cpp src/libcw/cwMutex.cpp src/libcw/cwThreadMach.cpp -libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h -libcwSRC += src/libcw/cwSpScBuf.cpp +libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h src/libcw/cwSpScQueueTmpl.h +libcwSRC += src/libcw/cwSpScBuf.cpp src/libcw/cwSpScQueueTmpl.cpp libcwHDR += src/libcw/cwWebSock.h src/libcw/cwWebSockSvr.h src/libcw/cwLib.h diff --git a/cwSpScQueueTmpl.cpp b/cwSpScQueueTmpl.cpp new file mode 100644 index 0000000..112c697 --- /dev/null +++ b/cwSpScQueueTmpl.cpp @@ -0,0 +1,167 @@ +#include "cwCommon.h" +#include "cwLog.h" +#include "cwCommonImpl.h" +#include "cwMem.h" +#include "cwThread.h" +#include "cwThreadMach.h" + + +#include "cwSpScQueueTmpl.h" + +namespace cw +{ + const int kDataByteN = 14; + + typedef struct msg_str + { + uint8_t dataByteN; + uint8_t checksum; + uint8_t data[ kDataByteN ]; + } msg_t; + + typedef spScQueueTmpl queue_t; + + + typedef struct shared_str + { + queue_t* q; // Shared SPSC queue + std::atomic readyFl; // The consumer sets the readyFl at program startup when it is ready to start emptying the queue. + } shared_t; // This prevents the producer from immediately filling the queue before the consumer start.s + + typedef struct ctx_str + { + unsigned id; // thread id + unsigned iter; // execution counter + unsigned msgN; // count of msg's processed + shared_t* share; // shared variables + } ctx_t; + + + void _producer( ctx_t* c ) + { + + bool readyFl = c->share->readyFl.load(std::memory_order_acquire); + + if( readyFl ) + { + + msg_t* m = c->share->q->get(); + + m->dataByteN = kDataByteN; + m->checksum = 0; + + uint8_t d = (c->iter & 0xff); + for(int i=0; idata[i] = d++; + m->checksum += m->data[i]; + } + + c->share->q->push(m); + + c->msgN++; + + } + + c->iter++; + + } + + void _consumer( ctx_t* c ) + { + msg_t* m = nullptr; + + if( c->iter == 0 ) + { + c->share->readyFl.store(true,std::memory_order_release); + } + + if((m = c->share->q->pop()) != nullptr ) + { + uint8_t curCheckSum = 0; + for(unsigned i=0; idata[i]; + + if( curCheckSum != m->checksum ) + cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,m->checksum); + + } + + c->iter++; + + } + + bool _threadFunc( void* arg ) + { + ctx_t* c = static_cast(arg); + + switch( c->id ) + { + case 0: + _producer(c); + break; + + case 1: + _consumer(c); + break; + + default: + assert(0); + } + + sleepMs( rand() & 0xf ); + + return true; + } + + +} + +cw::rc_t cw::testSpScQueueTmpl() +{ + rc_t rc=kOkRC,rc0; + + thread_mach::handle_t h; + const int ctxArrayN = 2; + ctx_t ctxArray[ctxArrayN]; + shared_t share; + const int eleN = 128; + + memset(&ctxArray,0,sizeof(ctxArray)); + + // setup the thread context array + ctxArray[0].id = 0; + ctxArray[0].share = &share; + ctxArray[1].id = 1; + ctxArray[1].share = &share; + + share.readyFl.store(false,std::memory_order_release); + + share.q = new queue_t(eleN); + + // create the thread machine + if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC ) + { + rc = cwLogError(rc,"Thread machine create failed."); + goto errLabel; + } + + // start the thread machine + if((rc = thread_mach::start(h)) != kOkRC ) + { + cwLogError(rc,"Thread machine start failed."); + goto errLabel; + } + + sleepMs(5000); + + errLabel: + if((rc0 = thread_mach::destroy(h)) != kOkRC ) + cwLogError(rc0,"Thread machine destroy failed."); + + delete share.q; + + printf("P:%i msgs:%i C:%i msgs:%i\n",ctxArray[0].iter, ctxArray[0].msgN, ctxArray[1].iter, ctxArray[1].msgN); + + return rcSelect(rc,rc0); +} diff --git a/cwSpScQueueTmpl.h b/cwSpScQueueTmpl.h new file mode 100644 index 0000000..f555e38 --- /dev/null +++ b/cwSpScQueueTmpl.h @@ -0,0 +1,95 @@ +#ifndef cwSpScQueueTmpl_H +#define cwSpScQueueTmpl_H + +namespace cw +{ + + template< typename T > + class spScQueueTmpl + { + public: + spScQueueTmpl( unsigned eleN ) + { + _aV = mem::allocZ(eleN); + _mem = mem::allocZ(eleN); + _wi.load(std::memory_order_release); + _ri.load(std::memory_order_release); + + for(unsigned i=0; i= ri ? wi-ri : (_aN-ri) + wi; + + // there must always be at least one empty element because + // wi can never be advanced to equal ri. + if( n >= _aN-1 ) + return kBufTooSmallRC; + + // store the new element + _aV[wi] = v; + + wi = (wi+1) % _aN; + + _wi.store( wi, std::memory_order_release ); + + return kOkRC; + } + + T* pop() + { + unsigned ri = _ri.load( std::memory_order_relaxed ); + unsigned wi = _wi.load( std::memory_order_acquire ); + + unsigned n = wi >= ri ? wi-ri : (_aN-ri) + wi; + + if( n == 0 ) + return nullptr; + + T* v = _aV[ri]; + + ri = (ri+1) % _aN; + + _ri.store( ri, std::memory_order_release); + + return v; + } + + + + + private: + unsigned _aN = 0; + T** _aV = nullptr; + T* _mem; + std::atomic _wi; + std::atomic _ri; + + // Note: // _wi==_ri indicates an empty buffer. + // _wi may never be advanced such that it equals _ri, however + // _ri may be advanced such that it equals _wi. + }; + + + rc_t testSpScQueueTmpl(); + +} +#endif