piccal/AudioDevice.py

276 lines
8.0 KiB
Python

##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
import pprint,queue
import sounddevice as sd
import soundfile as sf
import numpy as np
from result import Result
class AudioDevice(object):
def __init__(self):
self.inputPortIdx = None
self.outputPortIdx = None
self.recordFl = False
self.sineFl = False
self.inStream = None
self.queue = queue.Queue()
self.bufL = []
self.bufIdx = -1
self.srate = 0
self.ch_cnt = 1
def setup( self, **kwargs ):
res = Result()
if kwargs is None:
return res
if 'inPortLabel' in kwargs:
res += self.select_port( True, kwargs['inPortLabel'] )
if 'outPortLabel' in kwargs:
res += self.select_port( False, kwargs['outPortLabel'] )
return res
def get_port_label( self, inFl ):
portL = self.get_port_list(inFl)
portIdx = self.inputPortIdx if inFl else self.outputPortIdx
portLabel = None
if portL and (portIdx is not None):
for port in portL:
if portIdx == port['index']:
portLabel = port['label']
break
return portLabel
def get_state( self ):
d= {
'recordFl': self.inStream is not None and self.inStream.active,
'sineFl': self.sineFl,
'inPortL': [ d['label'] for d in self.get_port_list(True) ],
'outPortL': [ d['label'] for d in self.get_port_list(False) ],
'inPortLabel': self.get_port_label(True),
'outPortLabel': self.get_port_label(False)
}
return d
def get_port_list( self, inFl ):
devLabelL = str(sd.query_devices()).split("\n")
portL = []
for i,dev in enumerate(sd.query_devices()):
isInputFl = dev['max_input_channels'] > 0
isOutputFl = dev['max_output_channels'] > 0
if (inFl and isInputFl) or (not inFl and isOutputFl):
portL.append({ 'chN':dev['max_input_channels'], 'label':devLabelL[i].strip(), 'index':i } )
return portL
def get_in_port_list( self ):
return get_port_list( True )
def get_out_port_list( self ):
return get_port_list( False )
def select_port( self, inFl, portLabel ):
res = Result()
if inFl and self.inStream is not None and self.inStream.active:
return res.set_error("A new port may not be selected while it is active." % (portLabel))
devLabelL = str(sd.query_devices()).split("\n")
foundFl = False
if portLabel is None and len(devLabelL)>0:
portLabel = devLabelL[0]
portLabel = portLabel.strip()
N = len(portLabel)
# for each device
for i,label in enumerate(devLabelL):
label = label.strip()
# if the first N char's of this label match the requested port label
if len(label)>=N and label[0:N] == portLabel:
if inFl:
self.inputPortIdx = i
else:
self.outputPortIdx = i
foundFl = True
if inFl:
dev = sd.query_devices()[i]
self.srate = dev['default_samplerate']
if self.inStream:
self.inStream.close()
self.inStream = sd.InputStream(samplerate=self.srate, device=self.inputPortIdx, channels=self.ch_cnt, callback=self._record_callback, dtype=np.dtype('float32'))
break
if not foundFl:
res.set_error("Unable to locate the audio port named: '%s'." % (portLabel))
return res
def is_recording_enabled( self ):
if self.inStream is None:
return False
return self.inStream.active == True
def record_enable( self, enableFl ):
# if the input stream has not already been configured
if enableFl and self.inStream is None:
return Result(None,'Recording cannot start because a recording input port has not been selected.')
if not enableFl and self.inStream is None:
return Result()
# if the input stream already matches the 'enableFl'.
if enableFl == self.inStream.active:
return Result()
# if we are starting recording
if enableFl:
try:
self.bufL = []
self.inStream.start()
except Exception as ex:
return Result(None,'The recording input stream could not be started. Reason: %s' % str(ex))
else:
self.inStream.stop()
return Result()
def play_buffer( self ):
if not self.playFl:
self.playFl = True
def play_end( self ):
if self.playFl:
self.playFl = False
def write_buffer( self, fn=None ):
if fn is None:
fn = "temp.wav"
with sf.SoundFile(fn,'w',samplerate=int(self.srate), channels=int(self.ch_cnt)) as f:
for i,buf in enumerate(self.bufL):
N = buf.shape[0] if i<len(self.bufL)-1 else self.bufIdx
f.write(buf[0:N,])
return Result()
def buffer_sample_count( self ):
smpN = 0
for i in range(len(self.bufL)):
smpN += self.bufIdx if i == len(self.bufL)-1 else self.bufL[i].shape[0]
return Result(smpN)
def buffer_sample_ms( self ):
r = self.buffer_sample_count()
if r:
r.value = int(r.value * 1000.0 / self.srate)
return r
def linear_buffer( self ):
r = self.buffer_sample_count()
if r:
smpN = r.value
bV = np.zeros( (smpN,self.ch_cnt) )
bi = 0
for i in range(len(self.bufL)):
bufSmpN = self.bufIdx if i == len(self.bufL)-1 else self.bufL[i].shape[0]
bV[ bi:bi+bufSmpN, ] = self.bufL[i][0:bufSmpN]
bi += bufSmpN
return Result(bV)
def _record_callback(self, v, frames, time, status):
"""This is called (from a separate thread) for each audio block."""
# send audio to the app
self.queue.put(v.copy())
def tick( self, ms ):
try:
while True:
# get audio from the incoming audio thread
v = self.queue.get_nowait()
#print(type(v),len(v),np.result_type(v))
self._update_buffer(v)
except queue.Empty: # queue was empty
pass
return Result()
def _update_buffer(self,v ):
# get the length of the last buffer in bufL
bufN = 0 if len(self.bufL)==0 else self.bufL[-1].shape[0] - self.bufIdx
# get the length of the incoming sample vector
vN = v.shape[0]
# if there are more incoming samples than space in the buffer
if vN > bufN:
# store as much of the incoming vector as possible
if len(self.bufL) > 0:
self.bufL[-1][self.bufIdx:,:] = v[0:bufN,:]
vi = bufN # increment the starting position of the src vector
vN -= bufN # decrement the count of samples that needs to be stored
# add an empty buffer to bufL[]
N = int(self.srate * 60) # add a one minute buffer to bufL
self.bufL.append( np.zeros( (N, self.ch_cnt), dtype=np.dtype('float32') ) )
self.bufIdx = 0
self.bufL[-1][self.bufIdx:vN,:] = v[vi:,]
self.bufIdx += vN
else:
self.bufL[-1][self.bufIdx:self.bufIdx+vN,:] = v
self.bufIdx += vN
if __name__ == "__main__":
ad = AudioDevice()
ad.get_port_list(True)