cwSpScBuf.cpp, cwThreadMach.cpp : Initial working version of spsc_buf and thread_mach.

This commit is contained in:
kevin.larke 2020-04-09 22:47:26 -04:00
parent fb92ec15de
commit 1ecaca7c07
2 changed files with 65 additions and 32 deletions

View File

@ -27,6 +27,7 @@ namespace cw
rc_t _destroy( spsc_buf_t* p ) rc_t _destroy( spsc_buf_t* p )
{ {
mem::release(p->buf);
mem::release(p); mem::release(p);
return kOkRC; return kOkRC;
} }
@ -39,7 +40,7 @@ namespace cw
if( r < w ) if( r < w )
return w - r; return w - r;
return (p->buf + p->bufByteN) - w + (r - p->buf); return p->bufByteN - (r - w);
} }
unsigned _emptyByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w ) unsigned _emptyByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w )
@ -61,6 +62,8 @@ cw::rc_t cw::spsc_buf::create( handle_t& hRef, unsigned bufByteN )
p->w = p->buf; p->w = p->buf;
p->r = p->buf; p->r = p->buf;
hRef.set(p);
return rc; return rc;
} }
@ -84,7 +87,7 @@ cw::rc_t cw::spsc_buf::copyIn( handle_t h, const void* iBuf, unsigned iN )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
spsc_buf_t* p = _handleToPtr(h); spsc_buf_t* p = _handleToPtr(h);
uint8_t* w = p->w.load(std::memory_order_relaxed); uint8_t* w = p->w.load(std::memory_order_acquire);
uint8_t* r = p->r.load(std::memory_order_acquire); uint8_t* r = p->r.load(std::memory_order_acquire);
uint8_t* e = p->buf + p->bufByteN; uint8_t* e = p->buf + p->bufByteN;
uint8_t* w1; uint8_t* w1;
@ -143,9 +146,11 @@ cw::rc_t cw::spsc_buf::copyIn( handle_t h, const void* iBuf, unsigned iN )
memcpy(p->buf,src+n0,n1); memcpy(p->buf,src+n0,n1);
w1 = p->buf + n1; w1 = p->buf + n1;
} }
p->w.store(w1,std::memory_order_release);
} }
p->w.store(w1,std::memory_order_release);
return rc; return rc;
@ -161,9 +166,9 @@ unsigned cw::spsc_buf::fullByteCount( handle_t h )
} }
cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsigned& returnedByteN_Ref ) cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsigned& returnedByteN_Ref )
{ {
spsc_buf_t* p = _handleToPtr(h); spsc_buf_t* p = _handleToPtr(h);
uint8_t* r = p->r.load(std::memory_order_relaxed); uint8_t* r = p->r.load(std::memory_order_acquire);
uint8_t* w = p->w.load(std::memory_order_acquire); uint8_t* w = p->w.load(std::memory_order_acquire);
uint8_t* e = p->buf + p->bufByteN; uint8_t* e = p->buf + p->bufByteN;
uint8_t* oBuf = static_cast<uint8_t*>(buf); uint8_t* oBuf = static_cast<uint8_t*>(buf);
@ -173,6 +178,11 @@ cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsign
returnedByteN_Ref = 0; returnedByteN_Ref = 0;
if( r == w )
{
return kOkRC;
}
// if the 'w' is in front of 'r' - then only one segment needs to be copied out // if the 'w' is in front of 'r' - then only one segment needs to be copied out
if( r < w ) if( r < w )
{ {
@ -194,11 +204,10 @@ cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsign
if( n1 ) if( n1 )
memcpy(oBuf+n0, p->buf, n1); memcpy(oBuf+n0, p->buf, n1);
returnedByteN_Ref = n0 + n1; returnedByteN_Ref = n0 + n1;
p->r.store(r1,std::memory_order_release); p->r.store(r1,std::memory_order_release);
return kOkRC; return kOkRC;
} }
@ -227,9 +236,11 @@ namespace cw
typedef struct ctx_str typedef struct ctx_str
{ {
unsigned id; // thread id unsigned id; // thread id
unsigned iter; // execution counter unsigned iter; // execution counter
shared_t* share; // shared variables unsigned msgN; // count of msg's processed
unsigned state; // used by consumer to hold the parser state
shared_t* share; // shared variables
} ctx_t; } ctx_t;
@ -254,8 +265,12 @@ namespace cw
spsc_buf::copyIn(c->share->h,&m,sizeof(m)); spsc_buf::copyIn(c->share->h,&m,sizeof(m));
c->iter++; c->msgN++;
} }
c->iter++;
} }
void _consumer( ctx_t* c ) void _consumer( ctx_t* c )
@ -268,52 +283,52 @@ namespace cw
kData kData
}; };
const unsigned kBufByteN = 128; const unsigned kBufByteN = 128;
uint8_t buf[ kBufByteN ]; uint8_t buf[ kBufByteN ];
unsigned retBytesRead = 0; unsigned retBytesRead = 0;
uint8_t msgByteN = 0; // Count of bytes in this msg data array uint8_t msgByteN = 0; // Count of bytes in this msg data array
uint8_t msgCheckSum = 0; // Checksum of this msg uint8_t msgCheckSum = 0; // Checksum of this msg
unsigned curMsgIdx = 0; // The parser location (0<=curMsgIdx < msgByteN) unsigned curMsgIdx = 0; // The parser location (0<=curMsgIdx < msgByteN)
uint8_t curCheckSum = 0; // The accumulating checksum uint8_t curCheckSum = 0; // The accumulating checksum
unsigned state;
if( c->iter == 0 ) if( c->iter == 0 )
{ {
c->share->readyFl.store(true,std::memory_order_release); c->share->readyFl.store(true,std::memory_order_release);
state = kBegin; c->state = kBegin;
} }
if(spsc_buf::copyOut( c->share->h, buf, kBufByteN, retBytesRead ) != kOkRC && retBytesRead > 0) if(spsc_buf::copyOut( c->share->h, buf, kBufByteN, retBytesRead ) == kOkRC && retBytesRead > 0)
{ {
uint8_t* b = buf; uint8_t* b = buf;
uint8_t* bend = b + retBytesRead; uint8_t* bend = b + retBytesRead;
for(; b < bend; ++b) for(; b < bend; ++b)
{ {
switch( state ) switch( c->state )
{ {
case kBegin: case kBegin:
msgByteN = *b; msgByteN = *b;
state = kChecksum; c->state = kChecksum;
break; break;
case kChecksum: case kChecksum:
msgCheckSum = *b; msgCheckSum = *b;
curCheckSum = 0; curCheckSum = 0;
curMsgIdx = 0; curMsgIdx = 0;
state = kData; c->state = kData;
break; break;
case kData: case kData:
curCheckSum += (*b); curCheckSum += (*b);
curMsgIdx += 1; curMsgIdx += 1;
if( curMsgIdx == msgByteN ) if( curMsgIdx == msgByteN )
{ {
if( curCheckSum != msgCheckSum ) if( curCheckSum != msgCheckSum )
cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,msgCheckSum); cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,msgCheckSum);
state = kBegin; c->state = kBegin;
c->msgN++;
} }
break; break;
@ -344,6 +359,9 @@ namespace cw
default: default:
assert(0); assert(0);
} }
sleepMs( rand() & 0xf );
return true; return true;
} }
@ -360,30 +378,38 @@ cw::rc_t cw::spsc_buf::test()
const int ctxArrayN = 2; const int ctxArrayN = 2;
ctx_t ctxArray[ctxArrayN]; ctx_t ctxArray[ctxArrayN];
shared_t share; shared_t share;
const int bufByteN = 128; const int bufByteN = 1024;
memset(&ctxArray,0,sizeof(ctxArray));
// setup the thread context array
ctxArray[0].id = 0; ctxArray[0].id = 0;
ctxArray[0].share = &share; ctxArray[0].share = &share;
ctxArray[1].id = 1; ctxArray[1].id = 1;
ctxArray[1].share = &share; ctxArray[1].share = &share;
share.readyFl.store(false,std::memory_order_release); share.readyFl.store(false,std::memory_order_release);
// create the SPSC buffer
if((rc = create( share.h, bufByteN )) != kOkRC ) if((rc = create( share.h, bufByteN )) != kOkRC )
return cwLogError(rc,"spsc_buf create failed."); return cwLogError(rc,"spsc_buf create failed.");
// create the thread machine
if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC ) if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC )
{ {
rc = cwLogError(rc,"Thread machine create failed."); rc = cwLogError(rc,"Thread machine create failed.");
goto errLabel; goto errLabel;
} }
// start the thread machine
if((rc = thread_mach::start(h)) != kOkRC ) if((rc = thread_mach::start(h)) != kOkRC )
{ {
cwLogError(rc,"Thread machine start failed."); cwLogError(rc,"Thread machine start failed.");
goto errLabel; goto errLabel;
} }
sleepMs(5000);
errLabel: errLabel:
if((rc0 = thread_mach::destroy(h)) != kOkRC ) if((rc0 = thread_mach::destroy(h)) != kOkRC )
cwLogError(rc0,"Thread machine destroy failed."); cwLogError(rc0,"Thread machine destroy failed.");
@ -391,6 +417,8 @@ cw::rc_t cw::spsc_buf::test()
if((rc1 = spsc_buf::destroy(share.h)) != kOkRC ) if((rc1 = spsc_buf::destroy(share.h)) != kOkRC )
cwLogError(rc1,"spsc_buf destroy failed."); cwLogError(rc1,"spsc_buf destroy failed.");
printf("P:%i msgs:%i C:%i msgs:%i\n",ctxArray[0].iter, ctxArray[0].msgN, ctxArray[1].iter, ctxArray[1].msgN);
return rcSelect(rc,rc0,rc1); return rcSelect(rc,rc0,rc1);
} }

View File

@ -37,6 +37,9 @@ namespace cw
} }
} }
mem::release(p->threadA);
mem::release(p);
return rc; return rc;
} }
@ -68,6 +71,8 @@ cw::rc_t cw::thread_mach::create( handle_t& hRef, threadFunc_t threadFunc, void*
} }
} }
hRef.set(p);
errLabel: errLabel:
if( rc != kOkRC ) if( rc != kOkRC )
_destroy(p); _destroy(p);