From ca030abcf577a4b1f9164efd667a8b6a0a48fda8 Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 18 Jan 2021 10:29:25 -0500 Subject: [PATCH] ChordTester.py,NoteTester.py,PolyNoteTester.py,VelTablePlayer.py : iniital commit. --- ChordTester.py | 68 +++++++++++++++++++++ NoteTester.py | 119 +++++++++++++++++++++++++++++++++++++ PolyNoteTester.py | 148 ++++++++++++++++++++++++++++++++++++++++++++++ VelTablePlayer.py | 148 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 483 insertions(+) create mode 100644 ChordTester.py create mode 100644 NoteTester.py create mode 100644 PolyNoteTester.py create mode 100644 VelTablePlayer.py diff --git a/ChordTester.py b/ChordTester.py new file mode 100644 index 0000000..6eb082e --- /dev/null +++ b/ChordTester.py @@ -0,0 +1,68 @@ +import types +import time + +class ChordTester: + def __init__( self, cfg, api ): + self.api = api + self.cfg = types.SimpleNamespace(**cfg.ChordTester) + + self.nextMs = 0 + self.isStartedFl = False + self.curNoteCnt = 0 + self.curRepeatCnt = 0 + self.isPlayingFl = False + + + def start( self ): + self.api.set_hold_duty_all( self.cfg.holdDuty ) + if self.cfg.useVelTableFl: + self.api.set_vel_table_all( self.cfg.pitchL ) + self.curNoteCnt = 0 + self.curRepeatCnt = 0 + self.isStartedFl = True + + def stop( self ): + self.isStartedFl = False + self.api.all_notes_off() + + + + def tick( self, ms ): + + if self.isStartedFl and ms >= self.nextMs: + + if self.isPlayingFl: + + # turn notes off + for i in range(0,self.curNoteCnt+1): + self.api.note_off( self.cfg.pitchL[i]) + time.sleep( 0.01 ) + + # repeat or advance the chord note count + self.curRepeatCnt += 1 + if self.curRepeatCnt >= self.cfg.repeatCnt: + self.curRepeatCnt = 0 + self.curNoteCnt += 1 + if self.curNoteCnt >= len(self.cfg.pitchL): + self.isStartedFl = False + self.curNoteCnt = 0 + + self.isPlayingFl = False + self.nextMs = ms + self.cfg.pauseMs + + else: + for i in range(0,self.curNoteCnt+1): + if self.cfg.useVelTableFl: + self.api.note_on_vel(self.cfg.pitchL[i], 45 ) + else: + self.api.note_on_us(self.cfg.pitchL[i], self.cfg.atkUsec) + time.sleep( 0.02 ) + + self.nextMs = ms + self.cfg.durMs + self.isPlayingFl = True + + + + + + diff --git a/NoteTester.py b/NoteTester.py new file mode 100644 index 0000000..d6e6834 --- /dev/null +++ b/NoteTester.py @@ -0,0 +1,119 @@ +import sys,os,types,json +from random import randrange + +class NoteTester: + def __init__( self, cfg, api ): + self.cfg = cfg + self.api = api + + r = types.SimpleNamespace(**cfg.NoteTester) + + self.durMsL = [ randrange(r.minNoteDurMs, r.maxNoteDurMs) for _ in range(r.noteCount) ] + self.pauseMsL = [ randrange(r.minPauseDurMs, r.maxPauseDurMs) for _ in range(r.noteCount) ] + self.eventL = [] + self.nextMs = 0 # next transition time + self.eventIdx = 0 # next event to play + self.noteOnFl = False # True if note is currently sounding + self.pitch = r.pitch # + self.filename = r.filename + self.isStartedFl = False + self.minAttackUsec = r.minAttackUsec + self.maxAttackUsec = r.maxAttackUsec + + def start( self ): + self.eventIdx = 0 + self.noteOnFl = False + self.nextMs = 0 + self.isStartedFl = True + + def stop( self ): + self.isStartedFl = False + self.write() + + def tick( self, ms ): + + if self.isStartedFl and ms > self.nextMs: + + offsMs = 0 + + if self.noteOnFl: + self.noteOnFl = False + self.api.note_off( self.pitch ) + offsMs = self.pauseMsL[ self.eventIdx ] + self.eventIdx += 1 + print("off:%i ms" % (offsMs)) + + + else: + usec = self.minAttackUsec + (int(self.eventIdx * 250) % int(self.maxAttackUsec - self.minAttackUsec)) + decay_level = self.api.calc_decay_level( usec ) + + self.api.note_on_us( self.pitch, usec, decay_level ) + offsMs = self.durMsL[ self.eventIdx ] + print("usec:%i %i dcy:%i" % (usec,offsMs, decay_level) ) + self.noteOnFl = True + + + self.eventL.append( (ms, self.noteOnFl) ) + self.nextMs = ms + offsMs + + if self.eventIdx >= len(self.durMsL): + self.write(); + self.isStartedFl = False + print("done % i" % (len(self.eventL))) + + + + def write( self ): + + with open(self.filename,"w") as f: + json.dump({ "eventL":self.eventL },f ) + + + +def note_tester_compare( nt_fn, logica_fn ): + + eventL = [] + logicaL = [] + + with open(nt_fn,"r") as f: + r = json.load(f) + eventL = r['eventL'] + eventL = [ (ms-eventL[0][0], level ) for ms,level in eventL ] + + with open(logica_fn,"r") as f: + logicaL = [ ( d['count']/16e3,d['level']) for d in json.load(f) if d['signal'] == 0 ] + logicaL = [ (ms-logicaL[0][0], level!=0 ) for ms,level in logicaL ] + + + print(len(eventL)) + print(len(logicaL)) + + #edL = [ eventL[i][0] - eventL[i-1][0] for i in range(2,len(eventL)) ] + #ldL = [ logicaL[i][0] - logicaL[i-1][0] for i in range(2,len(logicaL)) ] + + #print(edL[:10]) + #print(ldL[:10]) + + + durMs = 0 + ms = 0 + for i,(t0,t1) in enumerate(zip(eventL,logicaL)): + t = t0[0] # eventL[] time + dt = int(t - t1[0]) # diff between eventL[] and logicaL[] time + fl = ' ' if t0[1] == t1[1] else '*' # mark level mismatch with '*' + print("%5i %7i %4i %i %s" % (i,durMs,dt,t0[1],fl)) + durMs = t-ms + ms = t + + +if __name__ == "__main__": + + nt_fn = "note_tester.json" + logica_fn = sys.argv[1] + if len(sys.argv) > 2: + nt_fn = sys.argv[2] + + note_tester_compare( nt_fn, logica_fn) + + diff --git a/PolyNoteTester.py b/PolyNoteTester.py new file mode 100644 index 0000000..324b7b7 --- /dev/null +++ b/PolyNoteTester.py @@ -0,0 +1,148 @@ +import types +import time +from random import randrange + +class PolyNoteTester: + def __init__( self, cfg, api ): + self.api = api + + r = types.SimpleNamespace(**cfg.PolyNoteTester) + self.cfg = r + + if r.mode == "simple": + print("mode:simple") + self.schedL = self._gen_simple_sched(r) + else: + print("mode:poly") + self.schedL = self._gen_sched(r) + + + self.schedL = sorted( self.schedL, key=lambda x: x[0] ) + self.nextMs = 0 # next transition time + self.schedIdx = 0 # next event to play + self.isStartedFl = False + + #self._report() + + def _report( self ): + for t,cmd,pitch,atkUs in self.schedL: + print("%s %6i %3i %5i" % (cmd,t,pitch,atkUs)) + + def _gen_simple_sched( self, r ): + """ Play each note sequentially from lowest to highest. """ + durMs = int(r.minNoteDurMs + (r.maxNoteDurMs - r.minNoteDurMs)/2) + ioMs = int(r.minInterOnsetMs + (r.maxInterOnsetMs - r.minInterOnsetMs)/2) + atkUs = int(r.minAttackUsec + (r.maxAttackUsec - r.minAttackUsec)/2) + schedL = [] + + t0 = 0 + for pitch in range(r.minPitch,r.maxPitch+1): + schedL.append((t0,'on',pitch,atkUs)) + schedL.append((t0+durMs,'off',pitch,0)) + t0 += durMs + ioMs + + return schedL + + + def _does_note_overlap( self, beg0Ms, end0Ms, beg1Ms, end1Ms ): + """ if note 0 is entirely before or after note 1 """ + return not (beg0Ms > end1Ms or end0Ms < beg1Ms) + + def _do_any_notes_overlap( self, begMs, endMs, begEndL ): + for beg1,end1 in begEndL: + if self._does_note_overlap(begMs,endMs,beg1,end1): + return True + return False + + def _get_last_end_time( self, begEndL ): + end0 = 0 + for beg,end in begEndL: + if end > end0: + end0 = end + + return end0 + + + def _gen_sched( self, r ): + + pitchL = [ randrange(r.minPitch,r.maxPitch) for _ in range(r.noteCount) ] + durMsL = [ randrange(r.minNoteDurMs, r.maxNoteDurMs) for _ in range(r.noteCount) ] + ioMsL = [ randrange(r.minInterOnsetMs, r.maxInterOnsetMs) for _ in range(r.noteCount) ] + atkUsL = [ randrange(r.minAttackUsec, r.maxAttackUsec) for _ in range(r.noteCount) ] + schedL = [] + pitchD = {} # pitch: [ (begMs,endMs) ] + + t0 = 0 + # for each pitch,dur,ioi,atkUs tuple + for pitch,durMs,interOnsetMs,atkUs in zip(pitchL,durMsL,ioMsL,atkUsL): + + # calc note begin and end time + begMs = t0 + endMs = t0 + durMs + + # if this pitch hasn't yet been added to pitchD + if pitch not in pitchD: + pitchD[ pitch ] = [ (begMs,endMs) ] + else: + + # if the proposed note overlaps with other notes for this pitch + if self._do_any_notes_overlap( begMs, endMs, pitchD[pitch] ): + # move this pitch past the last note + begMs = self._get_last_end_time( pitchD[pitch] ) + interOnsetMs + endMs = begMs + durMs + + # add the new note to pitchD + pitchD[ pitch ].append( (begMs,endMs) ) + + + # update the schedule + schedL.append( (begMs, 'on', pitch, atkUs)) + schedL.append( (endMs, 'off', pitch, 0 )) + + t0 += interOnsetMs + + + return schedL + + + def start( self ): + self.schedIdx = 0 + self.nextMs = 0 + self.api.set_hold_duty_all(self.cfg.holdDutyPct) + self.isStartedFl = True + + + def stop( self ): + self.isStartedFl = False + self.api.all_notes_off() + + + def tick( self, ms ): + + while self.isStartedFl and ms >= self.nextMs and self.schedIdx < len(self.schedL): + + t0,cmd,pitch,usec = self.schedL[self.schedIdx] + + if cmd == 'on': + if pitch not in self.cfg.skipPitchL: + decay_level = self.api.calc_decay_level( usec ) + self.api.note_on_us( pitch, usec, decay_level ) + print("on %i %i %i" % (pitch,usec,decay_level)) + + elif cmd == 'off': + if pitch not in self.cfg.skipPitchL: + self.api.note_off( pitch ) + print("off %i" % pitch) + + self.schedIdx += 1 + if self.schedIdx < len(self.schedL): + self.nextMs = ms + (self.schedL[self.schedIdx][0] - t0) + else: + self.isStartedFl = False + print("Done.") + + + + + + diff --git a/VelTablePlayer.py b/VelTablePlayer.py new file mode 100644 index 0000000..029a24e --- /dev/null +++ b/VelTablePlayer.py @@ -0,0 +1,148 @@ +import json +from rt_note_analysis import RT_Analyzer + +class VelTablePlayer: + + def __init__( self, cfg, api, audio, holdDutyPctD, fn ): + self.cfg = cfg + self.api = api + self.audio = audio + self.rtAnalyzer = RT_Analyzer() + self.holdDutyPctD = holdDutyPctD + self.durMs = 500 + self.mode = "across" + self.state = "off" + self.minPitch = 21 + self.maxPitch = 108 + self.velMapD = {} + + self.curMaxPitch = self.maxPitch + self.curMinPitch = self.minPitch + self.curPitch = 21 + self.curVelocity = 0 + self.curEndMs = 0 + self.curBegNoteMs = 0 + self.curEndNoteMs = 0 + + with open(fn,"r") as f: + d = json.load(f) + + for pitch,value in d.items(): + self.velMapD[ int(pitch) ] = [ int(x[0]) for x in d[pitch] ] + + assert self.minPitch in self.velMapD + assert self.maxPitch in self.velMapD + + + def start( self, minPitch, maxPitch, mode ): + self.curMaxPitch = maxPitch + self.curMinPitch = minPitch + self.curPitch = minPitch + self.curVelocity = 0 + self.state = "note_on" + self.mode = mode + self.audio.record_enable(True) # start recording audio + + def stop( self ): + self.curPitch = self.minPitch + self._all_notes_off() + self.audio.record_enable(False) + + def tick( self, ms ): + if self.state == "off": + pass + + elif self.state == "note_on": + self.state = self._note_on(ms) + + elif self.state == "playing": + if ms >= self.curEndMs: + self.state = "note_off" + + elif self.state == "note_off": + self.state = self._note_off(ms) + + + def _get_duty_cycle( self, pitch, usec ): + usDutyL = self.holdDutyPctD[pitch] + + for i in range(len(usDutyL)): + if usDutyL[i][0] >= usec: + return usDutyL[i][1] + + return usDutyL[-1][1] + + def _calc_next_pitch( self ): + + self.curPitch += 1 + while self.curPitch not in self.velMapD and self.curPitch <= self.curMaxPitch: + self.curPitch+1 + + return self.curPitch <= self.curMaxPitch + + def _get_next_note_params( self ): + + usec = None + dutyPct = None + doneFl = False + + if self.mode == "updown": + if self.curVelocity + 1 < len(self.velMapD[ self.curPitch ]): + self.curVelocity += 1 + else: + + if self._calc_next_pitch(): + self.curVelocity = 0 + else: + doneFl = True + + else: + + if self._calc_next_pitch(): + self.curPitch += 1 + else: + if self.curVelocity + 1 < len(self.velMapD[ self.curPitch ]): + self.curVelocity += 1 + self.curPitch = self.curMinPitch + else: + doneFl = True + + if doneFl: + self.audio.record_enable(False) + else: + usec = self.velMapD[self.curPitch][self.curVelocity] + + dutyPct = self._get_duty_cycle( self.curPitch, usec ) + + + return self.curPitch, usec, dutyPct + + def _note_on( self, ms ): + + pitch,usec,dutyPct = self._get_next_note_params() + + if not usec: + return "off" + else: + print(self.curPitch,self.curVelocity,usec,dutyPct) + self.curBegNoteMs = self.audio.buffer_sample_ms().value + self.api.set_pwm_duty( pitch, dutyPct ) + self.api.note_on_us( pitch, usec ) + self.curEndMs = ms + self.durMs + return "playing" + + def _note_off( self, ms ): + + self.curEndNoteMs = self.audio.buffer_sample_ms().value + self.rtAnalyzer.analyze_note( self.audio, self.curPitch, self.curBegNoteMs, self.curEndNoteMs, self.cfg.analysisArgs['rmsAnalysisArgs'] ) + self.api.note_off( self.curPitch ) + return "note_on" + + + def _all_notes_off( self ): + if self.curPitch == 109: + self.state = 'off' + print('done') + else: + self.api.note_off( self.curPitch ) + self.curPitch += 1