##| Copyright: (C) 2019-2020 Kevin Larke
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
import sys,os,argparse,types,logging,select,time,json
from datetime import datetime

import multiprocessing
from multiprocessing import Process, Pipe

from picadae_api  import Picadae
from AudioDevice  import AudioDevice
from MidiDevice   import MidiDevice
from result       import Result
from common       import parse_yaml_cfg
from plot_seq     import form_resample_pulse_time_list
from plot_seq_1   import get_resample_points_wrap
from plot_seq     import form_final_pulse_list
from rt_note_analysis import RT_Analyzer
from keyboard         import Keyboard
from calibrate        import Calibrate
from rms_analysis import rms_analyze_one_rt_note_wrap
from MidiFilePlayer import MidiFilePlayer


# Largest index used when sub-sampling the final pulse list for audition.
# NOTE(review): presumably the final pulse list has 127 entries - confirm.
_AUDITION_MAX_PULSE_IDX = 126


def _elapsed_ms( t0 ):
    """Return the whole milliseconds elapsed since the time.monotonic() value 't0'."""
    return int(round( (time.monotonic() - t0) * 1000.0 ))


def _form_hold_duty_pct_dict( pulseUsL, holdDutyPctL ):
    """Map each pulse length to a hold duty cycle percentage.

    pulseUsL     -- iterable of pulse lengths in microseconds.
    holdDutyPctL -- table [ (minPulseUsec,dutyCyclePct) ]; the lookup assumes
                    the table is sorted by ascending minPulseUsec.

    Each pulse gets the duty cycle of the last table entry whose minimum
    pulse length it reaches.  Pulses below every threshold get entry 0.
    Returns { us:dutyPct }.  Raises IndexError when the table is empty and
    pulseUsL is not.
    """
    holdDutyPctD = {}

    for us in pulseUsL:
        j = 0
        # advance over every threshold this pulse reaches (not just one)
        while j+1 < len(holdDutyPctL) and us >= holdDutyPctL[j+1][0]:
            j += 1

        holdDutyPctD[ us ] = holdDutyPctL[j][1]

    return holdDutyPctD


def _select_audition_pulses( pulseUsL, noteN ):
    """Pick 'noteN' pulses evenly spaced over indexes 0.._AUDITION_MAX_PULSE_IDX of pulseUsL."""
    if noteN <= 0:
        return []

    if noteN == 1:
        # a single audition note would otherwise divide by zero
        return [ pulseUsL[0] ]

    return [ pulseUsL[ int(round(i*float(_AUDITION_MAX_PULSE_IDX)/(noteN-1))) ] for i in range(noteN) ]


