py/gen_wavetables : Initial commit.

This commit is contained in:
kevin 2024-09-05 11:17:08 -04:00
parent 0047881c8a
commit cabb968f9f
14 changed files with 2754 additions and 0 deletions

View File

@ -0,0 +1,42 @@
TODO:
-----
calc_wavetable.py:
- Estimate the actual pitch of the sample.
- Stop finding wavetables when the amplitude of the next table falls below a threshold.
- Generate a report showing the count of wavetables per note.
wt_util.py
----------
Utilities used by all other modules
gen_midi_csv.py
---------------
Generate a MIDI file in CSV format to trigger the sampler with a sequence of velocities for a given pitch.
sample_ivory.py
---------------
Use the MIDI file from 'gen_midi_csv.py' to trigger the sampler and record the resulting audio and onset/offset TSV marker file.
calc_sample_atk_dur.py
----------------------
Calculate a list [(vel,bsi,esi)] which indicates the attack wavetable.
calc_wavetables.py
------------------
Create a JSON file of wave tables for all pitches and velocities.
wt_osc.py
---------
Generate a set of notes using the wavetables found by calc_wavetables.py.
This program implements a wavetable oscillator which can interpret the wavetables
created by calc_wavetables.py
Obsolete
---------------------
wt_study.py
low_hz_loops.py
debug_plot.py
gen_wave_tables.py
sample_looper.py

View File

@ -0,0 +1,380 @@
import matplotlib.pyplot as plt
import numpy as np
import math
import wt_util
from kneed import KneeLocator
def rms( aV, wnd_smp_cnt, hop_smp_cnt, dbFl = True ):
assert( wnd_smp_cnt % hop_smp_cnt == 0 and hop_smp_cnt < wnd_smp_cnt )
rmsL = []
bi = 0
ei = wnd_smp_cnt
while ei <= len(aV):
rms = np.pow( np.mean( np.pow(aV[bi:ei],2) ), 0.5 )
if dbFl:
rms = -100.0 if rms == 0 else 20*math.log10(rms)
rmsL.append(rms)
bi += hop_smp_cnt
ei = bi + wnd_smp_cnt
# repeat the first RMS value (wnd_smp_cnt/hop_smp_cnt)-1 times
# so that rmsL[] indexes relate to aV[] indexes like this:
# av_idx = rms_idx * hop_smp_cnt
rmsL = [ rmsL[0] ] * (int(wnd_smp_cnt/hop_smp_cnt)-1) + rmsL
return rmsL
def calc_sample_atk_dur(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms ):
aM,srate = wt_util.parse_audio_file( audio_fname )
markL = wt_util.parse_marker_file( mark_tsv_fname )
rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000))
rms_hop_smp_cnt = 64 #int(round(rms_hop_ms * srate / 1000))
ch_cnt = aM.shape[1]
rmsL = [[] for _ in range(ch_cnt) ]
for beg_sec,end_sec,vel_label in markL:
bi = int(round(beg_sec * srate))
ei = int(round(end_sec * srate)) + int(srate)
for ch_idx in range(ch_cnt):
rmsL[ch_idx] += rms(aM[bi:ei,ch_idx],rms_wnd_smp_cnt,rms_hop_smp_cnt)
_,ax = plt.subplots(ch_cnt,1)
for ch_idx in range(ch_cnt):
ax[ch_idx].plot(rmsL[ch_idx])
plt.show()
def generate_gate_knee(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, min_gate_dur_ms, threshDb ):
aM,srate = wt_util.parse_audio_file( audio_fname )
markL = wt_util.parse_marker_file( mark_tsv_fname )
rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000))
rms_hop_smp_cnt = int(round(rms_hop_ms * srate / 1000))
min_gate_smp_cnt= int(round(min_gate_dur_ms * srate / 1000))
ch_cnt = aM.shape[1]
frm_cnt = aM.shape[0]
rmsL = []
ch_rmsL = []
for ch_idx in range(ch_cnt):
rmsL.append( rms( aM[:,ch_idx], rms_wnd_smp_cnt, rms_hop_smp_cnt ) )
ch_rmsL.append( np.mean( rmsL[-1] ) )
bsiL = [ int(round(beg_sec*srate)) for beg_sec,_,_ in markL ]
asiL = []
riL = []
bi = 1
ei = rms_hop_smp_cnt
eV = np.zeros((frm_cnt,))
# use the channel whith the most energy to determine the gate
ch_idx = np.argmax(ch_rmsL)
for beg_sec,end_sec,_ in markL:
rbi = int(round(beg_sec*srate/rms_hop_smp_cnt))
rei = int(round(end_sec*srate/rms_hop_smp_cnt))
offs = 10
y = rmsL[ch_idx][rbi+offs:rei]
x = np.arange(len(y))
k1 = KneeLocator(x, y, curve="convex", direction="decreasing", interp_method="polynomial")
ri = rbi + offs + k1.knee
riL.append( ri )
bsiL.append( int(rbi*rms_hop_smp_cnt) )
asiL.append( int(ri * rms_hop_smp_cnt) )
gateL = [(bsi,esi) for bsi,esi in zip(bsiL,asiL) ]
# force all gates to have a duration of at least min_gate_smp_cnt
if True:
for i,(bsi,esi) in enumerate(gateL):
if esi-bsi < min_gate_smp_cnt:
#print("gate ext:",esi-bsi,min_gate_smp_cnt)
gateL[i] = (bsi,bsi+min_gate_smp_cnt)
# verify that successive gates do not overlap
if i> 0:
assert gateL[i][0] > gateL[i-1][1]
if i < len(gateL)-1:
assert gateL[i][1] < gateL[i+1][0]
if True:
beL = [ (int(round(beg_secs*srate)), int(round(end_secs*srate))) for beg_secs,end_secs,_ in markL ]
beL = [ (max(0,int((bi)/rms_hop_smp_cnt)), max(0,int((ei)/rms_hop_smp_cnt))) for bi,ei in beL ]
_,ax = plt.subplots(3,1)
ax[0].plot(rmsL[0])
for bi,ei in beL:
ax[0].vlines([bi,ei],-100.0,0.0,color="red")
for ri in riL:
ax[0].vlines([ri],-100,0,color="green")
ax[1].plot(rmsL[1])
for bi,ei in beL:
ax[1].vlines([bi,ei],-100.0,0.0,color="red")
ax[2].plot(eV)
plt.show()
if False:
for i,(bi,ei) in enumerate(beL):
offs = 10
y = [ pow(10,z/20.0) for z in rmsL[0][bi+offs:ei] ]
y = rmsL[0][bi+offs:ei]
x = np.arange(len(y))
k1 = KneeLocator(x, y, curve="convex", direction="decreasing", interp_method="polynomial")
k1.plot_knee()
plt.title(f"{i} {offs+k1.knee} {offs+k1.knee*rms_hop_smp_cnt/srate:.3f}")
plt.show()
return gateL,ch_rmsL
def generate_gate_db(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, min_gate_dur_ms, threshDb ):
aM,srate = wt_util.parse_audio_file( audio_fname )
markL = wt_util.parse_marker_file( mark_tsv_fname )
rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000))
rms_hop_smp_cnt = int(round(rms_hop_ms * srate / 1000))
min_gate_smp_cnt= int(round(min_gate_dur_ms * srate / 1000))
ch_cnt = aM.shape[1]
frm_cnt = aM.shape[0]
rmsL = []
ch_rmsL = []
for ch_idx in range(ch_cnt):
rmsL.append( rms( aM[:,ch_idx], rms_wnd_smp_cnt, rms_hop_smp_cnt ) )
ch_rmsL.append( np.mean( rmsL[-1] ) )
bsiL = [ int(round(beg_sec*srate)) for beg_sec,_,_ in markL ]
asiL = []
riL = []
bi = 1
ei = rms_hop_smp_cnt
eV = np.zeros((frm_cnt,))
# use the channel whith the most energy to determine the gate
ch_idx = np.argmax(ch_rmsL)
bsi_idx = 1
cur_on_fl = 1.0 # 1.0 when the gate is high
active_fl = True # True if the gate is allowed to switch from low to high
pend_fl = True # True if the next attack is pending
for i in range(len(rmsL[ch_idx])):
# pend_fl prevents the gate from being turned off until the
# actual attack has occurred (it goes false once an RMS above the thresh is seen)
if pend_fl:
pend_fl = rmsL[ch_idx][i] <= threshDb
# if the rms is below the threshold
off_fl = rmsL[ch_idx][i] < threshDb #and rmsL[][i] < threshDb
# if the rms is below the threshold and the gate detector is enabled ...
if off_fl and active_fl and not pend_fl:
# ... then turn off the gate
cur_on_fl = 0.0
active_fl = False
riL.append(i)
asiL.append(bi)
eV[bi:ei] = cur_on_fl
# track the smp idx of the current rms value
bi = i * rms_hop_smp_cnt
ei = bi + rms_hop_smp_cnt
# if we are crossing into the next velocity sample
if bsi_idx < len(bsiL) and bsiL[ bsi_idx ] <= bi :
# be sure that this onset follows an offset
# (which won't occur if the signal never goes above the threshold)
if cur_on_fl != 0:
gesi = bsiL[bsi_idx-1] + min_gate_smp_cnt
asiL.append( gesi )
riL.append( int(round(gesi/rms_hop_smp_cnt)) )
eV[gesi:ei] = 0
#assert( cur_on_fl == 0 )
active_fl = True
pend_fl = True
cur_on_fl = 1.0
bsi_idx += 1
# if the offset for the last note was not detected
if len(asiL) == len(bsiL)-1:
asiL.append(frm_cnt-1)
gateL = [(bsi,esi) for bsi,esi in zip(bsiL,asiL) ]
# force all gates to have a duration of at least min_gate_smp_cnt
if True:
for i,(bsi,esi) in enumerate(gateL):
if esi-bsi < min_gate_smp_cnt:
#print("gate ext:",esi-bsi,min_gate_smp_cnt)
gateL[i] = (bsi,bsi+min_gate_smp_cnt)
# verify that successive gates do not overlap
if i> 0:
assert gateL[i][0] > gateL[i-1][1]
if i < len(gateL)-1:
assert gateL[i][1] < gateL[i+1][0]
if False:
beL = [ (int(round(beg_secs*srate)), int(round(end_secs*srate))) for beg_secs,end_secs,_ in markL ]
beL = [ (max(0,int((bi)/rms_hop_smp_cnt)), max(0,int((ei)/rms_hop_smp_cnt))) for bi,ei in beL ]
_,ax = plt.subplots(3,1)
ax[0].plot(rmsL[0])
for bi,ei in beL:
ax[0].vlines([bi,ei],-100.0,0.0,color="red")
for ri in riL:
ax[0].vlines([ri],-100,0,color="green")
ax[1].plot(rmsL[1])
for bi,ei in beL:
ax[1].vlines([bi,ei],-100.0,0.0,color="red")
ax[2].plot(eV)
plt.show()
return gateL,ch_rmsL
def generate_gate_pct(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, atk_min_dur_ms, threshPct ):
aM,srate = wt_util.parse_audio_file( audio_fname )
markL = wt_util.parse_marker_file( mark_tsv_fname )
rms_wnd_smp_cnt = int(round(rms_wnd_ms * srate / 1000))
rms_hop_smp_cnt = int(round(rms_hop_ms * srate / 1000))
atk_min_smp_cnt = int(round(atk_min_dur_ms * srate / 1000))
ch_cnt = aM.shape[1]
frm_cnt = aM.shape[0]
rmsL = []
ch_rmsL = []
for ch_idx in range(ch_cnt):
rmsL.append( rms( aM[:,ch_idx], rms_wnd_smp_cnt, rms_hop_smp_cnt, False ) )
ch_rmsL.append( np.mean(rmsL[-1] ))
beL = [ (int(round(beg_secs*srate)), int(round(end_secs*srate))) for beg_secs,end_secs,_ in markL ]
beL = [ (max(0,int(bi/rms_hop_smp_cnt)), max(0,int(ei/rms_hop_smp_cnt))) for bi,ei in beL ]
gateL = []
maxL = []
for bri,eri in beL:
rms_max = None
rms_max_i = None
rms_max_ch_i = None
for ch_idx in range(ch_cnt):
max_i = np.argmax( rmsL[ch_idx][bri:eri] ) + bri
if rms_max is None or rms_max < rmsL[ch_idx][max_i]:
rms_max = rmsL[ch_idx][max_i]
rms_max_i = max_i
rms_max_ch_i = ch_idx
maxL.append(rms_max)
threshDb = rms_max * threshPct
for i in range(rms_max_i+1,eri):
if rmsL[ch_idx][i] < threshDb:
gateL.append((bri,i))
break
retL = []
for bri,eri in gateL:
bsi = bri*rms_hop_smp_cnt
esi = eri*rms_hop_smp_cnt
if esi-bsi < atk_min_smp_cnt:
esi = bsi + atk_min_smp_cnt
retL.append((bsi,esi))
if True:
_,ax = plt.subplots(2,1)
ax[0].plot(rmsL[0])
for i,(bi,ei) in enumerate(gateL):
ax[0].vlines([bi,ei],0,maxL[i],color="red")
ax[1].plot(rmsL[1])
for i,(bi,ei) in enumerate(gateL):
ax[1].vlines([bi,ei],0,maxL[i],color="red")
plt.show()
return retL,ch_rmsL
def gen_gated_audio( i_audio_fname, gateL, o_audio_fname, o_mark_tsv_fname ):
aM,srate = wt_util.parse_audio_file( audio_fname )
markL = []
gateV = np.zeros((aM.shape[0],))
# form the gate vector
for i,(bsi,esi) in enumerate(gateL):
gateV[bsi:esi] = 1
markL.append((bsi/srate,esi/srate,f"{i}"))
for ch_idx in range(aM.shape[1]):
aM[:,ch_idx] *= gateV
wt_util.write_audio_file( aM, srate, o_audio_fname )
wt_util.write_mark_tsv_file( markL, o_mark_tsv_fname )
if __name__ == "__main__":
audio_fname = "/home/kevin/temp/wt5/wav/060_samples.wav"
mark_tsv_fname = "/home/kevin/temp/wt5/60_marker.txt"
rms_wnd_ms = 50
rms_hop_ms = 10
#calc_sample_atk_dur(audio_fname,mark_tsv_fname,rms_wnd_ms,rms_hop_ms)
# Generate a list [(bsi,esi)] indicating the beginning and end of the attack portion
# of each sample where the end is determined by a threshold in dB.
#threshDb = -50.0
#gateL = generate_gate_db(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, threshDb )
# Generate a list [(bsi,esi)] indicating the beginning and end of the attack portion
# of each sample where the end is determined by a percent decrease from the peak value.
threshPct = 0.75
gateL = generate_gate_pct(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, threshPct )
gen_gated_audio( audio_fname, gateL, "/home/kevin/temp/temp.wav", "/home/kevin/temp/temp_mark.txt")

