##| Copyright: (C) 2019-2020 Kevin Larke ##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file. import pprint,queue import sounddevice as sd import soundfile as sf import numpy as np from result import Result class AudioDevice(object): def __init__(self): self.inputPortIdx = None self.outputPortIdx = None self.recordFl = False self.sineFl = False self.inStream = None self.queue = queue.Queue() self.bufL = [] self.bufIdx = -1 self.srate = 0 self.ch_cnt = 2 def setup( self, **kwargs ): res = Result() if kwargs is None: return res if 'inPortLabel' in kwargs: res += self.select_port( True, kwargs['inPortLabel'] ) if 'outPortLabel' in kwargs: res += self.select_port( False, kwargs['outPortLabel'] ) return res def get_port_label( self, inFl ): portL = self.get_port_list(inFl) portIdx = self.inputPortIdx if inFl else self.outputPortIdx portLabel = None if portL and (portIdx is not None): for port in portL: if portIdx == port['index']: portLabel = port['label'] break return portLabel def get_state( self ): d= { 'recordFl': self.inStream is not None and self.inStream.active, 'sineFl': self.sineFl, 'inPortL': [ d['label'] for d in self.get_port_list(True) ], 'outPortL': [ d['label'] for d in self.get_port_list(False) ], 'inPortLabel': self.get_port_label(True), 'outPortLabel': self.get_port_label(False) } return d def get_port_list( self, inFl ): devLabelL = str(sd.query_devices()).split("\n") portL = [] for i,dev in enumerate(sd.query_devices()): isInputFl = dev['max_input_channels'] > 0 isOutputFl = dev['max_output_channels'] > 0 if (inFl and isInputFl) or (not inFl and isOutputFl): portL.append({ 'chN':dev['max_input_channels'], 'label':devLabelL[i].strip(), 'index':i } ) return portL def get_in_port_list( self ): return get_port_list( True ) def get_out_port_list( self ): return get_port_list( False ) def select_port( self, inFl, portLabel ): res = Result() if inFl and self.inStream is not None and self.inStream.active: return res.set_error("A new port may not be selected while it is active." % (portLabel)) devLabelL = str(sd.query_devices()).split("\n") foundFl = False if portLabel is None and len(devLabelL)>0: portLabel = devLabelL[0] portLabel = portLabel.strip() N = len(portLabel) # for each device for i,label in enumerate(devLabelL): label = label.strip() # if the first N char's of this label match the requested port label if len(label)>=N and label[0:N] == portLabel: if inFl: self.inputPortIdx = i else: self.outputPortIdx = i foundFl = True if inFl: dev = sd.query_devices()[i] self.srate = dev['default_samplerate'] if self.inStream: self.inStream.close() self.inStream = sd.InputStream(samplerate=self.srate, device=self.inputPortIdx, channels=self.ch_cnt, callback=self._record_callback, dtype=np.dtype('float32')) break if not foundFl: res.set_error("Unable to locate the audio port named: '%s'." % (portLabel)) return res def is_recording_enabled( self ): if self.inStream is None: return False return self.inStream.active == True def record_enable( self, enableFl ): # if the input stream has not already been configured if enableFl and self.inStream is None: return Result(None,'Recording cannot start because a recording input port has not been selected.') if not enableFl and self.inStream is None: return Result() # if the input stream already matches the 'enableFl'. if enableFl == self.inStream.active: return Result() # if we are starting recording if enableFl: try: self.bufL = [] self.inStream.start() except Exception as ex: return Result(None,'The recording input stream could not be started. Reason: %s' % str(ex)) else: self.inStream.stop() return Result() def play_buffer( self ): if not self.playFl: self.playFl = True def play_end( self ): if self.playFl: self.playFl = False def write_buffer( self, fn=None ): if fn is None: fn = "temp.wav" with sf.SoundFile(fn,'w',samplerate=int(self.srate), channels=int(self.ch_cnt)) as f: for i,buf in enumerate(self.bufL): N = buf.shape[0] if i bufN: # store as much of the incoming vector as possible if len(self.bufL) > 0: self.bufL[-1][self.bufIdx:,:] = v[0:bufN,:] vi = bufN # increment the starting position of the src vector vN -= bufN # decrement the count of samples that needs to be stored # add an empty buffer to bufL[] N = int(self.srate * 60) # add a one minute buffer to bufL self.bufL.append( np.zeros( (N, self.ch_cnt), dtype=np.dtype('float32') ) ) self.bufIdx = 0 self.bufL[-1][self.bufIdx:vN,:] = v[vi:,] self.bufIdx += vN else: self.bufL[-1][self.bufIdx:self.bufIdx+vN,:] = v self.bufIdx += vN if __name__ == "__main__": ad = AudioDevice() ad.get_port_list(True)