##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
import os, sys,json
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from common import parse_yaml_cfg
|
|
import rms_analysis
|
|
import elbow
|
|
|
|
def fit_to_reference( pkL, refTakeId ):
    """Fit the db values of every take to the reference take.

    pkL is an iterable of (db, us, durMs, takeId) tuples.  The points of the
    take whose id equals refTakeId are passed through unchanged.  The db
    values of every other take are replaced by the values returned from
    elbow.fit_points_to_reference().

    A take for which the fit returns None is dropped entirely (with a
    message) so that the db, us, duration and take id columns always stay
    aligned.

    Returns a zip iterator over (db, us, durMs, takeId) tuples grouped by take.
    Raises ValueError if pkL holds no points for refTakeId.
    """

    # group the points by take id, preserving their order within each take
    takeD = {}
    for db, us, durMs, takeId in pkL:
        takeD.setdefault( takeId, [] ).append( (us, db, durMs) )

    if refTakeId not in takeD:
        raise ValueError("No sample points found for the reference take id: %s" % (refTakeId,))

    us_refL, db_refL, _ = zip(*takeD[refTakeId])

    us_outL  = []
    db_outL  = []
    dur_outL = []
    tid_outL = []

    for takeId, pointL in takeD.items():

        us0L, db0L, dur0L = zip(*pointL)

        if takeId == refTakeId:
            db1L = list(db0L)
        else:
            db1V = elbow.fit_points_to_reference(us0L,db0L,us_refL,db_refL)

            if db1V is None:
                # Without fitted db values there is nothing to pair the
                # us/duration values with - skip the whole take.
                print("Unable to fit take %s to reference take %s. Take skipped." % (takeId,refTakeId))
                continue

            db1L = db1V.tolist()

        db_outL  += db1L
        us_outL  += us0L
        dur_outL += dur0L
        tid_outL += [takeId] * len(us0L)

    return zip(db_outL,us_outL,dur_outL,tid_outL)
|
|
|
|
|
|
def get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD, takeId=None ):
    """Analyze the takes recorded for one pitch and merge their peaks.

    The takes are read from the folder <inDir>/<midi_pitch>/<take id>.  When
    takeId is given only that take is analyzed, otherwise every entry of the
    pitch folder is treated as a take id.  The first take (lowest id) is the
    reference that the other takes are fit to via fit_to_reference().

    Returns (pkUsL, pkDbL, durMsL, takeIdL, holdDutyPctL) where the first
    four sequences are parallel and sorted on increasing attack pulse
    microseconds.  holdDutyPctL is taken from the analysis result of the
    last analyzed take.  When no peaks are found a message is printed and
    the sequences are returned empty.
    """

    inDir = os.path.join(inDir,"%i" % (midi_pitch))

    if takeId is not None:
        takeIdL = [ takeId ]
    else:
        # os.listdir() returns entries in arbitrary order - sort so that the
        # choice of reference take does not depend on the file system.
        takeIdL = sorted( int(take_folder) for take_folder in os.listdir(inDir) )

    pkL          = []
    holdDutyPctL = []

    # for each take in this directory
    for take_number in takeIdL:

        # analyze this takes audio and locate the note peaks
        r = rms_analysis.rms_analysis_main( os.path.join(inDir,str(take_number)), midi_pitch, **analysisArgsD )

        holdDutyPctL = r.holdDutyPctL

        # store the peaks in pkL[ (db,us,durMs,takeId) ]
        for db,us,stats in zip(r.pkDbL,r.pkUsL,r.statsL):
            pkL.append( (db,us,stats.durMs,take_number) )

    if not pkL:
        print("No valid data files at %s pitch:%i" % (inDir,midi_pitch))
        return [], [], [], [], holdDutyPctL

    # the first take is the reference take
    pkL = fit_to_reference( pkL, takeIdL[0] )

    # sort the peaks on increasing attack pulse microseconds
    pkL = sorted( pkL, key= lambda x: x[1] )

    # split pkL
    pkDbL,pkUsL,durMsL,outTakeIdL = tuple(zip(*pkL))

    return pkUsL,pkDbL,durMsL,outTakeIdL,holdDutyPctL
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def select_resample_reference_indexes( noiseIdxL, sampleN=None ):
    """Expand a list of noisy sample indexes into the indexes to resample.

    Every noisy index is selected together with the index before and after
    it.  A single index left out between two selected indexes is then
    selected as well, so that the selected indexes form contiguous runs.

    noiseIdxL -- indexes of the noisy samples.
    sampleN   -- total count of samples; selected indexes are kept below it.
                 When None the historical bound len(noiseIdxL) is used.
                 NOTE(review): that bound counts noisy samples rather than
                 all samples, so it rarely admits the index following a noisy
                 one - callers that know the sample count should pass it.

    Returns the selected indexes as a sorted list without duplicates.
    """

    limitN = len(noiseIdxL) if sampleN is None else sampleN

    resampleIdxS = set()

    # for each noisy sample index store that index and the index
    # before and after it
    for i in noiseIdxL:
        resampleIdxS.add( i )
        if i+1 < limitN:
            resampleIdxS.add( i+1 )
        if i-1 >= 0:
            resampleIdxS.add( i-1 )

    # if a single sample point is left out of a region of
    # contiguous sample points then include this as a resample point also
    gapIdxL = [ i+1 for i in resampleIdxS
                if i+1 not in resampleIdxS and i+2 in resampleIdxS and i+1 < limitN ]

    resampleIdxS.update( gapIdxL )

    return sorted(resampleIdxS)
