276 lines
8.0 KiB
Python
276 lines
8.0 KiB
Python
##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
|
|
##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
|
|
import pprint,queue
|
|
import sounddevice as sd
|
|
import soundfile as sf
|
|
import numpy as np
|
|
|
|
from result import Result
|
|
|
|
class AudioDevice(object):
    """Select audio ports via `sounddevice` and record input audio into memory.

    Incoming audio blocks are pushed onto `self.queue` by the stream callback
    and moved into `self.bufL` (a list of float32 arrays) when `tick()` is
    called.  Only the last array in `bufL` may be partially filled; the
    number of valid samples it holds is `self.bufIdx`.
    """

    def __init__(self):
        self.inputPortIdx = None    # device index of the selected input port
        self.outputPortIdx = None   # device index of the selected output port
        self.recordFl = False
        self.sineFl = False
        self.playFl = False         # read by play_buffer()/play_end()
        self.inStream = None        # sd.InputStream created by select_port()
        self.queue = queue.Queue()  # audio blocks waiting to be stored by tick()
        self.bufL = []              # list of (N, ch_cnt) float32 sample arrays
        self.bufIdx = -1            # count of valid samples in bufL[-1]
        self.srate = 0              # set from the input device by select_port()
        self.ch_cnt = 1

    def setup(self, **kwargs):
        """Select the ports named by the optional 'inPortLabel' and
        'outPortLabel' keyword arguments.

        Returns a Result accumulating the outcome of each selection.
        """
        res = Result()

        if 'inPortLabel' in kwargs:
            res += self.select_port(True, kwargs['inPortLabel'])

        if 'outPortLabel' in kwargs:
            res += self.select_port(False, kwargs['outPortLabel'])

        return res

    def get_port_label(self, inFl):
        """Return the label of the selected input (inFl is true) or output
        port, or None if no port is selected or it is no longer listed."""
        portIdx = self.inputPortIdx if inFl else self.outputPortIdx

        if portIdx is None:
            return None

        for port in self.get_port_list(inFl):
            if port['index'] == portIdx:
                return port['label']

        return None

    def get_state(self):
        """Return a dict describing the recording state, the available port
        labels and the currently selected port labels."""
        return {
            'recordFl': self.inStream is not None and self.inStream.active,
            'sineFl': self.sineFl,
            'inPortL': [port['label'] for port in self.get_port_list(True)],
            'outPortL': [port['label'] for port in self.get_port_list(False)],
            'inPortLabel': self.get_port_label(True),
            'outPortLabel': self.get_port_label(False)
        }

    def get_port_list(self, inFl):
        """Return a list of {'chN','label','index'} dicts, one per device
        having input channels (inFl is true) or output channels.

        'label' is the device's line from str(sd.query_devices()), stripped;
        'chN' is the channel count for the requested direction.
        """
        devL = sd.query_devices()

        # The printed form of the device list has one line per device.
        devLabelL = str(devL).split("\n")

        chKey = 'max_input_channels' if inFl else 'max_output_channels'

        return [{'chN': dev[chKey], 'label': devLabelL[i].strip(), 'index': i}
                for i, dev in enumerate(devL)
                if dev[chKey] > 0]

    def get_in_port_list(self):
        """Return the list of input ports (see get_port_list())."""
        return self.get_port_list(True)

    def get_out_port_list(self):
        """Return the list of output ports (see get_port_list())."""
        return self.get_port_list(False)

    def select_port(self, inFl, portLabel):
        """Select the first device whose label begins with 'portLabel'.

        inFl      -- True to select the input port, False for the output port.
        portLabel -- Leading characters of a line of str(sd.query_devices()).
                     If None the first listed device is used.

        Selecting an input port also sets `self.srate` from the device's
        default sample rate and (re)creates `self.inStream`.

        Returns a Result which carries an error if the input stream is
        active or no matching device was found.
        """
        res = Result()

        if inFl and self.inStream is not None and self.inStream.active:
            # Set the error on 'res' and return 'res', as the not-found
            # path at the end of this method does.
            res.set_error("A new port may not be selected while it is active.")
            return res

        devLabelL = str(sd.query_devices()).split("\n")

        # str.split() always returns at least one element.
        if portLabel is None:
            portLabel = devLabelL[0]

        portLabel = portLabel.strip()
        foundFl = False

        # for each device
        for i, label in enumerate(devLabelL):

            # skip labels that do not begin with the requested port label
            if not label.strip().startswith(portLabel):
                continue

            foundFl = True

            if inFl:
                self.inputPortIdx = i
                self.srate = sd.query_devices()[i]['default_samplerate']

                # release the stream attached to the previous input port
                if self.inStream is not None:
                    self.inStream.close()

                self.inStream = sd.InputStream(samplerate=self.srate,
                                               device=self.inputPortIdx,
                                               channels=self.ch_cnt,
                                               callback=self._record_callback,
                                               dtype=np.dtype('float32'))
            else:
                self.outputPortIdx = i

            break

        if not foundFl:
            res.set_error("Unable to locate the audio port named: '%s'." % (portLabel))

        return res

    def is_recording_enabled(self):
        """Return True if an input stream exists and is active."""
        if self.inStream is None:
            return False

        return bool(self.inStream.active)

    def record_enable(self, enableFl):
        """Start (enableFl is true) or stop the input stream.

        Starting a recording discards any previously recorded buffers.
        Returns a Result which carries an error message if no input port has
        been selected or the stream could not be started.
        """
        # if the input stream has not already been configured
        if self.inStream is None:
            if enableFl:
                return Result(None, 'Recording cannot start because a recording input port has not been selected.')
            return Result()

        # if the input stream already matches the 'enableFl'.
        if enableFl == self.inStream.active:
            return Result()

        if not enableFl:
            self.inStream.stop()
            return Result()

        # we are starting recording
        try:
            self.bufL = []
            self.inStream.start()
        except Exception as ex:
            return Result(None, 'The recording input stream could not be started. Reason: %s' % str(ex))

        return Result()

    def play_buffer(self):
        """Set the play flag."""
        self.playFl = True

    def play_end(self):
        """Clear the play flag."""
        self.playFl = False

    def write_buffer(self, fn=None):
        """Write the recorded samples to the sound file 'fn'
        ("temp.wav" when fn is None) and return an empty Result."""
        if fn is None:
            fn = "temp.wav"

        with sf.SoundFile(fn, 'w', samplerate=int(self.srate), channels=int(self.ch_cnt)) as f:
            for blk in self._filled_blocks():
                f.write(blk)

        return Result()

    def buffer_sample_count(self):
        """Return a Result whose value is the count of recorded samples
        (per channel) currently held in bufL."""
        return Result(sum(blk.shape[0] for blk in self._filled_blocks()))

    def buffer_sample_ms(self):
        """Return a Result whose value is the recorded duration in
        milliseconds (0 if the sample rate has not been set)."""
        r = self.buffer_sample_count()

        if r:
            # srate is 0 until an input port is selected; avoid dividing by it
            r.value = int(r.value * 1000.0 / self.srate) if self.srate else 0

        return r

    def linear_buffer(self):
        """Return a Result whose value is a single (smpN, ch_cnt) array
        holding all recorded samples in order."""
        r = self.buffer_sample_count()

        # pass a failed result through rather than building an array from it
        if not r:
            return r

        bV = np.zeros((r.value, self.ch_cnt))

        bi = 0
        for blk in self._filled_blocks():
            n = blk.shape[0]
            bV[bi:bi + n, ] = blk
            bi += n

        return Result(bV)

    def _filled_blocks(self):
        """Yield the valid portion of each array in bufL, in order.

        Every array but the last is full; the last holds bufIdx samples.
        """
        lastIdx = len(self.bufL) - 1

        for i, buf in enumerate(self.bufL):
            yield buf if i < lastIdx else buf[0:self.bufIdx]

    def _record_callback(self, v, frames, time, status):
        """This is called (from a separate thread) for each audio block."""
        # send a copy of the audio to the app; 'frames', 'time' and 'status'
        # are not used
        self.queue.put(v.copy())

    def tick(self, ms):
        """Move all queued audio blocks into bufL and return an empty Result.

        'ms' is not used.
        """
        while True:
            try:
                # get audio from the incoming audio thread
                v = self.queue.get_nowait()
            except queue.Empty:  # queue was empty
                break

            self._update_buffer(v)

        return Result()

    def _update_buffer(self, v):
        """Append the sample block 'v' (indexed as v[frame, channel]) to bufL.

        The newest buffer is filled first.  A new buffer is allocated when it
        is full: one minute long at the current sample rate, or longer if
        that is needed to hold the rest of 'v'.  An empty 'v' is a no-op.
        """
        vN = v.shape[0]  # length of the incoming sample vector
        vi = 0           # next sample of 'v' to store

        while vi < vN:

            # get the count of unused samples in the last buffer in bufL
            freeN = self.bufL[-1].shape[0] - self.bufIdx if self.bufL else 0

            if freeN == 0:
                # Add a one minute buffer to bufL.  It is never shorter than
                # the pending samples, so a zero sample rate or an oversized
                # block cannot produce a buffer that is too small.
                freeN = max(int(self.srate * 60), vN - vi)
                self.bufL.append(np.zeros((freeN, self.ch_cnt), dtype=np.dtype('float32')))
                self.bufIdx = 0

            # store as much of the incoming vector as possible
            n = min(freeN, vN - vi)
            self.bufL[-1][self.bufIdx:self.bufIdx + n, :] = v[vi:vi + n, :]
            self.bufIdx += n
            vi += n
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    # Manual smoke test: construct a device object and query the list of
    # input ports.  The returned list is not used.
    device = AudioDevice()
    device.get_port_list(inFl=True)
|
|
|