#!/usr/bin/python3
#
# YAPTB Bluetooth keyboard emulation service
# input copy client.
# Reads local input events (currently mouse devices only) and forwards them
# to the btk_server D-Bus service.
#
# Adapted from www.linuxuser.co.uk/tutorials/emulate-a-bluetooth-keyboard-with-the-raspberry-pi
#
- import os # used to all external commands
- import sys # used to exit the script
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- import time
- import evdev # used to get input from the keyboard
- from evdev import *
- import pyudev
- import re
- import logging
- from logging import debug, info, warning, error
- import errno
- import os
- import sys
- import dbus
- import time
- import evdev
- from evdev import ecodes
- from select import select
- import logging
- from logging import debug, info, warning, error
- import pyudev
- import re
- import functools
- import errno
- import socket
- from configparser import ConfigParser
# Configure the root logger at DEBUG level so the debug()/info() calls used
# throughout this script are emitted.
logging.basicConfig(level=logging.DEBUG)
class BluetoothDevice:
    """Client-side handle on the btk_server D-Bus service.

    Every instance opens a proxy to ``org.yaptb.btkbservice`` and registers
    itself in ``by_index``. The static helpers ``send_current`` and
    ``set_current`` send through whichever instance is stored under
    ``BluetoothDevice.current``.
    """

    by_index = {}  # index -> BluetoothDevice, filled in by __init__
    by_addr = {}  # not populated by this class
    current = 0  # key into by_index used by the static senders
    connecting_sockets = []  # not used by this class

    # Values handed out by mouse_delay() / mouse_speed(). They live under
    # private names because a class attribute and a static method cannot
    # share one name: the later definition silently replaces the earlier one,
    # which made the accessors return themselves.
    _MOUSE_DELAY = 20 / 1000
    _MOUSE_SPEED = 1

    def __init__(self):
        """Connect to the btk_server service on the system bus and register.

        Errors raised by dbus while connecting are not caught here.
        """
        self.bus = dbus.SystemBus()
        self.btkservice = self.bus.get_object(
            'org.yaptb.btkbservice', '/org/yaptb/btkbservice')
        # dbus.Interface wraps the proxy object; the bus itself has no
        # Interface attribute.
        self.iface = dbus.Interface(self.btkservice, 'org.yaptb.btkbservice')

        # Take the first free slot so the first device created ends up under
        # the default ``current`` (0) and is reachable by send_current().
        index = 0
        while index in BluetoothDevice.by_index:
            index += 1
        self.index = index
        BluetoothDevice.by_index[index] = self

    def __str__(self):
        """Return "<marker><index>: <addr> <state>"; marker is '*' if current.

        ``addr`` and ``state`` are not assigned by this class, so they are
        read with a fallback instead of raising AttributeError.
        """
        index = getattr(self, 'index', None)
        marker = '*' if BluetoothDevice.current == index else ' '
        return "%s%s: %s %s" % (
            marker,
            index,
            getattr(self, 'addr', None),
            getattr(self, 'state', None),
        )

    def send_input(self, ir):
        """Send the report *ir* (an iterable of ints 0..255) to the service.

        OSError is logged and swallowed.
        NOTE(review): dbus-python reports call failures as DBusException,
        which is not an OSError -- confirm whether those should be handled
        here as well.
        """
        try:
            print("send")
            self.iface.send(0, bytes(ir))
        except OSError as err:
            error(err)

    @staticmethod
    def mouse_delay():
        """Return the configured mouse delay."""
        return BluetoothDevice._MOUSE_DELAY

    @staticmethod
    def mouse_speed():
        """Return the configured mouse speed factor."""
        return BluetoothDevice._MOUSE_SPEED

    @staticmethod
    def send_current(ir):
        """Send *ir* through the device registered under ``current``.

        Logs a warning and does nothing when no such device is registered.
        """
        device = BluetoothDevice.by_index.get(BluetoothDevice.current)
        if device is None:
            warning("No current Bluetooth device to send to")
            return
        device.send_input(ir)

    @staticmethod
    def set_current():
        """Send the fixed report [0xA2, 2, 0, 0, 0, 0] to the current device."""
        BluetoothDevice.send_current([0xA2, 2, 0, 0, 0, 0])
