#!/usr/bin/python3 # # thanhle Bluetooth keyboard/Mouse emulator DBUS Service # from __future__ import absolute_import, print_function from optparse import OptionParser, make_option import os import sys import uuid import dbus import dbus.service import dbus.mainloop.glib import time import socket from gi.repository import GLib from dbus.mainloop.glib import DBusGMainLoop import logging from logging import debug, info, warning, error import btk_dbus logging.basicConfig(level=logging.DEBUG) class BTKbDevice(): # change these constants MY_ADDRESS = "BLUETOOTH_ADDRESS_PLACEHOLDER" MY_DEV_NAME = "DEVICE_NAME_PLACEHOLDER" # define some constants P_CTRL = 17 # Service port - must match port configured in SDP record P_INTR = 19 # Service port - must match port configured in SDP record#Interrrupt port # dbus path of the bluez profile we will create # file path of the sdp record to load SDP_RECORD_PATH = sys.path[0] + "/sdp_record.xml" UUID = "00001124-0000-1000-8000-00805f9b34fb" def __init__(self): self.init_bt_device() self.init_bluez_profile() # configure the bluetooth hardware device def init_bt_device(self): print("2. Start and config bluetooth device") # set the device class to a keybord and set the name os.system("hciconfig hci0 up") os.system("hciconfig hci0 class 0x0025C0") os.system("hciconfig hci0 name " + BTKbDevice.MY_DEV_NAME) print(" bluetooth name is: ", BTKbDevice.MY_DEV_NAME) # make the device discoverable os.system("hciconfig hci0 piscan") # set up a bluez profile to advertise device capabilities from a loaded service record def init_bluez_profile(self): print("3. Configuring Bluez Profile") # setup profile options service_record = self.read_sdp_service_record() opts = { "AutoConnect": True, "ServiceRecord": service_record } # retrieve a proxy for the bluez profile interface bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object( "org.bluez", "/org/bluez"), "org.bluez.ProfileManager1") manager.RegisterProfile("/org/bluez/hci0", BTKbDevice.UUID, opts) print("5. Profile registered ") # read and return an sdp record from a file def read_sdp_service_record(self): print("4. Reading service record") try: fh = open(BTKbDevice.SDP_RECORD_PATH, "r") except: sys.exit("Could not open the sdp record. Exiting...") return fh.read() # listen for incoming client connections def listen(self): print("\033[0;33m7. Waiting for connections\033[0m") self.scontrol = socket.socket( socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) # BluetoothSocket(L2CAP) self.sinterrupt = socket.socket( socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) # BluetoothSocket(L2CAP) self.scontrol.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sinterrupt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # bind these sockets to a port - port zero to select next available self.scontrol.bind((socket.BDADDR_ANY, self.P_CTRL)) self.sinterrupt.bind((socket.BDADDR_ANY, self.P_INTR)) # Start listening on the server sockets self.scontrol.listen(5) self.sinterrupt.listen(5) self.ccontrol, cinfo = self.scontrol.accept() print ( "\033[0;32mGot a connection on the control channel from %s \033[0m" % cinfo[0]) self.cinterrupt, cinfo = self.sinterrupt.accept() print ( "\033[0;32mGot a connection on the interrupt channel from %s \033[0m" % cinfo[0]) # send a string to the bluetooth host machine def send_string(self, message): try: self.cinterrupt.send(bytes(message)) except OSError as err: error(err) # main routine if __name__ == "__main__": # we an only run as root try: if not os.geteuid() == 0: sys.exit("Only root can run this script") DBusGMainLoop(set_as_default=True) print("1. Setting up BTK device") service = BTKbDevice() print("6. Register service to dbus") btk_dbus.BTKbService(service) service.listen() loop = GLib.MainLoop() loop.run() except KeyboardInterrupt: sys.exit()