libcw/cwSpScBuf.cpp

432 lines
9.5 KiB
C++
Raw Permalink Normal View History

//| Copyright: (C) 2020-2024 Kevin Larke <contact AT larke DOT org>
//| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
#include "cwCommon.h"
#include "cwLog.h"
#include "cwCommonImpl.h"
2024-09-15 18:54:25 +00:00
#include "cwTest.h"
#include "cwMem.h"
#include "cwSpScBuf.h"
#include "cwThread.h"
#include "cwThreadMach.h"
namespace cw
{
namespace spsc_buf
{
// State record for one single-producer/single-consumer byte queue.
// 'w' and 'r' always point into (or one past the end of) the storage at 'buf'.
typedef struct spsc_buf_str
{
  std::atomic<uint8_t*> w;        // write ptr: stored by copyIn() after new bytes have been copied in
  std::atomic<uint8_t*> r;        // read ptr: stored by copyOut() after bytes have been copied out
  uint8_t*              buf;      // base address of the queue storage
  unsigned              bufByteN; // size of the storage at 'buf' in bytes
} spsc_buf_t;
// Note: r==w indicates an empty buffer.
// Therefore 'w' may never be advanced such that it equals 'r',
// however, 'r' may be advanced such that it equals 'w'.
// Convert a public spsc_buf handle into a pointer to its internal state record.
spsc_buf_t* _handleToPtr( handle_t h )
{
  spsc_buf_t* recd = handleToPtr<handle_t,spsc_buf_t>(h);
  return recd;
}
// Release the queue storage and then the state record itself.
// 'p' must not be null: it is dereferenced without a check.
// Always returns kOkRC.
rc_t _destroy( spsc_buf_t* p )
{
  rc_t rc = kOkRC;

  // the storage must go first because 'p' owns the only pointer to it
  mem::release(p->buf);
  mem::release(p);

  return rc;
}
// Return the count of bytes waiting to be read given a snapshot of
// the read pointer 'r' and the write pointer 'w'.
unsigned _fullByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w )
{
  // 'w' at or ahead of 'r': the waiting bytes are the single span [r,w).
  // (This yields 0 when r==w, which marks an empty buffer.)
  if( w >= r )
    return w - r;

  // 'w' has wrapped behind 'r': everything except the gap [w,r) is waiting.
  return p->bufByteN - (r - w);
}
// Return the buffer size minus the count of bytes waiting to be read,
// given a snapshot of the read pointer 'r' and the write pointer 'w'.
unsigned _emptyByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w )
{
  const unsigned fullByteN = _fullByteCount(p,r,w);
  return p->bufByteN - fullByteN;
}
}
}
// Create a queue with 'bufByteN' bytes of storage and return it in 'hRef'.
// If 'hRef' already refers to a queue then that queue is destroyed first;
// if that destroy fails its result code is returned and nothing is created.
cw::rc_t cw::spsc_buf::create( handle_t& hRef, unsigned bufByteN )
{
  rc_t rc;

  // release any queue already attached to the handle
  if((rc = destroy(hRef)) != kOkRC )
    return rc;

  spsc_buf_t* p = mem::allocZ<spsc_buf_t>();

  p->buf      = mem::allocZ<uint8_t>(bufByteN);
  p->bufByteN = bufByteN;

  // r == w marks the queue as empty
  p->w.store(p->buf);
  p->r.store(p->buf);

  hRef.set(p);

  return rc;
}
// Release the queue referred to by 'hRef' and clear the handle.
// Calling this with an invalid handle is a no-op which returns kOkRC.
cw::rc_t cw::spsc_buf::destroy( handle_t& hRef )
{
  if( hRef.isValid() )
  {
    const rc_t rc = _destroy( _handleToPtr(hRef) );

    // leave the handle untouched if the release failed
    if( rc != kOkRC )
      return rc;

    hRef.clear();
  }

  return kOkRC;
}
// Copy 'iN' bytes from 'iBuf' into the queue.
//
// The write is all-or-nothing: if the bytes do not fit then nothing is
// copied, the write pointer is left unchanged, and kBufTooSmallRC is
// logged and returned.  A zero length write is a no-op which returns kOkRC.
//
// The new write pointer is published only after all the bytes have been
// copied, so a reader never observes a partially written block.
cw::rc_t cw::spsc_buf::copyIn( handle_t h, const void* iBuf, unsigned iN )
{
  rc_t        rc = kOkRC;
  spsc_buf_t* p  = _handleToPtr(h);

  // Nothing to write.  (Without this guard a zero length write to an
  // empty buffer would be reported as an overflow.)
  if( iN == 0 )
    return kOkRC;

  uint8_t* w  = p->w.load(std::memory_order_acquire);
  uint8_t* r  = p->r.load(std::memory_order_acquire);
  uint8_t* e  = p->buf + p->bufByteN;  // one past the end of the storage
  unsigned n0 = 0;                     // count of bytes written starting at 'w'
  unsigned n1 = 0;                     // count of bytes written starting at the beginning of the buffer

  // if r is at or behind w (then the write may split into two parts)
  if( r <= w )
  {
    const unsigned tailByteN = static_cast<unsigned>(e - w);

    // if there is space between w and the EOB to accept the write ...
    if( iN <= tailByteN )
    {
      n0 = iN;
    }
    else // ... otherwise the write must wrap
    {
      n0 = tailByteN; // fill the space between w and EOB
      n1 = iN - n0;   // then continue writing at the beginning of the buffer

      // The wrapped segment must end strictly before 'r' because w==r
      // would be read as 'empty'.  Byte counts are compared, rather than
      // forming 'p->buf + n1', because that pointer may lie outside
      // the buffer when the write is oversized.
      if( n1 >= static_cast<unsigned>(r - p->buf) )
        rc = kBufTooSmallRC;
    }
  }
  else // r > w : r is in front of w (the write will not split)
  {
    // the write must end strictly before 'r'
    if( iN < static_cast<unsigned>(r - w) )
      n0 = iN;
    else
      rc = kBufTooSmallRC;
  }

  if( rc != kOkRC )
    rc = cwLogError(rc,"spsc_buf overflowed.");
  else
  {
    const uint8_t* src = static_cast<const uint8_t*>(iBuf);
    uint8_t*       w1  = w + n0;

    memcpy(w,src,n0);

    if( n1 )
    {
      memcpy(p->buf,src+n0,n1);
      w1 = p->buf + n1;
    }

    // publish the new bytes to the reader
    p->w.store(w1,std::memory_order_release);
  }

  return rc;
}
// Return the count of bytes currently waiting to be read from the queue.
// The value is computed from a snapshot of the read and write pointers.
unsigned cw::spsc_buf::fullByteCount( handle_t h )
{
  spsc_buf_t* q = _handleToPtr(h);

  uint8_t* const rd = q->r.load(std::memory_order_acquire);
  uint8_t* const wr = q->w.load(std::memory_order_acquire);

  return _fullByteCount(q,rd,wr);
}
// Copy every byte currently waiting in the queue into 'buf'.
//
// buf               Destination buffer.
// bufByteN          Size of 'buf' in bytes.
// returnedByteN_Ref Set to the count of bytes copied into 'buf' (0 if the
//                   queue was empty or the call failed).
//
// The read is all-or-nothing: if 'buf' cannot hold all of the waiting bytes
// then nothing is copied, the read pointer is left unchanged, and
// kBufTooSmallRC is logged and returned.
cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsigned& returnedByteN_Ref )
{
  spsc_buf_t*    q  = _handleToPtr(h);
  uint8_t* const rd = q->r.load(std::memory_order_acquire);
  uint8_t* const wr = q->w.load(std::memory_order_acquire);

  returnedByteN_Ref = 0;

  // rd == wr marks an empty queue
  if( rd == wr )
    return kOkRC;

  // When 'wr' is behind 'rd' the waiting bytes are in two segments:
  // [rd,end-of-buffer) followed by [start-of-buffer,wr).
  // Otherwise they are the single segment [rd,wr).
  const bool     wrapFl = rd > wr;
  uint8_t* const end    = q->buf + q->bufByteN;
  const unsigned segN0  = static_cast<unsigned>( wrapFl ? end - rd : wr - rd );
  const unsigned segN1  = wrapFl ? static_cast<unsigned>( wr - q->buf ) : 0;
  const unsigned totalN = segN0 + segN1;

  // check that the return buffer is large enough
  if( totalN > bufByteN )
    return cwLogError(kBufTooSmallRC,"The return buffer is too small.");

  uint8_t* dst = static_cast<uint8_t*>(buf);

  memcpy(dst, rd, segN0);

  if( segN1 )
    memcpy(dst+segN0, q->buf, segN1);

  returnedByteN_Ref = totalN;

  // Everything up to the 'wr' snapshot has been consumed, so in both the
  // one and two segment cases the new read position is 'wr'.
  q->r.store(wr,std::memory_order_release);

  return kOkRC;
}
namespace cw
{
namespace spsc_buf
{
// Count of payload bytes carried by each test message.
constexpr int kDataByteN = 14;

// Test message record. The record is byte packed so that it occupies
// exactly 2 + kDataByteN bytes when it is copied into the queue.
#pragma pack(push, 1)
struct msg_str
{
  uint8_t dataByteN;           // count of bytes in data[]
  uint8_t checksum;            // 8 bit sum of the bytes in data[]
  uint8_t data[ kDataByteN ];  // payload
};
#pragma pack(pop)

using msg_t = msg_str;
// Variables shared by the producer and consumer test threads.
struct shared_str
{
  // Shared SPSC queue
  spsc_buf::handle_t h;

