##| Copyright: (C) 2019-2020 Kevin Larke ##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file. import os, sys,json import matplotlib.pyplot as plt import numpy as np from scipy.io import wavfile from common import parse_yaml_cfg from rms_analysis import rms_analysis_main from rms_analysis import select_first_stable_note_by_delta_db from rms_analysis import select_first_stable_note_by_dur from rms_analysis import samples_to_linear_residual import rms_analysis as ra #from rms_analysis import audio_rms #from rms_analysis import locate_peak_indexes #from rms_analysis import audio_stft_rms #from rms_analysis import calc_harm_bins def is_nanV( xV ): for i in range(xV.shape[0]): if np.isnan( xV[i] ): return True return False def _find_max_take_id( inDir ): id = 0 while os.path.isdir( os.path.join(inDir, "%i" % id) ): id += 1 if id > 0: id -= 1 return id def form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None ): # append the midi pitch to the input directory inDir = os.path.join( inDir, str(midi_pitch)) dirL = os.listdir(inDir) pkL = [] maxTakeNumber = 0 # for each take in this directory for idir in dirL: take_number = int(idir) if not os.path.isfile(os.path.join( inDir,idir, "seq.json")): continue if analysisArgsD['useLastTakeOnlyFl']: if take_number > maxTakeNumber: pkL.clear() maxTakeNumber = take_number else: continue # analyze this takes audio and locate the note peaks r = rms_analysis_main( os.path.join(inDir,idir), midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) # store the peaks in pkL[ (db,us) ] for db,us in zip(r.pkDbL,r.pkUsL): pkL.append( (db,us) ) # sort the peaks on increasing attack pulse microseconds pkL = sorted( pkL, key= lambda x: x[1] ) # merge sample points that separated by less than 'minSampleDistUs' milliseconds pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] ) # split pkL pkDbL,pkUsL = tuple(zip(*pkL)) #------------------------------------------- # locate the first and last note min_pk_idx, max_pk_idx = find_min_max_peak_index( pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] ) print("MIN MAX:",min_pk_idx,pkUsL[min_pk_idx],max_pk_idx,pkUsL[max_pk_idx]) db1 = pkDbL[ max_pk_idx ] db0 = pkDbL[ min_pk_idx ] pulseUsL = [] pulseDbL = [] multValL = [] for out_idx in range(128): # calc the target volume db = db0 + (out_idx * (db1-db0)/127.0) multi_value_count = 0 # look for the target between each of the sampled points for i in range(1,len(pkDbL)): # if the target volume is between these two sample points if pkDbL[i-1] <= db and db < pkDbL[i]: # if the target has not already been located if len(pulseUsL) == out_idx: # interpolate the pulse time from between the sampled points frac = (db - pkDbL[i-1]) / (pkDbL[i] - pkDbL[i-1]) us = pkUsL[i-1] + frac * (pkUsL[i] - pkUsL[i-1]) db = pkDbL[i-1] + frac * (pkDbL[i] - pkDbL[i-1]) pulseUsL.append(us) pulseDbL.append(db) else: # this target db value was found between multiple sampled points # therefore the sampled volume function is not monotonic multi_value_count += 1 if multi_value_count > 0: multValL.append((out_idx,multi_value_count)) if len(multValL) > 0: # print("Multi-value pulse locations were found during velocity table formation: ",multValL) pass return pulseUsL,pulseDbL,r.holdDutyPctL def merge_close_sample_points( pkDbUsL, minSampleDistanceUs ): avg0Us = np.mean(np.diff([ x[1] for x in pkDbUsL ])) n0 = len(pkDbUsL) while True and n0>0: us0 = None db0 = None for i,(db,us) in enumerate(pkDbUsL): if i > 0 and us - us0 < minSampleDistanceUs: us1 = (us0 + us)/2 db1 = (db0 + db)/2 pkDbUsL[i-1] = (db1,us1) del pkDbUsL[i] break else: us0 = us db0 = db if i+1 == len(pkDbUsL): break avg1Us = np.mean(np.diff([ x[1] for x in pkDbUsL ])) print("%i sample points deleted by merging close points." % (n0 - len(pkDbUsL))) print("Mean time between samples - before:%f after:%f " % (avg0Us,avg1Us)) print("Min time between samples: %i " % (np.min(np.diff([x[1] for x in pkDbUsL])))) return pkDbUsL def _calc_resample_points( dPkDb, pkUs0, pkUs1, samplePerDb, minSampleDistUs ): dPkUs = pkUs1 - pkUs0 sampleCnt = max(int(round(abs(dPkDb) * samplePerDb)),samplePerDb) dUs = max(int(round(dPkUs/sampleCnt)),minSampleDistUs) sampleCnt = int(round(dPkUs/dUs)) dUs = int(round(dPkUs/sampleCnt)) usL = [ pkUs0 + dUs*j for j in range(sampleCnt+1)] return usL def calc_resample_ranges( pkDbL, pkUsL, min_pk_idx, max_pk_idx, maxDeltaDb, samplePerDb, minSampleDistUs ): if min_pk_idx == 0: print("No silent notes were generated. Decrease the minimum peak level or the hold voltage.") return None resampleUsSet = set() refPkDb = pkDbL[min_pk_idx] #pkDbL = pkDbL[ pkIdxL ] for i in range( min_pk_idx, max_pk_idx+1 ): d = pkDbL[i] - pkDbL[i-1] usL = [] # if this peak is less than maxDeltaDb above the previous pk or # it is below the previous max peak if d > maxDeltaDb or d <= 0 or pkDbL[i] < refPkDb: usL = _calc_resample_points( d, pkUsL[i-1], pkUsL[i], samplePerDb, minSampleDistUs ) if d <= 0 and i + 1 < len(pkDbL): d = pkDbL[i+1] - pkDbL[i] usL += _calc_resample_points( d, pkUsL[i-1], pkUsL[i], samplePerDb, minSampleDistUs ) if pkDbL[i] > refPkDb: refPkDb = pkDbL[i] if usL: resampleUsSet = resampleUsSet.union( usL ) return resampleUsSet def form_resample_pulse_time_list( inDir, midi_pitch, analysisArgsD ): """" This function merges all available data from previous takes to form a new list of pulse times to sample. """ inDir = os.path.join( inDir, str(midi_pitch) ) dirL = os.listdir(inDir) pkL = [] # for each take in this directory for idir in dirL: take_number = int(idir) # analyze this takes audio and locate the note peaks r = rms_analysis_main( os.path.join(inDir,idir), midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) # store the peaks in pkL[ (db,us) ] for db,us in zip(r.pkDbL,r.pkUsL): pkL.append( (db,us) ) # sort the peaks on increasing attack pulse microseconds pkL = sorted( pkL, key= lambda x: x[1] ) # merge sample points that separated by less than 'minSampleDistUs' milliseconds pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] ) # split pkL pkDbL,pkUsL = tuple(zip(*pkL)) # locate the first and last note min_pk_idx, max_pk_idx = find_min_max_peak_index( pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] ) # estimate the microsecond locations to resample resampleUsSet = calc_resample_ranges( pkDbL, pkUsL, min_pk_idx, max_pk_idx, analysisArgsD['maxDeltaDb'], analysisArgsD['samplesPerDb'], analysisArgsD['minSampleDistUs'] ) resampleUsL = sorted( list(resampleUsSet) ) #print(resampleUsL) return resampleUsL, pkDbL, pkUsL def plot_curve( ax, pulseUsL, rmsDbV ): coeff = np.polyfit(pulseUsL,rmsDbV,5) func = np.poly1d(coeff) ax.plot( pulseUsL, func(pulseUsL), color='red') def plot_resample_pulse_times_0( inDir, analysisArgsD, midi_pitch, printDir="" ): newPulseUsL, rmsDbV, pulseUsL = form_resample_pulse_time_list( inDir, midi_pitch, analysisArgsD ) velTblUsL,velTblDbL,_ = form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None ) fig,ax = plt.subplots() ax.plot(pulseUsL,rmsDbV,marker='.' ) for us in newPulseUsL: ax.axvline( x = us ) print(len(velTblUsL)) ax.plot(velTblUsL,velTblDbL,marker='.',linestyle='None',color='red') if printDir: plt.savefig(os.path.join(printDir,"plot_resample_times_0.png"),format="png") plt.show() def plot_resample_pulse_times( inDir, analysisArgsD, midi_pitch, printDir="" ): newPulseUsL, rmsDbV, pulseUsL = form_resample_pulse_time_list( inDir, midi_pitch, analysisArgsD ) velTblUsL,velTblDbL,_ = form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None ) fig,axL = plt.subplots(2,1,gridspec_kw={'height_ratios': [2, 1]}) axL[0].plot(pulseUsL,rmsDbV,marker='.',color="red" ) #plot_curve( ax, velTblUsL,velTblDbL) scoreV = samples_to_linear_residual( pulseUsL, rmsDbV) axL[0].plot(pulseUsL,rmsDbV + scoreV) axL[0].plot(pulseUsL,rmsDbV + np.power(scoreV,2.0)) axL[0].plot(pulseUsL,rmsDbV - np.power(scoreV,2.0)) axL[1].axhline(0.0,color='black') axL[1].axhline(1.0,color='black') axL[1].plot(pulseUsL,np.abs(scoreV * 100.0 / rmsDbV)) axL[1].set_ylim((0.0,50)) if printDir: plt.savefig(os.path.join(printDir,"plot_resample_times.png"),format="png") plt.show() def find_min_max_peak_index( pkDbL, minDb, maxDbOffs ): """ Find the min db and max db peak. """ # select only the peaks from rmsV[] to work with yV = pkDbL # get the max volume note max_i = np.argmax( yV ) maxDb = yV[ max_i ] min_i = max_i # starting from the max volume peak go backwards for i in range( max_i, 0, -1 ): # if this peak is within maxDbOffs of the loudest then choose this one instead #if maxDb - yV[i] < maxDbOffs: # max_i = i # if this peak is less than minDb then the previous note is the min note if yV[i] < minDb: break min_i = i if min_i >= max_i: min_i = 0 max_i = len(pkDbL)-1 if min_i == 0: print("No silent notes were generated. Decrease the minimum peak level or the hold voltage.") return min_i, max_i def find_skip_peaks( rmsV, pkIdxL, min_pk_idx, max_pk_idx ): """ Fine peaks associated with longer attacks pulses that are lower than peaks with a shorter attack pulse. These peaks indicate degenerate portions of the pulse/db curve which must be skipped during velocity table formation """ skipPkIdxL = [] yV = rmsV[pkIdxL] refPkDb = yV[min_pk_idx] for i in range( min_pk_idx+1, max_pk_idx+1 ): if yV[i] > refPkDb: refPkDb = yV[i] else: skipPkIdxL.append(i) return skipPkIdxL def find_out_of_range_peaks( rmsV, pkIdxL, min_pk_idx, max_pk_idx, maxDeltaDb ): """ Locate peaks which are more than maxDeltaDb from the previous peak. If two peaks are separated by more than maxDeltaDb then the range must be resampled """ oorPkIdxL = [] yV = rmsV[pkIdxL] for i in range( min_pk_idx, max_pk_idx+1 ): if i > 0: d = yV[i] - yV[i-1] if d > maxDeltaDb or d < 0: oorPkIdxL.append(i) return oorPkIdxL def plot_spectrum( ax, srate, binHz, specV, midiPitch, harmN ): """ Plot a single spectrum, 'specV' and the harmonic peak location boundaries.""" binN = specV.shape[0] harmLBinL,harmMBinL,harmUBinL = ra.calc_harm_bins( srate, binHz, midiPitch, harmN ) fundHz = harmMBinL[0] * binHz maxPlotHz = fundHz * (harmN+1) maxPlotBinN = int(round(maxPlotHz/binHz)) hzV = np.arange(binN) * (srate/(binN*2)) specV = 20.0 * np.log10(specV) ax.plot(hzV[0:maxPlotBinN], specV[0:maxPlotBinN] ) for h0,h1,h2 in zip(harmLBinL,harmMBinL,harmUBinL): ax.axvline( x=h0 * binHz, color="blue") ax.axvline( x=h1 * binHz, color="black") ax.axvline( x=h2 * binHz, color="blue") ax.set_ylabel("dB : %i " % (midiPitch)) def plot_spectral_ranges( inDir, pitchTakeL, rmsWndMs=300, rmsHopMs=30, harmN=5, dbLinRef=0.001, printDir="" ): """ Plot the spectrum from one note (7th from last) in each attack pulse length sequence referred to by pitchTakeL.""" plotN = len(pitchTakeL) fig,axL = plt.subplots(plotN,1) for plot_idx,(midiPitch,takeId) in enumerate(pitchTakeL): # get the audio and meta-data file names seqFn = os.path.join( inDir, str(midiPitch), str(takeId), "seq.json") audioFn = os.path.join( inDir, str(midiPitch), str(takeId), "audio.wav") # read the meta data object with open( seqFn, "rb") as f: r = json.load(f) # read the audio file srate, signalM = wavfile.read(audioFn) # convert the audio signal vector to contain only the first (left) channel if len(signalM.shape)>1: signalM = signalM[:,0].squeeze() sigV = signalM / float(0x7fff) # calc. the RMS envelope in the time domain rms0DbV, rms0_srate = ra.audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef ) # locate the sample index of the peak of each note attack pkIdx0L = ra.locate_peak_indexes( rms0DbV, rms0_srate, r['eventTimeL'] ) # select the 7th to last note for spectrum measurement # # TODO: come up with a better way to select the note to measure # spectrumSmpIdx = pkIdx0L[ len(pkIdx0L) - 7 ] # calc. the RMS envelope by taking the max spectral peak in each STFT window rmsDbV, rms_srate, specV, specHopIdx, binHz = ra.audio_stft_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef, spectrumSmpIdx) # specV[] is the spectrum of the note at spectrumSmpIdx # plot the spectrum and the harmonic selection ranges plot_spectrum( axL[plot_idx], srate, binHz, specV, midiPitch, harmN ) axL[-1].set_xlabel("Hertz") if printDir: plt.savefig(os.path.join(printDir,"plot_spectral_ranges.png"),format="png") plt.show() def td_plot( ax, inDir, midi_pitch, id, analysisArgsD ): # r = rms_analysis_main( inDir, midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) # find min/max peak in the sequence min_pk_idx, max_pk_idx = find_min_max_peak_index( r.pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] ) # find ranges of the sequence which should be skipped because they are noisy or unreliable skipPkIdxL = find_skip_peaks( r.rmsDbV, r.pkIdxL, min_pk_idx, max_pk_idx ) # find peaks whose difference to surrounding peaks is greater than 'maxDeltaDb'. jmpPkIdxL = find_out_of_range_peaks( r.rmsDbV, r.pkIdxL, min_pk_idx, max_pk_idx, analysisArgsD['maxDeltaDb'] ) secV = np.arange(0,len(r.rmsDbV)) / r.rms_srate # plot the harmonic RMS signal ax.plot( secV, r.rmsDbV, color='blue', label="Harmonic" ) # plot the time-domain RMS signal ax.plot( np.arange(0,len(r.tdRmsDbV)) / r.rms_srate, r.tdRmsDbV, color="black", label="TD" ) # print note beg/end/peak boundaries for i,(begMs, endMs) in enumerate(r.eventTimeL): pkSec = r.pkIdxL[i] / r.rms_srate endSec = pkSec + r.statsL[i].durMs / 1000.0 ax.axvline( x=begMs/1000.0, color="green") ax.axvline( x=endMs/1000.0, color="red") ax.axvline( x=pkSec, color="black") ax.axvline( x=endSec, color="black") ax.text(begMs/1000.0, 20.0, str(i) ) # plot peak markers for i,pki in enumerate(r.pkIdxL): marker = 4 if i==min_pk_idx or i==max_pk_idx else 5 color = "red" if i in skipPkIdxL else "black" ax.plot( [pki / r.rms_srate], [ r.rmsDbV[pki] ], marker=marker, color=color) if i in jmpPkIdxL: ax.plot( [pki / r.rms_srate], [ r.rmsDbV[pki] ], marker=6, color="blue") ax.legend(); ax.set_ylabel("dB"); return r def do_td_plot( inDir, analysisArgs, midi_pitch, takeId, printDir="" ): fig,axL = plt.subplots(3,1) fig.set_size_inches(18.5, 10.5, forward=True) # parse the file name inDir = os.path.join(inDir,str(midi_pitch),str(takeId)); # plot the time domain signal r = td_plot(axL[0],inDir,midi_pitch,takeId,analysisArgs) qualityV = np.array([ x.quality for x in r.statsL ]) * np.max(r.pkDbL) durMsV = np.array([ x.durMs for x in r.statsL ]) avgV = np.array([ x.durAvgDb for x in r.statsL ]) dV = np.diff(r.pkDbL) / r.pkDbL[1:] axL[1].plot( r.pkUsL, r.pkDbL, marker='.', label="harmonic" ) #axL[1].plot( r.pkUsL, qualityV, marker='.', label="quality" ) axL[1].plot( r.pkUsL, avgV, marker='.', label="harm-td avg" ) axL[1].legend() axL[1].set_ylabel("dB"); #axL[2].plot( r.pkUsL, durMsV, marker='.' ) axL[2].plot( r.pkUsL[1:], dV, marker='.',label='delta') axL[2].set_ylim([-1,1]) axL[2].legend() axL[2].set_ylabel("dB"); axL[2].set_xlabel("Microseconds") sni = select_first_stable_note_by_dur( durMsV ) if sni is not None: axL[1].plot( r.pkUsL[sni], r.pkDbL[sni], marker='*', color='red') sni = select_first_stable_note_by_delta_db( r.pkDbL ) if sni is not None: axL[2].plot( r.pkUsL[sni], dV[sni-1], marker='*', color='red') for i,s in enumerate(r.statsL): axL[1].text( r.pkUsL[i], r.pkDbL[i] + 1, "%i" % (i)) for i in range(1,len(r.pkUsL)): axL[2].text( r.pkUsL[i], dV[i-1], "%i" % (i)) if printDir: plt.savefig(os.path.join(printDir,"do_td_plot.png"),format="png") plt.show() def do_td_multi_plot( inDir, analysisArgs, pitchTakeL, printPlotFl=False ): #midi_pitch = int(inDir.split("/")[-1]) fig,axL = plt.subplots(len(pitchTakeL),1) for id,((pitch,takeId),ax) in enumerate(zip(pitchTakeL,axL)): td_plot(ax, os.path.join(inDir,str(pitch),str(takeId)), pitch, takeId, analysisArgs ) ax.set_xlabel("Seconds") if printDir: plt.savefig(os.path.join(printDir,"multi_plot.png"),format="png") plt.show() if __name__ == "__main__": printDir = os.path.expanduser("~/src/picadae_ac_3/doc") cfgFn = sys.argv[1] inDir = sys.argv[2] mode = sys.argv[3] cfg = parse_yaml_cfg( cfgFn ) if mode == "td_plot": # python plot_seq.py p_ac.yml ~/temp/p_ac_3_od td_plot 60 10 pitch = int(sys.argv[4]) take_id = int(sys.argv[5]) do_td_plot(inDir,cfg.analysisArgs, pitch, take_id, printDir ) elif mode == "td_multi_plot" or mode == 'plot_spectral_ranges': pitchTakeIdL = [] for i in range(4,len(sys.argv),2): pitchTakeIdL.append( (int(sys.argv[i]), int(sys.argv[i+1])) ) if mode == "td_multi_plot": # python plot_seq.py p_ac.yml ~/temp/p_ac_3_od td_multi_plot 36 2 48 3 60 4 do_td_multi_plot(inDir, cfg.analysisArgs, pitchTakeIdL, printDir ) else: # python plot_seq.py p_ac.yml ~/temp/p_ac_3_od plot_spectral_ranges 36 2 48 3 60 4 plot_spectral_ranges( inDir, pitchTakeIdL, printDir=printDir ) elif mode == "resample_pulse_times": # python plot_seq.py p_ac.yml ~/temp/p_ac_3_od resample_pulse_times 60 pitch = int(sys.argv[4]) plot_resample_pulse_times( inDir, cfg.analysisArgs, pitch, printDir ) else: print("Unknown plot mode:%s" % (mode))