import os, sys import matplotlib.pyplot as plt import numpy as np from common import parse_yaml_cfg from rms_analysis import rms_analysis_main def is_nanV( xV ): for i in range(xV.shape[0]): if np.isnan( xV[i] ): return True return False def _find_max_take_id( inDir ): id = 0 while os.path.isdir( os.path.join(inDir, "%i" % id) ): id += 1 if id > 0: id -= 1 return id def form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None ): # append the midi pitch to the input directory #inDir = os.path.join( inDir, "%i" % (midi_pitch)) if False: # determine the take id if none was given if take_id is None: take_id = _find_max_take_id( inDir ) inDir = os.path.join(inDir,"%i" % (take_id)) assert( os.path.isdir(inDir)) # analyze the requested take audio r = rms_analysis_main( inDir, midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) pkL = [] # store the peaks in pkL[ (db,us) ] for db,us in zip(r.pkDbL,r.pkUsL): pkL.append( (db,us) ) # sort the peaks on increasing attack pulse microseconds pkL = sorted( pkL, key= lambda x: x[1] ) # split pkL pkDbL,pkUsL = tuple(zip(*pkL)) dirL = os.listdir(inDir) pkL = [] # for each take in this directory for idir in dirL: take_number = int(idir) # analyze this takes audio and locate the note peaks r = rms_analysis_main( os.path.join(inDir,idir), midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) # store the peaks in pkL[ (db,us) ] for db,us in zip(r.pkDbL,r.pkUsL): pkL.append( (db,us) ) # sort the peaks on increasing attack pulse microseconds pkL = sorted( pkL, key= lambda x: x[1] ) # merge sample points that separated by less than 'minSampleDistUs' milliseconds pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] ) # split pkL pkDbL,pkUsL = tuple(zip(*pkL)) #------------------------------------------- # locate the first and last note min_pk_idx, max_pk_idx = find_min_max_peak_index( pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] ) db1 = pkDbL[ max_pk_idx ] db0 = pkDbL[ min_pk_idx ] pulseUsL = [] pulseDbL = [] multValL = [] for out_idx in range(128): # calc the target volume db = db0 + (out_idx * (db1-db0)/127.0) multi_value_count = 0 # look for the target between each of the sampled points for i in range(1,len(pkDbL)): # if the target volume is between these two sample points if pkDbL[i-1] <= db and db < pkDbL[i]: # if the target has not already been located if len(pulseUsL) == out_idx: # interpolate the pulse time from between the sampled points frac = (db - pkDbL[i-1]) / (pkDbL[i] - pkDbL[i-1]) us = pkUsL[i-1] + frac * (pkUsL[i] - pkUsL[i-1]) db = pkDbL[i-1] + frac * (pkDbL[i] - pkDbL[i-1]) pulseUsL.append(us) pulseDbL.append(db) else: # this target db value was found between multiple sampled points # therefore the sampled volume function is not monotonic multi_value_count += 1 if multi_value_count > 0: multValL.append((out_idx,multi_value_count)) if len(multValL) > 0: print("Multi-value pulse locations were found during velocity table formation: ",multValL) return pulseUsL,pulseDbL def merge_close_sample_points( pkDbUsL, minSampleDistanceUs ): avg0Us = np.mean(np.diff([ x[1] for x in pkDbUsL ])) n0 = len(pkDbUsL) while True: us0 = None db0 = None for i,(db,us) in enumerate(pkDbUsL): if i > 0 and us - us0 < minSampleDistanceUs: us1 = (us0 + us)/2 db1 = (db0 + db)/2 pkDbUsL[i-1] = (db1,us1) del pkDbUsL[i] break else: us0 = us db0 = db if i+1 == len(pkDbUsL): break avg1Us = np.mean(np.diff([ x[1] for x in pkDbUsL ])) print("%i sample points deleted by merging close points." % (n0 - len(pkDbUsL))) print("Mean time between samples - before:%f after:%f " % (avg0Us,avg1Us)) print("Min time between samples: %i " % (np.min(np.diff([x[1] for x in pkDbUsL])))) return pkDbUsL def calc_resample_ranges( pkDbL, pkUsL, min_pk_idx, max_pk_idx, maxDeltaDb, samplePerDb ): if min_pk_idx == 0: print("No silent notes were generated. Decrease the minimum peak level or the hold voltage.") return None resampleUsSet = set() refPkDb = pkDbL[min_pk_idx] #pkDbL = pkDbL[ pkIdxL ] for i in range( min_pk_idx, max_pk_idx+1 ): d = pkDbL[i] - pkDbL[i-1] usL = [] # if this peak is less than maxDeltaDb above the previous pk or # it is below the previous max peak if d > maxDeltaDb or d <= 0 or pkDbL[i] < refPkDb: sampleCnt = max(int(round(abs(d) * samplePerDb)),samplePerDb) dUs = int(round((pkUsL[i] - pkUsL[i-1])/sampleCnt)) usL = [ pkUsL[i-1] + dUs*j for j in range(sampleCnt)] if i + 1 < len(pkDbL): d = pkDbL[i+1] - pkDbL[i] sampleCnt = max(int(round(abs(d) * samplePerDb)),samplePerDb) dUs = int(round((pkUsL[i+1] - pkUsL[i])/sampleCnt)) usL += [ pkUsL[i] + dUs*j for j in range(sampleCnt)] if pkDbL[i] > refPkDb: refPkDb = pkDbL[i] if usL: resampleUsSet = resampleUsSet.union( usL ) return resampleUsSet def form_resample_pulse_time_list( inDir, analysisArgsD ): """" This function merges all available data from previous takes to form a new list of pulse times to sample. """ # the last folder is always the midi pitch of the note under analysis midi_pitch = int( inDir.split("/")[-1] ) dirL = os.listdir(inDir) pkL = [] # for each take in this directory for idir in dirL: take_number = int(idir) # analyze this takes audio and locate the note peaks r = rms_analysis_main( os.path.join(inDir,idir), midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) # store the peaks in pkL[ (db,us) ] for db,us in zip(r.pkDbL,r.pkUsL): pkL.append( (db,us) ) # sort the peaks on increasing attack pulse microseconds pkL = sorted( pkL, key= lambda x: x[1] ) # merge sample points that separated by less than 'minSampleDistUs' milliseconds pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] ) # split pkL pkDbL,pkUsL = tuple(zip(*pkL)) # locate the first and last note min_pk_idx, max_pk_idx = find_min_max_peak_index( pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] ) # estimate the microsecond locations to resample resampleUsSet = calc_resample_ranges( pkDbL, pkUsL, min_pk_idx, max_pk_idx, analysisArgsD['maxDeltaDb'], analysisArgsD['samplesPerDb'] ) resampleUsL = sorted( list(resampleUsSet) ) return resampleUsL, pkDbL, pkUsL def plot_resample_pulse_times( inDir, analysisArgsD ): newPulseUsL, rmsDbV, pulseUsL = form_resample_pulse_time_list( inDir, analysisArgsD ) midi_pitch = int( inDir.split("/")[-1] ) velTblUsL,velTblDbL = form_final_pulse_list( inDir, midi_pitch, analysisArgsD, take_id=None ) fig,ax = plt.subplots() ax.plot(pulseUsL,rmsDbV ) for us in newPulseUsL: ax.axvline( x = us ) ax.plot(velTblUsL,velTblDbL,marker='.',linestyle='None') plt.show() def find_min_max_peak_index( pkDbL, minDb, maxDbOffs ): """ Find the min db and max db peak. """ # select only the peaks from rmsV[] to work with yV = pkDbL # get the max volume note max_i = np.argmax( yV ) maxDb = yV[ max_i ] min_i = max_i # starting from the max volume peak go backwards for i in range( max_i, 0, -1 ): # if this peak is within maxDbOffs of the loudest then choose this one instead if maxDb - yV[i] < maxDbOffs: max_i = i # if this peak is less than minDb then the previous note is the min note if yV[i] < minDb: break min_i = i assert( min_i < max_i ) if min_i == 0: print("No silent notes were generated. Decrease the minimum peak level or the hold voltage.") return min_i, max_i def find_skip_peaks( rmsV, pkIdxL, min_pk_idx, max_pk_idx ): """ Fine peaks associated with longer attacks pulses that are lower than peaks with a shorter attack pulse. These peaks indicate degenerate portions of the pulse/db curve which must be skipped during velocity table formation """ skipPkIdxL = [] yV = rmsV[pkIdxL] refPkDb = yV[min_pk_idx] for i in range( min_pk_idx+1, max_pk_idx+1 ): if yV[i] > refPkDb: refPkDb = yV[i] else: skipPkIdxL.append(i) return skipPkIdxL def find_out_of_range_peaks( rmsV, pkIdxL, min_pk_idx, max_pk_idx, maxDeltaDb ): """ Locate peaks which are more than maxDeltaDb from the previous peak. If two peaks are separated by more than maxDeltaDb then the range must be resampled """ oorPkIdxL = [] yV = rmsV[pkIdxL] for i in range( min_pk_idx, max_pk_idx+1 ): if i > 0: d = yV[i] - yV[i-1] if d > maxDeltaDb or d < 0: oorPkIdxL.append(i) return oorPkIdxL def plot_spectrum( ax, srate, binHz, specV, midiPitch, harmN ): """ Plot a single spectrum, 'specV' and the harmonic peak location boundaries.""" binN = specV.shape[0] harmLBinL,harmMBinL,harmUBinL = calc_harm_bins( srate, binHz, midiPitch, harmN ) fundHz = harmMBinL[0] * binHz maxPlotHz = fundHz * (harmN+1) maxPlotBinN = int(round(maxPlotHz/binHz)) hzV = np.arange(binN) * (srate/(binN*2)) specV = 20.0 * np.log10(specV) ax.plot(hzV[0:maxPlotBinN], specV[0:maxPlotBinN] ) for h0,h1,h2 in zip(harmLBinL,harmMBinL,harmUBinL): ax.axvline( x=h0 * binHz, color="blue") ax.axvline( x=h1 * binHz, color="black") ax.axvline( x=h2 * binHz, color="blue") ax.set_ylabel(str(midiPitch)) def plot_spectral_ranges( inDir, pitchL, rmsWndMs=300, rmsHopMs=30, harmN=5, dbRefWndMs=500 ): """ Plot the spectrum from one note (7th from last) in each attack pulse length sequence referred to by pitchL.""" plotN = len(pitchL) fig,axL = plt.subplots(plotN,1) for plot_idx,midiPitch in enumerate(pitchL): # get the audio and meta-data file names seqFn = os.path.join( inDir, str(midiPitch), "seq.json") audioFn = os.path.join( inDir, str(midiPitch), "audio.wav") # read the meta data object with open( seqFn, "rb") as f: r = json.load(f) # read the audio file srate, signalM = wavfile.read(audioFn) sigV = signalM / float(0x7fff) # calc. the RMS envelope in the time domain rms0DbV, rms0_srate = audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs ) # locate the sample index of the peak of each note attack pkIdx0L = locate_peak_indexes( rms0DbV, rms0_srate, r['eventTimeL'] ) # select the 7th to last note for spectrum measurement # # TODO: come up with a better way to select the note to measure # spectrumSmpIdx = pkIdx0L[ len(pkIdx0L) - 7 ] # calc. the RMS envelope by taking the max spectral peak in each STFT window rmsDbV, rms_srate, specV, specHopIdx, binHz = audio_stft_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs, spectrumSmpIdx) # specV[] is the spectrum of the note at spectrumSmpIdx # plot the spectrum and the harmonic selection ranges plot_spectrum( axL[plot_idx], srate, binHz, specV, midiPitch, harmN ) plt.show() def td_plot( ax, inDir, midi_pitch, id, analysisArgsD ): r = rms_analysis_main( inDir, midi_pitch, **analysisArgsD['rmsAnalysisArgs'] ) min_pk_idx, max_pk_idx = find_min_max_peak_index( r.pkDbL, analysisArgsD['minAttkDb'], analysisArgsD['maxDbOffset'] ) skipPkIdxL = find_skip_peaks( r.rmsDbV, r.pkIdxL, min_pk_idx, max_pk_idx ) jmpPkIdxL = find_out_of_range_peaks( r.rmsDbV, r.pkIdxL, min_pk_idx, max_pk_idx, analysisArgsD['maxDeltaDb'] ) secV = np.arange(0,len(r.rmsDbV)) / r.rms_srate ax.plot( secV, r.rmsDbV ) ax.plot( np.arange(0,len(r.tdRmsDbV)) / r.rms_srate, r.tdRmsDbV, color="black" ) # print beg/end boundaries for i,(begMs, endMs) in enumerate(r.eventTimeL): ax.axvline( x=begMs/1000.0, color="green") ax.axvline( x=endMs/1000.0, color="red") ax.text(begMs/1000.0, 20.0, str(i) ) return # plot peak markers for i,pki in enumerate(r.pkIdxL): marker = 4 if i==min_pk_idx or i==max_pk_idx else 5 color = "red" if i in skipPkIdxL else "black" ax.plot( [pki / r.rms_srate], [ r.rmsDbV[pki] ], marker=marker, color=color) if i in jmpPkIdxL: ax.plot( [pki / r.rms_srate], [ r.rmsDbV[pki] ], marker=6, color="blue") def do_td_plot( inDir, analysisArgs ): fig,ax = plt.subplots() fig.set_size_inches(18.5, 10.5, forward=True) id = int(inDir.split("/")[-1]) midi_pitch = int(inDir.split("/")[-2]) td_plot(ax,inDir,midi_pitch,id,analysisArgs) plt.show() def do_td_multi_plot( inDir, analysisArgs ): midi_pitch = int(inDir.split("/")[-1]) dirL = os.listdir(inDir) fig,axL = plt.subplots(len(dirL),1) for id,(idir,ax) in enumerate(zip(dirL,axL)): td_plot(ax, os.path.join(inDir,str(id)), midi_pitch, id, analysisArgs ) plt.show() if __name__ == "__main__": inDir = sys.argv[1] cfgFn = sys.argv[2] cfg = parse_yaml_cfg( cfgFn ) #do_td_plot(inDir,cfg.analysisArgs) #o_td_multi_plot(inDir,cfg.analysisArgs) #plot_spectral_ranges( inDir, [ 24, 36, 48, 60, 72, 84, 96, 104] ) plot_resample_pulse_times( inDir, cfg.analysisArgs )