##| Copyright: (C) 2019-2020 Kevin Larke
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
import os
import sys
import json
import math

import matplotlib.pyplot as plt
import numpy as np

from common import parse_yaml_cfg
import rms_analysis
import elbow


def fit_to_reference( pkL, refTakeId ):
    """Fit the db values of every take to the db values of the reference take.

    pkL is a sequence of (db,us,durMs,takeId) tuples.  The points belonging to
    'refTakeId' are passed through unchanged.  The points of every other take
    are replaced by the values returned from elbow.fit_points_to_reference().

    Returns a list of (db,us,durMs,takeId) tuples.
    """
    us_outL  = []
    db_outL  = []
    dur_outL = []
    tid_outL = []

    dbL,usL,durMsL,takeIdL = tuple(zip(*pkL))

    refL = [ (us,db) for us,db,tid in zip(usL,dbL,takeIdL) if tid == refTakeId ]
    us_refL,db_refL = zip(*refL)

    for takeId in set(takeIdL):

        us0L,db0L,dur0L = zip(*[ (us,db,dur) for us,db,dur,tid in zip(usL,dbL,durMsL,takeIdL) if tid == takeId ])

        if takeId == refTakeId:
            fit_dbL = list(db0L)
        else:
            db1V = elbow.fit_points_to_reference(us0L,db0L,us_refL,db_refL)

            # If the fit failed then drop this take entirely.  Appending the
            # us/dur/takeId values without matching db values would shift
            # every subsequent row of the zipped output.
            if db1V is None:
                continue

            fit_dbL = db1V.tolist()

        db_outL  += fit_dbL
        us_outL  += us0L
        dur_outL += dur0L
        tid_outL += [takeId] * len(us0L)

    return list(zip(db_outL,us_outL,dur_outL,tid_outL))


def get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD, takeId=None ):
    """Analyze the takes of one pitch and merge their peak measurements.

    inDir         -- directory containing one sub-directory per MIDI pitch.
    midi_pitch    -- pitch to analyze (selects the sub-directory "<midi_pitch>").
    analysisArgsD -- keyword arguments forwarded to rms_analysis.rms_analysis_main().
    takeId        -- analyze only this take; when None every entry of the
                     pitch directory is treated as an integer take id.

    Returns (pkUsL,pkDbL,durMsL,takeIdL,holdDutyPctL) where the first four
    sequences are parallel and sorted on increasing pulse microseconds and
    holdDutyPctL is taken from the last take analyzed.  All five are empty
    when no take was analyzed.
    """
    inDir = os.path.join(inDir,"%i" % (midi_pitch))

    if takeId is not None:
        takeIdL = [ takeId ]
    else:
        # NOTE: os.listdir() order is arbitrary, so the take used as the
        # fit reference (the first one visited) is arbitrary as well.
        takeIdL = [ int(take_folder) for take_folder in os.listdir(inDir) ]

    pkL          = []
    refTakeId    = None
    holdDutyPctL = []

    # for each take in this directory
    for take_number in takeIdL:

        if refTakeId is None:
            refTakeId = take_number

        # analyze this takes audio and locate the note peaks
        r = rms_analysis.rms_analysis_main( os.path.join(inDir,str(take_number)), midi_pitch, **analysisArgsD )

        holdDutyPctL = r.holdDutyPctL

        # store the peaks in pkL[ (db,us,durMs,takeId) ]
        for db,us,stats in zip(r.pkDbL,r.pkUsL,r.statsL):
            pkL.append( (db,us,stats.durMs,take_number) )

    pkUsL      = []
    pkDbL      = []
    durMsL     = []
    outTakeIdL = []

    if refTakeId is None or not pkL:
        print("No valid data files at %s pitch:%i" % (inDir,midi_pitch))
    else:

        pkL = fit_to_reference( pkL, refTakeId )

        # sort the peaks on increasing attack pulse microseconds
        pkL = sorted( pkL, key= lambda x: x[1] )

        # merge sample points that are separated by less than 'minSampleDistUs' milliseconds
        #pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] )

        # split pkL
        pkDbL,pkUsL,durMsL,outTakeIdL = tuple(zip(*pkL))

    return pkUsL,pkDbL,durMsL,outTakeIdL,holdDutyPctL


def select_resample_reference_indexes( noiseIdxL, sampleN=None ):
    """Return the noisy sample indexes together with their immediate neighbors.

    noiseIdxL -- indexes of the noisy samples.
    sampleN   -- total count of samples; neighbor indexes >= sampleN are not
                 generated.  When None the legacy bound len(noiseIdxL) is
                 used, which compares a sample index against the count of
                 noisy samples and therefore drops most 'i+1' neighbors.

    Single-index gaps between selected indexes are filled as well.
    """
    limitN = len(noiseIdxL) if sampleN is None else sampleN

    resampleIdxS = set()

    # for each noisy sample index store that index and the index
    # before and after it
    for i in noiseIdxL:
        resampleIdxS.add( i )
        if i+1 < limitN:
            resampleIdxS.add( i+1 )
        if i-1 >= 0:
            resampleIdxS.add( i-1 )

    resampleIdxL = list(resampleIdxS)

    # if a single sample point is left out of a region of
    # contiguous sample points then include this as a resample point also
    # (iterate over a snapshot: filling a gap can never open a new one)
    for i in list(resampleIdxL):
        if i+1 not in resampleIdxS and i+2 in resampleIdxS and i+1 < limitN:
            resampleIdxL.append(i+1)
            resampleIdxS.add(i+1)

    return resampleIdxL


