621 lines
19 KiB
Python
621 lines
19 KiB
Python
##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
|
|
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
|
|
import os,types,json,pickle,types
|
|
from scipy.io import wavfile
|
|
from scipy.signal import stft
|
|
import numpy as np
|
|
from common import parse_yaml_cfg
|
|
|
|
|
|
def calc_harm_bins( srate, binHz, midiPitch, harmN ):
|
|
|
|
semi_tone = 1.0/12
|
|
quarter_tone = 1.0/24
|
|
eigth_tone = 1.0/48
|
|
band_width_st = 3.0/48 # 3/8 tone
|
|
|
|
fundHz = (13.75 * pow(2.0,(-9.0/12.0))) * pow(2.0,(midiPitch / 12))
|
|
fund_l_binL = [int(round(fundHz * pow(2.0,-band_width_st) * i/binHz)) for i in range(1,harmN+1)]
|
|
fund_m_binL = [int(round(fundHz * i/binHz)) for i in range(1,harmN+1)]
|
|
fund_u_binL = [int(round(fundHz * pow(2.0, band_width_st) * i/binHz)) for i in range(1,harmN+1)]
|
|
|
|
for i in range(len(fund_m_binL)):
|
|
if fund_l_binL[i] >= fund_m_binL[i] and fund_l_binL[i] > 0:
|
|
fund_l_binL[i] = fund_m_binL[i] - 1
|
|
|
|
if fund_u_binL[i] <= fund_m_binL[i] and fund_u_binL[i] < len(fund_u_binL)-1:
|
|
fund_u_binL[i] = fund_m_binL[i] + 1
|
|
|
|
return fund_l_binL, fund_m_binL, fund_u_binL
|
|
|
|
def rms_to_db( xV, rms_srate, dbLinRef ):
|
|
#dbWndN = int(round(refWndMs * rms_srate / 1000.0))
|
|
#dbRef = ref = np.mean(xV[0:dbWndN])
|
|
|
|
#print("DB REF:",dbLinRef, min(xV), np.argmin(xV))
|
|
rmsDbV = 20.0 * np.log10( (xV+np.nextafter(0,1)) / dbLinRef )
|
|
|
|
return rmsDbV
|
|
|
|
def audio_rms( srate, xV, rmsWndMs, hopMs, dbLinRef ):
|
|
|
|
wndSmpN = int(round( rmsWndMs * srate / 1000.0))
|
|
hopSmpN = int(round( hopMs * srate / 1000.0))
|
|
|
|
xN = xV.shape[0]
|
|
|
|
yN = int(((xN - wndSmpN) / hopSmpN) + 1)
|
|
|
|
assert( yN > 0)
|
|
yV = np.zeros( (yN, ) )
|
|
|
|
assert( wndSmpN > 1 )
|
|
|
|
i = 0
|
|
j = 0
|
|
while i < xN and j < yN:
|
|
|
|
if i == 0:
|
|
yV[j] = np.sqrt(xV[0]*xV[0])
|
|
elif i < wndSmpN:
|
|
yV[j] = np.sqrt( np.mean( xV[0:i] * xV[0:i] ) )
|
|
else:
|
|
yV[j] = np.sqrt( np.mean( xV[i-wndSmpN:i] * xV[i-wndSmpN:i] ) )
|
|
|
|
i += hopSmpN
|
|
j += 1
|
|
|
|
rms_srate = srate / hopSmpN
|
|
return rms_to_db( yV[0:j], rms_srate, dbLinRef ), rms_srate
|
|
|
|
|
|
def audio_stft_rms( srate, xV, rmsWndMs, hopMs, dbLinRef, spectrumIdx ):
|
|
|
|
wndSmpN = int(round( rmsWndMs * srate / 1000.0))
|
|
hopSmpN = int(round( hopMs * srate / 1000.0))
|
|
binHz = srate / wndSmpN
|
|
|
|
f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True )
|
|
|
|
specHopIdx = int(round( spectrumIdx ))
|
|
specV = np.sqrt(np.abs(xM[:, specHopIdx ]))
|
|
|
|
mV = np.zeros((xM.shape[1]))
|
|
|
|
for i in range(xM.shape[1]):
|
|
mV[i] = np.max(np.sqrt(np.abs(xM[:,i])))
|
|
|
|
|
|
rms_srate = srate / hopSmpN
|
|
mV = rms_to_db( mV, rms_srate, dbLinRef )
|
|
|
|
return mV, rms_srate, specV, specHopIdx, binHz
|
|
|
|
|
|
def audio_harm_rms( srate, xV, rmsWndMs, hopMs, dbLinRef, midiPitch, harmCandN, harmN ):
|
|
|
|
wndSmpN = int(round( rmsWndMs * srate / 1000.0))
|
|
hopSmpN = int(round( hopMs * srate / 1000.0))
|
|
|
|
binHz = srate / wndSmpN
|
|
|
|
#print( "STFT:", rmsWndMs, hopMs, wndSmpN, hopSmpN, wndSmpN-hopSmpN )
|
|
|
|
f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True )
|
|
|
|
harmLBinL,harmMBinL,harmUBinL = calc_harm_bins( srate, binHz, midiPitch, harmCandN )
|
|
|
|
rmsV = np.zeros((xM.shape[1],))
|
|
|
|
|
|
for i in range(xM.shape[1]):
|
|
mV = np.sqrt(np.abs(xM[:,i]))
|
|
|
|
pV = np.zeros((len(harmLBinL,)))
|
|
|
|
for j,(b0i,b1i) in enumerate(zip( harmLBinL, harmUBinL )):
|
|
pV[j] = np.max(mV[b0i:b1i])
|
|
|
|
rmsV[i] = np.mean( sorted(pV)[-harmN:] )
|
|
|
|
|
|
|
|
rms_srate = srate / hopSmpN
|
|
rmsV = rms_to_db( rmsV, rms_srate, dbLinRef )
|
|
return rmsV, rms_srate, binHz
|
|
|
|
def measure_duration_ms( rmsV, rms_srate, peak_idx, end_idx, decay_pct ):
|
|
"""
|
|
Calcuate the time it takes for a note to decay from the peak at
|
|
rmsV[peak_idx] dB to 'decay_pct' percent of the peak value.
|
|
"""
|
|
|
|
pkRmsDb = rmsV[ peak_idx ]
|
|
|
|
# calc the note turn-off (offset) db as a percentage of the peak amplitude
|
|
offsetRmsDb = pkRmsDb * decay_pct / 100.0
|
|
|
|
# calc the sample index where the note is off
|
|
offset_idx = peak_idx + np.argmin( np.abs(rmsV[peak_idx:end_idx] - offsetRmsDb) )
|
|
|
|
|
|
# calc the duration of the note
|
|
dur_ms = int(round((offset_idx - peak_idx) * 1000.0 / rms_srate))
|
|
|
|
#print(pkRmsDb, offsetRmsDb, peak_idx, offset_idx, end_idx, dur_ms, rms_srate)
|
|
|
|
return dur_ms
|
|
|
|
|
|
def select_first_stable_note_by_dur( durMsL, minDurMs=800 ):
|
|
|
|
first_stable_idx = None
|
|
for i,durMs in enumerate(durMsL):
|
|
if durMs > minDurMs and first_stable_idx is None:
|
|
first_stable_idx = i
|
|
else:
|
|
if durMs < minDurMs:
|
|
first_stable_idx = None
|
|
|
|
return first_stable_idx
|
|
|
|
def select_first_stable_note_by_delta_db_1( pkDbL, pkUsL, maxPulseUs=0.1 ):
|
|
|
|
wndN = 5
|
|
aL = []
|
|
dV = np.diff(pkDbL) / pkDbL[1:]
|
|
|
|
for ei in range(wndN,len(pkDbL)):
|
|
xV = dV[ei-wndN:ei]
|
|
avg = np.mean(np.abs(xV))
|
|
aL.append(avg)
|
|
|
|
k = np.argmin(np.abs(np.array(pkUsL) - maxPulseUs))
|
|
|
|
print(aL)
|
|
print(k)
|
|
|
|
|
|
for i in range(k,0,-1):
|
|
if aL[i] > maxDeltaDb:
|
|
return i + 1
|
|
|
|
return None
|
|
|
|
|
|
def select_first_stable_note_by_delta_db( pkDbL, pkUsL=None, maxPulseUs=0.1 ):
|
|
|
|
wndN = 5
|
|
|
|
dV = np.diff(pkDbL) / pkDbL[1:]
|
|
|
|
|
|
for ei in range(wndN,len(pkDbL)):
|
|
xV = dV[ei-wndN:ei]
|
|
avg = np.mean(np.abs(xV))
|
|
|
|
if avg < .1:
|
|
return (ei-wndN)+1
|
|
|
|
|
|
return None
|
|
|
|
def note_stats( r, decay_pct=50.0, extraDurSearchMs=500 ):
|
|
""" Collect some statistics and markers for each note in the sequence. """
|
|
statsL = []
|
|
|
|
srate = r.rms_srate
|
|
|
|
qmax = 0
|
|
|
|
|
|
|
|
for i,(begSmpMs, endSmpMs) in enumerate(r.eventTimeL):
|
|
|
|
begSmpIdx = int(round(srate * begSmpMs / 1000.0))
|
|
endSmpIdx = int(round(srate * (endSmpMs + extraDurSearchMs) / 1000.0))
|
|
pkSmpIdx = r.pkIdxL[i]
|
|
|
|
|
|
durMs = measure_duration_ms( r.rmsDbV, srate, pkSmpIdx, endSmpIdx, decay_pct )
|
|
|
|
bi = pkSmpIdx
|
|
ei = pkSmpIdx + int(round(durMs * srate / 1000.0))
|
|
|
|
qualityCoeff = np.sum(r.rmsDbV[bi:ei]) + np.sum(r.tdRmsDbV[bi:ei])
|
|
if qualityCoeff > qmax:
|
|
qmax = qualityCoeff
|
|
|
|
if ei-bi == 0:
|
|
tdRmsDb_v = 0.0 if bi >= len(r.tdRmsDbV) else np.mean(r.tdRmsDbV[bi])
|
|
hmRmsDb_v = 0.0 if bi >= len(r.rmsDbV) else np.mean(r.rmsDbV[bi])
|
|
durAvgDb = (hmRmsDb_v + tdRmsDb_v)/2.0
|
|
else:
|
|
tdRmsDb_u = 0.0 if ei >= len(r.tdRmsDbV) else np.mean(r.tdRmsDbV[bi:ei])
|
|
hmRmsDb_u = 0.0 if ei >= len(r.rmsDbV) else np.mean(r.rmsDbV[bi:ei])
|
|
durAvgDb = (hmRmsDb_u + tdRmsDb_u)/2.0
|
|
|
|
statsL.append( types.SimpleNamespace(**{
|
|
'begSmpSec':begSmpIdx/srate,
|
|
'endSmpSec':endSmpIdx/srate,
|
|
'pkSmpSec':pkSmpIdx/srate,
|
|
'durMs':durMs,
|
|
'pkDb':r.pkDbL[i],
|
|
'pulse_us':r.pkUsL[i],
|
|
'quality':qualityCoeff,
|
|
'durAvgDb':durAvgDb }))
|
|
|
|
for i,r in enumerate(statsL):
|
|
statsL[i].quality = 0 if qmax <= 0 else statsL[i].quality / qmax
|
|
|
|
|
|
return statsL
|
|
|
|
|
|
|
|
|
|
def locate_peak_indexes( xV, xV_srate, eventMsL, audioFn ):
|
|
|
|
pkIdxL = []
|
|
for i, (begMs, endMs) in enumerate(eventMsL):
|
|
|
|
begSmpIdx = int(begMs * xV_srate / 1000.0)
|
|
endSmpIdx = int(endMs * xV_srate / 1000.0)
|
|
|
|
if endSmpIdx != begSmpIdx:
|
|
pkIdxL.append( begSmpIdx + np.argmax( xV[begSmpIdx:endSmpIdx] ) )
|
|
else:
|
|
print("Missing peak %i : begween beg:%i ms end:%i ms : %s" % (i, begMs, endMs, audioFn ))
|
|
pkIdxL.append( None )
|
|
|
|
return pkIdxL
|
|
|
|
|
|
def key_info_dictionary( keyMapL=None, yamlCfgFn=None):
|
|
|
|
if yamlCfgFn is not None:
|
|
cfg = parse_yaml_cfg(yamlCfgFn)
|
|
|
|
keyMapL = cfg.key_mapL
|
|
|
|
kmD = {}
|
|
for d in keyMapL:
|
|
kmD[ d['midi'] ] = types.SimpleNamespace(**d)
|
|
|
|
return kmD
|
|
|
|
def rms_analyze_one_rt_note( sigV, srate, begMs, endMs, midi_pitch, rmsWndMs=300, rmsHopMs=30, dbLinRef=0.001, harmCandN=5, harmN=3, durDecayPct=40 ):
|
|
|
|
sigV = np.squeeze(sigV)
|
|
|
|
td_rmsDbV, td_srate = audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef )
|
|
|
|
begSmpIdx = int(round(begMs * td_srate/1000))
|
|
endSmpIdx = int(round(endMs * td_srate/1000))
|
|
|
|
td_pk_idx = begSmpIdx + np.argmax(td_rmsDbV[begSmpIdx:endSmpIdx])
|
|
|
|
td_durMs = measure_duration_ms( td_rmsDbV, td_srate, td_pk_idx, len(sigV)-1, durDecayPct )
|
|
|
|
hm_rmsDbV, hm_srate, binHz = audio_harm_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef, midi_pitch, harmCandN, harmN )
|
|
|
|
begSmpIdx = int(round(begMs * hm_srate/1000))
|
|
endSmpIdx = int(round(endMs * hm_srate/1000))
|
|
hm_pk_idx = begSmpIdx + np.argmax(hm_rmsDbV[begSmpIdx:endSmpIdx])
|
|
|
|
hm_durMs = measure_duration_ms( hm_rmsDbV, hm_srate, hm_pk_idx, len(sigV)-1, durDecayPct )
|
|
|
|
tdD = { "rmsDbV":td_rmsDbV.tolist(), "srate":td_srate, "pk_idx":int(td_pk_idx), "db":float(td_rmsDbV[td_pk_idx]), "durMs":td_durMs }
|
|
hmD = { "rmsDbV":hm_rmsDbV.tolist(), "srate":hm_srate, "pk_idx":int(hm_pk_idx), "db":float(hm_rmsDbV[hm_pk_idx]), "durMs":hm_durMs }
|
|
|
|
return { "td":tdD, "hm":hmD }
|
|
|
|
def rms_analyze_one_rt_note_wrap( audioDev, annBegMs, annEndMs, midi_pitch, noteOffDurMs, rmsAnalysisD ):
|
|
|
|
resD = None
|
|
buf_result = audioDev.linear_buffer()
|
|
|
|
if buf_result:
|
|
|
|
sigV = buf_result.value
|
|
|
|
if len(sigV.shape) > 1:
|
|
sigV = sigV[:,0].squeeze()
|
|
|
|
|
|
|
|
# get the annotated begin and end of the note as sample indexes into sigV
|
|
bi = int(round(annBegMs * audioDev.srate / 1000))
|
|
ei = int(round(annEndMs * audioDev.srate / 1000))
|
|
|
|
# calculate half the length of the note-off duration in samples
|
|
noteOffSmp_o_2 = int(round( (noteOffDurMs/2) * audioDev.srate / 1000))
|
|
|
|
# widen the note analysis space noteOffSmp_o_2 samples pre/post the annotated begin/end of the note
|
|
bi = max(0,bi - noteOffSmp_o_2)
|
|
ei = min(ei+noteOffSmp_o_2,sigV.shape[0]-1)
|
|
|
|
|
|
ar = types.SimpleNamespace(**rmsAnalysisD)
|
|
|
|
# shift the annotatd begin/end of the note to be relative to index bi
|
|
begMs = noteOffSmp_o_2 * 1000 / audioDev.srate
|
|
endMs = begMs + (annEndMs - annBegMs)
|
|
|
|
#print("MEAS:",begMs,endMs,bi,ei,sigV.shape,audioDev.is_recording_enabled(),ar)
|
|
|
|
|
|
# analyze the note
|
|
resD = rms_analyze_one_rt_note( sigV[bi:ei], audioDev.srate, begMs, endMs, midi_pitch, rmsWndMs=ar.rmsWndMs, rmsHopMs=ar.rmsHopMs, dbLinRef=ar.dbLinRef, harmCandN=ar.harmCandN, harmN=ar.harmN, durDecayPct=ar.durDecayPct )
|
|
|
|
#print( "hm:%4.1f %4i td:%4.1f %4i" % (resD['hm']['db'], resD['hm']['durMs'], resD['td']['db'], resD['td']['durMs']))
|
|
|
|
return resD
|
|
|
|
def calibrate_rms( sigV, srate, beg_ms, end_ms ):
|
|
|
|
bi = int(round(beg_ms * srate / 1000))
|
|
ei = int(round(end_ms * srate / 1000))
|
|
rms = np.sqrt( np.mean( sigV[bi:ei] * sigV[bi:ei] ))
|
|
|
|
return 20.0*np.log10( rms / 0.002 )
|
|
|
|
|
|
def calibrate_recording_analysis( inDir ):
|
|
|
|
jsonFn = os.path.join(inDir, "meas.json" )
|
|
audioFn = os.path.join(inDir, "audio.wav" )
|
|
|
|
with open(jsonFn,"r") as f:
|
|
r = json.load(f)
|
|
|
|
measD = r['measD']
|
|
cfg = types.SimpleNamespace(**r['cfg'])
|
|
annL = r['annoteL']
|
|
anlD = {}
|
|
|
|
n = 0
|
|
for midi_pitch,measL in measD.items():
|
|
n += len(measL)
|
|
anlD[int(midi_pitch)] = []
|
|
|
|
srate, signalM = wavfile.read(audioFn)
|
|
sigV = signalM / float(0x7fff)
|
|
|
|
anlr = types.SimpleNamespace(**cfg.analysisD)
|
|
|
|
tdRmsDbV, td_srate = audio_rms( srate, sigV, anlr.rmsWndMs, anlr.rmsHopMs, anlr.dbLinRef )
|
|
|
|
# for each measured pitch
|
|
for midi_pitch,measL in measD.items():
|
|
|
|
# for each measured note at this pitch
|
|
for mi,d in enumerate(measL):
|
|
|
|
mr = types.SimpleNamespace(**d)
|
|
|
|
# locate the associated annotation reocrd
|
|
for annD in annL:
|
|
ar = types.SimpleNamespace(**annD)
|
|
|
|
if ar.midi_pitch == mr.midi_pitch and ar.beg_ms==mr.beg_ms and ar.end_ms==mr.end_ms:
|
|
assert( ar.pulse_us == mr.pulse_us )
|
|
|
|
bi = int(round(ar.beg_ms * td_srate / 1000))
|
|
ei = int(round(ar.end_ms * td_srate / 1000))
|
|
db = np.mean(tdRmsDbV[bi:ei])
|
|
|
|
db = calibrate_rms( sigV, srate, ar.beg_ms, ar.end_ms )
|
|
|
|
anlD[int(midi_pitch)].append({ 'pulse_us':ar.pulse_us, 'db':db, 'meas_idx':mi })
|
|
|
|
break
|
|
|
|
|
|
return anlD
|
|
|
|
|
|
|
|
|
|
def rms_analysis_main( inDir, midi_pitch, rmsWndMs=300, rmsHopMs=30, dbLinRef=0.001, harmCandN=5, harmN=3, durDecayPct=40 ):
|
|
|
|
# form the audio and meta data file names
|
|
seqFn = os.path.join( inDir, "seq.json")
|
|
audioFn = os.path.join( inDir, "audio.wav")
|
|
|
|
# read the meta data file
|
|
with open( seqFn, "rb") as f:
|
|
r = json.load(f)
|
|
|
|
# rad the auido file
|
|
srate, signalM = wavfile.read(audioFn)
|
|
|
|
# convert the audio signal vector to contain only the first (left) channel
|
|
if len(signalM.shape)>1:
|
|
signalM = signalM[:,0].squeeze()
|
|
|
|
# convert the auido file to floats in range [-1.0 to 1.0]
|
|
sigV = signalM / float(0x7fff)
|
|
|
|
# calc. the RMS signal
|
|
tdRmsDbV, rms0_srate = audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef )
|
|
|
|
# locate the peaks in the RMS signal
|
|
tdPkIdxL = locate_peak_indexes( tdRmsDbV, rms0_srate, r['eventTimeL'], audioFn )
|
|
|
|
# sum the first harmN harmonics to form a envelope of the audio signal (this is an alternative to the RMS signal)
|
|
rmsDbV, rms_srate, binHz = audio_harm_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef, midi_pitch, harmCandN, harmN )
|
|
|
|
# locate the peaks in the harmonic sum signal
|
|
pkIdxL = locate_peak_indexes( rmsDbV, rms_srate, r['eventTimeL'], audioFn )
|
|
|
|
# form the 'holdDutyPctlL' hold duty cycle transition point list
|
|
holdDutyPctL = None
|
|
if 'holdDutyPct' in r:
|
|
holdDutyPctL = [ (0, r['holdDutyPct']) ]
|
|
else:
|
|
holdDutyPctL = r['holdDutyPctL']
|
|
|
|
eventN = len(r['eventTimeL'])
|
|
assert( len(tdPkIdxL) == eventN and len(pkIdxL) == eventN )
|
|
|
|
# filter out all missing events that have no peak
|
|
flL = [ (tdPkIdxL[i] is not None) and (pkIdxL[i] is not None) for i in range(eventN) ]
|
|
eventTimeL = [ r['eventTimeL'][i] for i in range(eventN) if flL[i] ]
|
|
tdPkDbL = [ tdRmsDbV[tdPkIdxL[i]] for i in range(eventN) if flL[i] ]
|
|
pkDbL = [ rmsDbV[ pkIdxL[i]] for i in range(eventN) if flL[i] ]
|
|
#pkUsL = [ r['pulseUsL'][pkIdxL[i]] for i in range(eventN) if flL[i] ]
|
|
tdPkIdxL = [ i for i in tdPkIdxL if i is not None ]
|
|
pkIdxL = [ i for i in pkIdxL if i is not None ]
|
|
|
|
# form the result record
|
|
r = types.SimpleNamespace(**{
|
|
"audio_srate":srate,
|
|
"eventTimeMsL": eventTimeL, #r['eventTimeL'],
|
|
"tdRmsDbV": tdRmsDbV,
|
|
"tdPkIdxL": tdPkIdxL,
|
|
"tdPkDbL": tdPkDbL, # [ tdRmsDbV[i] for i in tdPkIdxL ],
|
|
"binHz": binHz,
|
|
"rmsDbV": rmsDbV,
|
|
"rms_srate":rms_srate,
|
|
"pkIdxL":pkIdxL, # pkIdxL[ len(pulsUsL) ] - indexes into rmsDbV[] of peaks
|
|
"eventTimeL": eventTimeL, #r['eventTimeL'],
|
|
"holdDutyPctL":holdDutyPctL,
|
|
'pkDbL': pkDbL, # [ rmsDbV[ i ] for i in pkIdxL ],
|
|
'pkUsL': r['pulseUsL'] }) # r['pulseUsL']
|
|
|
|
#
|
|
statsL = note_stats(r,durDecayPct)
|
|
|
|
setattr(r,"statsL", statsL )
|
|
|
|
return r
|
|
|
|
|
|
def rms_analysis_main_all( inDir, cacheFn, rmsWndMs=300, rmsHopMs=30, dbLinRef=0.001, harmCandN=5, harmN=3, durDecayPct=40 ):
|
|
|
|
if os.path.isfile(cacheFn):
|
|
print("READING analysis cache file: %s" % (cacheFn))
|
|
with open(cacheFn,"rb") as f:
|
|
rD = pickle.load(f)
|
|
return rD
|
|
|
|
|
|
folderL = os.listdir(inDir)
|
|
|
|
rD = {}
|
|
|
|
for folder in folderL:
|
|
|
|
pathL = folder.split(os.sep)
|
|
|
|
midi_pitch = int(pathL[-1])
|
|
|
|
print(midi_pitch)
|
|
|
|
path = os.path.join(inDir,folder,'0')
|
|
|
|
if os.path.isdir(path) and os.path.isfile(os.path.join(os.path.join(path,"seq.json"))):
|
|
r = rms_analysis_main( path, midi_pitch, rmsWndMs=rmsWndMs, rmsHopMs=rmsHopMs, dbLinRef=dbLinRef, harmCandN=harmCandN, harmN=harmN, durDecayPct=durDecayPct )
|
|
|
|
rD[ midi_pitch ] = r
|
|
|
|
|
|
with open(cacheFn,"wb") as f:
|
|
pickle.dump(rD,f)
|
|
|
|
return rD
|
|
|
|
|
|
|
|
def samples_to_linear_residual( usL, dbL, pointsPerLine=5 ):
|
|
# Score the quality of each sampled point by measuring the
|
|
# quality of fit to a local line.
|
|
|
|
scoreD = { us:[] for us in usL }
|
|
|
|
pointsPerLine = 5
|
|
|
|
i = pointsPerLine
|
|
|
|
# for each sampled point
|
|
while i < len(usL):
|
|
# beginning with sample at index 'pointsPerLine'
|
|
if i >= pointsPerLine:
|
|
|
|
k = i - pointsPerLine
|
|
|
|
# get the x (us) and y (db) sample values
|
|
xL,yL = zip(*[ ((usL[k+j],1.0), dbL[k+j]) for j in range(pointsPerLine)])
|
|
xV = np.array(xL)
|
|
yV = np.array(yL)
|
|
|
|
# fit the sampled point to a line
|
|
m,c = np.linalg.lstsq(xV,yV,rcond=None)[0]
|
|
|
|
# calc the residual of the fit at each point
|
|
resV = (m*xV+c)[:,0] - yV
|
|
|
|
# assign the residual to the associated point in scoreD[x]
|
|
for j in range(pointsPerLine):
|
|
scoreD[usL[k+j]].append(resV[j])
|
|
|
|
i += 1
|
|
|
|
|
|
scoreL = []
|
|
|
|
# calc the mean of the residuals for each point
|
|
# (most points were used in 'pointsPerLine' line estimations
|
|
# and so they will have 'pointsPerLine' residual values)
|
|
for us in usL:
|
|
|
|
resL = scoreD[us]
|
|
|
|
if len(resL) == 0:
|
|
scoreL.append(0.0)
|
|
elif len(resL) == 1:
|
|
scoreL.append(resL[0])
|
|
else:
|
|
scoreL.append(np.mean(resL))
|
|
|
|
# their should be one mean resid. value for each sampled point
|
|
assert( len(scoreL) == len(usL) )
|
|
return np.array(scoreL)
|
|
|
|
def write_audacity_label_files( inDir, analysisArgsD, reverseFl=True ):
|
|
|
|
pitchDirL = os.listdir(inDir)
|
|
|
|
for pitchDir in pitchDirL:
|
|
|
|
folderL = pitchDir.split(os.sep)
|
|
|
|
midi_pitch = int(folderL[-1])
|
|
|
|
pitchDir = os.path.join(inDir,pitchDir)
|
|
|
|
takeDirL = os.listdir(pitchDir)
|
|
|
|
for takeFolder in takeDirL:
|
|
|
|
takeDir = os.path.join(pitchDir,takeFolder)
|
|
|
|
r = rms_analysis_main( takeDir, midi_pitch, **analysisArgsD )
|
|
|
|
labelFn = os.path.join(takeDir,"audacity.txt")
|
|
|
|
print("Writing:",labelFn)
|
|
|
|
with open(labelFn,"w") as f:
|
|
|
|
for i,s in enumerate(r.statsL):
|
|
|
|
noteIndex = len(r.statsL)-(i+1) if reverseFl else i
|
|
|
|
label = "%i %4.1f %6.1f" % (noteIndex, s.pkDb, s.durMs )
|
|
|
|
f.write("%f\t%f\t%s\n" % ( s.begSmpSec, s.endSmpSec, label ))
|
|
|
|
|