|
|
|
|
def locate_resample_regions( usL, dbL, resampleIdxL ):
    """Generate the new sample points that cover the regions to resample.

    The indexes in resampleIdxL are grouped into runs of contiguous indexes.
    For a run covering the indexes bi..ei one point is generated for every
    i in bi..ei+1: the midpoint between sample i-1 and sample i.  At the
    edges of the data the first sample (i == 0) or the last sample
    (i == len(usL)) is used instead of a midpoint.

    Returns (reUsL, reDbL), the parallel us and db lists of generated points.
    """

    # a set makes the membership tests below constant time
    resampleIdxS = set(resampleIdxL)
    sampleN      = len(usL)

    # locate regions of points to resample
    regionL = []  # (bi,ei)
    bi      = None
    for i in range(sampleN):
        if i in resampleIdxS:
            if bi is None:
                bi = i
        elif bi is not None:
            regionL.append((bi,i-1))
            bi = None

    # close a region that runs through the last sample
    if bi is not None:
        regionL.append((bi,sampleN-1))

    # select points around and within the resample regions
    # to resample
    reUsL = []
    reDbL = []
    for bi,ei in regionL:

        for i in range(bi,ei+2):
            if i == 0:
                us = usL[0]
                db = dbL[0]
            elif i >= sampleN:
                us = usL[i-1]
                db = dbL[i-1]
            else:
                us = usL[i-1] + (usL[i]-usL[i-1])/2
                db = dbL[i-1] + (dbL[i]-dbL[i-1])/2

            reUsL.append(us)
            reDbL.append(db)

    return reUsL,reDbL
|
|
|
|
def get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreL, minDurMs, minDb, noiseLimitPct, minAudibleRunN=3, nonSkipDbDelta=1.0 ):
    """Locate the samples to skip and the first usable sample positions.

    durMsL, dbL, scoreL -- parallel per-sample duration, db and noise score.
    takeIdL             -- accepted for interface compatibility; not used.
    minDurMs, minDb     -- samples below either limit are skipped.
    noiseLimitPct       -- samples with a score above this limit are skipped.
    minAudibleRunN      -- count (>= 1) of contiguous non-skipped samples
                           that marks the first audible position.
    nonSkipDbDelta      -- db distance below the first audible sample used
                           to locate the first non-skip position.

    Returns (skipIdxL, firstAudibleIdx, firstNonSkipIdx).  Both indexes are
    None when no run of minAudibleRunN non-skipped samples exists.
    firstNonSkipIdx is clamped to 0 when no earlier sample differs from the
    first audible sample by at least nonSkipDbDelta db.
    """

    sampleN = len(durMsL)

    # get the indexes of samples which do not meet the duration, db level, or noise criteria
    skipIdxL = [ i for i,(ms,db,score) in enumerate(zip(durMsL,dbL,scoreL))
                 if ms < minDurMs or db < minDb or score > noiseLimitPct ]

    # mirror of skipIdxL used for constant time membership tests
    skipIdxS = set(skipIdxL)

    # if a single sample point is left out of a region of
    # contiguous skipped points then skip this point also
    for i in range(sampleN):
        if i not in skipIdxS and i-1 in skipIdxS and i+1 in skipIdxS:
            skipIdxL.append(i)
            skipIdxS.add(i)

    # find the first run of 'minAudibleRunN' contiguous samples that
    # are not skipped - all samples prior to these will be skipped
    firstAudibleIdx = None
    runN            = 0
    for i in range(sampleN):
        runN = 0 if i in skipIdxS else runN + 1

        if runN == minAudibleRunN:
            firstAudibleIdx = i - runN + 1
            break

    # decrease by 'nonSkipDbDelta' decibels to locate the first non-skip

    # TODO: what if no note exists that is one decibel less
    # The recordings of very quiet notes do not give reliable decibel measures
    # so this may not be the best backup criteria
    firstNonSkipIdx = None
    if firstAudibleIdx is not None:
        i = firstAudibleIdx-1

        # 'i >= 0' stops the search at the first sample instead of wrapping
        # around to the end of dbL through negative indexing
        while i >= 0 and abs(dbL[i] - dbL[firstAudibleIdx]) < nonSkipDbDelta:
            i -= 1

        firstNonSkipIdx = max(i,0)

    return skipIdxL, firstAudibleIdx, firstNonSkipIdx