def locate_resample_regions( usL, dbL, resampleIdxL ):
    """Convert resample indexes into the (us,db) locations to be resampled.

    Runs of consecutive indexes in resampleIdxL form regions.  For each region
    the midpoints between every pair of adjacent samples in and around the
    region are returned; at the first/last sample the sample itself is used.

    Returns (reUsL,reDbL).
    """
    resampleIdxS = set(resampleIdxL)

    # locate regions of points to resample
    regionL    = []  # (bi,ei)
    inRegionFl = False
    bi         = None
    for i in range(len(usL)):
        if inRegionFl:
            if i not in resampleIdxS:
                regionL.append((bi,i-1))
                inRegionFl = False
                bi         = None
        else:
            if i in resampleIdxS:
                inRegionFl = True
                bi         = i

    # close a region which runs to the end of the sample list
    if bi is not None:
        regionL.append((bi,len(usL)-1))

    # select points around and within the resample regions
    # to resample
    reUsL = []
    reDbL = []
    for bi,ei in regionL:

        for i in range(bi,ei+2):
            if i == 0:
                us = usL[i]
                db = dbL[i]
            elif i >= len(usL):
                us = usL[i-1]
                db = dbL[i-1]
            else:
                us = usL[i-1] + (usL[i]-usL[i-1])/2
                db = dbL[i-1] + (dbL[i]-dbL[i-1])/2

            reUsL.append(us)
            reDbL.append(db)

    return reUsL,reDbL


def get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreL, minDurMs, minDb, noiseLimitPct, audibleRunN=3, nonSkipDbDelta=1.0 ):
    """Locate the samples to skip and the first usable sample.

    A sample is skipped when its duration is less than minDurMs, its db is
    less than minDb or its score is greater than noiseLimitPct.  A single
    non-skipped sample surrounded by skipped samples is skipped as well.

    audibleRunN    -- count of contiguous non-skipped samples which marks the
                      first audible sample (previously hard-coded to 3).
    nonSkipDbDelta -- db distance used to back up from the first audible
                      sample (previously hard-coded to 1.0).

    takeIdL is accepted for interface compatibility but is not used.

    Returns (skipIdxL,firstAudibleIdx,firstNonSkipIdx); the last two are None
    when no run of audibleRunN contiguous non-skipped samples exists.
    """
    firstAudibleIdx = None
    firstNonSkipIdx = None

    # get the indexes of samples which do not meet the duration, db level, or noise criteria
    skipIdxL = [ i for i,(ms,db,score) in enumerate(zip(durMsL,dbL,scoreL)) if ms < minDurMs or db < minDb or score > noiseLimitPct ]
    skipIdxS = set(skipIdxL)

    # if a single sample point is left out of a region of
    # contiguous skipped points then skip this point also
    for i in range(len(durMsL)):
        if i not in skipIdxS and i-1 in skipIdxS and i+1 in skipIdxS:
            skipIdxL.append(i)
            skipIdxS.add(i)

    # find the first set of 'audibleRunN' contiguous samples that
    # are not skipped - all samples prior to these will be skipped
    xL = []
    for i in range(len(durMsL)):
        if i in skipIdxS:
            xL = []
        else:
            xL.append(i)

        if len(xL) == audibleRunN:
            firstAudibleIdx = xL[0]
            break

    # decrease by 'nonSkipDbDelta' decibels to locate the first non-skip
    # TODO: what if no note exists that is one decibel less
    # The recordings of very quiet notes do not give reliable decibel measures
    # so this may not be the best backup criteria
    if firstAudibleIdx is not None:
        i = firstAudibleIdx-1

        # stop at the first sample rather than wrapping around to negative indexes
        while i >= 0 and abs(dbL[i] - dbL[firstAudibleIdx]) < nonSkipDbDelta:
            i -= 1

        firstNonSkipIdx = max(i,0)

    return skipIdxL, firstAudibleIdx, firstNonSkipIdx


def get_resample_points( usL, dbL, durMsL, takeIdL, minDurMs, minDb, noiseLimitPct ):
    """Select the pulse locations which should be re-measured.

    The noise score of each sample is the absolute value of
    rms_analysis.samples_to_linear_residual(usL,dbL) expressed as a percent
    of the sample db.

    Returns (reUsL,reDbL,noiseL,resampleL,skipL,firstAudibleIdx,firstNonSkipIdx)
    where noiseL, resampleL and skipL are lists of (us,db) pairs.
    """
    scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

    skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

    skipL        = [ (usL[i],dbL[i]) for i in skipIdxL ]
    noiseIdxL    = [ i for i in range(scoreV.shape[0]) if scoreV[i] > noiseLimitPct ]
    noiseL       = [ (usL[i],dbL[i]) for i in noiseIdxL ]

    # pass the sample count so that neighbor indexes are bounded by the
    # number of samples rather than by the number of noisy samples
    resampleIdxL = select_resample_reference_indexes( noiseIdxL, len(usL) )

    if firstNonSkipIdx is not None:
        resampleIdxL = [ i for i in resampleIdxL if i >= firstNonSkipIdx ]

    resampleL    = [ (usL[i],dbL[i]) for i in resampleIdxL ]
    reUsL,reDbL  = locate_resample_regions( usL, dbL, resampleIdxL )

    return reUsL, reDbL, noiseL, resampleL, skipL, firstAudibleIdx, firstNonSkipIdx