View File

@ -0,0 +1,447 @@
import os
import math
import json
import types
import wt_util
import calc_sample_atk_dur
import numpy as np
import matplotlib.pyplot as plt
import multiproc as mp
from scipy.interpolate import CubicSpline
def upsample( aV, N, interp_degree ):
# aV[] - signal vector
# N - upsample factor (must be an integer >= 2)
# interp_degree - "linear" , "cubic"
N = int(N)
assert( N>= 2)
aN = len(aV)
z = np.zeros((aN,N))
z[:,0] = aV
# z is a copy of aV with zeros in the positions to be interpolated
z = np.squeeze(np.reshape(z,(aN*N,1)))
# x contains the indexes into z which contain values from aV
x = [ i*N for i in range(aN) ]
# xi contains the indexes into z which have zeros
xi = [ i for i in range(len(z)) if i not in x and i < x[-1] ]
# calc values for the zeros in z
if interp_degree == "linear":
cs = CubicSpline(x,aV)
z[xi] = cs(xi)
elif interp_degree == "cubic":
z[xi] = np.interp(xi,x,aV)
else:
assert(0)
# The last N-1 values are not set because they would require extrapolation
# (they have no value to their right). Instead we set these values
# as the mean of the preceding N values.
k = (len(z)-N)+1
for i in range(N-1):
z[k+i] = np.mean(z[ k+i-N:k+i])
return z #z[0:-(N-1)]
def estimate_pitch_ac( aV, si, hzL, srate, argsD ):
# aV[] - audio vector containing a wavetable that starts at aV[si]
# hzL[] - a list of candidate pitches
# srate - sample rate of aV[]
# args[cycle_cnt] - count of cycles to autocorrelate on either side of the reference pitch at aV[si:]
# (1=correlate with the cycle at aV[ si-fsmp_per+cyc:] and the cycle at aV[si+fsmp_per_cyc],
# (2=correlate with cycles at aV[ si-2*fsmp_per+cyc:],aV[ si-fsmp_per+cyc:],aV[ si+fsmp_per+cyc:],aV[ si-2*fsmp_per+cyc:])
# args[up_fact] - Set to and integer greater than 1 to upsample the signal prior to estimating the pitch
# args[up_interp_degree] - Upsampling interpolator "linear" or "cubic"
def _auto_corr( aV, si, fsmp_per_cyc, cycle_offset_idx, interp_degree ):
smp_per_cyc = int(math.floor(fsmp_per_cyc))
xi = [si + (cycle_offset_idx * fsmp_per_cyc) + i for i in range(smp_per_cyc)]
x_min = int(math.floor(xi[0]))
x_max = int(math.ceil(xi[-1]))
x = [ i for i in range(x_min,x_max) ]
y = aV[x]
if interp_degree == "cubic":
cs = CubicSpline(x,y)
yi = cs(xi)
elif interp_degree == "linear":
yi = np.interp(xi,x,y)
else:
assert(0)
# calc the sum of squared differences between the reference cycle and the 'offset' cycle
ac = np.sum(np.pow(yi - aV[si:si+smp_per_cyc],2.0))
return ac
def auto_corr( aV, si, fsmp_per_cyc, cycle_cnt, interp_degree ):
ac = 0
for i in range(1,cycle_cnt+1):
ac = _auto_corr(aV,si,fsmp_per_cyc, i, interp_degree)
ac += _auto_corr(aV,si,fsmp_per_cyc, -i, interp_degree)
# return the average sum of squared diff's per cycle
return ac/(cycle_cnt*2)
def ac_upsample( aV, si, fsmp_per_cyc, cycle_cnt, up_fact, up_interp_degree ):
pad = 0 # count of leading/trailing pad positions to allow for interpolation
if up_interp_degree == "cubic":
pad = 2
elif up_interp_degre == "linear":
pad = 1
else:
assert(0)
# calc the beg/end of the signal segment to upsample
bi = si - math.ceil(fsmp_per_cyc * cycle_cnt) - pad
ei = si + math.ceil(fsmp_per_cyc * (cycle_cnt + 1)) + pad
up_aV = upsample(aV[bi:ei],up_fact,up_interp_degree)
# calc. index of the center signal value
u_si = (si-bi)*up_fact
# the center value should not change after upsampling
assert aV[si] == up_aV[u_si]
return up_aV,u_si
args = types.SimpleNamespace(**argsD)
# if upsampling was requested
if args.up_fact > 1:
hz_min = min(hzL) # Select the freq candidate with the longest period,
max_fsmp_per_cyc = srate/hz_min # because we want to upsample just enough of the signal to test for all possible candidates,
aV,si = ac_upsample( aV, si, max_fsmp_per_cyc, args.cycle_cnt, args.up_fact, args.up_interp_degree )
srate = srate * args.up_fact
# calc. the auto-correlation for every possible candidate frequency
acL = []
for hz in hzL:
fsmp_per_cyc = srate / hz
acL.append( auto_corr(aV,si,fsmp_per_cyc,args.cycle_cnt,args.interp_degree) )
if False:
_,ax = plt.subplots(1,1)
ax.plot(hzL,acL)
plt.show()
# winning candidate is the one with the lowest AC score
cand_hz_idx = np.argmin(acL)
return hzL[cand_hz_idx]
# Note that we want a higher rate of pitch tracking than wave table generation - thus
# we downsample the pitch tracking interval by some integer factor to arrive at the
# rate at the wave table generation period.
def gen_wave_table_list( audio_fname,
mark_tsv_fname, gateL,
midi_pitch,
pitch_track_interval_secs,
wt_interval_down_sample_fact,
min_wt_db,
dom_ch_idx,
est_hz_argD,
ac_argD ):
est_hz_args = types.SimpleNamespace(**est_hz_argD)
aM,srate = wt_util.parse_audio_file(audio_fname)
markL = wt_util.parse_marker_file(mark_tsv_fname)
ch_cnt = aM.shape[1]
frm_cnt = aM.shape[0]
pt_interval_smp = int(round(pitch_track_interval_secs*srate))
wt_interval_fact= int(wt_interval_down_sample_fact)
hz = wt_util.midi_pitch_to_hz(midi_pitch)
fsmp_per_cyc = srate/hz
fsmp_per_wt = fsmp_per_cyc * 2
smp_per_wt = int(math.floor(fsmp_per_wt))
# calc. the range of possible pitch estimates
hz_min = wt_util.midi_pitch_to_hz(midi_pitch-1)
hz_ctr = wt_util.midi_pitch_to_hz(midi_pitch)
hz_max = wt_util.midi_pitch_to_hz(midi_pitch+1)
cents_per_semi = 100
# hzL is a list of candidate pitches with a range of +/- 1 semitone and a resolution of 1 cent
hzCandL = [ hz_min + i*(hz_ctr-hz_min)/100.0 for i in range(cents_per_semi) ] + [ hz_ctr + i*(hz_max-hz_ctr)/100.0 for i in range(cents_per_semi) ]
assert( len(markL) == len(gateL) )
# setup the return data structure
pitchD = { "midi_pitch":midi_pitch,
"srate":srate,
"est_hz_mean":None,
"est_hz_err_cents":None,
"est_hz_std_cents":None,
"wt_interval_secs":pitch_track_interval_secs * wt_interval_fact,
"dominant_ch_idx":int(dom_ch_idx),
"audio_fname":audio_fname,
"mark_tsv_fname":mark_tsv_fname,
"velL":[]
}
hzL = []
for i,(beg_sec,end_sec,vel_label) in enumerate(markL):
bsi = int(round(beg_sec*srate))
esi = int(round(end_sec*srate))
vel = int(vel_label)
eai = gateL[i][1] # end of attack
velD = { "vel":vel, "bsi":bsi, "chL":[ [] for _ in range(ch_cnt)] }
for ch_idx in range(ch_cnt):
i = 0
while True:
wt_smp_idx = eai + i*pt_interval_smp
# select the first zero crossing after the end of the attack
# as the start of the first sustain wavetable
wtbi = wt_util.find_zero_crossing(aM[:,ch_idx],wt_smp_idx,1)
#if len(velD['chL'][ch_idx]) == 0:
# print(midi_pitch,vel,(wtbi-bsi)/srate)
if wtbi == None:
break;
wtei = wtbi + smp_per_wt
if wtei > esi:
break
# estimate the pitch near wave tables which are: on the 'dominant' channel,
# above a certain velocity and not too far into the decay
if ch_idx==dom_ch_idx and est_hz_args.min_wt_idx <= i and i <= est_hz_args.max_wt_idx and vel >= est_hz_args.min_vel:
est_hz = estimate_pitch_ac( aM[:,dom_ch_idx],wtbi,hzCandL,srate,ac_argD)
hzL.append( est_hz )
#print(vel, i, est_hz)
if i % wt_interval_fact == 0:
# measure the RMS of the wavetable
wt_rms = float(np.pow(np.mean(np.pow(aM[wtbi:wtei,ch_idx],2.0)),0.5))
# filter out quiet wavetable but guarantee that there are always at least two wt's.
if 20*math.log10(wt_rms) > min_wt_db or len(velD['chL'][ch_idx]) < 2:
# store the location and RMS of the wavetable
velD['chL'][ch_idx].append({"wtbi":int(wtbi),"wtei":int(wtei),"rms":float(wt_rms), "est_hz":0})
i+=1
pitchD['velL'].append(velD)
# update est_hz in each of the wavetable records
est_hz = np.mean(hzL)
est_hz_delta = np.array(hzCandL) - est_hz
est_hz_idx = np.argmin(np.abs(est_hz_delta))
est_hz_std = np.std(hzL)
if est_hz_delta[est_hz_idx] > 0:
est_hz_std_cents = est_hz_std / ((hz_ctr-hz_min)/100.0)
else:
est_hz_std_cents = est_hz_std / ((hz_max-hz_ctr)/100.0)
est_hz_err_cents = est_hz_idx - cents_per_semi
print(f"{midi_pitch} est pitch:{est_hz}(hz) err:{est_hz_err_cents}(cents)" )
pitchD["est_hz_mean"] = float(est_hz)
pitchD["est_hz_err_cents"] = float(est_hz_err_cents)
pitchD["est_hz_std_cents"] = float(est_hz_std_cents)
return pitchD
def _gen_wave_table_bank( src_dir, midi_pitch, argD ):
args = types.SimpleNamespace(**argD)
audio_fname = os.path.join(src_dir,f"wav/{midi_pitch:03}_samples.wav")
mark_tsv_fname = os.path.join(src_dir,f"{midi_pitch:03}_marker.txt")
if True:
gateL,ch_avgRmsL = calc_sample_atk_dur.generate_gate_db(audio_fname,
mark_tsv_fname,
args.rms_wnd_ms,
args.rms_hop_ms,
args.atk_min_dur_ms,
args.atk_end_thresh_db )
if False:
gateL,ch_avgRmsL = calc_sample_atk_dur.generate_gate_pct(audio_fname,
mark_tsv_fname,
args.rms_wnd_ms,
args.rms_hop_ms,
args.atk_min_dur_ms,
0.1 )
dom_ch_idx = np.argmax(ch_avgRmsL)
pitchD = gen_wave_table_list( audio_fname,
mark_tsv_fname,
gateL,
midi_pitch,
args.pitch_track_interval_secs,
args.wt_interval_down_sample_fact,
args.min_wt_db,
dom_ch_idx,
args.est_hz,
args.ac )
return pitchD
def gen_wave_table_bank_mp( processN, src_dir, midi_pitchL, out_fname, argD ):
def _multi_proc_func( procId, procArgsD, taskArgsD ):
return _gen_wave_table_bank( procArgsD['src_dir'],
taskArgsD['midi_pitch'],
procArgsD['argD'] )
procArgsD = {
"src_dir":src_dir,
"argD": argD
}
taskArgsL = [ { 'midi_pitch':midi_pitch } for midi_pitch in midi_pitchL ]
processN = min(processN,len(taskArgsL))
if processN > 0:
pitchL = mp.local_distribute_main( processN,_multi_proc_func,procArgsD,taskArgsL )
else:
pitchL = [ _gen_wave_table_bank( src_dir, r['midi_pitch'], argD ) for r in range(taskArgsL) ]
pitchL = sorted(pitchL,key=lambda x:x['midi_pitch'])
with open(out_fname,"w") as f:
json.dump({"pitchL":pitchL, "instr":"piano", "argD":argD},f)
def plot_rms( wtb_json_fname ):
with open(wtb_json_fname) as f:
pitchL = json.load(f)['pitchL']
pitchL = sorted(pitchL,key=lambda x:x['midi_pitch'])
rmsnL = []
for pitchD in pitchL:
_,ax = plt.subplots(1,1)
for wtVelD in pitchD['wtL']:
for velChL in wtVelD['wtL']:
rmsL = [ 20*math.log10(wt['rms']) for wt in velChL ]
ax.plot(rmsL)
rmsnL.append(len(rmsL))
plt.title(f"{pitchD['midi_pitch']}")
plt.show()
def plot_atk_dur( wtb_json_fname ):
with open(wtb_json_fname) as f:
pitchL = json.load(f)['pitchL']
pitchL = sorted(pitchL,key=lambda x:x['midi_pitch'])
rmsnL = []
for pitchD in pitchL:
_,ax = plt.subplots(1,1)
secL = [ (v['chL'][0][0]['wtbi']-v['bsi'])/pitchD['srate'] for v in pitchD['velL'] ]
velL = [ x['vel'] for x in pitchD['velL'] ]
ax.plot(velL,secL,marker=".")
plt.title(f"{pitchD['midi_pitch']}")
plt.show()
def plot_hz( wtb_json_fname ):
with open(wtb_json_fname) as f:
pitchL = json.load(f)['pitchL']
pitchL = sorted(pitchL,key=lambda x:x['midi_pitch'])
_,ax = plt.subplots(3,1)
midiL = [ pitchD['midi_pitch'] for pitchD in pitchL ]
hzL = [ pitchD["est_hz_mean"] for pitchD in pitchL ]
hzStdL = [ pitchD["est_hz_std_cents"] for pitchD in pitchL ]
hzErrL = [ pitchD["est_hz_err_cents"] for pitchD in pitchL ]
ax[0].plot(midiL,hzL)
ax[1].plot(hzL,hzStdL)
ax[2].hlines([0,10,20],midiL[0],midiL[-1],color="red")
ax[2].plot(midiL,hzErrL)
plt.show()
if __name__ == "__main__":
midi_pitchL = [ pitch for pitch in range(21,109) ]
#midi_pitchL = [60 ]
out_fname = "/home/kevin/temp/temp_5.json"
src_dir= "/home/kevin/temp/wt6"
argD = {
'rms_wnd_ms':50,
'rms_hop_ms':10,
'atk_min_dur_ms':1000,
'atk_end_thresh_db':-43.0,
'min_wt_db':-80.0,
'pitch_track_interval_secs':0.25,
'wt_interval_down_sample_fact':8.0, # wt_interval_secs = pitch_track_interval_secs * wt_interval_down_sample_fact
'est_hz': {
'min_vel':50,
'min_wt_idx':2,
'max_wt_idx':4
},
'ac': {
'cycle_cnt':8, # count of cycles to use for auto-corr. pitch detection
'interp_degree':"cubic",
'up_fact':2,
'up_interp_degree':"cubic"
}
}
gen_wave_table_bank_mp(20, src_dir, midi_pitchL, out_fname, argD )
#plot_rms(out_fname)
#plot_hz(out_fname)
plot_atk_dur(out_fname)

