import os,types,json from scipy.io import wavfile from scipy.signal import stft import numpy as np def calc_harm_bins( srate, binHz, midiPitch, harmN ): semi_tone = 1.0/12 quarter_tone = 1.0/24 eigth_tone = 1.0/48 band_width_st = 3.0/48 # 3/8 tone fundHz = (13.75 * pow(2.0,(-9.0/12.0))) * pow(2.0,(midiPitch / 12)) fund_l_binL = [int(round(fundHz * pow(2.0,-band_width_st) * i/binHz)) for i in range(1,harmN+1)] fund_m_binL = [int(round(fundHz * i/binHz)) for i in range(1,harmN+1)] fund_u_binL = [int(round(fundHz * pow(2.0, band_width_st) * i/binHz)) for i in range(1,harmN+1)] for i in range(len(fund_m_binL)): if fund_l_binL[i] >= fund_m_binL[i] and fund_l_binL[i] > 0: fund_l_binL[i] = fund_m_binL[i] - 1 if fund_u_binL[i] <= fund_m_binL[i] and fund_u_binL[i] < len(fund_u_binL)-1: fund_u_binL[i] = fund_m_binL[i] + 1 return fund_l_binL, fund_m_binL, fund_u_binL def rms_to_db( xV, rms_srate, refWndMs ): dbWndN = int(round(refWndMs * rms_srate / 1000.0)) dbRef = ref = np.mean(xV[0:dbWndN]) rmsDbV = 20.0 * np.log10( xV / dbRef ) return rmsDbV def audio_rms( srate, xV, rmsWndMs, hopMs, refWndMs ): wndSmpN = int(round( rmsWndMs * srate / 1000.0)) hopSmpN = int(round( hopMs * srate / 1000.0)) xN = xV.shape[0] yN = int(((xN - wndSmpN) / hopSmpN) + 1) assert( yN > 0) yV = np.zeros( (yN, ) ) assert( wndSmpN > 1 ) i = 0 j = 0 while i < xN and j < yN: if i == 0: yV[j] = np.sqrt(xV[0]*xV[0]) elif i < wndSmpN: yV[j] = np.sqrt( np.mean( xV[0:i] * xV[0:i] ) ) else: yV[j] = np.sqrt( np.mean( xV[i-wndSmpN:i] * xV[i-wndSmpN:i] ) ) i += hopSmpN j += 1 rms_srate = srate / hopSmpN return rms_to_db( yV, rms_srate, refWndMs ), rms_srate def audio_stft_rms( srate, xV, rmsWndMs, hopMs, refWndMs, spectrumIdx ): wndSmpN = int(round( rmsWndMs * srate / 1000.0)) hopSmpN = int(round( hopMs * srate / 1000.0)) binHz = srate / wndSmpN f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True ) specHopIdx = int(round( spectrumIdx )) specV = np.sqrt(np.abs(xM[:, specHopIdx ])) mV = np.zeros((xM.shape[1])) for i in range(xM.shape[1]): mV[i] = np.max(np.sqrt(np.abs(xM[:,i]))) rms_srate = srate / hopSmpN mV = rms_to_db( mV, rms_srate, refWndMs ) return mV, rms_srate, specV, specHopIdx, binHz def audio_harm_rms( srate, xV, rmsWndMs, hopMs, dbRefWndMs, midiPitch, harmCandN, harmN ): wndSmpN = int(round( rmsWndMs * srate / 1000.0)) hopSmpN = int(round( hopMs * srate / 1000.0)) binHz = srate / wndSmpN f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True ) harmLBinL,harmMBinL,harmUBinL = calc_harm_bins( srate, binHz, midiPitch, harmCandN ) rmsV = np.zeros((xM.shape[1],)) for i in range(xM.shape[1]): mV = np.sqrt(np.abs(xM[:,i])) pV = np.zeros((len(harmLBinL,))) for j,(b0i,b1i) in enumerate(zip( harmLBinL, harmUBinL )): pV[j] = np.max(mV[b0i:b1i]) rmsV[i] = np.mean( sorted(pV)[-harmN:] ) rms_srate = srate / hopSmpN rmsV = rms_to_db( rmsV, rms_srate, dbRefWndMs ) return rmsV, rms_srate, binHz def locate_peak_indexes( xV, xV_srate, eventMsL ): pkIdxL = [] for begMs, endMs in eventMsL: begSmpIdx = int(begMs * xV_srate / 1000.0) endSmpIdx = int(endMs * xV_srate / 1000.0) pkIdxL.append( begSmpIdx + np.argmax( xV[begSmpIdx:endSmpIdx] ) ) return pkIdxL def rms_analysis_main( inDir, midi_pitch, rmsWndMs=300, rmsHopMs=30, dbRefWndMs=500, harmCandN=5, harmN=3 ): seqFn = os.path.join( inDir, "seq.json") audioFn = os.path.join( inDir, "audio.wav") with open( seqFn, "rb") as f: r = json.load(f) srate, signalM = wavfile.read(audioFn) sigV = signalM / float(0x7fff) tdRmsDbV, rms0_srate = audio_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs ) tdPkIdxL = locate_peak_indexes( tdRmsDbV, rms0_srate, r['eventTimeL']) rmsDbV, rms_srate, binHz = audio_harm_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs, midi_pitch, harmCandN, harmN ) pkIdxL = locate_peak_indexes( rmsDbV, rms_srate, r['eventTimeL'] ) r = types.SimpleNamespace(**{ "audio_srate":srate, "tdRmsDbV": tdRmsDbV, "tdPkIdxL": tdPkIdxL, "tdPkDbL": [ tdRmsDbV[i] for i in tdPkIdxL ], "binHz": binHz, "rmsDbV":rmsDbV, "rms_srate":rms_srate, "pkIdxL":pkIdxL, # pkIdxL[ len(pulsUsL) ] - indexes into rmsDbV[] of peaks #"min_pk_idx":min_pk_idx, #"max_pk_idx":max_pk_idx, "eventTimeL":r['eventTimeL'], 'pkDbL': [ rmsDbV[ i ] for i in pkIdxL ], 'pkUsL':r['pulseUsL'] }) return r