def get_resample_points_wrap( inDir, midi_pitch, analysisArgsD ):
    """Return the list of pulse microsecond values to resample for 'midi_pitch'."""
    usL, dbL, durMsL, takeIdL, _ = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'] )

    # takeIdL must be passed: get_resample_points() takes seven positional arguments
    reUsL,_,_,_,_,_,_ = get_resample_points( usL, dbL, durMsL, takeIdL, analysisArgsD['resampleMinDurMs'], analysisArgsD['resampleMinDb'], analysisArgsD['resampleNoiseLimitPct'] )

    return reUsL


def _plot_us_db( ax, inDir, keyMapD, midi_pitch, analysisArgsD, takeId=None, plotResamplePointsFl=False, plotTakesFl=True, usMax=None ):
    """Shared implementation of plot_us_db_curves() and plot_us_db_take_curve()."""
    usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'], takeId=takeId )

    ax.set_ylabel( "%i %s %s" % (midi_pitch, keyMapD[midi_pitch]['type'],keyMapD[midi_pitch]['class']))

    # nothing to plot for this pitch
    if len(usL) == 0:
        return

    reUsL, reDbL, noiseL, resampleL, skipL, firstAudibleIdx, firstNonSkipIdx = get_resample_points( usL, dbL, durMsL, takeIdL, analysisArgsD['resampleMinDurMs'], analysisArgsD['resampleMinDb'], analysisArgsD['resampleNoiseLimitPct'] )

    # plot first audible and non-skip position (disabled)
    if False:

        if firstNonSkipIdx is not None:
            ax.plot( usL[firstNonSkipIdx], dbL[firstNonSkipIdx], markersize=15, marker='+', linestyle='None', color='red')

        if firstAudibleIdx is not None:
            ax.plot( usL[firstAudibleIdx], dbL[firstAudibleIdx], markersize=15, marker='*', linestyle='None', color='red')

    # plot the resample points
    if plotResamplePointsFl:
        ax.plot( reUsL, reDbL, markersize=13, marker='x', linestyle='None', color='green')

    # plot the noisy sample positions
    if noiseL:
        nUsL,nDbL = zip(*noiseL)
        ax.plot( nUsL, nDbL, marker='o', markersize=9, linestyle='None', color='black')

    # plot the noisy sample positions and the neighbors included in the noisy region
    if resampleL:
        nUsL,nDbL = zip(*resampleL)
        ax.plot( nUsL, nDbL, marker='+', markersize=8, linestyle='None', color='red')

    # plot actual sample points
    elbow_us = None
    elbow_db = None

    keepL = [ (us,db,tid) for us,db,tid in zip(usL,dbL,takeIdL) if usMax is None or us <= usMax ]

    if keepL:
        usL,dbL,takeIdL = zip(*keepL)

        if plotTakesFl:
            takeIdS = set(takeIdL)

            # The elbow is located on the requested take or, when all takes
            # are plotted, on the take whose id is one less than the take count.
            elbowTakeId = takeId if takeId is not None else len(takeIdS)-1

            for tid in list(takeIdS):

                # get the us,db points included in this take
                xL,yL = zip(*[ (us,db) for us,db,t in zip(usL,dbL,takeIdL) if t == tid ])

                ax.plot(xL,yL, marker='.',label=tid)

                for i,(x,y) in enumerate(zip(xL,yL)):
                    ax.text(x,y,str(i))

                if tid == elbowTakeId:
                    elbow_us,elbow_db = elbow.find_elbow(xL,yL)
        else:
            ax.plot(usL, dbL, marker='.')

    # only mark the elbow when one was actually computed
    if elbow_us is not None and elbow_db is not None:
        ax.plot([elbow_us],[elbow_db],marker='*',markersize=12,color='red',linestyle='None')

    # plot the skip points in yellow (disabled)
    if False:
        if skipL:
            nUsL,nDbL = zip(*skipL)
            ax.plot( nUsL, nDbL, marker='.', linestyle='None', color='yellow')

    # plot the locations where the hold duty cycle changes with vertical black lines
    for us_duty in holdDutyPctL:
        us,duty = tuple(us_duty)
        if us > 0:
            ax.axvline(us,color='black')

    # plot the 'minDb' reference line
    ax.axhline(analysisArgsD['resampleMinDb'] ,color='black')

    # plot the interpolated min/max db lines written by plot_min_db_manual()
    if os.path.isfile("minInterpDb.json"):
        with open("minInterpDb.json","r") as f:
            r = json.load(f)

        if midi_pitch in r['pitchL']:
            idx = r['pitchL'].index(midi_pitch)
            ax.axhline( r['minDbL'][ idx ], color='blue' )
            ax.axhline( r['maxDbL'][ idx ], color='blue' )


def plot_us_db_curves( ax, inDir, keyMapD, midi_pitch, analysisArgsD, plotResamplePointsFl=False, plotTakesFl=True, usMax=None ):
    """Plot the pulse-microsecond vs. db curves of all takes of 'midi_pitch' on 'ax'.

    plotResamplePointsFl -- also plot the locations selected for resampling.
    plotTakesFl          -- plot one labelled curve per take instead of a single merged curve.
    usMax                -- when not None only samples with us <= usMax are plotted.
    """
    _plot_us_db( ax, inDir, keyMapD, midi_pitch, analysisArgsD, takeId=None, plotResamplePointsFl=plotResamplePointsFl, plotTakesFl=plotTakesFl, usMax=usMax )


def plot_us_db_take_curve( ax, inDir, keyMapD, midi_pitch, takeId, analysisArgsD ):
    """Plot the pulse-microsecond vs. db curve of a single take of 'midi_pitch' on 'ax'.

    NOTE(review): the previous body was a copy of plot_us_db_curves() which
    referenced the undefined names plotResamplePointsFl, plotTakesFl and
    usMax and so raised NameError on every call.  Restricting the plot to
    'takeId' is inferred from the function name and signature - confirm.
    """
    _plot_us_db( ax, inDir, keyMapD, midi_pitch, analysisArgsD, takeId=takeId )


