piccal/keyboard.py
kpl 26b997811a AudioDevice.py : fixed linear_buffer()
p_ac.py : changed dutyCyclePct to a list, added real-timen note analysis and multi-pitch tests.
Added keyboard.py, rt_note_analysis.py, plot_note_analysis.py, plot_all_note_durations.ipynb.
2019-11-18 11:44:47 -05:00

210 lines
7.0 KiB
Python

import os,types,pickle
import numpy as np
from plot_seq import form_final_pulse_list
from rms_analysis import rms_analysis_main_all
class Keyboard:
def __init__(self, cfg, audio, api):
self.cfg = cfg
self.audio = audio
self.api = api
self.keyD = {} # { midi_pitch: { pulseUsL, holdDutyPctL } }
self.noteDurMs = cfg.noteDurMs
self.pauseDurMs= cfg.pauseDurMs
self.pitchL = None
self.pitch_idx = None
self.pulse_idx = None
self.targetDb = None
self.next_ms = None
self.enableFl = False
self.state = None #"note_on" | "note_off"
self.rmsAnlD = None
#self._load( cfg.outDir, cfg.analysisArgs)
def load( self, inDir, pitchL, analysisArgsD ):
self.keyD = {}
inDir = os.path.expanduser(inDir)
finalPulseListCacheFn = analysisArgsD['finalPulseListCacheFn']
if os.path.isfile(finalPulseListCacheFn):
print("READING: final pulse list cache file: %s" % (finalPulseListCacheFn))
with open(finalPulseListCacheFn,'rb') as f:
self.keyD = pickle.load(f)
else:
dirL = os.listdir(inDir)
for dirStr in dirL:
dirStr = os.path.normpath(os.path.join(inDir,dirStr))
if os.path.isdir(dirStr):
pathL = dirStr.split(os.sep)
midi_pitch = int( pathL[-1] )
if midi_pitch in pitchL:
print(dirStr,midi_pitch)
pulseUsL,pulseDbL,holdDutyPctL = form_final_pulse_list( dirStr, midi_pitch, analysisArgsD )
d = { 'pulseUsL':pulseUsL, 'holdDutyPctL':holdDutyPctL, 'lastDutyPct':0 }
self.keyD[ midi_pitch ] = types.SimpleNamespace(**d)
with open(finalPulseListCacheFn,'wb') as f:
pickle.dump(self.keyD,f)
print("Loading analysis ...")
cacheFn = analysisArgsD['rmsAnalysisCacheFn']
self.rmsAnlD = rms_analysis_main_all( inDir, cacheFn, **analysisArgsD['rmsAnalysisArgs'] )
print("Load DONE.")
def _get_duty_cycle_from_pulse_usec( self, pitch, pulseUsec ):
if pitch not in self.keyD:
print("Missing duty cycle.")
return None
dutyPct = self.keyD[pitch].holdDutyPctL[0][1]
for refUsec,refDuty in self.keyD[pitch].holdDutyPctL:
if pulseUsec < refUsec:
break
dutyPct = refDuty
return dutyPct
def _get_pulse_and_duty_cycle_from_pulse_idx( self, pitch, pulse_idx ):
pulseUsec = self.keyD[ pitch ].pulseUsL[ pulse_idx ]
dutyPct = self._get_duty_cycle_from_pulse_usec( pitch, pulseUsec )
return pulseUsec, dutyPct
def _get_pulse_and_duty_cycle_target_db( self, pitch, targetDb ):
r = self.rmsAnlD[pitch]
pulse_idx = np.argmin( np.abs(np.array(r.pkDbL) - targetDb) )
print("PULSE idx:",pulse_idx," db:", r.pkDbL[pulse_idx] )
pulseUsec = r.pkUsL[pulse_idx]
dutyPct = self._get_duty_cycle_from_pulse_usec( pitch, pulseUsec )
return pulseUsec, dutyPct
def _get_pulse_and_duty_cycle( self, pitch, pulse_idx, targetDb ):
if pulse_idx is not None:
return self._get_pulse_and_duty_cycle_from_pulse_idx(pitch,pulse_idx)
else:
return self._get_pulse_and_duty_cycle_target_db( pitch, targetDb )
def start( self, ms, pitchL, pulse_idx, targetDb=None ):
loadFl = True
if self.pitchL is not None:
loadFl = False
for pitch in pitchL:
if pitch not in self.pitchL:
loadFl = True
break
if loadFl:
self.load(self.cfg.outDir, pitchL, self.cfg.analysisArgs)
self.pitchL = pitchL
self.pitch_idx = 0
self.pulse_idx = pulse_idx
self.targetDb = targetDb
self.state = "note_on"
self.next_ms = ms
self.eventTimeL = [[0,0] for _ in range(len(pitchL))] # initialize the event time
self.audio.record_enable(True) # start recording audio
self.tick(ms) # play the first note
def repeat( self, ms, pulse_idx, targetDb=None ):
self.start( ms, self.pitchL, pulse_idx, targetDb )
def stop( self, ms ):
self._send_note_off()
self.audio.record_enable(False) # stop recording audio
self.state = None # disable this sequencer
def tick( self, ms ):
#self.audio.tick(ms)
# if next event time has arrived
if self.state is not None and ms >= self.next_ms:
# if waiting to turn note on
if self.state == 'note_on':
self._note_on(ms)
# if waiting to turn a note off
elif self.state == 'note_off':
self._note_off(ms)
self.pitch_idx += 1
# if all notes have been played
if self.pitch_idx >= len(self.pitchL):
self.stop(ms)
else:
assert(0)
def _note_on( self, ms ):
self.eventTimeL[ self.pitch_idx ][0] = self.audio.buffer_sample_ms().value
self.next_ms = ms + self.noteDurMs
self.state = 'note_off'
pitch = self.pitchL[ self.pitch_idx ]
pulse_usec, dutyPct = self._get_pulse_and_duty_cycle( pitch, self.pulse_idx, self.targetDb )
if pulse_usec is not None and dutyPct is not None:
self._set_pwm_duty( pitch, dutyPct )
self.api.note_on_us( pitch, pulse_usec )
pulse_idx = 0 if self.pulse_idx is None else self.pulse_idx
targetDb = 0 if self.targetDb is None else self.targetDb
dutyPct = 0 if dutyPct is None else dutyPct
print("note-on: %i %i %4.1f %8.1f %i" % (pitch, pulse_idx, targetDb, pulse_usec, dutyPct))
def _set_pwm_duty( self, pitch, dutyPct ):
if self.keyD[pitch].lastDutyPct != dutyPct:
self.keyD[pitch].lastDutyPct = dutyPct
self.api.set_pwm_duty( pitch, dutyPct )
def _note_off( self, ms ):
self.eventTimeL[ self.pitch_idx ][1] = self.audio.buffer_sample_ms().value
self.next_ms = ms + self.pauseDurMs
self.state = 'note_on'
#begTimeMs = self.eventTimeL[ self.pulse_idx ][0]
#endTimeMs = self.eventTimeL[ self.pulse_idx ][1]
#self.rtAnalyzer.analyze_note( self.audio, self.pitchL[0], begTimeMs, endTimeMs, self.cfg.analysisArgs['rmsAnalysisArgs'] )
self._send_note_off()
def _send_note_off( self ):
for pitch in self.pitchL:
self.api.note_off( pitch )
#print("note-off:",pitch,self.pulse_idx)