I have a BLE-enabled weight scale, a Beurer BF950, and I am unable to read weight data from it. The company provides an Excel sheet listing the UUIDs. With the following code, execution never enters weight_measurement_callback:
the script subscribes to the characteristic but keeps waiting for data. The company's app, however, does fetch the weight from the scale.
import asyncio
from bleak import BleakClient
from struct import unpack
# Address handed to BleakClient to select the scale.
BEURER_BF950_ADDRESS = "5C:CA:D3:F8:1C:C6"
# UUID of the weight scale service. Not referenced by the rest of this script.
WEIGHT_SCALE_SERVICE_UUID = "0000181d-0000-1000-8000-00805f9b34fb"
# UUID of the characteristic that start_notify()/stop_notify() are called on.
WEIGHT_MEASUREMENT_CHARACTERISTIC_UUID = "00002a9d-0000-1000-8000-00805f9b34fb"
async def connect_and_read_weight(address):
    """Connect to the scale at *address* and listen for weight notifications.

    Subscribes ``weight_measurement_callback`` to the weight measurement
    characteristic, keeps the connection open for 30 seconds, and then
    unsubscribes. Any exception is printed rather than propagated.

    NOTE(review): if the callback never fires, the scale may require
    pairing/bonding or a user-consent step before it sends measurements --
    confirm against the vendor documentation.
    """
    try:
        char_uuid = WEIGHT_MEASUREMENT_CHARACTERISTIC_UUID
        async with BleakClient(address) as scale:
            print(f"Connected: {scale.is_connected}")
            await scale.start_notify(char_uuid, weight_measurement_callback)
            print("Please step on the scale...")
            # Hold the connection open so the callback has a chance to run.
            await asyncio.sleep(30)
            await scale.stop_notify(char_uuid)
    except Exception as exc:
        print(f"Error: {exc}")
def weight_measurement_callback(sender, data):
    """Decode and print one Weight Measurement (0x2A9D) payload.

    Args:
        sender: Characteristic/handle supplied by the BLE client; unused.
        data: Raw payload. Byte 0 is the flags field and bytes 1-2 hold the
            weight as a little-endian unsigned 16-bit integer.

    Payloads shorter than three bytes are reported and ignored instead of
    raising inside the notification handler.
    """
    if len(data) < 3:
        print(f"Unexpected weight payload: {bytes(data).hex()}")
        return

    flags = data[0]
    raw_weight = unpack("<H", data[1:3])[0]

    # Flags bit 0 selects the unit system: 0 = SI (0.005 kg per count),
    # 1 = imperial (0.01 lb per count).
    if flags & 0x01:
        weight, unit = raw_weight * 0.01, "lb"
    else:
        weight, unit = raw_weight * 0.005, "kg"
    print(f"Weight: {weight:.2f} {unit}")
async def main():
    """Run one connect-and-listen session against the configured scale."""
    print("Attempting to connect and read weight...")
    # The function defined in this script is connect_and_read_weight; the
    # previous call used a garbled, undefined name and raised NameError.
    await connect_and_read_weight(BEURER_BF950_ADDRESS)
    print("Operation completed.")


asyncio.run(main())
I am able to read static values from the device, such as the battery level, with the following code:
import asyncio
from bleak import BleakClient
# Address handed to BleakClient to select the scale.
BEURER_BF950_ADDRESS = "5C:CA:D3:F8:1C:C6"
# List of characteristics to read
# Each entry is a (uuid, label) pair: the UUID is passed to read_gatt_char()
# and the label is printed and used to choose how the value is displayed.
CHARACTERISTICS_TO_READ = [
("00002a9e-0000-1000-8000-00805f9b34fb", "Weight Scale Feature"),
("00002a19-0000-1000-8000-00805f9b34fb", "Battery Level"),
("00002a29-0000-1000-8000-00805f9b34fb", "Manufacturer Name"),
("00002a24-0000-1000-8000-00805f9b34fb", "Model Number"),
("00002a26-0000-1000-8000-00805f9b34fb", "Firmware Revision"),
("00002a27-0000-1000-8000-00805f9b34fb", "Hardware Revision"),
("00002a28-0000-1000-8000-00805f9b34fb", "Software Revision"),
("00002a23-0000-1000-8000-00805f9b34fb", "System ID"),
# NOTE(review): the two custom entries below share the same UUID suffix as
# the standard entries above -- confirm against the vendor sheet that the
# custom characteristics really use this base and not a vendor-specific one.
("0000000b-0000-1000-8000-00805f9b34fb", "Custom Characteristic (Reference Weight/BF)"),
("00000000-0000-1000-8000-00805f9b34fb", "Custom Characteristic (Scale Setting)"),
]
async def read_characteristics(address):
    """Read and print every entry of ``CHARACTERISTICS_TO_READ`` from *address*.

    For each characteristic the raw value is printed as hex, followed by a
    display form: a percentage for "Battery Level", UTF-8 text for
    "Manufacturer Name" and "Model Number", and the raw value otherwise.
    A failed read is reported and the loop moves on to the next entry; a
    failure outside the per-entry reads is reported as a connection error.
    """
    text_labels = ("Manufacturer Name", "Model Number")
    try:
        async with BleakClient(address) as device:
            print(f"Connected: {device.is_connected}")
            for char_uuid, label in CHARACTERISTICS_TO_READ:
                try:
                    raw = await device.read_gatt_char(char_uuid)
                    print(f"{label} ({char_uuid}):")
                    print(f" Raw: {raw.hex()}")
                    # Choose the display form from the label.
                    if label == "Battery Level":
                        shown = f"{raw[0]}%"
                    elif label in text_labels:
                        shown = raw.decode('utf-8')
                    else:
                        shown = raw
                    print(f" Parsed: {shown}")
                except Exception as read_err:
                    print(f"Error reading {label} ({char_uuid}): {read_err}")
    except Exception as conn_err:
        print(f"Connection error: {conn_err}")
