class AttackPulseSeq:
    """Sequence a fixed chord over a list of attack pulse lengths.

    The chord is played once per entry in the pulse list.  While the
    sequence runs, audio recording is enabled on ``audio``; when it ends
    the note timing and the recorded audio are written to the output
    directory given to :meth:`start`.

    Collaborator methods used by this class:
      audio: ``tick(ms)``, ``record_enable(flag)``, ``write_buffer(path)``
      api:   ``note_on_us(pitch, pulse_us)``, ``note_off(pitch)``
    """

    def __init__(self, audio, api, noteDurMs=1000, pauseDurMs=1000, holdDutyPct=50):
        """Store the collaborators and timing parameters; the sequencer starts disabled.

        Args:
            audio: audio device object (see class docstring for the methods used).
            api: note interface object (see class docstring for the methods used).
            noteDurMs: duration of each chord in milliseconds.
            pauseDurMs: duration between the end of one note and the start of the next.
            holdDutyPct: hold voltage duty cycle as a percentage (0-100).  This
                class only stores the value and writes it to the results file.
        """
        self.audio = audio
        self.api = api
        self.outDir = None               # directory to write audio file and results
        self.pitchL = None               # chord to play
        self.pulseUsL = []               # one onset pulse length in microseconds per sequence element
        self.noteDurMs = noteDurMs       # duration of each chord in milliseconds
        self.pauseDurMs = pauseDurMs     # duration between end of previous note and start of next
        self.holdDutyPct = holdDutyPct   # hold voltage duty cycle as a percentage (0-100)

        self.pulse_idx = 0               # index of next pulse
        self.state = None                # 'note_on', 'note_off', or None when disabled
        self.next_ms = 0                 # time of next event (note-on or note-off)
        self.eventTimeL = []             # onset/offset time of each note [ [onset_ms,offset_ms] ]
        self.beginMs = 0                 # time at which start() was called

    def start(self, ms, outDir, pitchL, pulseUsL):
        """Begin a new sequence and enable audio recording.

        Args:
            ms: current time in milliseconds.
            outDir: directory to write the audio file and results to.
            pitchL: list of pitches forming the chord to play.
            pulseUsL: one onset pulse length in microseconds per sequence element.
        """
        self.outDir = outDir
        self.pitchL = pitchL
        self.pulseUsL = pulseUsL
        self.pulse_idx = 0
        self.state = 'note_on'

        # Delay the first note by 500 ms so that there is some empty space
        # in the audio file before the first note.
        self.next_ms = ms + 500

        # One independent [onset_ms, offset_ms] pair per pulse.  A repeated
        # list ([[0,0]] * n) would alias a single inner list, so every note
        # would end up reporting the times of the last note.
        self.eventTimeL = [[0, 0] for _ in pulseUsL]

        self.beginMs = ms
        self.audio.record_enable(True)   # start recording audio

        # Service the audio device immediately; the first note itself is
        # triggered by a later tick() once next_ms has been reached.
        self.tick(ms)

    def stop(self, ms):
        """Turn all notes off, stop recording, disable the sequencer and write results.

        Calling this before :meth:`start` is harmless: no note-offs are sent
        and nothing is written because there is no chord or output directory.
        """
        self._send_note_off()            # be sure that all notes are actually turned off
        self.audio.record_enable(False)  # stop recording audio
        self._disable()                  # disable this sequencer

        if self.outDir is not None:
            self._write()                # write the results

    def is_enabled(self):
        """Return True while a sequence is in progress."""
        return self.state is not None

    def tick(self, ms):
        """Advance the sequencer to time ``ms``.

        The audio device is ticked on every call.  When the next event time
        has arrived the pending note-on or note-off is issued; after the
        note-off of the final pulse the sequencer stops itself.

        Raises:
            AssertionError: if the sequencer is in an unrecognized state.
        """
        self.audio.tick(ms)

        # nothing to do until the next event time has arrived
        if not self.is_enabled() or ms < self.next_ms:
            return

        if self.state == 'note_on':
            self._note_on(ms)

        elif self.state == 'note_off':
            self._note_off(ms)
            self.pulse_idx += 1

            # if all notes have been played
            if self.pulse_idx >= len(self.pulseUsL):
                self.stop(ms)

        else:
            # Raised explicitly (rather than via an assert statement) so the
            # check is not stripped when Python runs with -O.
            raise AssertionError("unexpected sequencer state: %r" % (self.state,))

    def _note_on(self, ms):
        """Record the onset time and send a note-on for every pitch in the chord."""
        self.eventTimeL[self.pulse_idx][0] = ms - self.beginMs
        self.next_ms = ms + self.noteDurMs
        self.state = 'note_off'

        pulse_us = int(self.pulseUsL[self.pulse_idx])
        for pitch in self.pitchL:
            self.api.note_on_us(pitch, pulse_us)
            print("note-on:", pitch, self.pulse_idx)

    def _note_off(self, ms):
        """Record the offset time and send a note-off for every pitch in the chord."""
        self.eventTimeL[self.pulse_idx][1] = ms - self.beginMs
        self.next_ms = ms + self.pauseDurMs
        self.state = 'note_on'

        self._send_note_off()

    def _send_note_off(self):
        """Send a note-off for every pitch in the chord (no-op if no chord is set)."""
        if self.pitchL is None:
            return

        for pitch in self.pitchL:
            self.api.note_off(pitch)
            print("note-off:", pitch, self.pulse_idx)

    def _disable(self):
        """Mark the sequencer as idle."""
        self.state = None

    def _write(self):
        """Write 'seq.json' and 'audio.wav' into the output directory.

        The directory (including any missing parents) is created if needed.
        """
        d = {
            "pulseUsL": self.pulseUsL,
            "pitchL": self.pitchL,
            "noteDurMs": self.noteDurMs,
            "pauseDurMs": self.pauseDurMs,
            "holdDutyPct": self.holdDutyPct,
            "eventTimeL": self.eventTimeL,
            "beginMs": self.beginMs,
        }

        print("Writing: ", self.outDir)

        outDir = os.path.expanduser(self.outDir)
        os.makedirs(outDir, exist_ok=True)

        with open(os.path.join(outDir, "seq.json"), "w") as f:
            json.dump(d, f)

        self.audio.write_buffer(os.path.join(outDir, "audio.wav"))
self.pitch_idx >= 0 def tick( self, ms ): if self.is_enabled(): self.seq.tick(ms) # if the sequencer is done playing if not self.seq.is_enabled(): self._start_next_chord( ms ) # ... else start the next sequence return None def _start_next_chord( self, ms ): self.pitch_idx += 1 # if the last chord in chordL has been played ... if self.pitch_idx >= len(self.chordL): self.stop(ms) # ... then we are done else: pitchL = self.chordL[ self.pitch_idx ] # be sure that the base directory exists outDir = os.path.expanduser( cfg.outDir ) if not os.path.isdir( outDir ): os.mkdir( outDir ) # form the output directory as "