62 lines
1.7 KiB
Python
62 lines
1.7 KiB
Python
import array
|
|
import socket
|
|
import sys
|
|
from collections import namedtuple
|
|
from struct import unpack
|
|
from typing import Iterator
|
|
|
|
# One decoded record: ``index`` is the leading integer of the record and
# ``values`` holds the numbers that follow it (see parse_data_packet).
DataItem = namedtuple('DataItem', ['index', 'values'])
|
|
|
|
|
|
def parse_raw(b: bytes, offset: int = 4) -> str:
    """Render raw bytes for diagnostics as a text prefix plus a hex dump.

    The first ``offset`` bytes are shown as Latin-1 characters. Every
    remaining byte is shown as two lowercase hex digits, separated by single
    spaces. One space always separates the prefix from the hex part, even
    when there are no remaining bytes.
    """
    # Latin-1 maps each byte to exactly one character, so slicing the bytes
    # before decoding selects the same prefix as slicing the decoded text.
    prefix = b[:offset].decode('latin')
    hex_tail = ' '.join(f'{byte:02x}' for byte in b[offset:])
    return prefix + ' ' + hex_tail
|
|
|
|
|
|
def str_dataitem(di: DataItem) -> str:
    """Format a ``DataItem`` as a single fixed-width text line.

    The index is printed in brackets, right-aligned to three characters.
    Each value follows as a ten-character field with four decimal places,
    and the fields are separated by single spaces.
    """
    columns = [f'{value:10.4f}' for value in di.values]
    values = ' '.join(columns)
    return f'[{di.index:3d}] {values}'
|
|
|
|
|
|
def parse_data_packet(b: bytes) -> Iterator[DataItem]:
    """Yield one ``DataItem`` per record contained in a ``DATA*`` packet.

    The packet must consist of the 5-byte ``DATA*`` header followed by zero
    or more 36-byte records. Each record is a 4-byte integer index followed
    by eight 4-byte floats; the index becomes ``DataItem.index`` and the
    floats become the ``DataItem.values`` tuple.

    Records are decoded as little-endian on every host, so the result does
    not depend on the CPU this code runs on.
    NOTE(review): little-endian is assumed to be the sender's byte order --
    confirm against the X-Plane version/platform in use.

    Because this is a generator, the packet is only validated when the first
    item is requested, not when the function is called.

    Raises:
        ValueError: if the packet does not start with ``DATA*`` or if the
            payload length is not a whole number of records.
    """
    data_header_offset = 5
    bytes_per_item = 4 * (8 + 1)  # one int32 index + eight float32 values
    # "<" pins little-endian order and standard sizes without padding; the
    # bare native format would decode differently on a big-endian host.
    item_format = '<i8f'

    if not b.startswith(b'DATA*'):
        raw_str = parse_raw(b, offset=data_header_offset)
        raise ValueError(f'invalid DATA* packet: {raw_str}')

    len_b = len(b)
    if (len_b - data_header_offset) % bytes_per_item != 0:
        raw_str = parse_raw(b, data_header_offset)
        raise ValueError(f'improperly aligned DATA* packet: [len: {len_b}] {raw_str}')

    # Walk the payload one record at a time, starting right after the header.
    for start in range(data_header_offset, len_b, bytes_per_item):
        data = unpack(item_format, b[start:start + bytes_per_item])
        yield DataItem(data[0], data[1:])
|
|
|
|
|
|
class XPlaneInput:
    """Receive UDP ``DATA`` packets on a bound address and print them.

    The instance owns a UDP socket (``self.s``). Call ``close()`` when done,
    or use the instance as a context manager to have it closed automatically.
    """

    def __init__(self, addr: str, port: int):
        """Create a UDP/IPv4 socket and bind it to ``(addr, port)``.

        If binding fails, the socket is closed before the error propagates so
        that no file descriptor is leaked.
        """
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.s.bind((addr, port))
        except Exception:
            self.s.close()
            raise

    def close(self) -> None:
        """Close the underlying socket."""
        self.s.close()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the socket when the ``with`` block exits."""
        self.close()

    def run(self):
        """Print every received packet whose content differs from the last.

        Blocks forever reading datagrams of up to 4096 bytes. Each packet is
        parsed into a list of data items; when that list differs from the
        previous packet's list, a ``---`` separator and one formatted line
        per item are printed. Ctrl+C stops the loop and writes a notice to
        stderr. The socket is left open so ``run()`` may be called again.

        Raises:
            ValueError: if a received packet does not start with ``DATA`` or
                cannot be parsed.
        """
        try:
            previous = None
            while True:
                packet = self.s.recv(4096)
                # Explicit check instead of ``assert``: asserts are stripped
                # under ``python -O`` and must not guard network input.
                if not packet.startswith(b"DATA"):
                    raise ValueError(f'unexpected packet: {packet!r}')

                items = list(parse_data_packet(packet))
                if items != previous:
                    print('---')
                    for di in items:
                        print(str_dataitem(di))

                previous = items
        except KeyboardInterrupt:
            print("XPlaneInput -> CTRL+C", file=sys.stderr)
|
|
|