View File

@ -0,0 +1,170 @@
import csv,os
def gen_sample_midi_events(pitch,velA,note_on_sec,note_off_sec,dampFl):
markA = []
msgA = []
tpqn = 1260
bpm = 60
ticks_per_sec = tpqn * bpm / 60.0
ticks_per_note_on = ticks_per_sec * note_on_sec
ticks_per_note_off = ticks_per_sec * note_off_sec
uid = 0
dticks = 0
cur_sec = 0;
r = { 'uid':len(msgA),
'tpQN':tpqn,
'bpm':bpm,
'dticks':0,
'ch':None,
'status':None,
'd0':None,
'd1':None }
msgA.append(r);
for vel in velA:
ch = 0
note_status = 0x90
ctl_status = 0xb0
damp_ctl = 0x40
if dampFl:
r = { 'uid':len(msgA),
'tpQN':None,
'bpm':None,
'dticks':dticks,
'ch':ch,
'status':ctl_status,
'd0':damp_ctl,
'd1':65 }
msgA.append(r)
dticks = 0
r = { 'uid':len(msgA),
'tpQN':None,
'bpm':None,
'dticks':dticks,
'ch':ch,
'status':note_status,
'd0':pitch,
'd1':vel }
msgA.append(r)
dticks = ticks_per_note_on
r = { 'uid':len(msgA),
'tpQN':None,
'bpm':None,
'dticks':dticks,
'ch':ch,
'status':note_status,
'd0':pitch,
'd1':0 }
msgA.append(r)
if dampFl:
r = { 'uid':len(msgA),
'tpQN':None,
'bpm':None,
'dticks':0,
'ch':ch,
'status':ctl_status,
'd0':damp_ctl,
'd1':0 }
msgA.append(r)
dticks = ticks_per_note_off
markA.append( (cur_sec, cur_sec+note_on_sec, vel) )
cur_sec += note_on_sec + note_off_sec
return msgA,markA
def write_file( fname, msgA ):
fieldnames = list(msgA[0].keys())
with open(fname,"w") as f:
wtr = csv.DictWriter(f, fieldnames=fieldnames)
wtr.writeheader()
for m in msgA:
wtr.writerow(m)
def write_marker_file(fname, markA ):
with open(fname,"w") as f:
for beg_sec,end_sec,vel in markA:
f.write(f"{beg_sec}\t{end_sec}\t{vel}\n")
def gen_midi_csv_and_marker_files( pitch, velA, note_on_sec, note_off_sec, damp_fl, out_dir ):
if not os.path.isdir(out_dir):
os.mkdir(out_dir)
msgA,markA = gen_sample_midi_events(pitch,velA,note_on_sec,note_off_sec,damp_fl)
damp_label = "damp_" if damp_fl else ""
midi_csv_fname = os.path.join(out_dir,f"{pitch:03}_{damp_label}sample.csv")
mark_fname = os.path.join(out_dir,f"{pitch:03}_{damp_label}marker.txt")
write_file(midi_csv_fname,msgA)
write_marker_file(mark_fname,markA)
return midi_csv_fname, mark_fname
def gen_complete_midi_csv( pitchA, velA, note_on_sec, note_off_sec, out_fname ):
damp_fl = False
msgL = []
for i,pitch in enumerate(pitchA):
msgA,_ = gen_sample_midi_events(pitch,velA,note_on_sec,note_off_sec,damp_fl)
if i > 0:
msgA = msgA[1:]
msgL += msgA
write_file(out_fname,msgL)
if __name__ == "__main__":
# min_pitch = 21
# max_pitch = 108
out_dir = "/home/kevin/temp"
dampFl = False
velA = [ 1,5,10,16,21,26,32,37,42,48,53,58,64,69,74,80,85,90,96,101,106,112,117,122,127]
note_off_sec = 2.0
if False:
pitchL = [ 21, 60 ]
noteDurL = [ 20.0, 20.0 ]
if True:
pitchL = [ i for i in range(21,109) ]
noteDurL = [ 20.0 for _ in range(len(pitchL)) ]
if False:
dampFlL = [ False, True ] if dampFl else [ False ]
for pitch,note_on_sec in zip(pitchL,noteDurL):
for damp_fl in dampFlL:
csv_fname, mark_fname = gen_midi_csv_and_marker_files( pitch, velA, note_on_sec, note_off_sec, damp_fl, out_dir )
if True:
note_on_sec = 5
note_off_sec = 1
out_fname = "/home/kevin/temp/all_midi.csv"
gen_complete_midi_csv(pitchL, velA, note_on_sec, note_off_sec, out_fname)

