import array
import socket
import sys
from collections import namedtuple
from struct import Struct, unpack
from typing import Iterator

# One decoded record of a DATA* packet: an integer index followed by a tuple
# of eight float values.
DataItem = namedtuple('DataItem', ('index', 'values'))

# Every packet handled here must begin with this 5-byte header.
_DATA_HEADER = b'DATA*'

# Layout of one record: a 32-bit int index followed by eight 32-bit floats.
# '<' pins little-endian byte order with standard sizes and no padding, so a
# record is always exactly 36 bytes regardless of the receiving host.
# NOTE(review): assumes the sender emits little-endian data - confirm against
# the X-Plane UDP output documentation.
_ITEM_STRUCT = Struct('<i8f')


def parse_raw(b: bytes, offset: int = 4) -> str:
    """Render *b* for diagnostics: a text prefix followed by a hex dump.

    The first *offset* bytes are decoded as Latin-1 (which cannot fail) and
    the remaining bytes are shown as space-separated two-digit hex values.
    """
    prefix = b[:offset].decode('latin')
    hex_tail = ' '.join(f'{c:02x}' for c in b[offset:])
    return prefix + ' ' + hex_tail


def str_dataitem(di: DataItem) -> str:
    """Format a DataItem as ``[idx] v0 v1 ...`` with fixed-width columns."""
    values = ' '.join(f'{v:10.4f}' for v in di.values)
    return f'[{di.index:3d}] {values}'


def parse_data_packet(b: bytes) -> Iterator[DataItem]:
    """Yield the DataItem records contained in a ``DATA*`` packet.

    This is a generator, so validation happens when iteration starts.

    Raises:
        ValueError: if *b* does not start with ``DATA*`` or if the payload
            after the header is not a whole number of 36-byte records.
    """
    header_len = len(_DATA_HEADER)
    item_size = _ITEM_STRUCT.size

    if not b.startswith(_DATA_HEADER):
        raw_str = parse_raw(b, offset=5)
        raise ValueError(f'invalid DATA* packet: {raw_str}')

    len_b = len(b)
    if (len_b - header_len) % item_size != 0:
        raw_str = parse_raw(b, header_len)
        raise ValueError(f'improperly aligned DATA* packet: [len: {len_b}] {raw_str}')

    # unpack_from reads each record in place instead of slicing a copy.
    for pos in range(header_len, len_b, item_size):
        index, *values = _ITEM_STRUCT.unpack_from(b, pos)
        yield DataItem(index, tuple(values))


class XPlaneInput:
    """UDP listener that prints the DATA* packets it receives.

    Can be used as a context manager so the socket is released on exit.
    """

    def __init__(self, addr: str, port: int):
        """Create a UDP socket and bind it to ``(addr, port)``."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.s.bind((addr, port))
        except OSError:
            # Do not leak the descriptor when the address cannot be bound.
            self.s.close()
            raise

    def close(self) -> None:
        """Close the underlying socket."""
        self.s.close()

    def __enter__(self) -> 'XPlaneInput':
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def run(self) -> None:
        """Receive packets forever, printing the items whenever they change.

        Returns after CTRL+C (KeyboardInterrupt). The socket is left open;
        call close() or use the instance as a context manager to release it.

        Raises:
            ValueError: if a received packet is not a valid DATA* packet.
        """
        try:
            old_data = None
            while True:
                # parse_data_packet validates the header itself; an assert
                # here would vanish under ``python -O``.
                data = list(parse_data_packet(self.s.recv(4096)))
                if data != old_data:
                    print('---')
                    for di in data:
                        print(str_dataitem(di))
                    old_data = data
        except KeyboardInterrupt:
            print("XPlaneInput -> CTRL+C", file=sys.stderr)