class AttackPulseSeq:
    """ Sequence a fixed pitch over a list of attack pulse lengths."""

    def __init__(self, cfg, audio, api, noteDurMs=1000, pauseDurMs=1000 ):
        """Store the collaborators and reset the sequencer to the disabled state."""
        self.cfg         = cfg
        self.audio       = audio
        self.api         = api
        self.outDir      = None         # directory to write audio file and results
        self.pitch       = None         # pitch to play
        self.pulseUsL    = []           # one onset pulse length in microseconds per sequence element
        self.noteDurMs   = noteDurMs    # duration of each chord in milliseconds
        self.pauseDurMs  = pauseDurMs   # duration between end of previous note and start of next
        self.holdDutyPctL= None         # hold voltage duty cycle table [ (minPulseSeqUsec,dutyCyclePct) ]
        self.holdDutyPctD= None         # { us:dutyPct } for each us in self.pulseUsL
        self.silentNoteN = None
        self.pulse_idx   = 0            # Index of next pulse
        self.state       = None         # 'note_on','note_off'
        self.prevHoldDutyPct = None
        self.next_ms     = 0            # Time of next event (note-on or note_off)
        self.eventTimeL  = []           # Onset/offset time of each note [ [onset_ms,offset_ms] ] (used to locate the note in the audio file)
        self.beginMs     = 0
        self.playOnlyFl  = False
        self.rtAnalyzer  = RT_Analyzer()

    def start( self, ms, outDir, pitch, pulseUsL, holdDutyPctL, holdDutyPctD, playOnlyFl=False ):
        """Begin sequencing 'pitch' over 'pulseUsL'.

        ms           -- current application time in milliseconds.
        outDir       -- directory that receives seq.json and audio.wav.
        holdDutyPctL -- duty cycle table, stored and written with the results.
        holdDutyPctD -- { us:dutyPct } used to look up the duty cycle per pulse.
        playOnlyFl   -- when True the results are not written by stop().
        """
        self.outDir          = outDir          # directory to write audio file and results
        self.pitch           = pitch           # note to play
        self.pulseUsL        = pulseUsL        # one onset pulse length in microseconds per sequence element
        self.holdDutyPctL    = holdDutyPctL
        self.holdDutyPctD    = holdDutyPctD
        self.silentNoteN     = 0
        self.pulse_idx       = 0
        self.state           = 'note_on'
        self.prevHoldDutyPct = None
        self.next_ms         = ms + 500        # wait for 500ms to play the first note (this will guarantee that there is some empty space in the audio file before the first note)
        self.eventTimeL      = [[0,0] for _ in range(len(pulseUsL))] # initialize the event time
        self.beginMs         = ms
        self.playOnlyFl      = playOnlyFl

        # kpl if not playOnlyFl:  (guard disabled: audio is recorded in both modes)
        self.audio.record_enable(True)   # start recording audio

        # Use the cfg handed to the constructor rather than a module global,
        # which only exists when this file is run as '__main__'.
        print("Hold Delay from end of attack:",self.cfg.defaultHoldDelayUsecs)
        self.api.set_hold_delay(pitch,self.cfg.defaultHoldDelayUsecs)

        # The first note-on is scheduled for ms+500, so this call only runs the
        # normal scheduling check; it does not sound a note yet.
        self.tick(ms)

    def stop(self, ms):
        """Turn the note off, stop recording, disable the sequencer and write the results.

        Does nothing when the sequencer is not enabled, so a second stop()
        (or a stop() before any start()) neither re-writes the output files
        nor touches an unset pitch/output directory.
        """
        if not self.is_enabled():
            return

        self._send_note_off() # be sure that all notes are actually turned off

        # kpl if not self.playOnlyFl:  (guard disabled: see start())
        self.audio.record_enable(False)  # stop recording audio

        self._disable()          # disable this sequencer

        if not self.playOnlyFl:
            self._write()      # write the results

    def is_enabled(self):
        """Return True while a sequence is in progress."""
        return self.state is not None

    def tick(self, ms):
        """Advance the note-on/note-off state machine when the next event time has arrived."""

        # if next event time has arrived
        if self.is_enabled() and ms >= self.next_ms:

            # if waiting to turn note on
            if self.state == 'note_on':
                self._note_on(ms)

            # if waiting to turn a note off
            elif self.state == 'note_off':
                self._note_off(ms)
                self._count_silent_notes()

                self.pulse_idx += 1

                # if all notes have been played
                if self.pulse_idx >= len(self.pulseUsL): # or self.silentNoteN >= self.cfg.maxSilentNoteCount:
                    self.stop(ms)

            else:
                assert(0)

    def _count_silent_notes( self ):
        """Analyze the note just played and update the count of consecutive silent notes.

        Returns the updated self.silentNoteN.
        """
        annBegMs   = self.eventTimeL[ self.pulse_idx ][0]
        annEndMs   = self.eventTimeL[ self.pulse_idx ][1]
        minDurMs   = self.cfg.silentNoteMinDurMs
        maxPulseUs = self.cfg.silentNoteMaxPulseUs

        resD = rms_analyze_one_rt_note_wrap( self.audio, annBegMs, annEndMs, self.pitch, self.pauseDurMs, self.cfg.analysisArgs['rmsAnalysisArgs'] )

        # The analysis result may be None; only report it when it exists.
        if resD is not None:
            print( " %4.1f db %4i ms %i" % (resD['hm']['db'], resD['hm']['durMs'], self.pulse_idx))

        if resD is not None and resD['hm']['durMs'] < minDurMs and self.pulseUsL[ self.pulse_idx ] < maxPulseUs:
            self.silentNoteN += 1
            print("SILENT", self.silentNoteN)
        else:
            self.silentNoteN = 0

        return self.silentNoteN

    def _get_duty_cycle( self, pulseUsec ):
        """Return the hold duty cycle percentage registered for 'pulseUsec'."""
        return self.holdDutyPctD[ pulseUsec ]

    def _set_duty_cycle( self, pitch, pulseUsec ):
        """Send the hold duty cycle for 'pulseUsec' to the device if it changed since the last note."""
        dutyPct = self._get_duty_cycle( pulseUsec )

        if dutyPct != self.prevHoldDutyPct:
            self.api.set_pwm_duty( pitch, dutyPct )
            print("Hold Duty:",dutyPct)

        self.prevHoldDutyPct = dutyPct

    def _note_on( self, ms ):
        """Record the onset time, schedule the note-off and trigger the note."""
        self.eventTimeL[ self.pulse_idx ][0] = self.audio.buffer_sample_ms().value
        self.next_ms = ms + self.noteDurMs
        self.state   = 'note_off'

        pulse_usec = int(self.pulseUsL[ self.pulse_idx ])
        self._set_duty_cycle( self.pitch, pulse_usec )
        self.api.note_on_us( self.pitch, pulse_usec )
        print("note-on:",self.pitch, self.pulse_idx, pulse_usec)

    def _note_off( self, ms ):
        """Record the offset time, schedule the next note-on and release the note."""
        self.eventTimeL[ self.pulse_idx ][1] = self.audio.buffer_sample_ms().value
        self.next_ms = ms + self.pauseDurMs
        self.state   = 'note_on'

        if self.playOnlyFl:
            begTimeMs = self.eventTimeL[ self.pulse_idx ][0]
            endTimeMs = self.eventTimeL[ self.pulse_idx ][1]
            self.rtAnalyzer.analyze_note( self.audio, self.pitch, begTimeMs, endTimeMs, self.cfg.analysisArgs['rmsAnalysisArgs'] )

        self._send_note_off()

    def _send_note_off( self ):
        """Send a note-off for the current pitch."""
        self.api.note_off( self.pitch )
        #print("note-off:",self.pitch,self.pulse_idx)

    def _disable(self):
        """Mark the sequencer as not running."""
        self.state = None

    def _write( self ):
        """Write the sequence description (seq.json) and the recorded audio (audio.wav) to self.outDir."""
        d = {
            "pulseUsL":self.pulseUsL,
            "pitch":self.pitch,
            "noteDurMs":self.noteDurMs,
            "pauseDurMs":self.pauseDurMs,
            "holdDutyPctL":self.holdDutyPctL,
            "eventTimeL":self.eventTimeL,
            "beginMs":self.beginMs
            }

        print("Writing: ", self.outDir )

        outDir = os.path.expanduser(self.outDir)

        # makedirs also creates missing parent directories
        os.makedirs( outDir, exist_ok=True )

        with open(os.path.join( outDir, "seq.json" ),"w") as f:
            f.write(json.dumps( d ))

        self.audio.write_buffer( os.path.join( outDir, "audio.wav" ) )


