diff --git a/AudioDevice.py b/AudioDevice.py index f9894f8..132af1b 100644 --- a/AudioDevice.py +++ b/AudioDevice.py @@ -126,6 +126,13 @@ class AudioDevice(object): return res + def is_recording_enabled( self ): + + if self.inStream is None: + return False + + return self.inStream.active == True + def record_enable( self, enableFl ): # if the input stream has not already been configured diff --git a/calibrate.py b/calibrate.py index e0193ef..72132c0 100644 --- a/calibrate.py +++ b/calibrate.py @@ -1,6 +1,6 @@ import os,types,wave,json,array import numpy as np -from rms_analysis import rms_analyze_one_note +from rms_analysis import rms_analyze_one_rt_note class Calibrate: def __init__( self, cfg, audio, midi, api ): @@ -65,9 +65,9 @@ class Calibrate: if self.midi is not None: self.midi.send_all_notes_off() - if not self.playOnlyFl: - self.audio.record_enable(False) + self.audio.record_enable(False) + if not self.playOnlyFl: self._save_results() def play(self,ms): @@ -75,11 +75,15 @@ class Calibrate: if self.measD is None or len(self.measD) == 0: print("Nothing to play.") else: + self.startMs = ms self.state = 'started' self.playOnlyFl = True self.nextStateChangeMs = ms + 500 self.curPitchIdx = -1 self.curTargetDbIdx = 0 + + self.audio.record_enable(True) + self._do_play_update() def tick(self,ms): @@ -102,6 +106,7 @@ class Calibrate: elif self.state == 'note_off': if self.playOnlyFl: if not self._do_play_update(): + self.stop(ms) self.state = 'stopped' else: if self._do_analysis(ms): @@ -113,10 +118,29 @@ class Calibrate: # if the state was not changed to 'stopped' if self.state == 'note_off': self.state = 'started' - + + + def _calc_play_pulse_us( self, pitch, targetDb ): + + pulseDbL = [] + for d in self.measD[ pitch ]: + if d['targetDb'] == targetDb and d['matchFl']==True: + pulseDbL.append( ( d['pulse_us'], d[self.cfg.dbSrcLabel]['db']) ) + + if len(pulseDbL) == 0: + return -1 + + pulseL,dbL = zip(*pulseDbL) + + # TODO: make a weighted average based on db error + + return np.mean(pulseL) def _do_play_update( self ): + if self.curPitchIdx >= 0: + self._meas_note( self.cfg.pitchL[self.curPitchIdx], self.curPulseUs ) + self.curPitchIdx +=1 if self.curPitchIdx >= len(self.cfg.pitchL): self.curPitchIdx = 0 @@ -124,13 +148,10 @@ class Calibrate: if self.curTargetDbIdx >= len(self.cfg.targetDbL): return False - pitch = self.cfg.pitchL[ self.curPitchIdx ] + pitch = self.cfg.pitchL[ self.curPitchIdx ] targetDb = self.cfg.targetDbL[ self.curTargetDbIdx ] - self.curPulseUs = -1 - for d in self.measD[ pitch ]: - if d['targetDb'] == targetDb and d['matchFl']==True: - self.curPulseUs = d['pulse_us'] - break + self.curPulseUs = self._calc_play_pulse_us( pitch, targetDb ) + self.curTargetDb = targetDb if self.curPulseUs == -1: print("Pitch:%i TargetDb:%f not found." % (pitch,targetDb)) @@ -195,10 +216,47 @@ class Calibrate: #print("note-off: ",self.cfg.pitchL[ self.curPitchIdx]) + def _proportional_step( self, targetDb, dbL, pulseL ): + + curPulse,curDb = self.pulseDbL[-1] + + # get the point closest to the target db + i = np.argmin( np.array(dbL) - targetDb ) + + # find the percentage difference to the target - based on the closest point + pd = abs(curDb-targetDb) / abs(curDb - dbL[i]) + + # + delta_pulse = pd * abs(curPulse - pulseL[i]) + print("prop:",pd,"delta_pulse:",delta_pulse) + + return int(round(curPulse + np.sign(targetDb - curDb) * delta_pulse)) + + def _step( self, targetDb, dbL, pulseL ): + + pulse0,db0 = self.pulseDbL[-2] + pulse1,db1 = self.pulseDbL[-1] + + # microseconds per decibel for the last two points + us_per_db = abs(pulse0-pulse1) / abs(db0-db1) + + if us_per_db == 0: + us_per_db = 10 # ************************************** CONSTANT *********************** + + # calcuate the decibels we need to move from the last point + error_db = targetDb - db1 + + print("us_per_db:",us_per_db," error db:", error_db ) + + return pulse1 + us_per_db * error_db + + + + def _calc_next_pulse_us( self, targetDb ): # sort pulseDb ascending on db - self.pulseDbL = sorted( self.pulseDbL, key=lambda x: x[1] ) + #self.pulseDbL = sorted( self.pulseDbL, key=lambda x: x[1] ) pulseL,dbL = zip(*self.pulseDbL) @@ -223,6 +281,10 @@ class Calibrate: self.deltaDnMult = 1 pu = np.interp([targetDb],dbL,pulseL) + if int(pu) in pulseL: + pu = self._step(targetDb, dbL, pulseL ) + + return max(min(pu,self.cfg.maxPulseUs),self.cfg.minPulseUs) def _do_analysis(self,ms): @@ -309,16 +371,18 @@ class Calibrate: sigV = buf_result.value + + # get the annotated begin and end of the note as sample indexes into sigV bi = int(round(annD['beg_ms'] * self.audio.srate / 1000)) ei = int(round(annD['end_ms'] * self.audio.srate / 1000)) # calculate half the length of the note-off duration in samples - noteOffSmp_o_2 = int(round(self.cfg.noteOffDurMs/2 * self.audio.srate / 1000)) + noteOffSmp_o_2 = int(round( (self.cfg.noteOffDurMs/2) * self.audio.srate / 1000)) # widen the note analysis space noteOffSmp_o_2 samples pre/post the annotated begin/end of the note bi = max(0,bi - noteOffSmp_o_2) - ei = min(noteOffSmp_o_2,sigV.shape[0]-1) + ei = min(ei+noteOffSmp_o_2,sigV.shape[0]-1) ar = types.SimpleNamespace(**self.cfg.analysisD) @@ -327,8 +391,11 @@ class Calibrate: begMs = noteOffSmp_o_2 * 1000 / self.audio.srate endMs = begMs + (annD['end_ms'] - annD['beg_ms']) + #print("MEAS:",begMs,endMs,bi,ei,sigV.shape,self.audio.is_recording_enabled(),ar) + + # analyze the note - resD = rms_analyze_rt_one_note( sigV[bi:ei], self.audio.srate, begMs, endMs, midi_pitch, rmsWndMs=ar.rmsWndMs, rmsHopMs=ar.rmsHopMs, dbRefWndMs=ar.dbRefWndMs, harmCandN=ar.harmCandN, harmN=ar.harmN, durDecayPct=ar.durDecayPct ) + resD = rms_analyze_one_rt_note( sigV[bi:ei], self.audio.srate, begMs, endMs, midi_pitch, rmsWndMs=ar.rmsWndMs, rmsHopMs=ar.rmsHopMs, dbRefWndMs=ar.dbRefWndMs, harmCandN=ar.harmCandN, harmN=ar.harmN, durDecayPct=ar.durDecayPct ) resD["pulse_us"] = pulse_us resD["midi_pitch"] = midi_pitch diff --git a/p_ac.py b/p_ac.py index 23b6aae..e331498 100644 --- a/p_ac.py +++ b/p_ac.py @@ -10,20 +10,21 @@ from MidiDevice import MidiDevice from result import Result from common import parse_yaml_cfg from plot_seq import form_resample_pulse_time_list +from plot_seq import get_resample_points_wrap from plot_seq import form_final_pulse_list from rt_note_analysis import RT_Analyzer from keyboard import Keyboard from calibrate import Calibrate class AttackPulseSeq: - """ Sequence a fixed chord over a list of attack pulse lengths.""" + """ Sequence a fixed pitch over a list of attack pulse lengths.""" def __init__(self, cfg, audio, api, noteDurMs=1000, pauseDurMs=1000 ): self.cfg = cfg self.audio = audio self.api = api self.outDir = None # directory to write audio file and results - self.pitchL = None # chord to play + self.pitch = None # pitch to paly self.pulseUsL = [] # one onset pulse length in microseconds per sequence element self.noteDurMs = noteDurMs # duration of each chord in milliseconds self.pauseDurMs = pauseDurMs # duration between end of previous note and start of next @@ -38,9 +39,9 @@ class AttackPulseSeq: self.playOnlyFl = False self.rtAnalyzer = RT_Analyzer() - def start( self, ms, outDir, pitchL, pulseUsL, holdDutyPctL, playOnlyFl=False ): + def start( self, ms, outDir, pitch, pulseUsL, holdDutyPctL, playOnlyFl=False ): self.outDir = outDir # directory to write audio file and results - self.pitchL = pitchL # chord to play + self.pitch = pitch # note to play self.pulseUsL = pulseUsL # one onset pulse length in microseconds per sequence element self.holdDutyPctL = holdDutyPctL self.pulse_idx = 0 @@ -51,9 +52,6 @@ class AttackPulseSeq: self.beginMs = ms self.playOnlyFl = playOnlyFl - #for pitch in pitchL: - # self.api.set_pwm_duty( pitch, self.holdDutyPct ) - # print("set PWM:%i"%(self.holdDutyPct)) # kpl if not playOnlyFl: self.audio.record_enable(True) # start recording audio @@ -121,11 +119,10 @@ class AttackPulseSeq: self.next_ms = ms + self.noteDurMs self.state = 'note_off' - for pitch in self.pitchL: - pulse_usec = int(self.pulseUsL[ self.pulse_idx ]) - self._set_duty_cycle( pitch, pulse_usec ) - self.api.note_on_us( pitch, pulse_usec ) - print("note-on:",pitch, self.pulse_idx, pulse_usec) + pulse_usec = int(self.pulseUsL[ self.pulse_idx ]) + self._set_duty_cycle( self.pitch, pulse_usec ) + self.api.note_on_us( self.pitch, pulse_usec ) + print("note-on:",self.pitch, self.pulse_idx, pulse_usec) def _note_off( self, ms ): self.eventTimeL[ self.pulse_idx ][1] = self.audio.buffer_sample_ms().value @@ -135,15 +132,14 @@ class AttackPulseSeq: if self.playOnlyFl: begTimeMs = self.eventTimeL[ self.pulse_idx ][0] endTimeMs = self.eventTimeL[ self.pulse_idx ][1] - self.rtAnalyzer.analyze_note( self.audio, self.pitchL[0], begTimeMs, endTimeMs, self.cfg.analysisArgs['rmsAnalysisArgs'] ) + self.rtAnalyzer.analyze_note( self.audio, self.pitch, begTimeMs, endTimeMs, self.cfg.analysisArgs['rmsAnalysisArgs'] ) self._send_note_off() def _send_note_off( self ): - for pitch in self.pitchL: - self.api.note_off( pitch ) - #print("note-off:",pitch,self.pulse_idx) + self.api.note_off( self.pitch ) + #print("note-off:",self.pitch,self.pulse_idx) def _disable(self): self.state = None @@ -153,7 +149,7 @@ class AttackPulseSeq: d = { "pulseUsL":self.pulseUsL, - "pitchL":self.pitchL, + "pitch":self.pitch, "noteDurMs":self.noteDurMs, "pauseDurMs":self.pauseDurMs, "holdDutyPctL":self.holdDutyPctL, @@ -180,16 +176,16 @@ class CalibrateKeys: self.seq = AttackPulseSeq( cfg, audioDev, api, noteDurMs=cfg.noteDurMs, pauseDurMs=cfg.pauseDurMs ) self.pulseUsL = None - self.chordL = None + self.pitchL = None self.pitch_idx = -1 - def start( self, ms, chordL, pulseUsL, playOnlyFl=False ): - if len(chordL) > 0: + def start( self, ms, pitchL, pulseUsL, playOnlyFl=False ): + if len(pitchL) > 0: self.pulseUsL = pulseUsL - self.chordL = chordL + self.pitchL = pitchL self.pitch_idx = -1 - self._start_next_chord( ms, playOnlyFl ) + self._start_next_note( ms, playOnlyFl ) def stop( self, ms ): @@ -205,31 +201,27 @@ class CalibrateKeys: # if the sequencer is done playing if not self.seq.is_enabled(): - self._start_next_chord( ms, self.seq.playOnlyFl ) # ... else start the next sequence + self._start_next_note( ms, self.seq.playOnlyFl ) # ... else start the next sequence return None - def _start_next_chord( self, ms, playOnlyFl ): - + def _start_next_note( self, ms, playOnlyFl ): self.pitch_idx += 1 - # if the last chord in chordL has been played ... - if self.pitch_idx >= len(self.chordL): + # if the last note in pitchL has been played ... + if self.pitch_idx >= len(self.pitchL): self.stop(ms) # ... then we are done else: - pitchL = self.chordL[ self.pitch_idx ] + pitch = self.pitchL[ self.pitch_idx ] # be sure that the base directory exists - outDir = os.path.expanduser( cfg.outDir ) - if not os.path.isdir( outDir ): - os.mkdir( outDir ) + baseDir = os.path.expanduser( cfg.outDir ) + if not os.path.isdir( baseDir ): + os.mkdir( baseDir ) - # form the output directory as "