99 lines
2.6 KiB
C++
99 lines
2.6 KiB
C++
//| Copyright: (C) 2020-2024 Kevin Larke <contact AT larke DOT org>
|
|
//| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
|
|
#ifndef cwNbMpScQueue_h
|
|
#define cwNbMpScQueue_h
|
|
|
|
/*
|
|
Non-blocking, Lock-free Queue:
|
|
=================================
|
|
|
|
Push
|
|
----
|
|
0. Producers go down the list of blocks (nbmpscq.blockL)
|
|
if a block is not already full then it atomically
|
|
fetch-adds block->write_idx by the size of the element
|
|
to be inserted.
|
|
|
|
1. If after the fetch-add the write_idx is <= block->byteN then
|
|
- atomically incr ele-count,
|
|
- copy in ele
|
|
- place the block,ele-offset,ele-byte-cnt onto the NbMpScQueue().
|
|
|
|
2. else (the area is invalid) goto 0.
|
|
|
|
Pop
|
|
----
|
|
1. copy out next ele.
|
|
2. decr. block->ele_count
|
|
3. if the ele-count is 0 and write-offset is invalid
|
|
reset the write-offset to 0.
|
|
*/
|
|
|
|
|
|
|
|
namespace cw
|
|
{
|
|
namespace nbmpscq
|
|
{
|
|
// Handle type identifying one queue instance. create() and destroy() take it
// by reference; the producer and consumer functions below take it by value.
typedef handle<struct nbmpscq_str> handle_t;
|
|
|
|
// Create a queue. The new handle is presumably returned through 'hRef'
// (it is passed by reference).
// NOTE(review): judging by the parameter names and the block scheme described
// at the top of this file, 'initBlkN' is the number of blocks allocated up
// front and 'blkByteN' is the byte capacity of each block - confirm against
// the implementation.
// Returns an rc_t result code.
rc_t create( handle_t& hRef, unsigned initBlkN, unsigned blkByteN );
|
|
|
|
// Destroy the queue referred to by 'hRef'.
// NOTE(review): 'hRef' is passed by reference, presumably so that the handle
// can be invalidated on return - confirm.
// Returns an rc_t result code.
rc_t destroy( handle_t& hRef );
|
|
|
|
//
|
|
// Producer Function
|
|
//
|
|
|
|
// push() is called by multiple producer threads to insert
|
|
// an element in the queue. Note that the 'blob' is copied into
|
|
// the queue and therefore can be released by the caller.
|
|
// 'h' is the queue handle, 'blob' points to the element data to copy in and
// 'blobByteN' is the size of the element (presumably in bytes).
// NOTE(review): the algorithm notes at the top of this file place each element
// inside a single block, so 'blobByteN' presumably cannot exceed the block
// size given to create() - confirm how an oversized element is reported.
// Returns an rc_t result code.
rc_t push( handle_t h, const void* blob, unsigned blobByteN );
|
|
|
|
|
|
//
|
|
// Consumer Functions
|
|
//
|
|
|
|
// Record descriptor returned by value from the consumer functions
// get(), advance() and peek().
typedef struct blob_str
|
|
{
|
|
rc_t rc; // presumably the result code of the call that produced this descriptor - confirm
|
|
const void* blob; // record data (read-only); peek() sets this to nullptr when no further record is available
|
|
unsigned blobByteN; // presumably the size of 'blob' in bytes - confirm
|
|
} blob_t;
|
|
|
|
// get() is called by the single consumer thread to access the
|
|
// oldest record in the queue. Note that this call
|
|
// does not change the state of the queue.
|
|
// Returns a blob_t describing the oldest record.
// NOTE(review): the result for an empty queue is not stated in this header;
// presumably blob_t.blob is nullptr, as documented for peek() - confirm.
blob_t get( handle_t h );
|
|
|
|
// advance() disposes of the oldest blob in the
|
|
// queue and makes the next blob current.
|
|
// NOTE(review): the meaning of the returned blob_t is not stated in this
// header; presumably it describes the record that becomes current after the
// advance - confirm against the implementation.
blob_t advance( handle_t h );
|
|
|
|
// The queue maintains a single internal iterator which the consumer
|
|
// may use to traverse stored records without removing them.
|
|
// The first call to peek() will return the oldest stored record.
|
|
// Each subsequent call to peek() will return the next stored record
|
|
// until no records are available - at which point blob_t.blob will be
|
|
// set to 'nullptr'. The following call will then revert to returning
|
|
// the oldest stored record.
|
|
// Returns a blob_t for the record at the internal iterator position;
// blob_t.blob is nullptr once the iterator has run past the last stored
// record, after which the next call starts again from the oldest record.
// The iterator is a single piece of queue state intended for the consumer.
blob_t peek( handle_t h );
|
|
|
|
// Reset peek to point to the oldest stored record.
|
|
// After this call the next peek() returns the oldest stored record.
void peek_reset( handle_t h );
|
|
|
|
// Return true if the queue is empty.
|
|
// Declared among the consumer functions.
// NOTE(review): confirm whether calling this from a producer thread is also safe.
bool is_empty( handle_t h );
|
|
|
|
// Count of elements in the queue.
|
|
// Declared among the consumer functions.
// NOTE(review): while producers are pushing concurrently the returned value
// is presumably only a snapshot - confirm.
unsigned count( handle_t h );
|
|
|
|
// Presumably the self-test entry point for this module.
// NOTE(review): 'cfg' presumably supplies the test parameters; its expected
// fields are not visible in this header - confirm against the implementation.
// Returns an rc_t result code.
rc_t test( const object_t* cfg );
|
|
|
|
}
|
|
}
|
|
|
|
|
|
#endif
|