diff --git a/cwMpScNbCircQueue.h b/cwMpScNbCircQueue.h
index 2749389..93b790f 100644
--- a/cwMpScNbCircQueue.h
+++ b/cwMpScNbCircQueue.h
@@ -4,19 +4,29 @@ namespace cw
 {
   namespace mp_sc_nb_circ_queue
   {
-    
+    template< typename T >
+    struct node_str
+    {
+      std::atomic<struct node_str<T>* > next;
+      T                                 t;
+    };
+    
     template< typename T >
     struct cq_str
     {
-      T*                    array;
+      struct node_str<T>*   array;
       unsigned              alloc_cnt;
-      unsigned              tail_idx;
-      std::atomic<unsigned> head_idx;
-      std::atomic<unsigned> cnt;
+      unsigned              index_mask;
+      std::atomic<unsigned> res_cnt;
+      std::atomic<unsigned> res_head_idx;
+      
+      std::atomic<struct node_str<T>*> head; // last-in
+      struct node_str<T>*              tail; // first-out
+      struct node_str<T>               stub; // dummy node
     };
 
     template< typename T >
-    struct cq_str<T>* alloc( unsigned alloc_cnt )
+    struct cq_str<T>* create( unsigned alloc_cnt )
     {
       struct cq_str<T>* p = mem::allocZ<cq_str<T>>();
 
@@ -24,14 +34,24 @@ namespace cw
       // a bit mask rather than modulo to keep the head and tail indexes in range
       alloc_cnt = math::nextPowerOfTwo(alloc_cnt);
 
-      p->array     = mem::allocZ<T>(alloc_cnt);
-      p->alloc_cnt = alloc_cnt;
+      p->array      = mem::allocZ< struct node_str<T> >(alloc_cnt);
+      p->alloc_cnt  = alloc_cnt;
+      p->index_mask = alloc_cnt - 1;  // decr. alloc_cnt by 1 to create an index mask
+      p->res_cnt.store(0);
+      p->res_head_idx.store(0);
+
+      p->stub.next.store(nullptr);
+      p->head.store(&p->stub);
+      p->tail = &p->stub;
+
+      for(unsigned i=0; i<alloc_cnt; ++i)
+        p->array[i].next.store(nullptr);
 
       return p;
     }
 
     template< typename T >
-    rc_t free( struct cq_str<T>*& p_ref )
+    rc_t destroy( struct cq_str<T>*& p_ref )
     {
       if( p_ref != nullptr )
       {
@@ -43,51 +63,77 @@ namespace cw
 
 
     template< typename T >
-    rc_t push( struct cq_str<T>* p, T* value )
+    rc_t push( struct cq_str<T>* p, T value )
     {
       rc_t rc = kOkRC;
 
-      // allocate a slot in the array - on succes this thread has space to push
+      // allocate a slot in the array - on success this thread owns a slot in the array
       // (acquire prevents reordering with rd/wr ops below - sync with fetch_sub() below)
-      if( p->cnt.fetch_add(1,std::memory_order_acquire) >= p->alloc_cnt )
+      if( p->res_cnt.fetch_add(1,std::memory_order_acquire) >= p->alloc_cnt )
       {
+        // a slot is not available - undo the increment
+        p->res_cnt.fetch_sub(1,std::memory_order_release);
+
        rc = kBufTooSmallRC;
       }
       else
       {
-        // get the current head and then advance it
-        unsigned idx = p->head_idx.fetch_add(1,std::memory_order_acquire) & p->alloc_cnt;
+        // get the location of the reserved slot and then advance res_head_idx
+        unsigned idx = p->res_head_idx.fetch_add(1,std::memory_order_acquire) & p->index_mask;
+
+        struct node_str<T>* n = p->array + idx;
+
+        // store the pushed element in the reserved slot
+        n->t = value;
+        n->next.store(nullptr);
+
+        // Note that the elements of the queue are only accessed from the front of the queue (tail).
+        // New nodes are added to the end of the list (head).
+        // A new node will therefore always have its next ptr set to null.
+
+        // 1. Atomically set head to the new node and return the 'old-head'.
+        //    We use acq_rel to prevent code movement above or below this instruction.
+        struct node_str<T>* prev = p->head.exchange(n,std::memory_order_acq_rel);
+
+        // Note that at this point only the new node may have the 'old-head' as its predecessor.
+        // Other threads may therefore safely interrupt at this point - they will
+        // have the new node as their predecessor. Note that none of these nodes are accessible
+        // yet because the tail's chain of 'next' pointers still ends at the 'old-head' - whose
+        // next pointer is still null.
+
+        // 2. Set the old-head next pointer to the new node (thereby adding the new node to the list).
+        prev->next.store(n,std::memory_order_release); // RELEASE 'next' to consumer
-        p->array[ idx ] = value;
       }
 
       return rc;
     }
 
     template< typename T >
-    rc_t pop( struct cq_str<T>* p, T*& value_ref )
+    rc_t pop( struct cq_str<T>* p, T& value_ref )
     {
-      rc_t     rc = kOkRC;
-      unsigned n  = p->cnt.load(std::memory_order_acquire);
-
-      if( n == 0 )
-      {
-        rc = kEofRC;
-      }
-      else
-      {
+      rc_t rc = kEofRC;
 
-        value_ref = p->array[ p->tail_idx ];
+      // We always access the tail element through tail->next. Always accessing the
+      // next tail element via the tail->next pointer is critical to correctness.
+      // See the note in push().
+      struct node_str<T>* next = p->tail->next.load(std::memory_order_acquire); // ACQUIRE 'next' from producer
 
-        p->tail_idx = (p->tail_idx + 1) & p->alloc_cnt;
-
-        p->cnt.fetch_sub(1,std::memory_order_release );
+      if( next != nullptr )
+      {
+        value_ref = next->t;
+
+        p->tail = next;
+
+        // decrease the count of full slots
+        p->res_cnt.fetch_sub(1,std::memory_order_release);
+
+        rc = kOkRC;
+      }
 
       return rc;
     }
-
-    rc_t test( const object_t* cfg );
-
   }
 }
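For reference, below is a minimal standalone sketch of the two-step publish ordering this patch adopts: an exchange on 'head', then a release-store to the predecessor's 'next', with the single consumer always walking 'tail->next'. It is written against plain std::atomic so it compiles without the cw framework; the names (Node, MpScQueue, push, pop) are illustrative only and are not part of the library, and the slot-reservation counter (res_cnt) is omitted.

// Sketch only: same linking/ordering idea as the patched cwMpScNbCircQueue.h, minus slot reservation.
#include <atomic>
#include <cassert>
#include <vector>

struct Node
{
  std::atomic<Node*> next{nullptr};
  int                value{0};
};

struct MpScQueue
{
  Node               stub;           // dummy node; head and tail both start here
  std::atomic<Node*> head{&stub};    // last-in  (written by any producer)
  Node*              tail = &stub;   // first-out (touched only by the single consumer)
};

// Producer side: publish a fully initialized node in two steps.
void push( MpScQueue& q, Node* n )
{
  n->next.store(nullptr, std::memory_order_relaxed);

  // 1. Swing head to the new node; acq_rel keeps the node initialization above
  //    from being reordered past this exchange.
  Node* prev = q.head.exchange(n, std::memory_order_acq_rel);

  // 2. Link the new node behind its predecessor; this release-store is what
  //    makes the node visible to the consumer.
  prev->next.store(n, std::memory_order_release);
}

// Consumer side: always reach the next element through tail->next.
bool pop( MpScQueue& q, int& out )
{
  Node* next = q.tail->next.load(std::memory_order_acquire);
  if( next == nullptr )
    return false;    // empty, or a producer is between steps 1 and 2

  out    = next->value;
  q.tail = next;     // the old tail (possibly the stub) drops out of the list
  return true;
}

int main()
{
  MpScQueue         q;
  std::vector<Node> nodes(4);   // stand-in for the patch's pre-allocated node array

  for( unsigned i=0; i<nodes.size(); ++i )
  {
    nodes[i].value = (int)i;
    push(q, &nodes[i]);
  }

  int v = -1;
  for( unsigned i=0; i<nodes.size(); ++i )
  {
    bool ok = pop(q, v);
    assert( ok && v == (int)i );  // FIFO order is preserved
  }
  return 0;
}

Note that, exactly as in the patched pop(), the consumer may observe a null 'next' while a producer sits between the exchange and the release-store; the correct response is simply to report the queue as empty (kEofRC in the patch) and try again later.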