async def main():
    """Read all configured characteristics from the scale, with progress messages."""
    print("Attempting to connect and read characteristics...")
    target = BEURER_BF950_ADDRESS
    await read_characteristics(target)
    print("Operation completed.")


# Script entry point: run the coroutine to completion on a fresh event loop.
asyncio.run(main())
I also tried the following code, which uses asyncio.Event(),
but it does not work either and cannot fetch the weight data:
import asyncio
from bleak import BleakClient
from struct import unpack
# Address handed to BleakClient to select the scale.
BEURER_BF950_ADDRESS = "5C:CA:D3:F8:1C:C6"
# Custom service UUID. Not referenced by the rest of this script.
# NOTE(review): this and the two UUIDs below share the suffix
# -0000-1000-8000-00805f9b34fb -- confirm against the vendor sheet that the
# custom service/characteristics are not on a vendor-specific 128-bit base.
CUSTOM_SERVICE_UUID = "0000ffff-0000-1000-8000-00805f9b34fb"
# Characteristic written to in order to request a measurement.
TAKE_MEASUREMENT_CHARACTERISTIC_UUID = "00000006-0000-1000-8000-00805f9b34fb"
# Characteristic whose notifications are routed to measurement_callback.
MEASUREMENT_NOTIFY_CHARACTERISTIC_UUID = "00000007-0000-1000-8000-00805f9b34fb"
# Set by measurement_callback once a weight has been parsed; awaited by
# connect_and_read_weight.
# NOTE(review): created at import time, before asyncio.run() starts its loop;
# on Python versions before 3.10 this can bind the Event to a different loop
# than the one that later waits on it -- confirm the target Python version.
weight_received = asyncio.Event()
async def connect_and_read_weight(address):
    """Connect to the scale at *address*, request a measurement and wait for it.

    Enables notifications on the custom measurement characteristic, writes a
    trigger value to the take-measurement characteristic, and then waits up
    to 60 seconds for ``measurement_callback`` to set ``weight_received``.
    A timeout and any other exception are printed rather than propagated.
    """
    try:
        async with BleakClient(address) as client:
            print(f"Connected: {client.is_connected}")
            print("Enabling notifications for custom measurement characteristic...")
            await client.start_notify(MEASUREMENT_NOTIFY_CHARACTERISTIC_UUID, measurement_callback)
            print("Triggering measurement...")
            # Send a single zero byte. The previous literal b'x00' lacked the
            # backslash and therefore sent the three ASCII bytes "x00".
            # NOTE(review): confirm the expected trigger value against the
            # vendor's characteristic description.
            await client.write_gatt_char(TAKE_MEASUREMENT_CHARACTERISTIC_UUID, b"\x00")
            print("Please step on the scale...")
            await asyncio.wait_for(weight_received.wait(), timeout=60.0)
            print("Disabling notifications...")
            await client.stop_notify(MEASUREMENT_NOTIFY_CHARACTERISTIC_UUID)
    except asyncio.TimeoutError:
        print("Timeout: No measurement received within 60 seconds.")
    except Exception as e:
        print(f"Error: {e}")
def measurement_callback(sender, data):
    """Print a received notification payload and, if possible, a weight.

    The payload is always echoed as hex. When it holds at least two bytes,
    the first two are read as a little-endian unsigned 16-bit value, scaled
    by 0.1 and printed as kilograms, and ``weight_received`` is set. Any
    error raised while parsing is printed instead of propagating.

    NOTE(review): the byte layout and the 0.1 kg scale are a guess carried
    over from the original code, not a confirmed format.
    """
    print(f"Received data: {data.hex()}")
    try:
        if len(data) < 2:
            print("Received data is too short to contain a weight measurement")
            return
        # First two bytes, little-endian, assumed to be in 0.1 kg units.
        raw_value = int.from_bytes(data[:2], "little")
        kilograms = raw_value * 0.1
        print(f"Weight: {kilograms:.1f} kg")
        weight_received.set()
    except Exception as parse_err:
        print(f"Error parsing data: {parse_err}")
async def main():
    """Run one trigger-and-wait measurement session against the configured scale."""
    print("Attempting to connect and read weight...")
    target = BEURER_BF950_ADDRESS
    await connect_and_read_weight(target)
    print("Operation completed.")


# Script entry point: run the coroutine to completion on a fresh event loop.
asyncio.run(main())
uzair khan is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
3