def plot_us_db_curves_main( inDir, cfg, pitchL, plotTakesFl=True, usMax=None, printDir="" ):
    """Plot the us/db curves of each pitch in pitchL, one subplot per pitch.

    When pitchL is None every pitch directory found in inDir is plotted.
    When printDir is not empty the figure is saved as '<printDir>/us_db.png'.
    """
    if pitchL is None:
        pitchL = [ int(pitchFolder) for pitchFolder in os.listdir(inDir) ]

    analysisArgsD = cfg.analysisArgs
    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    axN = len(pitchL)
    fig,axL = plt.subplots(axN,1,sharex=True)

    # plt.subplots() returns a bare Axes object when only one subplot is requested
    if axN == 1:
        axL = [axL]

    fig.set_size_inches(18.5, 10.5*axN)

    for ax,midi_pitch in zip(axL,pitchL):
        plot_us_db_curves( ax,inDir, keyMapD, midi_pitch, analysisArgsD, plotTakesFl=plotTakesFl, usMax=usMax )

    if plotTakesFl:
        plt.legend()

    if printDir:
        plt.savefig(os.path.join(printDir,"us_db.png"),format="png")

    plt.show()


def _plot_us_db_takes( inDir, cfg, pitchL, takeIdL, printDir="", printFn="" ):
    """Plot the us/db curve of one take for each (pitch,take) pair on a single axis.

    pitchL and takeIdL are parallel lists.  For each pair the raw curve, the
    curve returned by filter_us_db() and the elbow point are plotted.
    """
    assert( len(pitchL) == len(takeIdL) )

    analysisArgsD = cfg.analysisArgs
    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    fig,ax = plt.subplots(1,1)

    fig.set_size_inches(18.5, 10.5)

    for midi_pitch,takeId in zip(pitchL,takeIdL):

        usL, dbL, durMsL, _, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'], takeId=takeId )

        ax.plot(usL,dbL, marker='.',label="%i:%i %s %s" % (midi_pitch,takeId,keyMapD[midi_pitch]['class'],keyMapD[midi_pitch]['type']))

        # label each sample point with its index
        for i,(x,y) in enumerate(zip(usL,dbL)):
            ax.text(x,y,str(i))

        f_usL,f_dbL = filter_us_db(usL,dbL)

        ax.plot(f_usL,f_dbL, marker='.')

        elbow_us,elbow_db = elbow.find_elbow(usL,dbL)
        ax.plot([elbow_us],[elbow_db],marker='*',markersize=12,color='red',linestyle='None')

    # add the legend before saving so that it appears in the saved image
    plt.legend()

    if printDir:
        plt.savefig(os.path.join(printDir,printFn),format="png")

    plt.show()


def get_pitches_and_takes( inDir ):
    """Return { pitch(int) : [ takeId(str) ] } for the directory tree under inDir.

    The take ids of each pitch are sorted numerically.
    """
    pitchD = {}
    inDirL = os.listdir( inDir )

    for pitch in inDirL:
        path = os.path.join( inDir, pitch )
        takeIdL = sorted( int(takeId) for takeId in os.listdir( path ) )
        pitchD[int(pitch)] = [ str(x) for x in takeIdL ]

    return pitchD


def plot_us_db_takes( inDir, cfg, pitchL, printDir=""):
    """Plot specific takes; pitchL is a flat list of alternating pitch,take values."""
    takeIdL = pitchL[1::2]
    pitchL  = pitchL[0::2]

    return _plot_us_db_takes( inDir, cfg, pitchL, takeIdL, printDir, "us_db_takes.png")


def plot_us_db_takes_last( inDir, cfg, pitchL, printDir ):
    """Plot the last (highest numbered) take of each pitch in pitchL."""
    pitchD = get_pitches_and_takes(inDir)

    takeIdL = [ int(pitchD[pitch][-1]) for pitch in pitchL ]

    return _plot_us_db_takes( inDir, cfg, pitchL, takeIdL, printDir, "us_db_takes_last.png")


def plot_all_noise_curves( inDir, cfg, pitchL=None ):
    """Plot the noise score of every sample, ordered by db, for each pitch.

    The first audible sample of each pitch is marked with a red star.
    When pitchL is None every pitch directory found in inDir is plotted.
    """
    pitchFolderL = os.listdir(inDir)

    if pitchL is None:
        pitchL = [ int( int(pitchFolder) ) for pitchFolder in pitchFolderL ]

    fig,ax = plt.subplots()

    for midi_pitch in pitchL:

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

        minDurMs      = cfg.analysisArgs['resampleMinDurMs']
        minDb         = cfg.analysisArgs['resampleMinDb']
        noiseLimitPct = cfg.analysisArgs['resampleNoiseLimitPct']

        # argument order is (durMsL,dbL,takeIdL,scoreL,...)
        skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

        # plot against pulse microseconds (disabled)
        if False:
            if firstAudibleIdx is not None:
                ax.plot( usL[firstAudibleIdx], scoreV[firstAudibleIdx], markersize=10, marker='*', linestyle='None', color='red')
            ax.plot( usL, scoreV, label="%i"%(midi_pitch) )
            ax.set_xlabel('us')

        else:
            # sort the samples on increasing db
            xL = [ (score,db,i) for i,(score,db) in enumerate(zip(scoreV,dbL)) ]
            xL = sorted(xL, key=lambda x: x[1] )

            scoreV,dbL,idxL = zip(*xL)

            if firstAudibleIdx is not None:
                # idxL holds the original index of each sorted sample, so the
                # sorted position of the first audible sample is found by search
                pos = idxL.index(firstAudibleIdx)
                ax.plot( dbL[pos], scoreV[pos], markersize=10, marker='*', linestyle='None', color='red')

            ax.plot( dbL, scoreV, label="%i"%(midi_pitch) )
            ax.set_xlabel('db')

        ax.set_ylabel("noise db %")

    plt.legend()
    plt.show()


