ca9580cd50
First working version of calibrate.py. Replaced analysis parameter dbRefWndMs with dbLinRef. rms_analysis.py: added samples_to_linear_residual(). plot_seq_1.py: initial commit.
249 lines
8.2 KiB
Python
249 lines
8.2 KiB
Python
import os, sys
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from common import parse_yaml_cfg
|
|
import rms_analysis
|
|
|
|
def get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD ):
    """Merge the note peak measurements of every take recorded for one pitch.

    Every entry of the directory `<inDir>/<midi_pitch>` is treated as a take
    whose name is its integer take number.  Each take is analyzed with
    rms_analysis.rms_analysis_main() and the peaks reported for all takes
    (`pkDbL`, `pkUsL` and `statsL[].durMs` of each result) are merged into a
    single sequence sorted on increasing `pkUsL` value.

    Args:
        inDir: directory which holds one sub-directory per MIDI pitch.
        midi_pitch: integer pitch; also the name of the pitch sub-directory.
        analysisArgsD: keyword arguments forwarded to rms_analysis_main().

    Returns:
        (pkUsL, pkDbL, durMsL, takeIdL, holdDutyPctL) where the first four
        are parallel tuples holding, per peak, the pulse time, the peak
        decibel value, the duration and the number of the take the peak came
        from.  `holdDutyPctL` is taken from the analysis result of the
        highest numbered take.

    Raises:
        ValueError: if a directory entry is not an integer take number, or
            if no peak was found in any take.
    """
    pitchDir = os.path.join(inDir, "%i" % (midi_pitch))

    # Visit the takes in take-number order so that the result (in particular
    # which take supplies holdDutyPctL) does not depend on the arbitrary
    # ordering of os.listdir().
    takeDirL = sorted(os.listdir(pitchDir), key=int)

    pkL = []
    r = None

    # for each take in this directory
    for takeDir in takeDirL:

        take_number = int(takeDir)

        # analyze this take's audio and locate the note peaks
        r = rms_analysis.rms_analysis_main( os.path.join(pitchDir, takeDir), midi_pitch, **analysisArgsD )

        # store the peaks as (db,us,durMs,take_number)
        pkL.extend( (db, us, stats.durMs, take_number)
                    for db, us, stats in zip(r.pkDbL, r.pkUsL, r.statsL) )

    # Without any peak the split below cannot produce four sequences; fail
    # with an explicit message instead of a cryptic unpacking error.
    if not pkL:
        raise ValueError("No note peaks were found in '%s'." % pitchDir)

    # sort the peaks on increasing attack pulse microseconds
    pkL.sort(key=lambda x: x[1])

    # NOTE: merging of sample points closer than 'minSampleDistUs' is
    # currently disabled.
    #pkL = merge_close_sample_points( pkL, analysisArgsD['minSampleDistUs'] )

    # split pkL into its parallel columns
    pkDbL, pkUsL, durMsL, takeIdL = tuple(zip(*pkL))

    return pkUsL, pkDbL, durMsL, takeIdL, r.holdDutyPctL
|
|
|
|
def select_resample_reference_indexes( noiseIdxL, maxGapN=1 ):
    """Select the sample indexes which delimit the regions to resample.

    Every noisy index selects itself and its two immediate neighbors.  Any
    run of at most `maxGapN` unselected indexes which lies between two
    selected indexes is then selected as well, so that nearby regions become
    contiguous.

    Args:
        noiseIdxL: iterable of integer sample indexes flagged as noisy.
        maxGapN: longest run of unselected indexes which is filled in.  The
            default of 1 fills single-index gaps only.

    Returns:
        Sorted list of unique, non-negative sample indexes.  The list may
        contain an index one past the last sample (the upper neighbor of the
        last sample); the sample count is not known here so the caller is
        responsible for discarding it.
    """
    # Each noisy sample selects itself and both neighbors.  Negative values
    # are not sample positions (as list indexes they would alias samples at
    # the end of the sequence) so they are dropped.
    idxS = set()
    for i in noiseIdxL:
        idxS.update( j for j in (i-1, i, i+1) if j >= 0 )

    idxL = sorted(idxS)

    # Fill short gaps between consecutive selected indexes.  The fill values
    # are collected separately so that the list being walked is not modified.
    fillL = []
    for a, b in zip(idxL, idxL[1:]):
        if 1 < b - a <= maxGapN + 1:
            fillL.extend(range(a+1, b))

    return sorted(idxL + fillL)