class CalibrateKeys:
    """Run an AttackPulseSeq over each pitch in a list of pitches."""

    def __init__(self, cfg, audioDev, api):
        """Create the per-pitch sequencer using the note/pause durations from 'cfg'."""
        self.cfg      = cfg
        self.seq      = AttackPulseSeq( cfg, audioDev, api, noteDurMs=cfg.noteDurMs, pauseDurMs=cfg.pauseDurMs )

        self.pulseUsL  = None
        self.pitchL    = None
        self.pitch_idx = -1

    def start( self, ms, pitchL, pulseUsL, playOnlyFl=False ):
        """Begin sequencing the pitches in 'pitchL'. An empty 'pitchL' is ignored."""
        if len(pitchL) > 0:
            self.pulseUsL  = pulseUsL
            self.pitchL    = pitchL
            self.pitch_idx = -1
            self._start_next_note( ms, playOnlyFl )

    def stop( self, ms ):
        """Disable this object and stop the per-pitch sequencer."""
        self.pitch_idx = -1
        self.seq.stop(ms)

    def is_enabled( self ):
        """Return True while a pitch list is being sequenced."""
        return self.pitch_idx >= 0

    def tick( self, ms ):
        """Tick the per-pitch sequencer and move on to the next pitch when it completes."""
        if self.is_enabled():
            self.seq.tick(ms)

            # if the sequencer is done playing
            if not self.seq.is_enabled():
                self._start_next_note( ms, self.seq.playOnlyFl ) # ... else start the next sequence

        return None

    def _start_next_note( self, ms, playOnlyFl ):
        """Advance to the next pitch, prepare its output directory and pulse list, and start the sequencer."""

        self.pitch_idx += 1

        # if the last note in pitchL has been played ...
        if self.pitch_idx >= len(self.pitchL):
            self.stop(ms)  # ... then we are done
            return

        pitch = self.pitchL[ self.pitch_idx ]

        # be sure that the base directory exists
        baseDir = os.path.expanduser( self.cfg.outDir )
        os.makedirs( baseDir, exist_ok=True )

        outDir = os.path.join(baseDir, str(pitch) )
        os.makedirs( outDir, exist_ok=True )

        # get the next available output directory id
        outDir_id = self._calc_next_out_dir_id( outDir )

        print(outDir_id,outDir)

        # if this is not the first time this note has been sampled then get the resample locations
        if (outDir_id == 0) or self.cfg.useFullPulseListFl:
            self.pulseUsL = self.cfg.full_pulseL
        else:
            #self.pulseUsL,_,_ = form_resample_pulse_time_list( outDir, self.cfg.analysisArgs )
            self.pulseUsL = get_resample_points_wrap( baseDir, pitch, self.cfg.analysisArgs )

        holdDutyPctL = self.cfg.calibrateArgs['holdDutyPctD'][pitch]

        if playOnlyFl:
            self.pulseUsL,_,holdDutyPctL = form_final_pulse_list( baseDir, pitch, self.cfg.analysisArgs, take_id=None )

            noteN = self.cfg.analysisArgs['auditionNoteN']
            self.pulseUsL = _select_audition_pulses( self.pulseUsL, noteN )

        else:
            # each new take gets its own numbered sub-directory
            outDir = os.path.join( outDir, str(outDir_id) )
            os.makedirs( outDir, exist_ok=True )

        holdDutyPctD = _form_hold_duty_pct_dict( self.pulseUsL, holdDutyPctL )

        if self.cfg.reversePulseListFl:
            self.pulseUsL = list(reversed(self.pulseUsL))

        # start the sequencer
        self.seq.start( ms, outDir, pitch, self.pulseUsL, holdDutyPctL, holdDutyPctD, playOnlyFl )

    def _calc_next_out_dir_id( self, outDir ):
        """Return the smallest non-negative integer which is not yet a sub-directory name of 'outDir'."""
        dir_id = 0
        while os.path.isdir( os.path.join(outDir,"%i" % dir_id)):
            dir_id += 1

        return dir_id


