cmThread.h/c: Added cmTs1p1cSetCallback().

This commit is contained in:
kpl 2013-10-20 20:44:08 -07:00
parent 53948d977d
commit c1289070d2
2 changed files with 11 additions and 226 deletions

View File

@ -825,233 +825,7 @@ bool cmTsQueueIsValid( cmTsQueueH_t h )
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
#ifdef NOT_DEF
enum { kThBufCnt=2 };
typedef struct
{
char* buf;
volatile unsigned ii;
volatile unsigned oi;
} cmThBuf_t;
typedef struct
{
cmErr_t err;
cmThBuf_t a[kThBufCnt];
volatile unsigned ibi;
unsigned bn;
cmTsQueueCb_t cbFunc;
void* cbArg;
} cmTs1p1c_t;
cmTs1p1c_t* _cmTs1p1cHandleToPtr( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = (cmTs1p1c_t*)h.h;
assert( p != NULL );
return p;
}
cmThRC_t _cmTs1p1cDestroy( cmTs1p1c_t* p )
{
unsigned i;
for(i=0; i<kThBufCnt; ++i)
cmMemFree(p->a[i].buf);
cmMemFree(p);
return kOkThRC;
}
cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt )
{
cmThRC_t rc;
if((rc = cmTs1p1cDestroy(hPtr)) != kOkThRC )
return rc;
unsigned i;
cmTs1p1c_t* p = cmMemAllocZ(cmTs1p1c_t,1);
cmErrSetup(&p->err,rpt,"TS 1p1c Queue");
for(i=0; i<kThBufCnt; ++i)
{
p->a[i].buf = cmMemAllocZ(char,bufByteCnt);
p->a[i].ii = 0;
p->a[i].oi = bufByteCnt;
}
p->ibi = 0;
p->bn = bufByteCnt;
p->cbFunc = cbFunc;
p->cbArg = cbArg;
hPtr->h = p;
return rc;
}
cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hp )
{
cmThRC_t rc = kOkThRC;
if( hp == NULL || cmTs1p1cIsValid(*hp)==false )
return kOkThRC;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(*hp);
if(( rc = _cmTs1p1cDestroy(p)) != kOkThRC )
return rc;
hp->h = NULL;
return rc;
}
cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{
cmThRC_t rc = kOkThRC;
unsigned mn = 0;
unsigned i;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
cmThBuf_t* ib = p->a + p->ibi;
// get the total count of bytes for this msg
for(i=0; i<arrayCnt; ++i)
mn += msgByteCntArray[i];
unsigned dn = mn + sizeof(unsigned);
// if the message is too big for even an empty buffer
if( dn > p->bn )
return cmErrMsg(&p->err,kBufFullThRC,"A msg containing %i bytes will never be able to fit in a queue with an empty size of %i bytes.",dn,p->bn);
// if the msg won't fit in the current input buffer then try swapping buffers.
if( ib->ii + dn > p->bn )
{
// get the current output buffer
cmThBuf_t* ob = p->a + (p->ibi==0 ? 1 : 0);
// Empty buffers will be set such that: oi==bn and ii==0.
//
// Note that setting ii to 0 in an output buffer is the last operation
// performed on an empty output buffer. ii==0 is therefore the
// signal that an output buffer can be reused for input.
// if the output buffer is not empty - then an overflow occurred
if( ob->ii != 0 )
return cmErrMsg(&p->err,kBufFullThRC,"The msq queue cannot accept a %i byte msg into %i bytes.",dn, p->bn - ib->ii);
// setup the initial output location of the new output buffer
ib->oi = 0;
// swap buffers
p->ibi = (p->ibi + 1) % kThBufCnt;
// get the new input buffer
ib = ob;
}
// get a pointer to the base of the write location
char* dp = ib->buf + ib->ii;
// write the length of the message
*(unsigned*)dp = mn;
dp += sizeof(unsigned);
// write the body of the message
for(i=0; i<arrayCnt; ++i)
{
memcpy(dp,msgPtrArray[i],msgByteCntArray[i]);
dp += msgByteCntArray[i];
}
// this MUST be executed last - we'll use 'dp' in the calculation
// (even though ib->ii += dn would be more straight forward way
// to accomplish the same thing) to prevent the optimizer from
// moving the assignment prior to the for loop.
ib->ii += dp - (ib->buf + ib->ii);
return rc;
}
cmThRC_t cmTs1p1cEnqueueMsg( cmTsQueueH_t h, const void* dataPtr, unsigned byteCnt )
{ return cmTs1p1cEnqueueSegMsg(h,&dataPtr,&byteCnt,1); }
unsigned cmTs1p1cAllocByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
return p->bn;
}
unsigned cmTs1p1cAvailByteCount( cmTs1p1cH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
return p->bn - p->a[ p->ibi ].ii;
}
cmThRC_t cmTs1p1cDequeueMsg( cmTs1p1cH_t h, void* dataPtr, unsigned byteCnt )
{
cmThRC_t rc = kOkThRC;
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
cmThBuf_t* ob = p->a + (p->ibi == 0 ? 1 : 0);
// empty buffers always are set to: oi==bn && ii==0
if( ob->oi >= ob->ii )
return kBufEmptyThRC;
// get the size of the msg
unsigned mn = *(unsigned*)(ob->buf + ob->oi);
// increment the current output location to the msg body
ob->oi += sizeof(unsigned);
// copy or send the msg
if( dataPtr != NULL )
{
if( byteCnt < mn )
return cmErrMsg(&p->err,kBufTooSmallThRC,"The return buffer constains too few bytes (%i) to contain %i bytes.",byteCnt,mn);
memcpy(dataPtr, ob->buf + ob->oi, mn);
}
else
{
p->cbFunc(p->cbArg, mn, ob->buf + ob->oi );
}
ob->oi += mn;
// if we are reading correctly ob->oi should land
// exactly on ob->ii when the buffer is empty
assert( ob->oi <= ob->ii );
// if the buffer is empty
if( ob->oi == ob->ii )
{
ob->oi = p->bn; // mark the buffer as empty
ob->ii = 0; //
}
return rc;
}
unsigned cmTs1p1cDequeueMsgByteCount( cmTsQueueH_t h )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
cmThBuf_t* ob = p->a + (p->ibi == 0 ? 1 : 0);
// empty buffers always are set to: oi==bn && ii==0
if( ob->oi >= ob->ii )
return 0;
// get the size of the msg
return *(unsigned*)(ob->buf + ob->oi);
}
bool cmTs1p1cMsgWaiting( cmTsQueueH_t h )
{ return cmTs1p1cDequeueMsgByteCount(h) > 0; }
bool cmTs1p1cIsValid( cmTs1p1cH_t h )
{ return h.h != NULL; }
#endif
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
@ -1110,6 +884,14 @@ cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hp )
return rc; return rc;
} }
cmThRC_t cmTs1p1cSetCallback( cmTs1p1cH_t h, cmTsQueueCb_t cbFunc, void* cbArg )
{
cmTs1p1c_t* p = _cmTs1p1cHandleToPtr(h);
p->cbFunc = cbFunc;
p->cbArg = cbArg;
return kOkThRC;
}
cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt ) cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt )
{ {
cmThRC_t rc = kOkThRC; cmThRC_t rc = kOkThRC;

View File

@ -200,8 +200,11 @@ extern "C" {
extern cmTs1p1cH_t cmTs1p1cNullHandle; extern cmTs1p1cH_t cmTs1p1cNullHandle;
cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt ); cmThRC_t cmTs1p1cCreate( cmTs1p1cH_t* hPtr, unsigned bufByteCnt, cmTsQueueCb_t cbFunc, void* cbArg, cmRpt_t* rpt );
cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hPtr ); cmThRC_t cmTs1p1cDestroy( cmTs1p1cH_t* hPtr );
cmThRC_t cmTs1p1cSetCallback( cmTs1p1cH_t h, cmTsQueueCb_t cbFunc, void* cbArg );
cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt ); cmThRC_t cmTs1p1cEnqueueSegMsg( cmTs1p1cH_t h, const void* msgPtrArray[], unsigned msgByteCntArray[], unsigned arrayCnt );
cmThRC_t cmTs1p1cEnqueueMsg( cmTs1p1cH_t h, const void* dataPtr, unsigned byteCnt ); cmThRC_t cmTs1p1cEnqueueMsg( cmTs1p1cH_t h, const void* dataPtr, unsigned byteCnt );