libcm/cmThread.c

2003 line
48 KiB
C

#include "cmPrefix.h"
#include "cmGlobal.h"
#include "cmRpt.h"
#include "cmErr.h"
#include "cmMem.h"
#include "cmMallocDebug.h"
#include "cmThread.h"
#include <pthread.h>
#include <unistd.h> // usleep
//#include <atomic_ops.h>
cmThreadH_t cmThreadNullHandle = {NULL};
enum
{
kDoExitThFl = 0x01,
kDoPauseThFl = 0x02,
kDoRunThFl = 0x04
};
typedef struct
{
cmErr_t err;
cmThreadFunc_t funcPtr;
pthread_t pthreadH;
cmThStateId_t state;
void* funcParam;
unsigned doFlags;
unsigned pauseMicroSecs;
unsigned waitMicroSecs;
} cmThThread_t;
cmThRC_t _cmThError( cmErr_t* err, cmThRC_t rc, int sysErr, const char* fmt, ... )
{
va_list vl;
va_start(vl,fmt);
cmErrVSysMsg(err,rc,sysErr,fmt,vl);
va_end(vl);
return rc;
}
void _cmThThreadCleanUpCallback(void* t)
{
((cmThThread_t*)t)->state = kExitedThId;
}
void* _cmThThreadCallback(void* param)
{
cmThThread_t* t = (cmThThread_t*)param;
// set a clean up handler - this will be called when the
// thread terminates unexpectedly or pthread_cleanup_pop() is called.
pthread_cleanup_push(_cmThThreadCleanUpCallback,t);
while( cmIsFlag(t->doFlags,kDoExitThFl) == false )
{
if( t->state == kPausedThId )
{
cmSleepUs( t->pauseMicroSecs );
if( cmIsFlag(t->doFlags,kDoRunThFl) )
{
t->doFlags = cmClrFlag(t->doFlags,kDoRunThFl);
t->state = kRunningThId;
}
}
else
{
if( t->funcPtr(t->funcParam)==false )
break;
if( cmIsFlag(t->doFlags,kDoPauseThFl) )
{
t->doFlags = cmClrFlag(t->doFlags,kDoPauseThFl);
t->state = kPausedThId;
}
}
}
pthread_cleanup_pop(1);
pthread_exit(NULL);
return t;
}
cmThThread_t* _cmThThreadFromHandle( cmThreadH_t h )
{
cmThThread_t* tp = (cmThThread_t*)h.h;
assert(tp != NULL);
return tp->state==kNotInitThId ? NULL : tp;
}
cmThRC_t _cmThWaitForState( cmThThread_t* t, unsigned stateId )
{
unsigned waitTimeMicroSecs = 0;
while( t->state != stateId && waitTimeMicroSecs < t->waitMicroSecs )
{
//cmSleepUs( t->waitMicroSecs );
cmSleepUs( 15000 );
waitTimeMicroSecs += 15000; //t->waitMicroSecs;
}
return t->state==stateId ? kOkThRC : kTimeOutThRC;
}
cmThRC_t cmThreadCreate( cmThreadH_t* hPtr, cmThreadFunc_t funcPtr, void* funcParam, cmRpt_t* rpt )
{
//pthread_attr_t attr;
cmThRC_t rc = kOkThRC;
cmThThread_t* tp = cmMemAllocZ( cmThThread_t, 1 );
int sysErr;
cmErrSetup(&tp->err,rpt,"Thread");
tp->funcPtr = funcPtr;
tp->funcParam = funcParam;
tp->state = kPausedThId;
tp->doFlags = 0;
tp->pauseMicroSecs = 50000;
tp->waitMicroSecs = 1000000;
if((sysErr = pthread_create(&tp->pthreadH,NULL,_cmThThreadCallback, (void*)tp )) != 0 )
{
tp->state = kNotInitThId;
rc = _cmThError(&tp->err,kCreateFailThRC,sysErr,"Thread create failed.");
}
hPtr->h = tp;
return rc;
}
cmThRC_t cmThreadDestroy( cmThreadH_t* hPtr )
{
cmThRC_t rc = kOkThRC;
if( hPtr==NULL || cmThreadIsValid(*hPtr)==false )
return rc;
cmThThread_t* t = _cmThThreadFromHandle(*hPtr );
if( t == NULL )
return kInvalidHandleThRC;
// tell the thread to exit
t->doFlags = cmSetFlag(t->doFlags,kDoExitThFl);
// wait for the thread to exit and then deallocate the thread object
if((rc = _cmThWaitForState(t,kExitedThId)) == kOkThRC )
{
cmMemFree(t);
hPtr->h = NULL;
}
else
{
rc = _cmThError(&t->err,rc,0,"Thread timed out waiting for destroy.");
}
return rc;
}
cmThRC_t cmThreadPause( cmThreadH_t h, unsigned cmdFlags )
{
cmThRC_t rc = kOkThRC;
bool pauseFl = cmIsFlag(cmdFlags,kPauseThFl);
bool waitFl = cmIsFlag(cmdFlags,kWaitThFl);
cmThThread_t* t = _cmThThreadFromHandle(h);
unsigned waitId;
if( t == NULL )
return kInvalidHandleThRC;
bool isPausedFl = t->state == kPausedThId;
if( isPausedFl == pauseFl )
return kOkThRC;
if( pauseFl )
{
t->doFlags = cmSetFlag(t->doFlags,kDoPauseThFl);
waitId = kPausedThId;
}
else
{
t->doFlags = cmSetFlag(t->doFlags,kDoRunThFl);
waitId = kRunningThId;
}
if( waitFl )
rc = _cmThWaitForState(t,waitId);
if( rc != kOkThRC )
_cmThError(&t->err,rc,0,"Thread timed out waiting for '%s'.", pauseFl ? "pause" : "un-pause");
return rc;
}
cmThStateId_t cmThreadState( cmThreadH_t h )
{
cmThThread_t* tp = _cmThThreadFromHandle(h);
if( tp == NULL )
return kNotInitThId;
return tp->state;
}
bool cmThreadIsValid( cmThreadH_t h )
{ return h.h != NULL; }
unsigned cmThreadPauseTimeOutMicros( cmThreadH_t h )
{
cmThThread_t* tp = _cmThThreadFromHandle(h);
return tp->pauseMicroSecs;
}
void cmThreadSetPauseTimeOutMicros( cmThreadH_t h, unsigned usecs )
{
cmThThread_t* tp = _cmThThreadFromHandle(h);
tp->pauseMicroSecs = usecs;
}
unsigned cmThreadWaitTimeOutMicros( cmThreadH_t h )
{
cmThThread_t* tp = _cmThThreadFromHandle(h);
return tp->waitMicroSecs;
}
void cmThreadSetWaitTimeOutMicros( cmThreadH_t h, unsigned usecs )
{
cmThThread_t* tp = _cmThThreadFromHandle(h);
tp->waitMicroSecs = usecs;
}
bool _cmThreadTestCb( void* p )
{
unsigned* ip = (unsigned*)p;
ip[0]++;
return true;
}
void cmThreadTest(cmRpt_t* rpt)
{
cmThreadH_t th0;
unsigned val = 0;
if( cmThreadCreate(&th0,_cmThreadTestCb,&val,rpt) == kOkThRC )
{
if( cmThreadPause(th0,0) != kOkThRC )
{
cmRptPrintf(rpt,"Thread start failed.\n");
return;
}
char c = 0;
cmRptPrintf(rpt,"o=print p=pause s=state q=quit\n");
while( c != 'q' )
{
c = (char)fgetc(stdin);
fflush(stdin);
switch(c)
{
case 'o':
cmRptPrintf(rpt,"val: 0x%x\n",val);
break;
case 's':
cmRptPrintf(rpt,"state=%i\n",cmThreadState(th0));
break;
case 'p':
{
cmRC_t rc;
if( cmThreadState(th0) == kPausedThId )
rc = cmThreadPause(th0,kWaitThFl);
else
rc = cmThreadPause(th0,kPauseThFl|kWaitThFl);
if( rc == kOkThRC )
cmRptPrintf(rpt,"new state:%i\n", cmThreadState(th0));
else
cmRptPrintf(rpt,"cmThreadPause() failed.");
}
break;
case 'q':
break;
//default:
//cmRptPrintf(rpt,"Unknown:%c\n",c);
}
}
if( cmThreadDestroy(&th0) != kOkThRC )
cmRptPrintf(rpt,"Thread destroy failed.\n");
}
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
typedef struct
{
cmErr_t err;
pthread_mutex_t mutex;
pthread_cond_t cvar;
} cmThreadMutex_t;
cmThreadMutexH_t kThreadMutexNULL = {NULL};
cmThreadMutex_t* _cmThreadMutexFromHandle( cmThreadMutexH_t h )
{
cmThreadMutex_t* p = (cmThreadMutex_t*)h.h;
assert(p != NULL);
return p;
}
cmThRC_t cmThreadMutexCreate( cmThreadMutexH_t* hPtr, cmRpt_t* rpt )
{
int sysErr;
cmThreadMutex_t* p = cmMemAllocZ( cmThreadMutex_t, 1 );
cmErrSetup(&p->err,rpt,"Thread Mutex");
if((sysErr = pthread_mutex_init(&p->mutex,NULL)) != 0 )
return _cmThError(&p->err,kCreateFailThRC,sysErr,"Thread mutex create failed.");
if((sysErr = pthread_cond_init(&p->cvar,NULL)) != 0 )
return _cmThError(&p->err,kCreateFailThRC,sysErr,"Thread Condition var. create failed.");
hPtr->h = p;
return kOkThRC;
}
cmThRC_t cmThreadMutexDestroy( cmThreadMutexH_t* hPtr )
{
int sysErr;
cmThreadMutex_t* p = _cmThreadMutexFromHandle(*hPtr);
if( p == NULL )
return kInvalidHandleThRC;
if((sysErr = pthread_cond_destroy(&p->cvar)) != 0)
return _cmThError(&p->err,kDestroyFailThRC,sysErr,"Thread condition var. destroy failed.");
if((sysErr = pthread_mutex_destroy(&p->mutex)) != 0)
return _cmThError(&p->err,kDestroyFailThRC,sysErr,"Thread mutex destroy failed.");
cmMemFree(p);
hPtr->h = NULL;
return kOkThRC;
}
cmThRC_t cmThreadMutexTryLock( cmThreadMutexH_t h, bool* lockFlPtr )
{
cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
int sysErr = pthread_mutex_trylock(&p->mutex);
switch(sysErr)
{
case EBUSY:
*lockFlPtr = false;
break;
case 0:
*lockFlPtr = true;
break;
default:
return _cmThError(&p->err,kLockFailThRC,sysErr,"Thread mutex try-lock failed.");;
}
return kOkThRC;
}
cmThRC_t cmThreadMutexLock( cmThreadMutexH_t h )
{
cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
int sysErr = pthread_mutex_lock(&p->mutex);
if( sysErr == 0 )
return kOkThRC;
return _cmThError(&p->err,kLockFailThRC,sysErr,"Thread mutex lock failed.");
}
cmThRC_t cmThreadMutexUnlock( cmThreadMutexH_t h )
{
cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
int sysErr = pthread_mutex_unlock(&p->mutex);
if( sysErr == 0 )
return kOkThRC;
return _cmThError(&p->err,kUnlockFailThRC,sysErr,"Thread mutex unlock failed.");
}
bool cmThreadMutexIsValid( cmThreadMutexH_t h )
{ return h.h != NULL; }
cmThRC_t cmThreadMutexWaitOnCondVar( cmThreadMutexH_t h, bool lockFl )
{
cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
int sysErr;
if( lockFl )
if( (sysErr=pthread_mutex_lock(&p->mutex)) != 0 )
_cmThError(&p->err,kLockFailThRC,sysErr,"Thread lock failed on cond. var. wait.");
if((sysErr = pthread_cond_wait(&p->cvar,&p->mutex)) != 0 )
_cmThError(&p->err,kCVarWaitFailThRC,sysErr,"Thread cond. var. wait failed.");
return kOkThRC;
}
cmThRC_t cmThreadMutexSignalCondVar( cmThreadMutexH_t h )
{
int sysErr;
cmThreadMutex_t* p = _cmThreadMutexFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
if((sysErr = pthread_cond_signal(&p->cvar)) != 0 )
return _cmThError(&p->err,kCVarSignalFailThRC,sysErr,"Thread cond. var. signal failed.");
return kOkThRC;
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
cmTsQueueH_t cmTsQueueNullHandle = { NULL };
enum { cmTsQueueBufCnt = 2 };
typedef struct
{
unsigned allocCnt; // count of bytes allocated for the buffer
unsigned fullCnt; // count of bytes used in the buffer
char* basePtr; // base of buffer memory
unsigned* msgPtr; // pointer to first msg
unsigned msgCnt;
} cmTsQueueBuf;
typedef struct
{
cmThreadMutexH_t mutexH;
cmTsQueueBuf bufArray[cmTsQueueBufCnt];
unsigned inBufIdx;
unsigned outBufIdx;
char* memPtr;
cmTsQueueCb_t cbFunc;
void* userCbPtr;
} cmTsQueue_t;
cmTsQueue_t* _cmTsQueueFromHandle( cmTsQueueH_t h )
{
cmTsQueue_t* p = h.h;
assert(p != NULL);
return p;
}
cmThRC_t _cmTsQueueDestroy( cmTsQueue_t* p )
{
cmThRC_t rc;
if( p == NULL )
return kInvalidHandleThRC;
if( p->mutexH.h != NULL )
if((rc = cmThreadMutexDestroy(&p->mutexH)) != kOkThRC )
return rc;
if( p->memPtr != NULL )
cmMemPtrFree(&p->memPtr);
cmMemPtrFree(&p);
return kOkThRC;
}
cmThRC_t cmTsQueueCreate( cmTsQueueH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* userCbPtr, cmRpt_t* rpt )
{
cmTsQueue_t* p = cmMemAllocZ( cmTsQueue_t, 1 );
unsigned i;
if( cmThreadMutexCreate(&p->mutexH,rpt) != kOkThRC )
goto errLabel;
p->memPtr = cmMemAllocZ( char, bufByteCnt*cmTsQueueBufCnt );
p->outBufIdx = 0;
p->inBufIdx = 1;
p->cbFunc = cbFunc;
p->userCbPtr = userCbPtr;
for(i=0; i<cmTsQueueBufCnt; ++i)
{
p->bufArray[i].allocCnt = bufByteCnt;
p->bufArray[i].fullCnt = 0;
p->bufArray[i].basePtr = p->memPtr + (i*bufByteCnt);
p->bufArray[i].msgPtr = NULL;
p->bufArray[i].msgCnt = 0;
}
hPtr->h = p;
return kOkThRC;
errLabel:
_cmTsQueueDestroy(p);
return kCreateFailThRC;
}
cmThRC_t cmTsQueueDestroy( cmTsQueueH_t* hPtr )
{
cmThRC_t rc = kOkThRC;
if( (hPtr != NULL) && cmTsQueueIsValid(*hPtr))
if((rc = _cmTsQueueDestroy(_cmTsQueueFromHandle(*hPtr))) == kOkThRC )
hPtr->h = NULL;
return rc;
}
cmThRC_t cmTsQueueSetCallback( cmTsQueueH_t h, cmTsQueueCb_t cbFunc, void* cbArg )
{
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
p->cbFunc = cbFunc;
p->userCbPtr = cbArg;
return kOkThRC;
}
unsigned cmTsQueueAllocByteCount( cmTsQueueH_t h )
{
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
unsigned n = 0;
if( cmThreadMutexLock(p->mutexH) == kOkThRC )
{
n = p->bufArray[ p->inBufIdx ].allocCnt;
cmThreadMutexUnlock(p->mutexH);
}
return n;
}
unsigned cmTsQueueAvailByteCount( cmTsQueueH_t h )
{
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
unsigned n = 0;
if(cmThreadMutexLock(p->mutexH) == kOkThRC )
{
n = p->bufArray[ p->inBufIdx ].allocCnt - p->bufArray[ p->inBufIdx].fullCnt;
cmThreadMutexUnlock(p->mutexH);
}
return n;
}
cmThRC_t _cmTsQueueEnqueueMsg( cmTsQueueH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{
cmThRC_t rc;
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
// lock the mutex
if((rc = cmThreadMutexLock(p->mutexH)) == kOkThRC )
{
cmTsQueueBuf* b = p->bufArray + p->inBufIdx; // ptr to buf recd
const char* ep = b->basePtr + b->allocCnt; // end of buf data space
unsigned *mp = (unsigned*)(b->basePtr + b->fullCnt); // ptr to size of new msg space
char* dp = (char*)(mp+1); // ptr to data area of new msg space
unsigned ttlByteCnt = 0; // track size of msg data
unsigned i = 0;
// get the total size of the msg
for(i=0; i<arrayCnt; ++i)
ttlByteCnt += msgByteCntArray[i];
// if the msg is too big for the queue buf
if( dp + ttlByteCnt > ep )
rc = kBufFullThRC;
else
{
// for each segment of the incoming msg
for(i=0; i<arrayCnt; ++i)
{
// get the size of the segment
unsigned n = msgByteCntArray[i];
// copy in the segment
memcpy(dp,msgPtrArray[i],n);
dp += n; //
}
assert(dp <= ep );
// write the size ofthe msg into the buffer
*mp = ttlByteCnt;
// update the pointer to the first msg
if( b->msgPtr == NULL )
b->msgPtr = mp;
// track the count of msgs in this buffer
++b->msgCnt;
// update fullCnt last since dequeue uses fullCnt to
// notice that a msg may be waiting
b->fullCnt += sizeof(unsigned) + ttlByteCnt;
}
cmThreadMutexUnlock(p->mutexH);
}
return rc;
}
cmThRC_t cmTsQueueEnqueueSegMsg( cmTsQueueH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{ return _cmTsQueueEnqueueMsg(h,msgPtrArray,msgByteCntArray,arrayCnt); }
cmThRC_t cmTsQueueEnqueueMsg( cmTsQueueH_t h, const void* dataPtr, unsigned byteCnt )
{
const void* msgPtrArray[] = { dataPtr };
unsigned msgByteCntArray[] = { byteCnt };
return _cmTsQueueEnqueueMsg(h,msgPtrArray,msgByteCntArray,1);
}
cmThRC_t cmTsQueueEnqueueIdMsg( cmTsQueueH_t h, unsigned id, const void* dataPtr, unsigned byteCnt )
{
const void* msgPtrArray[] = { &id, dataPtr };
unsigned msgByteCntArray[] = { sizeof(id), byteCnt };
return _cmTsQueueEnqueueMsg(h,msgPtrArray,msgByteCntArray,2);
}
cmThRC_t _cmTsQueueDequeueMsg( cmTsQueue_t* p, void* retBuf, unsigned refBufByteCnt )
{
cmTsQueueBuf* b = p->bufArray + p->outBufIdx;
// if the output buffer is empty - there is nothing to do
if( b->fullCnt == 0 )
return kBufEmptyThRC;
assert( b->msgPtr != NULL );
// get the output msg size and data
unsigned msgByteCnt = *b->msgPtr;
char* msgDataPtr = (char*)(b->msgPtr + 1);
// transmit the msg via a callback
if( retBuf == NULL && p->cbFunc != NULL )
p->cbFunc(p->userCbPtr,msgByteCnt,msgDataPtr);
else
{
// retBuf may be NULL if the func is being used by cmTsQueueDequeueByteCount()
if( retBuf == NULL || msgByteCnt > refBufByteCnt )
return kBufTooSmallThRC;
// copy the msg to a buffer
if( retBuf != NULL )
memcpy(retBuf,msgDataPtr,msgByteCnt);
}
// update the buffer
b->fullCnt -= sizeof(unsigned) + msgByteCnt;
b->msgPtr = (unsigned*)(msgDataPtr + msgByteCnt);
--(b->msgCnt);
if( b->fullCnt == 0 )
{
assert(b->msgCnt == 0);
b->msgPtr = NULL;
}
return kOkThRC;
}
cmThRC_t cmTsQueueDequeueMsg( cmTsQueueH_t h, void* retBuf, unsigned refBufByteCnt )
{
cmThRC_t rc;
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
if( p == NULL )
return kInvalidHandleThRC;
// dequeue the next msg from the current output buffer
if((rc =_cmTsQueueDequeueMsg( p, retBuf, refBufByteCnt )) != kBufEmptyThRC )
return rc;
// the current output buffer was empty
cmTsQueueBuf* b = p->bufArray + p->inBufIdx;
// if the input buffer has msg's ...
if( b->fullCnt > 0 )
{
bool lockFl = false;
// ...attempt to lock the mutex ...
if( (cmThreadMutexTryLock(p->mutexH,&lockFl) == kOkThRC) && lockFl )
{
// ... swap the input and the output buffers ...
unsigned tmp = p->inBufIdx;
p->inBufIdx = p->outBufIdx;
p->outBufIdx = tmp;
// .. unlock the mutex
cmThreadMutexUnlock(p->mutexH);
// ... and dequeue the first msg from the new output buffer
rc = _cmTsQueueDequeueMsg( p, retBuf, refBufByteCnt );
}
}
return rc;
}
bool cmTsQueueMsgWaiting( cmTsQueueH_t h )
{
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
if( p == NULL )
return false;
if( p->bufArray[p->outBufIdx].fullCnt )
return true;
return p->bufArray[p->inBufIdx].fullCnt > 0;
}
unsigned cmTsQueueDequeueMsgByteCount( cmTsQueueH_t h )
{
cmTsQueue_t* p = _cmTsQueueFromHandle(h);
if( p == NULL )
return 0;
// if output msgs are available then the msgPtr points to the size of the msg
if( p->bufArray[p->outBufIdx].fullCnt )
return *(p->bufArray[p->outBufIdx].msgPtr);
// no msgs are waiting in the output buffer
// force the buffers to swap - returns kBufEmptyThRC if there are
// still no msgs waiting after the swap (the input buf was also empty)
if( cmTsQueueDequeueMsg(h,NULL,0) == kBufTooSmallThRC )
{
// the buffers swapped so there must be msg waiting
assert( p->bufArray[p->outBufIdx].fullCnt );
return *(p->bufArray[p->outBufIdx].msgPtr);
}
return 0;
}
bool cmTsQueueIsValid( cmTsQueueH_t h )
{ return h.h != NULL; }
//--------------------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------
#ifdef NOT_DEF
enum { kThBufCnt=2 };
typedef struct
{
char* buf;
volatile unsigned ii;
volatile unsigned oi;
} cmThBuf_t;
typedef struct
{
cmErr_t err;
cmThBuf_t a[kThBufCnt];
volatile unsigned ibi;
unsigned bn;
cmTsQueueCb_t cbFunc;
void* cbArg;
} cmTs1p1c_t;
cmTs1p1c_t* _cmTs1p1cHandleToPtr( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = (cmTs1p1c_t*)h.h;
assert( p != NULL );
return p;
}
cmThRC_t _cmTs1p1cDestroy( cmTs1p1c_t* p )
{
unsigned i;
for(i=0; i<kThBufCnt; ++i)
cmMemFree(p->a[i].buf);
cmMemFree(p);
return kOkThRC;
}
cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
{
cmThRC_t rc;
if((rc = cmTs1p1cDestroy(hPtr)) != kOkThRC )
return rc;
unsigned i;
cmTs1p1c_t* p = cmMemAllocZ(cmTs1p1c_t,1);
cmErrSetup(&p->err,rpt,"TS 1p1c Queue");
for(i=0; i<kThBufCnt; ++i)
{
p->a[i].buf = cmMemAllocZ(char,bufByteCnt);
p->a[i].ii = 0;
p->a[i].oi = bufByteCnt;
}
p->ibi = 0;
p->bn = bufByteCnt;
p->cbFunc = cbFunc;
p->cbArg = cbArg;
hPtr->h = p;
return rc;
}
cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hp )
{
cmThRC_t rc = kOkThRC;
if( hp == NULL || cmTs1p1cIsValid(*hp)==false )
return kOkThRC;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(*hp);
if(( rc = _cmTs1p1cDestroy(p)) != kOkThRC )
return rc;
hp->h = NULL;
return rc;
}
cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{
cmThRC_t rc = kOkThRC;
unsigned mn = 0;
unsigned i;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
cmThBuf_t* ib = p->a + p->ibi;
// get the total count of bytes for this msg
for(i=0; i<arrayCnt; ++i)
mn += msgByteCntArray[i];
unsigned dn = mn + sizeof(unsigned);
// if the message is too big for even an empty buffer
if( dn > p->bn )
return cmErrMsg(&p->err,kBufFullThRC,"A msg containing %i bytes will never be able to fit in a queue with an empty size of %i bytes.",dn,p->bn);
// if the msg won't fit in the current input buffer then try swapping buffers.
if( ib->ii + dn > p->bn )
{
// get the current output buffer
cmThBuf_t* ob = p->a + (p->ibi==0 ? 1 : 0);
// Empty buffers will be set such that: oi==bn and ii==0.
//
// Note that setting ii to 0 in an output buffer is the last operation
// performed on an empty output buffer. ii==0 is therefore the
// signal that an output buffer can be reused for input.
// if the output buffer is not empty - then an overflow occurred
if( ob->ii != 0 )
return cmErrMsg(&p->err,kBufFullThRC,"The msq queue cannot accept a %i byte msg into %i bytes.",dn, p->bn - ib->ii);
// setup the initial output location of the new output buffer
ib->oi = 0;
// swap buffers
p->ibi = (p->ibi + 1) % kThBufCnt;
// get the new input buffer
ib = ob;
}
// get a pointer to the base of the write location
char* dp = ib->buf + ib->ii;
// write the length of the message
*(unsigned*)dp = mn;
dp += sizeof(unsigned);
// write the body of the message
for(i=0; i<arrayCnt; ++i)
{
memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
dp += msgByteCntArray[i];
}
// this MUST be executed last - we'll use 'dp' in the calculation
// (even though ib->ii += dn would be more straight forward way
// to accomplish the same thing) to prevent the optimizer from
// moving the assignment prior to the for loop.
ib->ii += dp - (ib->buf + ib->ii);
return rc;
}
cmThRC_t cmTs1p1cEnqueueMsg( cmTsQueueH_t h, const void* dataPtr, unsigned byteCnt )
{ return cmTs1p1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
unsigned cmTs1p1cAllocByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
return p->bn;
}
unsigned cmTs1p1cAvailByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
return p->bn - p->a[ p->ibi ].ii;
}
cmThRC_t cmTs1p1cDequeueMsg( cmTs1p1cH_t h, void* dataPtr, unsigned byteCnt )
{
cmThRC_t rc = kOkThRC;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
cmThBuf_t* ob = p->a + (p->ibi == 0 ? 1 : 0);
// empty buffers always are set to: oi==bn && ii==0
if( ob->oi >= ob->ii )
return kBufEmptyThRC;
// get the size of the msg
unsigned mn = *(unsigned*)(ob->buf + ob->oi);
// increment the current output location to the msg body
ob->oi += sizeof(unsigned);
// copy or send the msg
if( dataPtr != NULL )
{
if( byteCnt < mn )
return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,mn);
memcpy(dataPtr, ob->buf + ob->oi, mn);
}
else
{
p->cbFunc(p->cbArg, mn, ob->buf + ob->oi );
}
ob->oi += mn;
// if we are reading correctly ob->oi should land
// exactly on ob->ii when the buffer is empty
assert( ob->oi <= ob->ii );
// if the buffer is empty
if( ob->oi == ob->ii )
{
ob->oi = p->bn; // mark the buffer as empty
ob->ii = 0; //
}
return rc;
}
unsigned cmTs1p1cDequeueMsgByteCount( cmTsQueueH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
cmThBuf_t* ob = p->a + (p->ibi == 0 ? 1 : 0);
// empty buffers always are set to: oi==bn && ii==0
if( ob->oi >= ob->ii )
return 0;
// get the size of the msg
return *(unsigned*)(ob->buf + ob->oi);
}
bool cmTs1p1cMsgWaiting( cmTsQueueH_t h )
{ return cmTs1p1cDequeueMsgByteCount(h) > 0; }
bool cmTs1p1cIsValid( cmTs1p1cH_t h )
{ return h.h != NULL; }
#endif
//--------------------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------
typedef struct
{
volatile unsigned ii;
cmErr_t err;
char* buf;
unsigned bn;
cmTsQueueCb_t cbFunc;
void* cbArg;
volatile unsigned oi;
} cmTs1p1c_t;
cmTs1p1c_t* _cmTs1p1cHandleToPtr( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = (cmTs1p1c_t*)h.h;
assert( p != NULL );
return p;
}
cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
{
cmThRC_t rc;
if((rc = cmTs1p1cDestroy(hPtr)) != kOkThRC )
return rc;
cmTs1p1c_t* p = cmMemAllocZ(cmTs1p1c_t,1);
cmErrSetup(&p->err,rpt,"1p1c Queue");
p->buf = cmMemAllocZ(char,bufByteCnt+sizeof(unsigned));
p->ii = 0;
p->oi = 0;
p->bn = bufByteCnt;
p->cbFunc = cbFunc;
p->cbArg = cbArg;
hPtr->h = p;
return rc;
}
cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hp )
{
cmThRC_t rc = kOkThRC;
if( hp == NULL || cmTs1p1cIsValid(*hp)==false )
return kOkThRC;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(*hp);
cmMemFree(p->buf);
cmMemFree(p);
hp->h = NULL;
return rc;
}
cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{
cmThRC_t rc = kOkThRC;
unsigned mn = 0;
unsigned i;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
// get the total count of bytes for this msg
for(i=0; i<arrayCnt; ++i)
mn += msgByteCntArray[i];
int dn = mn + sizeof(unsigned);
int oi = p->oi;
int bi = p->ii; // 'bi' is the idx of the leftmost cell which can be written
int en = p->bn; // 'en' is the idx of the cell just to the right of the rightmost cell that can be written
// note: If 'oi' marks the rightmost location then 'en' must be set
// one cell to the left of 'oi', because 'ii' can never be allowed to
// advance onto 'oi' - because 'oi'=='ii' marks an empty (NOT a full)
// queue.
//
// If 'bn' marks the rightmost location then 'ii' can advance onto 'bn'
// beause the true queue length is bn+1.
// if we need to wrap
if( en-bi < dn && oi<=bi )
{
bi = 0;
en = oi - 1; // note if oi==0 then en is negative - see note above re: oi==ii
assert( p->ii>=0 && p->ii <= p->bn );
*(unsigned*)(p->buf + p->ii) = cmInvalidIdx; // mark the wrap location
}
// if oi is between ii and bn
if( oi > bi )
en = oi - 1; // never allow ii to advance onto oi - see note above
// if the msg won't fit
if( en - bi < dn )
return cmErrMsg(&p->err,kBufFullThRC,"%i consecutive bytes is not available in the queue.",dn);
// set the msg byte count - the msg byte cnt precedes the msg body
char* dp = p->buf + bi;
*(unsigned*)dp = dn - sizeof(unsigned);
dp += sizeof(unsigned);
// copy the msg into the buffer
for(i=0,dn=0; i<arrayCnt; ++i)
{
memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
dp += msgByteCntArray[i];
dn += msgByteCntArray[i];
}
// incrementing p->ii must occur last - the unnecessary accumulation
// of dn in the above loop is intended to prevent this line from
// begin moved before the copy loop.
p->ii = bi + dn + sizeof(unsigned);
assert( p->ii >= 0 && p->ii <= p->bn);
return rc;
}
cmThRC_t cmTs1p1cEnqueueMsg( cmTs1p1cH_t h, const void* dataPtr, unsigned byteCnt )
{ return cmTs1p1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
unsigned cmTs1p1cAllocByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
return p->bn;
}
unsigned cmTs1p1cAvailByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
unsigned oi = p->oi;
unsigned ii = p->ii;
return oi < ii ? p->bn - ii + oi : oi - ii;
}
unsigned _cmTs1p1cDequeueMsgByteCount( cmTs1p1c_t* p )
{
// if the buffer is empty
if( p->ii == p->oi )
return 0;
// get the length of the next msg
unsigned mn = *(unsigned*)(p->buf + p->oi);
// if the msg length is cmInvalidIdx ...
if( mn == cmInvalidIdx )
{
p->oi = 0; // ... wrap to buf begin and try again
return _cmTs1p1cDequeueMsgByteCount(p);
}
return mn;
}
cmThRC_t cmTs1p1cDequeueMsg( cmTs1p1cH_t h, void* dataPtr, unsigned byteCnt )
{
cmThRC_t rc = kOkThRC;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
unsigned mn;
if((mn = _cmTs1p1cDequeueMsgByteCount(p)) == 0 )
return kBufEmptyThRC;
void* mp = p->buf + p->oi + sizeof(unsigned);
if( dataPtr != NULL )
{
if( byteCnt < mn )
return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,mn);
memcpy(dataPtr,mp,mn);
}
else
{
p->cbFunc(p->cbArg,mn,mp);
}
p->oi += mn + sizeof(unsigned);
return rc;
}
unsigned cmTs1p1cDequeueMsgByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
return _cmTs1p1cDequeueMsgByteCount(p);
}
bool cmTs1p1cMsgWaiting( cmTs1p1cH_t h )
{ return cmTs1p1cDequeueMsgByteCount(h) > 0; }
bool cmTs1p1cIsValid( cmTs1p1cH_t h )
{ return h.h != NULL; }
//============================================================================================================================
bool cmThIntCAS( int* addr, int old, int new )
{ return __sync_bool_compare_and_swap(addr,old,new); }
bool cmThUIntCAS( unsigned* addr, unsigned old, unsigned new )
{ return __sync_bool_compare_and_swap(addr,old,new); }
bool cmThFloatCAS( float* addr, float old, float new )
{ return __sync_bool_compare_and_swap((unsigned*)addr, *(unsigned*)(&old),*(unsigned*)(&new)); }
bool cmThPtrCAS( void* addr, void* old, void* neww )
{
#ifdef OS_64
return __sync_bool_compare_and_swap((long long*)addr, (long long)old, (long long)neww);
#else
return __sync_bool_compare_and_swap((int*)addr,(int)old,(int)neww);
#endif
}
void cmThIntIncr( int* addr, int incr )
{
// ... could also use __sync_add_and_fetch() ...
__sync_fetch_and_add(addr,incr);
}
void cmThUIntIncr( unsigned* addr, unsigned incr )
{
__sync_fetch_and_add(addr,incr);
}
void cmThFloatIncr(float* addr, float incr )
{
float old,new;
do
{
old = *addr;
new = old + incr;
}while( cmThFloatCAS(addr,old,new)==0 );
}
void cmThIntDecr( int* addr, int decr )
{
__sync_fetch_and_sub(addr,decr);
}
void cmThUIntDecr( unsigned* addr, unsigned decr )
{
__sync_fetch_and_sub(addr,decr);
}
void cmThFloatDecr(float* addr, float decr )
{
float old,new;
do
{
old = *addr;
new = old - decr;
}while( cmThFloatCAS(addr,old,new)==0 );
}
//============================================================================================================================
//
//
typedef pthread_t cmThreadId_t;
typedef struct
{
cmThreadId_t id; // id of this thread as returned by pthread_self()
char* buf; // buf[bn]
int ii; // input index
int oi; // output index (oi==ii == empty buffer)
} cmTsBuf_t;
// msg header - which is actually written AFTER the msg it is associated with
typedef struct cmTsHdr_str
{
int mn; // length of the msg
int ai; // buffer index
struct cmTsHdr_str* link; // pointer to next msg
} cmTsHdr_t;
typedef struct
{
cmErr_t err;
int bn; // bytes per thread buffer
cmTsBuf_t* a; // a[an] buffer array
unsigned an; // length of a[] - one buffer per thread
cmTsQueueCb_t cbFunc;
void* cbArg;
cmTsHdr_t* ilp; // prev msg hdr record
cmTsHdr_t* olp; // prev msg hdr record (wait for olp->link to be set to go to next record)
} cmTsMp1c_t;
cmTsMp1cH_t cmTsMp1cNullHandle = cmSTATIC_NULL_HANDLE;
void _cmTsMp1cPrint( cmTsMp1c_t* p )
{
unsigned i;
for(i=0; i<p->an; ++i)
printf("%2i ii:%3i oi:%3i\n",i,p->a[i].ii,p->a[i].oi);
}
cmTsMp1c_t* _cmTsMp1cHandleToPtr( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = (cmTsMp1c_t*)h.h;
assert(p != NULL);
return p;
}
unsigned _cmTsMp1cBufIndex( cmTsMp1c_t* p, cmThreadId_t id )
{
unsigned i;
for(i=0; i<p->an; ++i)
if( p->a[i].id == id )
return i;
p->an = i+1;
p->a = cmMemResizePZ(cmTsBuf_t,p->a,p->an);
p->a[i].buf = cmMemAllocZ(char,p->bn);
p->a[i].id = id;
return i;
}
cmThRC_t cmTsMp1cDestroy( cmTsMp1cH_t* hp )
{
if( hp == NULL || cmTsMp1cIsValid(*hp) == false )
return kOkThRC;
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(*hp);
unsigned i;
for(i=0; i<p->an; ++i)
cmMemFree(p->a[i].buf);
cmMemPtrFree(&p->a);
cmMemFree(p);
hp->h = NULL;
return kOkThRC;
}
cmThRC_t cmTsMp1cCreate( cmTsMp1cH_t* hp, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
{
cmThRC_t rc;
if((rc = cmTsMp1cDestroy(hp)) != kOkThRC )
return rc;
cmTsMp1c_t* p = cmMemAllocZ(cmTsMp1c_t,1);
cmErrSetup(&p->err,rpt,"TsMp1c Queue");
p->a = NULL;
p->an = 0;
p->bn = bufByteCnt;
p->cbFunc = cbFunc;
p->cbArg = cbArg;
p->ilp = NULL;
p->olp = NULL;
hp->h = p;
return rc;
}
void cmTsMp1cSetCbFunc( cmTsMp1cH_t h, cmTsQueueCb_t cbFunc, void* cbArg )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
p->cbFunc = cbFunc;
p->cbArg = cbArg;
}
cmTsQueueCb_t cmTsMp1cCbFunc( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
return p->cbFunc;
}
void* cmTsMp1cCbArg( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
return p->cbArg;
}
//#define CAS(addr,old,new) __sync_bool_compare_and_swap(addr,old,new)
//#define CAS(addr,old,neww) cmThPtrCAS(addr,old,neww)
cmThRC_t cmTsMp1cEnqueueSegMsg( cmTsMp1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{
cmThRC_t rc = kOkThRC;
unsigned mn = 0;
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
unsigned ai = _cmTsMp1cBufIndex( p, pthread_self() );
cmTsBuf_t* b = p->a + ai;
int i,bi,ei;
cmTsHdr_t hdr;
// Use a stored oi for the duration of this function.
// b->oi may be changed by the dequeue thread but storing it here
// at least prevents it from changing during the course of the this function.
// Note: b->oi is only used to check for buffer full. Even if it changes
// it would only mean that more bytes were available than calculated based
// on the stored value. A low estimate of the actual bytes available is
// never unsafe.
volatile int oi = b->oi;
// get the total count of bytes for this msg
for(i=0; i<arrayCnt; ++i)
mn += msgByteCntArray[i];
// dn = count of msg bytes + count of header bytes
int dn = mn + sizeof(hdr);
// if oi is ahead of ii in the queue then we must write
// in the area between ii and oi
if( oi > b->ii )
{
ei = oi-1; // (never allow ii to equal oi (that's the empty condition))
bi = b->ii;
}
else // otherwise oi is same or before ii in the queue and we have the option to wrap
{
// if the new msg will not fit at the end of the queue ....
if( b->ii + dn > p->bn )
{
bi = 0; // ... then wrap to the beginning
ei = oi-1; // (never allow ii to equal oi (that's the empty condition))
}
else
{
ei = p->bn; // otherwise write at the current location
bi = b->ii;
}
}
if( bi + dn > ei )
return cmErrMsg(&p->err,kBufFullThRC,"%i consecutive bytes is not available in the queue.",dn);
char* dp = b->buf + bi;
// write the msg
for(i=0; i<arrayCnt; ++i)
{
memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
dp += msgByteCntArray[i];
}
// setup the msg header
hdr.ai = ai;
hdr.mn = mn;
hdr.link = NULL;
// write the msg header (following the msg body in memory)
cmTsHdr_t* hp = (cmTsHdr_t*)dp;
memcpy(hp,&hdr,sizeof(hdr));
// increment the buffers input index
b->ii = bi + dn;
// update the link list head to point to this msg hdr
cmTsHdr_t* old_hp, *new_hp;
do
{
old_hp = p->ilp;
new_hp = hp;
}while(!cmThPtrCAS(&p->ilp,old_hp,new_hp));
// link the prev recd to this recd
if( old_hp != NULL )
old_hp->link = hp;
// if this is the first record written by this queue then prime the output list
do
{
old_hp = p->olp;
new_hp = hp;
if( old_hp != NULL )
break;
}while(!cmThPtrCAS(&p->olp,old_hp,new_hp));
return rc;
}
cmThRC_t cmTsMp1cEnqueueMsg( cmTsMp1cH_t h, const void* dataPtr, unsigned byteCnt )
{ return cmTsMp1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
unsigned cmTsMp1cAllocByteCount( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
return p->bn;
}
unsigned cmTsMp1cAvailByteCount( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
unsigned ai = _cmTsMp1cBufIndex(p,pthread_self());
const cmTsBuf_t* b = p->a + ai;
if( b->oi > b->ii )
return b->oi - b->ii - 1;
return (p->bn - b->ii) + b->oi - 1;
}
unsigned _cmTsMp1cNextMsgByteCnt( cmTsMp1c_t* p )
{
if( p->olp == NULL )
return 0;
// if the current msg has not yet been read
if( p->olp->mn != 0 )
return p->olp->mn;
// if the current msg has been read but a new next msg has been linked
if( p->olp->mn == 0 && p->olp->link != NULL )
{
// advance the buffer output msg past the prev msg header
char* hp = (char*)(p->olp + 1);
p->a[p->olp->ai].oi = hp - p->a[p->olp->ai].buf;
// advance msg pointer to point to the new msg header
p->olp = p->olp->link;
// return the size of the new msg
return p->olp->mn;
}
return 0;
}
cmThRC_t cmTsMp1cDequeueMsg( cmTsMp1cH_t h, void* dataPtr, unsigned byteCnt )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
// if there are no messages waiting
if( _cmTsMp1cNextMsgByteCnt(p) == 0 )
return kBufEmptyThRC;
char* hp = (char*)p->olp;
char* dp = hp - p->olp->mn; // the msg body is before the msg hdr
if( dataPtr == NULL )
{
p->cbFunc(p->cbArg,p->olp->mn,dp);
}
else
{
if( p->olp->mn > byteCnt )
return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,p->olp->mn);
memcpy(dataPtr,dp,p->olp->mn);
}
// advance the buffers output index past the msg body
p->a[p->olp->ai].oi = hp - p->a[p->olp->ai].buf;
// mark the msg as read
p->olp->mn = 0;
return kOkThRC;
}
bool cmTsMp1cMsgWaiting( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
return _cmTsMp1cNextMsgByteCnt(p) != 0;
}
unsigned cmTsMp1cDequeueMsgByteCount( cmTsMp1cH_t h )
{
cmTsMp1c_t* p = _cmTsMp1cHandleToPtr(h);
return _cmTsMp1cNextMsgByteCnt(p);
}
bool cmTsMp1cIsValid( cmTsMp1cH_t h )
{ return h.h != NULL; }
//============================================================================================================================/
//
// cmTsQueueTest()
//
// param recd for use by _cmTsQueueCb0() and
// the msg record passed between the sender
// threads and the receiver thread
typedef struct
{
unsigned id;
cmTsQueueH_t qH;
int val;
} _cmTsQCbParam_t;
// Generate a random number and put it in a TS queue
bool _cmTsQueueCb0(void* param)
{
_cmTsQCbParam_t* p = (_cmTsQCbParam_t*)param;
p->val = rand(); // generate a random number
// send the msg
if( cmTsQueueEnqueueMsg( p->qH, p, sizeof(_cmTsQCbParam_t)) == kOkThRC )
printf("in:%i %i\n",p->id,p->val);
else
printf("in error %i\n",p->id);
cmSleepUs(100*1000);
return true;
}
// Monitor a TS queue for incoming messages from _cmTsQueueCb1()
bool _cmTsQueueCb1(void* param)
{
// the thread param is a ptr to the TS queue to monitor.
cmTsQueueH_t* qp = (cmTsQueueH_t*)param;
cmThRC_t rc;
_cmTsQCbParam_t msg;
// dequeue any waiting messages
if((rc = cmTsQueueDequeueMsg( *qp, &msg, sizeof(msg))) == kOkThRC )
printf("out:%i %i\n",msg.id,msg.val);
else
{
if( rc != kBufEmptyThRC )
printf("out error:%i\n", rc);
}
return true;
}
// Test the TS queue by starting sender threads (threads 0 & 1)
// and a receiver thread (thread 2) and sending messages
// from the sender to the receiver.
void cmTsQueueTest( cmRpt_t* rpt )
{
cmThreadH_t th0=cmThreadNullHandle,th1=cmThreadNullHandle,th2=cmThreadNullHandle;
cmTsQueueH_t q=cmTsQueueNullHandle;
_cmTsQCbParam_t param0, param1;
// create a TS Queue
if( cmTsQueueCreate(&q,100,NULL,NULL,rpt) != kOkThRC )
goto errLabel;
// create thread 0
param0.id = 0;
param0.qH = q;
if( cmThreadCreate(&th0,_cmTsQueueCb0,&param0,rpt) != kOkThRC )
goto errLabel;
// create thread 1
param1.id = 1;
param1.qH = q;
if( cmThreadCreate(&th1,_cmTsQueueCb0,&param1,rpt) != kOkThRC )
goto errLabel;
// create thread 2
if( cmThreadCreate(&th2,_cmTsQueueCb1,&q,rpt) != kOkThRC )
goto errLabel;
// start thread 0
if( cmThreadPause(th0,0) != kOkThRC )
goto errLabel;
// start thread 1
if( cmThreadPause(th1,0) != kOkThRC )
goto errLabel;
// start thread 2
if( cmThreadPause(th2,0) != kOkThRC )
goto errLabel;
printf("any key to quit.");
getchar();
errLabel:
if( cmThreadIsValid(th0) )
if( cmThreadDestroy(&th0) != kOkThRC )
printf("Error destroying thread 0\n");
if( cmThreadIsValid(th1) )
if( cmThreadDestroy(&th1) != kOkThRC )
printf("Error destroying thread 1\n");
if( cmThreadIsValid(th2) )
if( cmThreadDestroy(&th2) != kOkThRC )
printf("Error destroying thread 1\n");
if( cmTsQueueIsValid(q) )
if( cmTsQueueDestroy(&q) != kOkThRC )
printf("Error destroying queue\n");
}
//============================================================================================================================/
//
// cmTs1p1cTest()
//
// param recd for use by _cmTsQueueCb0() and
// the msg record passed between the sender
// threads and the receiver thread
typedef struct
{
unsigned id;
cmTs1p1cH_t qH;
int val;
} _cmTs1p1cCbParam_t;
cmTs1p1cH_t cmTs1p1cNullHandle = cmSTATIC_NULL_HANDLE;
// Generate a random number and put it in a TS queue
bool _cmTs1p1cCb0(void* param)
{
_cmTs1p1cCbParam_t* p = (_cmTs1p1cCbParam_t*)param;
p->val = rand(); // generate a random number
// send the msg
if( cmTs1p1cEnqueueMsg( p->qH, p, sizeof(_cmTs1p1cCbParam_t)) == kOkThRC )
printf("in:%i %i\n",p->id,p->val);
else
printf("in error %i\n",p->id);
++p->id;
cmSleepUs(100*1000);
return true;
}
// Monitor a TS queue for incoming messages from _cmTs1p1cCb1()
bool _cmTs1p1cCb1(void* param)
{
// the thread param is a ptr to the TS queue to monitor.
cmTs1p1cH_t* qp = (cmTs1p1cH_t*)param;
cmThRC_t rc;
_cmTs1p1cCbParam_t msg;
// dequeue any waiting messages
if((rc = cmTs1p1cDequeueMsg( *qp, &msg, sizeof(msg))) == kOkThRC )
printf("out:%i %i\n",msg.id,msg.val);
else
{
if( rc != kBufEmptyThRC )
printf("out error:%i\n", rc);
}
return true;
}
// Test the TS queue by starting sender threads (threads 0 & 1)
// and a receiver thread (thread 2) and sending messages
// from the sender to the receiver.
void cmTs1p1cTest( cmRpt_t* rpt )
{
cmThreadH_t th0=cmThreadNullHandle,th1=cmThreadNullHandle,th2=cmThreadNullHandle;
cmTs1p1cH_t q=cmTs1p1cNullHandle;
_cmTs1p1cCbParam_t param1;
// create a TS Queue
if( cmTs1p1cCreate(&q,28*2,NULL,NULL,rpt) != kOkThRC )
goto errLabel;
// create thread 1
param1.id = 0;
param1.qH = q;
if( cmThreadCreate(&th1,_cmTs1p1cCb0,&param1,rpt) != kOkThRC )
goto errLabel;
// create thread 2
if( cmThreadCreate(&th2,_cmTs1p1cCb1,&q,rpt) != kOkThRC )
goto errLabel;
// start thread 1
if( cmThreadPause(th1,0) != kOkThRC )
goto errLabel;
// start thread 2
if( cmThreadPause(th2,0) != kOkThRC )
goto errLabel;
printf("any key to quit.");
getchar();
errLabel:
if( cmThreadIsValid(th0) )
if( cmThreadDestroy(&th0) != kOkThRC )
printf("Error destroying thread 0\n");
if( cmThreadIsValid(th1) )
if( cmThreadDestroy(&th1) != kOkThRC )
printf("Error destroying thread 1\n");
if( cmThreadIsValid(th2) )
if( cmThreadDestroy(&th2) != kOkThRC )
printf("Error destroying thread 1\n");
if( cmTs1p1cIsValid(q) )
if( cmTs1p1cDestroy(&q) != kOkThRC )
printf("Error destroying queue\n");
}
//============================================================================================================================/
//
// cmTsMp1cTest()
//
// param recd for use by _cmTsQueueCb0() and
// the msg record passed between the sender
// threads and the receiver thread
typedef struct
{
unsigned id;
cmTsMp1cH_t qH;
int val;
} _cmTsMp1cCbParam_t;
unsigned _cmTsMp1cVal = 0;
// Incr the global value _cmTsMp1cVal and put it in a TS queue
bool _cmTsMp1cCb0(void* param)
{
_cmTsMp1cCbParam_t* p = (_cmTsMp1cCbParam_t*)param;
p->val = __sync_fetch_and_add(&_cmTsMp1cVal,1);
// send the msg
if( cmTsMp1cEnqueueMsg( p->qH, p, sizeof(_cmTsMp1cCbParam_t)) == kOkThRC )
printf("in:%i %i\n",p->id,p->val);
else
printf("in error %i\n",p->id);
cmSleepUs(100*1000);
return true;
}
// Monitor a TS queue for incoming messages from _cmTsMp1cCb1()
bool _cmTsMp1cCb1(void* param)
{
// the thread param is a ptr to the TS queue to monitor.
cmTsMp1cH_t* qp = (cmTsMp1cH_t*)param;
cmThRC_t rc;
_cmTsMp1cCbParam_t msg;
// dequeue any waiting messages
if((rc = cmTsMp1cDequeueMsg( *qp, &msg, sizeof(msg))) == kOkThRC )
printf("out - cons id:%i val:%i\n",msg.id,msg.val);
else
{
if( rc != kBufEmptyThRC )
printf("out error:%i\n", rc);
}
return true;
}
// Test the TS queue by starting sender threads (threads 0 & 1)
// and a receiver thread (thread 2) and sending messages
// from the sender to the receiver.
void cmTsMp1cTest( cmRpt_t* rpt )
{
cmThreadH_t th0=cmThreadNullHandle,th1=cmThreadNullHandle,th2=cmThreadNullHandle;
cmTsMp1cH_t q=cmTsMp1cNullHandle;
_cmTsMp1cCbParam_t param0, param1;
// create a TS Queue
if( cmTsMp1cCreate(&q,1000,NULL,NULL,rpt) != kOkThRC )
goto errLabel;
// create thread 0 - producer 0
param0.id = 0;
param0.qH = q;
if( cmThreadCreate(&th0,_cmTsMp1cCb0,&param0,rpt) != kOkThRC )
goto errLabel;
// create thread 1 - producer 1
param1.id = 1;
param1.qH = q;
if( cmThreadCreate(&th1,_cmTsMp1cCb0,&param1,rpt) != kOkThRC )
goto errLabel;
// create thread 2 - consumer 0
if( cmThreadCreate(&th2,_cmTsMp1cCb1,&q,rpt) != kOkThRC )
goto errLabel;
// start thread 0
if( cmThreadPause(th0,0) != kOkThRC )
goto errLabel;
// start thread 1
if( cmThreadPause(th1,0) != kOkThRC )
goto errLabel;
// start thread 2
if( cmThreadPause(th2,0) != kOkThRC )
goto errLabel;
printf("any key to quit.");
getchar();
errLabel:
if( cmThreadIsValid(th0) )
if( cmThreadDestroy(&th0) != kOkThRC )
printf("Error destroying thread 0\n");
if( cmThreadIsValid(th1) )
if( cmThreadDestroy(&th1) != kOkThRC )
printf("Error destroying thread 1\n");
if( cmThreadIsValid(th2) )
if( cmThreadDestroy(&th2) != kOkThRC )
printf("Error destroying thread 1\n");
if( cmTsMp1cIsValid(q) )
if( cmTsMp1cDestroy(&q) != kOkThRC )
printf("Error destroying queue\n");
}
void cmSleepUs( unsigned microseconds )
{ usleep(microseconds); }
void cmSleepMs( unsigned milliseconds )
{ cmSleepUs(milliseconds*1000); }