##| Copyright: (C) 2019-2020 Kevin Larke ##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file. import os,types,json,pickle,types from scipy.io import wavfile from scipy.signal import stft import numpy as np from common import parse_yaml_cfg def calc_harm_bins( srate, binHz, midiPitch, harmN ): semi_tone = 1.0/12 quarter_tone = 1.0/24 eigth_tone = 1.0/48 band_width_st = 3.0/48 # 3/8 tone fundHz = (13.75 * pow(2.0,(-9.0/12.0))) * pow(2.0,(midiPitch / 12)) fund_l_binL = [int(round(fundHz * pow(2.0,-band_width_st) * i/binHz)) for i in range(1,harmN+1)] fund_m_binL = [int(round(fundHz * i/binHz)) for i in range(1,harmN+1)] fund_u_binL = [int(round(fundHz * pow(2.0, band_width_st) * i/binHz)) for i in range(1,harmN+1)] for i in range(len(fund_m_binL)): if fund_l_binL[i] >= fund_m_binL[i] and fund_l_binL[i] > 0: fund_l_binL[i] = fund_m_binL[i] - 1 if fund_u_binL[i] <= fund_m_binL[i] and fund_u_binL[i] < len(fund_u_binL)-1: fund_u_binL[i] = fund_m_binL[i] + 1 return fund_l_binL, fund_m_binL, fund_u_binL def rms_to_db( xV, rms_srate, dbLinRef ): #dbWndN = int(round(refWndMs * rms_srate / 1000.0)) #dbRef = ref = np.mean(xV[0:dbWndN]) #print("DB REF:",dbLinRef, min(xV), np.argmin(xV)) rmsDbV = 20.0 * np.log10( (xV+np.nextafter(0,1)) / dbLinRef ) return rmsDbV def audio_rms( srate, xV, rmsWndMs, hopMs, dbLinRef ): wndSmpN = int(round( rmsWndMs * srate / 1000.0)) hopSmpN = int(round( hopMs * srate / 1000.0)) xN = xV.shape[0] yN = int(((xN - wndSmpN) / hopSmpN) + 1) assert( yN > 0) yV = np.zeros( (yN, ) ) assert( wndSmpN > 1 ) i = 0 j = 0 while i < xN and j < yN: if i == 0: yV[j] = np.sqrt(xV[0]*xV[0]) elif i < wndSmpN: yV[j] = np.sqrt( np.mean( xV[0:i] * xV[0:i] ) ) else: yV[j] = np.sqrt( np.mean( xV[i-wndSmpN:i] * xV[i-wndSmpN:i] ) ) i += hopSmpN j += 1 rms_srate = srate / hopSmpN return rms_to_db( yV[0:j], rms_srate, dbLinRef ), rms_srate def audio_stft_rms( srate, xV, rmsWndMs, hopMs, dbLinRef, spectrumIdx ): wndSmpN = int(round( rmsWndMs * srate / 1000.0)) hopSmpN = int(round( hopMs * srate / 1000.0)) binHz = srate / wndSmpN f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True ) specHopIdx = int(round( spectrumIdx )) specV = np.sqrt(np.abs(xM[:, specHopIdx ])) mV = np.zeros((xM.shape[1])) for i in range(xM.shape[1]): mV[i] = np.max(np.sqrt(np.abs(xM[:,i]))) rms_srate = srate / hopSmpN mV = rms_to_db( mV, rms_srate, dbLinRef ) return mV, rms_srate, specV, specHopIdx, binHz def audio_harm_rms( srate, xV, rmsWndMs, hopMs, dbLinRef, midiPitch, harmCandN, harmN ): wndSmpN = int(round( rmsWndMs * srate / 1000.0)) hopSmpN = int(round( hopMs * srate / 1000.0)) binHz = srate / wndSmpN #print( "STFT:", rmsWndMs, hopMs, wndSmpN, hopSmpN, wndSmpN-hopSmpN ) f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True ) harmLBinL,harmMBinL,harmUBinL = calc_harm_bins( srate, binHz, midiPitch, harmCandN ) rmsV = np.zeros((xM.shape[1],)) for i in range(xM.shape[1]): mV = np.sqrt(np.abs(xM[:,i])) pV = np.zeros((len(harmLBinL,))) for j,(b0i,b1i) in enumerate(zip( harmLBinL, harmUBinL )): pV[j] = np.max(mV[b0i:b1i]) rmsV[i] = np.mean( sorted(pV)[-harmN:] ) rms_srate = srate / hopSmpN rmsV = rms_to_db( rmsV, rms_srate, dbLinRef ) return rmsV, rms_srate, binHz def measure_duration_ms( rmsV, rms_srate, peak_idx, end_idx, decay_pct ): """ Calcuate the time it takes for a note to decay from the peak at rmsV[peak_idx] dB to 'decay_pct' percent of the peak value. """ pkRmsDb = rmsV[ peak_idx ] # calc the note turn-off (offset) db as a percentage of the peak amplitude offsetRmsDb = pkRmsDb * decay_pct / 100.0 # calc the sample index where the note is off offset_idx = peak_idx + np.argmin( np.abs(rmsV[peak_idx:end_idx] - offsetRmsDb) ) # calc the duration of the note dur_ms = int(round((offset_idx - peak_idx) * 1000.0 / rms_srate)) #print(pkRmsDb, offsetRmsDb, peak_idx, offset_idx, end_idx, dur_ms, rms_srate) return dur_ms def select_first_stable_note_by_dur( durMsL, minDurMs=800 ): first_stable_idx = None for i,durMs in enumerate(durMsL): if durMs > minDurMs and first_stable_idx is None: first_stable_idx = i else: if durMs < minDurMs: first_stable_idx = None return first_stable_idx def select_first_stable_note_by_delta_db_1( pkDbL, pkUsL, maxPulseUs=0.1 ): wndN = 5 aL = [] dV = np.diff(pkDbL) / pkDbL[1:] for ei in range(wndN,len(pkDbL)): xV = dV[ei-wndN:ei] avg = np.mean(np.abs(xV)) aL.append(avg) k = np.argmin(np.abs(np.array(pkUsL) - maxPulseUs)) print(aL) print(k) for i in range(k,0,-1): if aL[i] > maxDeltaDb: return i + 1 return None def select_first_stable_note_by_delta_db( pkDbL, pkUsL=None, maxPulseUs=0.1 ): wndN = 5 dV = np.diff(pkDbL) / pkDbL[1:] for ei in range(wndN,len(pkDbL)): xV = dV[ei-wndN:ei] avg = np.mean(np.abs(xV)) if avg < .1: return (ei-wndN)+1 return None def note_stats( r, decay_pct=50.0, extraDurSearchMs=500 ): """ Collect some statistics and markers for each note in the sequence. """ statsL = [] srate = r.rms_srate qmax = 0 for i,(begSmpMs, endSmpMs) in enumerate(r.eventTimeL): begSmpIdx = int(round(srate * begSmpMs / 1000.0)) endSmpIdx = int(round(srate * (endSmpMs + extraDurSearchMs) / 1000.0)) pkSmpIdx = r.pkIdxL[i] durMs = measure_duration_ms( r.rmsDbV, srate, pkSmpIdx, endSmpIdx, decay_pct ) bi = pkSmpIdx ei = pkSmpIdx + int(round(durMs * srate / 1000.0)) qualityCoeff = np.sum(r.rmsDbV[bi:ei]) + np.sum(r.tdRmsDbV[bi:ei]) if qualityCoeff > qmax: qmax = qualityCoeff if ei-bi == 0: tdRmsDb_v = 0.0 if bi >= len(r.tdRmsDbV) else np.mean(r.tdRmsDbV[bi]) hmRmsDb_v = 0.0 if bi >= len(r.rmsDbV) else np.mean(r.rmsDbV[bi]) durAvgDb = (hmRmsDb_v + tdRmsDb_v)/2.0 else: tdRmsDb_u = 0.0 if ei >= len(r.tdRmsDbV) else np.mean(r.tdRmsDbV[bi:ei]) hmRmsDb_u = 0.0 if ei >= len(r.rmsDbV) else np.mean(r.rmsDbV[bi:ei]) durAvgDb = (hmRmsDb_u + tdRmsDb_u)/2.0 statsL.append( types.SimpleNamespace(**{ 'begSmpSec':begSmpIdx/srate, 'endSmpSec':endSmpIdx/srate, 'pkSmpSec':pkSmpIdx/srate, 'durMs':durMs, 'pkDb':r.pkDbL[i], 'pulse_us':r.pkUsL[i], 'quality':qualityCoeff, 'durAvgDb':durAvgDb })) for i,r in enumerate(statsL): statsL[i].quality = 0 if qmax <= 0 else statsL[i].quality / qmax return statsL def locate_peak_indexes( xV, xV_srate, eventMsL, audioFn="" ): pkIdxL = [] for i, (begMs, endMs) in enumerate(eventMsL): begSmpIdx = int(begMs * xV_srate / 1000.0) endSmpIdx = int(endMs * xV_srate / 1000.0) if endSmpIdx != begSmpIdx: pkIdxL.append( begSmpIdx + np.argmax( xV[begSmpIdx:endSmpIdx] ) ) else: print("Missing peak %i : begween beg:%i ms end:%i ms : %s" % (i, begMs, endMs, audioFn )) pkIdxL.append( None ) return pkIdxL def key_info_dictionary( keyMapL=None, yamlCfgFn=None): if yamlCfgFn is not None: cfg = parse_yaml_cfg(yamlCfgFn) keyMapL = cfg.key_mapL kmD = {} for d in keyMapL: kmD[ d['midi'] ] = types.SimpleNamespace(**d) return kmD def rms_analyze_one_rt_note( sigV, srate, begMs, endMs, midi_pitch, rmsWndMs=300, rmsHopMs=30, dbLinRef=0.001, harmCandN=5, harmN=3, durDecayPct=40 ): sigV = np.squeeze(sigV) td_rmsDbV, td_srate = audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef ) begSmpIdx = int(round(begMs * td_srate/1000)) endSmpIdx = int(round(endMs * td_srate/1000)) td_pk_idx = begSmpIdx + np.argmax(td_rmsDbV[begSmpIdx:endSmpIdx]) td_durMs = measure_duration_ms( td_rmsDbV, td_srate, td_pk_idx, len(sigV)-1, durDecayPct ) hm_rmsDbV, hm_srate, binHz = audio_harm_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef, midi_pitch, harmCandN, harmN ) begSmpIdx = int(round(begMs * hm_srate/1000)) endSmpIdx = int(round(endMs * hm_srate/1000)) hm_pk_idx = begSmpIdx + np.argmax(hm_rmsDbV[begSmpIdx:endSmpIdx]) hm_durMs = measure_duration_ms( hm_rmsDbV, hm_srate, hm_pk_idx, len(sigV)-1, durDecayPct ) tdD = { "rmsDbV":td_rmsDbV.tolist(), "srate":td_srate, "pk_idx":int(td_pk_idx), "db":float(td_rmsDbV[td_pk_idx]), "durMs":td_durMs } hmD = { "rmsDbV":hm_rmsDbV.tolist(), "srate":hm_srate, "pk_idx":int(hm_pk_idx), "db":float(hm_rmsDbV[hm_pk_idx]), "durMs":hm_durMs } return { "td":tdD, "hm":hmD } def rms_analyze_one_rt_note_wrap( audioDev, annBegMs, annEndMs, midi_pitch, noteOffDurMs, rmsAnalysisD ): resD = None buf_result = audioDev.linear_buffer() if buf_result: sigV = buf_result.value if len(sigV.shape) > 1: sigV = sigV[:,0].squeeze() # get the annotated begin and end of the note as sample indexes into sigV bi = int(round(annBegMs * audioDev.srate / 1000)) ei = int(round(annEndMs * audioDev.srate / 1000)) # calculate half the length of the note-off duration in samples noteOffSmp_o_2 = int(round( (noteOffDurMs/2) * audioDev.srate / 1000)) # widen the note analysis space noteOffSmp_o_2 samples pre/post the annotated begin/end of the note bi = max(0,bi - noteOffSmp_o_2) ei = min(ei+noteOffSmp_o_2,sigV.shape[0]-1) ar = types.SimpleNamespace(**rmsAnalysisD) # shift the annotatd begin/end of the note to be relative to index bi begMs = noteOffSmp_o_2 * 1000 / audioDev.srate endMs = begMs + (annEndMs - annBegMs) #print("MEAS:",begMs,endMs,bi,ei,sigV.shape,audioDev.is_recording_enabled(),ar) # analyze the note resD = rms_analyze_one_rt_note( sigV[bi:ei], audioDev.srate, begMs, endMs, midi_pitch, rmsWndMs=ar.rmsWndMs, rmsHopMs=ar.rmsHopMs, dbLinRef=ar.dbLinRef, harmCandN=ar.harmCandN, harmN=ar.harmN, durDecayPct=ar.durDecayPct ) #print( "hm:%4.1f %4i td:%4.1f %4i" % (resD['hm']['db'], resD['hm']['durMs'], resD['td']['db'], resD['td']['durMs'])) return resD def calibrate_rms( sigV, srate, beg_ms, end_ms ): bi = int(round(beg_ms * srate / 1000)) ei = int(round(end_ms * srate / 1000)) rms = np.sqrt( np.mean( sigV[bi:ei] * sigV[bi:ei] )) return 20.0*np.log10( rms / 0.002 ) def calibrate_recording_analysis( inDir ): jsonFn = os.path.join(inDir, "meas.json" ) audioFn = os.path.join(inDir, "audio.wav" ) with open(jsonFn,"r") as f: r = json.load(f) measD = r['measD'] cfg = types.SimpleNamespace(**r['cfg']) annL = r['annoteL'] anlD = {} n = 0 for midi_pitch,measL in measD.items(): n += len(measL) anlD[int(midi_pitch)] = [] srate, signalM = wavfile.read(audioFn) sigV = signalM / float(0x7fff) anlr = types.SimpleNamespace(**cfg.analysisD) tdRmsDbV, td_srate = audio_rms( srate, sigV, anlr.rmsWndMs, anlr.rmsHopMs, anlr.dbLinRef ) # for each measured pitch for midi_pitch,measL in measD.items(): # for each measured note at this pitch for mi,d in enumerate(measL): mr = types.SimpleNamespace(**d) # locate the associated annotation reocrd for annD in annL: ar = types.SimpleNamespace(**annD) if ar.midi_pitch == mr.midi_pitch and ar.beg_ms==mr.beg_ms and ar.end_ms==mr.end_ms: assert( ar.pulse_us == mr.pulse_us ) bi = int(round(ar.beg_ms * td_srate / 1000)) ei = int(round(ar.end_ms * td_srate / 1000)) db = np.mean(tdRmsDbV[bi:ei]) db = calibrate_rms( sigV, srate, ar.beg_ms, ar.end_ms ) anlD[int(midi_pitch)].append({ 'pulse_us':ar.pulse_us, 'db':db, 'meas_idx':mi }) break return anlD def rms_analysis_main( inDir, midi_pitch, rmsWndMs=300, rmsHopMs=30, dbLinRef=0.001, harmCandN=5, harmN=3, durDecayPct=40 ): # form the audio and meta data file names seqFn = os.path.join( inDir, "seq.json") audioFn = os.path.join( inDir, "audio.wav") # read the meta data file with open( seqFn, "rb") as f: r = json.load(f) # rad the auido file srate, signalM = wavfile.read(audioFn) # convert the audio signal vector to contain only the first (left) channel if len(signalM.shape)>1: signalM = signalM[:,0].squeeze() # convert the auido file to floats in range [-1.0 to 1.0] sigV = signalM / float(0x7fff) # calc. the RMS signal tdRmsDbV, rms0_srate = audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef ) # locate the peaks in the RMS signal tdPkIdxL = locate_peak_indexes( tdRmsDbV, rms0_srate, r['eventTimeL'], audioFn ) # sum the first harmN harmonics to form a envelope of the audio signal (this is an alternative to the RMS signal) rmsDbV, rms_srate, binHz = audio_harm_rms( srate, sigV, rmsWndMs, rmsHopMs, dbLinRef, midi_pitch, harmCandN, harmN ) # locate the peaks in the harmonic sum signal pkIdxL = locate_peak_indexes( rmsDbV, rms_srate, r['eventTimeL'], audioFn ) # form the 'holdDutyPctlL' hold duty cycle transition point list holdDutyPctL = None if 'holdDutyPct' in r: holdDutyPctL = [ (0, r['holdDutyPct']) ] else: holdDutyPctL = r['holdDutyPctL'] eventN = len(r['eventTimeL']) assert( len(tdPkIdxL) == eventN and len(pkIdxL) == eventN ) # filter out all missing events that have no peak flL = [ (tdPkIdxL[i] is not None) and (pkIdxL[i] is not None) for i in range(eventN) ] eventTimeL = [ r['eventTimeL'][i] for i in range(eventN) if flL[i] ] tdPkDbL = [ tdRmsDbV[tdPkIdxL[i]] for i in range(eventN) if flL[i] ] pkDbL = [ rmsDbV[ pkIdxL[i]] for i in range(eventN) if flL[i] ] #pkUsL = [ r['pulseUsL'][pkIdxL[i]] for i in range(eventN) if flL[i] ] tdPkIdxL = [ i for i in tdPkIdxL if i is not None ] pkIdxL = [ i for i in pkIdxL if i is not None ] # form the result record r = types.SimpleNamespace(**{ "audio_srate":srate, "eventTimeMsL": eventTimeL, #r['eventTimeL'], "tdRmsDbV": tdRmsDbV, "tdPkIdxL": tdPkIdxL, "tdPkDbL": tdPkDbL, # [ tdRmsDbV[i] for i in tdPkIdxL ], "binHz": binHz, "rmsDbV": rmsDbV, "rms_srate":rms_srate, "pkIdxL":pkIdxL, # pkIdxL[ len(pulsUsL) ] - indexes into rmsDbV[] of peaks "eventTimeL": eventTimeL, #r['eventTimeL'], "holdDutyPctL":holdDutyPctL, 'pkDbL': pkDbL, # [ rmsDbV[ i ] for i in pkIdxL ], 'pkUsL': r['pulseUsL'] }) # r['pulseUsL'] # statsL = note_stats(r,durDecayPct) setattr(r,"statsL", statsL ) return r def rms_analysis_main_all( inDir, cacheFn, rmsWndMs=300, rmsHopMs=30, dbLinRef=0.001, harmCandN=5, harmN=3, durDecayPct=40 ): if os.path.isfile(cacheFn): print("READING analysis cache file: %s" % (cacheFn)) with open(cacheFn,"rb") as f: rD = pickle.load(f) return rD folderL = os.listdir(inDir) rD = {} for folder in folderL: pathL = folder.split(os.sep) midi_pitch = int(pathL[-1]) print(midi_pitch) path = os.path.join(inDir,folder,'0') if os.path.isdir(path) and os.path.isfile(os.path.join(os.path.join(path,"seq.json"))): r = rms_analysis_main( path, midi_pitch, rmsWndMs=rmsWndMs, rmsHopMs=rmsHopMs, dbLinRef=dbLinRef, harmCandN=harmCandN, harmN=harmN, durDecayPct=durDecayPct ) rD[ midi_pitch ] = r with open(cacheFn,"wb") as f: pickle.dump(rD,f) return rD def samples_to_linear_residual( usL, dbL, pointsPerLine=5 ): # Score the quality of each sampled point by measuring the # quality of fit to a local line. scoreD = { us:[] for us in usL } pointsPerLine = 5 i = pointsPerLine # for each sampled point while i < len(usL): # beginning with sample at index 'pointsPerLine' if i >= pointsPerLine: k = i - pointsPerLine # get the x (us) and y (db) sample values xL,yL = zip(*[ ((usL[k+j],1.0), dbL[k+j]) for j in range(pointsPerLine)]) xV = np.array(xL) yV = np.array(yL) # fit the sampled point to a line m,c = np.linalg.lstsq(xV,yV,rcond=None)[0] # calc the residual of the fit at each point resV = (m*xV+c)[:,0] - yV # assign the residual to the associated point in scoreD[x] for j in range(pointsPerLine): scoreD[usL[k+j]].append(resV[j]) i += 1 scoreL = [] # calc the mean of the residuals for each point # (most points were used in 'pointsPerLine' line estimations # and so they will have 'pointsPerLine' residual values) for us in usL: resL = scoreD[us] if len(resL) == 0: scoreL.append(0.0) elif len(resL) == 1: scoreL.append(resL[0]) else: scoreL.append(np.mean(resL)) # their should be one mean resid. value for each sampled point assert( len(scoreL) == len(usL) ) return np.array(scoreL) def write_audacity_label_files( inDir, analysisArgsD, reverseFl=True ): pitchDirL = os.listdir(inDir) for pitchDir in pitchDirL: folderL = pitchDir.split(os.sep) midi_pitch = int(folderL[-1]) pitchDir = os.path.join(inDir,pitchDir) takeDirL = os.listdir(pitchDir) for takeFolder in takeDirL: takeDir = os.path.join(pitchDir,takeFolder) if not os.path.isfile(os.path.join(takeDir,"seq.json")): continue r = rms_analysis_main( takeDir, midi_pitch, **analysisArgsD ) labelFn = os.path.join(takeDir,"audacity.txt") print("Writing:",labelFn) with open(labelFn,"w") as f: for i,s in enumerate(r.statsL): noteIndex = len(r.statsL)-(i+1) if reverseFl else i label = "%i %4.1f %6.1f" % (noteIndex, s.pkDb, s.durMs ) f.write("%f\t%f\t%s\n" % ( s.begSmpSec, s.endSmpSec, label ))