  // The consumer sets readyFl on its first iteration, when it is ready to
  // start emptying the queue. The producer does not write until the flag is
  // set, which prevents it from filling the queue before the consumer starts.
  std::atomic<bool> readyFl;
};

using shared_t = shared_str;
// Per-thread context record for the producer/consumer test threads.
struct ctx_str
{
  unsigned  id;     // thread id (0=producer, 1=consumer)
  unsigned  iter;   // execution counter
  unsigned  msgN;   // count of msg's processed
  unsigned  state;  // used by consumer to hold the parser state
  shared_t* share;  // shared variables
};

using ctx_t = ctx_str;
// Producer thread body: run once per thread iteration.
//
// Once the consumer has set the shared 'readyFl' this function builds one
// test message, whose payload is a run of incrementing bytes starting at the
// low 8 bits of the iteration counter, and writes it to the shared queue.
// 'msgN' counts only the messages which the queue actually accepted.
void _producer( ctx_t* c )
{
  msg_t m;
  bool  readyFl = c->share->readyFl.load(std::memory_order_acquire);

  if( readyFl )
  {
    m.dataByteN = kDataByteN;
    m.checksum  = 0;

    uint8_t d = (c->iter & 0xff);

    for(int i=0; i<kDataByteN; ++i)
    {
      m.data[i]   = d++;
      m.checksum += m.data[i]; // the checksum is an 8 bit sum which is allowed to wrap
    }

    // copyIn() logs its own error on overflow; a rejected message
    // must not be counted as sent.
    if( spsc_buf::copyIn(c->share->h,&m,sizeof(m)) == kOkRC )
      c->msgN++;
  }

  c->iter++;
}
// Consumer thread body: run once per thread iteration.
//
// On the first iteration the shared 'readyFl' is set to release the producer.
// On every iteration the queue is drained and the returned bytes are parsed
// as a sequence of msg_t records (length byte, checksum byte, payload) whose
// checksums are verified.
//
// Only the parser state ('c->state') is kept between calls; the other
// parser variables are locals.  This relies on every drain ending on a
// message boundary, which holds because copyIn() publishes a message only
// after all of its bytes are written and copyOut() returns all waiting bytes.
void _consumer( ctx_t* c )
{
  // message parser state values
  enum
  {
    kBegin,
    kChecksum,
    kData
  };

  // copyOut() is all-or-nothing: it fails whenever the queue holds more bytes
  // than the return buffer.  The return buffer must therefore be at least as
  // large as the queue created in test() (1024 bytes), otherwise a backlog
  // larger than the return buffer could never be drained.
  const unsigned kBufByteN = 1024;

  uint8_t  buf[ kBufByteN ];
  unsigned retBytesRead = 0;
  uint8_t  msgByteN     = 0;  // Count of bytes in this msg data array
  uint8_t  msgCheckSum  = 0;  // Checksum of this msg
  unsigned curMsgIdx    = 0;  // The parser location (0<=curMsgIdx < msgByteN)
  uint8_t  curCheckSum  = 0;  // The accumulating checksum

  if( c->iter == 0 )
  {
    c->share->readyFl.store(true,std::memory_order_release);
    c->state = kBegin;
  }

  if(spsc_buf::copyOut( c->share->h, buf, kBufByteN, retBytesRead ) == kOkRC && retBytesRead > 0)
  {
    uint8_t* b    = buf;
    uint8_t* bend = b + retBytesRead;

    for(; b < bend; ++b)
    {
      switch( c->state )
      {
        case kBegin:      // first byte of a msg: the payload length
          msgByteN = *b;
          c->state = kChecksum;
          break;

        case kChecksum:   // second byte of a msg: the expected checksum
          msgCheckSum = *b;
          curCheckSum = 0;
          curMsgIdx   = 0;
          c->state    = kData;
          break;

        case kData:       // payload bytes
          curCheckSum += (*b);
          curMsgIdx   += 1;

          // if this was the last payload byte then verify the msg
          if( curMsgIdx == msgByteN )
          {
            if( curCheckSum != msgCheckSum )
              cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,msgCheckSum);

            c->state = kBegin;
            c->msgN++;
          }
          break;

        default:
          assert(0);
      }
    }
  }

