From 089e10e3633ce702bc32bf5d20ee946f002e69b0 Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 16 Sep 2024 10:43:02 -0400 Subject: [PATCH] cwFlowNet.cpp,cwFlowProc.cpp,proc_dict.cfg : Implement 'poly' parallel execution. --- cwFlowNet.cpp | 17 +++++++------ cwFlowProc.cpp | 63 ++++++++++++++++++++++++++++++++++++++-------- flow/proc_dict.cfg | 9 ++----- 3 files changed, 63 insertions(+), 26 deletions(-) diff --git a/cwFlowNet.cpp b/cwFlowNet.cpp index cac5edf..cd8fbe3 100644 --- a/cwFlowNet.cpp +++ b/cwFlowNet.cpp @@ -2865,14 +2865,13 @@ namespace cw } } -cw::rc_t cw::flow::network_create( flow_t* p, - const object_t* networkCfg, - network_t& net, - variable_t* proxyVarL, - unsigned polyCnt ) +cw::rc_t cw::flow::network_create( flow_t* p, + const object_t* networkCfg, + network_t& net, + variable_t* proxyVarL, + unsigned polyCnt ) { rc_t rc = kOkRC; - if((rc = networkCfg->getv("procs",net.procsCfg)) != kOkRC ) { rc = cwLogError(rc,"Failed on parsing required network cfg. elements."); @@ -2890,10 +2889,12 @@ cw::rc_t cw::flow::network_create( flow_t* p, net.proc_arrayN = 0; net.poly_voiceA = mem::allocZ(polyCnt); + // for each subnet for(unsigned i=0; i= net.proc_arrayN ) { diff --git a/cwFlowProc.cpp b/cwFlowProc.cpp index c937d68..a74269f 100644 --- a/cwFlowProc.cpp +++ b/cwFlowProc.cpp @@ -220,18 +220,38 @@ namespace cw kCountPId, }; + typedef struct voice_str + { + unsigned voice_idx; + struct network_str* net; + } voice_t; + typedef struct { - unsigned count; - network_t net; - bool parallel_fl; - thread_mach::handle_t threadMachH; + unsigned count; // count of duplicate subnets in 'net' + network_t net; // internal network containing 'count' duplicate sub-nets + bool parallel_fl; // true if the subnets should be executed in parallel + thread_tasks::handle_t threadTasksH; // + thread_tasks::task_t* taskA; // taskA[ count ] + voice_t* voiceA; // voiceA[ count ] + } inst_t; - bool _thread_func( void* arg ) + rc_t _voice_thread_func( void* arg ) { + rc_t rc = kOkRC; + voice_t* v = (voice_t*)arg; + + poly_voice_t* pv = v->net->poly_voiceA + v->voice_idx; - return true; + if((rc = exec_cycle(*v->net, pv->proc_idx, pv->proc_cnt)) != kOkRC ) + { + rc = cwLogError(rc,"Parallel subnet exec failed on voice %i.",v->voice_idx); + goto errLabel; + } + + errLabel: + return rc; } @@ -247,8 +267,10 @@ namespace cw if((rc = var_register_and_get( proc, kAnyChIdx, kParallelFlPId, "parallel_fl", kBaseSfxId, inst->parallel_fl, kCountPId, "count", kBaseSfxId, inst->count )) != kOkRC ) + { goto errLabel; - + } + if( inst->count == 0 ) { cwLogWarning("The 'poly' %s:%i was given a count of 0.",proc->label,proc->label_sfx_id); @@ -261,7 +283,8 @@ namespace cw goto errLabel; } - + // create the network object - which will hold 'count' subnets - each a duplicate of the + // network described by 'networkCfg'. if((rc = network_create(proc->ctx,networkCfg,inst->net,proxyVarL,inst->count)) != kOkRC ) { rc = cwLogError(rc,"Creation failed on the internal network."); @@ -270,12 +293,25 @@ namespace cw if( inst->parallel_fl ) { - if((rc = thread_mach::create( inst->threadMachH, _thread_func, proc->net->poly_voiceA, sizeof(poly_voice_t), inst->count )) != kOkRC ) + // create a thread_tasks object + if((rc = thread_tasks::create( inst->threadTasksH, inst->count )) != kOkRC ) { rc = cwLogError(rc,"Thread machine create failed."); goto errLabel; } + // the taskA[] array is needed to hold voice specific info. for the call to thread_tasks::run() + inst->taskA = mem::allocZ(inst->count); + inst->voiceA = mem::allocZ(inst->count); + + for(unsigned i=0; icount; ++i) + { + inst->voiceA[i].voice_idx = i; + inst->voiceA[i].net = &inst->net; + + inst->taskA[i].func = _voice_thread_func; + inst->taskA[i].arg = inst->voiceA + i; + } } @@ -292,7 +328,9 @@ namespace cw inst_t* p = (inst_t*)proc->userPtr; network_destroy(p->net); - thread_mach::destroy(p->threadMachH); + thread_tasks::destroy(p->threadTasksH); + mem::release( p->taskA); + mem::release( p->voiceA); mem::release( proc->userPtr ); return kOkRC; @@ -310,7 +348,10 @@ namespace cw if( p->parallel_fl ) { - + if((rc = thread_tasks::run(p->threadTasksH,p->taskA,p->count)) != kOkRC ) + { + rc = cwLogError(rc,"poly internal network parallel exec failed."); + } } else { diff --git a/flow/proc_dict.cfg b/flow/proc_dict.cfg index 8ea04d8..4f91f27 100644 --- a/flow/proc_dict.cfg +++ b/flow/proc_dict.cfg @@ -558,15 +558,10 @@ } } - subnet: { - vars: { - } - } - poly: { vars: { - count: { type:uint, flags:['init'], value:1, doc:"Count of network duplicates." }, - parallel_fl: { type:bool, flags:['init'], value:false, doc:"True to run voices concurrently." } + count: { type:uint, flags:["init"], value:1, doc:"Count of network duplicates." }, + parallel_fl: { type:bool, flags:["init"], value:false, doc:"True to run voices concurrently." } } }