class InputDevice():
    """A grabbed local evdev device, plus the registry of all such devices.

    The static methods manage the shared ``inputs`` list and the udev
    ``monitor``; instances wrap one ``/dev/input/eventN`` node.
    """

    inputs = []  # every currently connected InputDevice
    monitor = None  # pyudev monitor for the input subsystem, set by init()

    # Only evdev event nodes (".../eventN") are handled.
    _EVENT_NODE = re.compile(r".*/event\d+")

    @staticmethod
    def _is_event_node(dev):
        """Return True when *dev* has a device node that looks like eventN."""
        node = dev.device_node
        return node is not None and InputDevice._EVENT_NODE.match(node) is not None

    @staticmethod
    def init():
        """Start the udev monitor and register the devices already present."""
        context = pyudev.Context()
        InputDevice.monitor = pyudev.Monitor.from_netlink(context)
        InputDevice.monitor.filter_by(subsystem='input')
        InputDevice.monitor.start()
        # Enumerate only after the monitor is running, as the original did.
        for dev in context.list_devices(subsystem="input"):
            InputDevice.add_device(dev)

    @staticmethod
    def add_device(dev):
        """Register udev device *dev* if it is an event node flagged as mouse.

        Failure to open or grab the device is logged, not raised.
        """
        if not InputDevice._is_event_node(dev):
            return
        try:
            if "ID_INPUT_MOUSE" in dev.properties:
                print("detected mouse: " + dev.device_node)
                InputDevice.inputs.append(MouseInput(dev.device_node))
        except OSError:
            error("Failed to connect to %s", dev.device_node)

    @staticmethod
    def remove_device(dev):
        """Drop every registered input whose node equals ``dev.device_node``.

        *dev* only needs a ``device_node`` attribute. The evdev handles of
        the dropped inputs are closed so their descriptors are not leaked.
        """
        if not InputDevice._is_event_node(dev):
            return
        kept = []
        dropped = []
        for entry in InputDevice.inputs:
            if entry.device_node != dev.device_node:
                kept.append(entry)
            else:
                dropped.append(entry)
        InputDevice.inputs = kept
        for entry in dropped:
            try:
                entry.device.close()
            except OSError as err:
                # The node may already be gone; closing is best effort.
                debug("Closing %s failed: %s", entry.device_node, err)
        info("Disconnected %s", dev)

    @staticmethod
    def set_leds_all(ledvalue):
        """Forward *ledvalue* to ``set_leds`` of every registered input."""
        for dev in InputDevice.inputs:
            dev.set_leds(ledvalue)

    @staticmethod
    def grab(on):
        """Grab (*on* truthy) or release every registered evdev device."""
        if on:
            debug("Grabbing all input devices")
            for dev in InputDevice.inputs:
                dev.device.grab()
        else:
            debug("Releasing all input devices")
            for dev in InputDevice.inputs:
                dev.device.ungrab()

    def __init__(self, device_node):
        """Open *device_node* with evdev and grab it.

        Raises OSError when the node cannot be opened or grabbed; in the
        latter case the freshly opened handle is closed first.
        """
        self.device_node = device_node
        self.device = evdev.InputDevice(device_node)
        try:
            self.device.grab()
        except OSError:
            self.device.close()
            raise
        info("Connected %s", self)

    def fileno(self):
        """Return the evdev file descriptor, so instances work with select()."""
        return self.device.fd

    def __str__(self):
        return "%s@%s (%s)" % (self.__class__.__name__, self.device_node, self.device.name)
