diff --git a/Makefile.am b/Makefile.am index d45f897..e869c48 100644 --- a/Makefile.am +++ b/Makefile.am @@ -22,8 +22,8 @@ libcwSRC += src/libcw/cwObject.cpp src/libcw/cwText libcwHDR += src/libcw/cwThread.h src/libcw/cwMutex.h src/libcw/cwThreadMach.h libcwSRC += src/libcw/cwThread.cpp src/libcw/cwMutex.cpp src/libcw/cwThreadMach.cpp -libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h src/libcw/cwSpScQueueTmpl.h -libcwSRC += src/libcw/cwSpScBuf.cpp src/libcw/cwSpScQueueTmpl.cpp +libcwHDR += src/libcw/cwMpScNbQueue.h src/libcw/cwSpScBuf.h src/libcw/cwSpScQueueTmpl.h src/libcw/cwMpScNbCircQueue.h +libcwSRC += src/libcw/cwSpScBuf.cpp src/libcw/cwSpScQueueTmpl.cpp src/libcw/cwMpScNbCircQueue.cpp libcwHDR += src/libcw/cwNbMpScQueue.h libcwSRC += src/libcw/cwNbMpScQueue.cpp diff --git a/cwMpScNbCircQueue.cpp b/cwMpScNbCircQueue.cpp new file mode 100644 index 0000000..4dc1fce --- /dev/null +++ b/cwMpScNbCircQueue.cpp @@ -0,0 +1,20 @@ +#include "cwCommon.h" +#include "cwLog.h" +#include "cwCommonImpl.h" +#include "cwTest.h" +#include "cwMem.h" +#include "cwTime.h" +#include "cwObject.h" + +#include "cwMath.h" +#include "cwMpScNbCircQueue.h" + + + +cw::rc_t cw::mp_sc_nb_circ_queue::test( const object_t* obj ) +{ + rc_t rc = kOkRC; + + return rc; + +} diff --git a/cwMpScNbCircQueue.h b/cwMpScNbCircQueue.h new file mode 100644 index 0000000..2749389 --- /dev/null +++ b/cwMpScNbCircQueue.h @@ -0,0 +1,94 @@ +#ifndef cwMpScNbCircQueue_h +#define cwMpScNbCircQueue_h +namespace cw +{ + namespace mp_sc_nb_circ_queue + { + + template< typename T > + struct cq_str + { + T* array; + unsigned alloc_cnt; + unsigned tail_idx; + std::atomic head_idx; + std::atomic cnt; + }; + + template< typename T > + struct cq_str* alloc( unsigned alloc_cnt ) + { + struct cq_str* p = mem::allocZ>(); + + // increment alloc_cnt to next power of two so we can use + // a bit mask rather than modulo to keep the head and tail indexes in range + alloc_cnt = math::nextPowerOfTwo(alloc_cnt); + + p->array = mem::allocZ(alloc_cnt); + p->alloc_cnt = alloc_cnt; + + return p; + } + + template< typename T > + rc_t free( struct cq_str*& p_ref) + { + if( p_ref != nullptr ) + { + mem::release(p_ref->array); + mem::release(p_ref); + } + return kOkRC; + } + + + template< typename T > + rc_t push( struct cq_str* p, T* value ) + { + rc_t rc = kOkRC; + + // allocate a slot in the array - on succes this thread has space to push + // (acquire prevents reordering with rd/wr ops below - sync with fetch_sub() below) + if( p->cnt.fetch_add(1,std::memory_order_acquire) >= p->alloc_cnt ) + { + rc = kBufTooSmallRC; + } + else + { + // get the current head and then advance it + unsigned idx = p->head_idx.fetch_add(1,std::memory_order_acquire) & p->alloc_cnt; + + p->array[ idx ] = value; + } + return rc; + } + + template< typename T > + rc_t pop( struct cq_str* p, T*& value_ref ) + { + rc_t rc = kOkRC; + unsigned n = p->cnt.load(std::memory_order_acquire); + + if( n == 0 ) + { + rc = kEofRC; + } + else + { + + value_ref = p->array[ p->tail_idx ]; + + p->tail_idx = (p->tail_idx + 1) & p->alloc_cnt; + + p->cnt.fetch_sub(1,std::memory_order_release ); + } + + return rc; + } + + rc_t test( const object_t* cfg ); + + + } +} +#endif