cwSpScBuf.h/cpp, cwThreadMach.h/cpp : initial commit.
This commit is contained in:
parent
6670a961be
commit
efc1c27f85
10
Makefile.am
10
Makefile.am
@ -8,10 +8,14 @@ libcwHDR += src/libcw/cwFileSys.h src/libcw/cwText.h src/libcw/cwFile.h s
|
||||
libcwSRC += src/libcw/cwFileSys.cpp src/libcw/cwText.cpp src/libcw/cwFile.cpp src/libcw/cwTime.cpp src/libcw/cwLex.cpp
|
||||
|
||||
libcwHDR += src/libcw/cwObject.h src/libcw/cwObjectTemplate.h src/libcw/cwTextBuf.h
|
||||
libcwSRC += src/libcw/cwObject.cpp src/libcw/cwTextBuf.cpp
|
||||
libcwSRC += src/libcw/cwObject.cpp src/libcw/cwTextBuf.cpp
|
||||
|
||||
libcwHDR += src/libcw/cwThread.h src/libcw/cwMutex.h src/libcw/cwThreadMach.h
|
||||
libcwSRC += src/libcw/cwThread.cpp src/libcw/cwMutex.cpp src/libcw/cwThreadMach.cpp
|
||||
|
||||
libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h
|
||||
libcwSRC += src/libcw/cwSpScBuf.cpp
|
||||
|
||||
libcwHDR += src/libcw/cwThread.h src/libcw/cwMutex.h src/libcw/cwMpScNbQueue.h
|
||||
libcwSRC += src/libcw/cwThread.cpp src/libcw/cwMutex.cpp
|
||||
|
||||
libcwHDR += src/libcw/cwWebSock.h src/libcw/cwWebSockSvr.h src/libcw/cwLib.h
|
||||
libcwSRC += src/libcw/cwWebSock.cpp src/libcw/cwWebSockSvr.cpp src/libcw/cwLib.cpp
|
||||
|
398
cwSpScBuf.cpp
Normal file
398
cwSpScBuf.cpp
Normal file
@ -0,0 +1,398 @@
|
||||
#include "cwCommon.h"
|
||||
#include "cwLog.h"
|
||||
#include "cwCommonImpl.h"
|
||||
#include "cwMem.h"
|
||||
#include "cwSpScBuf.h"
|
||||
#include "cwThread.h"
|
||||
#include "cwThreadMach.h"
|
||||
|
||||
namespace cw
|
||||
{
|
||||
namespace spsc_buf
|
||||
{
|
||||
typedef struct spsc_buf_str
|
||||
{
|
||||
uint8_t* buf;
|
||||
unsigned bufByteN;
|
||||
std::atomic<uint8_t*> w; // write ptr
|
||||
std::atomic<uint8_t*> r; // read ptr
|
||||
} spsc_buf_t;
|
||||
|
||||
// Note: r==w indicates an empty buffer.
|
||||
// Therefore 'w' may never be advanced such that it equals 'r',
|
||||
// however, 'r' may be advanced such that it equals 'w'.
|
||||
|
||||
spsc_buf_t* _handleToPtr( handle_t h )
|
||||
{ return handleToPtr<handle_t,spsc_buf_t>(h); }
|
||||
|
||||
rc_t _destroy( spsc_buf_t* p )
|
||||
{
|
||||
mem::release(p);
|
||||
return kOkRC;
|
||||
}
|
||||
|
||||
unsigned _fullByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w )
|
||||
{
|
||||
if( r == w )
|
||||
return 0;
|
||||
|
||||
if( r < w )
|
||||
return w - r;
|
||||
|
||||
return (p->buf + p->bufByteN) - w + (r - p->buf);
|
||||
}
|
||||
|
||||
unsigned _emptyByteCount( spsc_buf_t* p, uint8_t* r, uint8_t* w )
|
||||
{ return p->bufByteN - _fullByteCount(p,r,w); }
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
cw::rc_t cw::spsc_buf::create( handle_t& hRef, unsigned bufByteN )
|
||||
{
|
||||
rc_t rc;
|
||||
if((rc = destroy(hRef)) != kOkRC )
|
||||
return rc;
|
||||
|
||||
spsc_buf_t* p = mem::allocZ<spsc_buf_t>();
|
||||
p->buf = mem::allocZ<uint8_t>(bufByteN);
|
||||
p->bufByteN = bufByteN;
|
||||
p->w = p->buf;
|
||||
p->r = p->buf;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
cw::rc_t cw::spsc_buf::destroy( handle_t& hRef )
|
||||
{
|
||||
rc_t rc = kOkRC;
|
||||
if( !hRef.isValid() )
|
||||
return kOkRC;
|
||||
|
||||
spsc_buf_t* p = _handleToPtr(hRef);
|
||||
if((rc = _destroy(p)) != kOkRC )
|
||||
return rc;
|
||||
|
||||
hRef.clear();
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
cw::rc_t cw::spsc_buf::copyIn( handle_t h, const void* iBuf, unsigned iN )
|
||||
{
|
||||
rc_t rc = kOkRC;
|
||||
spsc_buf_t* p = _handleToPtr(h);
|
||||
uint8_t* w = p->w.load(std::memory_order_relaxed);
|
||||
uint8_t* r = p->r.load(std::memory_order_acquire);
|
||||
uint8_t* e = p->buf + p->bufByteN;
|
||||
uint8_t* w1;
|
||||
unsigned n0;
|
||||
unsigned n1 = 0;
|
||||
|
||||
// if r is behind w (then the write may split into two parts)
|
||||
if( r <= w )
|
||||
{
|
||||
|
||||
// if there is space between w and the EOB to accept the write ...
|
||||
if( iN <= e-w )
|
||||
{
|
||||
n0 = iN; // fill the space after w
|
||||
|
||||
if( w + iN == r )
|
||||
{
|
||||
rc = kBufTooSmallRC;
|
||||
}
|
||||
}
|
||||
else // ... otherwise the write must wrap
|
||||
{
|
||||
n0 = e-w; // fill the space between w and EOB
|
||||
n1 = iN-n0; // then begin writing at the beginning of the buffer
|
||||
|
||||
if( p->buf + n1 >= r )
|
||||
{
|
||||
rc = kBufTooSmallRC;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
else // r > w : r is in front of w (the write will not split)
|
||||
{
|
||||
if( iN < r - w )
|
||||
{
|
||||
n0 = iN;
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = kBufTooSmallRC;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
if( rc != kOkRC )
|
||||
rc = cwLogError(rc,"spsc_buf overflowed.");
|
||||
else
|
||||
{
|
||||
const uint8_t* src = static_cast<const uint8_t*>(iBuf);
|
||||
memcpy(w,src,n0);
|
||||
w1 = w + n0;
|
||||
if( n1 )
|
||||
{
|
||||
memcpy(p->buf,src+n0,n1);
|
||||
w1 = p->buf + n1;
|
||||
}
|
||||
}
|
||||
|
||||
p->w.store(w1,std::memory_order_release);
|
||||
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
unsigned cw::spsc_buf::fullByteCount( handle_t h )
|
||||
{
|
||||
spsc_buf_t* p = _handleToPtr(h);
|
||||
uint8_t* r = p->r.load(std::memory_order_acquire);
|
||||
uint8_t* w = p->w.load(std::memory_order_acquire);
|
||||
|
||||
return _fullByteCount(p,r,w);
|
||||
}
|
||||
|
||||
cw::rc_t cw::spsc_buf::copyOut( handle_t h, void* buf, unsigned bufByteN, unsigned& returnedByteN_Ref )
|
||||
{
|
||||
spsc_buf_t* p = _handleToPtr(h);
|
||||
uint8_t* r = p->r.load(std::memory_order_relaxed);
|
||||
uint8_t* w = p->w.load(std::memory_order_acquire);
|
||||
uint8_t* e = p->buf + p->bufByteN;
|
||||
uint8_t* oBuf = static_cast<uint8_t*>(buf);
|
||||
uint8_t* r1 = nullptr;
|
||||
unsigned n0 = 0;
|
||||
unsigned n1 = 0;
|
||||
|
||||
returnedByteN_Ref = 0;
|
||||
|
||||
// if the 'w' is in front of 'r' - then only one segment needs to be copied out
|
||||
if( r < w )
|
||||
{
|
||||
n0 = w-r;
|
||||
r1 = r + n0;
|
||||
}
|
||||
else // otherwise two segments need to be copied out
|
||||
{
|
||||
n0 = e-r;
|
||||
n1 = w-p->buf;
|
||||
r1 = p->buf + n1;
|
||||
}
|
||||
|
||||
// check that the return buffer is large enough
|
||||
if( n0+n1 > bufByteN )
|
||||
return cwLogError(kBufTooSmallRC,"The return buffer is too small.");
|
||||
|
||||
memcpy(oBuf, r, n0);
|
||||
if( n1 )
|
||||
memcpy(oBuf+n0, p->buf, n1);
|
||||
|
||||
|
||||
returnedByteN_Ref = n0 + n1;
|
||||
|
||||
p->r.store(r1,std::memory_order_release);
|
||||
|
||||
return kOkRC;
|
||||
}
|
||||
|
||||
|
||||
|
||||
namespace cw
|
||||
{
|
||||
namespace spsc_buf
|
||||
{
|
||||
const int kDataByteN = 14;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
typedef struct msg_str
|
||||
{
|
||||
uint8_t dataByteN;
|
||||
uint8_t checksum;
|
||||
uint8_t data[ kDataByteN ];
|
||||
} msg_t;
|
||||
#pragma pack(pop)
|
||||
|
||||
typedef struct shared_str
|
||||
{
|
||||
spsc_buf::handle_t h; // Shared SPSC queue
|
||||
std::atomic<bool> readyFl; // The consumer sets the readyFl at program startup when it is ready to start emptying the queue.
|
||||
} shared_t; // This prevents the producer from immediately filling the queue before the consumer start.s
|
||||
|
||||
typedef struct ctx_str
|
||||
{
|
||||
unsigned id; // thread id
|
||||
unsigned iter; // execution counter
|
||||
shared_t* share; // shared variables
|
||||
} ctx_t;
|
||||
|
||||
|
||||
void _producer( ctx_t* c )
|
||||
{
|
||||
msg_t m;
|
||||
|
||||
bool readyFl = c->share->readyFl.load(std::memory_order_acquire);
|
||||
|
||||
if( readyFl )
|
||||
{
|
||||
|
||||
m.dataByteN = kDataByteN;
|
||||
m.checksum = 0;
|
||||
|
||||
uint8_t d = (c->iter & 0xff);
|
||||
for(int i=0; i<kDataByteN; ++i)
|
||||
{
|
||||
m.data[i] = d++;
|
||||
m.checksum += m.data[i];
|
||||
}
|
||||
|
||||
spsc_buf::copyIn(c->share->h,&m,sizeof(m));
|
||||
|
||||
c->iter++;
|
||||
}
|
||||
}
|
||||
|
||||
void _consumer( ctx_t* c )
|
||||
{
|
||||
// message parser state values
|
||||
enum
|
||||
{
|
||||
kBegin,
|
||||
kChecksum,
|
||||
kData
|
||||
};
|
||||
|
||||
const unsigned kBufByteN = 128;
|
||||
uint8_t buf[ kBufByteN ];
|
||||
unsigned retBytesRead = 0;
|
||||
uint8_t msgByteN = 0; // Count of bytes in this msg data array
|
||||
uint8_t msgCheckSum = 0; // Checksum of this msg
|
||||
unsigned curMsgIdx = 0; // The parser location (0<=curMsgIdx < msgByteN)
|
||||
uint8_t curCheckSum = 0; // The accumulating checksum
|
||||
unsigned state;
|
||||
|
||||
if( c->iter == 0 )
|
||||
{
|
||||
c->share->readyFl.store(true,std::memory_order_release);
|
||||
state = kBegin;
|
||||
}
|
||||
|
||||
if(spsc_buf::copyOut( c->share->h, buf, kBufByteN, retBytesRead ) != kOkRC && retBytesRead > 0)
|
||||
{
|
||||
|
||||
uint8_t* b = buf;
|
||||
uint8_t* bend = b + retBytesRead;
|
||||
|
||||
|
||||
for(; b < bend; ++b)
|
||||
{
|
||||
switch( state )
|
||||
{
|
||||
case kBegin:
|
||||
msgByteN = *b;
|
||||
state = kChecksum;
|
||||
break;
|
||||
|
||||
case kChecksum:
|
||||
msgCheckSum = *b;
|
||||
curCheckSum = 0;
|
||||
curMsgIdx = 0;
|
||||
state = kData;
|
||||
break;
|
||||
|
||||
case kData:
|
||||
curCheckSum += (*b);
|
||||
curMsgIdx += 1;
|
||||
if( curMsgIdx == msgByteN )
|
||||
{
|
||||
if( curCheckSum != msgCheckSum )
|
||||
cwLogError(kOpFailRC,"Checksum mismatch.0x%x != 0x%x ",curCheckSum,msgCheckSum);
|
||||
state = kBegin;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c->iter++;
|
||||
|
||||
}
|
||||
|
||||
bool _threadFunc( void* arg )
|
||||
{
|
||||
ctx_t* c = static_cast<ctx_t*>(arg);
|
||||
|
||||
switch( c->id )
|
||||
{
|
||||
case 0:
|
||||
_producer(c);
|
||||
break;
|
||||
|
||||
case 1:
|
||||
_consumer(c);
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
cw::rc_t cw::spsc_buf::test()
|
||||
{
|
||||
rc_t rc=kOkRC,rc0,rc1;
|
||||
|
||||
thread_mach::handle_t h;
|
||||
const int ctxArrayN = 2;
|
||||
ctx_t ctxArray[ctxArrayN];
|
||||
shared_t share;
|
||||
const int bufByteN = 128;
|
||||
|
||||
ctxArray[0].id = 0;
|
||||
ctxArray[0].share = &share;
|
||||
ctxArray[1].id = 1;
|
||||
ctxArray[1].share = &share;
|
||||
|
||||
share.readyFl.store(false,std::memory_order_release);
|
||||
|
||||
if((rc = create( share.h, bufByteN )) != kOkRC )
|
||||
return cwLogError(rc,"spsc_buf create failed.");
|
||||
|
||||
if((rc = thread_mach::create( h, _threadFunc, ctxArray, sizeof(ctx_t), ctxArrayN )) != kOkRC )
|
||||
{
|
||||
rc = cwLogError(rc,"Thread machine create failed.");
|
||||
goto errLabel;
|
||||
}
|
||||
|
||||
if((rc = thread_mach::start(h)) != kOkRC )
|
||||
{
|
||||
cwLogError(rc,"Thread machine start failed.");
|
||||
goto errLabel;
|
||||
}
|
||||
|
||||
errLabel:
|
||||
if((rc0 = thread_mach::destroy(h)) != kOkRC )
|
||||
cwLogError(rc0,"Thread machine destroy failed.");
|
||||
|
||||
if((rc1 = spsc_buf::destroy(share.h)) != kOkRC )
|
||||
cwLogError(rc1,"spsc_buf destroy failed.");
|
||||
|
||||
return rcSelect(rc,rc0,rc1);
|
||||
}
|
||||
|
||||
|
||||
|
32
cwSpScBuf.h
Normal file
32
cwSpScBuf.h
Normal file
@ -0,0 +1,32 @@
|
||||
#ifndef cwSpScBuf_h
|
||||
#define cwSpScBuf_h
|
||||
|
||||
namespace cw
|
||||
{
|
||||
|
||||
namespace spsc_buf
|
||||
{
|
||||
typedef handle<struct spsc_buf_str> handle_t;
|
||||
|
||||
rc_t create( handle_t& hRef, unsigned bufByteN );
|
||||
rc_t destroy( handle_t& hRef );
|
||||
|
||||
// Copy data into the filling buffer.
|
||||
rc_t copyIn( handle_t h, const void* buf, unsigned bufByteN );
|
||||
|
||||
// Get a count of the bytes contained in the filling buffer.
|
||||
unsigned fullByteCount( handle_t h );
|
||||
|
||||
// Swap the buffers and return the one containing data.
|
||||
rc_t copyOut( handle_t h, void* buf, unsigned bufByteN, unsigned& returnedByteN_Ref );
|
||||
|
||||
rc_t test();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
117
cwThreadMach.cpp
Normal file
117
cwThreadMach.cpp
Normal file
@ -0,0 +1,117 @@
|
||||
#include "cwCommon.h"
|
||||
#include "cwLog.h"
|
||||
#include "cwCommonImpl.h"
|
||||
#include "cwMem.h"
|
||||
#include "cwThread.h"
|
||||
#include "cwThreadMach.h"
|
||||
|
||||
namespace cw
|
||||
{
|
||||
namespace thread_mach
|
||||
{
|
||||
typedef struct thread_str
|
||||
{
|
||||
thread::handle_t thH;
|
||||
void* arg;
|
||||
} thread_t;
|
||||
|
||||
typedef struct thread_mach_str
|
||||
{
|
||||
thread_t* threadA;
|
||||
unsigned threadN;
|
||||
} thread_mach_t;
|
||||
|
||||
thread_mach_t* _handleToPtr( handle_t h )
|
||||
{ return handleToPtr<handle_t,thread_mach_t>(h); }
|
||||
|
||||
rc_t _destroy( thread_mach_t* p )
|
||||
{
|
||||
rc_t rc = kOkRC;
|
||||
|
||||
for(unsigned i=0; i<p->threadN; ++i)
|
||||
{
|
||||
if((rc = destroy(p->threadA[i].thH)) != kOkRC )
|
||||
{
|
||||
rc = cwLogError(rc,"Thread at index %i destroy failed.",i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
cw::rc_t cw::thread_mach::create( handle_t& hRef, threadFunc_t threadFunc, void* contextArray, unsigned contexRecdByteN, unsigned threadN )
|
||||
{
|
||||
rc_t rc;
|
||||
|
||||
if((rc = destroy(hRef)) != kOkRC )
|
||||
return rc;
|
||||
|
||||
thread_mach_t* p = mem::allocZ<thread_mach_t>();
|
||||
p->threadA = mem::allocZ<thread_t>(threadN);
|
||||
p->threadN = threadN;
|
||||
|
||||
uint8_t* ctxA = static_cast<uint8_t*>(contextArray);
|
||||
|
||||
for(unsigned i=0; i<threadN; ++i)
|
||||
{
|
||||
p->threadA[i].arg = ctxA + (i*contexRecdByteN);
|
||||
|
||||
if((rc = thread::create(p->threadA[i].thH, threadFunc, p->threadA[i].arg )) != kOkRC )
|
||||
{
|
||||
rc = cwLogError(rc,"Thread at index %i create failed.",i);
|
||||
goto errLabel;
|
||||
}
|
||||
}
|
||||
|
||||
errLabel:
|
||||
if( rc != kOkRC )
|
||||
_destroy(p);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
cw::rc_t cw::thread_mach::destroy( handle_t& hRef )
|
||||
{
|
||||
rc_t rc = kOkRC;
|
||||
if( !hRef.isValid() )
|
||||
return rc;
|
||||
|
||||
thread_mach_t* p = _handleToPtr(hRef);
|
||||
|
||||
if((rc = _destroy(p)) != kOkRC )
|
||||
return rc;
|
||||
|
||||
hRef.clear();
|
||||
return rc;
|
||||
}
|
||||
|
||||
cw::rc_t cw::thread_mach::start( handle_t h )
|
||||
{
|
||||
rc_t rc = kOkRC;
|
||||
rc_t rc0;
|
||||
|
||||
thread_mach_t* p = _handleToPtr(h);
|
||||
for(unsigned i=0; i<p->threadN; ++i)
|
||||
if((rc0 = thread::unpause( p->threadA[i].thH )) != kOkRC )
|
||||
rc = cwLogError(rc0,"Thread at index %i start failed.", i );
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
cw::rc_t cw::thread_mach::stop( handle_t h )
|
||||
{
|
||||
rc_t rc = kOkRC;
|
||||
rc_t rc0;
|
||||
|
||||
thread_mach_t* p = _handleToPtr(h);
|
||||
for(unsigned i=0; i<p->threadN; ++i)
|
||||
if((rc0 = thread::pause( p->threadA[i].thH, thread::kPauseFl | thread::kWaitFl )) != kOkRC )
|
||||
rc = cwLogError(rc0,"Thread at index %i stop failed.", i );
|
||||
|
||||
return rc;
|
||||
}
|
19
cwThreadMach.h
Normal file
19
cwThreadMach.h
Normal file
@ -0,0 +1,19 @@
|
||||
#ifndef cwThreadMach_H
|
||||
#define cwThreadMach_H
|
||||
|
||||
namespace cw
|
||||
{
|
||||
namespace thread_mach
|
||||
{
|
||||
typedef handle<struct thread_mach_str> handle_t;
|
||||
typedef thread::cbFunc_t threadFunc_t;
|
||||
|
||||
rc_t create( handle_t& hRef, threadFunc_t threadFunc, void* contextArray, unsigned contexRecdByteN, unsigned threadN );
|
||||
rc_t destroy( handle_t& hRef );
|
||||
rc_t start( handle_t h );
|
||||
rc_t stop( handle_t h );
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user