picadae calibration programs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

AudioDevice.py 8.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. ##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
  2. ##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
  3. import pprint,queue
  4. import sounddevice as sd
  5. import soundfile as sf
  6. import numpy as np
  7. from result import Result
  8. class AudioDevice(object):
  9. def __init__(self):
  10. self.inputPortIdx = None
  11. self.outputPortIdx = None
  12. self.recordFl = False
  13. self.sineFl = False
  14. self.inStream = None
  15. self.queue = queue.Queue()
  16. self.bufL = []
  17. self.bufIdx = -1
  18. self.srate = 0
  19. self.ch_cnt = 2
  20. def setup( self, **kwargs ):
  21. res = Result()
  22. if kwargs is None:
  23. return res
  24. if 'inPortLabel' in kwargs:
  25. res += self.select_port( True, kwargs['inPortLabel'] )
  26. if 'outPortLabel' in kwargs:
  27. res += self.select_port( False, kwargs['outPortLabel'] )
  28. return res
  29. def get_port_label( self, inFl ):
  30. portL = self.get_port_list(inFl)
  31. portIdx = self.inputPortIdx if inFl else self.outputPortIdx
  32. portLabel = None
  33. if portL and (portIdx is not None):
  34. for port in portL:
  35. if portIdx == port['index']:
  36. portLabel = port['label']
  37. break
  38. return portLabel
  39. def get_state( self ):
  40. d= {
  41. 'recordFl': self.inStream is not None and self.inStream.active,
  42. 'sineFl': self.sineFl,
  43. 'inPortL': [ d['label'] for d in self.get_port_list(True) ],
  44. 'outPortL': [ d['label'] for d in self.get_port_list(False) ],
  45. 'inPortLabel': self.get_port_label(True),
  46. 'outPortLabel': self.get_port_label(False)
  47. }
  48. return d
  49. def get_port_list( self, inFl ):
  50. devLabelL = str(sd.query_devices()).split("\n")
  51. portL = []
  52. for i,dev in enumerate(sd.query_devices()):
  53. isInputFl = dev['max_input_channels'] > 0
  54. isOutputFl = dev['max_output_channels'] > 0
  55. if (inFl and isInputFl) or (not inFl and isOutputFl):
  56. portL.append({ 'chN':dev['max_input_channels'], 'label':devLabelL[i].strip(), 'index':i } )
  57. return portL
  58. def get_in_port_list( self ):
  59. return get_port_list( True )
  60. def get_out_port_list( self ):
  61. return get_port_list( False )
  62. def select_port( self, inFl, portLabel ):
  63. res = Result()
  64. if inFl and self.inStream is not None and self.inStream.active:
  65. return res.set_error("A new port may not be selected while it is active." % (portLabel))
  66. devLabelL = str(sd.query_devices()).split("\n")
  67. foundFl = False
  68. if portLabel is None and len(devLabelL)>0:
  69. portLabel = devLabelL[0]
  70. portLabel = portLabel.strip()
  71. N = len(portLabel)
  72. # for each device
  73. for i,label in enumerate(devLabelL):
  74. label = label.strip()
  75. # if the first N char's of this label match the requested port label
  76. if len(label)>=N and label[0:N] == portLabel:
  77. if inFl:
  78. self.inputPortIdx = i
  79. else:
  80. self.outputPortIdx = i
  81. foundFl = True
  82. if inFl:
  83. dev = sd.query_devices()[i]
  84. self.srate = dev['default_samplerate']
  85. if self.inStream:
  86. self.inStream.close()
  87. self.inStream = sd.InputStream(samplerate=self.srate, device=self.inputPortIdx, channels=self.ch_cnt, callback=self._record_callback, dtype=np.dtype('float32'))
  88. break
  89. if not foundFl:
  90. res.set_error("Unable to locate the audio port named: '%s'." % (portLabel))
  91. return res
  92. def is_recording_enabled( self ):
  93. if self.inStream is None:
  94. return False
  95. return self.inStream.active == True
  96. def record_enable( self, enableFl ):
  97. # if the input stream has not already been configured
  98. if enableFl and self.inStream is None:
  99. return Result(None,'Recording cannot start because a recording input port has not been selected.')
  100. if not enableFl and self.inStream is None:
  101. return Result()
  102. # if the input stream already matches the 'enableFl'.
  103. if enableFl == self.inStream.active:
  104. return Result()
  105. # if we are starting recording
  106. if enableFl:
  107. try:
  108. self.bufL = []
  109. self.inStream.start()
  110. except Exception as ex:
  111. return Result(None,'The recording input stream could not be started. Reason: %s' % str(ex))
  112. else:
  113. self.inStream.stop()
  114. return Result()
  115. def play_buffer( self ):
  116. if not self.playFl:
  117. self.playFl = True
  118. def play_end( self ):
  119. if self.playFl:
  120. self.playFl = False
  121. def write_buffer( self, fn=None ):
  122. if fn is None:
  123. fn = "temp.wav"
  124. with sf.SoundFile(fn,'w',samplerate=int(self.srate), channels=int(self.ch_cnt)) as f:
  125. for i,buf in enumerate(self.bufL):
  126. N = buf.shape[0] if i<len(self.bufL)-1 else self.bufIdx
  127. f.write(buf[0:N,])
  128. return Result()
  129. def buffer_sample_count( self ):
  130. smpN = 0
  131. for i in range(len(self.bufL)):
  132. smpN += self.bufIdx if i == len(self.bufL)-1 else self.bufL[i].shape[0]
  133. return Result(smpN)
  134. def buffer_sample_ms( self ):
  135. r = self.buffer_sample_count()
  136. if r:
  137. r.value = int(r.value * 1000.0 / self.srate)
  138. return r
  139. def linear_buffer( self ):
  140. r = self.buffer_sample_count()
  141. if r:
  142. smpN = r.value
  143. bV = np.zeros( (smpN,self.ch_cnt) )
  144. bi = 0
  145. for i in range(len(self.bufL)):
  146. bufSmpN = self.bufIdx if i == len(self.bufL)-1 else self.bufL[i].shape[0]
  147. bV[ bi:bi+bufSmpN, ] = self.bufL[i][0:bufSmpN]
  148. bi += bufSmpN
  149. return Result(bV)
  150. def _record_callback(self, v, frames, time, status):
  151. """This is called (from a separate thread) for each audio block."""
  152. # send audio to the app
  153. self.queue.put(v.copy())
  154. def tick( self, ms ):
  155. try:
  156. while True:
  157. # get audio from the incoming audio thread
  158. v = self.queue.get_nowait()
  159. #print(type(v),len(v),np.result_type(v))
  160. self._update_buffer(v)
  161. except queue.Empty: # queue was empty
  162. pass
  163. return Result()
  164. def _update_buffer(self,v ):
  165. # get the length of the last buffer in bufL
  166. bufN = 0 if len(self.bufL)==0 else self.bufL[-1].shape[0] - self.bufIdx
  167. # get the length of the incoming sample vector
  168. vN = v.shape[0]
  169. # if there are more incoming samples than space in the buffer
  170. if vN > bufN:
  171. # store as much of the incoming vector as possible
  172. if len(self.bufL) > 0:
  173. self.bufL[-1][self.bufIdx:,:] = v[0:bufN,:]
  174. vi = bufN # increment the starting position of the src vector
  175. vN -= bufN # decrement the count of samples that needs to be stored
  176. # add an empty buffer to bufL[]
  177. N = int(self.srate * 60) # add a one minute buffer to bufL
  178. self.bufL.append( np.zeros( (N, self.ch_cnt), dtype=np.dtype('float32') ) )
  179. self.bufIdx = 0
  180. self.bufL[-1][self.bufIdx:vN,:] = v[vi:,]
  181. self.bufIdx += vN
  182. else:
  183. self.bufL[-1][self.bufIdx:self.bufIdx+vN,:] = v
  184. self.bufIdx += vN
  185. if __name__ == "__main__":
  186. ad = AudioDevice()
  187. ad.get_port_list(True)