|
|
|
|
def locate_resample_regions( usL, dbL, resampleIdxL ):
    """Generate the new sample points for each contiguous resample region.

    The indexes of `resampleIdxL` which fall inside `range(len(usL))` are
    grouped into maximal runs of consecutive indexes (regions).  For a region
    covering the indexes bi..ei a new point is generated halfway between each
    pair of adjacent samples from (bi-1,bi) through (ei,ei+1).  Where the
    pair would extend past either end of the data the end sample itself is
    used instead.

    Args:
        usL: sequence of sample x values (pulse times).
        dbL: sequence of sample y values (decibels), parallel to `usL`.
        resampleIdxL: iterable of sample indexes to resample; values outside
            the range of `usL` are ignored.

    Returns:
        (reUsL, reDbL): parallel lists holding the x and y values of the
        generated points.
    """
    # a set gives constant time membership tests in the scan below
    resampleIdxS = set(resampleIdxL)
    sampleN = len(usL)

    # locate regions of points to resample as inclusive (bi,ei) index pairs
    regionL = []
    bi = None
    for i in range(sampleN):
        if i in resampleIdxS:
            if bi is None:
                bi = i                      # a new region begins here
        elif bi is not None:
            regionL.append((bi, i-1))       # the region ended on the previous index
            bi = None

    # close a region which runs through the last sample
    if bi is not None:
        regionL.append((bi, sampleN-1))

    # select points around and within the resample regions to resample
    reUsL = []
    reDbL = []
    for bi, ei in regionL:

        # ei+1 is included so that a point beyond the last region sample is generated
        for i in range(bi, ei+2):
            if i == 0:
                # no sample precedes the first one: reuse it
                us = usL[i]
                db = dbL[i]
            elif i >= sampleN:
                # no sample follows the last one: reuse it
                us = usL[i-1]
                db = dbL[i-1]
            else:
                # midpoint between sample i-1 and sample i
                us = usL[i-1] + (usL[i]-usL[i-1])/2
                db = dbL[i-1] + (dbL[i]-dbL[i-1])/2

            reUsL.append(us)
            reDbL.append(db)

    return reUsL, reDbL
|
|
|
|
def get_dur_skip_indexes( durMsL, dbL, takeIdL, minDurMs, minDb, minAudibleN=3, dbOffset=1.0 ):
    """Locate the samples to skip and the start of the audible samples.

    A sample is skipped when its duration is below `minDurMs` or its decibel
    value is below `minDb`.  A single unskipped sample whose two neighbors
    are both skipped is skipped as well.

    Args:
        durMsL: sequence of sample durations.
        dbL: sequence of sample decibel values, parallel to `durMsL`.
        takeIdL: unused; kept for interface compatibility.
        minDurMs: samples with a shorter duration are skipped.
        minDb: samples with a lower decibel value are skipped.
        minAudibleN: count of contiguous unskipped samples which marks the
            start of the audible samples (must be at least 1).
        dbOffset: decibel distance below/above the first audible sample used
            to locate the first non-skip sample.

    Returns:
        (skipIdxL, firstAudibleIdx, firstNonSkipIdx)
        skipIdxL: indexes of the skipped samples; the threshold based
            indexes come first, in increasing order, followed by the
            isolated samples which were added.
        firstAudibleIdx: index of the first sample of the first run of
            `minAudibleN` contiguous unskipped samples, or None when no such
            run exists.
        firstNonSkipIdx: index of the nearest sample before
            `firstAudibleIdx` whose decibel value differs from the first
            audible sample by at least `dbOffset`; 0 when no earlier sample
            differs by that much; None when `firstAudibleIdx` is None.
    """
    sampleN = len(durMsL)

    skipIdxL = [ i for i, (ms, db) in enumerate(zip(durMsL, dbL)) if ms < minDurMs or db < minDb ]

    # mirror of skipIdxL used for constant time membership tests
    skipIdxS = set(skipIdxL)

    # if a single sample point is left out of a region of
    # contiguous skipped points then skip this point also
    for i in range(sampleN):
        if i not in skipIdxS and (i-1) in skipIdxS and (i+1) in skipIdxS:
            skipIdxL.append(i)
            skipIdxS.add(i)

    # find the first run of 'minAudibleN' contiguous samples that are
    # not skipped - all samples prior to these will be skipped
    firstAudibleIdx = None
    runN = 0
    for i in range(sampleN):
        runN = 0 if i in skipIdxS else runN + 1

        if runN == minAudibleN:
            firstAudibleIdx = i - minAudibleN + 1
            break

    # Walk backward from the first audible sample until a sample is found
    # which differs from it by at least 'dbOffset' decibels.
    #
    # The recordings of very quiet notes do not give reliable decibel
    # measures so this may not be the best backup criteria.
    firstNonSkipIdx = None
    if firstAudibleIdx is not None:
        i = firstAudibleIdx - 1

        # The scan stops at the first sample: a negative index would wrap
        # around and compare against samples at the end of the sequence.
        while i >= 0 and abs(dbL[i] - dbL[firstAudibleIdx]) < dbOffset:
            i -= 1

        # if no earlier sample differs enough then keep everything from the start
        firstNonSkipIdx = max(i, 0)

    return skipIdxL, firstAudibleIdx, firstNonSkipIdx
