From e21fd5774379c52dc1119ca766fce21e73a57ac8 Mon Sep 17 00:00:00 2001 From: Sam Date: Mon, 19 Jun 2023 21:29:16 +0200 Subject: [PATCH] sitter3 for python3 --- ethertools/sitter3.py | 422 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 422 insertions(+) create mode 100755 ethertools/sitter3.py diff --git a/ethertools/sitter3.py b/ethertools/sitter3.py new file mode 100755 index 0000000..273d032 --- /dev/null +++ b/ethertools/sitter3.py @@ -0,0 +1,422 @@ +#!/usr/bin/python +# j4cDAC "sitter" +# +# Copyright 2012 Jacob Potter +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# python 3.11 macos : brew install tcl-tk + +import socket +import time +import struct + +def pack_point(x, y, r, g, b, i = -1, u1 = 0, u2 = 0, flags = 0): + """Pack some color values into a struct dac_point. + + Values must be specified for x, y, r, g, and b. If a value is not + passed in for the other fields, i will default to max(r, g, b); the + rest default to zero. + """ + + if i < 0: + i = max(r, g, b) + + return struct.pack(" len(self.buf): + self.buf += self.conn.recv(4096).decode('utf-8') + + obuf = self.buf + self.buf = obuf[l:] + return obuf[:l] + + def readresp(self, cmd): + """Read a response from the DAC.""" + + data = self.read(22) + response = data[0] + cmdR = data[1] + status = Status(data[2:]) + +# status.dump() + + if cmdR != cmd: + raise ProtocolError("expected resp for %r, got %r" + % (cmd, cmdR)) + + if response != "a": + raise ProtocolError("expected ACK, got %r" + % (response, )) + + self.last_status = status + return status + + def __init__(self, macstr, bp): + self.macstr = macstr + self.firmware_string = "-" + self.got_broadcast(bp) + + try: + t1 = time.time() + self.connect(self.last_broadcast.ip[0]) + t = time.time() - t1 + self.conn_status = "ok (%d ms)" % (t * 500) + + if self.last_broadcast.sw_rev < 2: + self.firmware_string = "(old)" + else: + self.conn.sendall('v') + self.firmware_string = self.read(32).replace("\x00", " ").strip() + except (Exception, e): + print(e) + self.conn_status = str(e) + def connect(self, host, port = 7765): + """Connect to the DAC over TCP.""" + + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.settimeout(0.2) + conn.connect((host, port)) + self.conn = conn + self.buf = "" + + # Read the "hello" message + first_status = self.readresp("?") + first_status.dump() + + + + def begin(self, lwm, rate): + cmd = struct.pack(">", self.update_selection) + + def update_selection(self, lb=None): + if self.dac_display: + try: + dac_obj = self.dac_list[self.index(ACTIVE)] + except: + return + self.dac_display.display_dac(dac_obj) + + def got_packet(self, bp): + macstr = bp.macstr() + if macstr not in self.dac_macstr_map: + new_dac = DAC(macstr, bp) + self.insert(END, macstr[6:]) + self.dac_list.append(new_dac) + self.dac_macstr_map[macstr] = new_dac + dac_obj = new_dac + else: + dac_obj = self.dac_macstr_map[macstr] + dac_obj.got_broadcast(bp) + + if len(self.dac_list) == 1: + self.selection_set(0) + self.update_selection() + + def check_on_dac(): + if time.time() - dac_obj.last_broadcast_time < 2: + return + + idx = self.dac_list.index(dac_obj) + self.dac_list.remove(dac_obj) + del self.dac_macstr_map[macstr] + self.delete(idx) + self.dac_display.display_none() + + + self.after(2000, check_on_dac) + + + +# Set up the basic window +root = Tk() +root.title("Ether Dream") +root.resizable(FALSE, FALSE) +frame = Frame(root) +frame.grid() + +disp = DacDisplay(root) +disp.grid(row=0, column=1, padx=5, pady=5) +tracker = DacTracker(root, height=22) +tracker.grid(row=0, column=0, padx=5, pady=5) +tracker.dac_display = disp + +# Set up queue checker +packet_queue = queue.Queue() +def queue_check(): + try: + while True: + data, addr = packet_queue.get_nowait() + tracker.got_packet(BroadcastPacket(data, addr)) + except queue.Empty: + root.after(100, queue_check) + +root.after(100, queue_check) + +# Set up listening socket and thread +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(("0.0.0.0", 7654)) +def socket_thread(): + while True: + packet_queue.put(s.recvfrom(1024)) +_thread.start_new(socket_thread, ()) + +root.mainloop()