|
|
|
|
def get_resample_points( usL, dbL, durMsL, takeIdL, minDurMs, minDb, noiseLimitPct ):
    """Select the noisy samples of a us/db curve and the points to resample.

    Each sample is scored as the absolute value of its residual from
    rms_analysis.samples_to_linear_residual(), expressed as a percentage of
    the sample's db value.  Samples scoring above noiseLimitPct are 'noisy'.

    Returns (reUsL, reDbL, noiseL, resampleL, skipL, firstAudibleIdx,
    firstNonSkipIdx) where noiseL, resampleL and skipL are lists of (us,db)
    pairs and reUsL/reDbL are the points from locate_resample_regions().
    """

    def _us_db_pairs( idxL ):
        # (us,db) pairs of the samples at the given indexes
        return [ (usL[i],dbL[i]) for i in idxL ]

    residualV = rms_analysis.samples_to_linear_residual( usL, dbL)
    scoreV    = np.abs( residualV * 100.0 / dbL )

    skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes(
        durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

    skipL = _us_db_pairs( skipIdxL )

    # samples whose noise score exceeds the limit
    noiseIdxL = [ i for i in range(scoreV.shape[0]) if scoreV[i] > noiseLimitPct ]
    noiseL    = _us_db_pairs( noiseIdxL )

    # noisy samples plus their neighbors
    candidateIdxL = select_resample_reference_indexes( noiseIdxL )

    # discard candidates located before the first non-skipped sample
    if firstNonSkipIdx is None:
        resampleIdxL = candidateIdxL
    else:
        resampleIdxL = [ i for i in candidateIdxL if not i < firstNonSkipIdx ]

    resampleL   = _us_db_pairs( resampleIdxL )
    reUsL,reDbL = locate_resample_regions( usL, dbL, resampleIdxL )

    return reUsL, reDbL, noiseL, resampleL, skipL, firstAudibleIdx, firstNonSkipIdx
|
|
|
|
def get_resample_points_wrap( inDir, midi_pitch, analysisArgsD ):
    """Return the us values of the resample points for one pitch.

    Loads the merged measurements for midi_pitch from inDir and runs
    get_resample_points() on them with the limits found in analysisArgsD
    ('resampleMinDurMs', 'resampleMinDb', 'resampleNoiseLimitPct').
    """

    usL, dbL, durMsL, takeIdL, _ = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'] )

    # get_resample_points() takes the take id list as its fourth argument;
    # leaving it out shifts every following argument and raises a TypeError.
    reUsL,_,_,_,_,_,_ = get_resample_points( usL, dbL, durMsL, takeIdL,
                                             analysisArgsD['resampleMinDurMs'],
                                             analysisArgsD['resampleMinDb'],
                                             analysisArgsD['resampleNoiseLimitPct'] )

    return reUsL
|
|
|
|
|
|
|
|
def plot_us_db_curves( ax, inDir, keyMapD, midi_pitch, analysisArgsD, plotResamplePointsFl=False, plotTakesFl=True, usMax=None, plotFirstAudibleFl=False, plotSkipPointsFl=False ):
    """Plot the us/db measurements of one pitch on the axes 'ax'.

    plotResamplePointsFl -- mark the generated resample points (green 'x').
    plotTakesFl          -- plot one labelled curve per take and mark the
                            elbow of the take whose id is (take count - 1);
                            otherwise plot all points as a single curve.
    usMax                -- when not None, only samples with us <= usMax
                            are included in the curves.
    plotFirstAudibleFl   -- mark the first audible / first non-skip samples.
    plotSkipPointsFl     -- mark the skipped samples in yellow.

    The last two flags replace blocks that were hard-wired off with
    'if False:'; their defaults keep those blocks disabled.

    NOTE(review): the nesting of this function was reconstructed from a
    copy of the source without indentation.  It is assumed that only the two
    first-audible/non-skip markers were inside the first disabled block -
    confirm against the original file.
    """

    usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'] )

    reUsL, reDbL, noiseL, resampleL, skipL, firstAudibleIdx, firstNonSkipIdx = get_resample_points( usL, dbL, durMsL, takeIdL, analysisArgsD['resampleMinDurMs'], analysisArgsD['resampleMinDb'], analysisArgsD['resampleNoiseLimitPct'] )

    # plot first audible and non-skip position
    if plotFirstAudibleFl:

        if firstNonSkipIdx is not None:
            ax.plot( usL[firstNonSkipIdx], dbL[firstNonSkipIdx], markersize=15, marker='+', linestyle='None', color='red')

        if firstAudibleIdx is not None:
            ax.plot( usL[firstAudibleIdx], dbL[firstAudibleIdx], markersize=15, marker='*', linestyle='None', color='red')

    # plot the resample points
    if plotResamplePointsFl:
        ax.plot( reUsL, reDbL, markersize=13, marker='x', linestyle='None', color='green')

    # plot the noisy sample positions
    if noiseL:
        nUsL,nDbL = zip(*noiseL)
        ax.plot( nUsL, nDbL, marker='o', markersize=9, linestyle='None', color='black')

    # plot the noisy sample positions and the neighbors included in the noisy region
    if resampleL:
        nUsL,nDbL = zip(*resampleL)
        ax.plot( nUsL, nDbL, marker='+', markersize=8, linestyle='None', color='red')

    # plot actual sample points

    elbow_us = None
    elbow_db = None

    # drop the samples above 'usMax'
    usL,dbL,takeIdL = zip(*[(us,dbL[i],takeIdL[i]) for i,us in enumerate(usL) if usMax is None or us <= usMax])

    if plotTakesFl:
        takeIdS = set(takeIdL)

        for takeId in takeIdS:

            # get the us,db points included in this take
            xL,yL = zip(*[(us,db) for us,db,tid in zip(usL,dbL,takeIdL) if tid==takeId ])

            ax.plot(xL,yL, marker='.',label=takeId)

            # label each point with its index within the take
            for i,(x,y) in enumerate(zip(xL,yL)):
                ax.text(x,y,str(i))

            # the elbow is taken from the take whose id is (take count - 1)
            if takeId+1 == len(takeIdS):
                elbow_us,elbow_db = elbow.find_elbow(xL,yL)

    else:
        ax.plot(usL, dbL, marker='.')

    # only mark the elbow when one was actually located
    if elbow_us is not None and elbow_db is not None:
        ax.plot([elbow_us],[elbow_db],marker='*',markersize=12,color='red',linestyle='None')

    # plot the skip points in yellow
    if plotSkipPointsFl and skipL:
        nUsL,nDbL = zip(*skipL)
        ax.plot( nUsL, nDbL, marker='.', linestyle='None', color='yellow')

    # plot the locations where the hold duty cycle changes with vertical black lines
    for us_duty in holdDutyPctL:
        us,duty = tuple(us_duty)
        if us > 0:
            ax.axvline(us,color='black')

    # plot the 'minDb' reference line
    ax.axhline(analysisArgsD['resampleMinDb'] ,color='black')

    # plot the interpolated min/max db levels of this pitch, when available
    if os.path.isfile("minInterpDb.json"):
        with open("minInterpDb.json","r") as f:
            r = json.load(f)

        if midi_pitch in r['pitchL']:
            pitchIdx = r['pitchL'].index(midi_pitch)
            ax.axhline( r['minDbL'][ pitchIdx ], color='blue' )
            ax.axhline( r['maxDbL'][ pitchIdx ], color='blue' )

    ax.set_ylabel( "%i %s %s" % (midi_pitch, keyMapD[midi_pitch]['type'],keyMapD[midi_pitch]['class']))
|
|
|
|
|
|
def plot_us_db_take_curve( ax, inDir, keyMapD, midi_pitch, takeId, analysisArgsD, plotResamplePointsFl=False, plotTakesFl=True, usMax=None ):
    """Plot the us/db curves of one pitch on the axes 'ax'.

    The previous body was a verbatim copy of plot_us_db_curves() that read
    plotResamplePointsFl, plotTakesFl and usMax without defining them, so
    every call failed with a NameError.  Those names are now keyword
    arguments with the same defaults as plot_us_db_curves(), and the work is
    delegated to that function instead of being duplicated.

    NOTE(review): as in the copied body, 'takeId' does not restrict which
    takes are loaded or plotted - confirm whether a single-take plot was
    intended.
    """

    plot_us_db_curves( ax, inDir, keyMapD, midi_pitch, analysisArgsD,
                       plotResamplePointsFl=plotResamplePointsFl,
                       plotTakesFl=plotTakesFl,
                       usMax=usMax )
|
|
|
|
|
|
def plot_us_db_curves_main( inDir, cfg, pitchL, plotTakesFl=True, usMax=None, printDir="" ):
    """Plot the us/db curves of each pitch in pitchL on its own subplot.

    When pitchL is None every folder of inDir is taken as a pitch, as the
    other plotting functions of this file do.  When printDir is set the
    figure is also written to <printDir>/us_db.png.
    """

    if pitchL is None:
        pitchL = sorted( int(pitchFolder) for pitchFolder in os.listdir(inDir) )

    analysisArgsD = cfg.analysisArgs
    keyMapD       = { d['midi']:d for d in cfg.key_mapL }
    axN           = len(pitchL)
    fig,axL       = plt.subplots(axN,1,sharex=True)

    # plt.subplots() returns a bare axes object when a single plot is requested
    if axN == 1:
        axL = [axL]

    fig.set_size_inches(18.5, 10.5*axN)

    for ax,midi_pitch in zip(axL,pitchL):

        plot_us_db_curves( ax,inDir, keyMapD, midi_pitch, analysisArgsD, plotTakesFl=plotTakesFl, usMax=usMax )

    if plotTakesFl:
        plt.legend()

    if printDir:
        plt.savefig(os.path.join(printDir,"us_db.png"),format="png")

    plt.show()
|
|
|
|
def _plot_us_db_takes( inDir, cfg, pitchL, takeIdL, printDir="", printFn="" ):
    """Plot the us/db curve of one take per pitch on a single set of axes.

    pitchL and takeIdL are parallel: takeIdL[i] is the take plotted for
    pitchL[i].  When printDir is set the figure is also written to
    <printDir>/<printFn>.
    """

    assert( len(pitchL) == len(takeIdL) )

    analysisArgsD = cfg.analysisArgs
    keyMapD       = { d['midi']:d for d in cfg.key_mapL }
    fig,ax        = plt.subplots(1,1)

    fig.set_size_inches(18.5, 10.5)

    for midi_pitch,takeId in zip(pitchL,takeIdL):

        usL, dbL, durMsL, _, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'], takeId=takeId )

        ax.plot(usL,dbL, marker='.',label="%i:%i %s %s" % (midi_pitch,takeId,keyMapD[midi_pitch]['class'],keyMapD[midi_pitch]['type']))

    # draw the legend before saving so that the saved image includes it
    plt.legend()

    if printDir:
        plt.savefig(os.path.join(printDir,printFn),format="png")

    plt.show()
|
|
|
|
def plot_us_db_takes( inDir, cfg, pitchL, printDir=""):
    """Plot selected takes given as a flat 'pitch take pitch take ...' list.

    pitchL holds alternating values: the even positions are MIDI pitches and
    the odd positions are the take id to plot for the preceding pitch.

    Raises ValueError when pitchL is None or holds an odd count of values.
    """

    if pitchL is None or len(pitchL) % 2 != 0:
        raise ValueError("Expected an even count of values: <pitch0> <take0> <pitch1> <take1> ...")

    takeIdL    = list(pitchL[1::2])
    pitchOnlyL = list(pitchL[0::2])

    return _plot_us_db_takes( inDir, cfg, pitchOnlyL, takeIdL, printDir, "us_db_takes.png")
|
|
|
|
def plot_us_db_takes_last( inDir, cfg, pitchL, printDir ):
    """Plot the last (highest numbered) take of each pitch in pitchL."""

    takeIdL = []
    for pitch in pitchL:

        takeFolderL = os.listdir( os.path.join(inDir,str(pitch)))

        # Compare the take ids as integers: sorting the folder names as
        # strings places "10" before "9".
        takeIdL.append( max( int(takeFolder) for takeFolder in takeFolderL ) )

    return _plot_us_db_takes( inDir, cfg, pitchL, takeIdL, printDir, "us_db_takes_last.png")
|
|
|
|
|
|
def plot_all_noise_curves( inDir, cfg, pitchL=None, plotVsUsFl=False ):
    """Plot the noise score of every sample of each pitch.

    The score is the absolute residual from
    rms_analysis.samples_to_linear_residual() as a percentage of the sample
    db value.  By default the score is plotted against db, with the samples
    ordered by increasing db; with plotVsUsFl it is plotted against us (this
    replaces a branch that was hard-wired off with 'if False:').  The first
    audible sample of each pitch is marked with a red star.

    When pitchL is None every folder of inDir is taken as a pitch.
    """

    if pitchL is None:
        pitchL = [ int(pitchFolder) for pitchFolder in os.listdir(inDir) ]

    fig,ax = plt.subplots()

    for midi_pitch in pitchL:

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

        minDurMs      = cfg.analysisArgs['resampleMinDurMs']
        minDb         = cfg.analysisArgs['resampleMinDb']
        noiseLimitPct = cfg.analysisArgs['resampleNoiseLimitPct']

        # argument order is (durMsL, dbL, takeIdL, scoreL, ...): the take ids
        # and the scores were previously passed in each others position
        skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

        if plotVsUsFl:
            if firstAudibleIdx is not None:
                ax.plot( usL[firstAudibleIdx], scoreV[firstAudibleIdx], markersize=10, marker='*', linestyle='None', color='red')
            ax.plot( usL, scoreV, label="%i"%(midi_pitch) )
            ax.set_xlabel('us')

        else:
            # firstAudibleIdx refers to the unsorted samples, so mark the
            # point before the samples are re-ordered on db
            if firstAudibleIdx is not None:
                ax.plot( dbL[firstAudibleIdx], scoreV[firstAudibleIdx], markersize=10, marker='*', linestyle='None', color='red')

            # order the samples on increasing db
            orderL     = sorted( range(len(dbL)), key=lambda i: dbL[i] )
            sortDbL    = [ dbL[i] for i in orderL ]
            sortScoreL = [ scoreV[i] for i in orderL ]

            ax.plot( sortDbL, sortScoreL, label="%i"%(midi_pitch) )
            ax.set_xlabel('db')

    ax.set_ylabel("noise db %")

    plt.legend()
    plt.show()
|
|
|
|
def plot_min_max_2_db( inDir, cfg, pitchL=None, takeId=2, printDir=None ):
    """Plot the minimum and maximum db level of each pitch.

    The maximum is the mean of the 5 largest db values over all takes of
    the pitch.  The minimum is the db value of the elbow located by
    elbow.find_elbow() in the take 'takeId'.  Pitch labels are drawn black
    when the pitch has exactly 3 takes and red otherwise.

    A pitch with no samples for 'takeId' is skipped with a message instead
    of aborting the whole plot.  When pitchL is None every folder of inDir
    is taken as a pitch.  When printDir is set the figure is also written to
    <printDir>/min_max_db_2.png.
    """

    print(pitchL)

    if pitchL is None:
        pitchL = [ int(pitchFolder) for pitchFolder in os.listdir(inDir) ]

    print(pitchL)

    rowL = []  # (pitch, minDb, maxDb, okFl)

    for midi_pitch in pitchL:

        print(midi_pitch)

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        # the max db is computed on all takes (not just the specified take)
        maxDb = np.mean( sorted(dbL)[-5:] )

        # get the us,db values for the specified take
        u_dL = [ (us,db) for us,db,tid in zip(usL,dbL,takeIdL) if tid==takeId ]

        if not u_dL:
            print("No samples for take %i at pitch:%i. Pitch skipped." % (takeId,midi_pitch))
            continue

        takeUsL,takeDbL = zip(*u_dL)

        okFl = len(set(takeIdL)) == 3

        elbow_us,elbow_db = elbow.find_elbow(takeUsL,takeDbL)

        rowL.append( (midi_pitch,elbow_db,maxDb,okFl) )

    if not rowL:
        print("No pitches to plot.")
        return

    rowL = sorted( rowL, key=lambda x: x[0] )
    outPitchL,minDbL,maxDbL,okL = zip(*rowL)

    fig,ax = plt.subplots()
    ax.plot(outPitchL,minDbL)
    ax.plot(outPitchL,maxDbL)

    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    for pitch,min_db,max_db,okFl in rowL:
        c     = 'black' if okFl else 'red'
        label = "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class'])
        ax.text( pitch, min_db, label, color=c)
        ax.text( pitch, max_db, label, color=c)

    if printDir:
        plt.savefig(os.path.join(printDir,"min_max_db_2.png"),format="png")

    plt.show()
|
|
|
|
def plot_min_db_manual( inDir, cfg, printDir=None ):
    """Plot the manually selected min db and the measured max db per pitch.

    cfg.manualMinD maps a pitch to (take id, sample index); the min db of
    the pitch is the db value of that sample within that take.  The max db
    is the mean of the 4 largest db values over all takes of the pitch.

    The values of the anchor pitches (cfg.manualAnchorPitchMinDbL and
    cfg.manualAnchorPitchMaxDbL) are interpolated over all pitches and the
    result is written to 'minInterpDb.json'.  When printDir is set the
    figure is also written to <printDir>/manual_db.png.
    """

    pitchL = list(cfg.manualMinD.keys())

    outPitchL = []
    maxDbL    = []
    minDbL    = []
    okL       = []

    for midi_pitch in pitchL:

        manual_take_id    = cfg.manualMinD[midi_pitch][0]
        manual_sample_idx = cfg.manualMinD[midi_pitch][1]

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        takeN = len(set(takeIdL))

        # maxDb is computed on all takes (not just the specified take)
        max_db = np.mean(sorted(dbL)[-4:])

        # get the us,db values for the specified take
        usL,dbL = zip(*[(us,db) for us,db,tid in zip(usL,dbL,takeIdL) if tid==manual_take_id ])

        # a pitch is 'ok' when it has 3 takes and the manual take id is
        # (take count - 1)
        okL.append( takeN == 3 and manual_take_id == takeN-1 )

        # min db from the sample index manually specified in cfg
        minDbL.append( dbL[ manual_sample_idx ] )
        maxDbL.append( max_db )
        outPitchL.append(midi_pitch)

    # Form the complete set of min/max db levels for each pitch by interpolating the
    # db values between the manually selected anchor points.
    # np.interp() requires increasing x coordinates with the y values in the
    # same order, so the anchors are looked up by pitch and sorted on pitch
    # rather than relying on the order of the cfg lists.
    minDbD = dict(zip(outPitchL,minDbL))
    maxDbD = dict(zip(outPitchL,maxDbL))

    anchorMinPitchL = sorted( p for p in cfg.manualAnchorPitchMinDbL if p in minDbD )
    anchorMaxPitchL = sorted( p for p in cfg.manualAnchorPitchMaxDbL if p in maxDbD )

    interpMinDbL = np.interp( pitchL, anchorMinPitchL, [ minDbD[p] for p in anchorMinPitchL ] )
    interpMaxDbL = np.interp( pitchL, anchorMaxPitchL, [ maxDbD[p] for p in anchorMaxPitchL ] )

    fig,ax = plt.subplots()

    ax.plot(outPitchL,minDbL) # plot the manually selected minDb values
    ax.plot(outPitchL,maxDbL) # plot the max db values

    # plot the interpolated minDb/maxDb values
    ax.plot(pitchL,interpMinDbL)
    ax.plot(pitchL,interpMaxDbL)

    keyMapD = { d['midi']:d for d in cfg.key_mapL }
    for pitch,min_db,max_db,okFl in zip(outPitchL,minDbL,maxDbL,okL):
        c     = 'black' if okFl else 'red'
        label = "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class'])
        ax.text( pitch, min_db, label, color=c)
        ax.text( pitch, max_db, label, color=c)

    with open("minInterpDb.json",'w') as f:
        json.dump( { "pitchL":pitchL, "minDbL":interpMinDbL.tolist(), "maxDbL":interpMaxDbL.tolist() }, f )

    if printDir:
        plt.savefig(os.path.join(printDir,"manual_db.png"),format="png")

    plt.show()
|
|
|
|
def plot_min_max_db( inDir, cfg, pitchL=None, printDir=None ):
    """Plot the first audible db and the maximum db of each pitch.

    The minimum is the db value of the first audible sample reported by
    get_dur_skip_indexes() (NaN when there is none); the maximum is the
    largest measured db value.  When pitchL is None every folder of inDir
    is taken as a pitch.  When printDir is set the figure is also written to
    <printDir>/min_max_db.png.
    """

    if pitchL is None:
        pitchL = [ int(pitchFolder) for pitchFolder in os.listdir(inDir) ]

    minDurMs      = cfg.analysisArgs['resampleMinDurMs']
    minDb         = cfg.analysisArgs['resampleMinDb']
    noiseLimitPct = cfg.analysisArgs['resampleNoiseLimitPct']

    rowL = []  # (pitch, minDb, maxDb)

    for midi_pitch in pitchL:

        print(midi_pitch)

        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

        skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

        # no first audible sample: keep the pitch but leave a gap in the curve
        min_db = dbL[firstAudibleIdx] if firstAudibleIdx is not None else np.nan

        rowL.append( (midi_pitch, min_db, np.max(dbL)) )

    fig,ax = plt.subplots()

    fig.set_size_inches(18.5, 10.5)

    # order all three columns together so that the min curve stays aligned
    # with the pitches
    rowL = sorted( rowL, key=lambda x: x[0] )
    pitchL,minDbL,maxDbL = zip(*rowL)

    ax.plot(pitchL,maxDbL)
    ax.plot(pitchL,minDbL)

    keyMapD = { d['midi']:d for d in cfg.key_mapL }

    for pitch,db in zip(pitchL,maxDbL):
        ax.text( pitch, db, "%i %s %s" % (pitch, keyMapD[pitch]['type'],keyMapD[pitch]['class']))

    if printDir:
        plt.savefig(os.path.join(printDir,"min_max_db.png"),format="png")

    plt.show()
|
|
|
|
def estimate_us_to_db_map( inDir, cfg, minMapDb=16.0, maxMapDb=26.0, incrMapDb=0.5, pitchL=None ):
    """Summarize the pulse widths (us) that produce each db range, per pitch.

    The db axis is divided into ranges [loDb, loDb+incrMapDb) that start at
    minMapDb.  For every pitch and range the valid samples (those not
    reported by get_dur_skip_indexes()) falling in the range are summarized.

    Returns (mapD, dbRangeL):
      mapD     -- { pitch:{ loDb:{ 'hiDb','us_avg','us_cls','us_std',
                  'us_min','us_max','db_avg','db_std','cnt' } } } where 'cnt'
                  is the count of valid samples in the range and 'us_cls' is
                  the us of the sample closest to the center of the range.
                  All statistics are 0 for a range without samples.
      dbRangeL -- sorted list of the (loDb,hiDb) ranges.

    When pitchL is None every folder of inDir is taken as a pitch.
    """

    if pitchL is None:
        pitchL = [ int(pitchFolder) for pitchFolder in os.listdir(inDir) ]

    mapD = {}
    dbS  = set() # { (loDb,hiDb) } track the set of db ranges

    # get the count of db ranges
    N = int(round((maxMapDb - minMapDb) / incrMapDb)) + 1

    for pitch in pitchL:

        print(pitch)

        # get the sample measurements for pitch
        usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        # calc the fit to local straight line curve fit at each point
        scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL) * 100.0 / dbL )

        minDurMs      = cfg.analysisArgs['resampleMinDurMs']
        minDb         = cfg.analysisArgs['resampleMinDb']
        noiseLimitPct = cfg.analysisArgs['resampleNoiseLimitPct']

        # get the set of samples that are not valid (too short, too quiet, too noisy)
        skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, scoreV.tolist(), minDurMs, minDb, noiseLimitPct )

        skipIdxS = set(skipIdxL)

        mapD[ pitch ] = {}

        # for each db range
        for i in range(N):

            loDb = minMapDb + (i*incrMapDb)
            hiDb = loDb + incrMapDb

            dbS.add((loDb,hiDb))

            # get the valid (pulse,db) pairs for this range
            u_dL = [ (us,db) for j,(us,db) in enumerate(zip(usL,dbL))
                     if j not in skipIdxS and loDb<=db and db<hiDb ]

            us_avg = 0
            us_cls = 0
            us_std = 0
            us_min = 0
            us_max = 0
            db_avg = 0
            db_std = 0

            if len(u_dL) == 0:
                print("No valid samples for pitch:",pitch," db range:",loDb,hiDb)
            else:
                us0L,db0L = zip(*u_dL)

                if len(us0L) == 1:
                    us_avg = us0L[0]
                    us_cls = us_avg
                    us_min = us_avg
                    us_max = us_avg
                    db_avg = db0L[0]

                else:
                    # the center of the range lies half a range ABOVE loDb
                    centerDb = loDb + (hiDb-loDb)/2.0

                    us_avg = np.mean(us0L)
                    us_cls = us0L[ np.argmin(np.abs(np.array(db0L)-centerDb)) ]
                    us_min = np.min(us0L)
                    us_max = np.max(us0L)
                    us_std = np.std(us0L)
                    db_avg = np.mean(db0L)
                    db_std = np.std(db0L)

                us_avg = int(round(us_avg))

            mapD[pitch][loDb] = { 'hiDb':hiDb, 'us_avg':us_avg, 'us_cls':us_cls, 'us_std':us_std,'us_min':us_min,'us_max':us_max, 'db_avg':db_avg, 'db_std':db_std, 'cnt':len(u_dL) }

    return mapD, sorted(dbS)