# This is the main application API it is running in a child process.
class App:
    """Command handlers executed in the child process.

    Every public method other than setup() and tick() is a command which the
    event loop invokes by name as func(ms) or func(ms,argL).
    """

    def __init__(self ):
        self.cfg       = None
        self.audioDev  = None
        self.midiDev   = None
        self.api       = None
        self.cal_keys  = None
        self.keyboard  = None
        self.calibrate = None
        self.midiFilePlayer = None

    def setup( self, cfg ):
        """Create the audio/MIDI devices, sync with the serial port and build the sub-components.

        Returns the result object of the last device setup, with the serial
        sync error added when the sync failed.
        NOTE(review): a failed audio setup result is overwritten by the MIDI
        setup result when a 'midi' stanza exists - confirm this is intended.
        """
        self.cfg = cfg

        self.audioDev = AudioDevice()
        self.midiDev  = MidiDevice()

        res = None

        #
        # TODO: unify the result error handling
        # (the API and the audio device return two different 'Result' types)
        #
        if hasattr(cfg,'audio'):
            res = self.audioDev.setup(**cfg.audio)

            if not res:
                self.audio_dev_list(0)

        else:
            print("The cfg file has no audio stanza. No audio device will be initialized.")
            self.audioDev = None

        if hasattr(cfg,'midi'):
            res = self.midiDev.setup(**cfg.midi)

            if not res:
                self.midi_dev_list(0)
        else:
            self.midiDev = None

        self.api = Picadae( key_mapL=cfg.key_mapL)

        # wait for the letter 'a' to come back from the serial port
        api_res = self.api.wait_for_serial_sync(timeoutMs=cfg.serial_sync_timeout_ms)

        # did the serial port sync fail?
        if not api_res:
            # 'res' is still None when the cfg has neither an audio nor a midi stanza
            if res is None:
                res = Result(None,"Serial port sync failed.")
            else:
                res.set_error("Serial port sync failed.")

        else:
            print("Serial port sync'ed")

            self.cal_keys = CalibrateKeys( cfg, self.audioDev, self.api )

            self.keyboard = Keyboard( cfg, self.audioDev, self.api )

            self.calibrate = None #Calibrate( cfg.calibrateArgs, self.audioDev, self.midiDev, self.api )

            self.midiFilePlayer = MidiFilePlayer( cfg, self.api, self.midiDev, cfg.midiFileFn )

        return res

    def tick( self, ms ):
        """Tick every component that exists."""
        if self.audioDev is not None:
            self.audioDev.tick(ms)

        if self.cal_keys:
            self.cal_keys.tick(ms)

        if self.keyboard:
            self.keyboard.tick(ms)

        if self.calibrate:
            self.calibrate.tick(ms)

        if self.midiFilePlayer:
            self.midiFilePlayer.tick(ms)

    def _no_calibrate_error( self ):
        """Return the error reported when the 'calibrate' component was not created."""
        return Result(None,"The 'calibrate' component is not enabled.")

    def _no_midi_error( self ):
        """Return the error reported when no MIDI device was configured."""
        return Result(None,"No MIDI device is configured.")

    def _pitch_range_to_list( self, pitchRangeL ):
        """Expand [begPitch] or [begPitch,endPitch] into the inclusive list of pitches."""
        # a single argument selects a single pitch
        return list(range(pitchRangeL[0], pitchRangeL[-1]+1))

    def audio_dev_list( self, ms ):
        """Print the audio ports (no-op when there is no audio device)."""
        if self.audioDev is not None:
            portL = self.audioDev.get_port_list( True )

            for port in portL:
                print("chs:%4i label:%s" % (port['chN'],port['label']))

    def midi_dev_list( self, ms ):
        """Print the MIDI input and output ports (no-op when there is no MIDI device)."""
        if self.midiDev is None:
            return

        d = self.midiDev.get_port_list( True )

        for port in d['listL']:
            print("IN:",port)

        d = self.midiDev.get_port_list( False )

        for port in d['listL']:
            print("OUT:",port)

    def calibrate_keys_start( self, ms, pitchRangeL ):
        """Start calibrating the pitches in the inclusive range given by 'pitchRangeL'."""
        pitchL = self._pitch_range_to_list( pitchRangeL )
        self.cal_keys.start( ms, pitchL, self.cfg.full_pulseL )

    def play_keys_start( self, ms, pitchRangeL ):
        """Play (without writing results) the pitches in the inclusive range given by 'pitchRangeL'."""
        chordL = self._pitch_range_to_list( pitchRangeL )
        self.cal_keys.start( ms, chordL, self.cfg.full_pulseL, playOnlyFl=True )

    def keyboard_start_pulse_idx( self, ms, argL ):
        """argL = [begPitch,endPitch,pulseIdx]"""
        pitchL = [ pitch for pitch in range(argL[0], argL[1]+1)]
        self.keyboard.start( ms, pitchL, argL[2], None )

    def keyboard_repeat_pulse_idx( self, ms, argL ):
        """argL = [pulseIdx]"""
        self.keyboard.repeat( ms, argL[0], None )

    def keyboard_start_target_db( self, ms, argL ):
        """argL = [begPitch,endPitch,targetDb]"""
        pitchL = [ pitch for pitch in range(argL[0], argL[1]+1)]
        self.keyboard.start( ms, pitchL, None, argL[2] )

    def keyboard_repeat_target_db( self, ms, argL ):
        """argL = [targetDb]"""
        self.keyboard.repeat( ms, None, argL[0] )

    def calibrate_start( self, ms, argL ):
        """Start the 'calibrate' component, or return an error when it does not exist."""
        if self.calibrate is None:
            return self._no_calibrate_error()

        self.calibrate.start(ms)

    def calibrate_play( self, ms, argL ):
        """Play back via the 'calibrate' component, or return an error when it does not exist."""
        if self.calibrate is None:
            return self._no_calibrate_error()

        self.calibrate.play(ms)

    def calibrate_keys_stop( self, ms ):
        """Stop key calibration, the keyboard and (when it exists) the 'calibrate' component."""
        self.cal_keys.stop(ms)
        self.keyboard.stop(ms)

        # setup() currently leaves self.calibrate set to None
        if self.calibrate is not None:
            self.calibrate.stop(ms)

    def midi_file_player_start( self, ms ):
        self.midiFilePlayer.start(ms)

    def midi_file_player_stop( self, ms ):
        self.midiFilePlayer.stop(ms)

    def pedal_down( self, ms ):
        """Send controller 64 with value 100."""
        if self.midiDev is None:
            return self._no_midi_error()

        print("pedal_down")
        self.midiDev.send_controller(64, 100 )

    def pedal_up( self, ms ):
        """Send controller 64 with value 0."""
        if self.midiDev is None:
            return self._no_midi_error()

        print("pedal_up")
        self.midiDev.send_controller(64, 0 )

    def quit( self, ms ):
        """Close the device API, if it was created."""
        if self.api:
            self.api.close()


