ChordTester.py,NoteTester.py,PolyNoteTester.py,VelTablePlayer.py : iniital commit.

This commit is contained in:
kevin 2021-01-18 10:29:25 -05:00
parent d96bc2fded
commit ca030abcf5
4 changed files with 483 additions and 0 deletions

68
ChordTester.py Normal file
View File

@ -0,0 +1,68 @@
import types
import time
class ChordTester:
def __init__( self, cfg, api ):
self.api = api
self.cfg = types.SimpleNamespace(**cfg.ChordTester)
self.nextMs = 0
self.isStartedFl = False
self.curNoteCnt = 0
self.curRepeatCnt = 0
self.isPlayingFl = False
def start( self ):
self.api.set_hold_duty_all( self.cfg.holdDuty )
if self.cfg.useVelTableFl:
self.api.set_vel_table_all( self.cfg.pitchL )
self.curNoteCnt = 0
self.curRepeatCnt = 0
self.isStartedFl = True
def stop( self ):
self.isStartedFl = False
self.api.all_notes_off()
def tick( self, ms ):
if self.isStartedFl and ms >= self.nextMs:
if self.isPlayingFl:
# turn notes off
for i in range(0,self.curNoteCnt+1):
self.api.note_off( self.cfg.pitchL[i])
time.sleep( 0.01 )
# repeat or advance the chord note count
self.curRepeatCnt += 1
if self.curRepeatCnt >= self.cfg.repeatCnt:
self.curRepeatCnt = 0
self.curNoteCnt += 1
if self.curNoteCnt >= len(self.cfg.pitchL):
self.isStartedFl = False
self.curNoteCnt = 0
self.isPlayingFl = False
self.nextMs = ms + self.cfg.pauseMs
else:
for i in range(0,self.curNoteCnt+1):
if self.cfg.useVelTableFl:
self.api.note_on_vel(self.cfg.pitchL[i], 45 )
else:
self.api.note_on_us(self.cfg.pitchL[i], self.cfg.atkUsec)
time.sleep( 0.02 )
self.nextMs = ms + self.cfg.durMs
self.isPlayingFl = True

119
NoteTester.py Normal file
View File

@ -0,0 +1,119 @@
import sys,os,types,json
from random import randrange
class NoteTester:
def __init__( self, cfg, api ):
self.cfg = cfg
self.api = api
r = types.SimpleNamespace(**cfg.NoteTester)
self.durMsL = [ randrange(r.minNoteDurMs, r.maxNoteDurMs) for _ in range(r.noteCount) ]
self.pauseMsL = [ randrange(r.minPauseDurMs, r.maxPauseDurMs) for _ in range(r.noteCount) ]
self.eventL = []
self.nextMs = 0 # next transition time
self.eventIdx = 0 # next event to play
self.noteOnFl = False # True if note is currently sounding
self.pitch = r.pitch #
self.filename = r.filename
self.isStartedFl = False
self.minAttackUsec = r.minAttackUsec
self.maxAttackUsec = r.maxAttackUsec
def start( self ):
self.eventIdx = 0
self.noteOnFl = False
self.nextMs = 0
self.isStartedFl = True
def stop( self ):
self.isStartedFl = False
self.write()
def tick( self, ms ):
if self.isStartedFl and ms > self.nextMs:
offsMs = 0
if self.noteOnFl:
self.noteOnFl = False
self.api.note_off( self.pitch )
offsMs = self.pauseMsL[ self.eventIdx ]
self.eventIdx += 1
print("off:%i ms" % (offsMs))
else:
usec = self.minAttackUsec + (int(self.eventIdx * 250) % int(self.maxAttackUsec - self.minAttackUsec))
decay_level = self.api.calc_decay_level( usec )
self.api.note_on_us( self.pitch, usec, decay_level )
offsMs = self.durMsL[ self.eventIdx ]
print("usec:%i %i dcy:%i" % (usec,offsMs, decay_level) )
self.noteOnFl = True
self.eventL.append( (ms, self.noteOnFl) )
self.nextMs = ms + offsMs
if self.eventIdx >= len(self.durMsL):
self.write();
self.isStartedFl = False
print("done % i" % (len(self.eventL)))
def write( self ):
with open(self.filename,"w") as f:
json.dump({ "eventL":self.eventL },f )
def note_tester_compare( nt_fn, logica_fn ):
eventL = []
logicaL = []
with open(nt_fn,"r") as f:
r = json.load(f)
eventL = r['eventL']
eventL = [ (ms-eventL[0][0], level ) for ms,level in eventL ]
with open(logica_fn,"r") as f:
logicaL = [ ( d['count']/16e3,d['level']) for d in json.load(f) if d['signal'] == 0 ]
logicaL = [ (ms-logicaL[0][0], level!=0 ) for ms,level in logicaL ]
print(len(eventL))
print(len(logicaL))
#edL = [ eventL[i][0] - eventL[i-1][0] for i in range(2,len(eventL)) ]
#ldL = [ logicaL[i][0] - logicaL[i-1][0] for i in range(2,len(logicaL)) ]
#print(edL[:10])
#print(ldL[:10])
durMs = 0
ms = 0
for i,(t0,t1) in enumerate(zip(eventL,logicaL)):
t = t0[0] # eventL[] time
dt = int(t - t1[0]) # diff between eventL[] and logicaL[] time
fl = ' ' if t0[1] == t1[1] else '*' # mark level mismatch with '*'
print("%5i %7i %4i %i %s" % (i,durMs,dt,t0[1],fl))
durMs = t-ms
ms = t
if __name__ == "__main__":
nt_fn = "note_tester.json"
logica_fn = sys.argv[1]
if len(sys.argv) > 2:
nt_fn = sys.argv[2]
note_tester_compare( nt_fn, logica_fn)