class MouseInput(InputDevice):
    """Mouse device that turns evdev events into reports sent over D-Bus."""

    def __init__(self, device_node):
        """Open and grab *device_node*, then connect to the btk_server service."""
        super().__init__(device_node)
        # Report as built by change_state: [0xA1, 2, buttons, dx, dy, wheel].
        # NOTE(review): 0xA1 and 2 are presumably the header and report id
        # that btk_server expects -- confirm against the service.
        self.state = [0xA1, 2, 0, 0, 0, 0]
        # Relative motion accumulated since the last report.
        self.x = 0
        self.y = 0
        self.z = 0
        self.change = False  # a key event arrived since the last report
        self.last = 0  # time.monotonic() of the last report
        self.bus = dbus.SystemBus()
        self.btkservice = self.bus.get_object(
            'org.yaptb.btkbservice', '/org/yaptb/btkbservice')
        self.iface = dbus.Interface(self.btkservice, 'org.yaptb.btkbservice')
        self.mouse_delay = 20 / 1000  # minimum seconds between motion reports
        self.mouse_speed = 1  # factor applied to x/y motion

    @staticmethod
    def _to_report_byte(value):
        """Clamp *value* to -127..127 and encode it as an unsigned byte."""
        return min(127, max(-127, int(value))) & 255

    def send_current(self, ir):
        """Send the report *ir* via the service's ``send_mouse`` method.

        OSError is logged and swallowed.
        NOTE(review): dbus-python reports call failures as DBusException,
        which is not an OSError -- confirm whether those should be handled
        here as well.
        """
        try:
            print("send_mouse")
            self.iface.send_mouse(0, bytes(ir))
        except OSError as err:
            error(err)

    def change_state(self, event):
        """Fold one evdev *event* into the report state.

        EV_REL events accumulate x/y/wheel motion. EV_KEY events update the
        button bits. EV_SYN sends the accumulated state as a report, unless
        the previous report is less than ``mouse_delay`` seconds old and no
        key event is pending.
        """
        if event.type == ecodes.EV_SYN:
            now = time.monotonic()
            # Rate-limit pure motion; key events are always flushed.
            if now - self.last < self.mouse_delay and not self.change:
                return
            self.last = now
            self.state[3] = self._to_report_byte(self.x * self.mouse_speed)
            self.state[4] = self._to_report_byte(self.y * self.mouse_speed)
            self.state[5] = self._to_report_byte(self.z)
            self.x = 0
            self.y = 0
            self.z = 0
            self.change = False
            self.send_current(self.state)
        elif event.type == ecodes.EV_KEY:
            # Not every EV_KEY code is a BTN_* code; fall back to the number
            # instead of raising KeyError.
            debug("Key event %s %d",
                  ecodes.BTN.get(event.code, event.code), event.value)
            self.change = True
            is_button = ecodes.BTN_LEFT <= event.code <= ecodes.BTN_EXTRA
            if is_button and event.value < 2:
                mask = 1 << (event.code - ecodes.BTN_LEFT)
                if event.value == 1:
                    self.state[2] |= mask
                else:
                    self.state[2] &= ~mask
        elif event.type == ecodes.EV_REL:
            if event.code == ecodes.REL_X:
                self.x += event.value
            elif event.code == ecodes.REL_Y:
                self.y += event.value
            elif event.code == ecodes.REL_WHEEL:
                self.z += event.value

    def get_info(self):
        """Placeholder: prints a fixed string."""
        print("hello")

    def set_leds(self, ledvalue):
        """No-op, so InputDevice.set_leds_all can call it on every input."""
        pass
def _forward_events(source):
    """Read all pending events of *source* and feed them to change_state."""
    try:
        for event in source.device.read():
            debug("%s", event)
            source.change_state(event)
    except BlockingIOError:
        # Nothing pending on a non-blocking device; not an error.
        return
    except OSError as err:
        if err.errno == errno.ENODEV:
            # Device was unplugged: stop selecting on it.
            InputDevice.remove_device(source)
        warning(err)


def _handle_hotplug(monitor):
    """Drain pending udev events and add/remove input devices accordingly."""
    for dev in iter(functools.partial(monitor.poll, 0), None):
        if dev.action == 'add':
            InputDevice.add_device(dev)
        elif dev.action == 'remove':
            InputDevice.remove_device(dev)


def event_loop():
    """Dispatch input and hotplug events forever.

    Requires InputDevice.init() to have been called. Blocks in select()
    until a device or the udev monitor is readable, so the loop does not
    spin while idle. The monitor is drained on every wake-up; otherwise it
    would stay readable and select() would return immediately forever.
    """
    while True:
        monitor = InputDevice.monitor
        descriptors = [*InputDevice.inputs, monitor]
        ready, _, _ = select(descriptors, [], [])
        hotplug_pending = False
        for source in ready:
            if source is monitor:
                hotplug_pending = True
            else:
                _forward_events(source)
        # Handle hotplug last so devices are read before being replaced.
        if hotplug_pending:
            _handle_hotplug(monitor)


if __name__ == "__main__":
    InputDevice.init()
    event_loop()
|