cwFlowNet.cpp,cwFlowProc.cpp,proc_dict.cfg : Implement 'poly' parallel execution.

This commit is contained in:
kevin 2024-09-16 10:43:02 -04:00
parent b0ecdefa39
commit 089e10e363
3 changed files with 63 additions and 26 deletions

View File

@ -2865,14 +2865,13 @@ namespace cw
}
}
cw::rc_t cw::flow::network_create( flow_t* p,
const object_t* networkCfg,
network_t& net,
variable_t* proxyVarL,
unsigned polyCnt )
cw::rc_t cw::flow::network_create( flow_t* p,
const object_t* networkCfg,
network_t& net,
variable_t* proxyVarL,
unsigned polyCnt )
{
rc_t rc = kOkRC;
if((rc = networkCfg->getv("procs",net.procsCfg)) != kOkRC )
{
rc = cwLogError(rc,"Failed on parsing required network cfg. elements.");
@ -2890,10 +2889,12 @@ cw::rc_t cw::flow::network_create( flow_t* p,
net.proc_arrayN = 0;
net.poly_voiceA = mem::allocZ<poly_voice_t>(polyCnt);
// for each subnet
for(unsigned i=0; i<polyCnt; ++i)
{
assert( i<polyCnt && net.proc_arrayN < net.proc_arrayAllocN );
// track the first proc in each subnet
net.poly_voiceA[i].proc_idx = net.proc_arrayN;
net.poly_voiceA[i].net = &net;
@ -2962,7 +2963,7 @@ cw::rc_t cw::flow::exec_cycle( network_t& net, unsigned proc_idx, unsigned proc_
proc_cnt = net.proc_arrayN;
for(unsigned i=proc_idx; i<proc_cnt; ++i)
for(unsigned i=proc_idx; i<proc_idx+proc_cnt; ++i)
{
if( i >= net.proc_arrayN )
{

View File

@ -220,18 +220,38 @@ namespace cw
kCountPId,
};
typedef struct voice_str
{
unsigned voice_idx;
struct network_str* net;
} voice_t;
typedef struct
{
unsigned count;
network_t net;
bool parallel_fl;
thread_mach::handle_t threadMachH;
unsigned count; // count of duplicate subnets in 'net'
network_t net; // internal network containing 'count' duplicate sub-nets
bool parallel_fl; // true if the subnets should be executed in parallel
thread_tasks::handle_t threadTasksH; //
thread_tasks::task_t* taskA; // taskA[ count ]
voice_t* voiceA; // voiceA[ count ]
} inst_t;
bool _thread_func( void* arg )
rc_t _voice_thread_func( void* arg )
{
rc_t rc = kOkRC;
voice_t* v = (voice_t*)arg;
poly_voice_t* pv = v->net->poly_voiceA + v->voice_idx;
return true;
if((rc = exec_cycle(*v->net, pv->proc_idx, pv->proc_cnt)) != kOkRC )
{
rc = cwLogError(rc,"Parallel subnet exec failed on voice %i.",v->voice_idx);
goto errLabel;
}
errLabel:
return rc;
}
@ -247,8 +267,10 @@ namespace cw
if((rc = var_register_and_get( proc, kAnyChIdx,
kParallelFlPId, "parallel_fl", kBaseSfxId, inst->parallel_fl,
kCountPId, "count", kBaseSfxId, inst->count )) != kOkRC )
{
goto errLabel;
}
if( inst->count == 0 )
{
cwLogWarning("The 'poly' %s:%i was given a count of 0.",proc->label,proc->label_sfx_id);
@ -261,7 +283,8 @@ namespace cw
goto errLabel;
}
// create the network object - which will hold 'count' subnets - each a duplicate of the
// network described by 'networkCfg'.
if((rc = network_create(proc->ctx,networkCfg,inst->net,proxyVarL,inst->count)) != kOkRC )
{
rc = cwLogError(rc,"Creation failed on the internal network.");
@ -270,12 +293,25 @@ namespace cw
if( inst->parallel_fl )
{
if((rc = thread_mach::create( inst->threadMachH, _thread_func, proc->net->poly_voiceA, sizeof(poly_voice_t), inst->count )) != kOkRC )
// create a thread_tasks object
if((rc = thread_tasks::create( inst->threadTasksH, inst->count )) != kOkRC )
{
rc = cwLogError(rc,"Thread machine create failed.");
goto errLabel;
}
// the taskA[] array is needed to hold voice specific info. for the call to thread_tasks::run()
inst->taskA = mem::allocZ<thread_tasks::task_t>(inst->count);
inst->voiceA = mem::allocZ<voice_t>(inst->count);
for(unsigned i=0; i<inst->count; ++i)
{
inst->voiceA[i].voice_idx = i;
inst->voiceA[i].net = &inst->net;
inst->taskA[i].func = _voice_thread_func;
inst->taskA[i].arg = inst->voiceA + i;
}
}
@ -292,7 +328,9 @@ namespace cw
inst_t* p = (inst_t*)proc->userPtr;
network_destroy(p->net);
thread_mach::destroy(p->threadMachH);
thread_tasks::destroy(p->threadTasksH);
mem::release( p->taskA);
mem::release( p->voiceA);
mem::release( proc->userPtr );
return kOkRC;
@ -310,7 +348,10 @@ namespace cw
if( p->parallel_fl )
{
if((rc = thread_tasks::run(p->threadTasksH,p->taskA,p->count)) != kOkRC )
{
rc = cwLogError(rc,"poly internal network parallel exec failed.");
}
}
else
{

View File

@ -558,15 +558,10 @@
}
}
subnet: {
vars: {
}
}
poly: {
vars: {
count: { type:uint, flags:['init'], value:1, doc:"Count of network duplicates." },
parallel_fl: { type:bool, flags:['init'], value:false, doc:"True to run voices concurrently." }
count: { type:uint, flags:["init"], value:1, doc:"Count of network duplicates." },
parallel_fl: { type:bool, flags:["init"], value:false, doc:"True to run voices concurrently." }
}
}