Code reformat and quality improvement.

This commit is contained in:
Manuel Amador (Rudd-O) 2022-07-12 05:47:00 +00:00
parent f6c623e5db
commit 167a82bac8

View File

@ -3,16 +3,19 @@
import base64 import base64
import pickle import pickle
import contextlib import contextlib
import ctypes
import ctypes.util
import errno import errno
import fcntl import fcntl
import os import os
import pipes
try: try:
import queue from shlex import quote
except ImportError: except ImportError:
import Queue as queue from pipes import quote # noqa
try:
from queue import Queue
except ImportError:
from Queue import Queue # noqa
import select import select
import signal import signal
import struct import struct
@ -24,7 +27,7 @@ import time
import traceback import traceback
MAX_MUX_READ = 128*1024 # 64*1024*1024 MAX_MUX_READ = 128 * 1024 # 64*1024*1024
PACKLEN = 8 PACKLEN = 8
PACKFORMAT = "!HbIx" PACKFORMAT = "!HbIx"
@ -43,7 +46,7 @@ def mutexfile(filepath):
def unset_cloexec(fd): def unset_cloexec(fd):
old = fcntl.fcntl(fd, fcntl.F_GETFD) old = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old & ~ fcntl.FD_CLOEXEC) fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
def openfdforappend(fd): def openfdforappend(fd):
@ -67,18 +70,24 @@ def openfdforread(fd):
debug_lock = threading.Lock() debug_lock = threading.Lock()
debug_enabled = False debug_enabled = False
_startt = time.time() _startt = time.time()
class LoggingEmu():
class LoggingEmu:
def __init__(self, prefix): def __init__(self, prefix):
self.prefix = prefix self.prefix = prefix
syslog.openlog("bombshell-client.%s" % self.prefix) syslog.openlog("bombshell-client.%s" % self.prefix)
def debug(self, *a, **kw): def debug(self, *a, **kw):
if not debug_enabled: if not debug_enabled:
return return
self._print(syslog.LOG_DEBUG, *a, **kw) self._print(syslog.LOG_DEBUG, *a, **kw)
def info(self, *a, **kw): def info(self, *a, **kw):
self._print(syslog.LOG_INFO, *a, **kw) self._print(syslog.LOG_INFO, *a, **kw)
def error(self, *a, **kw): def error(self, *a, **kw):
self._print(syslog.LOG_ERR, *a, **kw) self._print(syslog.LOG_ERR, *a, **kw)
def _print(self, prio, *a, **kw): def _print(self, prio, *a, **kw):
debug_lock.acquire() debug_lock.acquire()
global _startt global _startt
@ -88,20 +97,31 @@ class LoggingEmu():
string = a[0] string = a[0]
else: else:
string = a[0] % a[1:] string = a[0] % a[1:]
syslog.syslog(prio, ("%.3f " % deltat) + threading.current_thread().name + ": " + string) n = threading.current_thread().name
syslog.syslog(
prio,
("%.3f " % deltat) + n + ": " + string,
)
finally: finally:
debug_lock.release() debug_lock.release()
logging = None logging = None
def send_confirmation(chan, retval, errmsg): def send_confirmation(chan, retval, errmsg):
chan.write(struct.pack("!H", retval)) chan.write(struct.pack("!H", retval))
l = len(errmsg) ln = len(errmsg)
assert l < 1<<32 assert ln < 1 << 32
chan.write(struct.pack("!I", l)) chan.write(struct.pack("!I", ln))
chan.write(errmsg) chan.write(errmsg)
chan.flush() chan.flush()
logging.debug("Sent confirmation on channel %s: %s %s", chan, retval, errmsg) logging.debug(
"Sent confirmation on channel %s: %s %s",
chan,
retval,
errmsg,
)
def recv_confirmation(chan): def recv_confirmation(chan):
@ -114,34 +134,33 @@ def recv_confirmation(chan):
return r, errmsg return r, errmsg
assert len(r) == 2, r assert len(r) == 2, r
r = struct.unpack("!H", r)[0] r = struct.unpack("!H", r)[0]
l = chan.read(4) lc = chan.read(4)
assert len(l) == 4, l assert len(lc) == 4, lc
l = struct.unpack("!I", l)[0] lu = struct.unpack("!I", lc)[0]
errmsg = chan.read(l) errmsg = chan.read(lu)
logging.debug("Received confirmation: %s %s", r, errmsg) logging.debug("Received confirmation: %s %s", r, errmsg)
return r, errmsg return r, errmsg
class MyThread(threading.Thread): class MyThread(threading.Thread):
def run(self): def run(self):
try: try:
self._run() self._run()
except Exception as e: except Exception:
logging.error("%s: unexpected exception", threading.current_thread().name) n = threading.current_thread().name
logging.error("%s: unexpected exception", n)
tb = traceback.format_exc() tb = traceback.format_exc()
logging.error("%s: traceback: %s", threading.current_thread().name, tb) logging.error("%s: traceback: %s", n, tb)
logging.error("%s: exiting program", threading.current_thread().name) logging.error("%s: exiting program", n)
os._exit(124) os._exit(124)
class SignalSender(MyThread): class SignalSender(MyThread):
def __init__(self, signals, sigqueue): def __init__(self, signals, sigqueue):
"""Handles signals by pushing them into a file-like object.""" """Handles signals by pushing them into a file-like object."""
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.queue = queue.Queue() self.queue = Queue()
self.sigqueue = sigqueue self.sigqueue = sigqueue
for sig in signals: for sig in signals:
signal.signal(sig, self.copy) signal.signal(sig, self.copy)
@ -163,7 +182,6 @@ class SignalSender(MyThread):
class Signaler(MyThread): class Signaler(MyThread):
def __init__(self, process, sigqueue): def __init__(self, process, sigqueue):
"""Reads integers from a file-like object and relays that as kill().""" """Reads integers from a file-like object and relays that as kill()."""
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -179,17 +197,26 @@ class Signaler(MyThread):
break break
assert len(data) == 2 assert len(data) == 2
signum = struct.unpack("!H", data)[0] signum = struct.unpack("!H", data)[0]
logging.debug("Received relayed signal %s, sending to process %s", signum, self.process.pid) logging.debug(
"Received relayed signal %s, sending to process %s",
signum,
self.process.pid,
)
try: try:
self.process.send_signal(signum) self.process.send_signal(signum)
except BaseException as e: except BaseException as e:
logging.error("Failed to relay signal %s to process %s: %s", signum, self.process.pid, e) logging.error(
"Failed to relay signal %s to process %s: %s",
signum,
self.process.pid,
e,
)
logging.debug("End of signaler") logging.debug("End of signaler")
def write(dst, buffer, l): def write(dst, buffer, ln):
alreadywritten = 0 alreadywritten = 0
mv = memoryview(buffer)[:l] mv = memoryview(buffer)[:ln]
while len(mv): while len(mv):
dst.write(mv) dst.write(mv)
writtenthisloop = len(mv) writtenthisloop = len(mv)
@ -199,10 +226,10 @@ def write(dst, buffer, l):
alreadywritten = alreadywritten + writtenthisloop alreadywritten = alreadywritten + writtenthisloop
def copy(src, dst, buffer, l): def copy(src, dst, buffer, ln):
alreadyread = 0 alreadyread = 0
mv = memoryview(buffer)[:l] mv = memoryview(buffer)[:ln]
assert len(mv) == l, "Buffer object is too small: %s %s" % (len(mv), l) assert len(mv) == ln, "Buffer object is too small: %s %s" % (len(mv), ln)
while len(mv): while len(mv):
_, _, _ = select.select([src], (), ()) _, _, _ = select.select([src], (), ())
readthisloop = src.readinto(mv) readthisloop = src.readinto(mv)
@ -210,42 +237,48 @@ def copy(src, dst, buffer, l):
raise Exception("copy: Failed to read any bytes") raise Exception("copy: Failed to read any bytes")
mv = mv[readthisloop:] mv = mv[readthisloop:]
alreadyread = alreadyread + readthisloop alreadyread = alreadyread + readthisloop
return write(dst, buffer, l) return write(dst, buffer, ln)
class DataMultiplexer(MyThread): class DataMultiplexer(MyThread):
def __init__(self, sources, sink): def __init__(self, sources, sink):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.sources = dict((s,num) for num, s in enumerate(sources)) self.sources = dict((s, num) for num, s in enumerate(sources))
self.sink = sink self.sink = sink
def _run(self): def _run(self):
logging.debug("mux: Started with sources %s and sink %s", self.sources, self.sink) logging.debug(
"mux: Started with sources %s and sink %s", self.sources, self.sink
)
buffer = bytearray(MAX_MUX_READ) buffer = bytearray(MAX_MUX_READ)
while self.sources: while self.sources:
sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources)) sources, _, x = select.select(
(s for s in self.sources), (), (s for s in self.sources)
)
assert not x, x assert not x, x
for s in sources: for s in sources:
n = self.sources[s] n = self.sources[s]
logging.debug("mux: Source %s (%s) is active", n, s) logging.debug("mux: Source %s (%s) is active", n, s)
readthisloop = s.readinto(buffer) readthisloop = s.readinto(buffer)
if readthisloop == 0: if readthisloop == 0:
logging.debug("mux: Received no bytes from source %s, signaling peer to close corresponding source", n) logging.debug(
"mux: Received no bytes from source %s, signaling"
" peer to close corresponding source",
n,
)
del self.sources[s] del self.sources[s]
header = struct.pack(PACKFORMAT, n, False, 0) header = struct.pack(PACKFORMAT, n, False, 0)
self.sink.write(header) self.sink.write(header)
continue continue
l = readthisloop ln = readthisloop
header = struct.pack(PACKFORMAT, n, True, l) header = struct.pack(PACKFORMAT, n, True, ln)
self.sink.write(header) self.sink.write(header)
write(self.sink, buffer, l) write(self.sink, buffer, ln)
logging.debug("mux: End of data multiplexer") logging.debug("mux: End of data multiplexer")
class DataDemultiplexer(MyThread): class DataDemultiplexer(MyThread):
def __init__(self, source, sinks): def __init__(self, source, sinks):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
@ -253,7 +286,11 @@ class DataDemultiplexer(MyThread):
self.source = source self.source = source
def _run(self): def _run(self):
logging.debug("demux: Started with source %s and sinks %s", self.source, self.sinks) logging.debug(
"demux: Started with source %s and sinks %s",
self.source,
self.sinks,
)
buffer = bytearray(MAX_MUX_READ) buffer = bytearray(MAX_MUX_READ)
while self.sinks: while self.sinks:
r, _, x = select.select([self.source], (), [self.source]) r, _, x = select.select([self.source], (), [self.source])
@ -261,18 +298,24 @@ class DataDemultiplexer(MyThread):
for s in r: for s in r:
header = s.read(PACKLEN) header = s.read(PACKLEN)
if header == "": if header == "":
logging.debug("demux: Received no bytes from source, closing all sinks") logging.debug(
"demux: Received no bytes from source, closing sinks",
)
for sink in self.sinks.values(): for sink in self.sinks.values():
sink.close() sink.close()
self.sinks = [] self.sinks = []
break break
n, active, l = struct.unpack(PACKFORMAT, header) n, active, ln = struct.unpack(PACKFORMAT, header)
if not active: if not active:
logging.debug("demux: Source %s now inactive, closing corresponding sink %s", s, self.sinks[n]) logging.debug(
"demux: Source %s inactive, closing matching sink %s",
s,
self.sinks[n],
)
self.sinks[n].close() self.sinks[n].close()
del self.sinks[n] del self.sinks[n]
else: else:
copy(self.source, self.sinks[n], buffer, l) copy(self.source, self.sinks[n], buffer, ln)
logging.debug("demux: End of data demultiplexer") logging.debug("demux: End of data demultiplexer")
@ -293,13 +336,18 @@ def main_master():
assert remote_command assert remote_command
def anypython(exe): def anypython(exe):
return "` test -x %s && echo %s || echo python`" % (pipes.quote(exe), return "` test -x %s && echo %s || echo python`" % (
pipes.quote(exe)) quote(exe),
quote(exe),
)
remote_helper_text = b"exec " remote_helper_text = b"exec "
remote_helper_text += bytes(anypython(sys.executable), "utf-8") remote_helper_text += bytes(anypython(sys.executable), "utf-8")
remote_helper_text += bytes(" -u -c ", "utf-8") remote_helper_text += bytes(" -u -c ", "utf-8")
remote_helper_text += bytes(pipes.quote(open(__file__, "r").read()), "ascii") remote_helper_text += bytes(
quote(open(__file__, "r").read()),
"ascii",
)
remote_helper_text += b" -d " if debug_enabled else b" " remote_helper_text += b" -d " if debug_enabled else b" "
remote_helper_text += base64.b64encode(pickle.dumps(remote_command, 2)) remote_helper_text += base64.b64encode(pickle.dumps(remote_command, 2))
remote_helper_text += b"\n" remote_helper_text += b"\n"
@ -380,14 +428,14 @@ def main_remote():
cmd = pickle.loads(base64.b64decode(cmd)) cmd = pickle.loads(base64.b64decode(cmd))
logging.debug("Received command: %s", cmd) logging.debug("Received command: %s", cmd)
nicecmd = " ".join(pipes.quote(a) for a in cmd) nicecmd = " ".join(quote(a) for a in cmd)
try: try:
p = subprocess.Popen( p = subprocess.Popen(
cmd, cmd,
# ["strace", "-s4096", "-ff"] + cmd, # ["strace", "-s4096", "-ff"] + cmd,
stdin = subprocess.PIPE, stdin=subprocess.PIPE,
stdout = subprocess.PIPE, stdout=subprocess.PIPE,
stderr = subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=True, close_fds=True,
bufsize=0, bufsize=0,
) )