123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- ##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
- ##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
- import os,types,pickle
- import numpy as np
- from plot_seq import form_final_pulse_list
- from rms_analysis import rms_analysis_main_all
-
- class Keyboard:
- def __init__(self, cfg, audio, api):
- self.cfg = cfg
- self.audio = audio
- self.api = api
- self.keyD = {} # { midi_pitch: { pulseUsL, holdDutyPctL } }
-
-
- self.noteDurMs = cfg.noteDurMs
- self.pauseDurMs= cfg.pauseDurMs
-
- self.pitchL = None
-
-
- self.pitch_idx = None
- self.pulse_idx = None
- self.targetDb = None
- self.next_ms = None
- self.enableFl = False
- self.state = None #"note_on" | "note_off"
-
- self.rmsAnlD = None
- #self._load( cfg.outDir, cfg.analysisArgs)
-
- def load( self, inDir, pitchL, analysisArgsD ):
-
- self.keyD = {}
-
- inDir = os.path.expanduser(inDir)
-
- finalPulseListCacheFn = analysisArgsD['finalPulseListCacheFn']
- if os.path.isfile(finalPulseListCacheFn):
- print("READING: final pulse list cache file: %s" % (finalPulseListCacheFn))
- with open(finalPulseListCacheFn,'rb') as f:
- self.keyD = pickle.load(f)
- else:
- dirL = os.listdir(inDir)
-
- for dirStr in dirL:
- dirStr = os.path.normpath(os.path.join(inDir,dirStr))
-
- if os.path.isdir(dirStr):
- pathL = dirStr.split(os.sep)
-
- midi_pitch = int( pathL[-1] )
-
- if midi_pitch in pitchL:
- print(dirStr,midi_pitch)
- pulseUsL,pulseDbL,holdDutyPctL = form_final_pulse_list( dirStr, midi_pitch, analysisArgsD )
-
- d = { 'pulseUsL':pulseUsL, 'holdDutyPctL':holdDutyPctL, 'lastDutyPct':0 }
-
- self.keyD[ midi_pitch ] = types.SimpleNamespace(**d)
-
- with open(finalPulseListCacheFn,'wb') as f:
- pickle.dump(self.keyD,f)
-
- print("Loading analysis ...")
-
- cacheFn = analysisArgsD['rmsAnalysisCacheFn']
- self.rmsAnlD = rms_analysis_main_all( inDir, cacheFn, **analysisArgsD['rmsAnalysisArgs'] )
-
- print("Load DONE.")
-
- def _get_duty_cycle_from_pulse_usec( self, pitch, pulseUsec ):
-
- if pitch not in self.keyD:
- print("Missing duty cycle.")
- return None
-
- dutyPct = self.keyD[pitch].holdDutyPctL[0][1]
- for refUsec,refDuty in self.keyD[pitch].holdDutyPctL:
- if pulseUsec < refUsec:
- break
- dutyPct = refDuty
-
- return dutyPct
-
- def _get_pulse_and_duty_cycle_from_pulse_idx( self, pitch, pulse_idx ):
-
- pulseUsec = self.keyD[ pitch ].pulseUsL[ pulse_idx ]
-
- dutyPct = self._get_duty_cycle_from_pulse_usec( pitch, pulseUsec )
-
- return pulseUsec, dutyPct
-
- def _get_pulse_and_duty_cycle_target_db( self, pitch, targetDb ):
-
- r = self.rmsAnlD[pitch]
-
- pulse_idx = np.argmin( np.abs(np.array(r.pkDbL) - targetDb) )
-
- print("PULSE idx:",pulse_idx," db:", r.pkDbL[pulse_idx] )
-
- pulseUsec = r.pkUsL[pulse_idx]
-
- dutyPct = self._get_duty_cycle_from_pulse_usec( pitch, pulseUsec )
-
- return pulseUsec, dutyPct
-
- def _get_pulse_and_duty_cycle( self, pitch, pulse_idx, targetDb ):
-
- if pulse_idx is not None:
- return self._get_pulse_and_duty_cycle_from_pulse_idx(pitch,pulse_idx)
- else:
- return self._get_pulse_and_duty_cycle_target_db( pitch, targetDb )
-
-
-
- def start( self, ms, pitchL, pulse_idx, targetDb=None ):
-
- loadFl = True
-
- if self.pitchL is not None:
- loadFl = False
- for pitch in pitchL:
- if pitch not in self.pitchL:
- loadFl = True
- break
-
- if loadFl:
- self.load(self.cfg.outDir, pitchL, self.cfg.analysisArgs)
-
- self.pitchL = pitchL
- self.pitch_idx = 0
- self.pulse_idx = pulse_idx
- self.targetDb = targetDb
- self.state = "note_on"
- self.next_ms = ms
- self.eventTimeL = [[0,0] for _ in range(len(pitchL))] # initialize the event time
- self.audio.record_enable(True) # start recording audio
- self.tick(ms) # play the first note
-
- def repeat( self, ms, pulse_idx, targetDb=None ):
- self.start( ms, self.pitchL, pulse_idx, targetDb )
-
- def stop( self, ms ):
- self._send_note_off()
- self.audio.record_enable(False) # stop recording audio
- self.state = None # disable this sequencer
-
-
- def tick( self, ms ):
- #self.audio.tick(ms)
-
- # if next event time has arrived
- if self.state is not None and ms >= self.next_ms:
-
- # if waiting to turn note on
- if self.state == 'note_on':
- self._note_on(ms)
-
- # if waiting to turn a note off
- elif self.state == 'note_off':
- self._note_off(ms)
- self.pitch_idx += 1
-
- # if all notes have been played
- if self.pitch_idx >= len(self.pitchL):
- self.stop(ms)
-
- else:
- assert(0)
-
- def _note_on( self, ms ):
-
- self.eventTimeL[ self.pitch_idx ][0] = self.audio.buffer_sample_ms().value
- self.next_ms = ms + self.noteDurMs
- self.state = 'note_off'
-
- pitch = self.pitchL[ self.pitch_idx ]
- pulse_usec, dutyPct = self._get_pulse_and_duty_cycle( pitch, self.pulse_idx, self.targetDb )
-
- if pulse_usec is not None and dutyPct is not None:
- self._set_pwm_duty( pitch, dutyPct )
- self.api.note_on_us( pitch, pulse_usec )
-
- pulse_idx = 0 if self.pulse_idx is None else self.pulse_idx
- targetDb = 0 if self.targetDb is None else self.targetDb
- dutyPct = 0 if dutyPct is None else dutyPct
- print("note-on: %i %i %4.1f %8.1f %i" % (pitch, pulse_idx, targetDb, pulse_usec, dutyPct))
-
- def _set_pwm_duty( self, pitch, dutyPct ):
-
- if self.keyD[pitch].lastDutyPct != dutyPct:
- self.keyD[pitch].lastDutyPct = dutyPct
- self.api.set_pwm_duty( pitch, dutyPct )
-
- def _note_off( self, ms ):
- self.eventTimeL[ self.pitch_idx ][1] = self.audio.buffer_sample_ms().value
- self.next_ms = ms + self.pauseDurMs
- self.state = 'note_on'
-
- #begTimeMs = self.eventTimeL[ self.pulse_idx ][0]
- #endTimeMs = self.eventTimeL[ self.pulse_idx ][1]
- #self.rtAnalyzer.analyze_note( self.audio, self.pitchL[0], begTimeMs, endTimeMs, self.cfg.analysisArgs['rmsAnalysisArgs'] )
-
- self._send_note_off()
-
- def _send_note_off( self ):
- for pitch in self.pitchL:
- self.api.note_off( pitch )
- #print("note-off:",pitch,self.pulse_idx)
|