def _send_error( pipe, res ):
    """Send res.msg to the parent as an 'error' message. Does nothing if 'res' is None or has no message."""
    if res is None:
        return

    if res.msg:
        pipe.send( [{"type":"error", 'value':res.msg}] )


def _send_error_msg( pipe, msg ):
    """Send the text 'msg' to the parent as an 'error' message."""
    _send_error( pipe, Result(None,msg))


def _send_quit( pipe ):
    """Tell the parent that the child process is shutting down."""
    pipe.send( [{ 'type':'quit' }] )


# This is the application engine async. process loop
def app_event_loop_func( pipe, cfg ):
    """Child process main loop: dispatch parent messages to App handlers and tick the App.

    pipe -- child end of the Pipe shared with the parent process.
    cfg  -- application configuration passed to App.setup().
    """

    multiprocessing.get_logger().info("App Proc Started.")

    # create the asynchronous application object
    app = App()

    res = app.setup(cfg)

    # if the app did not initialize successfully
    if not res:
        _send_error( pipe, res )
        _send_quit(pipe)
        return

    # a monotonic clock keeps the time stamps immune to wall-clock adjustments
    t0 = time.monotonic()

    ms = 0

    while True:

        # have any message arrived from the parent process?
        if pipe.poll():

            msg = None

            try:
                msg = pipe.recv()
            except EOFError:
                return

            if not hasattr(app,msg.type):
                _send_error_msg( pipe, "Unknown message type:'%s'." % (msg.type) )
            else:

                # get the command handler function in 'app'
                func = getattr(app,msg.type)

                ms = _elapsed_ms(t0)

                # Call the command handler. A failing command is reported to
                # the parent instead of silently killing this process.
                try:
                    if msg.value:
                        res = func( ms, msg.value )
                    else:
                        res = func( ms )
                except Exception as ex:
                    multiprocessing.get_logger().exception("Command '%s' failed.", msg.type)
                    _send_error_msg( pipe, "Command '%s' failed: %s" % (msg.type, ex) )
                else:
                    # handle any errors returned from the commands
                    _send_error( pipe, res )

            # if a 'quit' msg was received then break out of the loop
            if msg.type == 'quit':
                _send_quit(pipe)
                break

        # give some time to the system
        time.sleep(0.05)

        # calc the tick() time stamp
        ms = _elapsed_ms(t0)

        # tick the app
        app.tick( ms )


