diff --git a/py/gen_wavetables/README.md b/py/gen_wavetables/README.md new file mode 100644 index 0000000..ab372b7 --- /dev/null +++ b/py/gen_wavetables/README.md @@ -0,0 +1,42 @@ +TODO: +----- +calc_wavetable.py: +- Estimate the actual pitch of the sample. +- Stop finding wavetables when the amplitude of the next table falls below a threshold. +- Generate a report showing the count of wavetables per note. + + +wt_util.py +---------- +Utilities used by all other modules + +gen_midi_csv.py +--------------- +Generate a MIDI file in CSV format to trigger the sampler with a sequence of velocities for a given pitch. + +sample_ivory.py +--------------- +Use the MIDI file from 'gen_midi_csv.py' to trigger the sampler and record the resulting audio and onset/offset TSV marker file. + +calc_sample_atk_dur.py +---------------------- +Calculate a list [(vel,bsi,esi)] which indicates the attack wavetable. + +calc_wavetables.py +------------------ +Create a JSON file of wave tables for all pitches and velocities. + +wt_osc.py +--------- +Generate a set of notes using the wavetables found by calc_wavetables.py. +This program implements a wavetable oscillator which can interpret the wavetables +created by calc_wavetables.py + + +Obsolete +--------------------- +wt_study.py +low_hz_loops.py +debug_plot.py +gen_wave_tables.py +sample_looper.py diff --git a/py/gen_wavetables/calc_sample_atk_dur.py b/py/gen_wavetables/calc_sample_atk_dur.py new file mode 100644 index 0000000..582868b --- /dev/null +++ b/py/gen_wavetables/calc_sample_atk_dur.py @@ -0,0 +1,380 @@ +import matplotlib.pyplot as plt +import numpy as np +import math +import wt_util + +from kneed import KneeLocator + +def rms( aV, wnd_smp_cnt, hop_smp_cnt, dbFl = True ): + + assert( wnd_smp_cnt % hop_smp_cnt == 0 and hop_smp_cnt < wnd_smp_cnt ) + + + rmsL = [] + bi = 0 + ei = wnd_smp_cnt + + while ei <= len(aV): + + rms = np.pow( np.mean( np.pow(aV[bi:ei],2) ), 0.5 ) + if dbFl: + rms = -100.0 if rms == 0 else 20*math.log10(rms) + + rmsL.append(rms) + + bi += hop_smp_cnt + ei = bi + wnd_smp_cnt + + # repeat the first RMS value (wnd_smp_cnt/hop_smp_cnt)-1 times + # so that rmsL[] indexes relate to aV[] indexes like this: + # av_idx = rms_idx * hop_smp_cnt + rmsL = [ rmsL[0] ] * (int(wnd_smp_cnt/hop_smp_cnt)-1) + rmsL + + return rmsL + +def calc_sample_atk_dur(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms ): + + aM,srate = wt_util.parse_audio_file( audio_fname ) + markL = wt_util.parse_marker_file( mark_tsv_fname ) + rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000)) + rms_hop_smp_cnt = 64 #int(round(rms_hop_ms * srate / 1000)) + ch_cnt = aM.shape[1] + + rmsL = [[] for _ in range(ch_cnt) ] + + for beg_sec,end_sec,vel_label in markL: + + bi = int(round(beg_sec * srate)) + ei = int(round(end_sec * srate)) + int(srate) + + for ch_idx in range(ch_cnt): + rmsL[ch_idx] += rms(aM[bi:ei,ch_idx],rms_wnd_smp_cnt,rms_hop_smp_cnt) + + + _,ax = plt.subplots(ch_cnt,1) + + for ch_idx in range(ch_cnt): + ax[ch_idx].plot(rmsL[ch_idx]) + + plt.show() + + +def generate_gate_knee(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, min_gate_dur_ms, threshDb ): + + aM,srate = wt_util.parse_audio_file( audio_fname ) + markL = wt_util.parse_marker_file( mark_tsv_fname ) + rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000)) + rms_hop_smp_cnt = int(round(rms_hop_ms * srate / 1000)) + min_gate_smp_cnt= int(round(min_gate_dur_ms * srate / 1000)) + ch_cnt = aM.shape[1] + frm_cnt = aM.shape[0] + rmsL = [] + ch_rmsL = [] + + for ch_idx in range(ch_cnt): + rmsL.append( rms( aM[:,ch_idx], rms_wnd_smp_cnt, rms_hop_smp_cnt ) ) + ch_rmsL.append( np.mean( rmsL[-1] ) ) + + bsiL = [ int(round(beg_sec*srate)) for beg_sec,_,_ in markL ] + + asiL = [] + riL = [] + bi = 1 + ei = rms_hop_smp_cnt + eV = np.zeros((frm_cnt,)) + + # use the channel whith the most energy to determine the gate + ch_idx = np.argmax(ch_rmsL) + + for beg_sec,end_sec,_ in markL: + rbi = int(round(beg_sec*srate/rms_hop_smp_cnt)) + rei = int(round(end_sec*srate/rms_hop_smp_cnt)) + offs = 10 + y = rmsL[ch_idx][rbi+offs:rei] + x = np.arange(len(y)) + k1 = KneeLocator(x, y, curve="convex", direction="decreasing", interp_method="polynomial") + + ri = rbi + offs + k1.knee + riL.append( ri ) + bsiL.append( int(rbi*rms_hop_smp_cnt) ) + asiL.append( int(ri * rms_hop_smp_cnt) ) + + gateL = [(bsi,esi) for bsi,esi in zip(bsiL,asiL) ] + + # force all gates to have a duration of at least min_gate_smp_cnt + if True: + for i,(bsi,esi) in enumerate(gateL): + if esi-bsi < min_gate_smp_cnt: + #print("gate ext:",esi-bsi,min_gate_smp_cnt) + gateL[i] = (bsi,bsi+min_gate_smp_cnt) + + # verify that successive gates do not overlap + if i> 0: + assert gateL[i][0] > gateL[i-1][1] + + if i < len(gateL)-1: + assert gateL[i][1] < gateL[i+1][0] + + if True: + + beL = [ (int(round(beg_secs*srate)), int(round(end_secs*srate))) for beg_secs,end_secs,_ in markL ] + beL = [ (max(0,int((bi)/rms_hop_smp_cnt)), max(0,int((ei)/rms_hop_smp_cnt))) for bi,ei in beL ] + + _,ax = plt.subplots(3,1) + ax[0].plot(rmsL[0]) + for bi,ei in beL: + ax[0].vlines([bi,ei],-100.0,0.0,color="red") + + for ri in riL: + ax[0].vlines([ri],-100,0,color="green") + + ax[1].plot(rmsL[1]) + for bi,ei in beL: + ax[1].vlines([bi,ei],-100.0,0.0,color="red") + + ax[2].plot(eV) + plt.show() + + if False: + for i,(bi,ei) in enumerate(beL): + offs = 10 + y = [ pow(10,z/20.0) for z in rmsL[0][bi+offs:ei] ] + y = rmsL[0][bi+offs:ei] + x = np.arange(len(y)) + k1 = KneeLocator(x, y, curve="convex", direction="decreasing", interp_method="polynomial") + k1.plot_knee() + plt.title(f"{i} {offs+k1.knee} {offs+k1.knee*rms_hop_smp_cnt/srate:.3f}") + plt.show() + + + return gateL,ch_rmsL + +def generate_gate_db(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, min_gate_dur_ms, threshDb ): + + aM,srate = wt_util.parse_audio_file( audio_fname ) + markL = wt_util.parse_marker_file( mark_tsv_fname ) + rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000)) + rms_hop_smp_cnt = int(round(rms_hop_ms * srate / 1000)) + min_gate_smp_cnt= int(round(min_gate_dur_ms * srate / 1000)) + ch_cnt = aM.shape[1] + frm_cnt = aM.shape[0] + rmsL = [] + ch_rmsL = [] + + for ch_idx in range(ch_cnt): + rmsL.append( rms( aM[:,ch_idx], rms_wnd_smp_cnt, rms_hop_smp_cnt ) ) + ch_rmsL.append( np.mean( rmsL[-1] ) ) + + bsiL = [ int(round(beg_sec*srate)) for beg_sec,_,_ in markL ] + asiL = [] + riL = [] + bi = 1 + ei = rms_hop_smp_cnt + eV = np.zeros((frm_cnt,)) + + # use the channel whith the most energy to determine the gate + ch_idx = np.argmax(ch_rmsL) + + bsi_idx = 1 + cur_on_fl = 1.0 # 1.0 when the gate is high + active_fl = True # True if the gate is allowed to switch from low to high + pend_fl = True # True if the next attack is pending + + for i in range(len(rmsL[ch_idx])): + + # pend_fl prevents the gate from being turned off until the + # actual attack has occurred (it goes false once an RMS above the thresh is seen) + if pend_fl: + pend_fl = rmsL[ch_idx][i] <= threshDb + + # if the rms is below the threshold + off_fl = rmsL[ch_idx][i] < threshDb #and rmsL[][i] < threshDb + + # if the rms is below the threshold and the gate detector is enabled ... + if off_fl and active_fl and not pend_fl: + # ... then turn off the gate + cur_on_fl = 0.0 + active_fl = False + riL.append(i) + asiL.append(bi) + + eV[bi:ei] = cur_on_fl + + # track the smp idx of the current rms value + bi = i * rms_hop_smp_cnt + ei = bi + rms_hop_smp_cnt + + # if we are crossing into the next velocity sample + if bsi_idx < len(bsiL) and bsiL[ bsi_idx ] <= bi : + + # be sure that this onset follows an offset + # (which won't occur if the signal never goes above the threshold) + if cur_on_fl != 0: + + gesi = bsiL[bsi_idx-1] + min_gate_smp_cnt + asiL.append( gesi ) + riL.append( int(round(gesi/rms_hop_smp_cnt)) ) + eV[gesi:ei] = 0 + + #assert( cur_on_fl == 0 ) + + active_fl = True + pend_fl = True + cur_on_fl = 1.0 + bsi_idx += 1 + + + # if the offset for the last note was not detected + if len(asiL) == len(bsiL)-1: + asiL.append(frm_cnt-1) + + gateL = [(bsi,esi) for bsi,esi in zip(bsiL,asiL) ] + + # force all gates to have a duration of at least min_gate_smp_cnt + if True: + for i,(bsi,esi) in enumerate(gateL): + if esi-bsi < min_gate_smp_cnt: + #print("gate ext:",esi-bsi,min_gate_smp_cnt) + gateL[i] = (bsi,bsi+min_gate_smp_cnt) + + # verify that successive gates do not overlap + if i> 0: + assert gateL[i][0] > gateL[i-1][1] + + if i < len(gateL)-1: + assert gateL[i][1] < gateL[i+1][0] + + if False: + + beL = [ (int(round(beg_secs*srate)), int(round(end_secs*srate))) for beg_secs,end_secs,_ in markL ] + beL = [ (max(0,int((bi)/rms_hop_smp_cnt)), max(0,int((ei)/rms_hop_smp_cnt))) for bi,ei in beL ] + + _,ax = plt.subplots(3,1) + ax[0].plot(rmsL[0]) + for bi,ei in beL: + ax[0].vlines([bi,ei],-100.0,0.0,color="red") + + for ri in riL: + ax[0].vlines([ri],-100,0,color="green") + + ax[1].plot(rmsL[1]) + for bi,ei in beL: + ax[1].vlines([bi,ei],-100.0,0.0,color="red") + + ax[2].plot(eV) + plt.show() + + + return gateL,ch_rmsL + + +def generate_gate_pct(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, atk_min_dur_ms, threshPct ): + + aM,srate = wt_util.parse_audio_file( audio_fname ) + markL = wt_util.parse_marker_file( mark_tsv_fname ) + + rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000)) + rms_hop_smp_cnt = int(round(rms_hop_ms * srate / 1000)) + atk_min_smp_cnt = int(round(atk_min_dur_ms * srate / 1000)) + + ch_cnt = aM.shape[1] + frm_cnt = aM.shape[0] + rmsL = [] + ch_rmsL = [] + + for ch_idx in range(ch_cnt): + rmsL.append( rms( aM[:,ch_idx], rms_wnd_smp_cnt, rms_hop_smp_cnt, False ) ) + ch_rmsL.append( np.mean(rmsL[-1] )) + + beL = [ (int(round(beg_secs*srate)), int(round(end_secs*srate))) for beg_secs,end_secs,_ in markL ] + beL = [ (max(0,int(bi/rms_hop_smp_cnt)), max(0,int(ei/rms_hop_smp_cnt))) for bi,ei in beL ] + + gateL = [] + maxL = [] + for bri,eri in beL: + + rms_max = None + rms_max_i = None + rms_max_ch_i = None + for ch_idx in range(ch_cnt): + max_i = np.argmax( rmsL[ch_idx][bri:eri] ) + bri + + if rms_max is None or rms_max < rmsL[ch_idx][max_i]: + rms_max = rmsL[ch_idx][max_i] + rms_max_i = max_i + rms_max_ch_i = ch_idx + + maxL.append(rms_max) + + threshDb = rms_max * threshPct + + for i in range(rms_max_i+1,eri): + if rmsL[ch_idx][i] < threshDb: + gateL.append((bri,i)) + break + + + retL = [] + for bri,eri in gateL: + bsi = bri*rms_hop_smp_cnt + esi = eri*rms_hop_smp_cnt + if esi-bsi < atk_min_smp_cnt: + esi = bsi + atk_min_smp_cnt + retL.append((bsi,esi)) + + + if True: + _,ax = plt.subplots(2,1) + ax[0].plot(rmsL[0]) + for i,(bi,ei) in enumerate(gateL): + ax[0].vlines([bi,ei],0,maxL[i],color="red") + ax[1].plot(rmsL[1]) + for i,(bi,ei) in enumerate(gateL): + ax[1].vlines([bi,ei],0,maxL[i],color="red") + plt.show() + + return retL,ch_rmsL + + + +def gen_gated_audio( i_audio_fname, gateL, o_audio_fname, o_mark_tsv_fname ): + + aM,srate = wt_util.parse_audio_file( audio_fname ) + + markL = [] + gateV = np.zeros((aM.shape[0],)) + + # form the gate vector + for i,(bsi,esi) in enumerate(gateL): + gateV[bsi:esi] = 1 + markL.append((bsi/srate,esi/srate,f"{i}")) + + for ch_idx in range(aM.shape[1]): + aM[:,ch_idx] *= gateV + + wt_util.write_audio_file( aM, srate, o_audio_fname ) + wt_util.write_mark_tsv_file( markL, o_mark_tsv_fname ) + + + +if __name__ == "__main__": + + audio_fname = "/home/kevin/temp/wt5/wav/060_samples.wav" + mark_tsv_fname = "/home/kevin/temp/wt5/60_marker.txt" + rms_wnd_ms = 50 + rms_hop_ms = 10 + + #calc_sample_atk_dur(audio_fname,mark_tsv_fname,rms_wnd_ms,rms_hop_ms) + + # Generate a list [(bsi,esi)] indicating the beginning and end of the attack portion + # of each sample where the end is determined by a threshold in dB. + #threshDb = -50.0 + #gateL = generate_gate_db(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, threshDb ) + + + # Generate a list [(bsi,esi)] indicating the beginning and end of the attack portion + # of each sample where the end is determined by a percent decrease from the peak value. + threshPct = 0.75 + gateL = generate_gate_pct(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, threshPct ) + + gen_gated_audio( audio_fname, gateL, "/home/kevin/temp/temp.wav", "/home/kevin/temp/temp_mark.txt") diff --git a/py/gen_wavetables/calc_wavetables.py b/py/gen_wavetables/calc_wavetables.py new file mode 100644 index 0000000..f23772f --- /dev/null +++ b/py/gen_wavetables/calc_wavetables.py @@ -0,0 +1,447 @@ +import os +import math +import json +import types +import wt_util +import calc_sample_atk_dur +import numpy as np +import matplotlib.pyplot as plt +import multiproc as mp + +from scipy.interpolate import CubicSpline + + +def upsample( aV, N, interp_degree ): +# aV[] - signal vector +# N - upsample factor (must be an integer >= 2) +# interp_degree - "linear" , "cubic" + + N = int(N) + + assert( N>= 2) + + aN = len(aV) + z = np.zeros((aN,N)) + z[:,0] = aV + + # z is a copy of aV with zeros in the positions to be interpolated + z = np.squeeze(np.reshape(z,(aN*N,1))) + + # x contains the indexes into z which contain values from aV + x = [ i*N for i in range(aN) ] + + # xi contains the indexes into z which have zeros + xi = [ i for i in range(len(z)) if i not in x and i < x[-1] ] + + # calc values for the zeros in z + if interp_degree == "linear": + cs = CubicSpline(x,aV) + z[xi] = cs(xi) + + elif interp_degree == "cubic": + z[xi] = np.interp(xi,x,aV) + else: + assert(0) + + # The last N-1 values are not set because they would require extrapolation + # (they have no value to their right). Instead we set these values + # as the mean of the preceding N values. + k = (len(z)-N)+1 + for i in range(N-1): + z[k+i] = np.mean(z[ k+i-N:k+i]) + + return z #z[0:-(N-1)] + + + +def estimate_pitch_ac( aV, si, hzL, srate, argsD ): + # aV[] - audio vector containing a wavetable that starts at aV[si] + # hzL[] - a list of candidate pitches + # srate - sample rate of aV[] + # args[cycle_cnt] - count of cycles to autocorrelate on either side of the reference pitch at aV[si:] + # (1=correlate with the cycle at aV[ si-fsmp_per+cyc:] and the cycle at aV[si+fsmp_per_cyc], + # (2=correlate with cycles at aV[ si-2*fsmp_per+cyc:],aV[ si-fsmp_per+cyc:],aV[ si+fsmp_per+cyc:],aV[ si-2*fsmp_per+cyc:]) + # args[up_fact] - Set to and integer greater than 1 to upsample the signal prior to estimating the pitch + # args[up_interp_degree] - Upsampling interpolator "linear" or "cubic" + + def _auto_corr( aV, si, fsmp_per_cyc, cycle_offset_idx, interp_degree ): + + smp_per_cyc = int(math.floor(fsmp_per_cyc)) + + xi = [si + (cycle_offset_idx * fsmp_per_cyc) + i for i in range(smp_per_cyc)] + x_min = int(math.floor(xi[0])) + x_max = int(math.ceil(xi[-1])) + x = [ i for i in range(x_min,x_max) ] + y = aV[x] + + if interp_degree == "cubic": + cs = CubicSpline(x,y) + yi = cs(xi) + elif interp_degree == "linear": + yi = np.interp(xi,x,y) + else: + assert(0) + + # calc the sum of squared differences between the reference cycle and the 'offset' cycle + ac = np.sum(np.pow(yi - aV[si:si+smp_per_cyc],2.0)) + + return ac + + + def auto_corr( aV, si, fsmp_per_cyc, cycle_cnt, interp_degree ): + + ac = 0 + for i in range(1,cycle_cnt+1): + ac = _auto_corr(aV,si,fsmp_per_cyc, i, interp_degree) + ac += _auto_corr(aV,si,fsmp_per_cyc, -i, interp_degree) + + # return the average sum of squared diff's per cycle + return ac/(cycle_cnt*2) + + + def ac_upsample( aV, si, fsmp_per_cyc, cycle_cnt, up_fact, up_interp_degree ): + + pad = 0 # count of leading/trailing pad positions to allow for interpolation + + if up_interp_degree == "cubic": + pad = 2 + elif up_interp_degre == "linear": + pad = 1 + else: + assert(0) + + # calc the beg/end of the signal segment to upsample + bi = si - math.ceil(fsmp_per_cyc * cycle_cnt) - pad + ei = si + math.ceil(fsmp_per_cyc * (cycle_cnt + 1)) + pad + + up_aV = upsample(aV[bi:ei],up_fact,up_interp_degree) + + # calc. index of the center signal value + u_si = (si-bi)*up_fact + + # the center value should not change after upsampling + assert aV[si] == up_aV[u_si] + + return up_aV,u_si + + + args = types.SimpleNamespace(**argsD) + + # if upsampling was requested + if args.up_fact > 1: + hz_min = min(hzL) # Select the freq candidate with the longest period, + max_fsmp_per_cyc = srate/hz_min # because we want to upsample just enough of the signal to test for all possible candidates, + aV,si = ac_upsample( aV, si, max_fsmp_per_cyc, args.cycle_cnt, args.up_fact, args.up_interp_degree ) + srate = srate * args.up_fact + + + # calc. the auto-correlation for every possible candidate frequency + acL = [] + for hz in hzL: + fsmp_per_cyc = srate / hz + acL.append( auto_corr(aV,si,fsmp_per_cyc,args.cycle_cnt,args.interp_degree) ) + + + + if False: + _,ax = plt.subplots(1,1) + ax.plot(hzL,acL) + plt.show() + + # winning candidate is the one with the lowest AC score + cand_hz_idx = np.argmin(acL) + + return hzL[cand_hz_idx] + +# Note that we want a higher rate of pitch tracking than wave table generation - thus +# we downsample the pitch tracking interval by some integer factor to arrive at the +# rate at the wave table generation period. +def gen_wave_table_list( audio_fname, + mark_tsv_fname, gateL, + midi_pitch, + pitch_track_interval_secs, + wt_interval_down_sample_fact, + min_wt_db, + dom_ch_idx, + est_hz_argD, + ac_argD ): + + est_hz_args = types.SimpleNamespace(**est_hz_argD) + + aM,srate = wt_util.parse_audio_file(audio_fname) + markL = wt_util.parse_marker_file(mark_tsv_fname) + ch_cnt = aM.shape[1] + frm_cnt = aM.shape[0] + pt_interval_smp = int(round(pitch_track_interval_secs*srate)) + wt_interval_fact= int(wt_interval_down_sample_fact) + hz = wt_util.midi_pitch_to_hz(midi_pitch) + fsmp_per_cyc = srate/hz + fsmp_per_wt = fsmp_per_cyc * 2 + smp_per_wt = int(math.floor(fsmp_per_wt)) + + # calc. the range of possible pitch estimates + hz_min = wt_util.midi_pitch_to_hz(midi_pitch-1) + hz_ctr = wt_util.midi_pitch_to_hz(midi_pitch) + hz_max = wt_util.midi_pitch_to_hz(midi_pitch+1) + cents_per_semi = 100 + + # hzL is a list of candidate pitches with a range of +/- 1 semitone and a resolution of 1 cent + hzCandL = [ hz_min + i*(hz_ctr-hz_min)/100.0 for i in range(cents_per_semi) ] + [ hz_ctr + i*(hz_max-hz_ctr)/100.0 for i in range(cents_per_semi) ] + + assert( len(markL) == len(gateL) ) + + # setup the return data structure + pitchD = { "midi_pitch":midi_pitch, + "srate":srate, + "est_hz_mean":None, + "est_hz_err_cents":None, + "est_hz_std_cents":None, + "wt_interval_secs":pitch_track_interval_secs * wt_interval_fact, + "dominant_ch_idx":int(dom_ch_idx), + "audio_fname":audio_fname, + "mark_tsv_fname":mark_tsv_fname, + "velL":[] + } + + hzL = [] + for i,(beg_sec,end_sec,vel_label) in enumerate(markL): + bsi = int(round(beg_sec*srate)) + esi = int(round(end_sec*srate)) + vel = int(vel_label) + eai = gateL[i][1] # end of attack + + velD = { "vel":vel, "bsi":bsi, "chL":[ [] for _ in range(ch_cnt)] } + + for ch_idx in range(ch_cnt): + + i = 0 + while True: + + wt_smp_idx = eai + i*pt_interval_smp + + # select the first zero crossing after the end of the attack + # as the start of the first sustain wavetable + wtbi = wt_util.find_zero_crossing(aM[:,ch_idx],wt_smp_idx,1) + + #if len(velD['chL'][ch_idx]) == 0: + # print(midi_pitch,vel,(wtbi-bsi)/srate) + + if wtbi == None: + break; + + wtei = wtbi + smp_per_wt + + if wtei > esi: + break + + # estimate the pitch near wave tables which are: on the 'dominant' channel, + # above a certain velocity and not too far into the decay + if ch_idx==dom_ch_idx and est_hz_args.min_wt_idx <= i and i <= est_hz_args.max_wt_idx and vel >= est_hz_args.min_vel: + est_hz = estimate_pitch_ac( aM[:,dom_ch_idx],wtbi,hzCandL,srate,ac_argD) + hzL.append( est_hz ) + #print(vel, i, est_hz) + + if i % wt_interval_fact == 0: + # measure the RMS of the wavetable + wt_rms = float(np.pow(np.mean(np.pow(aM[wtbi:wtei,ch_idx],2.0)),0.5)) + + # filter out quiet wavetable but guarantee that there are always at least two wt's. + if 20*math.log10(wt_rms) > min_wt_db or len(velD['chL'][ch_idx]) < 2: + + # store the location and RMS of the wavetable + velD['chL'][ch_idx].append({"wtbi":int(wtbi),"wtei":int(wtei),"rms":float(wt_rms), "est_hz":0}) + + i+=1 + + + pitchD['velL'].append(velD) + + # update est_hz in each of the wavetable records + est_hz = np.mean(hzL) + est_hz_delta = np.array(hzCandL) - est_hz + est_hz_idx = np.argmin(np.abs(est_hz_delta)) + est_hz_std = np.std(hzL) + + if est_hz_delta[est_hz_idx] > 0: + est_hz_std_cents = est_hz_std / ((hz_ctr-hz_min)/100.0) + else: + est_hz_std_cents = est_hz_std / ((hz_max-hz_ctr)/100.0) + + est_hz_err_cents = est_hz_idx - cents_per_semi + + print(f"{midi_pitch} est pitch:{est_hz}(hz) err:{est_hz_err_cents}(cents)" ) + + pitchD["est_hz_mean"] = float(est_hz) + pitchD["est_hz_err_cents"] = float(est_hz_err_cents) + pitchD["est_hz_std_cents"] = float(est_hz_std_cents) + + return pitchD + +def _gen_wave_table_bank( src_dir, midi_pitch, argD ): + + args = types.SimpleNamespace(**argD) + + audio_fname = os.path.join(src_dir,f"wav/{midi_pitch:03}_samples.wav") + mark_tsv_fname = os.path.join(src_dir,f"{midi_pitch:03}_marker.txt") + + if True: + gateL,ch_avgRmsL = calc_sample_atk_dur.generate_gate_db(audio_fname, + mark_tsv_fname, + args.rms_wnd_ms, + args.rms_hop_ms, + args.atk_min_dur_ms, + args.atk_end_thresh_db ) + + if False: + gateL,ch_avgRmsL = calc_sample_atk_dur.generate_gate_pct(audio_fname, + mark_tsv_fname, + args.rms_wnd_ms, + args.rms_hop_ms, + args.atk_min_dur_ms, + 0.1 ) + + dom_ch_idx = np.argmax(ch_avgRmsL) + + pitchD = gen_wave_table_list( audio_fname, + mark_tsv_fname, + gateL, + midi_pitch, + args.pitch_track_interval_secs, + args.wt_interval_down_sample_fact, + args.min_wt_db, + dom_ch_idx, + args.est_hz, + args.ac ) + + return pitchD + + + + + +def gen_wave_table_bank_mp( processN, src_dir, midi_pitchL, out_fname, argD ): + + def _multi_proc_func( procId, procArgsD, taskArgsD ): + + return _gen_wave_table_bank( procArgsD['src_dir'], + taskArgsD['midi_pitch'], + procArgsD['argD'] ) + + procArgsD = { + "src_dir":src_dir, + "argD": argD + } + + taskArgsL = [ { 'midi_pitch':midi_pitch } for midi_pitch in midi_pitchL ] + + processN = min(processN,len(taskArgsL)) + + if processN > 0: + pitchL = mp.local_distribute_main( processN,_multi_proc_func,procArgsD,taskArgsL ) + else: + pitchL = [ _gen_wave_table_bank( src_dir, r['midi_pitch'], argD ) for r in range(taskArgsL) ] + + + pitchL = sorted(pitchL,key=lambda x:x['midi_pitch']) + + with open(out_fname,"w") as f: + json.dump({"pitchL":pitchL, "instr":"piano", "argD":argD},f) + + + +def plot_rms( wtb_json_fname ): + + with open(wtb_json_fname) as f: + pitchL = json.load(f)['pitchL'] + + pitchL = sorted(pitchL,key=lambda x:x['midi_pitch']) + + rmsnL = [] + for pitchD in pitchL: + _,ax = plt.subplots(1,1) + for wtVelD in pitchD['wtL']: + for velChL in wtVelD['wtL']: + rmsL = [ 20*math.log10(wt['rms']) for wt in velChL ] + ax.plot(rmsL) + rmsnL.append(len(rmsL)) + + plt.title(f"{pitchD['midi_pitch']}") + plt.show() + +def plot_atk_dur( wtb_json_fname ): + + with open(wtb_json_fname) as f: + pitchL = json.load(f)['pitchL'] + + pitchL = sorted(pitchL,key=lambda x:x['midi_pitch']) + + rmsnL = [] + for pitchD in pitchL: + _,ax = plt.subplots(1,1) + + secL = [ (v['chL'][0][0]['wtbi']-v['bsi'])/pitchD['srate'] for v in pitchD['velL'] ] + velL = [ x['vel'] for x in pitchD['velL'] ] + ax.plot(velL,secL,marker=".") + + plt.title(f"{pitchD['midi_pitch']}") + plt.show() + + +def plot_hz( wtb_json_fname ): + + with open(wtb_json_fname) as f: + pitchL = json.load(f)['pitchL'] + + pitchL = sorted(pitchL,key=lambda x:x['midi_pitch']) + + _,ax = plt.subplots(3,1) + + midiL = [ pitchD['midi_pitch'] for pitchD in pitchL ] + hzL = [ pitchD["est_hz_mean"] for pitchD in pitchL ] + hzStdL = [ pitchD["est_hz_std_cents"] for pitchD in pitchL ] + hzErrL = [ pitchD["est_hz_err_cents"] for pitchD in pitchL ] + + ax[0].plot(midiL,hzL) + ax[1].plot(hzL,hzStdL) + ax[2].hlines([0,10,20],midiL[0],midiL[-1],color="red") + ax[2].plot(midiL,hzErrL) + + + plt.show() + + + +if __name__ == "__main__": + + midi_pitchL = [ pitch for pitch in range(21,109) ] + #midi_pitchL = [60 ] + out_fname = "/home/kevin/temp/temp_5.json" + src_dir= "/home/kevin/temp/wt6" + + argD = { + 'rms_wnd_ms':50, + 'rms_hop_ms':10, + 'atk_min_dur_ms':1000, + 'atk_end_thresh_db':-43.0, + 'min_wt_db':-80.0, + 'pitch_track_interval_secs':0.25, + 'wt_interval_down_sample_fact':8.0, # wt_interval_secs = pitch_track_interval_secs * wt_interval_down_sample_fact + 'est_hz': { + 'min_vel':50, + 'min_wt_idx':2, + 'max_wt_idx':4 + }, + 'ac': { + 'cycle_cnt':8, # count of cycles to use for auto-corr. pitch detection + 'interp_degree':"cubic", + 'up_fact':2, + 'up_interp_degree':"cubic" + } + } + + gen_wave_table_bank_mp(20, src_dir, midi_pitchL, out_fname, argD ) + + #plot_rms(out_fname) + #plot_hz(out_fname) + plot_atk_dur(out_fname) + diff --git a/py/gen_wavetables/gen_midi_csv.py b/py/gen_wavetables/gen_midi_csv.py new file mode 100644 index 0000000..befa738 --- /dev/null +++ b/py/gen_wavetables/gen_midi_csv.py @@ -0,0 +1,170 @@ +import csv,os + +def gen_sample_midi_events(pitch,velA,note_on_sec,note_off_sec,dampFl): + + markA = [] + msgA = [] + tpqn = 1260 + bpm = 60 + + ticks_per_sec = tpqn * bpm / 60.0 + ticks_per_note_on = ticks_per_sec * note_on_sec + ticks_per_note_off = ticks_per_sec * note_off_sec + uid = 0 + dticks = 0 + cur_sec = 0; + + r = { 'uid':len(msgA), + 'tpQN':tpqn, + 'bpm':bpm, + 'dticks':0, + 'ch':None, + 'status':None, + 'd0':None, + 'd1':None } + + msgA.append(r); + + for vel in velA: + + ch = 0 + note_status = 0x90 + ctl_status = 0xb0 + damp_ctl = 0x40 + + if dampFl: + r = { 'uid':len(msgA), + 'tpQN':None, + 'bpm':None, + 'dticks':dticks, + 'ch':ch, + 'status':ctl_status, + 'd0':damp_ctl, + 'd1':65 } + + msgA.append(r) + dticks = 0 + + + + r = { 'uid':len(msgA), + 'tpQN':None, + 'bpm':None, + 'dticks':dticks, + 'ch':ch, + 'status':note_status, + 'd0':pitch, + 'd1':vel } + + msgA.append(r) + + dticks = ticks_per_note_on + + r = { 'uid':len(msgA), + 'tpQN':None, + 'bpm':None, + 'dticks':dticks, + 'ch':ch, + 'status':note_status, + 'd0':pitch, + 'd1':0 } + + msgA.append(r) + + if dampFl: + r = { 'uid':len(msgA), + 'tpQN':None, + 'bpm':None, + 'dticks':0, + 'ch':ch, + 'status':ctl_status, + 'd0':damp_ctl, + 'd1':0 } + + msgA.append(r) + + dticks = ticks_per_note_off + + markA.append( (cur_sec, cur_sec+note_on_sec, vel) ) + + cur_sec += note_on_sec + note_off_sec + + return msgA,markA + +def write_file( fname, msgA ): + + fieldnames = list(msgA[0].keys()) + + with open(fname,"w") as f: + wtr = csv.DictWriter(f, fieldnames=fieldnames) + + wtr.writeheader() + + for m in msgA: + wtr.writerow(m) + +def write_marker_file(fname, markA ): + + with open(fname,"w") as f: + for beg_sec,end_sec,vel in markA: + f.write(f"{beg_sec}\t{end_sec}\t{vel}\n") + +def gen_midi_csv_and_marker_files( pitch, velA, note_on_sec, note_off_sec, damp_fl, out_dir ): + + if not os.path.isdir(out_dir): + os.mkdir(out_dir) + + msgA,markA = gen_sample_midi_events(pitch,velA,note_on_sec,note_off_sec,damp_fl) + + damp_label = "damp_" if damp_fl else "" + + midi_csv_fname = os.path.join(out_dir,f"{pitch:03}_{damp_label}sample.csv") + mark_fname = os.path.join(out_dir,f"{pitch:03}_{damp_label}marker.txt") + + write_file(midi_csv_fname,msgA) + write_marker_file(mark_fname,markA) + + return midi_csv_fname, mark_fname + +def gen_complete_midi_csv( pitchA, velA, note_on_sec, note_off_sec, out_fname ): + damp_fl = False + msgL = [] + for i,pitch in enumerate(pitchA): + msgA,_ = gen_sample_midi_events(pitch,velA,note_on_sec,note_off_sec,damp_fl) + if i > 0: + msgA = msgA[1:] + msgL += msgA + + write_file(out_fname,msgL) + +if __name__ == "__main__": + + # min_pitch = 21 + # max_pitch = 108 + + out_dir = "/home/kevin/temp" + dampFl = False + velA = [ 1,5,10,16,21,26,32,37,42,48,53,58,64,69,74,80,85,90,96,101,106,112,117,122,127] + note_off_sec = 2.0 + + if False: + pitchL = [ 21, 60 ] + noteDurL = [ 20.0, 20.0 ] + + if True: + pitchL = [ i for i in range(21,109) ] + noteDurL = [ 20.0 for _ in range(len(pitchL)) ] + + if False: + dampFlL = [ False, True ] if dampFl else [ False ] + + for pitch,note_on_sec in zip(pitchL,noteDurL): + for damp_fl in dampFlL: + csv_fname, mark_fname = gen_midi_csv_and_marker_files( pitch, velA, note_on_sec, note_off_sec, damp_fl, out_dir ) + + if True: + note_on_sec = 5 + note_off_sec = 1 + out_fname = "/home/kevin/temp/all_midi.csv" + gen_complete_midi_csv(pitchL, velA, note_on_sec, note_off_sec, out_fname) + diff --git a/py/gen_wavetables/multiproc.py b/py/gen_wavetables/multiproc.py new file mode 100755 index 0000000..d434ac2 --- /dev/null +++ b/py/gen_wavetables/multiproc.py @@ -0,0 +1,182 @@ +import os,subprocess,logging,multiprocessing,queue,yaml,types,time + + +class processor(multiprocessing.Process): + def __init__(self,procId,iQ,oQ,procArgs,procFunc): + super(processor,self).__init__() + self.procId = procId # the id of this process + self.iQ = iQ # process input queue (shared by all processes) + self.oQ = oQ # process output queue (shared by all processes) + self.cycleIdx = 0 # count of times the user function has been called + self.procArgs = procArgs # arguments to the user defined function (procFunc) that are used for the life of the process + self.procFunc = procFunc # user defined function this process will execute + + def run(self): + super(processor,self).run() + + self.cycleIdx = 0 + + # loop until the user process returns false + while True: + + # if no msg is available + if self.iQ.empty(): + time.sleep(0.1) + + # attempt to get the message + try: + msg = self.iQ.get(block=False) + except queue.Empty: + continue # the dequeue attempt failed + + # if the message is 'None' then end the process + if msg == None: + break + + # run the user function + r = self._func(msg) + + # send the result of the function back to the main process + self.oQ.put(r) + + self.cycleIdx += 1 + + def _func(self,taskArgs): + resultD = self.procFunc( self.procId, self.procArgs, taskArgs ) + + return resultD + + + +def _local_distribute_dispatcher( inQ, outQ, processN, taskArgsL, procArgs, processArgsFunc, processResultFunc, verboseLevel ): + + bestScore = None + iterN = 0 # total count of jobs completed and pending + pendingN = 0 # total count of jobs pending + nextSrcIdx = 0 + resultL = [] + t0 = time.time() + + while len(resultL) < len(taskArgsL): + + # if available processes exist and all source files have not been sent for processing already + if pendingN < processN and nextSrcIdx < len(taskArgsL): + + # if a args processing function was given + args = taskArgsL[nextSrcIdx] + if processArgsFunc is not None: + args = processArgsFunc(procArgs,args) + + inQ.put( args ) + + nextSrcIdx += 1 + pendingN += 1 + t0 = time.time() + + if verboseLevel>=3: + print(f"Send: remaining:{len(taskArgsL)-nextSrcIdx} pend:{pendingN} result:{len(resultL)}") + + + # if a process completed + elif not outQ.empty(): + + # attempt to get the message + try: + resultD = outQ.get(block=False) + except queue.Empty: + if verboseLevel > 0: + print("********* A message dequeue attempt failed.") + continue # the dequeue attempt failed + + # if a result processing function was given + if processResultFunc is not None: + resultD = processResultFunc( procArgs, resultD ) + + resultL.append(resultD) + + pendingN -= 1 + t0 = time.time() + + if verboseLevel>=3: + print(f"Recv: remaining:{len(taskArgsL)-nextSrcIdx} pend:{pendingN} result:{len(resultL)}") + + + # nothing to do - sleep + else: + time.sleep(0.1) + + + t1 = time.time() + if t1 - t0 > 60: + if verboseLevel >= 2: + print(f"Wait: remaining:{len(taskArgsL)-nextSrcIdx} pend:{pendingN} result:{len(resultL)}") + t0 = t1 + + return resultL + + +def local_distribute_main(processN, procFunc, procArgs, taskArgsL, processArgsFunc=None, processResultFunc=None, verboseLevel=3): + """ Distribute the function 'procFunc' to 'procN' local processes. + This function will call procFunc(procArgs,taskArgsL[i]) len(taskArgsL) times + and return the result of each call in the list resultL[]. + The function will be run in processN parallel processes. + Input: + + :processN: Count of processes to run in parallel. + + :procFunc: A python function of the form: myProc(procId,procArgs,taskArgsL[i]). + This function is run in a remote process. + + :procArgs: A data structure holding read-only arguments which are fixed accross all processes. + This data structure is duplicated on all remote processes. + + :taskArgsL: A list of data structures holding the per-call arguments to 'procFunc()'. + Note that taskArgsL[i] may never be 'None' because None is used by the + processes control system to indicate that the process should be shutdown. + + :processArgsFunc: A function of the form args = processArgsFunc(procArgs,args) + which can be used to modify the arg. record from taskArgssL[] prior to the call + to 'procFunc()'. This function runs locally in the calling functions process. + + :processResultFunc: A function of the form result = processResulftFunc(procArgs,result). + which is called on the result of procFunc() prior to the result being store in the + return result list. This function runs locally in the calling functions process. + """ + + processN = processN + mgr = multiprocessing.Manager() + inQ = mgr.Queue() + outQ = mgr.Queue() + processL = [] + + # create and start the processes + for i in range(processN): + pr = processor(i,inQ,outQ,procArgs,procFunc) + processL.append( pr ) + pr.start() + + # service the processes + resultL = _local_distribute_dispatcher(inQ, outQ, processN, taskArgsL, procArgs, processArgsFunc, processResultFunc, verboseLevel) + + # tell the processes to stop + for pr in processL: + inQ.put(None) + + # join the processes + for pr in processL: + while True: + pr.join(1.0) + if pr.is_alive(): + time.sleep(1) + else: + break + + return resultL + + + + + + + + diff --git a/py/gen_wavetables/old/debug_plot.py b/py/gen_wavetables/old/debug_plot.py new file mode 100644 index 0000000..ca3b758 --- /dev/null +++ b/py/gen_wavetables/old/debug_plot.py @@ -0,0 +1,42 @@ +import csv + +import matplotlib.pyplot as plt +import numpy as np + + +with open("/home/kevin/temp/temp_60_ch_0_c.csv") as f: + rdr = csv.reader(f) + + smp_per_cycle = 183.468309 + + idL = [] + yiL = [] + xiL = [] + fracL = [] + xV = [] + eV = [] + yV = [] + + for r in rdr: + idL.append(int(r[0])) + iL.append(int(r[1])) + yiL.append(int(r[2])) + xiL.append(int(r[3])) + fracL.append(float(r[4])) + xV.append(float(r[5])) + eV.append(float(r[6])) + yV.append(float(r[7])) + + + xf = [ x+f for x,f in zip(xiL,fracL) ] + + fig, ax = plt.subplots(4,1) + + ax[0].plot(xV) + ax[1].plot(yV) + ax[2].plot(eV) + ax[3].plot(xf) + + plt.show() + + diff --git a/py/gen_wavetables/old/gen_wave_tables.py b/py/gen_wavetables/old/gen_wave_tables.py new file mode 100644 index 0000000..8260bbd --- /dev/null +++ b/py/gen_wavetables/old/gen_wave_tables.py @@ -0,0 +1,94 @@ +import os +import gen_midi_csv as gmc +import sample_looper as loop +import subprocess + + +def gen_ivory_player_caw_pgm( out_dir, caw_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label ): + + caw_template = f""" + {{ + base_dir: "{out_dir}" + io_dict: "{out_dir}/io.cfg" + proc_dict: "~/src/caw/src/libcw/flow/proc_dict.cfg", + subnet_dict: "~/src/caw/src/libcw/flow/subnet_dict.cfg", + + programs: {{ + + sample_generator: {{ + non_real_time_fl:false, + network: {{ + procs: {{ + mf: {{ class: midi_file, args:{{ csv_fname:"{midi_csv_fname}" }}}}, + mout: {{ class: midi_out in:{{ in:mf.out }}, args:{{ dev_label:"{midi_dev_label}", port_label:"{midi_port_label}" }}}}, + stop: {{ class: halt, in:{{ in:mf.done_fl }}}} + + ain: {{ class: audio_in, args:{{ dev_label:"main" }}}}, + split: {{ class: audio_split, in:{{ in:ain.out }} args:{{ select: [0,0, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1] }}}}, + af: {{ class: audio_file_out, in:{{ in:split.out0 }}, args:{{ fname:"{audio_fname}"}}}}, + aout: {{ class: audio_out, in:{{ in:ain.out }}, args:{{ dev_label:"main"}}}}, + }} + }} + }} + }} + }} + """ + with open(caw_fname, "w") as f: + f.write(caw_template) + + +def gen_audio_file( out_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, audio_fname ): + + + caw_cfg_fname = os.path.join(out_dir,f"{midi_pitch:03}_caw.cfg") + + gen_caw_ivory_player_pgm(out_dir, caw_cfg_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label ) + + argL = [ caw_exec_fname, "exec", caw_cfg_fname, "sample_generator" ] + + print(" ".join(argL)) + + p = subprocess.Popen(argL,stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + out,err =p.communicate() + + if p.returncode != 0: + print("The call '%s' to failed with code:%i Message:%s %s"," ".join(argL),p.returncode,out,err) + + +if __name__ == "__main__": + + # piano pitch range: 21-108 + + midi_pitch = 60 + out_dir = "/home/kevin/temp/wt1" + note_on_sec = 1.5 + note_off_sec = 0.5 + velA = [1,8,15,22,29,36,42,49,56,63,70,77,84,91,98,105,112,119,126] + velA = [ 1,5,10,16,21,26,32,37,42,48,53,58,64,69,74,80,85,90,96,101,106,112,117,122,127] + midi_dev_label = "MIDIFACE 2x2" + midi_port_label = "MIDIFACE 2x2 Midi Out 1" + caw_exec_fname = "/home/kevin/src/caw/build/linux/debug/bin/caw" + + + # Generate the MIDI CSV used to trigger the sampler and a TSV which indicates the location of the + # triggered notes in seconds. + midi_csv_fname, mark_tsv_fname = gmc.gen_midi_csv_and_marker_files( midi_pitch, velA, note_on_sec, note_off_sec, out_dir ) + + audio_fname = os.path.join(out_dir,"wav",f"{midi_pitch:03}_samples.wav") + + #gen_audio_file( out_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, midi_pitch ) + + loop_marker_out_fname = os.path.join(out_dir,f"{midi_pitch}_loop_mark.txt") + wt_out_fname = os.path.join(out_dir,"bank",f"{midi_pitch}_wt.json") + + argsD = { + 'end_offset_ms':100, + 'loop_dur_ms':20, # 21=40, 60=20 + 'midi_pitch':midi_pitch, + 'guess_cnt':5 + } + + + + loop.gen_loop_positions( audio_fname, mark_tsv_fname, midi_pitch, argsD, loop_marker_out_fname, wt_out_fname ) diff --git a/py/gen_wavetables/old/low_hz_loops.py b/py/gen_wavetables/old/low_hz_loops.py new file mode 100644 index 0000000..83ca52e --- /dev/null +++ b/py/gen_wavetables/old/low_hz_loops.py @@ -0,0 +1,238 @@ +import wt_util +import math +import numpy as np +import matplotlib.pyplot as plt +import audiofile as af + +def sign(x): + return x<0 + +def find_zero_crossing( xV, si, inc ): +# find the next zero crossing before/after si + + while si > 0: + + if sign(xV[si-1])==False and sign(xV[si])==True: + break; + si += inc + + return si + +def plot_xfades( zV, zL ): + + fig,ax = plt.subplots(1,1) + + ax.plot(zV) + for bi,ei in zL: + ax.vlines([bi,ei],-0.5,0.5) + plt.show() + +def decay_compensation( xV, smp_per_cyc ): + + def _calc_rms( sV, rmsN ): + bi = 0 + ei = rmsN + rmsL = [] + while ei < len(sV): + rms = np.pow(np.mean( np.pow( sV[bi:ei], 2 )),0.5) + rmsL.append(rms) + bi += 1 + ei += 1 + + return rmsL + + # calc the RMS env + rmsN = int(round(smp_per_cyc)) + rmsL = _calc_rms( xV, rmsN) + + # fit a line to the RMS env + x = np.arange(len(rmsL)) + A = np.array( [x, np.ones((len(rmsL),))] ).T + m, c = np.linalg.lstsq(A, rmsL)[0] + + # use the slope of the line to boost + # to progressively boost the gain + # of the signal over time + yV = np.copy(xV) + yi = rmsN + for i,r in enumerate(rmsL): + if yi < len(yV): + c = r + i*(-m) + g = c/r + yV[yi] *= g + yi += 1 + + + + if False: + # calc. the RMS env of the compensated signal + c_rmsL = _calc_rms(yV,rmsN) + + db0 = 20.0*math.log10(rmsL[0]) + db1 = 20.0*math.log10(rmsL[-1]) + print(db0-db1, m*len(rmsL), len(rmsL)) + + #c_rmsL = [ r + i*(-m) for i,r in enumerate(rmsL) ] + + fig,ax = plt.subplots(1,1) + ax.plot(rmsL) + ax.plot(x,m*x+c,'r') + ax.plot(c_rmsL) + plt.show() + + return yV + +def apply_gain( xxV,bi,ei,g, g_coeff ): + + i = bi + while i < ei: + + n = min(ei-i,64) + + for j in range(n): + xxV[i+j] *= g + + g *= g_coeff + + i += n + + return g + +def gen_note( xV, xbi, xei, loop_dur_smp, y_dur_smp, xfade_smp, smp_per_cyc, g_coeff, bli=None ): + + hannV = np.hanning( xfade_smp*2 + 1 ) + fin_env = hannV[0:xfade_smp+1] + fout_env = hannV[xfade_smp:] + env_smp_cnt = len(fin_env) + assert( len(fout_env) == env_smp_cnt) + #print(fin_env[-1],fout_env[0],len(fin_env),len(fout_env)) + + if bli is None: + bli = find_zero_crossing( xV, xei - loop_dur_smp, -1 ) + + aN = (bli - xbi) + env_smp_cnt + aV = np.copy(xV[xbi:xbi+aN]) + + aV[-env_smp_cnt:] *= fout_env + + lV = np.copy(xV[bli:bli+loop_dur_smp]) + + if False: + # compensate for loop period decay + lV = decay_compensation(lV,smp_per_cyc) + + zV = np.zeros((y_dur_smp,)) + + zV[0:aN] = aV + zbi = aN-env_smp_cnt + zei = zbi + len(lV) + + zL = [] + g = 1.0 + while(zei < len(zV)): + + zL.append((zbi,zei)) + + elV = np.copy(lV) + elV[0:env_smp_cnt] *= fin_env + elV[-env_smp_cnt:] *= fout_env + + zV[zbi:zei] += elV + + g = apply_gain(zV,zbi,zei-env_smp_cnt,g,g_coeff) + + zbi = zei - env_smp_cnt + zei = zbi + len(elV) + + lV = np.flip(lV) + + + return zV,zL,bli,xV[bli:bli+loop_dur_smp] + + +def main( i_audio_fname, mark_tsv_fname, midi_pitch, loop_secs, note_dur_secs, xfade_ms, inter_note_sec, g_coeff, o_audio_fname, o_mark_tsv_fname ): + + i_markL = wt_util.parse_marker_file(mark_tsv_fname) + xM,srate = wt_util.parse_audio_file(i_audio_fname) + chN = xM.shape[1] + + fund_hz = wt_util.midi_pitch_to_hz(midi_pitch) + smp_per_cyc = srate / fund_hz + loop_dur_fsmp = loop_secs * srate + cyc_per_loop = int(loop_dur_fsmp / smp_per_cyc) + loop_dur_fsmp = cyc_per_loop * smp_per_cyc + loop_dur_smp = int(math.floor(loop_dur_fsmp)) + xfade_smp = int(round(srate * xfade_ms / 1000.0)) + inter_note_smp = int(round(srate*inter_note_sec)) + note_dur_smp = int(round(srate*note_dur_secs)) + note_cnt = len(i_markL) + + print(f"{smp_per_cyc:.3f} smps/cyc {smp_per_cyc/srate:.3f} secs/cyc") + print(f"loop {cyc_per_loop} cycles dur: {loop_dur_fsmp/srate:.3f} secs") + print(f"xfade: {xfade_ms} ms {xfade_smp} smp") + + yN = note_cnt * (note_dur_smp + inter_note_smp) + yM = np.zeros((yN,chN)) + yi = 0; + o_markL = [] + + for beg_sec,end_sec,vel_label in i_markL: + + vel = int(vel_label) + bsi = int(round(beg_sec * srate)) + esi = int(round(end_sec * srate)) + + bli = None + for ch_i in range(chN): + + zV,zL,bli,lV = gen_note( xM[:,ch_i], bsi, esi, loop_dur_smp, note_dur_smp, xfade_smp, smp_per_cyc, g_coeff, bli ) + + if vel > 70: + #plot_xfades(zV,zL) + pass + + yM[yi:yi+len(zV),ch_i] = zV + + if ch_i == 0: + o_markL.append( (yi/srate, (yi+len(zV))/srate, vel )) + + yi += len(zV) + inter_note_smp + + + + if False: + fig,ax = plt.subplots(2,1) + ax[0].plot(yM[:,0]) + ax[1].plot(yM[:,1]) + plt.show(); + + wt_util.write_audio_file( yM, srate, o_audio_fname ) + wt_util.write_mark_tsv_file( o_markL, o_mark_tsv_fname ) + +def test_scipy(audio_fname): + samplerate = 44100.0 + fs = 100 + t = np.linspace(0., 1., int(samplerate)) + data = np.sin(2. * np.pi * fs * t) + data = np.array([data,data]) + wt_util.write_audio_file(data.T, samplerate, audio_fname) + +if __name__ == "__main__": + + midi_pitch = 21 + loop_secs = 0.4 + note_dur_secs = 7.0 + xfade_ms = 2 + inter_note_sec = 0.1 + g_coeff = 0.9995 + + i_audio_fname = "/home/kevin/temp/wt3/wav/21_samples.wav" + mark_tsv_fname = "/home/kevin/temp/wt3/21_marker.txt" + + o_audio_fname = "/home/kevin/temp/temp.wav" + o_mark_tsv_fname = "/home/kevin/temp/temp_mark.txt" + + main( i_audio_fname, mark_tsv_fname, midi_pitch, loop_secs, note_dur_secs, xfade_ms, inter_note_sec, g_coeff, o_audio_fname, o_mark_tsv_fname ) + + if False: + test_scipy(o_audio_fname) diff --git a/py/gen_wavetables/old/sample_looper.py b/py/gen_wavetables/old/sample_looper.py new file mode 100644 index 0000000..96d0e18 --- /dev/null +++ b/py/gen_wavetables/old/sample_looper.py @@ -0,0 +1,392 @@ +import math +import json +import array +import types +import matplotlib.pyplot as plt +import numpy as np + +import wt_util + +def plot_overlap( xV, bi, ei, wndN, smp_per_cycle, title ): + + fig, ax = plt.subplots(1,1) + + x0 = [ i for i in range(bi-wndN,bi+wndN) ] + x1 = [ x-x0[0] for x in x0 ] + ax.plot(x1,xV[x0]) + + x0 = [ i for i in range(ei-wndN,ei+wndN) ] + x1 = [ x-x0[0] for x in x0 ] + ax.plot(x1,xV[x0]) + + plt.title(title) + + plt.show() + +def sign(x): + return x<0 + +def find_zero_crossing( xV, si, inc ): +# find the next zero crossing before/after si + + while si > 0: + + if sign(xV[si-1])==False and sign(xV[si])==True: + break; + si += inc + + return si + +def meas_fit(xV,ei,bi,wndN): + + if bi-wndN < 0 or ei+wndN > len(xV): + return None + + v0 = xV[ei-wndN:ei+wndN]/0x7fffffff + v1 = xV[bi-wndN:bi+wndN]/0x7fffffff + + dv = (v1-v0) * (v1-v0) + return np.mean(dv) + +def find_loop_points_1( xV, bsi, esi, smp_per_cycle, wndN, est_N ): + + # find the first zero crossing after the end of the search range + ei = find_zero_crossing(xV,esi,-1) + + min_d = None + min_bi = None + bi = bsi + + # make est_N guesses + for i in range(0,est_N): + + # find the next zero crossing after bi + bi = find_zero_crossing(xV,bi,1) + + # measure the quality of the fit with the end of the loop + d = meas_fit(xV,ei,bi,wndN) + + #print(i,bi,d) + + # store the best loop begin point + if min_bi is None or d < min_d: + min_d = d + min_bi = bi + + # advance + bi += int(wndN) #smp_per_cycle + + + return min_bi, ei, min_d + +def find_loop_points_2(xV,bsi,esi, smp_per_cycle, wndN, est_N ): + + def _track_min( min_i, min_d, i, d ): + if min_i is None or d rms_max: + i_max = i + rms_max = rms + + rmsV.append(float(rms)) + + return [ i_max, 0 if i_max==1 else 1 ] + +def process_all_samples( markL, smpM, srate, args ): + + wtL = [] + + #fund_hz = 13.75 * math.pow(2,(-9.0/12.0)) * math.pow(2.0,(args.midi_pitch / 12.0)) + fund_hz = midi_pitch_to_hz(args.midi_pitch) + + smp_per_cycle = int(srate / fund_hz) + end_offs_smp_idx = max(smp_per_cycle,int(args.end_offset_ms * srate / 1000)) + loop_dur_smp = int(args.loop_dur_ms * srate / 1000) + wndN = int(smp_per_cycle/6) + + print(f"Hz:{fund_hz} smp/cycle:{smp_per_cycle} loop_dur:{loop_dur_smp} cycles/loop:{loop_dur_smp/smp_per_cycle} wndN:{wndN}") + + # for each sampled note + for beg_sec,end_sec,vel_label in markL: + + beg_smp_idx = int(beg_sec * srate) + end_smp_idx = int(end_sec * srate) + + r = { + "instr":"piano", + "pitch":args.midi_pitch, + "vel": int(vel_label), + "beg_smp_idx":beg_smp_idx, + "end_smp_idx":None, + "chL": [] + } + + # determine the loop search range from the end of the note sample + eli = end_smp_idx - end_offs_smp_idx + bli = eli - loop_dur_smp + + ch_map = determine_track_order( smpM, beg_smp_idx, end_smp_idx) + + #print(ch_map) + + esi = beg_smp_idx; + for i in range(0,smpM.shape[1]): + + ch_idx = ch_map[i] + + xV = smpM[:,ch_idx] + + if True: + #if i == 0: + # s_per_c = srate / fund_hz + # bi,ei,cost = find_loop_points(xV,bli,eli,s_per_c,wndN,args.guess_cnt) + + s_per_c = srate / fund_hz + bi,ei,cost = find_loop_points_4(xV,bli,eli,s_per_c,wndN,args.guess_cnt) + + if False: + bi,ei,cost = find_loop_points_2(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt) + + if False: + if i == 0: + bi,ei,cost = find_loop_points(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt) + else: + bi,ei,cost = find_best_zero_crossing(xV,bi,ei,wndN) + + if False: + if i == 0: + bi,ei,cost = find_loop_points(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt) + else: + pass + + + #print(i,bi,ei) + eli = ei # attempt to make the eli the second channel close to the first + + loop_dur_sec = (ei-bi)/srate + cyc_per_loop = int(round((ei-bi)/smp_per_cycle)) + plot_title = f"vel:{vel_label} cyc/loop:{cyc_per_loop} dur:{loop_dur_sec*1000:.1f} ms {ei-bi} smp ch:{ch_idx} cost:{0 if cost<= 0 else math.log(cost):.2f}" + plot_overlap(xV,bi,ei,wndN,smp_per_cycle,plot_title) + + r["chL"].append({ + "ch_idx":ch_idx, + "segL":[] }) + + + r["chL"][-1]["segL"].append({ + "cost":0, + "cyc_per_loop":1, + "bsi":beg_smp_idx, + "esi":bi }) + + r["chL"][-1]["segL"].append({ + "cost":cost, + "cyc_per_loop":cyc_per_loop, + "bsi":bi, + "esi":ei }) + + esi = max(esi,ei) + + r['end_smp_idx'] = esi + + r["chL"] = sorted(r["chL"],key=lambda x: x["ch_idx"]) + wtL.append(r) + + return wtL + + + +def write_loop_label_file( fname, wtL, srate ): + + with open(fname,"w") as f: + for r in wtL: + for cr in r['chL']: + for sr in cr['segL']: + beg_sec = sr['bsi'] / srate + end_sec = sr['esi'] / srate + if sr['cost']!=0: + cost = math.log(sr['cost']) + label = f"ch:{cr['ch_idx']} {r['vel']} {cost:.2f}" + f.write(f"{beg_sec}\t{end_sec}\t{label}\n") + + +def write_wt_file( fname, audio_fname, wtL, srate ): + + r = { + "audio_fname":audio_fname, + #"srate":srate, + "wt":wtL + } + + with open(fname,"w") as f: + json.dump(r,f); + + +def gen_loop_positions( audio_fname, marker_tsv_fname, pitch, argsD, loop_marker_fname, wt_fname ): + + args = types.SimpleNamespace(**argsD) + markL = wt_util.parse_marker_file(marker_tsv_fname) + smpM,srate = wt_util.parse_audio_file(audio_fname) + chN = smpM.shape[1] + wtL = process_all_samples(markL,smpM,srate,args) + + write_loop_label_file(loop_marker_fname, wtL, srate) + + write_wt_file(wt_fname,audio_fname, wtL,srate) + + + +if __name__ == "__main__": + + audio_fname = "/home/kevin/temp/wt/wav/60_samples.wav" + marker_tsv_fname = "/home/kevin/temp/wt/60_marker.txt" + + loop_marker_fname = "/home/kevin/temp/wt/60_loop_mark.txt" + wt_fname = "/home/kevin/temp/wt/bank/60_wt.json" + midi_pitch = 60 + + argsD = { + 'end_offset_ms':100, + 'loop_dur_ms':100, + 'midi_pitch':midi_pitch, + 'guess_cnt':40 + + } + + gen_loop_positions( audio_fname, marker_tsv_fname, midi_pitch, argsD, loop_marker_fname, wt_fname ) diff --git a/py/gen_wavetables/old/wt_study.py b/py/gen_wavetables/old/wt_study.py new file mode 100644 index 0000000..7f0071f --- /dev/null +++ b/py/gen_wavetables/old/wt_study.py @@ -0,0 +1,323 @@ +import math +import json +import array +import types +import matplotlib.pyplot as plt +import numpy as np +import wt_util + + +def sign(x): + return x<0 + +def find_zero_crossing( xV, si, inc ): +# find the next zero crossing before/after si + + while si > 0: + + if sign(xV[si-1])==False and sign(xV[si])==True: + break; + si += inc + + return si + +def table_read_2( tab, frac ): + + i0 = math.floor(frac + 1) + i1 = i0 + 1 + f = frac - int(frac) + + return tab[i0] + (tab[i1] - tab[i0]) * f + +def hann_read( x, N ): + + while x > N: + x -= N + + x = x - (N/2) + + return (0.5 + 0.5 * math.cos(2*math.pi * x / N)) + + +def sine_0(): + + srate = 48000.0 + hz = wt_util.midi_pitch_to_hz(6) + + fsmp_per_cyc = srate / hz # fractional samples per cycle + fsmp_per_wt = 2* fsmp_per_cyc # fractional samples per wavetable + smp_per_wt = int(math.floor(fsmp_per_wt)) # integer samples per wavetable + + # Note that when wrapping from the last sample to the first there is less + # than one sample period and so the wavetable phase after the wrap will be fractional + + # fill two wave tables with two identical cycles of a sine signal + wt0 = [0] + [ math.sin(2*math.pi*hz*i/srate) for i in range(smp_per_wt) ] + [0] + wt1 = [0] + [ math.sin(2*math.pi*hz*i/srate) for i in range(smp_per_wt) ] + [0] + + xN = smp_per_wt * 4 + + phs0 = 0 + phs1 = fsmp_per_wt/2 + + + y0 = [] + y1 = [] + y2 = [] + h0 = [] + h1 = [] + for i in range(xN): + + # read the wave tables + s0 = table_read_2( wt0, phs0 ) + s1 = table_read_2( wt1, phs1 ) + y0.append( s0 ) + y1.append( s1 ) + + # calc the envelopes + e0 = hann_read(phs0,fsmp_per_wt) + e1 = hann_read(phs1,fsmp_per_wt) + h0.append( e0 ) + h1.append( e1 ) + + # sum the two signals + y2.append( e0*s0 + e1*s1 ) + + # advance the phases of the oscillators + phs0 += 1 + if phs0 >= smp_per_wt: + phs0 -= smp_per_wt + + phs1 += 1 + if phs1 >= smp_per_wt: + phs1 -= smp_per_wt + + + + + fix,ax = plt.subplots(4,1) + ax[0].plot(y0) + ax[1].plot(y1) + ax[2].plot(h0) + ax[2].plot(h1) + ax[3].plot(y2) + plt.show() + + + +def piano_0(): + + i_audio_fname = "/home/kevin/temp/wt1/wav/060_samples.wav" + o_audio_fname = "/home/kevin/temp/temp.wav" + marker_tsv_fname = "/home/kevin/temp/wt1/60_marker.txt" + midi_pitch = 60 + offs_ms = 50 + note_dur_secs = 3 + inter_note_secs = 0.5 + + markL = wt_util.parse_marker_file(marker_tsv_fname) + aM,srate = wt_util.parse_audio_file(i_audio_fname) + + hz = wt_util.midi_pitch_to_hz(midi_pitch) + fsmp_per_cyc = srate/hz + fsmp_per_wt = fsmp_per_cyc * 2 + smp_per_wt = int(math.floor(fsmp_per_wt)) + offs_smp = int(math.floor(offs_ms * srate / 1000)) + ch_cnt = aM.shape[1] + note_dur_smp = int(round(note_dur_secs*srate)) + inter_note_smp = int(round(inter_note_secs*srate)) + + yN = len(markL) * (note_dur_smp + inter_note_smp) + yM = np.zeros((yN,ch_cnt)) + yi = 0 + + for beg_sec,end_sec,vel_label in markL: + bsi = int(round(beg_sec * srate)) + esi = int(round(end_sec * srate)) + + for ch_idx in range(ch_cnt): + + wtbi = find_zero_crossing(aM[:,ch_idx],esi-offs_smp,-1) + wtei = wtbi + smp_per_wt + wt = [0] + aM[wtbi:wtei,ch_idx].tolist() + [0] + wt[0] = aM[wtei-1,ch_idx] + wt[-1] = wt[1] + + atkN = wtbi - bsi + yM[yi:yi+atkN,ch_idx] = aM[bsi:wtbi,ch_idx] + + phs0 = 0 + phs1 =fsmp_per_wt/2 + + for i in range(note_dur_smp-atkN): + + s0 = table_read_2( wt, phs0 ) + s1 = table_read_2( wt, phs1 ) + + e0 = hann_read(phs0,fsmp_per_wt) + e1 = hann_read(phs1,fsmp_per_wt) + + # advance the phases of the oscillators + phs0 += 1 + if phs0 >= smp_per_wt: + phs0 -= smp_per_wt + + phs1 += 1 + if phs1 >= smp_per_wt: + phs1 -= smp_per_wt + + + yM[yi+atkN+i,ch_idx] = e0*s0 + e1*s1 + + yi += note_dur_smp + inter_note_smp + + + + wt_util.write_audio_file( yM, srate, o_audio_fname ) + +def select_wave_table( aV, si, smp_per_wt ): + + wtbi = find_zero_crossing(aV,si,-1) + wtei = wtbi + smp_per_wt + wt = [0] + aV[wtbi:wtei].tolist() + [0] + wt[0] = aV[wtei-1] + wt[-1] = wt[1] + + return wt,wtbi + + +def piano_1(): + + o_audio_fname = "/home/kevin/temp/temp.wav" + o_mark_tsv_fname = "/home/kevin/temp/temp_mark.txt" + + if False: + i_audio_fname = "/home/kevin/temp/wt1/wav/060_samples.wav" + marker_tsv_fname = "/home/kevin/temp/wt1/60_marker.txt" + midi_pitch = 60 + offs_0_ms = 100 + offs_1_ms = 50 + g_coeff = 0.9985 + + + if True: + i_audio_fname = "/home/kevin/temp/wt3/wav/21_samples.wav" + marker_tsv_fname = "/home/kevin/temp/wt3/21_marker.txt" + midi_pitch = 21 + offs_0_ms = 100 + offs_1_ms = 80 + g_coeff = 0.9992 + + note_dur_secs = 6 + inter_note_secs = 0.5 + + markL = wt_util.parse_marker_file(marker_tsv_fname) + aM,srate = wt_util.parse_audio_file(i_audio_fname) + + hz = wt_util.midi_pitch_to_hz(midi_pitch) + fsmp_per_cyc = srate/hz + fsmp_per_wt = fsmp_per_cyc * 2 + smp_per_wt = int(math.floor(fsmp_per_wt)) + offs_0_smp = int(math.floor(offs_0_ms * srate / 1000)) + offs_1_smp = int(math.floor(offs_1_ms * srate / 1000)) + ch_cnt = aM.shape[1] + note_dur_smp = int(round(note_dur_secs*srate)) + inter_note_smp = int(round(inter_note_secs*srate)) + + yN = len(markL) * (note_dur_smp + inter_note_smp) + yM = np.zeros((yN,ch_cnt)) + yi = 0 + + oMarkL = [] + + for beg_sec,end_sec,vel_label in markL: + bsi = int(round(beg_sec * srate)) + esi = int(round(end_sec * srate)) + + for ch_idx in range(ch_cnt): + + wt0,wtbi = select_wave_table( aM[:,ch_idx], esi-offs_0_smp, smp_per_wt ) + wt1,_ = select_wave_table( aM[:,ch_idx], esi-offs_1_smp, smp_per_wt) + + + rms0 = np.pow( np.mean( np.pow(wt0,2) ),0.5) + rms1 = np.pow( np.mean( np.pow(wt1,2) ),0.5) + + wt1 = [ w*rms0/rms1 for w in wt1 ] + + # The attack abutts the wavetable at it's center point + # so we need to offset the end of the attack half way + # through the first wave table. + abi = int(wtbi + smp_per_wt/2) + + atkN = abi - bsi + yM[yi:yi+atkN,ch_idx] = aM[bsi:abi,ch_idx] + + oMarkL.append(((yi+atkN)/srate, (yi+atkN)/srate, f"{vel_label}-{ch_idx}")) + + + phs0 = 0 + phs1 = fsmp_per_wt/2 + phs2 = fsmp_per_wt/4 + phs3 = fsmp_per_wt/4 + fsmp_per_wt/2 + g = 1.0 + g_phs = 0 + for i in range(note_dur_smp-atkN): + + s0 = table_read_2( wt0, phs0 ) + s1 = table_read_2( wt0, phs1 ) + + s2 = table_read_2( wt1, phs2 ) + s3 = table_read_2( wt1, phs3 ) + + e0 = hann_read(phs0,fsmp_per_wt) + e1 = hann_read(phs1,fsmp_per_wt) + + e2 = hann_read(phs0,fsmp_per_wt) + e3 = hann_read(phs1,fsmp_per_wt) + + # advance the phases of the oscillators + phs0 += 1 + if phs0 >= smp_per_wt: + phs0 -= smp_per_wt + + phs1 += 1 + if phs1 >= smp_per_wt: + phs1 -= smp_per_wt + + phs2 += 1 + if phs2 >= smp_per_wt: + phs2 -= smp_per_wt + + phs3 += 1 + if phs3 >= smp_per_wt: + phs3 -= smp_per_wt + + + mix_g = math.cos(0.25*2*math.pi*i/srate) + + #yM[yi+atkN+i,ch_idx] = g* (mix_g*(e0*s0 + e1*s1) + (1.0-mix_g)*(e2*s2 + e3*s3)) + yM[yi+atkN+i,ch_idx] = g* (e0*s0 + e1*s1) + + g_phs += 1 + if g_phs >= 64: + g *= g_coeff + g_phs = 0 + + + yi += note_dur_smp + inter_note_smp + + + + wt_util.write_audio_file( yM, srate, o_audio_fname ) + + wt_util.write_mark_tsv_file(oMarkL, o_mark_tsv_fname) + + + + + +if __name__ == "__main__": + + #sine_0() + piano_1() diff --git a/py/gen_wavetables/sample_ivory.py b/py/gen_wavetables/sample_ivory.py new file mode 100644 index 0000000..c1fa5e6 --- /dev/null +++ b/py/gen_wavetables/sample_ivory.py @@ -0,0 +1,99 @@ +import subprocess +import os + + +def gen_ivory_player_caw_pgm( out_dir, caw_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label ): + + caw_template = f""" + {{ + base_dir: "{out_dir}" + io_dict: "{out_dir}/io.cfg" + proc_dict: "~/src/caw/src/libcw/flow/proc_dict.cfg", + subnet_dict: "~/src/caw/src/libcw/flow/subnet_dict.cfg", + + programs: {{ + + sample_generator: {{ + non_real_time_fl:false, + network: {{ + procs: {{ + mf: {{ class: midi_file, args:{{ csv_fname:"{midi_csv_fname}" }}}}, + mout: {{ class: midi_out in:{{ in:mf.out }}, args:{{ dev_label:"{midi_dev_label}", port_label:"{midi_port_label}" }}}}, + stop: {{ class: halt, in:{{ in:mf.done_fl }}}} + + ain: {{ class: audio_in, args:{{ dev_label:"main" }}}}, + split: {{ class: audio_split, in:{{ in:ain.out }} args:{{ select: [0,0, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1] }}}}, + af: {{ class: audio_file_out, in:{{ in:split.out0 }}, args:{{ fname:"{audio_fname}"}}}}, + aout: {{ class: audio_out, in:{{ in:ain.out }}, args:{{ dev_label:"main"}}}}, + }} + }} + }} + }} + }} + """ + with open(caw_fname, "w") as f: + f.write(caw_template) + + +def gen_audio_file( out_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, caw_cfg_fname, audio_fname ): + + gen_ivory_player_caw_pgm(out_dir, caw_cfg_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label ) + + argL = [ caw_exec_fname, "exec", caw_cfg_fname, "sample_generator" ] + + print(" ".join(argL)) + + p = subprocess.Popen(argL,stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + out,err =p.communicate() + + if p.returncode != 0: + print("The call '%s' to failed with code:%i Message:%s %s"," ".join(argL),p.returncode,out,err) + +def gen_all_files( work_dir,midi_dev_label,midi_port_label,caw_exec_fname,midi_filterL=None ): + + fnL = os.listdir(work_dir) + + for fn in fnL: + if "." in fn: + fn_ext_partL = fn.split(".") + if fn_ext_partL[1] == "csv": + fn_partL = fn_ext_partL[0].split("_") + + mark_fn = "_".join(fn_partL[0:-1]) + "_marker.txt" + + damp_fl = fn_partL[1] == "damp" + + midi_pitch = int(fn_partL[0]) + + if midi_filterL is None or (midi_filterL is not None and midi_pitch in midi_filterL): + + wav_dir = os.path.join(work_dir,"wav") + + if not os.path.isdir(wav_dir): + os.mkdir(wav_dir) + + damp_label = "damp_" if damp_fl else "" + audio_fname = os.path.join(wav_dir,f"{midi_pitch:03}_{damp_label}samples.wav") + + midi_csv_fname = os.path.join(work_dir,fn) + mark_tsv_fname = os.path.join(work_dir,mark_fn) + + caw_cfg_fname = os.path.join(work_dir,f"{midi_pitch:03}_{damp_label}caw.cfg") + + print(midi_pitch,midi_csv_fname,mark_tsv_fname,audio_fname) + + gen_audio_file( work_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, caw_cfg_fname, audio_fname ) + + +if __name__ == "__main__": + + work_dir = "/home/kevin/temp/wt6" + midi_dev_label = "MIDIFACE 2x2" + midi_port_label = "MIDIFACE 2x2 Midi Out 1" + caw_exec_fname = "/home/kevin/src/caw/build/linux/debug/bin/caw" + midi_filterL = None + gen_all_files(work_dir,midi_dev_label,midi_port_label,caw_exec_fname,midi_filterL) + + + diff --git a/py/gen_wavetables/temp.py b/py/gen_wavetables/temp.py new file mode 100644 index 0000000..81cd4fe --- /dev/null +++ b/py/gen_wavetables/temp.py @@ -0,0 +1,55 @@ +def process_all_samples_0( markL, smpM, srate, args ): + + wtL = [] + + fund_hz = 13.75 * math.pow(2,(-9.0/12.0)) * math.pow(2.0,(args.midi_pitch / 12.0)) + + end_offs_smp_idx = int(args.end_offset_ms * srate / 1000) + loop_dur_smp = int(args.loop_dur_ms * srate / 1000) + smp_per_cycle = int(srate / fund_hz) + + print(f"Hz:{fund_hz} smp/cycle:{smp_per_cycle}") + + for beg_sec,end_sec,vel_label in markL: + for ch_idx in range(0,smpM.shape[1]): + beg_smp_idx = int(beg_sec * srate) + end_smp_idx = int(end_sec * srate) + + eli = end_smp_idx - end_offs_smp_idx + bli = eli - loop_dur_smp + + #print(beg_smp_idx,bli,eli,end_smp_idx) + + xV = smpM[:,ch_idx] + wndN = int(smp_per_cycle/3) + bi,ei,cost = find_loop_points(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt) + + plot_title = f"vel:{vel_label} ch:{ch_idx} cost:{math.log(cost):.2f}" + #plot_overlap(xV,bi,ei,wndN,smp_per_cycle,plot_title) + + wtL.append( { + "pitch":args.midi_pitch, + "vel": int(vel_label), + "cost":cost, + "ch_idx":ch_idx, + "beg_smp_idx":beg_smp_idx, + "end_smp_idx":end_smp_idx, + "beg_loop_idx":bi, + "end_loop_idx":ei }) + + return wtL + + +def write_loop_label_file_0( fname, wtL, srate ): + + with open(fname,"w") as f: + for r in wtL: + beg_sec = r['beg_smp_idx'] / srate + end_sec = r['end_smp_idx'] / srate + # f.write(f"{beg_sec}\t{end_sec}\t{r['vel_label']}\n") + + beg_sec = r['beg_loop_idx'] / srate + end_sec = r['end_loop_idx'] / srate + cost = math.log(r['cost']) + label = f"ch:{r['ch_idx']} {cost:.2f}" + f.write(f"{beg_sec}\t{end_sec}\t{label}\n") diff --git a/py/gen_wavetables/wt_osc.py b/py/gen_wavetables/wt_osc.py new file mode 100644 index 0000000..2897d4b --- /dev/null +++ b/py/gen_wavetables/wt_osc.py @@ -0,0 +1,201 @@ +import os +import math +import json +import wt_util +import calc_sample_atk_dur +import calc_wavetables +import numpy as np +import multiproc as mp +def table_read_2( tab, frac ): + + i0 = math.floor(frac + 1) + i1 = i0 + 1 + f = frac - int(frac) + + return tab[i0] + (tab[i1] - tab[i0]) * f + +def hann_read( x, N ): + + while x > N: + x -= N + + x = x - (N/2) + + return (0.5 + 0.5 * math.cos(2*math.pi * x / N)) + +def prepare_wt( aV, wtL, wt_idx ): + + wt_smp_cnt = wtL[0]['wtei'] - wtL[0]['wtbi'] + + if wt_idx >= len(wtL): + wt = np.zeros((wt_smp_cnt,)) + else: + wt = np.copy(aV[ wtL[wt_idx]['wtbi']: wtL[wt_idx]['wtei'] ]) + + wt = [0] + wt.tolist() + [0] + wt[0] = wt[-2] + wt[-1] = wt[0] + + return np.array(wt) + +def get_wt( aV, wtL, wt_idx ): + + wt0 = prepare_wt(aV,wtL,wt_idx) + wt1 = prepare_wt(aV,wtL,wt_idx+1) + + return wt0,wt1 + +def gen_osc_output( i_audio_fname, velL, midi_pitch, note_dur_sec, inter_note_sec, wt_interval_sec, o_audio_fname ): + + smp_per_dsp_frm = 64 + + aM,srate = wt_util.parse_audio_file(i_audio_fname) + ch_cnt = aM.shape[1] + + note_dur_smp = int(round(note_dur_sec * srate)) + inter_note_smp = int(round(inter_note_sec * srate)) + wt_interval_smp = int(round(wt_interval_sec * srate)) + + yN = len(velL) * (note_dur_smp + inter_note_smp) + yM = np.zeros((yN,ch_cnt)) + yi = 0 + + for velD in velL: + + bsi = velD['bsi'] + hz = wt_util.midi_pitch_to_hz(midi_pitch) + fsmp_per_cyc = srate/hz + fsmp_per_wt = fsmp_per_cyc * 2 + smp_per_wt = int(math.floor(fsmp_per_wt)) + + for ch_idx in range(ch_cnt): + + wtL = velD['chL'][ch_idx] + + if len(wtL) == 0: + print(f"pitch:{midi_pitch} vel:{velD['vel']} ch:{ch_idx} has no wavetables.") + continue + + # The attack abutts the wavetable at it's center point + # so we need to offset the end of the attack half way + # through the first wave table. + abi = int(wtL[0]['wtbi'] + smp_per_wt/2) + + atkN = min(abi - bsi,note_dur_smp) + + print(velD['vel'],yi+atkN,yN,atkN/srate) + + yM[yi:yi+atkN,ch_idx] = aM[bsi:bsi+atkN,ch_idx] + + wt_idx = 0 + wt0,wt1 = get_wt( aM[:,ch_idx], wtL, wt_idx ) + wt = wt0 + wt_int_phs = 0 + + phs0 = 0 + phs1 = fsmp_per_wt/2 + + for i in range(note_dur_smp-atkN): + + s0 = table_read_2( wt, phs0 ) + s1 = table_read_2( wt, phs1 ) + + e0 = hann_read(phs0,fsmp_per_wt) + e1 = hann_read(phs1,fsmp_per_wt) + + yM[yi+atkN+i,ch_idx] = e0*s0 + e1*s1 + + # advance the phases of the oscillators + phs0 += 1 + if phs0 >= smp_per_wt: + phs0 -= smp_per_wt + + phs1 += 1 + if phs1 >= smp_per_wt: + phs1 -= smp_per_wt + + wt_int_phs += 1 + + if wt_int_phs % smp_per_dsp_frm == 0: + wt_mix = min(1.0, wt_int_phs / wt_interval_smp) + wt = ((1.0-wt_mix) * wt0) + (wt_mix * wt1) + + if wt_int_phs >= wt_interval_smp: + wt_idx += 1 + wt0,wt1 = get_wt( aM[:,ch_idx], wtL, wt_idx ) + wt = wt0 + wt_int_phs = 0 + wt_mix = 0 + + + yi += note_dur_smp + inter_note_smp + + print(o_audio_fname) + wt_util.write_audio_file( yM, srate, o_audio_fname ) + + + +def gen_from_wt_json( processN, wt_json_fname, out_dir, note_dur_sec, inter_note_sec, pitch_filtL=None ): + + def _multi_proc_func( procId, procArgsD, taskArgsD ): + + gen_osc_output(**taskArgsD) + + + if not os.path.isdir(out_dir): + os.mkdir(out_dir) + + with open(wt_json_fname) as f: + pitchL = json.load(f)['pitchL'] + + taskArgsL = [] + for pitchD in pitchL: + if pitch_filtL is None or pitchD['midi_pitch'] in pitch_filtL: + taskArgsL.append( { + "i_audio_fname":pitchD['audio_fname'], + "velL":pitchD['velL'], + "midi_pitch":pitchD['midi_pitch'], + "note_dur_sec":note_dur_sec, + "inter_note_sec":inter_note_sec, + "wt_interval_sec":pitchD['wt_interval_secs'], + "o_audio_fname":os.path.join(out_dir,f"{pitchD['midi_pitch']:03}_osc.wav") + }) + + processN = min(processN,len(taskArgsL)) + mp.local_distribute_main( processN,_multi_proc_func,{},taskArgsL ) + + + + + + + + +if __name__ == "__main__": + + if True: + wt_json_fname = "/home/kevin/temp/temp_5.json" + out_dir = "/home/kevin/temp/wt_osc_1" + note_dur_sec = 10.0 + inter_note_sec = 1.0 + processN = 20 + pitch_filtL = None #[ 27 ] + gen_from_wt_json(processN, wt_json_fname,out_dir,note_dur_sec,inter_note_sec, pitch_filtL) + + if False: + midi_pitch = 60 + audio_fname = "/home/kevin/temp/wt5/wav/060_samples.wav" + mark_tsv_fname = "/home/kevin/temp/wt5/60_marker.txt" + rms_wnd_ms = 50 + rms_hop_ms = 10 + atkEndThreshDb = -43.0 + wt_interval_secs = 1.0 + note_dur_sec = 10.0 + inter_note_sec = 1.0 + + + gateL = calc_sample_atk_dur.generate_gate_db(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, atkEndThreshDb ) + + wtL = calc_wavetables.gen_wave_table_list( audio_fname, mark_tsv_fname, gateL, midi_pitch, wt_interval_secs ) + + gen_osc_output(audio_fname,wtL,note_dur_sec,inter_note_sec, wt_interval_secs, "/home/kevin/temp/temp.wav") diff --git a/py/gen_wavetables/wt_util.py b/py/gen_wavetables/wt_util.py new file mode 100644 index 0000000..69b1e15 --- /dev/null +++ b/py/gen_wavetables/wt_util.py @@ -0,0 +1,89 @@ +import wave as w +import math +import array +import numpy as np +import scipy.io.wavfile + +def midi_pitch_to_hz( midi_pitch ): + return 13.75 * math.pow(2,(-9.0/12.0)) * math.pow(2.0,(midi_pitch / 12.0)) + + +def parse_marker_file( marker_fname ): + + markL = [] + with open(marker_fname) as f: + for line in f: + tokL = line.split("\t"); + + assert( len(tokL) == 3 ) + + markL.append( ( float(tokL[0]), float(tokL[1]), tokL[2] ) ) + + return markL + +def parse_audio_file( audio_fname ): + + max_smp_val = float(0x7fffffff) + + with w.open(audio_fname,"rb") as f: + print(f"ch:{f.getnchannels()} bits:{f.getsampwidth()*8} srate:{f.getframerate()} frms:{f.getnframes()}") + + srate = f.getframerate() + frmN = f.getnframes() + data_bytes = f.readframes(frmN) + smpM = np.array(array.array('i',data_bytes)) + + # max_smp_val assumes 32 bits + assert( f.getsampwidth() == 4 ) + + smpM = smpM / max_smp_val + + smpM = np.reshape(smpM,(frmN,2)) + + return smpM,srate + +def write_audio_file( xM, srate, audio_fname ): + + xM *= np.iinfo(np.int32).max + + scipy.io.wavfile.write(audio_fname, srate, xM.astype(np.int32)) + +def write_audio_file_0( xM, srate, audio_fname ): + + # Convert to (little-endian) 32 bit integers. + xM = (xM * (2 ** 31 - 1)).astype(" 0 and si < len(xV): + + if sign(xV[si-1])==False and sign(xV[si])==True: + return si + + si += inc + + return None + +