148
PolyNoteTester.py Normal file
View File

@ -0,0 +1,148 @@
import types
import time
from random import randrange
class PolyNoteTester:
def __init__( self, cfg, api ):
self.api = api
r = types.SimpleNamespace(**cfg.PolyNoteTester)
self.cfg = r
if r.mode == "simple":
print("mode:simple")
self.schedL = self._gen_simple_sched(r)
else:
print("mode:poly")
self.schedL = self._gen_sched(r)
self.schedL = sorted( self.schedL, key=lambda x: x[0] )
self.nextMs = 0 # next transition time
self.schedIdx = 0 # next event to play
self.isStartedFl = False
#self._report()
def _report( self ):
for t,cmd,pitch,atkUs in self.schedL:
print("%s %6i %3i %5i" % (cmd,t,pitch,atkUs))
def _gen_simple_sched( self, r ):
""" Play each note sequentially from lowest to highest. """
durMs = int(r.minNoteDurMs + (r.maxNoteDurMs - r.minNoteDurMs)/2)
ioMs = int(r.minInterOnsetMs + (r.maxInterOnsetMs - r.minInterOnsetMs)/2)
atkUs = int(r.minAttackUsec + (r.maxAttackUsec - r.minAttackUsec)/2)
schedL = []
t0 = 0
for pitch in range(r.minPitch,r.maxPitch+1):
schedL.append((t0,'on',pitch,atkUs))
schedL.append((t0+durMs,'off',pitch,0))
t0 += durMs + ioMs
return schedL
def _does_note_overlap( self, beg0Ms, end0Ms, beg1Ms, end1Ms ):
""" if note 0 is entirely before or after note 1 """
return not (beg0Ms > end1Ms or end0Ms < beg1Ms)
def _do_any_notes_overlap( self, begMs, endMs, begEndL ):
for beg1,end1 in begEndL:
if self._does_note_overlap(begMs,endMs,beg1,end1):
return True
return False
def _get_last_end_time( self, begEndL ):
end0 = 0
for beg,end in begEndL:
if end > end0:
end0 = end
return end0
def _gen_sched( self, r ):
pitchL = [ randrange(r.minPitch,r.maxPitch) for _ in range(r.noteCount) ]
durMsL = [ randrange(r.minNoteDurMs, r.maxNoteDurMs) for _ in range(r.noteCount) ]
ioMsL = [ randrange(r.minInterOnsetMs, r.maxInterOnsetMs) for _ in range(r.noteCount) ]
atkUsL = [ randrange(r.minAttackUsec, r.maxAttackUsec) for _ in range(r.noteCount) ]
schedL = []
pitchD = {} # pitch: [ (begMs,endMs) ]
t0 = 0
# for each pitch,dur,ioi,atkUs tuple
for pitch,durMs,interOnsetMs,atkUs in zip(pitchL,durMsL,ioMsL,atkUsL):
# calc note begin and end time
begMs = t0
endMs = t0 + durMs
# if this pitch hasn't yet been added to pitchD
if pitch not in pitchD:
pitchD[ pitch ] = [ (begMs,endMs) ]
else:
# if the proposed note overlaps with other notes for this pitch
if self._do_any_notes_overlap( begMs, endMs, pitchD[pitch] ):
# move this pitch past the last note
begMs = self._get_last_end_time( pitchD[pitch] ) + interOnsetMs
endMs = begMs + durMs
# add the new note to pitchD
pitchD[ pitch ].append( (begMs,endMs) )
# update the schedule
schedL.append( (begMs, 'on', pitch, atkUs))
schedL.append( (endMs, 'off', pitch, 0 ))
t0 += interOnsetMs
return schedL
def start( self ):
self.schedIdx = 0
self.nextMs = 0
self.api.set_hold_duty_all(self.cfg.holdDutyPct)
self.isStartedFl = True
def stop( self ):
self.isStartedFl = False
self.api.all_notes_off()
def tick( self, ms ):
while self.isStartedFl and ms >= self.nextMs and self.schedIdx < len(self.schedL):
t0,cmd,pitch,usec = self.schedL[self.schedIdx]
if cmd == 'on':
if pitch not in self.cfg.skipPitchL:
decay_level = self.api.calc_decay_level( usec )
self.api.note_on_us( pitch, usec, decay_level )
print("on %i %i %i" % (pitch,usec,decay_level))
elif cmd == 'off':
if pitch not in self.cfg.skipPitchL:
self.api.note_off( pitch )
print("off %i" % pitch)
self.schedIdx += 1
if self.schedIdx < len(self.schedL):
self.nextMs = ms + (self.schedL[self.schedIdx][0] - t0)
else:
self.isStartedFl = False
print("Done.")