class AppProcess(Process):
    """Child process running app_event_loop_func(), plus the parent's end of the message pipe."""

    def __init__(self,cfg):
        self.parent_end, child_end = Pipe()
        super(AppProcess, self).__init__(target=app_event_loop_func,name="AppProcess",args=(child_end,cfg))
        self.doneFl    = False

    def send(self, d):
        # This function is called by the parent process to send an arbitrary msg to the App process
        self.parent_end.send( types.SimpleNamespace(**d) )
        return None

    def recv(self):
        # This function is called by the parent process to receive lists of child messages.
        # Returns None when no message list is available.

        msgL = None
        if not self.doneFl and self.parent_end.poll():

            try:
                msgL = self.parent_end.recv()
            except EOFError:
                # The child closed its end without sending 'quit' (e.g. it crashed).
                self.doneFl = True
                return None

            for msg in msgL:
                if msg['type'] == 'quit':
                    self.doneFl = True

        return msgL

    def isdone(self):
        return self.doneFl


class Shell:
    """Interactive command shell which forwards commands to the App child process."""

    def __init__( self, cfg ):
        self.cfg     = cfg
        self.appProc = None
        self.parseD  = {
            'q':{ "func":'quit',                      "minN":0,  "maxN":0, "help":"quit"},
            '?':{ "func":"_help",                     "minN":0,  "maxN":0, "help":"Print usage text."},
            'a':{ "func":"audio_dev_list",            "minN":0,  "maxN":0, "help":"List the audio devices."},
            'm':{ "func":"midi_dev_list",             "minN":0,  "maxN":0, "help":"List the MIDI devices."},
            'c':{ "func":"calibrate_keys_start",      "minN":1,  "maxN":2, "help":"Calibrate a range of keys. "},
            'd':{ "func":"calibrate_start",           "minN":1,  "maxN":1, "help":"Calibrate based on fixed db levels. "},
            'D':{ "func":"calibrate_play",            "minN":1,  "maxN":1, "help":"Play back last calibration."},
            's':{ "func":"calibrate_keys_stop",       "minN":0,  "maxN":0, "help":"Stop key calibration"},
            'p':{ "func":"play_keys_start",           "minN":1,  "maxN":2, "help":"Play current calibration"},
            'k':{ "func":"keyboard_start_pulse_idx",  "minN":3,  "maxN":3, "help":"Play pulse index across keyboard"},
            'r':{ "func":"keyboard_repeat_pulse_idx", "minN":1,  "maxN":1, "help":"Repeat pulse index across keyboard with new pulse_idx"},
            'K':{ "func":"keyboard_start_target_db",  "minN":3,  "maxN":3, "help":"Play db across keyboard"},
            'R':{ "func":"keyboard_repeat_target_db", "minN":1,  "maxN":1, "help":"Repeat db across keyboard with new pulse_idx"},
            'F':{ "func":"midi_file_player_start",    "minN":0,  "maxN":0, "help":"Play the MIDI file."},
            'f':{ "func":"midi_file_player_stop",     "minN":0,  "maxN":0, "help":"Stop the MIDI file."},
            'P':{ "func":"pedal_down",                "minN":0,  "maxN":0, "help":"Pedal down."},
            'U':{ "func":"pedal_up",                  "minN":0,  "maxN":0, "help":"Pedal up."},
            }

    def _help( self, _=None ):
        """Print one usage line per command."""
        for k,d in self.parseD.items():
            s = "{} = {}".format( k, d['help'] )
            print(s)
        return None

    def _syntaxError( self, msg ):
        """Return a Result carrying a syntax error message."""
        return Result(None,"Syntax Error: " + msg )

    def _exec_cmd( self, tokL ):
        """Validate and execute one tokenized command line.

        Commands implemented by the Shell are called directly; all others are
        sent to the App process.  Returns None or a Result describing an error.
        """
        if len(tokL) <= 0:
            return None

        opcode = tokL[0]

        if opcode not in self.parseD:
            return self._syntaxError("Unknown opcode: '{}'.".format(opcode))

        d = self.parseD[ opcode ]

        func_name = d['func']
        func      = None

        # find the function associated with this command
        if hasattr(self, func_name ):
            func = getattr(self, func_name )

        try:
            # convert the parameter list into integers
            argL = [ int(tok) for tok in tokL[1:] ]
        except ValueError:
            return self._syntaxError("Unable to create integer arguments.")

        # validate the count of command args
        if d['minN'] != -1 and (d['minN'] > len(argL) or len(argL) > d['maxN']):
            return self._syntaxError("Argument count mismatch. {} is out of range:{} to {}".format(len(argL),d['minN'],d['maxN']))

        # call the command function
        if func:
            result = func(*argL)
        else:
            result = self.appProc.send( { 'type':func_name, 'value':argL } )

        return result

    def run( self ):
        """Start the App process and run the interactive command loop until quit."""

        # create the API object
        self.appProc = AppProcess(self.cfg)

        self.appProc.start()

        print("'q'=quit '?'=help")
        time_out_secs = 1

        # this is the shell main loop
        while True:

            # wait for keyboard activity
            i, o, e = select.select( [sys.stdin], [], [], time_out_secs )

            if i:
                # read the command
                line = sys.stdin.readline()

                # An empty string (no newline) means stdin was closed: treat it
                # as a quit request instead of spinning on a readable EOF.
                if line == "":
                    tokL = ['q']
                else:
                    # tokenize the command (tolerates repeated white space)
                    tokL = line.split()

                # execute the command
                result = self._exec_cmd( tokL )

                # report command errors to the user
                errorFl = result is not None and bool(getattr(result,'msg',None))
                if errorFl:
                    print(result.msg)

                # If this is the 'quit' command, and it was actually sent to
                # the App process, then leave the loop.
                if tokL and tokL[0] == 'q' and not errorFl:
                    break

            # check for msg's from the async application process
            if self._handle_app_msgs( self.appProc.recv() ) or self.appProc.isdone():
                break

        # wait for the appProc to complete
        while not self.appProc.isdone():
            self._handle_app_msgs( self.appProc.recv() )    # drain the AppProc() as it shuts down
            time.sleep(0.1)

    def _handle_app_msgs( self, msgL ):
        """Print the messages in 'msgL' and return True if one of them is a 'quit' message."""
        quitAppFl = False
        if msgL:
            for msg in msgL:
                if msg:
                    if msg['type'] == 'error':
                        print("Error: {}".format(msg['value']))
                    elif msg['type'] == 'quit':
                        quitAppFl = True
                    else:
                        print(msg)

        return quitAppFl


