picadae calibration programs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

keyboard.py 7.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. ##| Copyright: (C) 2019-2020 Kevin Larke <contact AT larke DOT org>
  2. ##| License: GNU GPL version 3.0 or above. See the accompanying LICENSE file.
  3. import os,types,pickle
  4. import numpy as np
  5. from plot_seq import form_final_pulse_list
  6. from rms_analysis import rms_analysis_main_all
  7. class Keyboard:
  8. def __init__(self, cfg, audio, api):
  9. self.cfg = cfg
  10. self.audio = audio
  11. self.api = api
  12. self.keyD = {} # { midi_pitch: { pulseUsL, holdDutyPctL } }
  13. self.noteDurMs = cfg.noteDurMs
  14. self.pauseDurMs= cfg.pauseDurMs
  15. self.pitchL = None
  16. self.pitch_idx = None
  17. self.pulse_idx = None
  18. self.targetDb = None
  19. self.next_ms = None
  20. self.enableFl = False
  21. self.state = None #"note_on" | "note_off"
  22. self.rmsAnlD = None
  23. #self._load( cfg.outDir, cfg.analysisArgs)
  24. def load( self, inDir, pitchL, analysisArgsD ):
  25. self.keyD = {}
  26. inDir = os.path.expanduser(inDir)
  27. finalPulseListCacheFn = analysisArgsD['finalPulseListCacheFn']
  28. if os.path.isfile(finalPulseListCacheFn):
  29. print("READING: final pulse list cache file: %s" % (finalPulseListCacheFn))
  30. with open(finalPulseListCacheFn,'rb') as f:
  31. self.keyD = pickle.load(f)
  32. else:
  33. dirL = os.listdir(inDir)
  34. for dirStr in dirL:
  35. dirStr = os.path.normpath(os.path.join(inDir,dirStr))
  36. if os.path.isdir(dirStr):
  37. pathL = dirStr.split(os.sep)
  38. midi_pitch = int( pathL[-1] )
  39. if midi_pitch in pitchL:
  40. print(dirStr,midi_pitch)
  41. pulseUsL,pulseDbL,holdDutyPctL = form_final_pulse_list( dirStr, midi_pitch, analysisArgsD )
  42. d = { 'pulseUsL':pulseUsL, 'holdDutyPctL':holdDutyPctL, 'lastDutyPct':0 }
  43. self.keyD[ midi_pitch ] = types.SimpleNamespace(**d)
  44. with open(finalPulseListCacheFn,'wb') as f:
  45. pickle.dump(self.keyD,f)
  46. print("Loading analysis ...")
  47. cacheFn = analysisArgsD['rmsAnalysisCacheFn']
  48. self.rmsAnlD = rms_analysis_main_all( inDir, cacheFn, **analysisArgsD['rmsAnalysisArgs'] )
  49. print("Load DONE.")
  50. def _get_duty_cycle_from_pulse_usec( self, pitch, pulseUsec ):
  51. if pitch not in self.keyD:
  52. print("Missing duty cycle.")
  53. return None
  54. dutyPct = self.keyD[pitch].holdDutyPctL[0][1]
  55. for refUsec,refDuty in self.keyD[pitch].holdDutyPctL:
  56. if pulseUsec < refUsec:
  57. break
  58. dutyPct = refDuty
  59. return dutyPct
  60. def _get_pulse_and_duty_cycle_from_pulse_idx( self, pitch, pulse_idx ):
  61. pulseUsec = self.keyD[ pitch ].pulseUsL[ pulse_idx ]
  62. dutyPct = self._get_duty_cycle_from_pulse_usec( pitch, pulseUsec )
  63. return pulseUsec, dutyPct
  64. def _get_pulse_and_duty_cycle_target_db( self, pitch, targetDb ):
  65. r = self.rmsAnlD[pitch]
  66. pulse_idx = np.argmin( np.abs(np.array(r.pkDbL) - targetDb) )
  67. print("PULSE idx:",pulse_idx," db:", r.pkDbL[pulse_idx] )
  68. pulseUsec = r.pkUsL[pulse_idx]
  69. dutyPct = self._get_duty_cycle_from_pulse_usec( pitch, pulseUsec )
  70. return pulseUsec, dutyPct
  71. def _get_pulse_and_duty_cycle( self, pitch, pulse_idx, targetDb ):
  72. if pulse_idx is not None:
  73. return self._get_pulse_and_duty_cycle_from_pulse_idx(pitch,pulse_idx)
  74. else:
  75. return self._get_pulse_and_duty_cycle_target_db( pitch, targetDb )
  76. def start( self, ms, pitchL, pulse_idx, targetDb=None ):
  77. loadFl = True
  78. if self.pitchL is not None:
  79. loadFl = False
  80. for pitch in pitchL:
  81. if pitch not in self.pitchL:
  82. loadFl = True
  83. break
  84. if loadFl:
  85. self.load(self.cfg.outDir, pitchL, self.cfg.analysisArgs)
  86. self.pitchL = pitchL
  87. self.pitch_idx = 0
  88. self.pulse_idx = pulse_idx
  89. self.targetDb = targetDb
  90. self.state = "note_on"
  91. self.next_ms = ms
  92. self.eventTimeL = [[0,0] for _ in range(len(pitchL))] # initialize the event time
  93. self.audio.record_enable(True) # start recording audio
  94. self.tick(ms) # play the first note
  95. def repeat( self, ms, pulse_idx, targetDb=None ):
  96. self.start( ms, self.pitchL, pulse_idx, targetDb )
  97. def stop( self, ms ):
  98. self._send_note_off()
  99. self.audio.record_enable(False) # stop recording audio
  100. self.state = None # disable this sequencer
  101. def tick( self, ms ):
  102. #self.audio.tick(ms)
  103. # if next event time has arrived
  104. if self.state is not None and ms >= self.next_ms:
  105. # if waiting to turn note on
  106. if self.state == 'note_on':
  107. self._note_on(ms)
  108. # if waiting to turn a note off
  109. elif self.state == 'note_off':
  110. self._note_off(ms)
  111. self.pitch_idx += 1
  112. # if all notes have been played
  113. if self.pitch_idx >= len(self.pitchL):
  114. self.stop(ms)
  115. else:
  116. assert(0)
  117. def _note_on( self, ms ):
  118. self.eventTimeL[ self.pitch_idx ][0] = self.audio.buffer_sample_ms().value
  119. self.next_ms = ms + self.noteDurMs
  120. self.state = 'note_off'
  121. pitch = self.pitchL[ self.pitch_idx ]
  122. pulse_usec, dutyPct = self._get_pulse_and_duty_cycle( pitch, self.pulse_idx, self.targetDb )
  123. if pulse_usec is not None and dutyPct is not None:
  124. self._set_pwm_duty( pitch, dutyPct )
  125. self.api.note_on_us( pitch, pulse_usec )
  126. pulse_idx = 0 if self.pulse_idx is None else self.pulse_idx
  127. targetDb = 0 if self.targetDb is None else self.targetDb
  128. dutyPct = 0 if dutyPct is None else dutyPct
  129. print("note-on: %i %i %4.1f %8.1f %i" % (pitch, pulse_idx, targetDb, pulse_usec, dutyPct))
  130. def _set_pwm_duty( self, pitch, dutyPct ):
  131. if self.keyD[pitch].lastDutyPct != dutyPct:
  132. self.keyD[pitch].lastDutyPct = dutyPct
  133. self.api.set_pwm_duty( pitch, dutyPct )
  134. def _note_off( self, ms ):
  135. self.eventTimeL[ self.pitch_idx ][1] = self.audio.buffer_sample_ms().value
  136. self.next_ms = ms + self.pauseDurMs
  137. self.state = 'note_on'
  138. #begTimeMs = self.eventTimeL[ self.pulse_idx ][0]
  139. #endTimeMs = self.eventTimeL[ self.pulse_idx ][1]
  140. #self.rtAnalyzer.analyze_note( self.audio, self.pitchL[0], begTimeMs, endTimeMs, self.cfg.analysisArgs['rmsAnalysisArgs'] )
  141. self._send_note_off()
  142. def _send_note_off( self ):
  143. for pitch in self.pitchL:
  144. self.api.note_off( pitch )
  145. #print("note-off:",pitch,self.pulse_idx)