diff --git a/cwSpScBuf.cpp b/cwSpScBuf.cpp index 22531ee..72b0f91 100644 --- a/cwSpScBuf.cpp +++ b/cwSpScBuf.cpp @@ -27,6 +27,7 @@ namespace cw rc_t _destroy( spsc_buf_t* p ) { + mem::release(p->buf); mem::release(p); return kOkRC; } @@ -39,7 +40,7 @@ namespace cw if( r < w ) return w - r; - return (p->buf + p->bufByteN) - w + (r - p->buf); + return p->bufByteN - (r - w); } unsigned _emptyByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w ) @@ -61,6 +62,8 @@ cw::rc_t cw::spsc_buf::create( handle_t& hRef, unsigned bufByteN ) p->w = p->buf; p->r = p->buf; + hRef.set(p); + return rc; } @@ -84,7 +87,7 @@ cw::rc_t cw::spsc_buf::copyIn( handle_t h, const void* iBuf, unsigned iN ) { rc_t rc = kOkRC; spsc_buf_t* p = _handleToPtr(h); - uint8_t* w = p->w.load(std::memory_order_relaxed); + uint8_t* w = p->w.load(std::memory_order_acquire); uint8_t* r = p->r.load(std::memory_order_acquire); uint8_t* e = p->buf + p->bufByteN; uint8_t* w1; @@ -143,9 +146,11 @@ cw::rc_t cw::spsc_buf::copyIn( handle_t h, const void* iBuf, unsigned iN ) memcpy(p->buf,src+n0,n1); w1 = p->buf + n1; } + + p->w.store(w1,std::memory_order_release); + } - p->w.store(w1,std::memory_order_release); return rc; @@ -161,9 +166,9 @@ unsigned cw::spsc_buf::fullByteCount( handle_t h ) } cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsigned& returnedByteN_Ref ) -{ +{ spsc_buf_t* p = _handleToPtr(h); - uint8_t* r = p->r.load(std::memory_order_relaxed); + uint8_t* r = p->r.load(std::memory_order_acquire); uint8_t* w = p->w.load(std::memory_order_acquire); uint8_t* e = p->buf + p->bufByteN; uint8_t* oBuf = static_cast(buf); @@ -173,6 +178,11 @@ cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsign returnedByteN_Ref = 0; + if( r == w ) + { + return kOkRC; + } + // if the 'w' is in front of 'r' - then only one segment needs to be copied out if( r < w ) { @@ -194,11 +204,10 @@ cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsign if( n1 ) memcpy(oBuf+n0, p->buf, n1); - returnedByteN_Ref = n0 + n1; p->r.store(r1,std::memory_order_release); - + return kOkRC; } @@ -227,9 +236,11 @@ namespace cw typedef struct ctx_str { - unsigned id; // thread id - unsigned iter; // execution counter - shared_t* share; // shared variables + unsigned id; // thread id + unsigned iter; // execution counter + unsigned msgN; // count of msg's processed + unsigned state; // used by consumer to hold the parser state + shared_t* share; // shared variables } ctx_t; @@ -254,8 +265,12 @@ namespace cw spsc_buf::copyIn(c->share->h,&m,sizeof(m)); - c->iter++; + c->msgN++; + } + + c->iter++; + } void _consumer( ctx_t* c ) @@ -268,52 +283,52 @@ namespace cw kData }; - const unsigned kBufByteN = 128; + const unsigned kBufByteN = 128; uint8_t buf[ kBufByteN ]; - unsigned retBytesRead = 0; - uint8_t msgByteN = 0; // Count of bytes in this msg data array - uint8_t msgCheckSum = 0; // Checksum of this msg - unsigned curMsgIdx = 0; // The parser location (0<=curMsgIdx < msgByteN) - uint8_t curCheckSum = 0; // The accumulating checksum - unsigned state; + unsigned retBytesRead = 0; + uint8_t msgByteN = 0; // Count of bytes in this msg data array + uint8_t msgCheckSum = 0; // Checksum of this msg + unsigned curMsgIdx = 0; // The parser location (0<=curMsgIdx < msgByteN) + uint8_t curCheckSum = 0; // The accumulating checksum if( c->iter == 0 ) { c->share->readyFl.store(true,std::memory_order_release); - state = kBegin; + c->state = kBegin; } - if(spsc_buf::copyOut( c->share->h, buf, kBufByteN, retBytesRead ) != kOkRC && retBytesRead > 0) + if(spsc_buf::copyOut( c->share->h, buf, kBufByteN, retBytesRead ) == kOkRC && retBytesRead > 0) { - + uint8_t* b = buf; uint8_t* bend = b + retBytesRead; for(; b < bend; ++b) { - switch( state ) + switch( c->state ) { case kBegin: msgByteN = *b; - state = kChecksum; + c->state = kChecksum; break; case kChecksum: - msgCheckSum = *b; + msgCheckSum = *b; curCheckSum = 0; - curMsgIdx = 0; - state = kData; + curMsgIdx = 0; + c->state = kData; break; case kData: curCheckSum += (*b); - curMsgIdx += 1; + curMsgIdx += 1; if( curMsgIdx == msgByteN ) { if( curCheckSum != msgCheckSum ) cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,msgCheckSum); - state = kBegin; + c->state = kBegin; + c->msgN++; } break; @@ -344,6 +359,9 @@ namespace cw default: assert(0); } + + sleepMs( rand() & 0xf ); + return true; } @@ -360,30 +378,38 @@ cw::rc_t cw::spsc_buf::test() const int ctxArrayN = 2; ctx_t ctxArray[ctxArrayN]; shared_t share; - const int bufByteN = 128; + const int bufByteN = 1024; + + memset(&ctxArray,0,sizeof(ctxArray)); + // setup the thread context array ctxArray[0].id = 0; ctxArray[0].share = &share; ctxArray[1].id = 1; ctxArray[1].share = &share; share.readyFl.store(false,std::memory_order_release); - + + // create the SPSC buffer if((rc = create( share.h, bufByteN )) != kOkRC ) return cwLogError(rc,"spsc_buf create failed."); - + + // create the thread machine if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC ) { rc = cwLogError(rc,"Thread machine create failed."); goto errLabel; } - + + // start the thread machine if((rc = thread_mach::start(h)) != kOkRC ) { cwLogError(rc,"Thread machine start failed."); goto errLabel; } + sleepMs(5000); + errLabel: if((rc0 = thread_mach::destroy(h)) != kOkRC ) cwLogError(rc0,"Thread machine destroy failed."); @@ -391,6 +417,8 @@ cw::rc_t cw::spsc_buf::test() if((rc1 = spsc_buf::destroy(share.h)) != kOkRC ) cwLogError(rc1,"spsc_buf destroy failed."); + printf("P:%i msgs:%i C:%i msgs:%i\n",ctxArray[0].iter, ctxArray[0].msgN, ctxArray[1].iter, ctxArray[1].msgN); + return rcSelect(rc,rc0,rc1); } diff --git a/cwThreadMach.cpp b/cwThreadMach.cpp index 18d7b06..c67b62f 100644 --- a/cwThreadMach.cpp +++ b/cwThreadMach.cpp @@ -37,6 +37,9 @@ namespace cw } } + mem::release(p->threadA); + mem::release(p); + return rc; } @@ -68,6 +71,8 @@ cw::rc_t cw::thread_mach::create( handle_t& hRef, threadFunc_t threadFunc, void* } } + hRef.set(p); + errLabel: if( rc != kOkRC ) _destroy(p);