182
py/gen_wavetables/multiproc.py Executable file
View File

@ -0,0 +1,182 @@
import os,subprocess,logging,multiprocessing,queue,yaml,types,time
class processor(multiprocessing.Process):
def __init__(self,procId,iQ,oQ,procArgs,procFunc):
super(processor,self).__init__()
self.procId = procId # the id of this process
self.iQ = iQ # process input queue (shared by all processes)
self.oQ = oQ # process output queue (shared by all processes)
self.cycleIdx = 0 # count of times the user function has been called
self.procArgs = procArgs # arguments to the user defined function (procFunc) that are used for the life of the process
self.procFunc = procFunc # user defined function this process will execute
def run(self):
super(processor,self).run()
self.cycleIdx = 0
# loop until the user process returns false
while True:
# if no msg is available
if self.iQ.empty():
time.sleep(0.1)
# attempt to get the message
try:
msg = self.iQ.get(block=False)
except queue.Empty:
continue # the dequeue attempt failed
# if the message is 'None' then end the process
if msg == None:
break
# run the user function
r = self._func(msg)
# send the result of the function back to the main process
self.oQ.put(r)
self.cycleIdx += 1
def _func(self,taskArgs):
resultD = self.procFunc( self.procId, self.procArgs, taskArgs )
return resultD
def _local_distribute_dispatcher( inQ, outQ, processN, taskArgsL, procArgs, processArgsFunc, processResultFunc, verboseLevel ):
bestScore = None
iterN = 0 # total count of jobs completed and pending
pendingN = 0 # total count of jobs pending
nextSrcIdx = 0
resultL = []
t0 = time.time()
while len(resultL) < len(taskArgsL):
# if available processes exist and all source files have not been sent for processing already
if pendingN < processN and nextSrcIdx < len(taskArgsL):
# if a args processing function was given
args = taskArgsL[nextSrcIdx]
if processArgsFunc is not None:
args = processArgsFunc(procArgs,args)
inQ.put( args )
nextSrcIdx += 1
pendingN += 1
t0 = time.time()
if verboseLevel>=3:
print(f"Send: remaining:{len(taskArgsL)-nextSrcIdx} pend:{pendingN} result:{len(resultL)}")
# if a process completed
elif not outQ.empty():
# attempt to get the message
try:
resultD = outQ.get(block=False)
except queue.Empty:
if verboseLevel > 0:
print("********* A message dequeue attempt failed.")
continue # the dequeue attempt failed
# if a result processing function was given
if processResultFunc is not None:
resultD = processResultFunc( procArgs, resultD )
resultL.append(resultD)
pendingN -= 1
t0 = time.time()
if verboseLevel>=3:
print(f"Recv: remaining:{len(taskArgsL)-nextSrcIdx} pend:{pendingN} result:{len(resultL)}")
# nothing to do - sleep
else:
time.sleep(0.1)
t1 = time.time()
if t1 - t0 > 60:
if verboseLevel >= 2:
print(f"Wait: remaining:{len(taskArgsL)-nextSrcIdx} pend:{pendingN} result:{len(resultL)}")
t0 = t1
return resultL
def local_distribute_main(processN, procFunc, procArgs, taskArgsL, processArgsFunc=None, processResultFunc=None, verboseLevel=3):
""" Distribute the function 'procFunc' to 'procN' local processes.
This function will call procFunc(procArgs,taskArgsL[i]) len(taskArgsL) times
and return the result of each call in the list resultL[].
The function will be run in processN parallel processes.
Input:
:processN: Count of processes to run in parallel.
:procFunc: A python function of the form: myProc(procId,procArgs,taskArgsL[i]).
This function is run in a remote process.
:procArgs: A data structure holding read-only arguments which are fixed accross all processes.
This data structure is duplicated on all remote processes.
:taskArgsL: A list of data structures holding the per-call arguments to 'procFunc()'.
Note that taskArgsL[i] may never be 'None' because None is used by the
processes control system to indicate that the process should be shutdown.
:processArgsFunc: A function of the form args = processArgsFunc(procArgs,args)
which can be used to modify the arg. record from taskArgssL[] prior to the call
to 'procFunc()'. This function runs locally in the calling functions process.
:processResultFunc: A function of the form result = processResulftFunc(procArgs,result).
which is called on the result of procFunc() prior to the result being store in the
return result list. This function runs locally in the calling functions process.
"""
processN = processN
mgr = multiprocessing.Manager()
inQ = mgr.Queue()
outQ = mgr.Queue()
processL = []
# create and start the processes
for i in range(processN):
pr = processor(i,inQ,outQ,procArgs,procFunc)
processL.append( pr )
pr.start()
# service the processes
resultL = _local_distribute_dispatcher(inQ, outQ, processN, taskArgsL, procArgs, processArgsFunc, processResultFunc, verboseLevel)
# tell the processes to stop
for pr in processL:
inQ.put(None)
# join the processes
for pr in processL:
while True:
pr.join(1.0)
if pr.is_alive():
time.sleep(1)
else:
break
return resultL

View File

@ -0,0 +1,42 @@
import csv
import matplotlib.pyplot as plt
import numpy as np
with open("/home/kevin/temp/temp_60_ch_0_c.csv") as f:
rdr = csv.reader(f)
smp_per_cycle = 183.468309
idL = []
yiL = []
xiL = []
fracL = []
xV = []
eV = []
yV = []
for r in rdr:
idL.append(int(r[0]))
iL.append(int(r[1]))
yiL.append(int(r[2]))
xiL.append(int(r[3]))
fracL.append(float(r[4]))
xV.append(float(r[5]))
eV.append(float(r[6]))
yV.append(float(r[7]))
xf = [ x+f for x,f in zip(xiL,fracL) ]
fig, ax = plt.subplots(4,1)
ax[0].plot(xV)
ax[1].plot(yV)
ax[2].plot(eV)
ax[3].plot(xf)
plt.show()

View File

