Use a button if a raspberry pi is connected
This commit is contained in:
parent
17a477ae7f
commit
c75fa8c322
|
@ -17,6 +17,7 @@ import tempfile
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
|
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
|
||||||
from utils.kernel import put_kernel_messages_into_queue
|
from utils.kernel import put_kernel_messages_into_queue
|
||||||
|
from utils.get_system_info import get_system_info
|
||||||
from stt import stt_wav
|
from stt import stt_wav
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
@ -33,6 +34,9 @@ RECORDING = False # Flag to control recording state
|
||||||
SPACEBAR_PRESSED = False # Flag to track spacebar press state
|
SPACEBAR_PRESSED = False # Flag to track spacebar press state
|
||||||
|
|
||||||
|
|
||||||
|
# Specify OS
|
||||||
|
current_platform = get_system_info()
|
||||||
|
|
||||||
# Initialize PyAudio
|
# Initialize PyAudio
|
||||||
p = pyaudio.PyAudio()
|
p = pyaudio.PyAudio()
|
||||||
|
|
||||||
|
@ -186,8 +190,8 @@ async def websocket_communication(WS_URL):
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.exception(f"An error occurred during websocket communication. {e}")
|
logger.exception(f"An error occurred during websocket communication. {e}")
|
||||||
logging.info(f"Connecting to `{WS_URL}`...")
|
logger.info(f"Connecting to `{WS_URL}`...")
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
@ -205,9 +209,30 @@ if __name__ == "__main__":
|
||||||
if os.getenv('CODE_RUNNER') == "device":
|
if os.getenv('CODE_RUNNER') == "device":
|
||||||
asyncio.create_task(put_kernel_messages_into_queue(send_queue))
|
asyncio.create_task(put_kernel_messages_into_queue(send_queue))
|
||||||
|
|
||||||
# Keyboard listener for spacebar press/release
|
|
||||||
listener = keyboard.Listener(on_press=on_press, on_release=on_release)
|
#If Raspberry Pi, add the button listener, otherwise use the spacebar
|
||||||
listener.start()
|
if current_platform.startswith("raspberry-pi"):
|
||||||
|
logger.info("Raspberry Pi detected, using button on GPIO pin 15")
|
||||||
|
# Use GPIO pin 15
|
||||||
|
pindef = ["gpiochip4", "15"] # gpiofind PIN15
|
||||||
|
print("PINDEF", pindef)
|
||||||
|
|
||||||
|
# HACK: needs passwordless sudo
|
||||||
|
process = await asyncio.create_subprocess_exec("sudo", "gpiomon", "-brf", *pindef, stdout=asyncio.subprocess.PIPE)
|
||||||
|
while True:
|
||||||
|
line = await process.stdout.readline()
|
||||||
|
if line:
|
||||||
|
line = line.decode().strip()
|
||||||
|
if "FALLING" in line:
|
||||||
|
toggle_recording(False)
|
||||||
|
elif "RISING" in line:
|
||||||
|
toggle_recording(True)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# Keyboard listener for spacebar press/release
|
||||||
|
listener = keyboard.Listener(on_press=on_press, on_release=on_release)
|
||||||
|
listener.start()
|
||||||
|
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
p.terminate()
|
p.terminate()
|
Loading…
Reference in New Issue