cwSpscQueueTmpl.h/cpp : Initial commit.

This commit is contained in:
kevin.larke 2020-04-19 13:01:21 -04:00
parent c48ba08514
commit b577d476d3
2 changed files with 23 additions and 20 deletions

View File

@ -34,6 +34,7 @@ namespace cw
unsigned iter; // execution counter unsigned iter; // execution counter
unsigned msgN; // count of msg's processed unsigned msgN; // count of msg's processed
shared_t* share; // shared variables shared_t* share; // shared variables
uint8_t prevId;
} ctx_t; } ctx_t;
@ -57,7 +58,7 @@ namespace cw
m->checksum += m->data[i]; m->checksum += m->data[i];
} }
c->share->q->push(m); c->share->q->publish();
c->msgN++; c->msgN++;
@ -84,7 +85,16 @@ namespace cw
if( curCheckSum != m->checksum ) if( curCheckSum != m->checksum )
cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,m->checksum); cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,m->checksum);
else
{
uint8_t id = c->prevId + 1;
if( id != m->data[0] )
cwLogInfo("drop ");
c->prevId = m->data[0];
}
c->msgN++;
} }
c->iter++; c->iter++;

View File

@ -10,31 +10,27 @@ namespace cw
public: public:
spScQueueTmpl( unsigned eleN ) spScQueueTmpl( unsigned eleN )
{ {
_aV = mem::allocZ<T*>(eleN); _aN = eleN;
_mem = mem::allocZ<T>(eleN); _aV = mem::allocZ<T>(eleN);
_wi.load(std::memory_order_release); _wi.store(0,std::memory_order_release);
_ri.load(std::memory_order_release); _ri.store(0,std::memory_order_release);
for(unsigned i=0; i<eleN; ++i)
_aV[i] = _mem + i;
} }
virtual ~spScQueueTmpl() virtual ~spScQueueTmpl()
{ {
mem::release(_aV); mem::release(_aV);
mem::release(_mem);
} }
T* get() T* get()
{ {
unsigned wi = _wi.load( std::memory_order_relaxed ); unsigned wi = _wi.load( std::memory_order_relaxed );
return _aV[wi]; return _aV + wi;
} }
rc_t push( T* v ) rc_t publish()
{ {
unsigned ri = _ri.load( std::memory_order_acquire ); unsigned ri = _ri.load( std::memory_order_acquire );
unsigned wi = _wi.load( std::memory_order_relaxed ); unsigned wi = _wi.load( std::memory_order_relaxed ); // _wi is written in this thread
// calc. the count of full elements // calc. the count of full elements
unsigned n = wi >= ri ? wi-ri : (_aN-ri) + wi; unsigned n = wi >= ri ? wi-ri : (_aN-ri) + wi;
@ -44,11 +40,9 @@ namespace cw
if( n >= _aN-1 ) if( n >= _aN-1 )
return kBufTooSmallRC; return kBufTooSmallRC;
// store the new element
_aV[wi] = v;
wi = (wi+1) % _aN; wi = (wi+1) % _aN;
// advance the write position
_wi.store( wi, std::memory_order_release ); _wi.store( wi, std::memory_order_release );
return kOkRC; return kOkRC;
@ -64,7 +58,7 @@ namespace cw
if( n == 0 ) if( n == 0 )
return nullptr; return nullptr;
T* v = _aV[ri]; T* v = _aV + ri;
ri = (ri+1) % _aN; ri = (ri+1) % _aN;
@ -78,8 +72,7 @@ namespace cw
private: private:
unsigned _aN = 0; unsigned _aN = 0;
T** _aV = nullptr; T* _aV = nullptr;
T* _mem;
std::atomic<unsigned> _wi; std::atomic<unsigned> _wi;
std::atomic<unsigned> _ri; std::atomic<unsigned> _ri;