123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- #!/usr/bin/python3
- #
- # thanhle Bluetooth keyboard/Mouse emulator DBUS Service
- #
- from __future__ import absolute_import, print_function
- from optparse import OptionParser, make_option
- import os
- import sys
- import uuid
- import dbus
- import dbus.service
- import dbus.mainloop.glib
- import time
- import socket
- from gi.repository import GLib
- from dbus.mainloop.glib import DBusGMainLoop
- import logging
- from logging import debug, info, warning, error
- import btk_dbus
- logging.basicConfig(level=logging.DEBUG)
- class BTKbDevice():
- # change these constants
- MY_ADDRESS = "BLUETOOTH_ADDRESS_PLACEHOLDER"
- MY_DEV_NAME = "DEVICE_NAME_PLACEHOLDER"
- # define some constants
- P_CTRL = 17 # Service port - must match port configured in SDP record
- P_INTR = 19 # Service port - must match port configured in SDP record#Interrrupt port
- # dbus path of the bluez profile we will create
- # file path of the sdp record to load
- SDP_RECORD_PATH = sys.path[0] + "/sdp_record.xml"
- UUID = "00001124-0000-1000-8000-00805f9b34fb"
- def __init__(self):
- self.init_bt_device()
- self.init_bluez_profile()
- # configure the bluetooth hardware device
- def init_bt_device(self):
- print("2. Start and config bluetooth device")
- # set the device class to a keybord and set the name
- os.system("hciconfig hci0 up")
- os.system("hciconfig hci0 class 0x0025C0")
- os.system("hciconfig hci0 name " + BTKbDevice.MY_DEV_NAME)
- print(" bluetooth name is: ", BTKbDevice.MY_DEV_NAME)
- # make the device discoverable
- os.system("hciconfig hci0 piscan")
- # set up a bluez profile to advertise device capabilities from a loaded service record
- def init_bluez_profile(self):
- print("3. Configuring Bluez Profile")
- # setup profile options
- service_record = self.read_sdp_service_record()
- opts = {
- "AutoConnect": True,
- "ServiceRecord": service_record
- }
- # retrieve a proxy for the bluez profile interface
- bus = dbus.SystemBus()
- manager = dbus.Interface(bus.get_object(
- "org.bluez", "/org/bluez"), "org.bluez.ProfileManager1")
- manager.RegisterProfile("/org/bluez/hci0", BTKbDevice.UUID, opts)
- print("5. Profile registered ")
- # read and return an sdp record from a file
- def read_sdp_service_record(self):
- print("4. Reading service record")
- try:
- fh = open(BTKbDevice.SDP_RECORD_PATH, "r")
- except:
- sys.exit("Could not open the sdp record. Exiting...")
- return fh.read()
- # listen for incoming client connections
- def listen(self):
- print("\033[0;33m7. Waiting for connections\033[0m")
- self.scontrol = socket.socket(
- socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) # BluetoothSocket(L2CAP)
- self.sinterrupt = socket.socket(
- socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) # BluetoothSocket(L2CAP)
- self.scontrol.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sinterrupt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- # bind these sockets to a port - port zero to select next available
- self.scontrol.bind((socket.BDADDR_ANY, self.P_CTRL))
- self.sinterrupt.bind((socket.BDADDR_ANY, self.P_INTR))
- # Start listening on the server sockets
- self.scontrol.listen(5)
- self.sinterrupt.listen(5)
- self.ccontrol, cinfo = self.scontrol.accept()
- print (
- "\033[0;32mGot a connection on the control channel from %s \033[0m" % cinfo[0])
- self.cinterrupt, cinfo = self.sinterrupt.accept()
- print (
- "\033[0;32mGot a connection on the interrupt channel from %s \033[0m" % cinfo[0])
- # send a string to the bluetooth host machine
- def send_string(self, message):
- try:
- self.cinterrupt.send(bytes(message))
- except OSError as err:
- error(err)
- # main routine
- if __name__ == "__main__":
- # we an only run as root
- try:
- if not os.geteuid() == 0:
- sys.exit("Only root can run this script")
- DBusGMainLoop(set_as_default=True)
- print("1. Setting up BTK device")
- service = BTKbDevice()
- print("6. Register service to dbus")
- btk_dbus.BTKbService(service)
- service.listen()
- loop = GLib.MainLoop()
- loop.run()
- except KeyboardInterrupt:
- sys.exit()
|