def nearest_sample_point( dbL, usL, db0, us0 ):
    """Return the index of the sample whose us value is closest to us0.

    dbL and db0 are accepted but only the us distance is measured.
    """
    xL = np.array([ abs(us-us0) for db,us in zip(dbL,usL) ])

    return np.argmin(xL)


def plot_min_max_2_db( inDir, cfg, pitchL=None, takeId=2, printDir=None ):
    """Plot the elbow (min) db of one take and the max db for each pitch.

    takeId -- the take on which the elbow is located; -1 selects the last
              take of each pitch (in which case only that take is analyzed).

    The max db is the mean of the five largest db values of the analyzed takes.
    Pitch labels are drawn in black when exactly three takes were analyzed,
    otherwise in red.
    """
    takeIdArg = takeId

    pitchTakeD = get_pitches_and_takes(inDir)

    pitchFolderL = os.listdir(inDir)

    if pitchL is None:
        pitchL = [ int( int(pitchFolder) ) for pitchFolder in pitchFolderL ]

    okL       = []
    outPitchL = []
    minDbL    = []
    maxDbL    = []

    for midi_pitch in pitchL:

        # load only the last take when it was requested, otherwise load all takes
        loadTakeId = None
        if takeIdArg == -1:
            loadTakeId = pitchTakeD[midi_pitch][-1]

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'], loadTakeId )

        # the take to measure is the explicitly requested take unless the
        # last take was requested (previously this compared against None
        # and so never selected any samples)
        selTakeId = takeIdArg if loadTakeId is None else loadTakeId

        selL = [ (us,db) for us,db,tid in zip(usL,dbL,takeIdL) if tid == selTakeId ]

        if not selL:
            print("No samples for pitch:%i take:%s" % (midi_pitch,selTakeId))
            continue

        # max db is computed on all loaded takes
        max_db = np.mean(sorted(dbL)[-5:])

        takeUsL,takeDbL = zip(*selL)

        elbow_us,elbow_db = elbow.find_elbow(takeUsL,takeDbL)

        smp_idx = nearest_sample_point( takeDbL, takeUsL, elbow_db, elbow_us )

        print(" %i:[-1,%i], " % (midi_pitch,smp_idx))

        # append to all result lists together so that they stay aligned
        okL.append( len(set(takeIdL)) == 3 )
        maxDbL.append( max_db )
        minDbL.append( elbow_db )
        outPitchL.append( midi_pitch )

    if not outPitchL:
        print("No pitches to plot.")
        return

    p_dL = sorted( zip(outPitchL,minDbL,maxDbL,okL), key=lambda x: x[0] )
    outPitchL,minDbL,maxDbL,okL = zip(*p_dL)

    fig,ax = plt.subplots()
    ax.plot(outPitchL,minDbL)
    ax.plot(outPitchL,maxDbL)

    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    for pitch,min_db,max_db,okFl in zip(outPitchL,minDbL,maxDbL,okL):
        c = 'black' if okFl else 'red'
        ax.text( pitch, min_db, "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class']), color=c)
        ax.text( pitch, max_db, "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class']), color=c)

    if printDir:
        plt.savefig(os.path.join(printDir,"min_max_db_2.png"),format="png")

    plt.show()


def plot_min_db_manual( inDir, cfg, printDir=None, absMaxDb=27, absMinDb=3 ):
    """Plot the manually selected min db and the max db of each pitch.

    The pitches and their (takeId,sampleIdx) min-db selections come from
    cfg.manualMinD.  The values at the anchor pitches
    (cfg.manualAnchorPitchMinDbL / cfg.manualAnchorPitchMaxDbL) are
    interpolated across all pitches and written to 'minInterpDb.json'.

    absMaxDb -- upper clamp applied to the max db of each pitch.
    absMinDb -- lower clamp applied to the min db of each pitch.
    """
    pitchTakeD = get_pitches_and_takes(inDir)

    pitchL = list(cfg.manualMinD.keys())

    outPitchL    = []
    maxDbL       = []
    minDbL       = []
    okL          = []
    anchorMinDbL = []
    anchorMaxDbL = []

    for midi_pitch in pitchL:

        if cfg.manualLastFl:
            manual_take_id = pitchTakeD[midi_pitch][-1]
            takeId         = manual_take_id
        else:
            manual_take_id = cfg.manualMinD[midi_pitch][0]
            takeId         = None

        manual_sample_idx = cfg.manualMinD[midi_pitch][1]

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'], takeId )

        okL.append(False)

        if takeId is None:
            takeId = len(set(takeIdL))-1

        # most pitches have 3 sample takes that do not
        # NOTE(review): both branches set the flag to True, so the red
        # (not ok) label color is never used - confirm this is intended.
        if len(set(takeIdL)) == 3 and manual_take_id == takeId:
            okL[-1] = True
        else:
            okL[-1] = True

        # maxDb is computed on all takes (not just the specified take)
        db_maxL = sorted(dbL)
        max_db  = min(absMaxDb,np.mean(db_maxL[-4:]))
        maxDbL.append( max_db )

        # get the us,db values for the specified take
        usL,dbL = zip(*[(usL[i],dbL[i]) for i in range(len(usL)) if takeIdL[i]==manual_take_id ])

        # min db from the sample index manually specified in cfg
        manualMinDb = max(absMinDb,dbL[ manual_sample_idx ])

        minDbL.append( manualMinDb )
        outPitchL.append(midi_pitch)

        if midi_pitch in cfg.manualAnchorPitchMinDbL:
            anchorMinDbL.append( manualMinDb )

        if midi_pitch in cfg.manualAnchorPitchMaxDbL:
            anchorMaxDbL.append( max_db )

    # Form the complete set of min/max db levels for each pitch by interpolating the
    # db values between the manually selected anchor points.
    interpMinDbL = np.interp( pitchL, cfg.manualAnchorPitchMinDbL, anchorMinDbL )
    interpMaxDbL = np.interp( pitchL, cfg.manualAnchorPitchMaxDbL, anchorMaxDbL )

    fig,ax = plt.subplots()

    ax.plot(outPitchL,minDbL)   # plot the manually selected minDb values
    ax.plot(outPitchL,maxDbL)   # plot the max db values

    # plot the interpolated minDb/maxDb values
    ax.plot(pitchL,interpMinDbL)
    ax.plot(pitchL,interpMaxDbL)

    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    for pitch,min_db,max_db,okFl in zip(outPitchL,minDbL,maxDbL,okL):
        c = 'black' if okFl else 'red'
        ax.text( pitch, min_db, "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class']), color=c)
        ax.text( pitch, max_db, "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class']), color=c)

    with open("minInterpDb.json",'w') as f:
        json.dump( { "pitchL":pitchL, "minDbL":list(interpMinDbL), "maxDbL":list(interpMaxDbL) }, f )

    if printDir:
        plt.savefig(os.path.join(printDir,"manual_db.png"),format="png")

    plt.show()


def plot_min_max_db( inDir, cfg, pitchL=None, printDir=None ):
    """Plot the first-audible (min) db and the largest (max) db of each pitch.

    When pitchL is None every pitch directory found in inDir is plotted.
    A pitch with no first audible sample is plotted with a NaN min db.
    """
    pitchFolderL = os.listdir(inDir)

    if pitchL is None:
        pitchL = [ int( int(pitchFolder) ) for pitchFolder in pitchFolderL ]

    maxDbL = []
    minDbL = []

    for midi_pitch in pitchL:

        print(midi_pitch)

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

        minDurMs      = cfg.analysisArgs['resampleMinDurMs']
        minDb         = cfg.analysisArgs['resampleMinDb']
        noiseLimitPct = cfg.analysisArgs['resampleNoiseLimitPct']

        skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

        if firstAudibleIdx is None:
            print("No audible sample found for pitch:",midi_pitch)
            minDbL.append( np.nan )
        else:
            minDbL.append( dbL[firstAudibleIdx] )

        maxDbL.append( np.max(dbL) )

    fig,ax = plt.subplots()

    fig.set_size_inches(18.5, 10.5)

    # sort all three lists together so that the min and max curves
    # stay aligned with the sorted pitches
    p_dL = sorted( zip(pitchL,maxDbL,minDbL), key=lambda x: x[0] )
    pitchL,maxDbL,minDbL = zip(*p_dL)

    ax.plot(pitchL,maxDbL)
    ax.plot(pitchL,minDbL)

    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    for pitch,db in zip(pitchL,maxDbL):
        ax.text( pitch, db, "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class']))

    if printDir:
        plt.savefig(os.path.join(printDir,"min_max_db.png"),format="png")

    plt.show()


def estimate_us_to_db_map( inDir, cfg, minMapDb=16.0, maxMapDb=26.0, incrMapDb=0.5, pitchL=None ):
    """Summarize the pulse microseconds which produce each db range, per pitch.

    The db axis is divided into ranges [loDb,hiDb) of width incrMapDb starting
    at minMapDb.  When pitchL is None every pitch directory in inDir is used.

    Returns (mapD,dbRefL) where
      mapD   = { pitch:{ loDb:{ hiDb, us_avg, us_cls, us_std, us_min, us_max, db_avg, db_std, cnt }}}
      dbRefL = list of the (loDb,hiDb) ranges.
    """
    pitchFolderL = os.listdir(inDir)

    if pitchL is None:
        pitchL = [ int( int(pitchFolder) ) for pitchFolder in pitchFolderL ]

    mapD = {} # pitch:{ loDb: { hiDb, us_avg, us_cls, us_std, us_min, us_max, db_avg, db_std, cnt }}
              # where: cnt=count of valid sample points in this db range
              #        us_cls=us of closest point to center of db range

    dbS = set() # { (loDb,hiDb) } track the set of db ranges

    for pitch in pitchL:

        print(pitch)

        # get the sample measurements for pitch
        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        # calc the fit to local straight line curve fit at each point
        scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

        minDurMs      = cfg.analysisArgs['resampleMinDurMs']
        minDb         = cfg.analysisArgs['resampleMinDb']
        noiseLimitPct = cfg.analysisArgs['resampleNoiseLimitPct']

        # get the set of samples that are not valid (too short, too quiet, too noisy)
        skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )
        skipIdxS = set(skipIdxL)

        mapD[ pitch ] = {}

        # get the count of db ranges
        N = int(round((maxMapDb - minMapDb) / incrMapDb)) + 1

        # for each db range
        for rangeIdx in range(N):

            loDb = minMapDb + (rangeIdx*incrMapDb)
            hiDb = loDb + incrMapDb

            dbS.add((loDb,hiDb))

            # get the valid (pulse,db) pairs for this range
            u_dL = [ (us,db) for i,(us,db) in enumerate(zip(usL,dbL)) if i not in skipIdxS and loDb<=db and db<hiDb ]

            # NOTE(review): the statements between the list above and the
            # '> 1' test below were lost from the source text.  They are
            # reconstructed as zero defaults, which is consistent with
            # plot_us_to_db_map() ignoring entries whose 'us_avg' is 0.
            # Confirm against the original file, in particular the handling
            # of ranges which contain exactly one sample.
            us_avg = 0
            us_cls = 0
            us_std = 0
            us_min = 0
            us_max = 0
            db_avg = 0
            db_std = 0

            if len(u_dL) > 1:

                us0L,db0L = zip(*u_dL)

                # the center of the range is loDb plus half the range width
                centerDb = loDb + (hiDb-loDb)/2.0

                us_avg = np.mean(us0L)
                us_cls = us0L[ np.argmin(np.abs(np.array(db0L)-centerDb)) ]
                us_min = np.min(us0L)
                us_max = np.max(us0L)
                us_std = np.std(us0L)
                db_avg = np.mean(db0L)
                db_std = np.std(db0L)

            us_avg = int(round(us_avg))

            mapD[pitch][loDb] = { 'hiDb':hiDb, 'us_avg':us_avg, 'us_cls':us_cls, 'us_std':us_std,'us_min':us_min,'us_max':us_max, 'db_avg':db_avg, 'db_std':db_std, 'cnt':len(u_dL) }

    return mapD, list(dbS)


def plot_us_to_db_map( inDir, cfg, minMapDb=16.0, maxMapDb=26.0, incrMapDb=1.0, pitchL=None, printDir=None ):
    """Plot the map returned by estimate_us_to_db_map() with one curve per pitch.

    Db ranges whose 'us_avg' is 0 are not plotted.
    """
    fig,ax = plt.subplots()

    mapD, dbRefL = estimate_us_to_db_map( inDir, cfg, minMapDb, maxMapDb, incrMapDb, pitchL )

    # for each pitch
    for pitch, dbD in mapD.items():

        u_dL = [ (d['us_avg'],d['us_cls'],d['db_avg'],d['us_std'],d['us_min'],d['us_max'],d['db_std']) for loDb, d in dbD.items() if d['us_avg'] != 0 ]

        if u_dL:

            # get the us/db lists for this pitch
            usL,uscL,dbL,ussL,usnL,usxL,dbsL = zip(*u_dL)

            # plot central curve and std dev's
            p = ax.plot(usL,dbL, marker='.', label=str(pitch))
            color = p[0].get_color()

            ax.plot(uscL,dbL, marker='x', label=str(pitch), color=color, linestyle='None')
            ax.plot(usL,np.array(dbL)+dbsL, color=color, alpha=0.3)
            ax.plot(usL,np.array(dbL)-dbsL, color=color, alpha=0.3)

            # plot us error bars
            for db,us,uss,us_min,us_max in zip(dbL,usL,ussL,usnL,usxL):
                ax.plot([us_min,us_max],[db,db], color=color, alpha=0.3 )
                ax.plot([us-uss,us+uss],[db,db], color=color, alpha=0.3, marker='.', linestyle='None' )

    plt.legend()

    if printDir:
        plt.savefig(os.path.join(printDir,"us_db_map.png"),format="png")

    plt.show()


def report_take_ids( inDir ):
    """Print the pitch directories which look incomplete.

    Reports empty pitch directories, a missing '0/seq.json', a sequence whose
    'eventTimeL' does not hold 81 entries and pitches without exactly 3 takes.
    """
    pitchDirL = os.listdir(inDir)

    for pitch in pitchDirL:

        pitchDir = os.path.join(inDir,pitch)

        takeDirL = os.listdir(pitchDir)

        if len(takeDirL) == 0:
            print(pitch," directory empty")
        else:
            fn = os.path.join(pitchDir,'0','seq.json')

            if not os.path.isfile(fn):
                print("Missing sequence file:",fn)
            else:
                with open( fn, "rb") as f:
                    r = json.load(f)

                if len(r['eventTimeL']) != 81:
                    print(pitch," ",len(r['eventTimeL']))

                if len(takeDirL) != 3:
                    print("***",pitch,len(takeDirL))


def filter_us_db( us0L, db0L ):
    """Return the strictly db-increasing subsequence of a us/db curve.

    The curve is first scanned from its last point towards its first point to
    locate the start index, then scanned forward from that index keeping only
    the points whose db is greater than the previously kept db.

    Returns (usL,dbL); both are empty when the input is empty.
    """
    if len(us0L) == 0:
        return [],[]

    # Scan backward from the last point.  A point is accepted when it is
    # lower than the last accepted point by at least half of the previously
    # accepted step.  The last accepted point becomes the start of the curve.
    prevDb  = db0L[-1]
    dDb     = 0
    lastIdx = 0
    for i,db in enumerate(db0L[::-1]):
        if db < prevDb and prevDb-db >= dDb/2:
            dDb     = prevDb - db
            prevDb  = db
            lastIdx = i

    # convert the reversed index to an index into the forward lists
    lastIdx = len(us0L) - lastIdx - 1

    usL = [ us0L[lastIdx] ]
    dbL = [ db0L[lastIdx] ]

    # scan forward keeping only points which increase the db
    for us,db in zip(us0L[lastIdx::],db0L[lastIdx::]):
        if db > dbL[-1]:
            usL.append(us)
            dbL.append(db)

    return usL,dbL


def cache_us_db( inDir, cfg, outFn ):
    """Write the measurements of the last take of every pitch to the JSON file outFn."""
    pitchTakeD = get_pitches_and_takes(inDir)

    pitch_usDbD = {}

    for pitch,takeIdL in pitchTakeD.items():

        pitch  = int(pitch)
        takeId = takeIdL[-1]

        print(pitch)

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, pitch, cfg.analysisArgs['rmsAnalysisArgs'], takeId )

        pitch_usDbD[pitch] = { 'usL':usL, 'dbL':dbL, 'durMsL':durMsL, 'takeIdL':takeIdL, 'holdDutyPctL': holdDutyPctL }

    with open(outFn,"w") as f:
        json.dump(pitch_usDbD,f)


def gen_vel_map( inDir, cfg, minMaxDbFn, dynLevelN, cacheFn ):
    """Generate 'velMapD.json': { pitch:[ (us,db) ] } with one entry per dynamic level.

    minMaxDbFn -- JSON file holding 'pitchL','minDbL','maxDbL' (see plot_min_db_manual()).
    dynLevelN  -- kept for interface compatibility; the count of dynamic
                  levels is always len(cfg.velTableDbL).
    cacheFn    -- JSON file written by cache_us_db().
    """
    velMapD = {} # { pitch:[ us ] }

    # pitchUsDbD = { pitch: { usL, dbL, ... } }
    with open(cacheFn,"r") as f:
        pitchUsDbD = json.load(f)

    # form minMaxDb = { pitch:(minDb,maxDb) }
    # (read from the file named by the caller rather than a hard-coded name)
    with open(minMaxDbFn,"r") as f:
        r = json.load(f)
        minMaxDbD = { pitch:(minDb,maxDb) for pitch,minDb,maxDb in zip(r['pitchL'],r['minDbL'],r['maxDbL']) }

    pitchL = sorted( [ int(pitch) for pitch in pitchUsDbD.keys()] )

    dynLevelN = len(cfg.velTableDbL)

    # for each pitch
    for pitch in pitchL:

        # get the us/db map for this pitch
        d = pitchUsDbD[str(pitch)]

        usL, dbL = filter_us_db( d['usL'], d['dbL'] )
        dbL = np.array(dbL)

        velMapD[pitch] = []

        # every cached pitch must have a min/max db entry
        minDb,maxDb = minMaxDbD[pitch]

        # for each dynamic level
        for i in range(dynLevelN):

            #db = minDb + (i * (maxDb - minDb)/ dynLevelN)
            db = cfg.velTableDbL[i]

            # select the pulse whose db is closest to the target db
            usIdx = np.argmin( np.abs(dbL - db) )

            velMapD[pitch].append( (usL[ usIdx ],db) )

    with open("velMapD.json","w") as f:
        json.dump(velMapD,f)

    mtx = np.zeros((len(velMapD),dynLevelN+1))
    print(mtx.shape)

    for i,(pitch,usDbL) in enumerate(velMapD.items()):
        for j in range(len(usDbL)):
            mtx[i,j] = usDbL[j][1]

    fig,ax = plt.subplots()
    ax.plot(pitchL,mtx)
    plt.show()


if __name__ == "__main__":

    printDir = os.path.expanduser("~/temp") # os.path.expanduser( "~/src/picadae_ac_3/doc")

    if len(sys.argv) < 4:
        print("usage: %s <cfgFn> <inDir> <mode> [pitch ...]" % (sys.argv[0]))
        sys.exit(1)

    cfgFn = sys.argv[1]
    inDir = sys.argv[2]
    mode  = sys.argv[3]

    if len(sys.argv) <= 4:
        pitchL = None
    else:
        pitchL = [ int(sys.argv[i]) for i in range(4,len(sys.argv)) ]

    cfg = parse_yaml_cfg( cfgFn )

    if mode == 'us_db':
        plot_us_db_curves_main( inDir, cfg, pitchL, plotTakesFl=True,usMax=None, printDir=printDir )
    elif mode == 'us_db_pitch_take':
        plot_us_db_takes( inDir, cfg, pitchL, printDir=printDir)
    elif mode == 'us_db_pitch_last':
        plot_us_db_takes_last( inDir, cfg, pitchL, printDir=printDir)
    elif mode == 'noise':
        plot_all_noise_curves( inDir, cfg, pitchL )
    elif mode == 'min_max':
        plot_min_max_db( inDir, cfg, pitchL, printDir=printDir )
    elif mode == 'min_max_2':
        # the last numeric argument is the take id, the preceding ones are pitches
        if not pitchL:
            print("The 'min_max_2' mode requires: [pitch ...] <takeId>")
            sys.exit(1)
        takeId = pitchL[-1]
        del pitchL[-1]
        plot_min_max_2_db( inDir, cfg, pitchL or None, takeId=takeId, printDir=printDir )
    elif mode == 'us_db_map':
        plot_us_to_db_map( inDir, cfg, pitchL=pitchL, printDir=printDir )
    elif mode == 'audacity':
        rms_analysis.write_audacity_label_files( inDir, cfg.analysisArgs['rmsAnalysisArgs'] )
    elif mode == 'rpt_take_ids':
        report_take_ids( inDir )
    elif mode == 'manual_db':
        plot_min_db_manual( inDir, cfg, printDir=printDir )
    elif mode == 'gen_vel_map':
        gen_vel_map( inDir, cfg, "minInterpDb.json", 12, "cache_us_db.json" )
    elif mode == 'cache_us_db':
        cache_us_db( inDir, cfg, "cache_us_db.json")
    else:
        print("Unknown mode:",mode)