import os
import pickle
import types

import numpy as np

from plot_seq import form_final_pulse_list
from rms_analysis import rms_analysis_main_all


class Keyboard:
    """Tick-driven sequencer that plays a list of pitches one after another.

    Each pitch is played through ``api`` (``note_on_us`` / ``note_off`` /
    ``set_pwm_duty``) while ``audio`` records (``record_enable``) and supplies
    timestamps (``buffer_sample_ms().value``).  The caller is expected to call
    :meth:`tick` repeatedly with the current time; the sequencer alternates
    between the ``'note_on'`` and ``'note_off'`` states until every pitch in
    the list has been played, then stops itself.
    """

    def __init__(self, cfg, audio, api):
        """Store the collaborators and reset the sequencer state.

        Args:
            cfg: Configuration object; this class reads ``noteDurMs``,
                ``pauseDurMs``, ``outDir`` and ``analysisArgs`` from it.
            audio: Audio recorder providing ``record_enable()`` and
                ``buffer_sample_ms()``.
            api: Device interface providing ``note_on_us()``, ``note_off()``
                and ``set_pwm_duty()``.
        """
        self.cfg = cfg
        self.audio = audio
        self.api = api
        self.keyD = {}  # { midi_pitch: namespace(pulseUsL, holdDutyPctL, lastDutyPct) }

        # Durations are added to the `ms` value passed to tick(); the names
        # suggest milliseconds.
        self.noteDurMs = cfg.noteDurMs
        self.pauseDurMs = cfg.pauseDurMs

        self.pitchL = None      # pitches of the current sequence
        self.pitch_idx = None   # index into pitchL of the note being played
        self.pulse_idx = None   # index into each pitch's pulseUsL (or None)
        self.targetDb = None    # target level used when pulse_idx is None
        self.next_ms = None     # time at which the next state change is due
        self.enableFl = False   # not referenced elsewhere in this class
        self.state = None       # "note_on" | "note_off" | None (idle)
        self.rmsAnlD = None     # result of rms_analysis_main_all(), set by load()
        self.eventTimeL = []    # [[note_on_time, note_off_time], ...] per pitch

    def load(self, inDir, pitchL, analysisArgsD):
        """Populate ``keyD`` and ``rmsAnlD`` for the requested pitches.

        If the file named by ``analysisArgsD['finalPulseListCacheFn']`` exists
        it is un-pickled into ``keyD``.  Otherwise every sub-directory of
        ``inDir`` whose name is an integer contained in ``pitchL`` is passed
        to ``form_final_pulse_list()`` and the result is written to the cache
        file.  The RMS analysis is then loaded via ``rms_analysis_main_all()``.

        Args:
            inDir: Directory containing one sub-directory per MIDI pitch
                (``~`` is expanded).
            pitchL: Collection of MIDI pitch numbers to load.
            analysisArgsD: Dict with keys ``'finalPulseListCacheFn'``,
                ``'rmsAnalysisCacheFn'`` and ``'rmsAnalysisArgs'``.
        """
        self.keyD = {}

        inDir = os.path.expanduser(inDir)

        finalPulseListCacheFn = analysisArgsD['finalPulseListCacheFn']

        if os.path.isfile(finalPulseListCacheFn):
            print("READING: final pulse list cache file: %s" % (finalPulseListCacheFn))
            # SECURITY: pickle executes arbitrary code on load; the cache file
            # must only ever come from a trusted location.
            # NOTE(review): the cache is used as-is, even if it was built for
            # a different pitch list than `pitchL` -- confirm this is intended.
            with open(finalPulseListCacheFn, 'rb') as f:
                self.keyD = pickle.load(f)
        else:
            for name in os.listdir(inDir):
                dirStr = os.path.normpath(os.path.join(inDir, name))

                if not os.path.isdir(dirStr):
                    continue

                # Pitch directories are named by their MIDI pitch number;
                # skip unrelated directories instead of crashing on int().
                try:
                    midi_pitch = int(os.path.basename(dirStr))
                except ValueError:
                    continue

                if midi_pitch not in pitchL:
                    continue

                print(dirStr, midi_pitch)

                pulseUsL, _pulseDbL, holdDutyPctL = form_final_pulse_list(
                    dirStr, midi_pitch, analysisArgsD)

                self.keyD[midi_pitch] = types.SimpleNamespace(
                    pulseUsL=pulseUsL,
                    holdDutyPctL=holdDutyPctL,
                    lastDutyPct=0)

            with open(finalPulseListCacheFn, 'wb') as f:
                pickle.dump(self.keyD, f)

        print("Loading analysis ...")

        cacheFn = analysisArgsD['rmsAnalysisCacheFn']
        self.rmsAnlD = rms_analysis_main_all(inDir, cacheFn, **analysisArgsD['rmsAnalysisArgs'])

        print("Load DONE.")

    def _get_duty_cycle_from_pulse_usec(self, pitch, pulseUsec):
        """Look up the hold duty cycle for ``pulseUsec`` on ``pitch``.

        ``holdDutyPctL`` is scanned as ``(refUsec, refDuty)`` pairs; the duty
        of the last pair visited before the first ``refUsec`` greater than
        ``pulseUsec`` is returned (the first pair's duty if ``pulseUsec`` is
        below every reference).  Presumably the table is sorted by ascending
        ``refUsec`` -- TODO confirm.

        Returns:
            The duty value, or ``None`` if the pitch is unknown or its table
            is empty.
        """
        if pitch not in self.keyD:
            print("Missing duty cycle.")
            return None

        holdDutyPctL = self.keyD[pitch].holdDutyPctL

        # An empty table has no default entry to fall back on.
        if len(holdDutyPctL) == 0:
            print("Missing duty cycle.")
            return None

        dutyPct = holdDutyPctL[0][1]
        for refUsec, refDuty in holdDutyPctL:
            if pulseUsec < refUsec:
                break
            dutyPct = refDuty

        return dutyPct

    def _get_pulse_and_duty_cycle_from_pulse_idx(self, pitch, pulse_idx):
        """Return ``(pulseUsec, dutyPct)`` for entry ``pulse_idx`` of the pitch's pulse list.

        Returns ``(None, None)`` when the pitch has no entry in ``keyD`` so the
        caller can skip the note instead of failing with ``KeyError``.
        """
        if pitch not in self.keyD:
            print("Missing pulse list.")
            return None, None

        pulseUsec = self.keyD[pitch].pulseUsL[pulse_idx]
        dutyPct = self._get_duty_cycle_from_pulse_usec(pitch, pulseUsec)

        return pulseUsec, dutyPct

    def _get_pulse_and_duty_cycle_target_db(self, pitch, targetDb):
        """Return ``(pulseUsec, dutyPct)`` for the analysed pulse closest to ``targetDb``.

        The pulse is chosen by minimising ``|pkDbL - targetDb|`` over the RMS
        analysis record of ``pitch``; ``targetDb`` must therefore be a number.
        """
        r = self.rmsAnlD[pitch]

        pulse_idx = np.argmin(np.abs(np.array(r.pkDbL) - targetDb))

        print("PULSE idx:", pulse_idx, " db:", r.pkDbL[pulse_idx])

        pulseUsec = r.pkUsL[pulse_idx]
        dutyPct = self._get_duty_cycle_from_pulse_usec(pitch, pulseUsec)

        return pulseUsec, dutyPct

    def _get_pulse_and_duty_cycle(self, pitch, pulse_idx, targetDb):
        """Select the pulse by index when ``pulse_idx`` is given, else by ``targetDb``."""
        if pulse_idx is not None:
            return self._get_pulse_and_duty_cycle_from_pulse_idx(pitch, pulse_idx)

        return self._get_pulse_and_duty_cycle_target_db(pitch, targetDb)

    def start(self, ms, pitchL, pulse_idx, targetDb=None):
        """Begin playing ``pitchL`` and immediately play the first note.

        Calibration data is (re)loaded when no sequence has been started yet
        or when ``pitchL`` contains a pitch that was not in the previous list.

        Args:
            ms: Current time, in the same units passed to :meth:`tick`.
            pitchL: Sequence of MIDI pitches to play.
            pulse_idx: Index into each pitch's pulse list, or ``None`` to
                select the pulse from ``targetDb`` instead.
            targetDb: Target level used when ``pulse_idx`` is ``None``.
        """
        loadFl = self.pitchL is None or any(pitch not in self.pitchL for pitch in pitchL)

        if loadFl:
            self.load(self.cfg.outDir, pitchL, self.cfg.analysisArgs)

        self.pitchL = pitchL
        self.pitch_idx = 0
        self.pulse_idx = pulse_idx
        self.targetDb = targetDb
        self.state = "note_on"
        self.next_ms = ms
        self.eventTimeL = [[0, 0] for _ in pitchL]  # initialize the event time
        self.audio.record_enable(True)  # start recording audio
        self.tick(ms)  # play the first note

    def repeat(self, ms, pulse_idx, targetDb=None):
        """Restart the most recent pitch list; :meth:`start` must have been called before."""
        self.start(ms, self.pitchL, pulse_idx, targetDb)

    def stop(self, ms):
        """Silence all pitches, stop recording and put the sequencer in the idle state."""
        self._send_note_off()
        self.audio.record_enable(False)  # stop recording audio
        self.state = None  # disable this sequencer

    def tick(self, ms):
        """Advance the state machine if the next event time has arrived.

        Args:
            ms: Current time; compared against ``next_ms``.

        Raises:
            AssertionError: If ``state`` holds an unexpected value.
        """
        # nothing to do while idle or before the next event is due
        if self.state is None or ms < self.next_ms:
            return

        if self.state == 'note_on':
            # waiting to turn a note on
            self._note_on(ms)

        elif self.state == 'note_off':
            # waiting to turn a note off
            self._note_off(ms)
            self.pitch_idx += 1

            # if all notes have been played
            if self.pitch_idx >= len(self.pitchL):
                self.stop(ms)

        else:
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError("invalid sequencer state: %r" % (self.state,))

    def _note_on(self, ms):
        """Record the note-on time, schedule the note-off and sound the current pitch."""
        self.eventTimeL[self.pitch_idx][0] = self.audio.buffer_sample_ms().value
        self.next_ms = ms + self.noteDurMs
        self.state = 'note_off'

        pitch = self.pitchL[self.pitch_idx]
        pulse_usec, dutyPct = self._get_pulse_and_duty_cycle(pitch, self.pulse_idx, self.targetDb)

        # Only drive the device when both values could be determined.
        if pulse_usec is not None and dutyPct is not None:
            self._set_pwm_duty(pitch, dutyPct)
            self.api.note_on_us(pitch, pulse_usec)

        # Substitute zeros for missing values so the status line always formats.
        pulse_idx = 0 if self.pulse_idx is None else self.pulse_idx
        targetDb = 0 if self.targetDb is None else self.targetDb
        pulse_usec = 0 if pulse_usec is None else pulse_usec
        dutyPct = 0 if dutyPct is None else dutyPct
        print("note-on: %i %i %4.1f %8.1f %i" % (pitch, pulse_idx, targetDb, pulse_usec, dutyPct))

    def _set_pwm_duty(self, pitch, dutyPct):
        """Send ``dutyPct`` to the device only if it differs from the last value sent for ``pitch``."""
        keyRecd = self.keyD[pitch]

        if keyRecd.lastDutyPct != dutyPct:
            keyRecd.lastDutyPct = dutyPct
            self.api.set_pwm_duty(pitch, dutyPct)

    def _note_off(self, ms):
        """Record the note-off time, schedule the next note-on and silence all pitches."""
        self.eventTimeL[self.pitch_idx][1] = self.audio.buffer_sample_ms().value
        self.next_ms = ms + self.pauseDurMs
        self.state = 'note_on'

        self._send_note_off()

    def _send_note_off(self):
        """Send a note-off for every pitch in the current list (no-op before the first start)."""
        if self.pitchL is None:
            return

        for pitch in self.pitchL:
            self.api.note_off(pitch)