cwFlowType.h/cpp : Implement variable change notification.

- Added modVarMapA[] and related variables.
- Added var_schedule_notification() and proc_notify().
- Replaced class_members_str.value with .notify.
This commit is contained in:
kevin 2025-03-22 17:38:30 -04:00
parent 0a9daab833
commit 1cb0db11b2
2 changed files with 98 additions and 67 deletions

View File

@ -36,6 +36,7 @@ namespace cw
{ kUiCreateVarDescFl, "no_ui" }, // even if the proc ui is enabled, don't show this var
{ kUiDisableVarDescFl,"ui_disable" },
{ kUiHideVarDescFl, "ui_hide" },
{ kNotifyVarDescFl, "notify" },
{ kInvalidVarDescFl, "<invalid>" }
};
@ -149,7 +150,7 @@ namespace cw
con_var->label,con_var->label_sfx_id );
// Call the value() function on the connected variable
if((rc = var_call_custom_value_func(con_var)) != kOkRC )
if((rc = var_schedule_notification(con_var)) != kOkRC )
break;
}
@ -162,7 +163,10 @@ namespace cw
// This function runs in a multi-thread context.
rc_t _mod_var_map_update( variable_t* var )
{
// if the var is in already modVarMapA[] then there is nothing to do
rc_t rc = kOkRC;
if( cwIsFlag(var->varDesc->flags,kNotifyVarDescFl ) )
{
// if the var is already modVarMapA[] then there is nothing to do
// (use acquire to prevent rd/wr from moving before this op)
if( var->modN.load(std::memory_order_acquire) > 0 )
return kOkRC;
@ -170,8 +174,10 @@ namespace cw
// reserve a slot in proc->modVarMapA[]
// (use acquire to prevent rd/wr from moving before this op)
if( var->proc->modVarMapFullCnt.fetch_add(1,std::memory_order_acquire) >= var->proc->modVarMapN )
return kBufTooSmallRC;
{
rc = cwLogError(kBufTooSmallRC,"The mod var map overflowed on '%s:%i-%s:%i'",cwStringNullGuard(var->proc->label),var->proc->label_sfx_id,cwStringNullGuard(var->label),var->label_sfx_id);
goto errLabel;
}
// Get the next empty slot in proc->modVarMapA[]
// (use acquire to prevent rd/wr from moving before this op)
@ -181,43 +187,13 @@ namespace cw
// mark the var as in the list
var->modN.fetch_add(1,std::memory_order_release);
return kOkRC;
}
errLabel:
return rc;
}
// Call proc->proc_desc->value() on every var in the proc->modVarMapA[].
// This function is called inside proc->proc_desc->exec().
rc_t _mod_var_map_dispatch( proc_t* proc, bool callback_fl )
{
// get the count of variables to be updated
unsigned n = proc->modVarMapFullCnt.load( std::memory_order_acquire );
if( n )
{
if( callback_fl )
{
for(unsigned i=0; i<n; ++i)
{
// get a pointer to the var that has been marked as modified
variable_t* var = proc->modVarMapA[ proc->modVarMapTailIdx ];
// callback to inform the proc that the var has changed
proc->class_desc->members->value( var->proc, var );
// mark this var as having been removed from the modVarMapA[]
var->modN.store(0,std::memory_order_relaxed );
// increment modVarMapA[]'s tail index
proc->modVarMapTailIdx = (proc->modVarMapTailIdx + 1) % proc->modVarMapN;
}
}
// decrement the count of elemnts in the modVarMapA[]
proc->modVarMapFullCnt.fetch_sub(n, std::memory_order_release );
}
return kOkRC;
}
// 'argTypeFlag' is the type (tflag) of 'val'.
template< typename T >
@ -288,7 +264,7 @@ namespace cw
// call because calls' to 'proc.value()' will see the proc in a incomplete state)
// Note 2: If this call returns an error then the value assignment is cancelled
// and the value does not change.
rc = var_call_custom_value_func( var );
rc = var_schedule_notification( var );
}
//printf("%p set: %s:%s 0x%x\n",var->value, var->proc->label,var->label,var->value->tflag);
@ -1179,6 +1155,52 @@ char* cw::flow::proc_expand_filename( const proc_t* proc, const char* fname )
return fn1;
}
// Call proc->proc_desc->value() on every var in the proc->modVarMapA[].
// This function is called inside proc->proc_desc->exec() and is therefore guaranteed to be executed without
// contention from other threads.
cw::rc_t cw::flow::proc_notify( proc_t* proc, unsigned flags )
{
rc_t rc = kOkRC;
unsigned modN = 0;
if( proc->modVarMapN == 0 )
{
if( cwIsNotFlag(flags,kQuietPnFl) )
rc =cwLogError(kInvalidStateRC,"Calling proc_notify() on the processor '%s:%i' is invalid because it does not have any variables marked for notification.",cwStringNullGuard(proc->label),proc->label_sfx_id);
goto errLabel;
}
// get the count of variables to be updated
modN = proc->modVarMapFullCnt.load( std::memory_order_acquire );
if( modN )
{
if( cwIsFlag(flags,kCallbackPnFl) )
{
for(unsigned i=0; i<modN; ++i)
{
// get a pointer to the var that has been marked as modified
variable_t* var = proc->modVarMapA[ proc->modVarMapTailIdx ];
// callback to inform the proc that the var has changed
proc->class_desc->members->notify( var->proc, var );
// mark this var as having been removed from the modVarMapA[]
var->modN.store(0,std::memory_order_release );
// increment modVarMapA[]'s tail index
proc->modVarMapTailIdx = (proc->modVarMapTailIdx + 1) % proc->modVarMapN;
}
}
// decrement the count of elemnts in the modVarMapA[]
proc->modVarMapFullCnt.fetch_sub(modN, std::memory_order_release );
}
errLabel:
return rc;
}
cw::rc_t cw::flow::var_create( proc_t* proc, const char* var_label, unsigned sfx_id, unsigned id, unsigned chIdx, const object_t* value_cfg, unsigned altTypeFl, variable_t*& varRef )
{
rc_t rc = kOkRC;
@ -1342,16 +1364,14 @@ errLabel:
return chN;
}
cw::rc_t cw::flow::var_call_custom_value_func( variable_t* var )
cw::rc_t cw::flow::var_schedule_notification( variable_t* var )
{
rc_t rc;
if((rc = var->proc->class_desc->members->value( var->proc, var )) != kOkRC )
if((rc = _mod_var_map_update( var )) != kOkRC )
goto errLabel;
if( var->flags & kLogVarFl )
{
if( var->proc->ctx->printLogHdrFl )
{
cwLogPrint("%s","exe cycle: process: id: variable: id vid ch : : : type:value : destination\n");
@ -1383,6 +1403,7 @@ errLabel:
}
cw::rc_t cw::flow::var_flags( proc_t* proc, unsigned chIdx, const char* var_label, unsigned sfx_id, unsigned& flags_ref )
{
rc_t rc = kOkRC;

View File

@ -9,7 +9,7 @@ namespace cw
struct variable_str;
typedef rc_t (*member_func_t)( struct proc_str* ctx );
typedef rc_t (*member_value_func_t)( struct proc_str* ctx, struct variable_str* var );
typedef rc_t (*member_notify_func_t)( struct proc_str* ctx, struct variable_str* var );
// var_desc_t attribute flags
enum
@ -23,14 +23,15 @@ namespace cw
kUdpOutVarDescFl = 0x020,
kUiCreateVarDescFl = 0x040,
kUiDisableVarDescFl = 0x080,
kUiHideVarDescFl = 0x100
kUiHideVarDescFl = 0x100,
kNotifyVarDescFl = 0x200,
};
typedef struct class_members_str
{
member_func_t create;
member_func_t destroy;
member_value_func_t value;
member_notify_func_t notify;
member_func_t exec;
member_func_t report;
} class_members_t;
@ -486,6 +487,15 @@ namespace cw
// The returned string must be release with a call to mem::free().
char* proc_expand_filename( const proc_t* proc, const char* fname );
// Call this function from inside the proc instance exec() routine, with flags=kCallbackPnFl,
// to get callbacks on variables marked for notification on change. Note that variables must have
// their var. description 'kNotifyVarDescFl' (var. desc flag: 'notify') set in order
// to generate callbacks.
// Set kQuietPnFl to avoid warning messages when this function is called on proc's
// that do not have any variables marked for notification.
enum { kCallbackPnFl=0x01, kQuietPnFl=0x02 };
rc_t proc_notify( proc_t* proc, unsigned flags = kCallbackPnFl );
//------------------------------------------------------------------------------------------------------------------------
//
@ -509,8 +519,8 @@ namespace cw
// Otherwise returns count of channels no including kAnyChIdx. (e.g. mono=1, stereo=2, quad=4 ...)
unsigned var_channel_count( proc_t* proc, const char* var_label, unsigned sfx_id );
// Wrapper around call to var->proc->members->value()
rc_t var_call_custom_value_func( variable_t* var );
// Calls the _mod_var_map_update() on connected vars and implements the var 'log' functionality
rc_t var_schedule_notification( variable_t* var );
// Sets and get the var->flags field
unsigned var_flags( proc_t* proc, unsigned chIdx, const char* var_label, unsigned sfx_id, unsigned& flags_ref );