|
|
|
|
def plot_us_to_db_map( inDir, cfg, minMapDb=16.0, maxMapDb=26.0, incrMapDb=1.0, pitchL=None, printDir=None ):
    """Plot the us/db map computed by estimate_us_to_db_map().

    For every pitch the average us per db range is drawn as a curve, the us
    closest to the range center as 'x' markers, and the db and us spread as
    faint lines.  Ranges whose 'us_avg' is 0 are left out.  When printDir is
    set the figure is also written to <printDir>/us_db_map.png.
    """

    fig,ax = plt.subplots()

    mapD, dbRefL = estimate_us_to_db_map( inDir, cfg, minMapDb, maxMapDb, incrMapDb, pitchL )

    fieldL = ('us_avg','us_cls','db_avg','us_std','us_min','us_max','db_std')

    # for each pitch
    for pitch in mapD:

        # keep the db ranges that hold an average us value
        rowL = [ tuple( d[field] for field in fieldL ) for d in mapD[pitch].values() if d['us_avg'] != 0 ]

        if not rowL:
            continue

        # get the us/db lists for this pitch
        usL,uscL,dbL,ussL,usnL,usxL,dbsL = zip(*rowL)

        # plot central curve and std dev's
        p     = ax.plot(usL,dbL, marker='.', label=str(pitch))
        color = p[0].get_color()

        ax.plot(uscL,dbL, marker='x', label=str(pitch), color=color, linestyle='None')
        ax.plot(usL,np.array(dbL)+dbsL, color=color, alpha=0.3)
        ax.plot(usL,np.array(dbL)-dbsL, color=color, alpha=0.3)

        # plot us error bars
        for k,db in enumerate(dbL):
            ax.plot([usnL[k],usxL[k]],[db,db], color=color, alpha=0.3 )
            ax.plot([usL[k]-ussL[k],usL[k]+ussL[k]],[db,db], color=color, alpha=0.3, marker='.', linestyle='None' )

    plt.legend()

    if printDir:
        plt.savefig(os.path.join(printDir,"us_db_map.png"),format="png")

    plt.show()