  c->iter++;
}
bool _threadFunc( void* arg )
{
ctx_t* c = static_cast<ctx_t*>(arg);
switch( c->id )
{
case 0:
_producer(c);
break;
case 1:
_consumer(c);
break;
default:
assert(0);
}
sleepMs( rand() & 0xf );
return true;
}
}
}
// Exercise the queue with one producer thread and one consumer thread.
//
// The threads run for five seconds, after which the thread machine and the
// queue are destroyed and the iteration and message counts of both threads
// are printed.  Returns the result of rcSelect() applied to the run result
// and the two destroy results.
cw::rc_t cw::spsc_buf::test()
{
  rc_t                  rc=kOkRC,rc0,rc1;
  thread_mach::handle_t h;
  const int             ctxArrayN = 2;
  ctx_t                 ctxArray[ctxArrayN];
  shared_t              share;
  const int             bufByteN = 1024;

  memset(&ctxArray,0,sizeof(ctxArray));

  // setup the thread context array
  ctxArray[0].id    = 0;       // producer
  ctxArray[0].share = &share;
  ctxArray[1].id    = 1;       // consumer
  ctxArray[1].share = &share;

  // the producer does not write until the consumer sets this flag
  share.readyFl.store(false,std::memory_order_release);

  // create the SPSC buffer
  if((rc = create( share.h, bufByteN )) != kOkRC )
  {
    return cwLogError(rc,"spsc_buf create failed.");
  }

  // create the thread machine
  if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC )
  {
    rc = cwLogError(rc,"Thread machine create failed.");
    goto errLabel;
  }

  // start the thread machine
  if((rc = thread_mach::start(h)) != kOkRC )
  {
    rc = cwLogError(rc,"Thread machine start failed.");
    goto errLabel;
  }

  // let the producer and consumer run
  sleepMs(5000);

 errLabel:
  if((rc0 = thread_mach::destroy(h)) != kOkRC )
    cwLogError(rc0,"Thread machine destroy failed.");

  if((rc1 = spsc_buf::destroy(share.h)) != kOkRC )
    cwLogError(rc1,"spsc_buf destroy failed.");

  // the counters are unsigned and so they are printed with %u
  printf("P:%u msgs:%u C:%u msgs:%u\n",ctxArray[0].iter, ctxArray[0].msgN, ctxArray[1].iter, ctxArray[1].msgN);

  return rcSelect(rc,rc0,rc1);
}