Initial commit.
This commit is contained in:
commit
c3db8c8ab6
256
AudioDevice.py
Normal file
256
AudioDevice.py
Normal file
@ -0,0 +1,256 @@
|
||||
import pprint,queue
|
||||
import sounddevice as sd
|
||||
import soundfile as sf
|
||||
import numpy as np
|
||||
|
||||
from result import Result
|
||||
|
||||
class AudioDevice(object):
|
||||
def __init__(self):
|
||||
self.inputPortIdx = None
|
||||
self.outputPortIdx = None
|
||||
self.recordFl = False
|
||||
self.sineFl = False
|
||||
self.inStream = None
|
||||
self.queue = queue.Queue()
|
||||
self.bufL = []
|
||||
self.bufIdx = -1
|
||||
self.srate = 0
|
||||
self.ch_cnt = 1
|
||||
|
||||
|
||||
def setup( self, **kwargs ):
|
||||
|
||||
res = Result()
|
||||
|
||||
if kwargs is None:
|
||||
return res
|
||||
|
||||
if 'inPortLabel' in kwargs:
|
||||
res += self.select_port( True, kwargs['inPortLabel'] )
|
||||
|
||||
if 'outPortLabel' in kwargs:
|
||||
res += self.select_port( False, kwargs['outPortLabel'] )
|
||||
|
||||
return res
|
||||
|
||||
def get_port_label( self, inFl ):
|
||||
|
||||
portL = self.get_port_list(inFl)
|
||||
portIdx = self.inputPortIdx if inFl else self.outputPortIdx
|
||||
portLabel = None
|
||||
|
||||
if portL and (portIdx is not None):
|
||||
for port in portL:
|
||||
if portIdx == port['index']:
|
||||
portLabel = port['label']
|
||||
break
|
||||
|
||||
return portLabel
|
||||
|
||||
|
||||
def get_state( self ):
|
||||
|
||||
d= {
|
||||
'recordFl': self.inStream is not None and self.inStream.active,
|
||||
'sineFl': self.sineFl,
|
||||
'inPortL': [ d['label'] for d in self.get_port_list(True) ],
|
||||
'outPortL': [ d['label'] for d in self.get_port_list(False) ],
|
||||
'inPortLabel': self.get_port_label(True),
|
||||
'outPortLabel': self.get_port_label(False)
|
||||
}
|
||||
|
||||
return d
|
||||
|
||||
def get_port_list( self, inFl ):
|
||||
|
||||
devLabelL = str(sd.query_devices()).split("\n")
|
||||
|
||||
portL = []
|
||||
for i,dev in enumerate(sd.query_devices()):
|
||||
isInputFl = dev['max_input_channels'] > 0
|
||||
isOutputFl = dev['max_output_channels'] > 0
|
||||
if (inFl and isInputFl) or (not inFl and isOutputFl):
|
||||
portL.append({ 'chN':dev['max_input_channels'], 'label':devLabelL[i].strip(), 'index':i } )
|
||||
|
||||
return portL
|
||||
|
||||
def get_in_port_list( self ):
|
||||
return get_port_list( True )
|
||||
|
||||
def get_out_port_list( self ):
|
||||
return get_port_list( False )
|
||||
|
||||
def select_port( self, inFl, portLabel ):
|
||||
|
||||
res = Result()
|
||||
|
||||
if inFl and self.inStream is not None and self.inStream.active:
|
||||
return res.set_error("A new port may not be selected while it is active." % (portLabel))
|
||||
|
||||
devLabelL = str(sd.query_devices()).split("\n")
|
||||
foundFl = False
|
||||
|
||||
if portLabel is None and len(devLabelL)>0:
|
||||
portLabel = devLabelL[0]
|
||||
|
||||
portLabel = portLabel.strip()
|
||||
N = len(portLabel)
|
||||
|
||||
# for each device
|
||||
for i,label in enumerate(devLabelL):
|
||||
|
||||
label = label.strip()
|
||||
|
||||
# if the first N char's of this label match the requested port label
|
||||
if len(label)>=N and label[0:N] == portLabel:
|
||||
if inFl:
|
||||
self.inputPortIdx = i
|
||||
else:
|
||||
self.outputPortIdx = i
|
||||
foundFl = True
|
||||
|
||||
if inFl:
|
||||
dev = sd.query_devices()[i]
|
||||
self.srate = dev['default_samplerate']
|
||||
if self.inStream:
|
||||
self.inStream.close()
|
||||
|
||||
self.inStream = sd.InputStream(samplerate=self.srate, device=self.inputPortIdx, channels=self.ch_cnt, callback=self._record_callback, dtype=np.dtype('float32'))
|
||||
|
||||
|
||||
break
|
||||
|
||||
if not foundFl:
|
||||
res.set_error("Unable to locate the audio port named: '%s'." % (portLabel))
|
||||
|
||||
return res
|
||||
|
||||
def record_enable( self, enableFl ):
|
||||
|
||||
# if the input stream has not already been configured
|
||||
if enableFl and self.inStream is None:
|
||||
return Result(None,'Recording cannot start because a recording input port has not been selected.')
|
||||
|
||||
if not enableFl and self.inStream is None:
|
||||
return Result()
|
||||
|
||||
# if the input stream already matches the 'enableFl'.
|
||||
if enableFl == self.inStream.active:
|
||||
return Result()
|
||||
|
||||
# if we are starting recording
|
||||
if enableFl:
|
||||
|
||||
try:
|
||||
self.bufL = []
|
||||
self.inStream.start()
|
||||
|
||||
except Exception as ex:
|
||||
return Result(None,'The recording input stream could not be started. Reason: %s' % str(ex))
|
||||
|
||||
else:
|
||||
self.inStream.stop()
|
||||
|
||||
return Result()
|
||||
|
||||
def play_buffer( self ):
|
||||
if not self.playFl:
|
||||
self.playFl = True
|
||||
|
||||
def play_end( self ):
|
||||
if self.playFl:
|
||||
self.playFl = False
|
||||
|
||||
def write_buffer( self, fn=None ):
|
||||
|
||||
if fn is None:
|
||||
fn = "temp.wav"
|
||||
|
||||
with sf.SoundFile(fn,'w',samplerate=int(self.srate), channels=int(self.ch_cnt)) as f:
|
||||
for i,buf in enumerate(self.bufL):
|
||||
N = buf.shape[0] if i<len(self.bufL)-1 else self.bufIdx
|
||||
f.write(buf[0:N,])
|
||||
|
||||
return Result()
|
||||
|
||||
def buffer_sample_count( self ):
|
||||
smpN = 0
|
||||
for i in range(len(self.bufL)):
|
||||
smpN += self.bufIdx if i == len(self.bufL)-1 else self.bufL[i].shape[0]
|
||||
|
||||
return Result(smpN)
|
||||
|
||||
def linear_buffer( self ):
|
||||
|
||||
smpN = self.buffer_sample_count()
|
||||
bV = np.zeros( (smpN,self.ch_cnt) )
|
||||
|
||||
bi = 0
|
||||
for i in range(len(self.bufL)):
|
||||
bufSmpN = self.bufIdx if i == len(self.bufL)-1 else self.bufL[i].shape[0]
|
||||
bV[ bi:bi+bufSmpN, ] = self.bufL[i][0:bufSmpN]
|
||||
bi += bufSmpN
|
||||
|
||||
return Result(bV)
|
||||
|
||||
|
||||
def _record_callback(self, v, frames, time, status):
|
||||
"""This is called (from a separate thread) for each audio block."""
|
||||
# send audio to the app
|
||||
self.queue.put(v.copy())
|
||||
|
||||
def tick( self, ms ):
|
||||
|
||||
try:
|
||||
while True:
|
||||
|
||||
# get audio from the incoming audio thread
|
||||
v = self.queue.get_nowait()
|
||||
|
||||
#print(type(v),len(v),np.result_type(v))
|
||||
self._update_buffer(v)
|
||||
|
||||
except queue.Empty: # queue was empty
|
||||
pass
|
||||
|
||||
return Result()
|
||||
|
||||
|
||||
def _update_buffer(self,v ):
|
||||
|
||||
# get the length of the last buffer in bufL
|
||||
bufN = 0 if len(self.bufL)==0 else self.bufL[-1].shape[0] - self.bufIdx
|
||||
|
||||
# get the length of the incoming sample vector
|
||||
vN = v.shape[0]
|
||||
|
||||
# if there are more incoming samples than space in the buffer
|
||||
if vN > bufN:
|
||||
|
||||
# store as much of the incoming vector as possible
|
||||
if len(self.bufL) > 0:
|
||||
self.bufL[-1][self.bufIdx:,:] = v[0:bufN,:]
|
||||
|
||||
vi = bufN # increment the starting position of the src vector
|
||||
vN -= bufN # decrement the count of samples that needs to be stored
|
||||
|
||||
# add an empty buffer to bufL[]
|
||||
N = int(self.srate * 60) # add a one minute buffer to bufL
|
||||
self.bufL.append( np.zeros( (N, self.ch_cnt), dtype=np.dtype('float32') ) )
|
||||
self.bufIdx = 0
|
||||
|
||||
self.bufL[-1][self.bufIdx:vN,:] = v[vi:,]
|
||||
self.bufIdx += vN
|
||||
|
||||
else:
|
||||
self.bufL[-1][self.bufIdx:self.bufIdx+vN,:] = v
|
||||
self.bufIdx += vN
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
ad = AudioDevice()
|
||||
ad.get_port_list(True)
|
||||
|
80
convert.py
Normal file
80
convert.py
Normal file
@ -0,0 +1,80 @@
|
||||
import os,json,pickle,csv
|
||||
from shutil import copyfile
|
||||
|
||||
def event_times( eventTimeFn ):
|
||||
|
||||
eventL = []
|
||||
|
||||
with open(eventTimeFn,"r") as f:
|
||||
|
||||
rdr = csv.reader(f)
|
||||
|
||||
for row in rdr:
|
||||
if row[0] == 'start':
|
||||
beginMs = int(row[1])
|
||||
elif row[0] == 'key_down':
|
||||
key_downMs = int(row[1]) - beginMs
|
||||
elif row[0] == 'key_up':
|
||||
key_upMs = row[1]
|
||||
|
||||
eventL.append( [ key_downMs, key_downMs+1000 ] )
|
||||
|
||||
return eventL
|
||||
|
||||
def pulse_lengths( pulseLenFn ):
|
||||
|
||||
with open(pulseLenFn,'rb') as f:
|
||||
d = pickle.load(f)
|
||||
msL = d['msL']
|
||||
# note: first posn in table is a multiplier
|
||||
return [ msL[i]*msL[0] for i in range(1,len(msL))]
|
||||
|
||||
|
||||
def convert( inDir, outDir ):
|
||||
|
||||
if not os.path.isdir(outDir):
|
||||
os.mkdir(outDir)
|
||||
|
||||
for dirStr in os.listdir(inDir):
|
||||
|
||||
idir = os.path.join( inDir, dirStr )
|
||||
|
||||
if os.path.isdir(idir):
|
||||
|
||||
eventTimeFn = os.path.join( idir, "labels_0.csv" )
|
||||
|
||||
eventTimeL = event_times(eventTimeFn)
|
||||
|
||||
pulseTimeFn = os.path.join( idir, "table_0.pickle")
|
||||
|
||||
pulseUsL = pulse_lengths( pulseTimeFn )
|
||||
|
||||
pitch = idir.split("/")[-1]
|
||||
|
||||
|
||||
d = {
|
||||
"pulseUsL":pulseUsL,
|
||||
"pitchL":[ pitch ],
|
||||
"noteDurMs":1000,
|
||||
"pauseDurMs":0,
|
||||
"holdDutyPct":50,
|
||||
"eventTimeL":eventTimeL,
|
||||
"beginMs":0
|
||||
}
|
||||
|
||||
odir = os.path.join( outDir, pitch )
|
||||
if not os.path.isdir(odir):
|
||||
os.mkdir(odir)
|
||||
|
||||
with open(os.path.join( odir, "seq.json" ),"w") as f:
|
||||
f.write(json.dumps( d ))
|
||||
|
||||
copyfile( os.path.join(idir,"audio_0.wav"), os.path.join(odir,"audio.wav"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
inDir = "/home/kevin/temp/picadae_ac_2/full_map"
|
||||
outDir = "/home/kevin/temp/p_ac_3_cvt"
|
||||
|
||||
convert( inDir, outDir )
|
||||
|
522
p_ac.py
Normal file
522
p_ac.py
Normal file
@ -0,0 +1,522 @@
|
||||
import sys,os,argparse,yaml,types,logging,select,time,json
|
||||
from datetime import datetime
|
||||
|
||||
import multiprocessing
|
||||
from multiprocessing import Process, Pipe
|
||||
|
||||
from picadae_api import Picadae
|
||||
from AudioDevice import AudioDevice
|
||||
from result import Result
|
||||
|
||||
class AttackPulseSeq:
|
||||
""" Sequence a fixed chord over a list of attack pulse lengths."""
|
||||
|
||||
def __init__(self, audio, api, noteDurMs=1000, pauseDurMs=1000, holdDutyPct=50 ):
|
||||
self.audio = audio
|
||||
self.api = api
|
||||
self.outDir = None # directory to write audio file and results
|
||||
self.pitchL = None # chord to play
|
||||
self.pulseUsL = [] # one onset pulse length in microseconds per sequence element
|
||||
self.noteDurMs = noteDurMs # duration of each chord in milliseconds
|
||||
self.pauseDurMs = pauseDurMs # duration between end of previous note and start of next
|
||||
self.holdDutyPct = holdDutyPct # hold voltage duty cycle as a percentage (0-100)
|
||||
|
||||
self.pulse_idx = 0 # Index of next pulse
|
||||
self.state = None # 'note_on','note_off'
|
||||
self.next_ms = 0 # Time of next event (note-on or note_off)
|
||||
self.eventTimeL = [] # Onset/offset time of each note [ [onset_ms,offset_ms] ]
|
||||
self.beginMs = 0
|
||||
|
||||
def start( self, ms, outDir, pitchL, pulseUsL ):
|
||||
self.outDir = outDir # directory to write audio file and results
|
||||
self.pitchL = pitchL # chord to play
|
||||
self.pulseUsL = pulseUsL # one onset pulse length in microseconds per sequence element
|
||||
|
||||
self.pulse_idx = 0
|
||||
self.state = 'note_on'
|
||||
self.next_ms = ms + 500 # wait for 500ms to play the first note (this will guarantee that there is some empty space in the audio file before the first note)
|
||||
self.eventTimeL = [[0,0]] * len(pulseUsL) # initialize the event time
|
||||
self.beginMs = ms
|
||||
self.audio.record_enable(True) # start recording audio
|
||||
self.tick(ms) # play the first note
|
||||
|
||||
def stop(self, ms):
|
||||
self._send_note_off() # be sure that all notes are actually turn-off
|
||||
self.audio.record_enable(False) # stop recording audio
|
||||
self._disable() # disable this sequencer
|
||||
self._write() # write the results
|
||||
|
||||
def is_enabled(self):
|
||||
return self.state is not None
|
||||
|
||||
def tick(self, ms):
|
||||
|
||||
self.audio.tick(ms)
|
||||
|
||||
# if next event time has arrived
|
||||
if self.is_enabled() and ms >= self.next_ms:
|
||||
|
||||
# if waiting to turn note on
|
||||
if self.state == 'note_on':
|
||||
self._note_on(ms)
|
||||
|
||||
# if waiting to turn a note off
|
||||
elif self.state == 'note_off':
|
||||
self._note_off(ms)
|
||||
self.pulse_idx += 1
|
||||
|
||||
# if all notes have been played
|
||||
if self.pulse_idx >= len(self.pulseUsL):
|
||||
self.stop(ms)
|
||||
|
||||
else:
|
||||
assert(0)
|
||||
|
||||
|
||||
def _note_on( self, ms ):
|
||||
|
||||
self.eventTimeL[ self.pulse_idx ][0] = ms - self.beginMs
|
||||
self.next_ms = ms + self.noteDurMs
|
||||
self.state = 'note_off'
|
||||
|
||||
for pitch in self.pitchL:
|
||||
self.api.note_on_us( pitch, int(self.pulseUsL[ self.pulse_idx ]) )
|
||||
print("note-on:",pitch,self.pulse_idx)
|
||||
|
||||
def _note_off( self, ms ):
|
||||
self.eventTimeL[ self.pulse_idx ][1] = ms - self.beginMs
|
||||
self.next_ms = ms + self.pauseDurMs
|
||||
self.state = 'note_on'
|
||||
|
||||
self._send_note_off()
|
||||
|
||||
|
||||
def _send_note_off( self ):
|
||||
for pitch in self.pitchL:
|
||||
self.api.note_off( pitch )
|
||||
print("note-off:",pitch,self.pulse_idx)
|
||||
|
||||
def _disable(self):
|
||||
self.state = None
|
||||
|
||||
|
||||
def _write( self ):
|
||||
|
||||
d = {
|
||||
"pulseUsL":self.pulseUsL,
|
||||
"pitchL":self.pitchL,
|
||||
"noteDurMs":self.noteDurMs,
|
||||
"pauseDurMs":self.pauseDurMs,
|
||||
"holdDutyPct":self.holdDutyPct,
|
||||
"eventTimeL":self.eventTimeL,
|
||||
"beginMs":self.beginMs
|
||||
}
|
||||
|
||||
print("Writing: ", self.outDir )
|
||||
|
||||
outDir = os.path.expanduser(self.outDir)
|
||||
|
||||
if not os.path.isdir(outDir):
|
||||
os.mkdir(outDir)
|
||||
|
||||
with open(os.path.join( outDir, "seq.json" ),"w") as f:
|
||||
f.write(json.dumps( d ))
|
||||
|
||||
self.audio.write_buffer( os.path.join( outDir, "audio.wav" ) )
|
||||
|
||||
|
||||
class CalibrateKeys:
|
||||
def __init__(self, cfg, audioDev, api):
|
||||
self.cfg = cfg
|
||||
self.seq = AttackPulseSeq( audioDev, api, noteDurMs=1000, pauseDurMs=1000, holdDutyPct=50 )
|
||||
|
||||
self.label = None
|
||||
self.pulseUsL = None
|
||||
self.chordL = None
|
||||
self.pitch_idx = -1
|
||||
|
||||
|
||||
def start( self, ms, label, chordL, pulseUsL ):
|
||||
if len(chordL) > 0:
|
||||
self.label = label
|
||||
self.pulseUsL = pulseUsL
|
||||
self.chordL = chordL
|
||||
self.pitch_idx = -1
|
||||
self._start_next_chord( ms )
|
||||
|
||||
|
||||
def stop( self, ms ):
|
||||
self.pitch_idx = -1
|
||||
self.seq.stop(ms)
|
||||
|
||||
def is_enabled( self ):
|
||||
return self.pitch_idx >= 0
|
||||
|
||||
def tick( self, ms ):
|
||||
if self.is_enabled():
|
||||
self.seq.tick(ms)
|
||||
|
||||
# if the sequencer is done playing
|
||||
if not self.seq.is_enabled():
|
||||
self._start_next_chord( ms ) # ... else start the next sequence
|
||||
|
||||
return None
|
||||
|
||||
def _start_next_chord( self, ms ):
|
||||
|
||||
self.pitch_idx += 1
|
||||
|
||||
# if the last chord in chordL has been played ...
|
||||
if self.pitch_idx >= len(self.chordL):
|
||||
self.stop(ms) # ... then we are done
|
||||
else:
|
||||
|
||||
pitchL = self.chordL[ self.pitch_idx ]
|
||||
|
||||
# be sure that the base directory exists
|
||||
outDir = os.path.expanduser( cfg.outDir )
|
||||
if not os.path.isdir( outDir ):
|
||||
os.mkdir( outDir )
|
||||
|
||||
# form the output directory as "<label>_<pitch0>_<pitch1> ... "
|
||||
dirStr = self.label + "_" + "_".join([ str(pitch) for pitch in pitchL ])
|
||||
|
||||
outDir = os.path.join(outDir, dirStr )
|
||||
|
||||
# start the sequencer
|
||||
self.seq.start( ms, outDir, pitchL, self.pulseUsL )
|
||||
|
||||
|
||||
|
||||
# This is the main application API it is running in a child process.
|
||||
class App:
|
||||
def __init__(self ):
|
||||
self.cfg = None
|
||||
self.audioDev = None
|
||||
self.api = None
|
||||
self.calibrate = None
|
||||
|
||||
def setup( self, cfg ):
|
||||
self.cfg = cfg
|
||||
|
||||
self.audioDev = AudioDevice()
|
||||
|
||||
#
|
||||
# TODO: unify the result error handling
|
||||
# (the API and the audio device return two diferent 'Result' types
|
||||
#
|
||||
|
||||
res = self.audioDev.setup(**cfg.audio)
|
||||
|
||||
if res:
|
||||
self.api = Picadae( key_mapL=cfg.key_mapL)
|
||||
|
||||
# wait for the letter 'a' to come back from the serial port
|
||||
api_res = self.api.wait_for_serial_sync(timeoutMs=cfg.serial_sync_timeout_ms)
|
||||
|
||||
# did the serial port sync fail?
|
||||
if not api_res:
|
||||
res.set_error("Serial port sync failed.")
|
||||
else:
|
||||
print("Serial port sync'ed")
|
||||
|
||||
self.calibrate = CalibrateKeys( cfg, self.audioDev, self.api )
|
||||
|
||||
|
||||
return res
|
||||
|
||||
def tick( self, ms ):
|
||||
if self.calibrate:
|
||||
self.calibrate.tick(ms)
|
||||
|
||||
def audio_dev_list( self, ms ):
|
||||
portL = self.audioDev.get_port_list( True )
|
||||
|
||||
for port in portL:
|
||||
print("chs:%4i label:%s" % (port['chN'],port['label']))
|
||||
|
||||
def calibrate_keys_start( self, ms, pitchRangeL ):
|
||||
chordL = [ [pitch] for pitch in range(pitchRangeL[0], pitchRangeL[1]+1)]
|
||||
self.calibrate.start( ms, "full", chordL, cfg.full_pulseL )
|
||||
|
||||
def calibrate_keys_stop( self, ms ):
|
||||
self.calibrate.stop(ms)
|
||||
|
||||
def quit( self, ms ):
|
||||
if self.api:
|
||||
self.api.close()
|
||||
|
||||
|
||||
def _send_error( pipe, res ):
|
||||
if res is None:
|
||||
return
|
||||
|
||||
if res.msg:
|
||||
pipe.send( [{"type":"error", 'value':res.msg}] )
|
||||
|
||||
def _send_error_msg( pipe, msg ):
|
||||
_send_error( pipe, Result(None,msg))
|
||||
|
||||
def _send_quit( pipe ):
|
||||
pipe.send( [{ 'type':'quit' }] )
|
||||
|
||||
# This is the application engine async. process loop
|
||||
def app_event_loop_func( pipe, cfg ):
|
||||
|
||||
multiprocessing.get_logger().info("App Proc Started.")
|
||||
|
||||
# create the asynchronous application object
|
||||
app = App()
|
||||
|
||||
res = app.setup(cfg)
|
||||
|
||||
# if the app did not initialize successfully
|
||||
if not res:
|
||||
_send_error( pipe, res )
|
||||
_send_quit(pipe)
|
||||
return
|
||||
|
||||
dt0 = datetime.now()
|
||||
|
||||
ms = 0
|
||||
|
||||
while True:
|
||||
|
||||
# have any message arrived from the parent process?
|
||||
if pipe.poll():
|
||||
|
||||
msg = None
|
||||
try:
|
||||
msg = pipe.recv()
|
||||
except EOFError:
|
||||
return
|
||||
|
||||
if not hasattr(app,msg.type):
|
||||
_send_error_msg( pipe, "Unknown message type:'%s'." % (msg.type) )
|
||||
else:
|
||||
|
||||
# get the command handler function in 'app'
|
||||
func = getattr(app,msg.type)
|
||||
|
||||
ms = int(round( (datetime.now() - dt0).total_seconds() * 1000.0) )
|
||||
|
||||
# call the command handler
|
||||
if msg.value:
|
||||
res = func( ms, msg.value )
|
||||
else:
|
||||
res = func( ms )
|
||||
|
||||
# handle any errors returned from the commands
|
||||
_send_error( pipe, res )
|
||||
|
||||
# if a 'quit' msg was recived then break out of the loop
|
||||
if msg.type == 'quit':
|
||||
_send_quit(pipe)
|
||||
break
|
||||
|
||||
|
||||
# give some time to the system
|
||||
time.sleep(0.1)
|
||||
|
||||
# calc the tick() time stamp
|
||||
ms = int(round( (datetime.now() - dt0).total_seconds() * 1000.0) )
|
||||
|
||||
# tick the app
|
||||
app.tick( ms )
|
||||
|
||||
|
||||
class AppProcess(Process):
|
||||
def __init__(self,cfg):
|
||||
self.parent_end, child_end = Pipe()
|
||||
super(AppProcess, self).__init__(target=app_event_loop_func,name="AppProcess",args=(child_end,cfg))
|
||||
self.doneFl = False
|
||||
|
||||
|
||||
def send(self, d):
|
||||
# This function is called by the parent process to send an arbitrary msg to the App process
|
||||
self.parent_end.send( types.SimpleNamespace(**d) )
|
||||
return None
|
||||
|
||||
def recv(self):
|
||||
# This function is called by the parent process to receive lists of child messages.
|
||||
|
||||
msgL = None
|
||||
if not self.doneFl and self.parent_end.poll():
|
||||
|
||||
msgL = self.parent_end.recv()
|
||||
|
||||
for msg in msgL:
|
||||
if msg['type'] == 'quit':
|
||||
self.doneFl = True
|
||||
|
||||
return msgL
|
||||
|
||||
def isdone(self):
|
||||
return self.doneFl
|
||||
|
||||
|
||||
class Shell:
|
||||
def __init__( self, cfg ):
|
||||
self.appProc = None
|
||||
self.parseD = {
|
||||
'q':{ "func":'quit', "minN":0, "maxN":0, "help":"quit"},
|
||||
'?':{ "func":"_help", "minN":0, "maxN":0, "help":"Print usage text."},
|
||||
'a':{ "func":"audio_dev_list", "minN":0, "maxN":0, "help":"List the audio devices."},
|
||||
'c':{ "func":"calibrate_keys_start", "minN":1, "maxN":2, "help":"Calibrate a range of keys."},
|
||||
's':{ "func":"calibrate_keys_stop", "minN":0, "maxN":0, "help":"Stop key calibration"}
|
||||
}
|
||||
|
||||
def _help( self, _=None ):
|
||||
for k,d in self.parseD.items():
|
||||
s = "{} = {}".format( k, d['help'] )
|
||||
print(s)
|
||||
return None
|
||||
|
||||
|
||||
def _syntaxError( self, msg ):
|
||||
return Result(None,"Syntax Error: " + msg )
|
||||
|
||||
def _exec_cmd( self, tokL ):
|
||||
|
||||
if len(tokL) <= 0:
|
||||
return None
|
||||
|
||||
opcode = tokL[0]
|
||||
|
||||
if opcode not in self.parseD:
|
||||
return self._syntaxError("Unknown opcode: '{}'.".format(opcode))
|
||||
|
||||
d = self.parseD[ opcode ]
|
||||
|
||||
func_name = d['func']
|
||||
func = None
|
||||
|
||||
# find the function associated with this command
|
||||
if hasattr(self, func_name ):
|
||||
func = getattr(self, func_name )
|
||||
|
||||
try:
|
||||
# convert the parameter list into integers
|
||||
argL = [ int(tokL[i]) for i in range(1,len(tokL)) ]
|
||||
except:
|
||||
return self._syntaxError("Unable to create integer arguments.")
|
||||
|
||||
# validate the count of command args
|
||||
if d['minN'] != -1 and (d['minN'] > len(argL) or len(argL) > d['maxN']):
|
||||
return self._syntaxError("Argument count mismatch. {} is out of range:{} to {}".format(len(argL),d['minN'],d['maxN']))
|
||||
|
||||
|
||||
# call the command function
|
||||
if func:
|
||||
result = func(*argL)
|
||||
else:
|
||||
result = self.appProc.send( { 'type':func_name, 'value':argL } )
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
||||
def run( self ):
|
||||
|
||||
# create the API object
|
||||
self.appProc = AppProcess(cfg)
|
||||
|
||||
self.appProc.start()
|
||||
|
||||
print("'q'=quit '?'=help")
|
||||
time_out_secs = 1
|
||||
|
||||
# this is the shell main loop
|
||||
while True:
|
||||
|
||||
# wait for keyboard activity
|
||||
i, o, e = select.select( [sys.stdin], [], [], time_out_secs )
|
||||
|
||||
if i:
|
||||
# read the command
|
||||
s = sys.stdin.readline().strip()
|
||||
|
||||
# tokenize the command
|
||||
tokL = s.split(' ')
|
||||
|
||||
|
||||
# execute the command
|
||||
result = self._exec_cmd( tokL )
|
||||
|
||||
|
||||
# if this is the 'quit' command
|
||||
if tokL[0] == 'q':
|
||||
break
|
||||
|
||||
# check for msg's from the async application process
|
||||
if self._handle_app_msgs( self.appProc.recv() ):
|
||||
break
|
||||
|
||||
|
||||
# wait for the appProc to complete
|
||||
while not self.appProc.isdone():
|
||||
self.appProc.recv() # drain the AppProc() as it shutdown
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def _handle_app_msgs( self, msgL ):
|
||||
quitAppFl = False
|
||||
if msgL:
|
||||
for msg in msgL:
|
||||
if msg:
|
||||
if msg['type'] == 'error':
|
||||
print("Error: {}".format(msg['value']))
|
||||
|
||||
elif msg['type'] == 'quit':
|
||||
quitAppFl = True
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
return quitAppFl
|
||||
|
||||
def parse_args():
|
||||
"""Parse the command line arguments."""
|
||||
|
||||
descStr = """Picadae auto-calibrate."""
|
||||
logL = ['debug','info','warning','error','critical']
|
||||
|
||||
ap = argparse.ArgumentParser(description=descStr)
|
||||
|
||||
|
||||
ap.add_argument("-c","--config", default="cfg/p_ac.yml", help="YAML configuration file.")
|
||||
ap.add_argument("-l","--log_level",choices=logL, default="warning", help="Set logging level: debug,info,warning,error,critical. Default:warning")
|
||||
|
||||
return ap.parse_args()
|
||||
|
||||
|
||||
def parse_yaml_cfg( fn ):
|
||||
"""Parse the YAML configuration file."""
|
||||
|
||||
cfg = None
|
||||
|
||||
with open(fn,"r") as f:
|
||||
cfgD = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
cfg = types.SimpleNamespace(**cfgD['p_ac'])
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
logging.basicConfig()
|
||||
|
||||
#mplog = multiprocessing.log_to_stderr()
|
||||
#mplog.setLevel(logging.INFO)
|
||||
|
||||
|
||||
args = parse_args()
|
||||
|
||||
cfg = parse_yaml_cfg(args.config)
|
||||
|
||||
shell = Shell(cfg)
|
||||
|
||||
shell.run()
|
132
p_ac.yml
Normal file
132
p_ac.yml
Normal file
@ -0,0 +1,132 @@
|
||||
{
|
||||
p_ac: {
|
||||
|
||||
|
||||
# Audio device setup
|
||||
audio: {
|
||||
inPortLabel: "5 USB Audio CODEC:", #"HDA Intel PCH: CS4208", # "5 USB Audio CODEC:", #"5 USB Sound Device",
|
||||
outPortLabel: ,
|
||||
},
|
||||
|
||||
|
||||
# Picadae API args
|
||||
serial_dev: "/dev/ttyACM0",
|
||||
serial_baud: 38400,
|
||||
i2c_base_addr: 21,
|
||||
prescaler_usec: 16,
|
||||
serial_sync_timeout_ms: 10000,
|
||||
|
||||
|
||||
# MeasureSeq args
|
||||
outDir: "~/temp/p_ac3",
|
||||
noteDurMs: 1000,
|
||||
pauseDurMs: 1000,
|
||||
holdDutyPct: 50,
|
||||
|
||||
#full_pulseL: [ 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 5500, 6000, 6500, 7000, 8000, 9000, 10000, 12000, 14000, 18000, 22000, 26000, 30000, 34000, 40000],
|
||||
full_pulseL: [ 18000, 22000, 26000, 30000, 34000, 40000],
|
||||
|
||||
key_mapL: [
|
||||
|
||||
{ index: 0, board: 1, ch: 1, type: 'wB', midi: 21, class: 'A' },
|
||||
{ index: 1, board: 1, ch: 2, type: 'Bl', midi: 22, class: 'A#' },
|
||||
{ index: 2, board: 1, ch: 3, type: 'wF', midi: 23, class: 'B' },
|
||||
{ index: 3, board: 1, ch: 4, type: 'wB', midi: 24, class: 'C' },
|
||||
{ index: 4, board: 1, ch: 5, type: 'Bl', midi: 25, class: 'C#' },
|
||||
{ index: 5, board: 1, ch: 6, type: 'wF', midi: 26, class: 'D' },
|
||||
{ index: 6, board: 1, ch: 7, type: 'Bl', midi: 27, class: 'D#' },
|
||||
{ index: 7, board: 1, ch: 8, type: 'wB', midi: 28, class: 'E' },
|
||||
{ index: 8, board: 1, ch: 9, type: 'wF', midi: 29, class: 'F' },
|
||||
{ index: 9, board: 1, ch: 10, type: 'Bl', midi: 30, class: 'F#' },
|
||||
{ index: 10, board: 1, ch: 11, type: 'wB', midi: 31, class: 'G' },
|
||||
|
||||
{ index: 11, board: 2, ch: 1, type: 'Bl', midi: 32, class: 'G#' },
|
||||
{ index: 12, board: 2, ch: 2, type: 'wF', midi: 33, class: 'A' },
|
||||
{ index: 13, board: 2, ch: 3, type: 'Bl', midi: 34, class: 'A#' },
|
||||
{ index: 14, board: 2, ch: 4, type: 'wB', midi: 35, class: 'B' },
|
||||
{ index: 15, board: 2, ch: 5, type: 'wF', midi: 36, class: 'C' },
|
||||
{ index: 16, board: 2, ch: 6, type: 'Bl', midi: 37, class: 'C#' },
|
||||
{ index: 17, board: 2, ch: 7, type: 'wB', midi: 38, class: 'D' },
|
||||
{ index: 18, board: 2, ch: 8, type: 'Bl', midi: 39, class: 'D#' },
|
||||
{ index: 19, board: 2, ch: 9, type: 'wF', midi: 40, class: 'E' },
|
||||
{ index: 20, board: 2, ch: 10, type: 'wB', midi: 41, class: 'F' },
|
||||
{ index: 21, board: 2, ch: 11, type: 'Bl', midi: 42, class: 'F#' },
|
||||
|
||||
{ index: 22, board: 3, ch: 1, type: 'wF', midi: 43, class: 'G' },
|
||||
{ index: 23, board: 3, ch: 2, type: 'Bl', midi: 44, class: 'G#' },
|
||||
{ index: 24, board: 3, ch: 3, type: 'wB', midi: 45, class: 'A' },
|
||||
{ index: 25, board: 3, ch: 4, type: 'Bl', midi: 46, class: 'A#' },
|
||||
{ index: 26, board: 3, ch: 5, type: 'wF', midi: 47, class: 'B' },
|
||||
{ index: 27, board: 3, ch: 6, type: 'wB', midi: 48, class: 'C' },
|
||||
{ index: 28, board: 3, ch: 7, type: 'Bl', midi: 49, class: 'C#' },
|
||||
{ index: 29, board: 3, ch: 8, type: 'wF', midi: 50, class: 'D' },
|
||||
{ index: 30, board: 3, ch: 9, type: 'Bl', midi: 51, class: 'D#' },
|
||||
{ index: 31, board: 3, ch: 10, type: 'wB', midi: 52, class: 'E' },
|
||||
{ index: 32, board: 3, ch: 11, type: 'wF', midi: 53, class: 'F' },
|
||||
|
||||
{ index: 33, board: 4, ch: 1, type: 'Bl', midi: 54, class: 'F#' },
|
||||
{ index: 34, board: 4, ch: 2, type: 'wB', midi: 55, class: 'G' },
|
||||
{ index: 35, board: 4, ch: 3, type: 'Bl', midi: 56, class: 'G#' },
|
||||
{ index: 36, board: 4, ch: 4, type: 'wF', midi: 57, class: 'A' },
|
||||
{ index: 37, board: 4, ch: 5, type: 'Bl', midi: 58, class: 'A#' },
|
||||
{ index: 38, board: 4, ch: 6, type: 'wB', midi: 59, class: 'B' },
|
||||
{ index: 39, board: 4, ch: 7, type: 'wF', midi: 60, class: 'C' },
|
||||
{ index: 40, board: 4, ch: 8, type: 'Bl', midi: 61, class: 'C#' },
|
||||
{ index: 41, board: 4, ch: 9, type: 'wB', midi: 62, class: 'D' },
|
||||
{ index: 42, board: 4, ch: 10, type: 'Bl', midi: 63, class: 'D#' },
|
||||
{ index: 43, board: 4, ch: 11, type: 'wF', midi: 64, class: 'E' },
|
||||
|
||||
{ index: 44, board: 5, ch: 1, type: 'wB', midi: 65, class: 'F' },
|
||||
{ index: 45, board: 5, ch: 2, type: 'Bl', midi: 66, class: 'F#' },
|
||||
{ index: 46, board: 5, ch: 3, type: 'wF', midi: 67, class: 'G' },
|
||||
{ index: 47, board: 5, ch: 4, type: 'Bl', midi: 68, class: 'G#' },
|
||||
{ index: 48, board: 5, ch: 5, type: 'wB', midi: 69, class: 'A' },
|
||||
{ index: 49, board: 5, ch: 6, type: 'Bl', midi: 70, class: 'A#' },
|
||||
{ index: 50, board: 5, ch: 7, type: 'wF', midi: 71, class: 'B' },
|
||||
{ index: 51, board: 5, ch: 8, type: 'wB', midi: 72, class: 'C' },
|
||||
{ index: 52, board: 5, ch: 9, type: 'Bl', midi: 73, class: 'C#' },
|
||||
{ index: 53, board: 5, ch: 10, type: 'wF', midi: 74, class: 'D' },
|
||||
{ index: 54, board: 5, ch: 11, type: 'Bl', midi: 75, class: 'D#' },
|
||||
|
||||
{ index: 55, board: 6, ch: 1, type: 'wB', midi: 76, class: 'E' },
|
||||
{ index: 56, board: 6, ch: 2, type: 'wF', midi: 77, class: 'F' },
|
||||
{ index: 57, board: 6, ch: 3, type: 'Bl', midi: 78, class: 'F#' },
|
||||
{ index: 58, board: 6, ch: 4, type: 'wB', midi: 79, class: 'G' },
|
||||
{ index: 59, board: 6, ch: 5, type: 'Bl', midi: 80, class: 'G#' },
|
||||
{ index: 60, board: 6, ch: 6, type: 'wF', midi: 81, class: 'A' },
|
||||
{ index: 61, board: 6, ch: 7, type: 'Bl', midi: 82, class: 'A#' },
|
||||
{ index: 62, board: 6, ch: 8, type: 'wB', midi: 83, class: 'B' },
|
||||
{ index: 63, board: 6, ch: 9, type: 'wF', midi: 84, class: 'C' },
|
||||
{ index: 64, board: 6, ch: 10, type: 'Bl', midi: 85, class: 'C#' },
|
||||
{ index: 65, board: 6, ch: 11, type: 'wB', midi: 86, class: 'D' },
|
||||
|
||||
{ index: 66, board: 6, ch: 1, type: 'Bl', midi: 87, class: 'D#' },
|
||||
{ index: 67, board: 6, ch: 2, type: 'wF', midi: 88, class: 'E' },
|
||||
{ index: 68, board: 6, ch: 3, type: 'wB', midi: 89, class: 'F' },
|
||||
{ index: 69, board: 6, ch: 4, type: 'Bl', midi: 90, class: 'F#' },
|
||||
{ index: 70, board: 6, ch: 5, type: 'wF', midi: 91, class: 'G' },
|
||||
{ index: 71, board: 6, ch: 6, type: 'Bl', midi: 92, class: 'G#' },
|
||||
{ index: 72, board: 6, ch: 7, type: 'wB', midi: 93, class: 'A' },
|
||||
{ index: 73, board: 6, ch: 8, type: 'Bl', midi: 94, class: 'A#' },
|
||||
{ index: 74, board: 6, ch: 9, type: 'wF', midi: 95, class: 'B' },
|
||||
{ index: 75, board: 6, ch: 10, type: 'wB', midi: 96, class: 'C' },
|
||||
{ index: 76, board: 6, ch: 11, type: 'Bl', midi: 97, class: 'C#' },
|
||||
|
||||
{ index: 77, board: 7, ch: 1, type: 'wF', midi: 98, class: 'D' },
|
||||
{ index: 78, board: 7, ch: 2, type: 'Bl', midi: 99, class: 'D#' },
|
||||
{ index: 79, board: 7, ch: 3, type: 'wB', midi: 100, class: 'E' },
|
||||
{ index: 80, board: 7, ch: 4, type: 'wF', midi: 101, class: 'F' },
|
||||
{ index: 81, board: 7, ch: 5, type: 'Bl', midi: 102, class: 'F#' },
|
||||
{ index: 82, board: 7, ch: 6, type: 'wB', midi: 103, class: 'G' },
|
||||
{ index: 83, board: 7, ch: 7, type: 'Bl', midi: 104, class: 'G#' },
|
||||
{ index: 84, board: 7, ch: 8, type: 'wF', midi: 105, class: 'A' },
|
||||
{ index: 85, board: 7, ch: 9, type: 'Bl', midi: 106, class: 'A#' },
|
||||
{ index: 86, board: 7, ch: 10, type: 'wB', midi: 107, class: 'B' },
|
||||
{ index: 87, board: 7, ch: 11, type: 'wF', midi: 108, class: 'C' },
|
||||
|
||||
]
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
279
plot_seq.py
Normal file
279
plot_seq.py
Normal file
@ -0,0 +1,279 @@
|
||||
import os, sys, json
|
||||
from scipy.io import wavfile
|
||||
from scipy.signal import stft
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
def is_nanV( xV ):
|
||||
|
||||
for i in range(xV.shape[0]):
|
||||
if np.isnan( xV[i] ):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def calc_harm_bins( srate, binHz, midiPitch, harmN ):
|
||||
|
||||
semi_tone = 1.0/12
|
||||
quarter_tone = 1.0/24
|
||||
eigth_tone = 1.0/48
|
||||
band_width_st = 3.0/48 # 3/8 tone
|
||||
|
||||
fundHz = (13.75 * pow(2.0,(-9.0/12.0))) * pow(2.0,(midiPitch / 12))
|
||||
fund_l_binL = [int(round(fundHz * pow(2.0,-band_width_st) * i/binHz)) for i in range(1,harmN+1)]
|
||||
fund_m_binL = [int(round(fundHz * i/binHz)) for i in range(1,harmN+1)]
|
||||
fund_u_binL = [int(round(fundHz * pow(2.0, band_width_st) * i/binHz)) for i in range(1,harmN+1)]
|
||||
|
||||
for i in range(len(fund_m_binL)):
|
||||
if fund_l_binL[i] >= fund_m_binL[i] and fund_l_binL[i] > 0:
|
||||
fund_l_binL[i] = fund_m_binL[i] - 1
|
||||
|
||||
if fund_u_binL[i] <= fund_m_binL[i] and fund_u_binL[i] < len(fund_u_binL)-1:
|
||||
fund_u_binL[i] = fund_m_binL[i] + 1
|
||||
|
||||
return fund_l_binL, fund_m_binL, fund_u_binL
|
||||
|
||||
|
||||
|
||||
def audio_rms( srate, xV, rmsWndMs, hopMs ):
|
||||
|
||||
wndSmpN = int(round( rmsWndMs * srate / 1000.0))
|
||||
hopSmpN = int(round( hopMs * srate / 1000.0))
|
||||
|
||||
xN = xV.shape[0]
|
||||
yN = int(((xN - wndSmpN) / hopSmpN) + 1)
|
||||
assert( yN > 0)
|
||||
yV = np.zeros( (yN, ) )
|
||||
|
||||
assert( wndSmpN > 1 )
|
||||
|
||||
i = 0
|
||||
j = 0
|
||||
while i < xN and j < yN:
|
||||
|
||||
if i == 0:
|
||||
yV[j] = np.sqrt(xV[0]*xV[0])
|
||||
elif i < wndSmpN:
|
||||
yV[j] = np.sqrt( np.mean( xV[0:i] * xV[0:i] ) )
|
||||
else:
|
||||
yV[j] = np.sqrt( np.mean( xV[i-wndSmpN:i] * xV[i-wndSmpN:i] ) )
|
||||
|
||||
i += hopSmpN
|
||||
j += 1
|
||||
|
||||
return yV, srate / hopSmpN
|
||||
|
||||
def audio_db_rms( srate, xV, rmsWndMs, hopMs, dbRefWndMs ):
|
||||
|
||||
rmsV, rms_srate = audio_rms( srate, xV, rmsWndMs, hopMs )
|
||||
dbWndN = int(round(dbRefWndMs * rms_srate / 1000.0))
|
||||
dbRef = ref = np.mean(rmsV[0:dbWndN])
|
||||
return 20.0 * np.log10( rmsV / dbRef ), rms_srate
|
||||
|
||||
|
||||
|
||||
def audio_stft_rms( srate, xV, rmsWndMs, hopMs, spectrumIdx ):
|
||||
|
||||
wndSmpN = int(round( rmsWndMs * srate / 1000.0))
|
||||
hopSmpN = int(round( hopMs * srate / 1000.0))
|
||||
binHz = srate / wndSmpN
|
||||
|
||||
f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True )
|
||||
|
||||
specHopIdx = int(round( spectrumIdx ))
|
||||
specV = np.sqrt(np.abs(xM[:, specHopIdx ]))
|
||||
|
||||
mV = np.zeros((xM.shape[1]))
|
||||
|
||||
for i in range(xM.shape[1]):
|
||||
mV[i] = np.max(np.sqrt(np.abs(xM[:,i])))
|
||||
|
||||
return mV, srate / hopSmpN, specV, specHopIdx, binHz
|
||||
|
||||
def audio_stft_db_rms( srate, xV, rmsWndMs, hopMs, dbRefWndMs, spectrumIdx ):
|
||||
rmsV, rms_srate, specV, specHopIdx, binHz = audio_stft_rms( srate, xV, rmsWndMs, hopMs, spectrumIdx )
|
||||
|
||||
dbWndN = int(round(dbRefWndMs * rms_srate / 1000.0))
|
||||
dbRef = ref = np.mean(rmsV[0:dbWndN])
|
||||
rmsDbV = 20.0 * np.log10( rmsV / dbRef )
|
||||
|
||||
return rmsDbV, rms_srate, specV, specHopIdx, binHz
|
||||
|
||||
def audio_harm_rms( srate, xV, rmsWndMs, hopMs, midiPitch, harmCandN, harmN ):
|
||||
|
||||
wndSmpN = int(round( rmsWndMs * srate / 1000.0))
|
||||
hopSmpN = int(round( hopMs * srate / 1000.0))
|
||||
|
||||
binHz = srate / wndSmpN
|
||||
|
||||
f,t,xM = stft( xV, fs=srate, window="hann", nperseg=wndSmpN, noverlap=wndSmpN-hopSmpN, return_onesided=True )
|
||||
|
||||
harmLBinL,harmMBinL,harmUBinL = calc_harm_bins( srate, binHz, midiPitch, harmCandN )
|
||||
|
||||
rmsV = np.zeros((xM.shape[1],))
|
||||
|
||||
|
||||
for i in range(xM.shape[1]):
|
||||
mV = np.sqrt(np.abs(xM[:,i]))
|
||||
|
||||
pV = np.zeros((len(harmLBinL,)))
|
||||
|
||||
for j,(b0i,b1i) in enumerate(zip( harmLBinL, harmUBinL )):
|
||||
pV[j] = np.max(mV[b0i:b1i])
|
||||
|
||||
rmsV[i] = np.mean( sorted(pV)[-harmN:] )
|
||||
|
||||
|
||||
|
||||
|
||||
return rmsV, srate / hopSmpN, binHz
|
||||
|
||||
|
||||
|
||||
def audio_harm_db_rms( srate, xV, rmsWndMs, hopMs, dbRefWndMs, midiPitch, harmCandN, harmN ):
|
||||
|
||||
rmsV, rms_srate, binHz = audio_harm_rms( srate, xV, rmsWndMs, hopMs, midiPitch, harmCandN, harmN )
|
||||
|
||||
dbWndN = int(round(dbRefWndMs * rms_srate / 1000.0))
|
||||
dbRef = ref = np.mean(rmsV[0:dbWndN])
|
||||
rmsDbV = 20.0 * np.log10( rmsV / dbRef )
|
||||
|
||||
return rmsDbV, rms_srate, binHz
|
||||
|
||||
|
||||
|
||||
def locate_peak_indexes( xV, xV_srate, eventMsL ):
|
||||
|
||||
pkIdxL = []
|
||||
for begMs, endMs in eventMsL:
|
||||
|
||||
begSmpIdx = int(begMs * xV_srate / 1000.0)
|
||||
endSmpIdx = int(endMs * xV_srate / 1000.0)
|
||||
|
||||
pkIdxL.append( begSmpIdx + np.argmax( xV[begSmpIdx:endSmpIdx] ) )
|
||||
|
||||
return pkIdxL
|
||||
|
||||
|
||||
def plot_spectrum( ax, srate, binHz, specV, midiPitch, harmN ):
|
||||
|
||||
binN = specV.shape[0]
|
||||
harmLBinL,harmMBinL,harmUBinL = calc_harm_bins( srate, binHz, midiPitch, harmN )
|
||||
|
||||
fundHz = harmMBinL[0] * binHz
|
||||
maxPlotHz = fundHz * (harmN+1)
|
||||
maxPlotBinN = int(round(maxPlotHz/binHz))
|
||||
|
||||
hzV = np.arange(binN) * (srate/(binN*2))
|
||||
|
||||
specV = 20.0 * np.log10(specV)
|
||||
|
||||
ax.plot(hzV[0:maxPlotBinN], specV[0:maxPlotBinN] )
|
||||
|
||||
for h0,h1,h2 in zip(harmLBinL,harmMBinL,harmUBinL):
|
||||
ax.axvline( x=h0 * binHz, color="blue")
|
||||
ax.axvline( x=h1 * binHz, color="black")
|
||||
ax.axvline( x=h2 * binHz, color="blue")
|
||||
|
||||
ax.set_ylabel(str(midiPitch))
|
||||
|
||||
def plot_spectral_ranges( inDir, pitchL, rmsWndMs=300, rmsHopMs=30, harmN=5, dbRefWndMs=500 ):
|
||||
|
||||
plotN = len(pitchL)
|
||||
fig,axL = plt.subplots(plotN,1)
|
||||
|
||||
for plot_idx,midiPitch in enumerate(pitchL):
|
||||
|
||||
# get the audio and meta-data file names
|
||||
seqFn = os.path.join( inDir, str(midiPitch), "seq.json")
|
||||
audioFn = os.path.join( inDir, str(midiPitch), "audio.wav")
|
||||
|
||||
# read the meta data object
|
||||
with open( seqFn, "rb") as f:
|
||||
r = json.load(f)
|
||||
|
||||
# read the audio file
|
||||
srate, signalM = wavfile.read(audioFn)
|
||||
sigV = signalM / float(0x7fff)
|
||||
|
||||
# calc. the RMS envelope in the time domain
|
||||
rms0DbV, rms0_srate = audio_db_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs )
|
||||
|
||||
# locate the sample index of the peak of each note attack
|
||||
pkIdx0L = locate_peak_indexes( rms0DbV, rms0_srate, r['eventTimeL'] )
|
||||
|
||||
# select the 7th to last note for spectrum measurement
|
||||
|
||||
#
|
||||
# TODO: come up with a better way to select the note to measure
|
||||
#
|
||||
spectrumSmpIdx = pkIdx0L[ len(pkIdx0L) - 7 ]
|
||||
|
||||
|
||||
# calc. the RMS envelope by taking the max spectral peak in each STFT window
|
||||
rmsDbV, rms_srate, specV, specHopIdx, binHz = audio_stft_db_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs, spectrumSmpIdx)
|
||||
|
||||
# specV[] is the spectrum of the note at spectrumSmpIdx
|
||||
|
||||
# plot the spectrum and the harmonic selection ranges
|
||||
plot_spectrum( axL[plot_idx], srate, binHz, specV, midiPitch, harmN )
|
||||
plt.show()
|
||||
|
||||
|
||||
|
||||
def do_td_plot( inDir ):
|
||||
|
||||
rmsWndMs = 300
|
||||
rmsHopMs = 30
|
||||
dbRefWndMs = 500
|
||||
harmCandN = 5
|
||||
harmN = 3
|
||||
|
||||
seqFn = os.path.join( inDir, "seq.json")
|
||||
audioFn = os.path.join( inDir, "audio.wav")
|
||||
midiPitch = int(inDir.split("/")[-1])
|
||||
|
||||
|
||||
with open( seqFn, "rb") as f:
|
||||
r = json.load(f)
|
||||
|
||||
|
||||
srate, signalM = wavfile.read(audioFn)
|
||||
sigV = signalM / float(0x7fff)
|
||||
|
||||
rms0DbV, rms0_srate = audio_db_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs )
|
||||
|
||||
rmsDbV, rms_srate, binHz = audio_harm_db_rms( srate, sigV, rmsWndMs, rmsHopMs, dbRefWndMs, midiPitch, harmCandN, harmN )
|
||||
|
||||
pkIdxL = locate_peak_indexes( rmsDbV, rms_srate, r['eventTimeL'] )
|
||||
|
||||
fig,ax = plt.subplots()
|
||||
fig.set_size_inches(18.5, 10.5, forward=True)
|
||||
|
||||
secV = np.arange(0,len(rmsDbV)) / rms_srate
|
||||
|
||||
ax.plot( secV, rmsDbV )
|
||||
ax.plot( np.arange(0,len(rms0DbV)) / rms0_srate, rms0DbV, color="black" )
|
||||
|
||||
for begMs, endMs in r['eventTimeL']:
|
||||
ax.axvline( x=begMs/1000.0, color="green")
|
||||
ax.axvline( x=endMs/1000.0, color="red")
|
||||
|
||||
|
||||
for i,pki in enumerate(pkIdxL):
|
||||
ax.plot( [pki / rms_srate], [ rmsDbV[pki] ], marker='.', color="black")
|
||||
|
||||
plt.show()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
inDir = sys.argv[1]
|
||||
|
||||
do_td_plot(inDir)
|
||||
|
||||
#plot_spectral_ranges( inDir, [ 24, 36, 48, 60, 72, 84, 96, 104] )
|
34
result.py
Normal file
34
result.py
Normal file
@ -0,0 +1,34 @@
|
||||
import json
|
||||
|
||||
class Result(object):
|
||||
def __init__( self, value=None, msg=None ):
|
||||
self.value = value
|
||||
self.msg = msg
|
||||
self.resultL = []
|
||||
|
||||
def set_error( self, msg ):
|
||||
if self.msg is None:
|
||||
self.msg = ""
|
||||
|
||||
self.msg += " " + msg
|
||||
|
||||
def print(self):
|
||||
if value:
|
||||
print(value)
|
||||
if msg:
|
||||
print(msg)
|
||||
|
||||
if resultL:
|
||||
print(resultL)
|
||||
|
||||
def __bool__( self ):
|
||||
return self.msg is None
|
||||
|
||||
def __iadd__( self, b ):
|
||||
if self.value is None and self.msg is None:
|
||||
self = b
|
||||
else:
|
||||
self.resultL.append(b)
|
||||
|
||||
return self
|
||||
|
Loading…
Reference in New Issue
Block a user