@ -0,0 +1,94 @@
import os
import gen_midi_csv as gmc
import sample_looper as loop
import subprocess
def gen_ivory_player_caw_pgm( out_dir, caw_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label ):
caw_template = f"""
{{
base_dir: "{out_dir}"
io_dict: "{out_dir}/io.cfg"
proc_dict: "~/src/caw/src/libcw/flow/proc_dict.cfg",
subnet_dict: "~/src/caw/src/libcw/flow/subnet_dict.cfg",
programs: {{
sample_generator: {{
non_real_time_fl:false,
network: {{
procs: {{
mf: {{ class: midi_file, args:{{ csv_fname:"{midi_csv_fname}" }}}},
mout: {{ class: midi_out in:{{ in:mf.out }}, args:{{ dev_label:"{midi_dev_label}", port_label:"{midi_port_label}" }}}},
stop: {{ class: halt, in:{{ in:mf.done_fl }}}}
ain: {{ class: audio_in, args:{{ dev_label:"main" }}}},
split: {{ class: audio_split, in:{{ in:ain.out }} args:{{ select: [0,0, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1] }}}},
af: {{ class: audio_file_out, in:{{ in:split.out0 }}, args:{{ fname:"{audio_fname}"}}}},
aout: {{ class: audio_out, in:{{ in:ain.out }}, args:{{ dev_label:"main"}}}},
}}
}}
}}
}}
}}
"""
with open(caw_fname, "w") as f:
f.write(caw_template)
def gen_audio_file( out_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, audio_fname ):
caw_cfg_fname = os.path.join(out_dir,f"{midi_pitch:03}_caw.cfg")
gen_caw_ivory_player_pgm(out_dir, caw_cfg_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label )
argL = [ caw_exec_fname, "exec", caw_cfg_fname, "sample_generator" ]
print(" ".join(argL))
p = subprocess.Popen(argL,stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out,err =p.communicate()
if p.returncode != 0:
print("The call '%s' to failed with code:%i Message:%s %s"," ".join(argL),p.returncode,out,err)
if __name__ == "__main__":
# piano pitch range: 21-108
midi_pitch = 60
out_dir = "/home/kevin/temp/wt1"
note_on_sec = 1.5
note_off_sec = 0.5
velA = [1,8,15,22,29,36,42,49,56,63,70,77,84,91,98,105,112,119,126]
velA = [ 1,5,10,16,21,26,32,37,42,48,53,58,64,69,74,80,85,90,96,101,106,112,117,122,127]
midi_dev_label = "MIDIFACE 2x2"
midi_port_label = "MIDIFACE 2x2 Midi Out 1"
caw_exec_fname = "/home/kevin/src/caw/build/linux/debug/bin/caw"
# Generate the MIDI CSV used to trigger the sampler and a TSV which indicates the location of the
# triggered notes in seconds.
midi_csv_fname, mark_tsv_fname = gmc.gen_midi_csv_and_marker_files( midi_pitch, velA, note_on_sec, note_off_sec, out_dir )
audio_fname = os.path.join(out_dir,"wav",f"{midi_pitch:03}_samples.wav")
#gen_audio_file( out_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, midi_pitch )
loop_marker_out_fname = os.path.join(out_dir,f"{midi_pitch}_loop_mark.txt")
wt_out_fname = os.path.join(out_dir,"bank",f"{midi_pitch}_wt.json")
argsD = {
'end_offset_ms':100,
'loop_dur_ms':20, # 21=40, 60=20
'midi_pitch':midi_pitch,
'guess_cnt':5
}
loop.gen_loop_positions( audio_fname, mark_tsv_fname, midi_pitch, argsD, loop_marker_out_fname, wt_out_fname )

View File

@ -0,0 +1,238 @@
import wt_util
import math
import numpy as np
import matplotlib.pyplot as plt
import audiofile as af
def sign(x):
return x<0
def find_zero_crossing( xV, si, inc ):
# find the next zero crossing before/after si
while si > 0:
if sign(xV[si-1])==False and sign(xV[si])==True:
break;
si += inc
return si
def plot_xfades( zV, zL ):
fig,ax = plt.subplots(1,1)
ax.plot(zV)
for bi,ei in zL:
ax.vlines([bi,ei],-0.5,0.5)
plt.show()
def decay_compensation( xV, smp_per_cyc ):
def _calc_rms( sV, rmsN ):
bi = 0
ei = rmsN
rmsL = []
while ei < len(sV):
rms = np.pow(np.mean( np.pow( sV[bi:ei], 2 )),0.5)
rmsL.append(rms)
bi += 1
ei += 1
return rmsL
# calc the RMS env
rmsN = int(round(smp_per_cyc))
rmsL = _calc_rms( xV, rmsN)
# fit a line to the RMS env
x = np.arange(len(rmsL))
A = np.array( [x, np.ones((len(rmsL),))] ).T
m, c = np.linalg.lstsq(A, rmsL)[0]
# use the slope of the line to boost
# to progressively boost the gain
# of the signal over time
yV = np.copy(xV)
yi = rmsN
for i,r in enumerate(rmsL):
if yi < len(yV):
c = r + i*(-m)
g = c/r
yV[yi] *= g
yi += 1
if False:
# calc. the RMS env of the compensated signal
c_rmsL = _calc_rms(yV,rmsN)
db0 = 20.0*math.log10(rmsL[0])
db1 = 20.0*math.log10(rmsL[-1])
print(db0-db1, m*len(rmsL), len(rmsL))
#c_rmsL = [ r + i*(-m) for i,r in enumerate(rmsL) ]
fig,ax = plt.subplots(1,1)
ax.plot(rmsL)
ax.plot(x,m*x+c,'r')
ax.plot(c_rmsL)
plt.show()
return yV
def apply_gain( xxV,bi,ei,g, g_coeff ):
i = bi
while i < ei:
n = min(ei-i,64)
for j in range(n):
xxV[i+j] *= g
g *= g_coeff
i += n
return g
def gen_note( xV, xbi, xei, loop_dur_smp, y_dur_smp, xfade_smp, smp_per_cyc, g_coeff, bli=None ):
hannV = np.hanning( xfade_smp*2 + 1 )
fin_env = hannV[0:xfade_smp+1]
fout_env = hannV[xfade_smp:]
env_smp_cnt = len(fin_env)
assert( len(fout_env) == env_smp_cnt)
#print(fin_env[-1],fout_env[0],len(fin_env),len(fout_env))
if bli is None:
bli = find_zero_crossing( xV, xei - loop_dur_smp, -1 )
aN = (bli - xbi) + env_smp_cnt
aV = np.copy(xV[xbi:xbi+aN])
aV[-env_smp_cnt:] *= fout_env
lV = np.copy(xV[bli:bli+loop_dur_smp])
if False:
# compensate for loop period decay
lV = decay_compensation(lV,smp_per_cyc)
zV = np.zeros((y_dur_smp,))
zV[0:aN] = aV
zbi = aN-env_smp_cnt
zei = zbi + len(lV)
zL = []
g = 1.0
while(zei < len(zV)):
zL.append((zbi,zei))
elV = np.copy(lV)
elV[0:env_smp_cnt] *= fin_env
elV[-env_smp_cnt:] *= fout_env
zV[zbi:zei] += elV
g = apply_gain(zV,zbi,zei-env_smp_cnt,g,g_coeff)
zbi = zei - env_smp_cnt
zei = zbi + len(elV)
lV = np.flip(lV)
return zV,zL,bli,xV[bli:bli+loop_dur_smp]
def main( i_audio_fname, mark_tsv_fname, midi_pitch, loop_secs, note_dur_secs, xfade_ms, inter_note_sec, g_coeff, o_audio_fname, o_mark_tsv_fname ):
i_markL = wt_util.parse_marker_file(mark_tsv_fname)
xM,srate = wt_util.parse_audio_file(i_audio_fname)
chN = xM.shape[1]
fund_hz = wt_util.midi_pitch_to_hz(midi_pitch)
smp_per_cyc = srate / fund_hz
loop_dur_fsmp = loop_secs * srate
cyc_per_loop = int(loop_dur_fsmp / smp_per_cyc)
loop_dur_fsmp = cyc_per_loop * smp_per_cyc
loop_dur_smp = int(math.floor(loop_dur_fsmp))
xfade_smp = int(round(srate * xfade_ms / 1000.0))
inter_note_smp = int(round(srate*inter_note_sec))
note_dur_smp = int(round(srate*note_dur_secs))
note_cnt = len(i_markL)
print(f"{smp_per_cyc:.3f} smps/cyc {smp_per_cyc/srate:.3f} secs/cyc")
print(f"loop {cyc_per_loop} cycles dur: {loop_dur_fsmp/srate:.3f} secs")
print(f"xfade: {xfade_ms} ms {xfade_smp} smp")
yN = note_cnt * (note_dur_smp + inter_note_smp)
yM = np.zeros((yN,chN))
yi = 0;
o_markL = []
for beg_sec,end_sec,vel_label in i_markL:
vel = int(vel_label)
bsi = int(round(beg_sec * srate))
esi = int(round(end_sec * srate))
bli = None
for ch_i in range(chN):
zV,zL,bli,lV = gen_note( xM[:,ch_i], bsi, esi, loop_dur_smp, note_dur_smp, xfade_smp, smp_per_cyc, g_coeff, bli )
if vel > 70:
#plot_xfades(zV,zL)
pass
yM[yi:yi+len(zV),ch_i] = zV
if ch_i == 0:
o_markL.append( (yi/srate, (yi+len(zV))/srate, vel ))
yi += len(zV) + inter_note_smp
if False:
fig,ax = plt.subplots(2,1)
ax[0].plot(yM[:,0])
ax[1].plot(yM[:,1])
plt.show();
wt_util.write_audio_file( yM, srate, o_audio_fname )
wt_util.write_mark_tsv_file( o_markL, o_mark_tsv_fname )
def test_scipy(audio_fname):
samplerate = 44100.0
fs = 100
t = np.linspace(0., 1., int(samplerate))
data = np.sin(2. * np.pi * fs * t)
data = np.array([data,data])
wt_util.write_audio_file(data.T, samplerate, audio_fname)
if __name__ == "__main__":
midi_pitch = 21
loop_secs = 0.4
note_dur_secs = 7.0
xfade_ms = 2
inter_note_sec = 0.1
g_coeff = 0.9995
i_audio_fname = "/home/kevin/temp/wt3/wav/21_samples.wav"
mark_tsv_fname = "/home/kevin/temp/wt3/21_marker.txt"
o_audio_fname = "/home/kevin/temp/temp.wav"
o_mark_tsv_fname = "/home/kevin/temp/temp_mark.txt"
main( i_audio_fname, mark_tsv_fname, midi_pitch, loop_secs, note_dur_secs, xfade_ms, inter_note_sec, g_coeff, o_audio_fname, o_mark_tsv_fname )
if False:
test_scipy(o_audio_fname)

View File

@ -0,0 +1,392 @@
import math
import json
import array
import types
import matplotlib.pyplot as plt
import numpy as np
import wt_util
def plot_overlap( xV, bi, ei, wndN, smp_per_cycle, title ):
fig, ax = plt.subplots(1,1)
x0 = [ i for i in range(bi-wndN,bi+wndN) ]
x1 = [ x-x0[0] for x in x0 ]
ax.plot(x1,xV[x0])
x0 = [ i for i in range(ei-wndN,ei+wndN) ]
x1 = [ x-x0[0] for x in x0 ]
ax.plot(x1,xV[x0])
plt.title(title)
plt.show()
def sign(x):
return x<0
def find_zero_crossing( xV, si, inc ):
# find the next zero crossing before/after si
while si > 0:
if sign(xV[si-1])==False and sign(xV[si])==True:
break;
si += inc
return si
def meas_fit(xV,ei,bi,wndN):
if bi-wndN < 0 or ei+wndN > len(xV):
return None
v0 = xV[ei-wndN:ei+wndN]/0x7fffffff
v1 = xV[bi-wndN:bi+wndN]/0x7fffffff
dv = (v1-v0) * (v1-v0)
return np.mean(dv)
def find_loop_points_1( xV, bsi, esi, smp_per_cycle, wndN, est_N ):
# find the first zero crossing after the end of the search range
ei = find_zero_crossing(xV,esi,-1)
min_d = None
min_bi = None
bi = bsi
# make est_N guesses
for i in range(0,est_N):
# find the next zero crossing after bi
bi = find_zero_crossing(xV,bi,1)
# measure the quality of the fit with the end of the loop
d = meas_fit(xV,ei,bi,wndN)
#print(i,bi,d)
# store the best loop begin point
if min_bi is None or d < min_d:
min_d = d
min_bi = bi
# advance
bi += int(wndN) #smp_per_cycle
return min_bi, ei, min_d
def find_loop_points_2(xV,bsi,esi, smp_per_cycle, wndN, est_N ):
def _track_min( min_i, min_d, i, d ):
if min_i is None or d<min_d:
return i,d
return min_i,min_d
spc_2 = int(smp_per_cycle/2)
bzi,ezi = esi-spc_2, esi+spc_2
max_i = int(np.argmax(xV[bzi:ezi]) + bzi)
ei = find_zero_crossing(xV,max_i,1)
bi = max_i - int(round(((esi-bsi)/smp_per_cycle) * smp_per_cycle))
bi_p = bi
bi_n = bi-1
min_bi = None
min_d = None
for i in range(est_N):
# evaluate the fit of the next zero-crossing relative to bi_p
bi_p = find_zero_crossing(xV,bi_p,1)
if bi_p < ei:
d_p = meas_fit(xV,ei,bi_p,wndN)
min_bi,min_d = _track_min(min_bi,min_d,bi_p,d_p)
bi_p += 1 # advance bi_p forward
# evaluate the fit of the previous zero-crozzing relative to bi_n
bi_n = find_zero_crossing(xV,bi_n,-1)
d_n = meas_fit(xV,ei,bi_n,wndN)
min_bi,min_d = _track_min(min_bi,min_d,bi_n,d_n)
bi_n -= 1 # advance bi_n backward
return min_bi, ei, min_d
def find_loop_points_3(xV,bsi,esi, smp_per_cycle, wndN, est_N ):
spc_2 = int(smp_per_cycle/2)
bzi,ezi = bsi-spc_2, bsi+spc_2
max_i = int(np.argmax(xV[bzi:ezi]) + bzi)
bi = find_zero_crossing(xV,max_i,1)
ei = bi + math.ceil(smp_per_cycle)
#print(bi,ei,ei-bi,smp_per_cycle)
d = meas_fit(xV,ei,bi,wndN)
return bi,ei,d
def find_loop_points_4(xV,bsi,esi, smp_per_cycle, wndN, est_N ):
def _track_min( min_i, min_d, i, d ):
if d is not None and (min_i is None or d<min_d):
return i,d
return min_i,min_d
min_i = None
min_d = None
spc_2 = int(smp_per_cycle/2)
for i in range(est_N):
bzi,ezi = bsi-spc_2, bsi+spc_2
max_i = int(np.argmax(xV[bzi:ezi]) + bzi)
bi = find_zero_crossing(xV,max_i,1)
ei = bi + math.ceil(smp_per_cycle)
#print(bi,ei,ei-bi,smp_per_cycle)
d = meas_fit(xV,ei,bi,wndN)
min_i,min_d = _track_min(min_i,min_d,bi,d)
bsi += math.ceil(smp_per_cycle)
return min_i,min_i + math.ceil(smp_per_cycle),min_d
def find_loop_points(xV,bsi,esi, smp_per_cycle, wndN, est_N ):
def _track_min( min_i, min_d, i, d ):
if d is not None and (min_i is None or d<min_d):
return i,d
return min_i,min_d
min_i = None
min_d = None
spc_2 = int(smp_per_cycle/2)
bzi,ezi = bsi-spc_2, bsi+spc_2
max_i = int(np.argmax(xV[bzi:ezi]) + bzi)
bi = find_zero_crossing(xV,max_i,1)
for i in range(est_N):
ei = math.ceil(bi + (i+1)*smp_per_cycle)
#print(bi,ei,ei-bi,smp_per_cycle)
d = meas_fit(xV,ei,bi,wndN)
min_i,min_d = _track_min(min_i,min_d,ei,d)
return bi,min_i,min_d
def find_best_zero_crossing(xV,bi,ei,wndN):
bi0 = find_zero_crossing(xV,bi-1,-1)
bi1 = find_zero_crossing(xV,bi,1)
ei0 = find_zero_crossing(xV,ei-1,-1)
ei1 = find_zero_crossing(xV,ei,1)
beV = [ (ei0,bi0), (ei0,bi1), (ei1,bi0), (ei1,bi1) ]
i_min = None
d_min = None
for i,(ei,bi) in enumerate(beV):
d = meas_fit(xV,ei,bi,wndN)
if i_min is None or d < d_min:
i_min = i
d_min = d
ei,bi = beV[i_min]
return bi,ei,d_min
def determine_track_order( smpM, bli, eli ):
i_max = None
rms_max = None
rmsV = []
assert( smpM.shape[1] == 2 )
for i in range(smpM.shape[1]):
rms = np.mean(np.pow(smpM[bli:eli,i],2.0))
if i_max is None or rms > rms_max:
i_max = i
rms_max = rms
rmsV.append(float(rms))
return [ i_max, 0 if i_max==1 else 1 ]
def process_all_samples( markL, smpM, srate, args ):
wtL = []
#fund_hz = 13.75 * math.pow(2,(-9.0/12.0)) * math.pow(2.0,(args.midi_pitch / 12.0))
fund_hz = midi_pitch_to_hz(args.midi_pitch)
smp_per_cycle = int(srate / fund_hz)
end_offs_smp_idx = max(smp_per_cycle,int(args.end_offset_ms * srate / 1000))
loop_dur_smp = int(args.loop_dur_ms * srate / 1000)
wndN = int(smp_per_cycle/6)
print(f"Hz:{fund_hz} smp/cycle:{smp_per_cycle} loop_dur:{loop_dur_smp} cycles/loop:{loop_dur_smp/smp_per_cycle} wndN:{wndN}")
# for each sampled note
for beg_sec,end_sec,vel_label in markL:
beg_smp_idx = int(beg_sec * srate)
end_smp_idx = int(end_sec * srate)
r = {
"instr":"piano",
"pitch":args.midi_pitch,
"vel": int(vel_label),
"beg_smp_idx":beg_smp_idx,
"end_smp_idx":None,
"chL": []
}
# determine the loop search range from the end of the note sample
eli = end_smp_idx - end_offs_smp_idx
bli = eli - loop_dur_smp
ch_map = determine_track_order( smpM, beg_smp_idx, end_smp_idx)
#print(ch_map)
esi = beg_smp_idx;
for i in range(0,smpM.shape[1]):
ch_idx = ch_map[i]
xV = smpM[:,ch_idx]
if True:
#if i == 0:
# s_per_c = srate / fund_hz
# bi,ei,cost = find_loop_points(xV,bli,eli,s_per_c,wndN,args.guess_cnt)
s_per_c = srate / fund_hz
bi,ei,cost = find_loop_points_4(xV,bli,eli,s_per_c,wndN,args.guess_cnt)
if False:
bi,ei,cost = find_loop_points_2(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt)
if False:
if i == 0:
bi,ei,cost = find_loop_points(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt)
else:
bi,ei,cost = find_best_zero_crossing(xV,bi,ei,wndN)
if False:
if i == 0:
bi,ei,cost = find_loop_points(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt)
else:
pass
#print(i,bi,ei)
eli = ei # attempt to make the eli the second channel close to the first
loop_dur_sec = (ei-bi)/srate
cyc_per_loop = int(round((ei-bi)/smp_per_cycle))
plot_title = f"vel:{vel_label} cyc/loop:{cyc_per_loop} dur:{loop_dur_sec*1000:.1f} ms {ei-bi} smp ch:{ch_idx} cost:{0 if cost<= 0 else math.log(cost):.2f}"
plot_overlap(xV,bi,ei,wndN,smp_per_cycle,plot_title)
r["chL"].append({
"ch_idx":ch_idx,
"segL":[] })
r["chL"][-1]["segL"].append({
"cost":0,
"cyc_per_loop":1,
"bsi":beg_smp_idx,
"esi":bi })
r["chL"][-1]["segL"].append({
"cost":cost,
"cyc_per_loop":cyc_per_loop,
"bsi":bi,
"esi":ei })
esi = max(esi,ei)
r['end_smp_idx'] = esi
r["chL"] = sorted(r["chL"],key=lambda x: x["ch_idx"])
wtL.append(r)
return wtL
def write_loop_label_file( fname, wtL, srate ):
with open(fname,"w") as f:
for r in wtL:
for cr in r['chL']:
for sr in cr['segL']:
beg_sec = sr['bsi'] / srate
end_sec = sr['esi'] / srate
if sr['cost']!=0:
cost = math.log(sr['cost'])
label = f"ch:{cr['ch_idx']} {r['vel']} {cost:.2f}"
f.write(f"{beg_sec}\t{end_sec}\t{label}\n")
def write_wt_file( fname, audio_fname, wtL, srate ):
r = {
"audio_fname":audio_fname,
#"srate":srate,
"wt":wtL
}
with open(fname,"w") as f:
json.dump(r,f);
def gen_loop_positions( audio_fname, marker_tsv_fname, pitch, argsD, loop_marker_fname, wt_fname ):
args = types.SimpleNamespace(**argsD)
markL = wt_util.parse_marker_file(marker_tsv_fname)
smpM,srate = wt_util.parse_audio_file(audio_fname)
chN = smpM.shape[1]
wtL = process_all_samples(markL,smpM,srate,args)
write_loop_label_file(loop_marker_fname, wtL, srate)
write_wt_file(wt_fname,audio_fname, wtL,srate)
if __name__ == "__main__":
audio_fname = "/home/kevin/temp/wt/wav/60_samples.wav"
marker_tsv_fname = "/home/kevin/temp/wt/60_marker.txt"
loop_marker_fname = "/home/kevin/temp/wt/60_loop_mark.txt"
wt_fname = "/home/kevin/temp/wt/bank/60_wt.json"
midi_pitch = 60
argsD = {
'end_offset_ms':100,
'loop_dur_ms':100,
'midi_pitch':midi_pitch,
'guess_cnt':40
}
gen_loop_positions( audio_fname, marker_tsv_fname, midi_pitch, argsD, loop_marker_fname, wt_fname )

View File

@ -0,0 +1,323 @@
import math
import json
import array
import types
import matplotlib.pyplot as plt
import numpy as np
import wt_util
def sign(x):
return x<0
def find_zero_crossing( xV, si, inc ):
# find the next zero crossing before/after si
while si > 0:
if sign(xV[si-1])==False and sign(xV[si])==True:
break;
si += inc
return si
def table_read_2( tab, frac ):
i0 = math.floor(frac + 1)
i1 = i0 + 1
f = frac - int(frac)
return tab[i0] + (tab[i1] - tab[i0]) * f
def hann_read( x, N ):
while x > N:
x -= N
x = x - (N/2)
return (0.5 + 0.5 * math.cos(2*math.pi * x / N))
def sine_0():
srate = 48000.0
hz = wt_util.midi_pitch_to_hz(6)
fsmp_per_cyc = srate / hz # fractional samples per cycle
fsmp_per_wt = 2* fsmp_per_cyc # fractional samples per wavetable
smp_per_wt = int(math.floor(fsmp_per_wt)) # integer samples per wavetable
# Note that when wrapping from the last sample to the first there is less
# than one sample period and so the wavetable phase after the wrap will be fractional
# fill two wave tables with two identical cycles of a sine signal
wt0 = [0] + [ math.sin(2*math.pi*hz*i/srate) for i in range(smp_per_wt) ] + [0]
wt1 = [0] + [ math.sin(2*math.pi*hz*i/srate) for i in range(smp_per_wt) ] + [0]
xN = smp_per_wt * 4
phs0 = 0
phs1 = fsmp_per_wt/2
y0 = []
y1 = []
y2 = []
h0 = []
h1 = []
for i in range(xN):
# read the wave tables
s0 = table_read_2( wt0, phs0 )
s1 = table_read_2( wt1, phs1 )
y0.append( s0 )
y1.append( s1 )
# calc the envelopes
e0 = hann_read(phs0,fsmp_per_wt)
e1 = hann_read(phs1,fsmp_per_wt)
h0.append( e0 )
h1.append( e1 )
# sum the two signals
y2.append( e0*s0 + e1*s1 )
# advance the phases of the oscillators
phs0 += 1
if phs0 >= smp_per_wt:
phs0 -= smp_per_wt
phs1 += 1
if phs1 >= smp_per_wt:
phs1 -= smp_per_wt
fix,ax = plt.subplots(4,1)
ax[0].plot(y0)
ax[1].plot(y1)
ax[2].plot(h0)
ax[2].plot(h1)
ax[3].plot(y2)
plt.show()
def piano_0():
i_audio_fname = "/home/kevin/temp/wt1/wav/060_samples.wav"
o_audio_fname = "/home/kevin/temp/temp.wav"
marker_tsv_fname = "/home/kevin/temp/wt1/60_marker.txt"
midi_pitch = 60
offs_ms = 50
note_dur_secs = 3
inter_note_secs = 0.5
markL = wt_util.parse_marker_file(marker_tsv_fname)
aM,srate = wt_util.parse_audio_file(i_audio_fname)
hz = wt_util.midi_pitch_to_hz(midi_pitch)
fsmp_per_cyc = srate/hz
fsmp_per_wt = fsmp_per_cyc * 2
smp_per_wt = int(math.floor(fsmp_per_wt))
offs_smp = int(math.floor(offs_ms * srate / 1000))
ch_cnt = aM.shape[1]
note_dur_smp = int(round(note_dur_secs*srate))
inter_note_smp = int(round(inter_note_secs*srate))
yN = len(markL) * (note_dur_smp + inter_note_smp)
yM = np.zeros((yN,ch_cnt))
yi = 0
for beg_sec,end_sec,vel_label in markL:
bsi = int(round(beg_sec * srate))
esi = int(round(end_sec * srate))
for ch_idx in range(ch_cnt):
wtbi = find_zero_crossing(aM[:,ch_idx],esi-offs_smp,-1)
wtei = wtbi + smp_per_wt
wt = [0] + aM[wtbi:wtei,ch_idx].tolist() + [0]
wt[0] = aM[wtei-1,ch_idx]
wt[-1] = wt[1]
atkN = wtbi - bsi
yM[yi:yi+atkN,ch_idx] = aM[bsi:wtbi,ch_idx]
phs0 = 0
phs1 =fsmp_per_wt/2
for i in range(note_dur_smp-atkN):
s0 = table_read_2( wt, phs0 )
s1 = table_read_2( wt, phs1 )
e0 = hann_read(phs0,fsmp_per_wt)
e1 = hann_read(phs1,fsmp_per_wt)
# advance the phases of the oscillators
phs0 += 1
if phs0 >= smp_per_wt:
phs0 -= smp_per_wt
phs1 += 1
if phs1 >= smp_per_wt:
phs1 -= smp_per_wt
yM[yi+atkN+i,ch_idx] = e0*s0 + e1*s1
yi += note_dur_smp + inter_note_smp
wt_util.write_audio_file( yM, srate, o_audio_fname )
def select_wave_table( aV, si, smp_per_wt ):
wtbi = find_zero_crossing(aV,si,-1)
wtei = wtbi + smp_per_wt
wt = [0] + aV[wtbi:wtei].tolist() + [0]
wt[0] = aV[wtei-1]
wt[-1] = wt[1]
return wt,wtbi
def piano_1():
o_audio_fname = "/home/kevin/temp/temp.wav"
o_mark_tsv_fname = "/home/kevin/temp/temp_mark.txt"
if False:
i_audio_fname = "/home/kevin/temp/wt1/wav/060_samples.wav"
marker_tsv_fname = "/home/kevin/temp/wt1/60_marker.txt"
midi_pitch = 60
offs_0_ms = 100
offs_1_ms = 50
g_coeff = 0.9985
if True:
i_audio_fname = "/home/kevin/temp/wt3/wav/21_samples.wav"
marker_tsv_fname = "/home/kevin/temp/wt3/21_marker.txt"
midi_pitch = 21
offs_0_ms = 100
offs_1_ms = 80
g_coeff = 0.9992
note_dur_secs = 6
inter_note_secs = 0.5
markL = wt_util.parse_marker_file(marker_tsv_fname)
aM,srate = wt_util.parse_audio_file(i_audio_fname)
hz = wt_util.midi_pitch_to_hz(midi_pitch)
fsmp_per_cyc = srate/hz
fsmp_per_wt = fsmp_per_cyc * 2
smp_per_wt = int(math.floor(fsmp_per_wt))
offs_0_smp = int(math.floor(offs_0_ms * srate / 1000))
offs_1_smp = int(math.floor(offs_1_ms * srate / 1000))
ch_cnt = aM.shape[1]
note_dur_smp = int(round(note_dur_secs*srate))
inter_note_smp = int(round(inter_note_secs*srate))
yN = len(markL) * (note_dur_smp + inter_note_smp)
yM = np.zeros((yN,ch_cnt))
yi = 0
oMarkL = []
for beg_sec,end_sec,vel_label in markL:
bsi = int(round(beg_sec * srate))
esi = int(round(end_sec * srate))
for ch_idx in range(ch_cnt):
wt0,wtbi = select_wave_table( aM[:,ch_idx], esi-offs_0_smp, smp_per_wt )
wt1,_ = select_wave_table( aM[:,ch_idx], esi-offs_1_smp, smp_per_wt)
rms0 = np.pow( np.mean( np.pow(wt0,2) ),0.5)
rms1 = np.pow( np.mean( np.pow(wt1,2) ),0.5)
wt1 = [ w*rms0/rms1 for w in wt1 ]
# The attack abutts the wavetable at it's center point
# so we need to offset the end of the attack half way
# through the first wave table.
abi = int(wtbi + smp_per_wt/2)
atkN = abi - bsi
yM[yi:yi+atkN,ch_idx] = aM[bsi:abi,ch_idx]
oMarkL.append(((yi+atkN)/srate, (yi+atkN)/srate, f"{vel_label}-{ch_idx}"))
phs0 = 0
phs1 = fsmp_per_wt/2
phs2 = fsmp_per_wt/4
phs3 = fsmp_per_wt/4 + fsmp_per_wt/2
g = 1.0
g_phs = 0
for i in range(note_dur_smp-atkN):
s0 = table_read_2( wt0, phs0 )
s1 = table_read_2( wt0, phs1 )
s2 = table_read_2( wt1, phs2 )
s3 = table_read_2( wt1, phs3 )
e0 = hann_read(phs0,fsmp_per_wt)
e1 = hann_read(phs1,fsmp_per_wt)
e2 = hann_read(phs0,fsmp_per_wt)
e3 = hann_read(phs1,fsmp_per_wt)
# advance the phases of the oscillators
phs0 += 1
if phs0 >= smp_per_wt:
phs0 -= smp_per_wt
phs1 += 1
if phs1 >= smp_per_wt:
phs1 -= smp_per_wt
phs2 += 1
if phs2 >= smp_per_wt:
phs2 -= smp_per_wt
phs3 += 1
if phs3 >= smp_per_wt:
phs3 -= smp_per_wt
mix_g = math.cos(0.25*2*math.pi*i/srate)
#yM[yi+atkN+i,ch_idx] = g* (mix_g*(e0*s0 + e1*s1) + (1.0-mix_g)*(e2*s2 + e3*s3))
yM[yi+atkN+i,ch_idx] = g* (e0*s0 + e1*s1)
g_phs += 1
if g_phs >= 64:
g *= g_coeff
g_phs = 0
yi += note_dur_smp + inter_note_smp
wt_util.write_audio_file( yM, srate, o_audio_fname )
wt_util.write_mark_tsv_file(oMarkL, o_mark_tsv_fname)
if __name__ == "__main__":
#sine_0()
piano_1()

View File

@ -0,0 +1,99 @@
import subprocess
import os
def gen_ivory_player_caw_pgm( out_dir, caw_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label ):
caw_template = f"""
{{
base_dir: "{out_dir}"
io_dict: "{out_dir}/io.cfg"
proc_dict: "~/src/caw/src/libcw/flow/proc_dict.cfg",
subnet_dict: "~/src/caw/src/libcw/flow/subnet_dict.cfg",
programs: {{
sample_generator: {{
non_real_time_fl:false,
network: {{
procs: {{
mf: {{ class: midi_file, args:{{ csv_fname:"{midi_csv_fname}" }}}},
mout: {{ class: midi_out in:{{ in:mf.out }}, args:{{ dev_label:"{midi_dev_label}", port_label:"{midi_port_label}" }}}},
stop: {{ class: halt, in:{{ in:mf.done_fl }}}}
ain: {{ class: audio_in, args:{{ dev_label:"main" }}}},
split: {{ class: audio_split, in:{{ in:ain.out }} args:{{ select: [0,0, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1] }}}},
af: {{ class: audio_file_out, in:{{ in:split.out0 }}, args:{{ fname:"{audio_fname}"}}}},
aout: {{ class: audio_out, in:{{ in:ain.out }}, args:{{ dev_label:"main"}}}},
}}
}}
}}
}}
}}
"""
with open(caw_fname, "w") as f:
f.write(caw_template)
def gen_audio_file( out_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, caw_cfg_fname, audio_fname ):
gen_ivory_player_caw_pgm(out_dir, caw_cfg_fname, midi_csv_fname, audio_fname, midi_dev_label, midi_port_label )
argL = [ caw_exec_fname, "exec", caw_cfg_fname, "sample_generator" ]
print(" ".join(argL))
p = subprocess.Popen(argL,stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out,err =p.communicate()
if p.returncode != 0:
print("The call '%s' to failed with code:%i Message:%s %s"," ".join(argL),p.returncode,out,err)
def gen_all_files( work_dir,midi_dev_label,midi_port_label,caw_exec_fname,midi_filterL=None ):
fnL = os.listdir(work_dir)
for fn in fnL:
if "." in fn:
fn_ext_partL = fn.split(".")
if fn_ext_partL[1] == "csv":
fn_partL = fn_ext_partL[0].split("_")
mark_fn = "_".join(fn_partL[0:-1]) + "_marker.txt"
damp_fl = fn_partL[1] == "damp"
midi_pitch = int(fn_partL[0])
if midi_filterL is None or (midi_filterL is not None and midi_pitch in midi_filterL):
wav_dir = os.path.join(work_dir,"wav")
if not os.path.isdir(wav_dir):
os.mkdir(wav_dir)
damp_label = "damp_" if damp_fl else ""
audio_fname = os.path.join(wav_dir,f"{midi_pitch:03}_{damp_label}samples.wav")
midi_csv_fname = os.path.join(work_dir,fn)
mark_tsv_fname = os.path.join(work_dir,mark_fn)
caw_cfg_fname = os.path.join(work_dir,f"{midi_pitch:03}_{damp_label}caw.cfg")
print(midi_pitch,midi_csv_fname,mark_tsv_fname,audio_fname)
gen_audio_file( work_dir, midi_csv_fname, midi_dev_label, midi_port_label, caw_exec_fname, caw_cfg_fname, audio_fname )
if __name__ == "__main__":
work_dir = "/home/kevin/temp/wt6"
midi_dev_label = "MIDIFACE 2x2"
midi_port_label = "MIDIFACE 2x2 Midi Out 1"
caw_exec_fname = "/home/kevin/src/caw/build/linux/debug/bin/caw"
midi_filterL = None
gen_all_files(work_dir,midi_dev_label,midi_port_label,caw_exec_fname,midi_filterL)

55
py/gen_wavetables/temp.py Normal file
View File

@ -0,0 +1,55 @@
def process_all_samples_0( markL, smpM, srate, args ):
wtL = []
fund_hz = 13.75 * math.pow(2,(-9.0/12.0)) * math.pow(2.0,(args.midi_pitch / 12.0))
end_offs_smp_idx = int(args.end_offset_ms * srate / 1000)
loop_dur_smp = int(args.loop_dur_ms * srate / 1000)
smp_per_cycle = int(srate / fund_hz)
print(f"Hz:{fund_hz} smp/cycle:{smp_per_cycle}")
for beg_sec,end_sec,vel_label in markL:
for ch_idx in range(0,smpM.shape[1]):
beg_smp_idx = int(beg_sec * srate)
end_smp_idx = int(end_sec * srate)
eli = end_smp_idx - end_offs_smp_idx
bli = eli - loop_dur_smp
#print(beg_smp_idx,bli,eli,end_smp_idx)
xV = smpM[:,ch_idx]
wndN = int(smp_per_cycle/3)
bi,ei,cost = find_loop_points(xV,bli,eli,smp_per_cycle,wndN,args.guess_cnt)
plot_title = f"vel:{vel_label} ch:{ch_idx} cost:{math.log(cost):.2f}"
#plot_overlap(xV,bi,ei,wndN,smp_per_cycle,plot_title)
wtL.append( {
"pitch":args.midi_pitch,
"vel": int(vel_label),
"cost":cost,
"ch_idx":ch_idx,
"beg_smp_idx":beg_smp_idx,
"end_smp_idx":end_smp_idx,
"beg_loop_idx":bi,
"end_loop_idx":ei })
return wtL
def write_loop_label_file_0( fname, wtL, srate ):
with open(fname,"w") as f:
for r in wtL:
beg_sec = r['beg_smp_idx'] / srate
end_sec = r['end_smp_idx'] / srate
# f.write(f"{beg_sec}\t{end_sec}\t{r['vel_label']}\n")
beg_sec = r['beg_loop_idx'] / srate
end_sec = r['end_loop_idx'] / srate
cost = math.log(r['cost'])
label = f"ch:{r['ch_idx']} {cost:.2f}"
f.write(f"{beg_sec}\t{end_sec}\t{label}\n")

201
py/gen_wavetables/wt_osc.py Normal file
View File

@ -0,0 +1,201 @@
import os
import math
import json
import wt_util
import calc_sample_atk_dur
import calc_wavetables
import numpy as np
import multiproc as mp
def table_read_2( tab, frac ):
i0 = math.floor(frac + 1)
i1 = i0 + 1
f = frac - int(frac)
return tab[i0] + (tab[i1] - tab[i0]) * f
def hann_read( x, N ):
while x > N:
x -= N
x = x - (N/2)
return (0.5 + 0.5 * math.cos(2*math.pi * x / N))
def prepare_wt( aV, wtL, wt_idx ):
wt_smp_cnt = wtL[0]['wtei'] - wtL[0]['wtbi']
if wt_idx >= len(wtL):
wt = np.zeros((wt_smp_cnt,))
else:
wt = np.copy(aV[ wtL[wt_idx]['wtbi']: wtL[wt_idx]['wtei'] ])
wt = [0] + wt.tolist() + [0]
wt[0] = wt[-2]
wt[-1] = wt[0]
return np.array(wt)
def get_wt( aV, wtL, wt_idx ):
wt0 = prepare_wt(aV,wtL,wt_idx)
wt1 = prepare_wt(aV,wtL,wt_idx+1)
return wt0,wt1
def gen_osc_output( i_audio_fname, velL, midi_pitch, note_dur_sec, inter_note_sec, wt_interval_sec, o_audio_fname ):
smp_per_dsp_frm = 64
aM,srate = wt_util.parse_audio_file(i_audio_fname)
ch_cnt = aM.shape[1]
note_dur_smp = int(round(note_dur_sec * srate))
inter_note_smp = int(round(inter_note_sec * srate))
wt_interval_smp = int(round(wt_interval_sec * srate))
yN = len(velL) * (note_dur_smp + inter_note_smp)
yM = np.zeros((yN,ch_cnt))
yi = 0
for velD in velL:
bsi = velD['bsi']
hz = wt_util.midi_pitch_to_hz(midi_pitch)
fsmp_per_cyc = srate/hz
fsmp_per_wt = fsmp_per_cyc * 2
smp_per_wt = int(math.floor(fsmp_per_wt))
for ch_idx in range(ch_cnt):
wtL = velD['chL'][ch_idx]
if len(wtL) == 0:
print(f"pitch:{midi_pitch} vel:{velD['vel']} ch:{ch_idx} has no wavetables.")
continue
# The attack abutts the wavetable at it's center point
# so we need to offset the end of the attack half way
# through the first wave table.
abi = int(wtL[0]['wtbi'] + smp_per_wt/2)
atkN = min(abi - bsi,note_dur_smp)
print(velD['vel'],yi+atkN,yN,atkN/srate)
yM[yi:yi+atkN,ch_idx] = aM[bsi:bsi+atkN,ch_idx]
wt_idx = 0
wt0,wt1 = get_wt( aM[:,ch_idx], wtL, wt_idx )
wt = wt0
wt_int_phs = 0
phs0 = 0
phs1 = fsmp_per_wt/2
for i in range(note_dur_smp-atkN):
s0 = table_read_2( wt, phs0 )
s1 = table_read_2( wt, phs1 )
e0 = hann_read(phs0,fsmp_per_wt)
e1 = hann_read(phs1,fsmp_per_wt)
yM[yi+atkN+i,ch_idx] = e0*s0 + e1*s1
# advance the phases of the oscillators
phs0 += 1
if phs0 >= smp_per_wt:
phs0 -= smp_per_wt
phs1 += 1
if phs1 >= smp_per_wt:
phs1 -= smp_per_wt
wt_int_phs += 1
if wt_int_phs % smp_per_dsp_frm == 0:
wt_mix = min(1.0, wt_int_phs / wt_interval_smp)
wt = ((1.0-wt_mix) * wt0) + (wt_mix * wt1)
if wt_int_phs >= wt_interval_smp:
wt_idx += 1
wt0,wt1 = get_wt( aM[:,ch_idx], wtL, wt_idx )
wt = wt0
wt_int_phs = 0
wt_mix = 0
yi += note_dur_smp + inter_note_smp
print(o_audio_fname)
wt_util.write_audio_file( yM, srate, o_audio_fname )
def gen_from_wt_json( processN, wt_json_fname, out_dir, note_dur_sec, inter_note_sec, pitch_filtL=None ):
def _multi_proc_func( procId, procArgsD, taskArgsD ):
gen_osc_output(**taskArgsD)
if not os.path.isdir(out_dir):
os.mkdir(out_dir)
with open(wt_json_fname) as f:
pitchL = json.load(f)['pitchL']
taskArgsL = []
for pitchD in pitchL:
if pitch_filtL is None or pitchD['midi_pitch'] in pitch_filtL:
taskArgsL.append( {
"i_audio_fname":pitchD['audio_fname'],
"velL":pitchD['velL'],
"midi_pitch":pitchD['midi_pitch'],
"note_dur_sec":note_dur_sec,
"inter_note_sec":inter_note_sec,
"wt_interval_sec":pitchD['wt_interval_secs'],
"o_audio_fname":os.path.join(out_dir,f"{pitchD['midi_pitch']:03}_osc.wav")
})
processN = min(processN,len(taskArgsL))
mp.local_distribute_main( processN,_multi_proc_func,{},taskArgsL )
if __name__ == "__main__":
if True:
wt_json_fname = "/home/kevin/temp/temp_5.json"
out_dir = "/home/kevin/temp/wt_osc_1"
note_dur_sec = 10.0
inter_note_sec = 1.0
processN = 20
pitch_filtL = None #[ 27 ]
gen_from_wt_json(processN, wt_json_fname,out_dir,note_dur_sec,inter_note_sec, pitch_filtL)
if False:
midi_pitch = 60
audio_fname = "/home/kevin/temp/wt5/wav/060_samples.wav"
mark_tsv_fname = "/home/kevin/temp/wt5/60_marker.txt"
rms_wnd_ms = 50
rms_hop_ms = 10
atkEndThreshDb = -43.0
wt_interval_secs = 1.0
note_dur_sec = 10.0
inter_note_sec = 1.0
gateL = calc_sample_atk_dur.generate_gate_db(audio_fname,mark_tsv_fname,rms_wnd_ms, rms_hop_ms, atkEndThreshDb )
wtL = calc_wavetables.gen_wave_table_list( audio_fname, mark_tsv_fname, gateL, midi_pitch, wt_interval_secs )
gen_osc_output(audio_fname,wtL,note_dur_sec,inter_note_sec, wt_interval_secs, "/home/kevin/temp/temp.wav")

View File

@ -0,0 +1,89 @@
import wave as w
import math
import array
import numpy as np
import scipy.io.wavfile
def midi_pitch_to_hz( midi_pitch ):
return 13.75 * math.pow(2,(-9.0/12.0)) * math.pow(2.0,(midi_pitch / 12.0))
def parse_marker_file( marker_fname ):
markL = []
with open(marker_fname) as f:
for line in f:
tokL = line.split("\t");
assert( len(tokL) == 3 )
markL.append( ( float(tokL[0]), float(tokL[1]), tokL[2] ) )
return markL
def parse_audio_file( audio_fname ):
max_smp_val = float(0x7fffffff)
with w.open(audio_fname,"rb") as f:
print(f"ch:{f.getnchannels()} bits:{f.getsampwidth()*8} srate:{f.getframerate()} frms:{f.getnframes()}")
srate = f.getframerate()
frmN = f.getnframes()
data_bytes = f.readframes(frmN)
smpM = np.array(array.array('i',data_bytes))
# max_smp_val assumes 32 bits
assert( f.getsampwidth() == 4 )
smpM = smpM / max_smp_val
smpM = np.reshape(smpM,(frmN,2))
return smpM,srate
def write_audio_file( xM, srate, audio_fname ):
xM *= np.iinfo(np.int32).max
scipy.io.wavfile.write(audio_fname, srate, xM.astype(np.int32))
def write_audio_file_0( xM, srate, audio_fname ):
# Convert to (little-endian) 32 bit integers.
xM = (xM * (2 ** 31 - 1)).astype("<i4")
with w.open(audio_fname,"w") as f:
f.setnchannels(xM.shape[0])
f.setsampwidth(4)
f.setframerate(srate)
f.setnframes(xM.shape[1])
f.writeframes(xM.tobytes())
def write_mark_tsv_file( markL, fname ):
### markL = [(beg_sec,end_sec,label)]
with open(fname,"w") as f:
for beg_sec,end_sec,label in markL:
f.write(f"{beg_sec}\t{end_sec}\t{label}\n")
def find_zero_crossing( xV, si, inc ):
# find the next zero crossing before/after si
def sign(x):
return x<0
while si > 0 and si < len(xV):
if sign(xV[si-1])==False and sign(xV[si])==True:
return si
si += inc
return None