|
|
|
|
def report_take_ids( inDir, expectedEventN=81, expectedTakeN=3 ):
    """Print the pitch folders of inDir whose contents look irregular.

    For each pitch folder a line is printed when:
      - the folder is empty,
      - the 'eventTimeL' list in <pitch>/0/seq.json does not hold
        expectedEventN entries, or
      - the folder does not hold expectedTakeN entries.

    The expected counts were previously hard coded as 81 and 3; these
    remain the defaults.
    """

    for pitch in os.listdir(inDir):

        pitchDir = os.path.join(inDir,pitch)

        takeDirL = os.listdir(pitchDir)

        if not takeDirL:
            print(pitch," directory empty")
            continue

        # the event count is read from take 0
        with open( os.path.join(pitchDir,'0','seq.json'), "rb") as f:
            r = json.load(f)

        eventN = len(r['eventTimeL'])

        if eventN != expectedEventN:
            print(pitch," ",eventN)

        if len(takeDirL) != expectedTakeN:
            print("***",pitch,len(takeDirL))
|
|
|
|
def cache_us_db( inDir, cfg, outFn ):
    """Write the merged measurements of every pitch folder to a JSON file.

    Each folder of inDir is taken as a pitch.  The file outFn receives a
    dictionary that maps the pitch to its 'usL', 'dbL', 'durMsL', 'takeIdL'
    and 'holdDutyPctL' values as returned by
    get_merged_pulse_db_measurements().
    """

    fieldL = ('usL','dbL','durMsL','takeIdL','holdDutyPctL')

    pitch_usDbD = {}

    for pitchFolder in os.listdir(inDir):

        midi_pitch = int(pitchFolder)

        print(midi_pitch)

        measL = get_merged_pulse_db_measurements( inDir, midi_pitch, cfg.analysisArgs['rmsAnalysisArgs'] )

        # pair the five returned sequences with their field names
        usL, dbL, durMsL, takeIdL, holdDutyPctL = measL
        pitch_usDbD[midi_pitch] = dict(zip(fieldL,(usL, dbL, durMsL, takeIdL, holdDutyPctL)))

    with open(outFn,"w") as f:
        json.dump(pitch_usDbD,f)
