piccal/plot_seq.py

634 lines
20 KiB
Python

##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
import os, sys,json
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import wavfile
from common import parse_yaml_cfg
from rms_analysis import rms_analysis_main
from rms_analysis import select_first_stable_note_by_delta_db
from rms_analysis import select_first_stable_note_by_dur
from rms_analysis import samples_to_linear_residual
import rms_analysis as ra
#from rms_analysis import audio_rms
#from rms_analysis import locate_peak_indexes
#from rms_analysis import audio_stft_rms
#from rms_analysis import calc_harm_bins
def is_nanV( xV ):
for i in range(xV.shape[0]):
if np.isnan( xV[i] ):
return True
return False
def _find_max_take_id( inDir ):
id = 0
while os.path.isdir( os.path.join(inDir, "%i" % id) ):
id += 1
if id > 0:
id -= 1
return id
def form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None ):
# append the midi pitch to the input directory
inDir = os.path.join( inDir, str(midi_pitch))
dirL = os.listdir(inDir)
pkL = []
maxTakeNumber = 0
# for each take in this directory
for idir in dirL:
take_number = int(idir)
if not os.path.isfile(os.path.join( inDir,idir, "seq.json")):
continue
if analysisArgsD['useLastTakeOnlyFl']:
if take_number > maxTakeNumber:
pkL.clear()
maxTakeNumber = take_number
else:
continue
# analyze this takes audio and locate the note peaks
r = rms_analysis_main( os.path.join(inDir,idir), midi_pitch, **analysisArgsD['rmsAnalysisArgs'] )
# store the peaks in pkL[ (db,us) ]
for db,us in zip(r.pkDbL,r.pkUsL):
pkL.append( (db,us) )
# sort the peaks on increasing attack pulse microseconds
pkL = sorted( pkL, key= lambda x: x[1] )
# merge sample points that separated by less than 'minSampleDistUs' milliseconds
pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] )
# split pkL
pkDbL,pkUsL = tuple(zip(*pkL))
#-------------------------------------------
# locate the first and last note
min_pk_idx, max_pk_idx = find_min_max_peak_index( pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] )
print("MIN MAX:",min_pk_idx,pkUsL[min_pk_idx],max_pk_idx,pkUsL[max_pk_idx])
db1 = pkDbL[ max_pk_idx ]
db0 = pkDbL[ min_pk_idx ]
pulseUsL = []
pulseDbL = []
multValL = []
for out_idx in range(128):
# calc the target volume
db = db0 + (out_idx * (db1-db0)/127.0)
multi_value_count = 0
# look for the target between each of the sampled points
for i in range(1,len(pkDbL)):
# if the target volume is between these two sample points
if pkDbL[i-1] <= db and db < pkDbL[i]:
# if the target has not already been located
if len(pulseUsL) == out_idx:
# interpolate the pulse time from between the sampled points
frac = (db - pkDbL[i-1]) / (pkDbL[i] - pkDbL[i-1])
us = pkUsL[i-1] + frac * (pkUsL[i] - pkUsL[i-1])
db = pkDbL[i-1] + frac * (pkDbL[i] - pkDbL[i-1])
pulseUsL.append(us)
pulseDbL.append(db)
else:
# this target db value was found between multiple sampled points
# therefore the sampled volume function is not monotonic
multi_value_count += 1
if multi_value_count > 0:
multValL.append((out_idx,multi_value_count))
if len(multValL) > 0:
# print("Multi-value pulse locations were found during velocity table formation: ",multValL)
pass
return pulseUsL,pulseDbL,r.holdDutyPctL
def merge_close_sample_points( pkDbUsL, minSampleDistanceUs ):
avg0Us = np.mean(np.diff([ x[1] for x in pkDbUsL ]))
n0 = len(pkDbUsL)
while True and n0>0:
us0 = None
db0 = None
for i,(db,us) in enumerate(pkDbUsL):
if i > 0 and us - us0 < minSampleDistanceUs:
us1 = (us0 + us)/2
db1 = (db0 + db)/2
pkDbUsL[i-1] = (db1,us1)
del pkDbUsL[i]
break
else:
us0 = us
db0 = db
if i+1 == len(pkDbUsL):
break
avg1Us = np.mean(np.diff([ x[1] for x in pkDbUsL ]))
print("%i sample points deleted by merging close points." % (n0 - len(pkDbUsL)))
print("Mean time between samples - before:%f after:%f " % (avg0Us,avg1Us))
print("Min time between samples: %i " % (np.min(np.diff([x[1] for x in pkDbUsL]))))
return pkDbUsL
def _calc_resample_points( dPkDb, pkUs0, pkUs1, samplePerDb, minSampleDistUs ):
dPkUs = pkUs1 - pkUs0
sampleCnt = max(int(round(abs(dPkDb) * samplePerDb)),samplePerDb)
dUs = max(int(round(dPkUs/sampleCnt)),minSampleDistUs)
sampleCnt = int(round(dPkUs/dUs))
dUs = int(round(dPkUs/sampleCnt))
usL = [ pkUs0 + dUs*j for j in range(sampleCnt+1)]
return usL
def calc_resample_ranges( pkDbL, pkUsL, min_pk_idx, max_pk_idx, maxDeltaDb, samplePerDb, minSampleDistUs ):
if min_pk_idx == 0:
print("No silent notes were generated. Decrease the minimum peak level or the hold voltage.")
return None
resampleUsSet = set()
refPkDb = pkDbL[min_pk_idx]
#pkDbL = pkDbL[ pkIdxL ]
for i in range( min_pk_idx, max_pk_idx+1 ):
d = pkDbL[i] - pkDbL[i-1]
usL = []
# if this peak is less than maxDeltaDb above the previous pk or
# it is below the previous max peak
if d > maxDeltaDb or d <= 0 or pkDbL[i] < refPkDb:
usL = _calc_resample_points( d, pkUsL[i-1], pkUsL[i], samplePerDb, minSampleDistUs )
if d <= 0 and i + 1 < len(pkDbL):
d = pkDbL[i+1] - pkDbL[i]
usL += _calc_resample_points( d, pkUsL[i-1], pkUsL[i], samplePerDb, minSampleDistUs )
if pkDbL[i] > refPkDb:
refPkDb = pkDbL[i]
if usL:
resampleUsSet = resampleUsSet.union( usL )
return resampleUsSet
def form_resample_pulse_time_list( inDir, midi_pitch, analysisArgsD ):
"""" This function merges all available data from previous takes to form
a new list of pulse times to sample.
"""
inDir = os.path.join( inDir, str(midi_pitch) )
dirL = os.listdir(inDir)
pkL = []
# for each take in this directory
for idir in dirL:
take_number = int(idir)
# analyze this takes audio and locate the note peaks
r = rms_analysis_main( os.path.join(inDir,idir), midi_pitch, **analysisArgsD['rmsAnalysisArgs'] )
# store the peaks in pkL[ (db,us) ]
for db,us in zip(r.pkDbL,r.pkUsL):
pkL.append( (db,us) )
# sort the peaks on increasing attack pulse microseconds
pkL = sorted( pkL, key= lambda x: x[1] )
# merge sample points that separated by less than 'minSampleDistUs' milliseconds
pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] )
# split pkL
pkDbL,pkUsL = tuple(zip(*pkL))
# locate the first and last note
min_pk_idx, max_pk_idx = find_min_max_peak_index( pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] )
# estimate the microsecond locations to resample
resampleUsSet = calc_resample_ranges( pkDbL, pkUsL, min_pk_idx, max_pk_idx, analysisArgsD['maxDeltaDb'], analysisArgsD['samplesPerDb'], analysisArgsD['minSampleDistUs'] )
resampleUsL = sorted( list(resampleUsSet) )
#print(resampleUsL)
return resampleUsL, pkDbL, pkUsL
def plot_curve( ax, pulseUsL, rmsDbV ):
coeff = np.polyfit(pulseUsL,rmsDbV,5)
func = np.poly1d(coeff)
ax.plot( pulseUsL, func(pulseUsL), color='red')
def plot_resample_pulse_times_0( inDir, analysisArgsD, midi_pitch, printDir="" ):
newPulseUsL, rmsDbV, pulseUsL = form_resample_pulse_time_list( inDir, midi_pitch, analysisArgsD )
velTblUsL,velTblDbL,_ = form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None )
fig,ax = plt.subplots()
ax.plot(pulseUsL,rmsDbV,marker='.' )
for us in newPulseUsL:
ax.axvline( x = us )
print(len(velTblUsL))
ax.plot(velTblUsL,velTblDbL,marker='.',linestyle='None',color='red')
if printDir:
plt.savefig(os.path.join(printDir,"plot_resample_times_0.png"),format="png")
plt.show()
def plot_resample_pulse_times( inDir, analysisArgsD, midi_pitch, printDir="" ):
newPulseUsL, rmsDbV, pulseUsL = form_resample_pulse_time_list( inDir, midi_pitch, analysisArgsD )
velTblUsL,velTblDbL,_ = form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None )
fig,axL = plt.subplots(2,1,gridspec_kw={'height_ratios': [2, 1]})
axL[0].plot(pulseUsL,rmsDbV,marker='.',color="red" )
#plot_curve( ax, velTblUsL,velTblDbL)
scoreV = samples_to_linear_residual( pulseUsL, rmsDbV)
axL[0].plot(pulseUsL,rmsDbV + scoreV)
axL[0].plot(pulseUsL,rmsDbV + np.power(scoreV,2.0))
axL[0].plot(pulseUsL,rmsDbV - np.power(scoreV,2.0))
axL[1].axhline(0.0,color='black')
axL[1].axhline(1.0,color='black')
axL[1].plot(pulseUsL,np.abs(scoreV * 100.0 / rmsDbV))
axL[1].set_ylim((0.0,50))
if printDir:
plt.savefig(os.path.join(printDir,"plot_resample_times.png"),format="png")
plt.show()
def find_min_max_peak_index( pkDbL, minDb, maxDbOffs ):
"""
Find the min db and max db peak.
"""
# select only the peaks from rmsV[] to work with
yV = pkDbL
# get the max volume note
max_i = np.argmax( yV )
maxDb = yV[ max_i ]
min_i = max_i
# starting from the max volume peak go backwards
for i in range( max_i, 0, -1 ):
# if this peak is within maxDbOffs of the loudest then choose this one instead
#if maxDb - yV[i] < maxDbOffs:
# max_i = i
# if this peak is less than minDb then the previous note is the min note
if yV[i] < minDb:
break
min_i = i
if min_i >= max_i:
min_i = 0
max_i = len(pkDbL)-1
if min_i == 0:
print("No silent notes were generated. Decrease the minimum peak level or the hold voltage.")
return min_i, max_i
def find_skip_peaks( rmsV, pkIdxL, min_pk_idx, max_pk_idx ):
""" Fine peaks associated with longer attacks pulses that are lower than peaks with a shorter attack pulse.
These peaks indicate degenerate portions of the pulse/db curve which must be skipped during velocity table formation
"""
skipPkIdxL = []
yV = rmsV[pkIdxL]
refPkDb = yV[min_pk_idx]
for i in range( min_pk_idx+1, max_pk_idx+1 ):
if yV[i] > refPkDb:
refPkDb = yV[i]
else:
skipPkIdxL.append(i)
return skipPkIdxL
def find_out_of_range_peaks( rmsV, pkIdxL, min_pk_idx, max_pk_idx, maxDeltaDb ):
""" Locate peaks which are more than maxDeltaDb from the previous peak.
If two peaks are separated by more than maxDeltaDb then the range must be resampled
"""
oorPkIdxL = []
yV = rmsV[pkIdxL]
for i in range( min_pk_idx, max_pk_idx+1 ):
if i > 0:
d = yV[i] - yV[i-1]
if d > maxDeltaDb or d < 0:
oorPkIdxL.append(i)
return oorPkIdxL
def plot_spectrum( ax, srate, binHz, specV, midiPitch, harmN ):
""" Plot a single spectrum, 'specV' and the harmonic peak location boundaries."""
binN = specV.shape[0]
harmLBinL,harmMBinL,harmUBinL = ra.calc_harm_bins( srate, binHz, midiPitch, harmN )
fundHz = harmMBinL[0] * binHz
maxPlotHz = fundHz * (harmN+1)
maxPlotBinN = int(round(maxPlotHz/binHz))
hzV = np.arange(binN) * (srate/(binN*2))
specV = 20.0 * np.log10(specV)
ax.plot(hzV[0:maxPlotBinN], specV[0:maxPlotBinN] )
for h0,h1,h2 in zip(harmLBinL,harmMBinL,harmUBinL):
ax.axvline( x=h0 * binHz, color="blue")
ax.axvline( x=h1 * binHz, color="black")
ax.axvline( x=h2 * binHz, color="blue")
ax.set_ylabel("dB : %i " % (midiPitch))
def plot_spectral_ranges( inDir, pitchTakeL, rmsWndMs=300, rmsHopMs=30, harmN=5, dbLinRef=0.001, printDir="" ):
""" Plot the spectrum from one note (7th from last) in each attack pulse length sequence referred to by pitchTakeL."""
plotN = len(pitchTakeL)
fig,axL = plt.subplots(plotN,1)
for plot_idx,(midiPitch,takeId) in enumerate(pitchTakeL):
# get the audio and meta-data file names
seqFn = os.path.join( inDir, str(midiPitch), str(takeId), "seq.json")
audioFn = os.path.join( inDir, str(midiPitch), str(takeId), "audio.wav")
# read the meta data object
with open( seqFn, "rb") as f:
r = json.load(f)
# read the audio file
srate, signalM = wavfile.read(audioFn)
# convert the audio signal vector to contain only the first (left) channel
if len(signalM.shape)>1:
signalM = signalM[:,0].squeeze()
sigV = signalM / float(0x7fff)
# calc. the RMS envelope in the time domain
rms0DbV, rms0_srate = ra.audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef )
# locate the sample index of the peak of each note attack
pkIdx0L = ra.locate_peak_indexes( rms0DbV, rms0_srate, r['eventTimeL'] )
# select the 7th to last note for spectrum measurement
#
# TODO: come up with a better way to select the note to measure
#
spectrumSmpIdx = pkIdx0L[ len(pkIdx0L) - 7 ]
# calc. the RMS envelope by taking the max spectral peak in each STFT window
rmsDbV, rms_srate, specV, specHopIdx, binHz = ra.audio_stft_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef, spectrumSmpIdx)
# specV[] is the spectrum of the note at spectrumSmpIdx
# plot the spectrum and the harmonic selection ranges
plot_spectrum( axL[plot_idx], srate, binHz, specV, midiPitch, harmN )
axL[-1].set_xlabel("Hertz")
if printDir:
plt.savefig(os.path.join(printDir,"plot_spectral_ranges.png"),format="png")
plt.show()
def td_plot( ax, inDir, midi_pitch, id, analysisArgsD ):
#
r = rms_analysis_main( inDir, midi_pitch, **analysisArgsD['rmsAnalysisArgs'] )
# find min/max peak in the sequence
min_pk_idx, max_pk_idx = find_min_max_peak_index( r.pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] )
# find ranges of the sequence which should be skipped because they are noisy or unreliable
skipPkIdxL = find_skip_peaks( r.rmsDbV, r.pkIdxL, min_pk_idx, max_pk_idx )
# find peaks whose difference to surrounding peaks is greater than 'maxDeltaDb'.
jmpPkIdxL = find_out_of_range_peaks( r.rmsDbV, r.pkIdxL, min_pk_idx, max_pk_idx, analysisArgsD['maxDeltaDb'] )
secV = np.arange(0,len(r.rmsDbV)) / r.rms_srate
# plot the harmonic RMS signal
ax.plot( secV, r.rmsDbV, color='blue', label="Harmonic" )
# plot the time-domain RMS signal
ax.plot( np.arange(0,len(r.tdRmsDbV)) / r.rms_srate, r.tdRmsDbV, color="black", label="TD" )
# print note beg/end/peak boundaries
for i,(begMs, endMs) in enumerate(r.eventTimeL):
pkSec = r.pkIdxL[i] / r.rms_srate
endSec = pkSec + r.statsL[i].durMs / 1000.0
ax.axvline( x=begMs/1000.0, color="green")
ax.axvline( x=endMs/1000.0, color="red")
ax.axvline( x=pkSec, color="black")
ax.axvline( x=endSec, color="black")
ax.text(begMs/1000.0, 20.0, str(i) )
# plot peak markers
for i,pki in enumerate(r.pkIdxL):
marker = 4 if i==min_pk_idx or i==max_pk_idx else 5
color = "red" if i in skipPkIdxL else "black"
ax.plot( [pki / r.rms_srate], [ r.rmsDbV[pki] ], marker=marker, color=color)
if i in jmpPkIdxL:
ax.plot( [pki / r.rms_srate], [ r.rmsDbV[pki] ], marker=6, color="blue")
ax.legend();
ax.set_ylabel("dB");
return r
def do_td_plot( inDir, analysisArgs, midi_pitch, takeId, printDir="" ):
fig,axL = plt.subplots(3,1)
fig.set_size_inches(18.5, 10.5, forward=True)
# parse the file name
inDir = os.path.join(inDir,str(midi_pitch),str(takeId));
# plot the time domain signal
r = td_plot(axL[0],inDir,midi_pitch,takeId,analysisArgs)
qualityV = np.array([ x.quality for x in r.statsL ]) * np.max(r.pkDbL)
durMsV = np.array([ x.durMs for x in r.statsL ])
avgV = np.array([ x.durAvgDb for x in r.statsL ])
dV = np.diff(r.pkDbL) / r.pkDbL[1:]
axL[1].plot( r.pkUsL, r.pkDbL, marker='.', label="harmonic" )
#axL[1].plot( r.pkUsL, qualityV, marker='.', label="quality" )
axL[1].plot( r.pkUsL, avgV, marker='.', label="harm-td avg" )
axL[1].legend()
axL[1].set_ylabel("dB");
#axL[2].plot( r.pkUsL, durMsV, marker='.' )
axL[2].plot( r.pkUsL[1:], dV, marker='.',label='delta')
axL[2].set_ylim([-1,1])
axL[2].legend()
axL[2].set_ylabel("dB");
axL[2].set_xlabel("Microseconds")
sni = select_first_stable_note_by_dur( durMsV )
if sni is not None:
axL[1].plot( r.pkUsL[sni], r.pkDbL[sni], marker='*', color='red')
sni = select_first_stable_note_by_delta_db( r.pkDbL )
if sni is not None:
axL[2].plot( r.pkUsL[sni], dV[sni-1], marker='*', color='red')
for i,s in enumerate(r.statsL):
axL[1].text( r.pkUsL[i], r.pkDbL[i] + 1, "%i" % (i))
for i in range(1,len(r.pkUsL)):
axL[2].text( r.pkUsL[i], dV[i-1], "%i" % (i))
if printDir:
plt.savefig(os.path.join(printDir,"do_td_plot.png"),format="png")
plt.show()
def do_td_multi_plot( inDir, analysisArgs, pitchTakeL, printPlotFl=False ):
#midi_pitch = int(inDir.split("/")[-1])
fig,axL = plt.subplots(len(pitchTakeL),1)
for id,((pitch,takeId),ax) in enumerate(zip(pitchTakeL,axL)):
td_plot(ax, os.path.join(inDir,str(pitch),str(takeId)), pitch, takeId, analysisArgs )
ax.set_xlabel("Seconds")
if printDir:
plt.savefig(os.path.join(printDir,"multi_plot.png"),format="png")
plt.show()
if __name__ == "__main__":
printDir = os.path.expanduser("~/src/picadae_ac_3/doc")
cfgFn = sys.argv[1]
inDir = sys.argv[2]
mode = sys.argv[3]
cfg = parse_yaml_cfg( cfgFn )
if mode == "td_plot":
# python plot_seq.py p_ac.yml ~/temp/p_ac_3_od td_plot 60 10
pitch = int(sys.argv[4])
take_id = int(sys.argv[5])
do_td_plot(inDir,cfg.analysisArgs, pitch, take_id, printDir )
elif mode == "td_multi_plot" or mode == 'plot_spectral_ranges':
pitchTakeIdL = []
for i in range(4,len(sys.argv),2):
pitchTakeIdL.append( (int(sys.argv[i]), int(sys.argv[i+1])) )
if mode == "td_multi_plot":
# python plot_seq.py p_ac.yml ~/temp/p_ac_3_od td_multi_plot 36 2 48 3 60 4
do_td_multi_plot(inDir, cfg.analysisArgs, pitchTakeIdL, printDir )
else:
# python plot_seq.py p_ac.yml ~/temp/p_ac_3_od plot_spectral_ranges 36 2 48 3 60 4
plot_spectral_ranges( inDir, pitchTakeIdL, printDir=printDir )
elif mode == "resample_pulse_times":
# python plot_seq.py p_ac.yml ~/temp/p_ac_3_od resample_pulse_times 60
pitch = int(sys.argv[4])
plot_resample_pulse_times( inDir, cfg.analysisArgs, pitch, printDir )
else:
print("Unknown plot mode:%s" % (mode))