commit c3db8c8ab601b344cd01a847053a01aa91f08eaf Author: kpl Date: Mon Aug 19 21:18:57 2019 -0400 Initial commit. diff --git a/AudioDevice.py b/AudioDevice.py new file mode 100644 index 0000000..51c9301 --- /dev/null +++ b/AudioDevice.py @@ -0,0 +1,256 @@ +import pprint,queue +import sounddevice as sd +import soundfile as sf +import numpy as np + +from result import Result + +class AudioDevice(object): + def __init__(self): + self.inputPortIdx = None + self.outputPortIdx = None + self.recordFl = False + self.sineFl = False + self.inStream = None + self.queue = queue.Queue() + self.bufL = [] + self.bufIdx = -1 + self.srate = 0 + self.ch_cnt = 1 + + + def setup( self, **kwargs ): + + res = Result() + + if kwargs is None: + return res + + if 'inPortLabel' in kwargs: + res += self.select_port( True, kwargs['inPortLabel'] ) + + if 'outPortLabel' in kwargs: + res += self.select_port( False, kwargs['outPortLabel'] ) + + return res + + def get_port_label( self, inFl ): + + portL = self.get_port_list(inFl) + portIdx = self.inputPortIdx if inFl else self.outputPortIdx + portLabel = None + + if portL and (portIdx is not None): + for port in portL: + if portIdx == port['index']: + portLabel = port['label'] + break + + return portLabel + + + def get_state( self ): + + d= { + 'recordFl': self.inStream is not None and self.inStream.active, + 'sineFl': self.sineFl, + 'inPortL': [ d['label'] for d in self.get_port_list(True) ], + 'outPortL': [ d['label'] for d in self.get_port_list(False) ], + 'inPortLabel': self.get_port_label(True), + 'outPortLabel': self.get_port_label(False) + } + + return d + + def get_port_list( self, inFl ): + + devLabelL = str(sd.query_devices()).split("\n") + + portL = [] + for i,dev in enumerate(sd.query_devices()): + isInputFl = dev['max_input_channels'] > 0 + isOutputFl = dev['max_output_channels'] > 0 + if (inFl and isInputFl) or (not inFl and isOutputFl): + portL.append({ 'chN':dev['max_input_channels'], 'label':devLabelL[i].strip(), 'index':i } ) + + return portL + + def get_in_port_list( self ): + return get_port_list( True ) + + def get_out_port_list( self ): + return get_port_list( False ) + + def select_port( self, inFl, portLabel ): + + res = Result() + + if inFl and self.inStream is not None and self.inStream.active: + return res.set_error("A new port may not be selected while it is active." % (portLabel)) + + devLabelL = str(sd.query_devices()).split("\n") + foundFl = False + + if portLabel is None and len(devLabelL)>0: + portLabel = devLabelL[0] + + portLabel = portLabel.strip() + N = len(portLabel) + + # for each device + for i,label in enumerate(devLabelL): + + label = label.strip() + + # if the first N char's of this label match the requested port label + if len(label)>=N and label[0:N] == portLabel: + if inFl: + self.inputPortIdx = i + else: + self.outputPortIdx = i + foundFl = True + + if inFl: + dev = sd.query_devices()[i] + self.srate = dev['default_samplerate'] + if self.inStream: + self.inStream.close() + + self.inStream = sd.InputStream(samplerate=self.srate, device=self.inputPortIdx, channels=self.ch_cnt, callback=self._record_callback, dtype=np.dtype('float32')) + + + break + + if not foundFl: + res.set_error("Unable to locate the audio port named: '%s'." % (portLabel)) + + return res + + def record_enable( self, enableFl ): + + # if the input stream has not already been configured + if enableFl and self.inStream is None: + return Result(None,'Recording cannot start because a recording input port has not been selected.') + + if not enableFl and self.inStream is None: + return Result() + + # if the input stream already matches the 'enableFl'. + if enableFl == self.inStream.active: + return Result() + + # if we are starting recording + if enableFl: + + try: + self.bufL = [] + self.inStream.start() + + except Exception as ex: + return Result(None,'The recording input stream could not be started. Reason: %s' % str(ex)) + + else: + self.inStream.stop() + + return Result() + + def play_buffer( self ): + if not self.playFl: + self.playFl = True + + def play_end( self ): + if self.playFl: + self.playFl = False + + def write_buffer( self, fn=None ): + + if fn is None: + fn = "temp.wav" + + with sf.SoundFile(fn,'w',samplerate=int(self.srate), channels=int(self.ch_cnt)) as f: + for i,buf in enumerate(self.bufL): + N = buf.shape[0] if i bufN: + + # store as much of the incoming vector as possible + if len(self.bufL) > 0: + self.bufL[-1][self.bufIdx:,:] = v[0:bufN,:] + + vi = bufN # increment the starting position of the src vector + vN -= bufN # decrement the count of samples that needs to be stored + + # add an empty buffer to bufL[] + N = int(self.srate * 60) # add a one minute buffer to bufL + self.bufL.append( np.zeros( (N, self.ch_cnt), dtype=np.dtype('float32') ) ) + self.bufIdx = 0 + + self.bufL[-1][self.bufIdx:vN,:] = v[vi:,] + self.bufIdx += vN + + else: + self.bufL[-1][self.bufIdx:self.bufIdx+vN,:] = v + self.bufIdx += vN + + + +if __name__ == "__main__": + + ad = AudioDevice() + ad.get_port_list(True) + diff --git a/convert.py b/convert.py new file mode 100644 index 0000000..fe1f103 --- /dev/null +++ b/convert.py @@ -0,0 +1,80 @@ +import os,json,pickle,csv +from shutil import copyfile + +def event_times( eventTimeFn ): + + eventL = [] + + with open(eventTimeFn,"r") as f: + + rdr = csv.reader(f) + + for row in rdr: + if row[0] == 'start': + beginMs = int(row[1]) + elif row[0] == 'key_down': + key_downMs = int(row[1]) - beginMs + elif row[0] == 'key_up': + key_upMs = row[1] + + eventL.append( [ key_downMs, key_downMs+1000 ] ) + + return eventL + +def pulse_lengths( pulseLenFn ): + + with open(pulseLenFn,'rb') as f: + d = pickle.load(f) + msL = d['msL'] + # note: first posn in table is a multiplier + return [ msL[i]*msL[0] for i in range(1,len(msL))] + + +def convert( inDir, outDir ): + + if not os.path.isdir(outDir): + os.mkdir(outDir) + + for dirStr in os.listdir(inDir): + + idir = os.path.join( inDir, dirStr ) + + if os.path.isdir(idir): + + eventTimeFn = os.path.join( idir, "labels_0.csv" ) + + eventTimeL = event_times(eventTimeFn) + + pulseTimeFn = os.path.join( idir, "table_0.pickle") + + pulseUsL = pulse_lengths( pulseTimeFn ) + + pitch = idir.split("/")[-1] + + + d = { + "pulseUsL":pulseUsL, + "pitchL":[ pitch ], + "noteDurMs":1000, + "pauseDurMs":0, + "holdDutyPct":50, + "eventTimeL":eventTimeL, + "beginMs":0 + } + + odir = os.path.join( outDir, pitch ) + if not os.path.isdir(odir): + os.mkdir(odir) + + with open(os.path.join( odir, "seq.json" ),"w") as f: + f.write(json.dumps( d )) + + copyfile( os.path.join(idir,"audio_0.wav"), os.path.join(odir,"audio.wav")) + + +if __name__ == "__main__": + inDir = "/home/kevin/temp/picadae_ac_2/full_map" + outDir = "/home/kevin/temp/p_ac_3_cvt" + + convert( inDir, outDir ) + diff --git a/p_ac.py b/p_ac.py new file mode 100644 index 0000000..a77e563 --- /dev/null +++ b/p_ac.py @@ -0,0 +1,522 @@ +import sys,os,argparse,yaml,types,logging,select,time,json +from datetime import datetime + +import multiprocessing +from multiprocessing import Process, Pipe + +from picadae_api import Picadae +from AudioDevice import AudioDevice +from result import Result + +class AttackPulseSeq: + """ Sequence a fixed chord over a list of attack pulse lengths.""" + + def __init__(self, audio, api, noteDurMs=1000, pauseDurMs=1000, holdDutyPct=50 ): + self.audio = audio + self.api = api + self.outDir = None # directory to write audio file and results + self.pitchL = None # chord to play + self.pulseUsL = [] # one onset pulse length in microseconds per sequence element + self.noteDurMs = noteDurMs # duration of each chord in milliseconds + self.pauseDurMs = pauseDurMs # duration between end of previous note and start of next + self.holdDutyPct = holdDutyPct # hold voltage duty cycle as a percentage (0-100) + + self.pulse_idx = 0 # Index of next pulse + self.state = None # 'note_on','note_off' + self.next_ms = 0 # Time of next event (note-on or note_off) + self.eventTimeL = [] # Onset/offset time of each note [ [onset_ms,offset_ms] ] + self.beginMs = 0 + + def start( self, ms, outDir, pitchL, pulseUsL ): + self.outDir = outDir # directory to write audio file and results + self.pitchL = pitchL # chord to play + self.pulseUsL = pulseUsL # one onset pulse length in microseconds per sequence element + + self.pulse_idx = 0 + self.state = 'note_on' + self.next_ms = ms + 500 # wait for 500ms to play the first note (this will guarantee that there is some empty space in the audio file before the first note) + self.eventTimeL = [[0,0]] * len(pulseUsL) # initialize the event time + self.beginMs = ms + self.audio.record_enable(True) # start recording audio + self.tick(ms) # play the first note + + def stop(self, ms): + self._send_note_off() # be sure that all notes are actually turn-off + self.audio.record_enable(False) # stop recording audio + self._disable() # disable this sequencer + self._write() # write the results + + def is_enabled(self): + return self.state is not None + + def tick(self, ms): + + self.audio.tick(ms) + + # if next event time has arrived + if self.is_enabled() and ms >= self.next_ms: + + # if waiting to turn note on + if self.state == 'note_on': + self._note_on(ms) + + # if waiting to turn a note off + elif self.state == 'note_off': + self._note_off(ms) + self.pulse_idx += 1 + + # if all notes have been played + if self.pulse_idx >= len(self.pulseUsL): + self.stop(ms) + + else: + assert(0) + + + def _note_on( self, ms ): + + self.eventTimeL[ self.pulse_idx ][0] = ms - self.beginMs + self.next_ms = ms + self.noteDurMs + self.state = 'note_off' + + for pitch in self.pitchL: + self.api.note_on_us( pitch, int(self.pulseUsL[ self.pulse_idx ]) ) + print("note-on:",pitch,self.pulse_idx) + + def _note_off( self, ms ): + self.eventTimeL[ self.pulse_idx ][1] = ms - self.beginMs + self.next_ms = ms + self.pauseDurMs + self.state = 'note_on' + + self._send_note_off() + + + def _send_note_off( self ): + for pitch in self.pitchL: + self.api.note_off( pitch ) + print("note-off:",pitch,self.pulse_idx) + + def _disable(self): + self.state = None + + + def _write( self ): + + d = { + "pulseUsL":self.pulseUsL, + "pitchL":self.pitchL, + "noteDurMs":self.noteDurMs, + "pauseDurMs":self.pauseDurMs, + "holdDutyPct":self.holdDutyPct, + "eventTimeL":self.eventTimeL, + "beginMs":self.beginMs + } + + print("Writing: ", self.outDir ) + + outDir = os.path.expanduser(self.outDir) + + if not os.path.isdir(outDir): + os.mkdir(outDir) + + with open(os.path.join( outDir, "seq.json" ),"w") as f: + f.write(json.dumps( d )) + + self.audio.write_buffer( os.path.join( outDir, "audio.wav" ) ) + + +class CalibrateKeys: + def __init__(self, cfg, audioDev, api): + self.cfg = cfg + self.seq = AttackPulseSeq( audioDev, api, noteDurMs=1000, pauseDurMs=1000, holdDutyPct=50 ) + + self.label = None + self.pulseUsL = None + self.chordL = None + self.pitch_idx = -1 + + + def start( self, ms, label, chordL, pulseUsL ): + if len(chordL) > 0: + self.label = label + self.pulseUsL = pulseUsL + self.chordL = chordL + self.pitch_idx = -1 + self._start_next_chord( ms ) + + + def stop( self, ms ): + self.pitch_idx = -1 + self.seq.stop(ms) + + def is_enabled( self ): + return self.pitch_idx >= 0 + + def tick( self, ms ): + if self.is_enabled(): + self.seq.tick(ms) + + # if the sequencer is done playing + if not self.seq.is_enabled(): + self._start_next_chord( ms ) # ... else start the next sequence + + return None + + def _start_next_chord( self, ms ): + + self.pitch_idx += 1 + + # if the last chord in chordL has been played ... + if self.pitch_idx >= len(self.chordL): + self.stop(ms) # ... then we are done + else: + + pitchL = self.chordL[ self.pitch_idx ] + + # be sure that the base directory exists + outDir = os.path.expanduser( cfg.outDir ) + if not os.path.isdir( outDir ): + os.mkdir( outDir ) + + # form the output directory as "