I have Protocol Buffer data from a GPS device.
Unfortunately, I am unable to decrypt it.
I downloaded the data from the device via Python pywinusb.hid:
(Note: I copied the functions out of the class structure of my project and removed the namespace.)
This is internal method used for HOST->DEVICE communication.
This method only sends raw request and returns raw response.
:param request: Raw request to send. Array of byte values expected.
:param timeout: Timeout in milliseconds.
:return: Device raw response(array of byte values).
"""
self.response = None
self.out_report.set_raw_data(request)
self.out_report.send()
t = 0
while self.response is None:
time.sleep(0.05) # 0.05 Seconds <-> 50 ms
t += 50 # 50 ms
if t >= timeout:
print("Warning: response timeout")
break
return self.response
# end-of-method send_wait
def get_ack_msg(packet_no):
    """
    Build the raw bytes of an acknowledge packet for a received packet.

    The packet carries size 1, has its "is last" flag cleared and uses
    the sequence number of the packet being acknowledged.
    :param packet_no: Sequence number of the packet to acknowledge.
    :return: Ack packet as a list of byte values.
    """
    ack = Packet()
    ack.set_size(1)
    ack.set_is_last(False)
    ack.set_sequence(packet_no)
    return ack.get_raw_data()
# end-of-method get_ack_msg
class Commands():
    """Numeric command codes placed into request messages."""
    GET, PUT, MERGE, REMOVE = range(4)
# end-of-class Commands
class Packet():
    """Device USB packet.

    Packet layout (64 bytes):
        [0]      - id
        [1]      - size (upper 6 bits) + is_last flag (lower 2 bits)
        [2]      - sequence
        [3...63] - data
    """
    PACKET_SIZE = 64
    DATA_SIZE = 61

    def __init__(self):
        """Create a zero-filled packet with default id, sequence 0 and is_last set."""
        self.buffer = [0] * Packet.PACKET_SIZE
        self.set_id()
        self.set_sequence()
        self.set_is_last(True)
    # end-of-method __init__

    def set_id(self, p_id=0x01):
        """Write the packet id into byte 0."""
        self.buffer[0] = p_id
    # end-of-method set_id

    def get_id(self):
        """Return the packet id stored in byte 0."""
        return self.buffer[0]
    # end-of-method get_id

    def set_size(self, size: int):
        """
        Store the size in the upper six bits of byte 1.

        The lower two bits (the is_last flag) are left untouched, so the
        call order of set_size and set_is_last does not matter.
        :param size: Value of the 6-bit size field (0..63).
        :raises ValueError: If size does not fit into six bits.
        """
        if not 0 <= size <= 0x3F:
            raise ValueError(f"Packet size {size} does not fit into 6 bits")
        self.buffer[1] = (self.buffer[1] & 0x03) | (size << 2)
    # end-of-method set_size

    def get_size(self):
        """Return the 6-bit size field from byte 1."""
        return ((self.buffer[1] & 0xFC) >> 2)
    # end-of-method get_size

    def set_is_last(self, is_last: bool):
        """Set the lower two bits of byte 1: 0 for the last packet, 1 otherwise."""
        self.buffer[1] &= 0xFC
        self.buffer[1] |= 0 if is_last else 1
    # end-of-method set_is_last

    def get_is_last(self):
        """Return True when the lower two bits of byte 1 are zero."""
        return (self.buffer[1] & 0x03) == 0
    # end-of-method get_is_last

    def set_sequence(self, sequence_id=0x00):
        """Write the sequence number into byte 2."""
        self.buffer[2] = sequence_id
    # end-of-method set_sequence

    def get_sequence(self):
        """Return the sequence number stored in byte 2."""
        return self.buffer[2]
    # end-of-method get_sequence

    def set_data(self, data: list[int]):
        """
        Copy payload bytes into the packet starting at byte 3.

        The size field is set to len(data) + 2 (presumably the two extra
        bytes account for the size and sequence bytes - TODO confirm).
        :param data: Sequence of byte values, at most DATA_SIZE long.
        :raises RuntimeError: If data is longer than DATA_SIZE.
        """
        if len(data) <= Packet.DATA_SIZE:
            self.buffer[3:3+len(data)] = data
            self.set_size(len(data)+2)
        else:
            raise RuntimeError("Packet data to big!")
    # end-of-method set_data

    def get_raw_data(self):
        """Return the underlying buffer (the same list object, not a copy)."""
        return self.buffer
    # end-of-method get_raw_data
# end-of-class Packet
</code>
<code>import time
from pathlib import Path
## connection part ##
# self.usb_device = list_of_devices[0]
# self.usb_device.open()
# self.usb_device.set_raw_data_handler(self.data_handler)
# self.out_report = self.usb_device.find_output_reports()[0]
# self.response = None # reset response
def copy_file_from_device(self, file_path: Path, dest_file_path: Path):
    """
    Read a file from the device and store it on the host.

    :param file_path: Path of the file on the device.
    :param dest_file_path: Host path the received bytes are written to.
    """
    content = self.read_file(file_path)
    with open(dest_file_path, "wb") as target:
        print(f"Copy_file '{str(file_path):>30s}' to '{str(dest_file_path)}'")
        target.write(bytearray(content))
# end-of-method copy_file_from_device
def read_file(self, file_path: Path):
    """
    Fetch a file from the device with a GET request.

    :param file_path: Path of the file on the device.
    :return: Whatever self.send returns for the request (list of byte values).
    """
    request = get_request_msg(str(file_path), Commands.GET)
    return self.send(request=request)
# end-of-method read_file
def get_request_msg(path: str, command: int) -> list[int]:
    """
    Create FTP operation message.

    The packet payload is
    [len(path) + 4, 0x00, 0x08, command, 0x12, len(path)] followed by
    the path bytes, where lengths are counted in encoded bytes.
    :param path: Message path. Backslashes are converted to "/" and a
                 leading "/" is added when missing.
    :param command: Command code, one of the Commands values.
    :return: Message bytes array (raw packet buffer).
    :raises RuntimeError: Raised by Packet.set_data when the payload does
                          not fit into a single packet.
    """
    path = str(path).replace("\\", "/")
    if not path.startswith("/"):
        path = "/" + path
    # Encode to bytes so every element is a valid byte value even for
    # non-ASCII paths (assumes the device expects UTF-8 - TODO confirm).
    path_bytes = list(path.encode("utf-8"))
    header = [len(path_bytes) + 4, 0x00, 0x08, command, 0x12, len(path_bytes)]
    p = Packet()
    p.set_data(header + path_bytes)
    return p.get_raw_data()
# end-of-method get_request_msg
def send(self, request, timeout=10000, skip_header=True):
    """
    Send a request and collect the (possibly multi-packet) response.

    For every received packet whose two low bits of byte 1 are non-zero,
    all bytes from index 3 on are collected and an ack for the packet's
    sequence number (byte 2) is sent to obtain the next packet. For the
    final packet only the bytes up to the length encoded in the upper six
    bits of byte 1 are collected. If no packet arrives (None), collecting
    stops and whatever was gathered so far is returned.
    :param request: Raw request to send. Array of byte values expected.
    :param timeout: Timeout in milliseconds, applied to each packet exchange.
    :param skip_header: Backward compatibility. If true, the first two
                        bytes of the collected response are dropped;
                        if false, the collected response is returned as is.
    :return: Collected response bytes as a list.
    """
    collected = []
    packet = self.__send_wait(request, timeout)
    while packet is not None:
        flags = packet[1]
        if flags & 3 != 0:
            # More packets follow: take the full payload and acknowledge.
            collected += packet[3:]
            packet = self.__send_wait(get_ack_msg(packet[2]), timeout)
            continue
        # Final packet: only part of the payload is valid.
        collected += packet[3:(flags >> 2) + 1]
        break
    return collected[2:] if skip_header else collected
# end-of-method send
def __send_wait(self, request, timeout: int=5000):
    """
    Internal HOST->DEVICE exchange: send one raw request, wait for a reply.

    self.response is reset to None, the request is sent through
    self.out_report, and self.response is then polled every 50 ms until
    it is set (presumably by the raw-data handler registered on the
    device - verify against the connection code) or the timeout elapses.
    :param request: Raw request to send. Array of byte values expected.
    :param timeout: Timeout in milliseconds.
    :return: Device raw response (array of byte values), or None on timeout.
    """
    self.response = None
    report = self.out_report
    report.set_raw_data(request)
    report.send()
    waited_ms = 0
    while True:
        if self.response is not None:
            break
        time.sleep(0.05)  # poll interval: 50 ms
        waited_ms += 50
        if waited_ms >= timeout:
            print("Warning: response timeout")
            break
    return self.response
# end-of-method send_wait
def get_ack_msg(packet_no):
    """
    Build the raw bytes of an acknowledge packet for a received packet.

    The packet carries size 1, has its "is last" flag cleared and uses
    the sequence number of the packet being acknowledged.
    :param packet_no: Sequence number of the packet to acknowledge.
    :return: Ack packet as a list of byte values.
    """
    ack = Packet()
    ack.set_size(1)
    ack.set_is_last(False)
    ack.set_sequence(packet_no)
    return ack.get_raw_data()
# end-of-method get_ack_msg
class Commands():
    """Numeric command codes placed into request messages."""
    GET, PUT, MERGE, REMOVE = range(4)
# end-of-class Commands
class Packet():
    """Device USB packet.

    Packet layout (64 bytes):
        [0]      - id
        [1]      - size (upper 6 bits) + is_last flag (lower 2 bits)
        [2]      - sequence
        [3...63] - data
    """
    PACKET_SIZE = 64
    DATA_SIZE = 61

    def __init__(self):
        """Create a zero-filled packet with default id, sequence 0 and is_last set."""
        self.buffer = [0] * Packet.PACKET_SIZE
        self.set_id()
        self.set_sequence()
        self.set_is_last(True)
    # end-of-method __init__

    def set_id(self, p_id=0x01):
        """Write the packet id into byte 0."""
        self.buffer[0] = p_id
    # end-of-method set_id

    def get_id(self):
        """Return the packet id stored in byte 0."""
        return self.buffer[0]
    # end-of-method get_id

    def set_size(self, size: int):
        """
        Store the size in the upper six bits of byte 1.

        The lower two bits (the is_last flag) are left untouched, so the
        call order of set_size and set_is_last does not matter.
        :param size: Value of the 6-bit size field (0..63).
        :raises ValueError: If size does not fit into six bits.
        """
        if not 0 <= size <= 0x3F:
            raise ValueError(f"Packet size {size} does not fit into 6 bits")
        self.buffer[1] = (self.buffer[1] & 0x03) | (size << 2)
    # end-of-method set_size

    def get_size(self):
        """Return the 6-bit size field from byte 1."""
        return ((self.buffer[1] & 0xFC) >> 2)
    # end-of-method get_size

    def set_is_last(self, is_last: bool):
        """Set the lower two bits of byte 1: 0 for the last packet, 1 otherwise."""
        self.buffer[1] &= 0xFC
        self.buffer[1] |= 0 if is_last else 1
    # end-of-method set_is_last

    def get_is_last(self):
        """Return True when the lower two bits of byte 1 are zero."""
        return (self.buffer[1] & 0x03) == 0
    # end-of-method get_is_last

    def set_sequence(self, sequence_id=0x00):
        """Write the sequence number into byte 2."""
        self.buffer[2] = sequence_id
    # end-of-method set_sequence

    def get_sequence(self):
        """Return the sequence number stored in byte 2."""
        return self.buffer[2]
    # end-of-method get_sequence

    def set_data(self, data: list[int]):
        """
        Copy payload bytes into the packet starting at byte 3.

        The size field is set to len(data) + 2 (presumably the two extra
        bytes account for the size and sequence bytes - TODO confirm).
        :param data: Sequence of byte values, at most DATA_SIZE long.
        :raises RuntimeError: If data is longer than DATA_SIZE.
        """
        if len(data) <= Packet.DATA_SIZE:
            self.buffer[3:3+len(data)] = data
            self.set_size(len(data)+2)
        else:
            raise RuntimeError("Packet data to big!")
    # end-of-method set_data

    def get_raw_data(self):
        """Return the underlying buffer (the same list object, not a copy)."""
        return self.buffer
    # end-of-method get_raw_data
# end-of-class Packet
</code>
import time
from pathlib import Path
## connection part ##
# self.usb_device = list_of_devices[0]
# self.usb_device.open()
# self.usb_device.set_raw_data_handler(self.data_handler)
# self.out_report = self.usb_device.find_output_reports()[0]
# self.response = None # reset response
def copy_file_from_device(self, file_path: Path, dest_file_path: Path):
    """
    Read a file from the device and store it on the host.

    :param file_path: Path of the file on the device.
    :param dest_file_path: Host path the received bytes are written to.
    """
    content = self.read_file(file_path)
    with open(dest_file_path, "wb") as target:
        print(f"Copy_file '{str(file_path):>30s}' to '{str(dest_file_path)}'")
        target.write(bytearray(content))
# end-of-method copy_file_from_device
def read_file(self, file_path: Path):
    """
    Fetch a file from the device with a GET request.

    :param file_path: Path of the file on the device.
    :return: Whatever self.send returns for the request (list of byte values).
    """
    request = get_request_msg(str(file_path), Commands.GET)
    return self.send(request=request)
# end-of-method read_file
def get_request_msg(path: str, command: int) -> list[int]:
    """
    Create FTP operation message.

    The packet payload is
    [len(path) + 4, 0x00, 0x08, command, 0x12, len(path)] followed by
    the path bytes, where lengths are counted in encoded bytes.
    :param path: Message path. Backslashes are converted to "/" and a
                 leading "/" is added when missing.
    :param command: Command code, one of the Commands values.
    :return: Message bytes array (raw packet buffer).
    :raises RuntimeError: Raised by Packet.set_data when the payload does
                          not fit into a single packet.
    """
    path = str(path).replace("\\", "/")
    if not path.startswith("/"):
        path = "/" + path
    # Encode to bytes so every element is a valid byte value even for
    # non-ASCII paths (assumes the device expects UTF-8 - TODO confirm).
    path_bytes = list(path.encode("utf-8"))
    header = [len(path_bytes) + 4, 0x00, 0x08, command, 0x12, len(path_bytes)]
    p = Packet()
    p.set_data(header + path_bytes)
    return p.get_raw_data()
# end-of-method get_request_msg
def send(self, request, timeout=10000, skip_header=True):
    """
    Send a request and collect the (possibly multi-packet) response.

    For every received packet whose two low bits of byte 1 are non-zero,
    all bytes from index 3 on are collected and an ack for the packet's
    sequence number (byte 2) is sent to obtain the next packet. For the
    final packet only the bytes up to the length encoded in the upper six
    bits of byte 1 are collected. If no packet arrives (None), collecting
    stops and whatever was gathered so far is returned.
    :param request: Raw request to send. Array of byte values expected.
    :param timeout: Timeout in milliseconds, applied to each packet exchange.
    :param skip_header: Backward compatibility. If true, the first two
                        bytes of the collected response are dropped;
                        if false, the collected response is returned as is.
    :return: Collected response bytes as a list.
    """
    collected = []
    packet = self.__send_wait(request, timeout)
    while packet is not None:
        flags = packet[1]
        if flags & 3 != 0:
            # More packets follow: take the full payload and acknowledge.
            collected += packet[3:]
            packet = self.__send_wait(get_ack_msg(packet[2]), timeout)
            continue
        # Final packet: only part of the payload is valid.
        collected += packet[3:(flags >> 2) + 1]
        break
    return collected[2:] if skip_header else collected
# end-of-method send
def __send_wait(self, request, timeout: int=5000):
    """
    Internal HOST->DEVICE exchange: send one raw request, wait for a reply.

    self.response is reset to None, the request is sent through
    self.out_report, and self.response is then polled every 50 ms until
    it is set (presumably by the raw-data handler registered on the
    device - verify against the connection code) or the timeout elapses.
    :param request: Raw request to send. Array of byte values expected.
    :param timeout: Timeout in milliseconds.
    :return: Device raw response (array of byte values), or None on timeout.
    """
    self.response = None
    report = self.out_report
    report.set_raw_data(request)
    report.send()
    waited_ms = 0
    while True:
        if self.response is not None:
            break
        time.sleep(0.05)  # poll interval: 50 ms
        waited_ms += 50
        if waited_ms >= timeout:
            print("Warning: response timeout")
            break
    return self.response
# end-of-method send_wait
def get_ack_msg(packet_no):
    """
    Build the raw bytes of an acknowledge packet for a received packet.

    The packet carries size 1, has its "is last" flag cleared and uses
    the sequence number of the packet being acknowledged.
    :param packet_no: Sequence number of the packet to acknowledge.
    :return: Ack packet as a list of byte values.
    """
    ack = Packet()
    ack.set_size(1)
    ack.set_is_last(False)
    ack.set_sequence(packet_no)
    return ack.get_raw_data()
# end-of-method get_ack_msg
class Commands():
    """Numeric command codes placed into request messages."""
    GET, PUT, MERGE, REMOVE = range(4)
# end-of-class Commands
class Packet():
    """Device USB packet.

    Packet layout (64 bytes):
        [0]      - id
        [1]      - size (upper 6 bits) + is_last flag (lower 2 bits)
        [2]      - sequence
        [3...63] - data
    """
    PACKET_SIZE = 64
    DATA_SIZE = 61

    def __init__(self):
        """Create a zero-filled packet with default id, sequence 0 and is_last set."""
        self.buffer = [0] * Packet.PACKET_SIZE
        self.set_id()
        self.set_sequence()
        self.set_is_last(True)
    # end-of-method __init__

    def set_id(self, p_id=0x01):
        """Write the packet id into byte 0."""
        self.buffer[0] = p_id
    # end-of-method set_id

    def get_id(self):
        """Return the packet id stored in byte 0."""
        return self.buffer[0]
    # end-of-method get_id

    def set_size(self, size: int):
        """
        Store the size in the upper six bits of byte 1.

        The lower two bits (the is_last flag) are left untouched, so the
        call order of set_size and set_is_last does not matter.
        :param size: Value of the 6-bit size field (0..63).
        :raises ValueError: If size does not fit into six bits.
        """
        if not 0 <= size <= 0x3F:
            raise ValueError(f"Packet size {size} does not fit into 6 bits")
        self.buffer[1] = (self.buffer[1] & 0x03) | (size << 2)
    # end-of-method set_size

    def get_size(self):
        """Return the 6-bit size field from byte 1."""
        return ((self.buffer[1] & 0xFC) >> 2)
    # end-of-method get_size

    def set_is_last(self, is_last: bool):
        """Set the lower two bits of byte 1: 0 for the last packet, 1 otherwise."""
        self.buffer[1] &= 0xFC
        self.buffer[1] |= 0 if is_last else 1
    # end-of-method set_is_last

    def get_is_last(self):
        """Return True when the lower two bits of byte 1 are zero."""
        return (self.buffer[1] & 0x03) == 0
    # end-of-method get_is_last

    def set_sequence(self, sequence_id=0x00):
        """Write the sequence number into byte 2."""
        self.buffer[2] = sequence_id
    # end-of-method set_sequence

    def get_sequence(self):
        """Return the sequence number stored in byte 2."""
        return self.buffer[2]
    # end-of-method get_sequence

    def set_data(self, data: list[int]):
        """
        Copy payload bytes into the packet starting at byte 3.

        The size field is set to len(data) + 2 (presumably the two extra
        bytes account for the size and sequence bytes - TODO confirm).
        :param data: Sequence of byte values, at most DATA_SIZE long.
        :raises RuntimeError: If data is longer than DATA_SIZE.
        """
        if len(data) <= Packet.DATA_SIZE:
            self.buffer[3:3+len(data)] = data
            self.set_size(len(data)+2)
        else:
            raise RuntimeError("Packet data to big!")
    # end-of-method set_data

    def get_raw_data(self):
        """Return the underlying buffer (the same list object, not a copy)."""
        return self.buffer
    # end-of-method get_raw_data
# end-of-class Packet
I have been able to find sources for the required *.proto files, but they seem to be outdated. Unfortunately, I am not allowed to share the .proto file I found here because of Stack Overflow's guidelines.
Edited: Proto-File content deleted
For my project I use Python google.protobuf and have therefore generated a .py file via protoc (protocol buffer compiler).
Edited: Proto-py-File content deleted
The content of a GPS binary file “gpsData.GZB” of the device:
I actually thought that the problem was with the existing *.proto files. However, running "protoc --decode_raw < ROUTE.GZB" leads to the following error:
Failed to parse input.
So I may have made a mistake when downloading the data.
I would be grateful for any solutions, approaches or inspiration.
python
protocol-buffers
hid
New contributor
CodeMonkey is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.