|
|
|
|
def get_resample_points( usL, dbL, durMsL, takeIdL, minDurMs, minDb, noiseLimitPct ):
    """Locate the noisy samples and generate the points to resample.

    Args:
        usL: sequence of sample x values (pulse times).
        dbL: sequence of sample decibel values, parallel to `usL`.
        durMsL: sequence of sample durations, parallel to `usL`.
        takeIdL: sequence of take numbers; forwarded to get_dur_skip_indexes().
        minDurMs: forwarded to get_dur_skip_indexes().
        minDb: forwarded to get_dur_skip_indexes().
        noiseLimitPct: a sample is noisy when the absolute value of its
            residual, as a percentage of its decibel value, exceeds this.

    Returns:
        (reUsL, reDbL, noiseL, resampleL, durL, firstAudibleIdx, firstNonSkipIdx)
        reUsL, reDbL: the new points from locate_resample_regions().
        noiseL: (us,db) of each noisy sample.
        resampleL: (us,db) of each sample selected for resampling.
        durL: (us,db) of each sample skipped by get_dur_skip_indexes().
        firstAudibleIdx, firstNonSkipIdx: as returned by
            get_dur_skip_indexes(); both may be None.
    """
    skipIdxL, firstAudibleIdx, firstNonSkipIdx = get_dur_skip_indexes( durMsL, dbL, takeIdL, minDurMs, minDb )

    durL = [ (usL[i], dbL[i]) for i in skipIdxL ]

    # Residual of each sample as a percentage of its decibel value.
    # NOTE(review): samples_to_linear_residual() presumably measures the
    # deviation from a linear fit - confirm in rms_analysis.
    scoreV = np.abs( rms_analysis.samples_to_linear_residual( usL, dbL ) * 100.0 / dbL )

    noiseIdxL = [ i for i in range(scoreV.shape[0]) if scoreV[i] > noiseLimitPct ]
    noiseL    = [ (usL[i], dbL[i]) for i in noiseIdxL ]

    # Keep only the selected indexes which are real sample positions at or
    # after the first non-skip sample.  The neighbor selection can yield an
    # index one past the last sample (or -1), and firstNonSkipIdx is None
    # when no audible run was found, in which case no lower bound applies.
    loIdx = 0 if firstNonSkipIdx is None else max(firstNonSkipIdx, 0)
    hiIdx = len(usL)

    resampleIdxL = [ i for i in select_resample_reference_indexes( noiseIdxL ) if loIdx <= i < hiIdx ]
    resampleL    = [ (usL[i], dbL[i]) for i in resampleIdxL ]

    reUsL, reDbL = locate_resample_regions( usL, dbL, resampleIdxL )

    return reUsL, reDbL, noiseL, resampleL, durL, firstAudibleIdx, firstNonSkipIdx
|
|
|
|
def get_resample_points_wrap( inDir, midi_pitch, analysisArgsD ):
    """Return the x values (pulse times) of the points to resample for a pitch.

    Args:
        inDir: directory which holds one sub-directory per MIDI pitch.
        midi_pitch: integer pitch to process.
        analysisArgsD: dictionary providing the keys 'rmsAnalysisArgs',
            'resampleMinDurMs', 'resampleMinDb' and 'resampleNoiseLimitPct'.

    Returns:
        The `reUsL` list produced by get_resample_points().
    """
    usL, dbL, durMsL, takeIdL, _ = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'] )

    # takeIdL must be passed: get_resample_points() takes it as its fourth
    # positional argument, ahead of the three threshold values.
    reUsL, _, _, _, _, _, _ = get_resample_points( usL, dbL, durMsL, takeIdL,
                                                   analysisArgsD['resampleMinDurMs'],
                                                   analysisArgsD['resampleMinDb'],
                                                   analysisArgsD['resampleNoiseLimitPct'] )

    return reUsL
