From 167a82bac848c08c244095e7b6f6ee4811a573c3 Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Tue, 12 Jul 2022 05:47:00 +0000 Subject: [PATCH] Code reformat and quality improvement. --- bin/bombshell-client | 602 +++++++++++++++++++++++-------------------- 1 file changed, 325 insertions(+), 277 deletions(-) diff --git a/bin/bombshell-client b/bin/bombshell-client index c830047..7a8ff12 100755 --- a/bin/bombshell-client +++ b/bin/bombshell-client @@ -3,16 +3,19 @@ import base64 import pickle import contextlib -import ctypes -import ctypes.util import errno import fcntl import os -import pipes + try: - import queue + from shlex import quote except ImportError: - import Queue as queue + from pipes import quote # noqa + +try: + from queue import Queue +except ImportError: + from Queue import Queue # noqa import select import signal import struct @@ -24,7 +27,7 @@ import time import traceback -MAX_MUX_READ = 128*1024 # 64*1024*1024 +MAX_MUX_READ = 128 * 1024 # 64*1024*1024 PACKLEN = 8 PACKFORMAT = "!HbIx" @@ -43,42 +46,48 @@ def mutexfile(filepath): def unset_cloexec(fd): old = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, old & ~ fcntl.FD_CLOEXEC) + fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) def openfdforappend(fd): - f = None - try: - f = os.fdopen(fd, "ab", 0) - except IOError as e: - if e.errno != errno.ESPIPE: - raise - f = os.fdopen(fd, "wb", 0) - unset_cloexec(f.fileno()) - return f + f = None + try: + f = os.fdopen(fd, "ab", 0) + except IOError as e: + if e.errno != errno.ESPIPE: + raise + f = os.fdopen(fd, "wb", 0) + unset_cloexec(f.fileno()) + return f def openfdforread(fd): - f = os.fdopen(fd, "rb", 0) - unset_cloexec(f.fileno()) - return f + f = os.fdopen(fd, "rb", 0) + unset_cloexec(f.fileno()) + return f debug_lock = threading.Lock() debug_enabled = False _startt = time.time() -class LoggingEmu(): + + +class LoggingEmu: def __init__(self, prefix): self.prefix = prefix syslog.openlog("bombshell-client.%s" % self.prefix) + def debug(self, *a, **kw): if not debug_enabled: return self._print(syslog.LOG_DEBUG, *a, **kw) + def info(self, *a, **kw): self._print(syslog.LOG_INFO, *a, **kw) + def error(self, *a, **kw): self._print(syslog.LOG_ERR, *a, **kw) + def _print(self, prio, *a, **kw): debug_lock.acquire() global _startt @@ -88,108 +97,126 @@ class LoggingEmu(): string = a[0] else: string = a[0] % a[1:] - syslog.syslog(prio, ("%.3f " % deltat) + threading.current_thread().name + ": " + string) + n = threading.current_thread().name + syslog.syslog( + prio, + ("%.3f " % deltat) + n + ": " + string, + ) finally: debug_lock.release() + + logging = None def send_confirmation(chan, retval, errmsg): - chan.write(struct.pack("!H", retval)) - l = len(errmsg) - assert l < 1<<32 - chan.write(struct.pack("!I", l)) - chan.write(errmsg) - chan.flush() - logging.debug("Sent confirmation on channel %s: %s %s", chan, retval, errmsg) + chan.write(struct.pack("!H", retval)) + ln = len(errmsg) + assert ln < 1 << 32 + chan.write(struct.pack("!I", ln)) + chan.write(errmsg) + chan.flush() + logging.debug( + "Sent confirmation on channel %s: %s %s", + chan, + retval, + errmsg, + ) def recv_confirmation(chan): - logging.debug("Waiting for confirmation on channel %s", chan) - r = chan.read(2) - if len(r) == 0: - # This happens when the remote domain does not exist. - r, errmsg = 125, "domain does not exist" - logging.debug("No confirmation: %s %s", r, errmsg) - return r, errmsg - assert len(r) == 2, r - r = struct.unpack("!H", r)[0] - l = chan.read(4) - assert len(l) == 4, l - l = struct.unpack("!I", l)[0] - errmsg = chan.read(l) - logging.debug("Received confirmation: %s %s", r, errmsg) - return r, errmsg + logging.debug("Waiting for confirmation on channel %s", chan) + r = chan.read(2) + if len(r) == 0: + # This happens when the remote domain does not exist. + r, errmsg = 125, "domain does not exist" + logging.debug("No confirmation: %s %s", r, errmsg) + return r, errmsg + assert len(r) == 2, r + r = struct.unpack("!H", r)[0] + lc = chan.read(4) + assert len(lc) == 4, lc + lu = struct.unpack("!I", lc)[0] + errmsg = chan.read(lu) + logging.debug("Received confirmation: %s %s", r, errmsg) + return r, errmsg class MyThread(threading.Thread): - - def run(self): - try: - self._run() - except Exception as e: - logging.error("%s: unexpected exception", threading.current_thread().name) - tb = traceback.format_exc() - logging.error("%s: traceback: %s", threading.current_thread().name, tb) - logging.error("%s: exiting program", threading.current_thread().name) - os._exit(124) + def run(self): + try: + self._run() + except Exception: + n = threading.current_thread().name + logging.error("%s: unexpected exception", n) + tb = traceback.format_exc() + logging.error("%s: traceback: %s", n, tb) + logging.error("%s: exiting program", n) + os._exit(124) class SignalSender(MyThread): + def __init__(self, signals, sigqueue): + """Handles signals by pushing them into a file-like object.""" + threading.Thread.__init__(self) + self.daemon = True + self.queue = Queue() + self.sigqueue = sigqueue + for sig in signals: + signal.signal(sig, self.copy) - def __init__(self, signals, sigqueue): - """Handles signals by pushing them into a file-like object.""" - threading.Thread.__init__(self) - self.daemon = True - self.queue = queue.Queue() - self.sigqueue = sigqueue - for sig in signals: - signal.signal(sig, self.copy) + def copy(self, signum, frame): + self.queue.put(signum) + logging.debug("Signal %s pushed to queue", signum) - def copy(self, signum, frame): - self.queue.put(signum) - logging.debug("Signal %s pushed to queue", signum) - - def _run(self): - while True: - signum = self.queue.get() - logging.debug("Dequeued signal %s", signum) - if signum is None: - break - assert signum > 0 - self.sigqueue.write(struct.pack("!H", signum)) - self.sigqueue.flush() - logging.debug("Wrote signal %s to remote end", signum) + def _run(self): + while True: + signum = self.queue.get() + logging.debug("Dequeued signal %s", signum) + if signum is None: + break + assert signum > 0 + self.sigqueue.write(struct.pack("!H", signum)) + self.sigqueue.flush() + logging.debug("Wrote signal %s to remote end", signum) class Signaler(MyThread): + def __init__(self, process, sigqueue): + """Reads integers from a file-like object and relays that as kill().""" + threading.Thread.__init__(self) + self.daemon = True + self.process = process + self.sigqueue = sigqueue - def __init__(self, process, sigqueue): - """Reads integers from a file-like object and relays that as kill().""" - threading.Thread.__init__(self) - self.daemon = True - self.process = process - self.sigqueue = sigqueue - - def _run(self): - while True: - data = self.sigqueue.read(2) - if len(data) == 0: - logging.debug("Received no signal data") - break - assert len(data) == 2 - signum = struct.unpack("!H", data)[0] - logging.debug("Received relayed signal %s, sending to process %s", signum, self.process.pid) - try: - self.process.send_signal(signum) - except BaseException as e: - logging.error("Failed to relay signal %s to process %s: %s", signum, self.process.pid, e) - logging.debug("End of signaler") + def _run(self): + while True: + data = self.sigqueue.read(2) + if len(data) == 0: + logging.debug("Received no signal data") + break + assert len(data) == 2 + signum = struct.unpack("!H", data)[0] + logging.debug( + "Received relayed signal %s, sending to process %s", + signum, + self.process.pid, + ) + try: + self.process.send_signal(signum) + except BaseException as e: + logging.error( + "Failed to relay signal %s to process %s: %s", + signum, + self.process.pid, + e, + ) + logging.debug("End of signaler") -def write(dst, buffer, l): +def write(dst, buffer, ln): alreadywritten = 0 - mv = memoryview(buffer)[:l] + mv = memoryview(buffer)[:ln] while len(mv): dst.write(mv) writtenthisloop = len(mv) @@ -199,10 +226,10 @@ def write(dst, buffer, l): alreadywritten = alreadywritten + writtenthisloop -def copy(src, dst, buffer, l): +def copy(src, dst, buffer, ln): alreadyread = 0 - mv = memoryview(buffer)[:l] - assert len(mv) == l, "Buffer object is too small: %s %s" % (len(mv), l) + mv = memoryview(buffer)[:ln] + assert len(mv) == ln, "Buffer object is too small: %s %s" % (len(mv), ln) while len(mv): _, _, _ = select.select([src], (), ()) readthisloop = src.readinto(mv) @@ -210,220 +237,241 @@ def copy(src, dst, buffer, l): raise Exception("copy: Failed to read any bytes") mv = mv[readthisloop:] alreadyread = alreadyread + readthisloop - return write(dst, buffer, l) + return write(dst, buffer, ln) class DataMultiplexer(MyThread): + def __init__(self, sources, sink): + threading.Thread.__init__(self) + self.daemon = True + self.sources = dict((s, num) for num, s in enumerate(sources)) + self.sink = sink - def __init__(self, sources, sink): - threading.Thread.__init__(self) - self.daemon = True - self.sources = dict((s,num) for num, s in enumerate(sources)) - self.sink = sink - - def _run(self): - logging.debug("mux: Started with sources %s and sink %s", self.sources, self.sink) - buffer = bytearray(MAX_MUX_READ) - while self.sources: - sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources)) - assert not x, x - for s in sources: - n = self.sources[s] - logging.debug("mux: Source %s (%s) is active", n, s) - readthisloop = s.readinto(buffer) - if readthisloop == 0: - logging.debug("mux: Received no bytes from source %s, signaling peer to close corresponding source", n) - del self.sources[s] - header = struct.pack(PACKFORMAT, n, False, 0) - self.sink.write(header) - continue - l = readthisloop - header = struct.pack(PACKFORMAT, n, True, l) - self.sink.write(header) - write(self.sink, buffer, l) - logging.debug("mux: End of data multiplexer") + def _run(self): + logging.debug( + "mux: Started with sources %s and sink %s", self.sources, self.sink + ) + buffer = bytearray(MAX_MUX_READ) + while self.sources: + sources, _, x = select.select( + (s for s in self.sources), (), (s for s in self.sources) + ) + assert not x, x + for s in sources: + n = self.sources[s] + logging.debug("mux: Source %s (%s) is active", n, s) + readthisloop = s.readinto(buffer) + if readthisloop == 0: + logging.debug( + "mux: Received no bytes from source %s, signaling" + " peer to close corresponding source", + n, + ) + del self.sources[s] + header = struct.pack(PACKFORMAT, n, False, 0) + self.sink.write(header) + continue + ln = readthisloop + header = struct.pack(PACKFORMAT, n, True, ln) + self.sink.write(header) + write(self.sink, buffer, ln) + logging.debug("mux: End of data multiplexer") class DataDemultiplexer(MyThread): + def __init__(self, source, sinks): + threading.Thread.__init__(self) + self.daemon = True + self.sinks = dict(enumerate(sinks)) + self.source = source - def __init__(self, source, sinks): - threading.Thread.__init__(self) - self.daemon = True - self.sinks = dict(enumerate(sinks)) - self.source = source - - def _run(self): - logging.debug("demux: Started with source %s and sinks %s", self.source, self.sinks) - buffer = bytearray(MAX_MUX_READ) - while self.sinks: - r, _, x = select.select([self.source], (), [self.source]) - assert not x, x - for s in r: - header = s.read(PACKLEN) - if header == "": - logging.debug("demux: Received no bytes from source, closing all sinks") - for sink in self.sinks.values(): - sink.close() - self.sinks = [] - break - n, active, l = struct.unpack(PACKFORMAT, header) - if not active: - logging.debug("demux: Source %s now inactive, closing corresponding sink %s", s, self.sinks[n]) - self.sinks[n].close() - del self.sinks[n] - else: - copy(self.source, self.sinks[n], buffer, l) - logging.debug("demux: End of data demultiplexer") + def _run(self): + logging.debug( + "demux: Started with source %s and sinks %s", + self.source, + self.sinks, + ) + buffer = bytearray(MAX_MUX_READ) + while self.sinks: + r, _, x = select.select([self.source], (), [self.source]) + assert not x, x + for s in r: + header = s.read(PACKLEN) + if header == "": + logging.debug( + "demux: Received no bytes from source, closing sinks", + ) + for sink in self.sinks.values(): + sink.close() + self.sinks = [] + break + n, active, ln = struct.unpack(PACKFORMAT, header) + if not active: + logging.debug( + "demux: Source %s inactive, closing matching sink %s", + s, + self.sinks[n], + ) + self.sinks[n].close() + del self.sinks[n] + else: + copy(self.source, self.sinks[n], buffer, ln) + logging.debug("demux: End of data demultiplexer") def main_master(): - global logging - logging = LoggingEmu("master") + global logging + logging = LoggingEmu("master") - logging.info("Started with arguments: %s", sys.argv[1:]) + logging.info("Started with arguments: %s", sys.argv[1:]) - global debug_enabled - args = sys.argv[1:] - if args[0] == "-d": - args = args[1:] - debug_enabled = True + global debug_enabled + args = sys.argv[1:] + if args[0] == "-d": + args = args[1:] + debug_enabled = True - remote_vm = args[0] - remote_command = args[1:] - assert remote_command + remote_vm = args[0] + remote_command = args[1:] + assert remote_command - def anypython(exe): - return "` test -x %s && echo %s || echo python`" % (pipes.quote(exe), - pipes.quote(exe)) - - remote_helper_text = b"exec " - remote_helper_text += bytes(anypython(sys.executable), "utf-8") - remote_helper_text += bytes(" -u -c ", "utf-8") - remote_helper_text += bytes(pipes.quote(open(__file__, "r").read()), "ascii") - remote_helper_text += b" -d " if debug_enabled else b" " - remote_helper_text += base64.b64encode(pickle.dumps(remote_command, 2)) - remote_helper_text += b"\n" - - saved_stderr = openfdforappend(os.dup(sys.stderr.fileno())) - - with mutexfile(os.path.expanduser("~/.bombshell-lock")): - try: - p = subprocess.Popen( - ["qrexec-client-vm", remote_vm, "qubes.VMShell"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - close_fds=True, - preexec_fn=os.setpgrp, - bufsize=0, + def anypython(exe): + return "` test -x %s && echo %s || echo python`" % ( + quote(exe), + quote(exe), ) - except OSError as e: - logging.error("cannot launch qrexec-client-vm: %s", e) - return 127 - logging.debug("Writing the helper text into the other side") - p.stdin.write(remote_helper_text) - p.stdin.flush() + remote_helper_text = b"exec " + remote_helper_text += bytes(anypython(sys.executable), "utf-8") + remote_helper_text += bytes(" -u -c ", "utf-8") + remote_helper_text += bytes( + quote(open(__file__, "r").read()), + "ascii", + ) + remote_helper_text += b" -d " if debug_enabled else b" " + remote_helper_text += base64.b64encode(pickle.dumps(remote_command, 2)) + remote_helper_text += b"\n" - confirmation, errmsg = recv_confirmation(p.stdout) - if confirmation != 0: - logging.error("remote: %s", errmsg) - return confirmation + saved_stderr = openfdforappend(os.dup(sys.stderr.fileno())) - handled_signals = ( - signal.SIGINT, - signal.SIGABRT, - signal.SIGALRM, - signal.SIGTERM, - signal.SIGUSR1, - signal.SIGUSR2, - signal.SIGTSTP, - signal.SIGCONT, - ) - read_signals, write_signals = pairofpipes() - signaler = SignalSender(handled_signals, write_signals) - signaler.name = "master signaler" - signaler.start() + with mutexfile(os.path.expanduser("~/.bombshell-lock")): + try: + p = subprocess.Popen( + ["qrexec-client-vm", remote_vm, "qubes.VMShell"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + close_fds=True, + preexec_fn=os.setpgrp, + bufsize=0, + ) + except OSError as e: + logging.error("cannot launch qrexec-client-vm: %s", e) + return 127 - muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin) - muxer.name = "master multiplexer" - muxer.start() + logging.debug("Writing the helper text into the other side") + p.stdin.write(remote_helper_text) + p.stdin.flush() - demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr]) - demuxer.name = "master demultiplexer" - demuxer.start() + confirmation, errmsg = recv_confirmation(p.stdout) + if confirmation != 0: + logging.error("remote: %s", errmsg) + return confirmation - retval = p.wait() - logging.info("Return code %s for qubes.VMShell proxy", retval) - demuxer.join() - logging.info("Ending bombshell") - return retval + handled_signals = ( + signal.SIGINT, + signal.SIGABRT, + signal.SIGALRM, + signal.SIGTERM, + signal.SIGUSR1, + signal.SIGUSR2, + signal.SIGTSTP, + signal.SIGCONT, + ) + read_signals, write_signals = pairofpipes() + signaler = SignalSender(handled_signals, write_signals) + signaler.name = "master signaler" + signaler.start() + + muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin) + muxer.name = "master multiplexer" + muxer.start() + + demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr]) + demuxer.name = "master demultiplexer" + demuxer.start() + + retval = p.wait() + logging.info("Return code %s for qubes.VMShell proxy", retval) + demuxer.join() + logging.info("Ending bombshell") + return retval def pairofpipes(): - read, write = os.pipe() - return os.fdopen(read, "rb", 0), os.fdopen(write, "wb", 0) + read, write = os.pipe() + return os.fdopen(read, "rb", 0), os.fdopen(write, "wb", 0) def main_remote(): - global logging - logging = LoggingEmu("remote") + global logging + logging = LoggingEmu("remote") - logging.info("Started with arguments: %s", sys.argv[1:]) + logging.info("Started with arguments: %s", sys.argv[1:]) - global debug_enabled - if "-d" in sys.argv[1:]: - debug_enabled = True - cmd = sys.argv[2] - else: - cmd = sys.argv[1] + global debug_enabled + if "-d" in sys.argv[1:]: + debug_enabled = True + cmd = sys.argv[2] + else: + cmd = sys.argv[1] - cmd = pickle.loads(base64.b64decode(cmd)) - logging.debug("Received command: %s", cmd) + cmd = pickle.loads(base64.b64decode(cmd)) + logging.debug("Received command: %s", cmd) - nicecmd = " ".join(pipes.quote(a) for a in cmd) - try: - p = subprocess.Popen( - cmd, -# ["strace", "-s4096", "-ff"] + cmd, - stdin = subprocess.PIPE, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - close_fds=True, - bufsize=0, - ) - send_confirmation(sys.stdout, 0, b"") - except OSError as e: - msg = "cannot execute %s: %s" % (nicecmd, e) - logging.error(msg) - send_confirmation(sys.stdout, 127, bytes(msg, "utf-8")) - sys.exit(0) - except BaseException as e: - msg = "cannot execute %s: %s" % (nicecmd, e) - logging.error(msg) - send_confirmation(sys.stdout, 126, bytes(msg, "utf-8")) - sys.exit(0) + nicecmd = " ".join(quote(a) for a in cmd) + try: + p = subprocess.Popen( + cmd, + # ["strace", "-s4096", "-ff"] + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + bufsize=0, + ) + send_confirmation(sys.stdout, 0, b"") + except OSError as e: + msg = "cannot execute %s: %s" % (nicecmd, e) + logging.error(msg) + send_confirmation(sys.stdout, 127, bytes(msg, "utf-8")) + sys.exit(0) + except BaseException as e: + msg = "cannot execute %s: %s" % (nicecmd, e) + logging.error(msg) + send_confirmation(sys.stdout, 126, bytes(msg, "utf-8")) + sys.exit(0) - signals_read, signals_written = pairofpipes() + signals_read, signals_written = pairofpipes() - signaler = Signaler(p, signals_read) - signaler.name = "remote signaler" - signaler.start() + signaler = Signaler(p, signals_read) + signaler.name = "remote signaler" + signaler.start() - demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written]) - demuxer.name = "remote demultiplexer" - demuxer.start() + demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written]) + demuxer.name = "remote demultiplexer" + demuxer.start() - muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout) - muxer.name = "remote multiplexer" - muxer.start() + muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout) + muxer.name = "remote multiplexer" + muxer.start() - logging.info("Started %s", nicecmd) + logging.info("Started %s", nicecmd) - retval = p.wait() - logging.info("Return code %s for %s", retval, nicecmd) - muxer.join() - logging.info("Ending bombshell") - return retval + retval = p.wait() + logging.info("Return code %s for %s", retval, nicecmd) + muxer.join() + logging.info("Ending bombshell") + return retval sys.stdin = openfdforread(sys.stdin.fileno())