cwFlowTypes.h/cpp, cwFlowNet.cpp : Initial implementation of proc_t.modVarMapA[].

This functionality however is not yet enabled.
This commit is contained in:
kevin 2025-03-19 16:34:02 -04:00
parent bdd458f902
commit 86afbe06e0
3 changed files with 77 additions and 4 deletions

View File

@ -1337,10 +1337,12 @@ namespace cw
if( max_vid != kInvalidId ) if( max_vid != kInvalidId )
{ {
// create the variable map array // create the variable map array
proc->varMapChN = max_chIdx + 1; proc->varMapChN = max_chIdx + 1;
proc->varMapIdN = max_vid + 1; proc->varMapIdN = max_vid + 1;
proc->varMapN = proc->varMapIdN * proc->varMapChN; proc->varMapN = proc->varMapIdN * proc->varMapChN;
proc->varMapA = mem::allocZ<variable_t*>( proc->varMapN ); proc->varMapA = mem::allocZ<variable_t*>( proc->varMapN );
proc->modVarMapN = proc->varMapN;
proc->modVarMapA = mem::allocZ<variable_t*>( proc->modVarMapN );
// assign each variable to a location in the map // assign each variable to a location in the map
for(variable_t* var=proc->varL; var!=nullptr; var=var->var_link) for(variable_t* var=proc->varL; var!=nullptr; var=var->var_link)

View File

@ -157,6 +157,67 @@ namespace cw
} }
// Incr the var->modN value and put the var pointer in var->proc->modVarMapA[]
// where it will be picked up by a later call to _mod-var_map_dispatch().
// This function runs in a multi-thread context.
rc_t _mod_var_map_update( variable_t* var )
{
// if the var is in already modVarMapA[] then there is nothing to do
// (use acquire to prevent rd/wr from moving before this op)
if( var->modN.load(std::memory_order_acquire) > 0 )
return kOkRC;
// reserve a slot in proc->modVarMapA[]
// (use acquire to prevent rd/wr from moving before this op)
if( var->proc->modVarMapFullCnt.fetch_add(1,std::memory_order_acquire) >= var->proc->modVarMapN )
return kBufTooSmallRC;
// Get the next empty slot in proc->modVarMapA[]
// (use acquire to prevent rd/wr from moving before this op)
unsigned idx = var->proc->modVarMapHeadIdx.fetch_add(1,std::memory_order_acquire) % var->proc->modVarMapN;
var->proc->modVarMapA[ idx ] = var;
// mark the var as in the list
var->modN.fetch_add(1,std::memory_order_release);
return kOkRC;
}
// Call proc->proc_desc->value() on every var in the proc->modVarMapA[].
// This function is called inside proc->proc_desc->exec().
rc_t _mod_var_map_dispatch( proc_t* proc, bool callback_fl )
{
// get the count of variables to be updated
unsigned n = proc->modVarMapFullCnt.load( std::memory_order_acquire );
if( n )
{
if( callback_fl )
{
for(unsigned i=0; i<n; ++i)
{
// get a pointer to the var that has been marked as modified
variable_t* var = proc->modVarMapA[ proc->modVarMapTailIdx ];
// callback to inform the proc that the var has changed
proc->class_desc->members->value( var->proc, var );
// mark this var as having been removed from the modVarMapA[]
var->modN.store(0,std::memory_order_relaxed );
// increment modVarMapA[]'s tail index
proc->modVarMapTailIdx = (proc->modVarMapTailIdx + 1) % proc->modVarMapN;
}
}
// decrement the count of elemnts in the modVarMapA[]
proc->modVarMapFullCnt.fetch_sub(n, std::memory_order_release );
}
return kOkRC;
}
// 'argTypeFlag' is the type (tflag) of 'val'. // 'argTypeFlag' is the type (tflag) of 'val'.
template< typename T > template< typename T >
@ -950,6 +1011,7 @@ void cw::flow::proc_destroy( proc_t* proc )
mem::release(proc->label); mem::release(proc->label);
mem::release(proc->varMapA); mem::release(proc->varMapA);
mem::release(proc->modVarMapA);
mem::release(proc); mem::release(proc);
} }

View File

@ -114,6 +114,9 @@ namespace cw
ui_var_t* ui_var; // this variables UI description ui_var_t* ui_var; // this variables UI description
std::atomic<struct variable_str*> ui_var_link; // UI update var link based on flow_t ui_var_head; std::atomic<struct variable_str*> ui_var_link; // UI update var link based on flow_t ui_var_head;
std::atomic<unsigned> modN; // count of modifactions made to this variable during this cycl
} variable_t; } variable_t;
@ -147,6 +150,12 @@ namespace cw
unsigned varMapN; // varMapN = varMapIdN * varMapChN unsigned varMapN; // varMapN = varMapIdN * varMapChN
variable_t** varMapA; // varMapA[ varMapN ] = allows fast lookup from ('vid','chIdx) to variable variable_t** varMapA; // varMapA[ varMapN ] = allows fast lookup from ('vid','chIdx) to variable
variable_t** modVarMapA; // modVarMapA[ modVarMapN ]
unsigned modVarMapN; // modVarMapN == varMapN
unsigned modVarMapTailIdx; // index of next full slot in varMapA[]
std::atomic<unsigned> modVarMapFullCnt; // count of elements in modVarMapA[]
std::atomic<unsigned> modVarMapHeadIdx; // index of next empty slot in varMapA[]
// For 'poly' proc's 'internal_net' is a list linked by network_t.poly_link. // For 'poly' proc's 'internal_net' is a list linked by network_t.poly_link.
struct network_str* internal_net; struct network_str* internal_net;
unsigned internal_net_cnt; // count of hetergenous networks contained in the internal_net linked list. unsigned internal_net_cnt; // count of hetergenous networks contained in the internal_net linked list.