148
VelTablePlayer.py Normal file
View File

@ -0,0 +1,148 @@
import json
from rt_note_analysis import RT_Analyzer
class VelTablePlayer:
def __init__( self, cfg, api, audio, holdDutyPctD, fn ):
self.cfg = cfg
self.api = api
self.audio = audio
self.rtAnalyzer = RT_Analyzer()
self.holdDutyPctD = holdDutyPctD
self.durMs = 500
self.mode = "across"
self.state = "off"
self.minPitch = 21
self.maxPitch = 108
self.velMapD = {}
self.curMaxPitch = self.maxPitch
self.curMinPitch = self.minPitch
self.curPitch = 21
self.curVelocity = 0
self.curEndMs = 0
self.curBegNoteMs = 0
self.curEndNoteMs = 0
with open(fn,"r") as f:
d = json.load(f)
for pitch,value in d.items():
self.velMapD[ int(pitch) ] = [ int(x[0]) for x in d[pitch] ]
assert self.minPitch in self.velMapD
assert self.maxPitch in self.velMapD
def start( self, minPitch, maxPitch, mode ):
self.curMaxPitch = maxPitch
self.curMinPitch = minPitch
self.curPitch = minPitch
self.curVelocity = 0
self.state = "note_on"
self.mode = mode
self.audio.record_enable(True) # start recording audio
def stop( self ):
self.curPitch = self.minPitch
self._all_notes_off()
self.audio.record_enable(False)
def tick( self, ms ):
if self.state == "off":
pass
elif self.state == "note_on":
self.state = self._note_on(ms)
elif self.state == "playing":
if ms >= self.curEndMs:
self.state = "note_off"
elif self.state == "note_off":
self.state = self._note_off(ms)
def _get_duty_cycle( self, pitch, usec ):
usDutyL = self.holdDutyPctD[pitch]
for i in range(len(usDutyL)):
if usDutyL[i][0] >= usec:
return usDutyL[i][1]
return usDutyL[-1][1]
def _calc_next_pitch( self ):
self.curPitch += 1
while self.curPitch not in self.velMapD and self.curPitch <= self.curMaxPitch:
self.curPitch+1
return self.curPitch <= self.curMaxPitch
def _get_next_note_params( self ):
usec = None
dutyPct = None
doneFl = False
if self.mode == "updown":
if self.curVelocity + 1 < len(self.velMapD[ self.curPitch ]):
self.curVelocity += 1
else:
if self._calc_next_pitch():
self.curVelocity = 0
else:
doneFl = True
else:
if self._calc_next_pitch():
self.curPitch += 1
else:
if self.curVelocity + 1 < len(self.velMapD[ self.curPitch ]):
self.curVelocity += 1
self.curPitch = self.curMinPitch
else:
doneFl = True
if doneFl:
self.audio.record_enable(False)
else:
usec = self.velMapD[self.curPitch][self.curVelocity]
dutyPct = self._get_duty_cycle( self.curPitch, usec )
return self.curPitch, usec, dutyPct
def _note_on( self, ms ):
pitch,usec,dutyPct = self._get_next_note_params()
if not usec:
return "off"
else:
print(self.curPitch,self.curVelocity,usec,dutyPct)
self.curBegNoteMs = self.audio.buffer_sample_ms().value
self.api.set_pwm_duty( pitch, dutyPct )
self.api.note_on_us( pitch, usec )
self.curEndMs = ms + self.durMs
return "playing"
def _note_off( self, ms ):
self.curEndNoteMs = self.audio.buffer_sample_ms().value
self.rtAnalyzer.analyze_note( self.audio, self.curPitch, self.curBegNoteMs, self.curEndNoteMs, self.cfg.analysisArgs['rmsAnalysisArgs'] )
self.api.note_off( self.curPitch )
return "note_on"
def _all_notes_off( self ):
if self.curPitch == 109:
self.state = 'off'
print('done')
else:
self.api.note_off( self.curPitch )
self.curPitch += 1