|
|
|
|
|
|
|
|
def gen_vel_map( inDir, cfg, minMaxDbFn, dynLevelN, cacheFn ):
    """Generate, store and plot a velocity map for every cached pitch.

    minMaxDbFn -- JSON file with parallel 'pitchL', 'minDbL' and 'maxDbL'
                  lists.  This argument was previously ignored in favor of
                  the hard coded file name 'minInterpDb.json'.
    dynLevelN  -- the db range of each pitch is divided into dynLevelN
                  equal steps, giving dynLevelN+1 levels.
    cacheFn    -- JSON file holding { pitch:{ 'usL':[], 'dbL':[], ... } },
                  the layout written by cache_us_db().

    For every level the measured sample with the closest db value is
    selected.  The resulting { pitch:[ (us,db) ] } map is written to
    'velMapD.json' and the db levels are plotted against pitch.

    inDir and cfg are accepted for interface compatibility; they are not used.
    """

    velMapD = {} # { pitch:[ (us,db) ] }

    with open(cacheFn,"r") as f:
        pitchUsDbD = json.load(f)

    with open(minMaxDbFn,"r") as f:
        r = json.load(f)

    minMaxDbD = { pitch:(minDb,maxDb) for pitch,minDb,maxDb in zip(r['pitchL'],r['minDbL'],r['maxDbL']) }

    # the JSON keys of the cache are the pitches as strings
    pitchL = sorted( int(pitch) for pitch in pitchUsDbD )

    for pitch in pitchL:
        d = pitchUsDbD[str(pitch)]

        usL = d['usL']
        dbV = np.array(d['dbL'])

        minDb,maxDb = minMaxDbD[pitch]

        velMapD[pitch] = []

        for i in range(dynLevelN+1):

            # db of this dynamic level
            db = minDb + (i * (maxDb - minDb)/ dynLevelN)

            # index of the measured sample closest to 'db'
            usIdx = np.argmin( np.abs(dbV - db) )

            velMapD[pitch].append( (usL[ usIdx ],db) )

    with open("velMapD.json","w") as f:
        json.dump(velMapD,f)

    # one row per pitch, one column per dynamic level
    mtx = np.zeros((len(velMapD),dynLevelN+1))
    print(mtx.shape)

    for i,usDbL in enumerate(velMapD.values()):
        for j,(_,db) in enumerate(usDbL):
            mtx[i,j] = db

    fig,ax = plt.subplots()
    ax.plot(pitchL,mtx)
    plt.show()
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    # usage: <script> <cfg_fn> <in_dir> <mode> [ <int> ... ]
    #
    # The trailing integers are pitches, except:
    #   us_db_pitch_take : alternating <pitch> <take id> values
    #   min_max_2        : pitches followed by the take id as the last value

    printDir = None #os.path.expanduser( "~/src/picadae_ac_3/doc")

    if len(sys.argv) < 4:
        print("usage: %s <cfg_fn> <in_dir> <mode> [<pitch> ...]" % (sys.argv[0]))
        sys.exit(1)

    cfgFn = sys.argv[1]
    inDir = sys.argv[2]
    mode  = sys.argv[3]

    if len(sys.argv) <= 4:
        pitchL = None
    else:
        pitchL = [ int(arg) for arg in sys.argv[4:] ]

    # these modes cannot run without the trailing integer arguments
    if pitchL is None and mode in ('us_db_pitch_take','us_db_pitch_last','min_max_2'):
        print("Mode '%s' requires pitch arguments." % (mode))
        sys.exit(1)

    cfg = parse_yaml_cfg( cfgFn )

    if mode == 'us_db':
        plot_us_db_curves_main( inDir, cfg, pitchL, plotTakesFl=True,usMax=None, printDir=printDir )
    elif mode == 'us_db_pitch_take':
        plot_us_db_takes( inDir, cfg, pitchL, printDir=printDir)
    elif mode == 'us_db_pitch_last':
        plot_us_db_takes_last( inDir, cfg, pitchL, printDir=printDir)
    elif mode == 'noise':
        plot_all_noise_curves( inDir, cfg, pitchL )
    elif mode == 'min_max':
        plot_min_max_db( inDir, cfg, pitchL, printDir=printDir )
    elif mode == 'min_max_2':
        # the last value is the take id, the values before it are the pitches
        takeId = pitchL[-1]
        del pitchL[-1]
        plot_min_max_2_db( inDir, cfg, pitchL, takeId=takeId, printDir=printDir )
    elif mode == 'us_db_map':
        plot_us_to_db_map( inDir, cfg, pitchL=pitchL, printDir=printDir )
    elif mode == 'audacity':
        rms_analysis.write_audacity_label_files( inDir, cfg.analysisArgs['rmsAnalysisArgs'] )
    elif mode == 'rpt_take_ids':
        report_take_ids( inDir )
    elif mode == 'manual_db':
        plot_min_db_manual( inDir, cfg, printDir=printDir )
    elif mode == 'gen_vel_map':
        gen_vel_map( inDir, cfg, "minInterpDb.json", 9, "cache_us_db.json" )
    elif mode == 'cache_us_db':
        cache_us_db( inDir, cfg, "cache_us_db.json")
    else:
        print("Unknown mode:",mode)
|
|
|
|
|