def parse_args():
    """Parse the command line arguments."""
    descStr  = """Picadae auto-calibrate."""
    logL     = ['debug','info','warning','error','critical']

    ap = argparse.ArgumentParser(description=descStr)

    ap.add_argument("-c","--config",                     default="p_ac.yml",  help="YAML configuration file.")
    ap.add_argument("-l","--log_level",choices=logL,     default="warning",   help="Set logging level: debug,info,warning,error,critical. Default:warning")

    return ap.parse_args()


def create_logger( level=logging.INFO, log_fn='/home/kevin/temp/p_ac_log.txt' ):
    """Configure and return the multiprocessing logger.

    level  -- logging level assigned to the logger.
    log_fn -- log file path.  When the file cannot be opened the messages
              go to stderr instead of aborting the program.
    """
    logger = multiprocessing.get_logger()
    logger.setLevel(level)

    # this bit will make sure you won't have
    # duplicated messages in the output
    if not logger.handlers:
        formatter = logging.Formatter('[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')

        try:
            handler = logging.FileHandler(log_fn)
        except OSError as ex:
            print("Unable to open the log file '{}': {}. Logging to stderr.".format(log_fn,ex))
            handler = logging.StreamHandler()

        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger


if __name__ == "__main__":

    from multiprocessing import Pool

    args   = parse_args()

    # apply the level selected with -l/--log_level
    logger = create_logger( level=getattr(logging, args.log_level.upper()) )
    logger.info('Starting pooling')
    #p = Pool()

    #logging.basicConfig()

    #mplog = multiprocessing.log_to_stderr()
    #mplog.setLevel(logging.INFO)

    cfg    = parse_yaml_cfg(args.config)
    shell  = Shell(cfg)
    shell.run()