Files
xplane202/xplane.py
2026-01-14 19:18:40 +02:00

62 lines
1.7 KiB
Python

import array
import socket
import sys
from collections import namedtuple
from struct import unpack
from typing import Iterator
# One decoded record: ``index`` is the record's integer id, ``values`` holds
# the remaining fields of the record as a tuple.
DataItem = namedtuple('DataItem', ['index', 'values'])
def parse_raw(b: bytes, offset: int = 4) -> str:
    """Render a packet as a text header followed by a hex dump.

    The first ``offset`` characters of the Latin-1 decoding of ``b`` are kept
    as text; every byte from ``offset`` onwards is shown as two lowercase hex
    digits.  The header and each hex byte are separated by a single space.
    """
    header_text = b.decode('latin')[:offset]
    hex_bytes = [format(byte, '02x') for byte in b[offset:]]
    return ' '.join([header_text, ' '.join(hex_bytes)])
def str_dataitem(di: DataItem) -> str:
    """Format a data item as ``[index]`` followed by its values.

    The index is right-aligned in 3 columns; each value is printed with four
    decimals in a 10-character column, columns separated by one space.
    """
    columns = [format(value, '10.4f') for value in di.values]
    return '[{:3d}] {}'.format(di.index, ' '.join(columns))
def parse_data_packet(b: bytes) -> Iterator[DataItem]:
    """Yield one ``DataItem`` per record contained in a ``DATA*`` packet.

    A packet is the 5-byte header ``DATA*`` followed by zero or more 36-byte
    records, each made of a 32-bit integer index and eight 32-bit floats.

    This is a generator function: the validation below runs when the result
    is first iterated, not when the function is called.

    Args:
        b: the raw datagram.

    Yields:
        ``DataItem(index, values)`` where ``values`` is a tuple of 8 floats.

    Raises:
        ValueError: if ``b`` does not start with ``DATA*`` or if the payload
            after the header is not a whole number of records.
    """
    header = b'DATA*'
    data_header_offset = len(header)
    # Explicit little-endian, standard-size, unpadded layout so decoding does
    # not depend on the byte order or alignment rules of the receiving host.
    # NOTE(review): assumes the sender emits little-endian records (true for
    # x86 and ARM hosts) -- confirm if big-endian senders must be supported.
    record_format = '<i8f'
    bytes_per_item = 4 * (8 + 1)

    if not b.startswith(header):
        raw_str = parse_raw(b, offset=data_header_offset)
        raise ValueError(f'invalid DATA* packet: {raw_str}')

    len_b = len(b)
    if (len_b - data_header_offset) % bytes_per_item != 0:
        raw_str = parse_raw(b, data_header_offset)
        raise ValueError(f'improperly aligned DATA* packet: [len: {len_b}] {raw_str}')

    for start in range(data_header_offset, len_b, bytes_per_item):
        record = unpack(record_format, b[start:start + bytes_per_item])
        yield DataItem(record[0], record[1:])
class XPlaneInput:
    """Listen for DATA packets on a UDP socket and print them as they change.

    The socket is created and bound in ``__init__``.  Release it with
    ``close()`` or by using the instance as a context manager.
    """

    def __init__(self, addr: str, port: int):
        """Create a UDP socket and bind it to ``(addr, port)``."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.s.bind((addr, port))

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the socket when the ``with`` block exits."""
        self.close()

    def close(self) -> None:
        """Close the underlying socket."""
        self.s.close()

    def run(self) -> None:
        """Receive packets forever, printing the items whenever they change.

        Each datagram is parsed with ``parse_data_packet``; the parsed items
        are printed (preceded by a ``---`` line) only when they differ from
        the previously printed ones.  Datagrams that are not DATA packets, or
        that fail to parse, are reported on stderr and skipped so that a
        single stray datagram cannot stop the listener.  The loop ends on
        Ctrl+C; the socket is left open so ``run`` may be called again.
        """
        try:
            old_data = None
            while True:
                data = self.s.recv(4096)
                # Explicit check instead of ``assert``: asserts are stripped
                # under ``python -O`` and must not be used to validate input.
                if not data.startswith(b"DATA"):
                    print(f"XPlaneInput -> ignoring non-DATA packet: {data!r}",
                          file=sys.stderr)
                    continue
                try:
                    data = list(parse_data_packet(data))
                except ValueError as err:
                    print(f"XPlaneInput -> ignoring malformed packet: {err}",
                          file=sys.stderr)
                    continue
                if data != old_data:
                    print('---')
                    for di in data:
                        print(str_dataitem(di))
                    old_data = data
        except KeyboardInterrupt:
            print("XPlaneInput -> CTRL+C", file=sys.stderr)