#!/usr/bin/python3 -u import pickle import contextlib import ctypes import ctypes.util import errno import fcntl import os import pipes import queue import select import signal import struct import subprocess import sys import syslog import threading import time import traceback MAX_MUX_READ = 128*1024 # 64*1024*1024 PACKLEN = 8 PACKFORMAT = "!HbIx" @contextlib.contextmanager def mutexfile(filepath): oldumask = os.umask(0o077) try: f = open(filepath, "a") finally: os.umask(oldumask) fcntl.lockf(f.fileno(), fcntl.LOCK_EX) yield f.close() def openfdforappend(fd): try: return os.fdopen(fd, "ab", 0, closefd=False) except IOError as e: if e.errno != errno.ESPIPE: raise return os.fdopen(fd, "wb", 0, closefd=False) def openfdforread(fd): return os.fdopen(fd, "rb", 0, closefd=False) debug_lock = threading.Lock() debug_enabled = False _startt = time.time() class LoggingEmu(): def __init__(self, prefix): self.prefix = prefix syslog.openlog("bombshell-client.%s" % self.prefix) def debug(self, *a, **kw): if not debug_enabled: return self._print(syslog.LOG_DEBUG, *a, **kw) def info(self, *a, **kw): self._print(syslog.LOG_INFO, *a, **kw) def error(self, *a, **kw): self._print(syslog.LOG_ERR, *a, **kw) def _print(self, prio, *a, **kw): debug_lock.acquire() global _startt deltat = time.time() - _startt try: if len(a) == 1: string = a[0] else: string = a[0] % a[1:] syslog.syslog(prio, ("%.3f " % deltat) + string) finally: debug_lock.release() logging = None def send_command(chan, cmd): """Sends a command over the wire. Args: chan: writable file-like object to send the command to cmd: command to send, iterable of strings """ pickled = pickle.dumps(cmd) l = len(pickled) assert l < 1<<32, l chan.write(struct.pack("!I", l)) chan.write(pickled) chan.flush() logging.debug("Sent command: %s", cmd) def recv_command(chan): l = struct.unpack("!I", sys.stdin.read(4))[0] pickled = chan.read(l) cmd = pickle.loads(pickled) logging.debug("Received command: %s", cmd) return cmd def send_beacon(chan): l = chan.write(b"1") if l != 1: raise Exception("Beacon could not be sent") chan.flush() logging.debug("Sent beacon") def recv_beacon(chan): m = chan.read(1) if not m or len(m) != 1: raise Exception("Beacon never received") logging.debug("Received beacon") def send_confirmation(chan, retval, errmsg): chan.write(struct.pack("!H", retval)) l = len(errmsg) assert l < 1<<32 chan.write(struct.pack("!I", l)) chan.write(errmsg) chan.flush() logging.debug("Sent confirmation on channel %s: %s %s", chan, retval, errmsg) def recv_confirmation(chan): logging.debug("Waiting for confirmation on channel %s", chan) r = chan.read(2) if len(r) == 0: # This happens when the remote domain does not exist. r, errmsg = 125, "domain does not exist" logging.debug("No confirmation: %s %s", r, errmsg) return r, errmsg assert len(r) == 2, r r = struct.unpack("!H", r)[0] l = chan.read(4) assert len(l) == 4, l l = struct.unpack("!I", l)[0] errmsg = chan.read(l) logging.debug("Received confirmation: %s %s", r, errmsg) return r, errmsg class MyThread(threading.Thread): def run(self): try: self._run() except Exception as e: logging.error("%s: unexpected exception", threading.currentThread()) tb = traceback.format_exc() logging.error("%s: traceback: %s", threading.currentThread(), tb) logging.error("%s: exiting program", threading.currentThread()) os._exit(16) class SignalSender(MyThread): def __init__(self, signals, sigqueue): """Handles signals by pushing them into a file-like object.""" threading.Thread.__init__(self) self.setDaemon(True) self.queue = queue.Queue() self.sigqueue = sigqueue for sig in signals: signal.signal(sig, self.copy) def copy(self, signum, frame): self.queue.put(signum) logging.debug("Signal %s pushed to queue", signum) def run(self): while True: signum = self.queue.get() logging.debug("Dequeued signal %s", signum) if signum is None: break assert signum > 0 self.sigqueue.write(struct.pack("!H", signum)) self.sigqueue.flush() logging.debug("Wrote signal %s to remote end", signum) class Signaler(MyThread): def __init__(self, process, sigqueue): """Reads integers from a file-like object and relays that as kill().""" threading.Thread.__init__(self) self.setDaemon(True) self.process = process self.sigqueue = sigqueue def run(self): while True: data = self.sigqueue.read(2) if len(data) == 0: logging.debug("Received no signal data") break assert len(data) == 2 signum = struct.unpack("!H", data)[0] logging.debug("Received relayed signal %s, sending to process %s", signum, self.process.pid) self.process.send_signal(signum) logging.debug("End of signaler") def unblock(fobj): logging.debug("Unblocking file object %s", fobj) os.set_blocking(fobj.fileno(), False) def write(dst, buffer, l): alreadywritten = 0 mv = memoryview(buffer)[:l] while len(mv): writtenthisloop = dst.write(mv) if writtenthisloop is None or writtenthisloop < 1: raise Exception("copy: Failed to write any bytes") mv = mv[writtenthisloop:] alreadywritten = alreadywritten + writtenthisloop logging.debug("write: Relayed %s bytes to sink %s", alreadywritten, dst) def copy(src, dst, buffer, l): alreadyread = 0 mv = memoryview(buffer)[:l] assert len(mv) == l, "Buffer object is too small: %s %s" % (len(mv), l) while len(mv): _, _, _ = select.select([src], (), ()) readthisloop = src.readinto(mv) if readthisloop is None or readthisloop < 1: raise Exception("copy: Failed to read any bytes") mv = mv[readthisloop:] alreadyread = alreadyread + readthisloop logging.debug("copy: Read %s bytes from %s", alreadyread, src) return write(dst, buffer, l) class DataMultiplexer(MyThread): def __init__(self, sources, sink): threading.Thread.__init__(self) self.setDaemon(True) self.sources = dict((s,num) for num, s in enumerate(sources)) self.sink = sink for s in sources: unblock(s) def _run(self): logging.debug("mux: Started with sources %s and sink %s", self.sources, self.sink) buffer = bytearray(MAX_MUX_READ) sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources)) assert not x, x while sources: logging.debug("mux: Sources that alarmed: %s", [self.sources[s] for s in sources]) for s in sources: n = self.sources[s] logging.debug("mux: Source %s (%s) is active", n, s) readthisloop = s.readinto(buffer) if readthisloop == 0: logging.debug("mux: Received no bytes from source %s", n) del self.sources[s] header = struct.pack(PACKFORMAT, n, False, 0) logging.debug("mux: Sending packet: %s" % header) self.sink.write(header) logging.debug("mux: Informed sink about death of source %s", n) continue l = readthisloop logging.debug("mux: Received %s bytes from source %s", l, n) header = struct.pack(PACKFORMAT, n, True, l) logging.debug("mux: Sending packet: %s" % header) self.sink.write(header) write(self.sink, buffer, l) if not self.sources: break sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources)) assert not x, x logging.debug("mux: End of data multiplexer") class DataDemultiplexer(MyThread): def __init__(self, source, sinks): threading.Thread.__init__(self) self.setDaemon(True) self.sinks = dict(enumerate(sinks)) self.source = source unblock(source) def run(self): logging.debug("demux: Started with source %s and sinks %s", self.source, self.sinks) buffer = bytearray(MAX_MUX_READ) while self.sinks: r, _, x = select.select([self.source], (), [self.source]) assert not x, x logging.debug("demux: Source alarmed") for s in r: logging.debug("demux: Source %s is active", s) header = s.read(PACKLEN) logging.debug("demux: received packet: %s" % header) if header == "": logging.debug("demux: Received no bytes from source, closing all sinks") for sink in self.sinks.values(): sink.close() self.sinks = [] break n, active, l = struct.unpack(PACKFORMAT, header) if not active: logging.debug("demux: Source %s now inactive, closing corresponding sink %s", s, self.sinks[n]) self.sinks[n].close() del self.sinks[n] else: logging.debug("demux: Source %s is sending %s bytes, relaying to corresponding sink", s, l) copy(self.source, self.sinks[n], buffer, l) logging.debug("demux: End of data demultiplexer") def main_master(): global logging logging = LoggingEmu("master") global debug_enabled args = sys.argv[1:] if args[0] == "-d": args = args[1:] debug_enabled = True logging.info("Started with arguments: %s", sys.argv[1:]) remote_vm = args[0] remote_command = args[1:] assert remote_command # if debug_enabled: # remote_helper_text = b""" #f=$(mktemp) #echo '%s' > "$f" #chmod +x "$f" #exec "$f" -s -d #""" % (open(__file__, "rb").read().replace(b"'", b"'\\''")) # else: remote_helper_text = b"\n".join([ b"exec %s -u -c '" % bytes(sys.executable, "utf-8"), open(__file__, "rb").read().replace(b"'", b"'\\''"), b"'" + (b" -d" if debug_enabled else b""), b"", ]) saved_stderr = openfdforappend(os.dup(sys.stderr.fileno())) with mutexfile(os.path.expanduser("~/.bombshell-lock")): try: p = subprocess.Popen( ["qrexec-client-vm", remote_vm, "qubes.VMShell"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True, preexec_fn=os.setpgrp, bufsize=0, ) except OSError as e: logging.error("cannot launch qrexec-client-vm: %s", e) return 127 logging.debug("Writing the helper text into the other side") p.stdin.write(remote_helper_text) p.stdin.flush() try: recv_beacon(p.stdout) except Exception as e: logging.error("%s", e) p.kill() return 124 send_command(p.stdin, remote_command) confirmation, errmsg = recv_confirmation(p.stdout) if confirmation != 0: logging.error("remote: %s", errmsg) return confirmation handled_signals = ( signal.SIGINT, signal.SIGABRT, signal.SIGALRM, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2, signal.SIGTSTP, signal.SIGCONT, ) read_signals, write_signals = pairofpipes() signaler = SignalSender(handled_signals, write_signals) signaler.setName("master signaler") signaler.start() muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin) muxer.setName("master multiplexer") muxer.start() demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr]) demuxer.setName("master demultiplexer") demuxer.start() retval = p.wait() logging.info("Return code %s for qubes.VMShell proxy", retval) demuxer.join() logging.info("Ending bombshell") return retval def pairofpipes(): read, write = os.pipe() return os.fdopen(read, "rb", 0), os.fdopen(write, "wb", 0) def main_remote(): global logging logging = LoggingEmu("remote") global debug_enabled if "-d" in sys.argv[1:]: debug_enabled = True logging.info("Started with arguments: %s", sys.argv[1:]) try: send_beacon(sys.stdout) except Exception as e: logging.error("%s", e) p.kill() return 124 cmd = recv_command(sys.stdin) nicecmd = " ".join(pipes.quote(a) for a in cmd) try: p = subprocess.Popen( cmd, # ["strace", "-s4096", "-ff"] + cmd, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds=True, bufsize=0, ) send_confirmation(sys.stdout, 0, b"") except OSError as e: msg = "cannot execute %s: %s" % (nicecmd, e) logging.error(msg) send_confirmation(sys.stdout, 127, bytes(msg, "utf-8")) sys.exit(0) except BaseException as e: msg = "cannot execute %s: %s" % (nicecmd, e) logging.error(msg) send_confirmation(sys.stdout, 126, bytes(msg, "utf-8")) sys.exit(0) signals_read, signals_written = pairofpipes() signaler = Signaler(p, signals_read) signaler.setName("remote signaler") signaler.start() demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written]) demuxer.setName("remote demultiplexer") demuxer.start() muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout) muxer.setName("remote multiplexer") muxer.start() logging.info("Started %s", nicecmd) retval = p.wait() logging.info("Return code %s for %s", retval, nicecmd) muxer.join() logging.info("Ending bombshell") return retval sys.stdin = openfdforread(sys.stdin.fileno()) sys.stdout = openfdforappend(sys.stdout.fileno()) if "__file__" in locals() and not ("-s" in sys.argv[1:2]): sys.exit(main_master()) else: sys.exit(main_remote())