From 1cb0db11b2c533775b77cd9f6050e145ab3b8f0c Mon Sep 17 00:00:00 2001 From: kevin Date: Sat, 22 Mar 2025 17:38:30 -0400 Subject: [PATCH] cwFlowType.h/cpp : Implement variable change notification. - Added modVarMapA[] and related variables. - Added var_schedule_notification() and proc_notify(). - Replaced class_members_str.value with .notify. --- cwFlowTypes.cpp | 137 ++++++++++++++++++++++++++++-------------------- cwFlowTypes.h | 28 ++++++---- 2 files changed, 98 insertions(+), 67 deletions(-) diff --git a/cwFlowTypes.cpp b/cwFlowTypes.cpp index d17056a..f0267b7 100644 --- a/cwFlowTypes.cpp +++ b/cwFlowTypes.cpp @@ -36,6 +36,7 @@ namespace cw { kUiCreateVarDescFl, "no_ui" }, // even if the proc ui is enabled, don't show this var { kUiDisableVarDescFl,"ui_disable" }, { kUiHideVarDescFl, "ui_hide" }, + { kNotifyVarDescFl, "notify" }, { kInvalidVarDescFl, "" } }; @@ -149,7 +150,7 @@ namespace cw con_var->label,con_var->label_sfx_id ); // Call the value() function on the connected variable - if((rc = var_call_custom_value_func(con_var)) != kOkRC ) + if((rc = var_schedule_notification(con_var)) != kOkRC ) break; } @@ -162,62 +163,37 @@ namespace cw // This function runs in a multi-thread context. rc_t _mod_var_map_update( variable_t* var ) { - // if the var is in already modVarMapA[] then there is nothing to do - // (use acquire to prevent rd/wr from moving before this op) - if( var->modN.load(std::memory_order_acquire) > 0 ) - return kOkRC; + rc_t rc = kOkRC; + if( cwIsFlag(var->varDesc->flags,kNotifyVarDescFl ) ) + { + // if the var is already modVarMapA[] then there is nothing to do + // (use acquire to prevent rd/wr from moving before this op) + if( var->modN.load(std::memory_order_acquire) > 0 ) + return kOkRC; - // reserve a slot in proc->modVarMapA[] - // (use acquire to prevent rd/wr from moving before this op) - if( var->proc->modVarMapFullCnt.fetch_add(1,std::memory_order_acquire) >= var->proc->modVarMapN ) - return kBufTooSmallRC; - - - // Get the next empty slot in proc->modVarMapA[] - // (use acquire to prevent rd/wr from moving before this op) - unsigned idx = var->proc->modVarMapHeadIdx.fetch_add(1,std::memory_order_acquire) % var->proc->modVarMapN; - - var->proc->modVarMapA[ idx ] = var; - - // mark the var as in the list - var->modN.fetch_add(1,std::memory_order_release); - - return kOkRC; - } - - // Call proc->proc_desc->value() on every var in the proc->modVarMapA[]. - // This function is called inside proc->proc_desc->exec(). - rc_t _mod_var_map_dispatch( proc_t* proc, bool callback_fl ) - { - // get the count of variables to be updated - unsigned n = proc->modVarMapFullCnt.load( std::memory_order_acquire ); - - if( n ) - { - if( callback_fl ) + // reserve a slot in proc->modVarMapA[] + // (use acquire to prevent rd/wr from moving before this op) + if( var->proc->modVarMapFullCnt.fetch_add(1,std::memory_order_acquire) >= var->proc->modVarMapN ) { - for(unsigned i=0; imodVarMapA[ proc->modVarMapTailIdx ]; - - // callback to inform the proc that the var has changed - proc->class_desc->members->value( var->proc, var ); - - // mark this var as having been removed from the modVarMapA[] - var->modN.store(0,std::memory_order_relaxed ); - - // increment modVarMapA[]'s tail index - proc->modVarMapTailIdx = (proc->modVarMapTailIdx + 1) % proc->modVarMapN; - } + rc = cwLogError(kBufTooSmallRC,"The mod var map overflowed on '%s:%i-%s:%i'",cwStringNullGuard(var->proc->label),var->proc->label_sfx_id,cwStringNullGuard(var->label),var->label_sfx_id); + goto errLabel; } - - // decrement the count of elemnts in the modVarMapA[] - proc->modVarMapFullCnt.fetch_sub(n, std::memory_order_release ); - } + + // Get the next empty slot in proc->modVarMapA[] + // (use acquire to prevent rd/wr from moving before this op) + unsigned idx = var->proc->modVarMapHeadIdx.fetch_add(1,std::memory_order_acquire) % var->proc->modVarMapN; - return kOkRC; + var->proc->modVarMapA[ idx ] = var; + + // mark the var as in the list + var->modN.fetch_add(1,std::memory_order_release); + } + errLabel: + return rc; } + + + // 'argTypeFlag' is the type (tflag) of 'val'. template< typename T > @@ -288,7 +264,7 @@ namespace cw // call because calls' to 'proc.value()' will see the proc in a incomplete state) // Note 2: If this call returns an error then the value assignment is cancelled // and the value does not change. - rc = var_call_custom_value_func( var ); + rc = var_schedule_notification( var ); } //printf("%p set: %s:%s 0x%x\n",var->value, var->proc->label,var->label,var->value->tflag); @@ -1179,6 +1155,52 @@ char* cw::flow::proc_expand_filename( const proc_t* proc, const char* fname ) return fn1; } +// Call proc->proc_desc->value() on every var in the proc->modVarMapA[]. +// This function is called inside proc->proc_desc->exec() and is therefore guaranteed to be executed without +// contention from other threads. +cw::rc_t cw::flow::proc_notify( proc_t* proc, unsigned flags ) +{ + rc_t rc = kOkRC; + unsigned modN = 0; + + if( proc->modVarMapN == 0 ) + { + if( cwIsNotFlag(flags,kQuietPnFl) ) + rc =cwLogError(kInvalidStateRC,"Calling proc_notify() on the processor '%s:%i' is invalid because it does not have any variables marked for notification.",cwStringNullGuard(proc->label),proc->label_sfx_id); + goto errLabel; + } + + // get the count of variables to be updated + modN = proc->modVarMapFullCnt.load( std::memory_order_acquire ); + + if( modN ) + { + if( cwIsFlag(flags,kCallbackPnFl) ) + { + for(unsigned i=0; imodVarMapA[ proc->modVarMapTailIdx ]; + + // callback to inform the proc that the var has changed + proc->class_desc->members->notify( var->proc, var ); + + // mark this var as having been removed from the modVarMapA[] + var->modN.store(0,std::memory_order_release ); + + // increment modVarMapA[]'s tail index + proc->modVarMapTailIdx = (proc->modVarMapTailIdx + 1) % proc->modVarMapN; + } + } + + // decrement the count of elemnts in the modVarMapA[] + proc->modVarMapFullCnt.fetch_sub(modN, std::memory_order_release ); + } + +errLabel: + return rc; +} + cw::rc_t cw::flow::var_create( proc_t* proc, const char* var_label, unsigned sfx_id, unsigned id, unsigned chIdx, const object_t* value_cfg, unsigned altTypeFl, variable_t*& varRef ) { rc_t rc = kOkRC; @@ -1342,16 +1364,14 @@ errLabel: return chN; } - -cw::rc_t cw::flow::var_call_custom_value_func( variable_t* var ) +cw::rc_t cw::flow::var_schedule_notification( variable_t* var ) { rc_t rc; - if((rc = var->proc->class_desc->members->value( var->proc, var )) != kOkRC ) + if((rc = _mod_var_map_update( var )) != kOkRC ) goto errLabel; - + if( var->flags & kLogVarFl ) { - if( var->proc->ctx->printLogHdrFl ) { cwLogPrint("%s","exe cycle: process: id: variable: id vid ch : : : type:value : destination\n"); @@ -1383,6 +1403,7 @@ errLabel: } + cw::rc_t cw::flow::var_flags( proc_t* proc, unsigned chIdx, const char* var_label, unsigned sfx_id, unsigned& flags_ref ) { rc_t rc = kOkRC; diff --git a/cwFlowTypes.h b/cwFlowTypes.h index 36642f0..c659c05 100644 --- a/cwFlowTypes.h +++ b/cwFlowTypes.h @@ -9,7 +9,7 @@ namespace cw struct variable_str; typedef rc_t (*member_func_t)( struct proc_str* ctx ); - typedef rc_t (*member_value_func_t)( struct proc_str* ctx, struct variable_str* var ); + typedef rc_t (*member_notify_func_t)( struct proc_str* ctx, struct variable_str* var ); // var_desc_t attribute flags enum @@ -23,16 +23,17 @@ namespace cw kUdpOutVarDescFl = 0x020, kUiCreateVarDescFl = 0x040, kUiDisableVarDescFl = 0x080, - kUiHideVarDescFl = 0x100 + kUiHideVarDescFl = 0x100, + kNotifyVarDescFl = 0x200, }; typedef struct class_members_str { - member_func_t create; - member_func_t destroy; - member_value_func_t value; - member_func_t exec; - member_func_t report; + member_func_t create; + member_func_t destroy; + member_notify_func_t notify; + member_func_t exec; + member_func_t report; } class_members_t; typedef struct var_desc_str @@ -486,6 +487,15 @@ namespace cw // The returned string must be release with a call to mem::free(). char* proc_expand_filename( const proc_t* proc, const char* fname ); + // Call this function from inside the proc instance exec() routine, with flags=kCallbackPnFl, + // to get callbacks on variables marked for notification on change. Note that variables must have + // their var. description 'kNotifyVarDescFl' (var. desc flag: 'notify') set in order + // to generate callbacks. + // Set kQuietPnFl to avoid warning messages when this function is called on proc's + // that do not have any variables marked for notification. + enum { kCallbackPnFl=0x01, kQuietPnFl=0x02 }; + rc_t proc_notify( proc_t* proc, unsigned flags = kCallbackPnFl ); + //------------------------------------------------------------------------------------------------------------------------ // @@ -509,8 +519,8 @@ namespace cw // Otherwise returns count of channels no including kAnyChIdx. (e.g. mono=1, stereo=2, quad=4 ...) unsigned var_channel_count( proc_t* proc, const char* var_label, unsigned sfx_id ); - // Wrapper around call to var->proc->members->value() - rc_t var_call_custom_value_func( variable_t* var ); + // Calls the _mod_var_map_update() on connected vars and implements the var 'log' functionality + rc_t var_schedule_notification( variable_t* var ); // Sets and get the var->flags field unsigned var_flags( proc_t* proc, unsigned chIdx, const char* var_label, unsigned sfx_id, unsigned& flags_ref );