|
|
|
|
def plot_noise_region( ax, inDir, keyMapD, midi_pitch, analysisArgsD ):
    """Plot the merged measurements of one pitch and mark its noisy regions.

    Args:
        ax: matplotlib axes to draw on.
        inDir: directory which holds one sub-directory per MIDI pitch.
        keyMapD: mapping indexed by `midi_pitch` whose values provide the
            'type' and 'class' entries used to label the y axis.
        midi_pitch: integer pitch to plot.
        analysisArgsD: dictionary providing the keys 'rmsAnalysisArgs',
            'resampleMinDurMs', 'resampleMinDb' and 'resampleNoiseLimitPct'.
    """
    plotResampleFl = False   # draw the generated resample points
    plotTakesFl    = True    # draw each take as its own line

    usL, dbL, durMsL, takeIdL, holdDutyPctL = get_merged_pulse_db_measurements( inDir, midi_pitch, analysisArgsD['rmsAnalysisArgs'] )

    reUsL, reDbL, noiseL, resampleL, durL, firstAudibleIdx, firstNonSkipIdx = get_resample_points(
        usL, dbL, durMsL, takeIdL,
        analysisArgsD['resampleMinDurMs'],
        analysisArgsD['resampleMinDb'],
        analysisArgsD['resampleNoiseLimitPct'] )

    # Plot the first non-skip ('+') and first audible ('*') positions.  Both
    # indexes are None when no audible run of samples was found.
    if firstNonSkipIdx is not None:
        ax.plot( usL[firstNonSkipIdx], dbL[firstNonSkipIdx], markersize=15, marker='+', linestyle='None', color='red')

    if firstAudibleIdx is not None:
        # both coordinates come from the same sample
        ax.plot( usL[firstAudibleIdx], dbL[firstAudibleIdx], markersize=15, marker='*', linestyle='None', color='red')

    # plot the resample points
    if plotResampleFl:
        ax.plot( reUsL, reDbL, markersize=10, marker='x', linestyle='None', color='green')

    # plot the noisy sample positions
    if noiseL:
        nUsL, nDbL = zip(*noiseL)
        ax.plot( nUsL, nDbL, marker='o', linestyle='None', color='black')

    # plot the noisy sample positions and the neighbors included in the noisy region
    if resampleL:
        nUsL, nDbL = zip(*resampleL)
        ax.plot( nUsL, nDbL, marker='*', linestyle='None', color='red')

    # plot actual sample points
    if plotTakesFl:
        # sorted so that the line colors are assigned in take order
        for takeId in sorted(set(takeIdL)):
            xL, yL = zip(*[ (us, db) for us, db, tid in zip(usL, dbL, takeIdL) if tid == takeId ])
            ax.plot(xL, yL, marker='.')

            # label each point with its position within the take
            for i, (x, y) in enumerate(zip(xL, yL)):
                ax.text(x, y, str(i))
    else:
        ax.plot(usL, dbL, marker='.')

    # plot the duration skip points
    if durL:
        nUsL, nDbL = zip(*durL)
        ax.plot( nUsL, nDbL, marker='.', linestyle='None', color='yellow')

    # plot the locations where the hold duty cycle changes with vertical black lines
    for us_duty in holdDutyPctL:
        us, _duty = tuple(us_duty)
        if us > 0:
            ax.axvline(us, color='black')

    # plot the 'minDb' reference line
    ax.axhline(analysisArgsD['resampleMinDb'], color='black')

    ax.set_ylabel( "%i %s %s" % (midi_pitch, keyMapD[midi_pitch]['type'], keyMapD[midi_pitch]['class']))
|
|
|
|
def plot_noise_regions_main( inDir, cfg, pitchL ):
    """Show one noise region plot per pitch, stacked vertically in one figure.

    Args:
        inDir: directory which holds one sub-directory per MIDI pitch.
        cfg: configuration object providing `analysisArgs` and `key_mapL`
            (a list of dictionaries each having a 'midi' entry).
        pitchL: list of integer MIDI pitches to plot.
    """
    analysisArgsD = cfg.analysisArgs

    # index the key map records by MIDI pitch
    keyMapD = { d['midi']:d for d in cfg.key_mapL }

    axN = len(pitchL)
    fig, axL = plt.subplots(axN, 1)

    # a single subplot is returned as a bare axes object rather than a sequence
    if axN == 1:
        axL = [axL]

    fig.set_size_inches(18.5, 10.5*axN)

    for ax, midi_pitch in zip(axL, pitchL):
        # Pass the pitch-indexed dictionary: plot_noise_region() looks the
        # record up with keyMapD[midi_pitch], which on the raw list would
        # select by list position instead of by pitch.
        plot_noise_region( ax, inDir, keyMapD, midi_pitch, analysisArgsD )

    plt.show()
|
|
|
|
|
|
if __name__ == "__main__":

    # command line: <inDir> <cfgFn> <midi_pitch>
    argL = sys.argv

    inDir  = argL[1]
    cfgFn  = argL[2]
    pitchL = [ int(argL[3]) ]

    # parse the configuration and plot the single requested pitch
    cfg = parse_yaml_cfg( cfgFn )

    plot_noise_regions_main( inDir, cfg, pitchL )

    #rms_analysis.write_audacity_label_files( inDir, cfg.analysisArgs['rmsAnalysisArgs'] )
|