import json

from rt_note_analysis import RT_Analyzer


class VelTablePlayer:
    """Play every entry of a per-pitch velocity table, one note at a time.

    The player is a small state machine driven by periodic calls to
    ``tick(ms)``.  Its states are:

    * ``"off"``      - idle; ``tick`` does nothing.
    * ``"note_on"``  - select the next (pitch, velocity) entry and sound it.
    * ``"playing"``  - wait until ``durMs`` milliseconds have elapsed.
    * ``"note_off"`` - analyze the recorded note, release it, and go back
      to ``"note_on"``.

    Two traversal orders are supported, selected by ``mode``:

    * ``"updown"`` - play every velocity entry of one pitch before moving
      on to the next pitch.
    * any other value (``"across"`` is the default) - play one velocity
      index on every pitch in the range before moving on to the next
      velocity index.
    """

    def __init__(self, cfg, api, audio, holdDutyPctD, fn):
        """Create the player and load the velocity table from ``fn``.

        Args:
            cfg: configuration object; ``cfg.analysisArgs['rmsAnalysisArgs']``
                is forwarded to the analyzer at every note-off.
            api: device interface providing ``set_pwm_duty``, ``note_on_us``
                and ``note_off``.
            audio: audio interface providing ``record_enable`` and
                ``buffer_sample_ms``.
            holdDutyPctD: maps a pitch to a list of ``(usec, dutyPct)`` pairs
                (presumably sorted by ascending ``usec`` - confirm with the
                caller).
            fn: path of a JSON file mapping pitch (as a string) to a list of
                records; element 0 of every record is stored as an int and
                later passed to ``api.note_on_us``.

        Raises:
            AssertionError: if the table has no entry for ``minPitch`` or
                ``maxPitch``.
        """
        self.cfg = cfg
        self.api = api
        self.audio = audio
        self.rtAnalyzer = RT_Analyzer()
        self.holdDutyPctD = holdDutyPctD
        self.durMs = 500          # time each note is held before note-off
        self.mode = "across"
        self.state = "off"
        self.minPitch = 21
        self.maxPitch = 108
        self.velMapD = {}         # pitch -> list of pulse values
        self.curMaxPitch = self.maxPitch
        self.curMinPitch = self.minPitch
        self.curPitch = 21
        self.curVelocity = 0      # index into velMapD[curPitch]
        self.curEndMs = 0
        self.curBegNoteMs = 0
        self.curEndNoteMs = 0
        # True between start() and the first note, so that the starting
        # (pitch, velocity) entry is played rather than skipped.
        self._firstNoteFl = False

        with open(fn, "r") as f:
            d = json.load(f)

        for pitch, value in d.items():
            self.velMapD[int(pitch)] = [int(x[0]) for x in value]

        assert self.minPitch in self.velMapD, "velocity table is missing minPitch"
        assert self.maxPitch in self.velMapD, "velocity table is missing maxPitch"

    def start(self, minPitch, maxPitch, mode):
        """Begin a run over pitches ``minPitch..maxPitch`` in the given mode.

        Audio recording is enabled and the first note is sounded on the next
        call to ``tick``.
        """
        self.curMaxPitch = maxPitch
        self.curMinPitch = minPitch
        self.curPitch = minPitch
        self.curVelocity = 0
        self._firstNoteFl = True
        self.state = "note_on"
        self.mode = mode
        self.audio.record_enable(True)   # start recording audio

    def stop(self):
        """Halt the run: release every pitch, go idle, and stop recording."""
        self._firstNoteFl = False
        self.curPitch = self.minPitch
        self._all_notes_off()
        self.audio.record_enable(False)

    def tick(self, ms):
        """Advance the state machine; ``ms`` is the current time in ms."""
        if self.state == "off":
            pass

        elif self.state == "note_on":
            self.state = self._note_on(ms)

        elif self.state == "playing":
            if ms >= self.curEndMs:
                self.state = "note_off"

        elif self.state == "note_off":
            self.state = self._note_off(ms)

    def _get_duty_cycle(self, pitch, usec):
        """Return the duty percent of the first entry whose usec >= ``usec``.

        Falls back to the last entry's duty percent when ``usec`` is larger
        than every entry for ``pitch``.
        """
        usDutyL = self.holdDutyPctD[pitch]

        for entry in usDutyL:
            if entry[0] >= usec:
                return entry[1]

        return usDutyL[-1][1]

    def _calc_next_pitch(self):
        """Advance ``curPitch`` to the next pitch present in the table.

        Returns:
            True if the new ``curPitch`` is within ``curMaxPitch``; False if
            the range is exhausted (``curPitch`` is then ``curMaxPitch + 1``).
        """
        self.curPitch += 1
        while self.curPitch <= self.curMaxPitch and self.curPitch not in self.velMapD:
            # The original statement here was the no-op expression
            # `self.curPitch+1`, which looped forever on an unmapped pitch.
            self.curPitch += 1

        return self.curPitch <= self.curMaxPitch

    def _has_velocity(self, pitch, velIdx):
        """Return True if ``pitch`` is mapped and has an entry at ``velIdx``."""
        return velIdx < len(self.velMapD.get(pitch, ()))

    def _advance_updown(self):
        """Step to the next entry in 'updown' order; False when finished."""
        if self._has_velocity(self.curPitch, self.curVelocity + 1):
            self.curVelocity += 1
            return True

        while self._calc_next_pitch():
            if self.velMapD[self.curPitch]:   # skip pitches with empty tables
                self.curVelocity = 0
                return True

        return False

    def _advance_across(self):
        """Step to the next entry in 'across' order; False when finished.

        Pitches that are unmapped, or whose table is too short for the
        current velocity index, are skipped.  The run is finished when no
        pitch in the range has an entry at the next velocity index.
        """
        pitch = self.curPitch
        velIdx = self.curVelocity

        while True:
            pitch += 1

            if pitch > self.curMaxPitch:
                # End of this pass: restart at the bottom, one velocity up.
                velIdx += 1
                pitch = self.curMinPitch
                pitchRange = range(self.curMinPitch, self.curMaxPitch + 1)
                if not any(self._has_velocity(p, velIdx) for p in pitchRange):
                    return False

            if self._has_velocity(pitch, velIdx):
                self.curPitch = pitch
                self.curVelocity = velIdx
                return True

    def _get_next_note_params(self):
        """Select the next note and return ``(pitch, usec, dutyPct)``.

        ``usec`` and ``dutyPct`` are None when the run is finished, in which
        case audio recording is also disabled.
        """
        usec = None
        dutyPct = None

        isFirstFl = self._firstNoteFl
        self._firstNoteFl = False

        if isFirstFl and self._has_velocity(self.curPitch, self.curVelocity):
            # Play the entry chosen by start() before advancing past it.
            doneFl = False
        elif self.mode == "updown":
            doneFl = not self._advance_updown()
        else:
            doneFl = not self._advance_across()

        if doneFl:
            self.audio.record_enable(False)
        else:
            usec = self.velMapD[self.curPitch][self.curVelocity]
            dutyPct = self._get_duty_cycle(self.curPitch, usec)

        return self.curPitch, usec, dutyPct

    def _note_on(self, ms):
        """Sound the next note; return the next state name."""
        pitch, usec, dutyPct = self._get_next_note_params()

        # Test for None explicitly: a table value of 0 is a valid entry and
        # must not be mistaken for "sequence finished".
        if usec is None:
            return "off"

        print(self.curPitch,self.curVelocity,usec,dutyPct)
        self.curBegNoteMs = self.audio.buffer_sample_ms().value
        self.api.set_pwm_duty(pitch, dutyPct)
        self.api.note_on_us(pitch, usec)
        self.curEndMs = ms + self.durMs

        return "playing"

    def _note_off(self, ms):
        """Analyze the note that just finished and release it."""
        self.curEndNoteMs = self.audio.buffer_sample_ms().value
        self.rtAnalyzer.analyze_note(
            self.audio,
            self.curPitch,
            self.curBegNoteMs,
            self.curEndNoteMs,
            self.cfg.analysisArgs['rmsAnalysisArgs'],
        )
        self.api.note_off(self.curPitch)

        return "note_on"

    def _all_notes_off(self):
        """Release every pitch from ``curPitch`` up to ``maxPitch``; go idle.

        All note-offs are issued within this single call, so that the state
        machine is reliably idle when the call returns.
        """
        while self.curPitch <= self.maxPitch:
            self.api.note_off(self.curPitch)
            self.curPitch += 1

        self.state = 'off'
        print('done')