cwSpScQueueTmpl.h : Initial commit.
This commit is contained in:
parent
9ec4c131b5
commit
8c7d72298c
@ -13,8 +13,8 @@ libcwSRC += src/libcw/cwObject.cpp src/libcw/cwText
|
||||
libcwHDR += src/libcw/cwThread.h src/libcw/cwMutex.h src/libcw/cwThreadMach.h
|
||||
libcwSRC += src/libcw/cwThread.cpp src/libcw/cwMutex.cpp src/libcw/cwThreadMach.cpp
|
||||
|
||||
libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h
|
||||
libcwSRC += src/libcw/cwSpScBuf.cpp
|
||||
libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h src/libcw/cwSpScQueueTmpl.h
|
||||
libcwSRC += src/libcw/cwSpScBuf.cpp src/libcw/cwSpScQueueTmpl.cpp
|
||||
|
||||
|
||||
libcwHDR += src/libcw/cwWebSock.h src/libcw/cwWebSockSvr.h src/libcw/cwLib.h
|
||||
|
167
cwSpScQueueTmpl.cpp
Normal file
167
cwSpScQueueTmpl.cpp
Normal file
@ -0,0 +1,167 @@
|
||||
#include "cwCommon.h"
|
||||
#include "cwLog.h"
|
||||
#include "cwCommonImpl.h"
|
||||
#include "cwMem.h"
|
||||
#include "cwThread.h"
|
||||
#include "cwThreadMach.h"
|
||||
|
||||
|
||||
#include "cwSpScQueueTmpl.h"
|
||||
|
||||
namespace cw
|
||||
{
|
||||
const int kDataByteN = 14;
|
||||
|
||||
typedef struct msg_str
|
||||
{
|
||||
uint8_t dataByteN;
|
||||
uint8_t checksum;
|
||||
uint8_t data[ kDataByteN ];
|
||||
} msg_t;
|
||||
|
||||
typedef spScQueueTmpl<msg_t> queue_t;
|
||||
|
||||
|
||||
typedef struct shared_str
|
||||
{
|
||||
queue_t* q; // Shared SPSC queue
|
||||
std::atomic<bool> readyFl; // The consumer sets the readyFl at program startup when it is ready to start emptying the queue.
|
||||
} shared_t; // This prevents the producer from immediately filling the queue before the consumer start.s
|
||||
|
||||
typedef struct ctx_str
|
||||
{
|
||||
unsigned id; // thread id
|
||||
unsigned iter; // execution counter
|
||||
unsigned msgN; // count of msg's processed
|
||||
shared_t* share; // shared variables
|
||||
} ctx_t;
|
||||
|
||||
|
||||
void _producer( ctx_t* c )
|
||||
{
|
||||
|
||||
bool readyFl = c->share->readyFl.load(std::memory_order_acquire);
|
||||
|
||||
if( readyFl )
|
||||
{
|
||||
|
||||
msg_t* m = c->share->q->get();
|
||||
|
||||
m->dataByteN = kDataByteN;
|
||||
m->checksum = 0;
|
||||
|
||||
uint8_t d = (c->iter & 0xff);
|
||||
for(int i=0; i<kDataByteN; ++i)
|
||||
{
|
||||
m->data[i] = d++;
|
||||
m->checksum += m->data[i];
|
||||
}
|
||||
|
||||
c->share->q->push(m);
|
||||
|
||||
c->msgN++;
|
||||
|
||||
}
|
||||
|
||||
c->iter++;
|
||||
|
||||
}
|
||||
|
||||
void _consumer( ctx_t* c )
|
||||
{
|
||||
msg_t* m = nullptr;
|
||||
|
||||
if( c->iter == 0 )
|
||||
{
|
||||
c->share->readyFl.store(true,std::memory_order_release);
|
||||
}
|
||||
|
||||
if((m = c->share->q->pop()) != nullptr )
|
||||
{
|
||||
uint8_t curCheckSum = 0;
|
||||
for(unsigned i=0; i<kDataByteN; ++i)
|
||||
curCheckSum += m->data[i];
|
||||
|
||||
if( curCheckSum != m->checksum )
|
||||
cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,m->checksum);
|
||||
|
||||
}
|
||||
|
||||
c->iter++;
|
||||
|
||||
}
|
||||
|
||||
bool _threadFunc( void* arg )
|
||||
{
|
||||
ctx_t* c = static_cast<ctx_t*>(arg);
|
||||
|
||||
switch( c->id )
|
||||
{
|
||||
case 0:
|
||||
_producer(c);
|
||||
break;
|
||||
|
||||
case 1:
|
||||
_consumer(c);
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
sleepMs( rand() & 0xf );
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
cw::rc_t cw::testSpScQueueTmpl()
|
||||
{
|
||||
rc_t rc=kOkRC,rc0;
|
||||
|
||||
thread_mach::handle_t h;
|
||||
const int ctxArrayN = 2;
|
||||
ctx_t ctxArray[ctxArrayN];
|
||||
shared_t share;
|
||||
const int eleN = 128;
|
||||
|
||||
memset(&ctxArray,0,sizeof(ctxArray));
|
||||
|
||||
// setup the thread context array
|
||||
ctxArray[0].id = 0;
|
||||
ctxArray[0].share = &share;
|
||||
ctxArray[1].id = 1;
|
||||
ctxArray[1].share = &share;
|
||||
|
||||
share.readyFl.store(false,std::memory_order_release);
|
||||
|
||||
share.q = new queue_t(eleN);
|
||||
|
||||
// create the thread machine
|
||||
if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC )
|
||||
{
|
||||
rc = cwLogError(rc,"Thread machine create failed.");
|
||||
goto errLabel;
|
||||
}
|
||||
|
||||
// start the thread machine
|
||||
if((rc = thread_mach::start(h)) != kOkRC )
|
||||
{
|
||||
cwLogError(rc,"Thread machine start failed.");
|
||||
goto errLabel;
|
||||
}
|
||||
|
||||
sleepMs(5000);
|
||||
|
||||
errLabel:
|
||||
if((rc0 = thread_mach::destroy(h)) != kOkRC )
|
||||
cwLogError(rc0,"Thread machine destroy failed.");
|
||||
|
||||
delete share.q;
|
||||
|
||||
printf("P:%i msgs:%i C:%i msgs:%i\n",ctxArray[0].iter, ctxArray[0].msgN, ctxArray[1].iter, ctxArray[1].msgN);
|
||||
|
||||
return rcSelect(rc,rc0);
|
||||
}
|
95
cwSpScQueueTmpl.h
Normal file
95
cwSpScQueueTmpl.h
Normal file
@ -0,0 +1,95 @@
|
||||
#ifndef cwSpScQueueTmpl_H
|
||||
#define cwSpScQueueTmpl_H
|
||||
|
||||
namespace cw
|
||||
{
|
||||
|
||||
template< typename T >
|
||||
class spScQueueTmpl
|
||||
{
|
||||
public:
|
||||
spScQueueTmpl( unsigned eleN )
|
||||
{
|
||||
_aV = mem::allocZ<T*>(eleN);
|
||||
_mem = mem::allocZ<T>(eleN);
|
||||
_wi.load(std::memory_order_release);
|
||||
_ri.load(std::memory_order_release);
|
||||
|
||||
for(unsigned i=0; i<eleN; ++i)
|
||||
_aV[i] = _mem + i;
|
||||
}
|
||||
|
||||
virtual ~spScQueueTmpl()
|
||||
{
|
||||
mem::release(_aV);
|
||||
mem::release(_mem);
|
||||
}
|
||||
|
||||
T* get()
|
||||
{
|
||||
unsigned wi = _wi.load( std::memory_order_relaxed );
|
||||
return _aV[wi];
|
||||
}
|
||||
|
||||
rc_t push( T* v )
|
||||
{
|
||||
unsigned ri = _ri.load( std::memory_order_acquire );
|
||||
unsigned wi = _wi.load( std::memory_order_relaxed );
|
||||
|
||||
// calc. the count of full elements
|
||||
unsigned n = wi >= ri ? wi-ri : (_aN-ri) + wi;
|
||||
|
||||
// there must always be at least one empty element because
|
||||
// wi can never be advanced to equal ri.
|
||||
if( n >= _aN-1 )
|
||||
return kBufTooSmallRC;
|
||||
|
||||
// store the new element
|
||||
_aV[wi] = v;
|
||||
|
||||
wi = (wi+1) % _aN;
|
||||
|
||||
_wi.store( wi, std::memory_order_release );
|
||||
|
||||
return kOkRC;
|
||||
}
|
||||
|
||||
T* pop()
|
||||
{
|
||||
unsigned ri = _ri.load( std::memory_order_relaxed );
|
||||
unsigned wi = _wi.load( std::memory_order_acquire );
|
||||
|
||||
unsigned n = wi >= ri ? wi-ri : (_aN-ri) + wi;
|
||||
|
||||
if( n == 0 )
|
||||
return nullptr;
|
||||
|
||||
T* v = _aV[ri];
|
||||
|
||||
ri = (ri+1) % _aN;
|
||||
|
||||
_ri.store( ri, std::memory_order_release);
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private:
|
||||
unsigned _aN = 0;
|
||||
T** _aV = nullptr;
|
||||
T* _mem;
|
||||
std::atomic<unsigned> _wi;
|
||||
std::atomic<unsigned> _ri;
|
||||
|
||||
// Note: // _wi==_ri indicates an empty buffer.
|
||||
// _wi may never be advanced such that it equals _ri, however
|
||||
// _ri may be advanced such that it equals _wi.
|
||||
};
|
||||
|
||||
|
||||
rc_t testSpScQueueTmpl();
|
||||
|
||||
}
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user