From 3e1674801a0ef24d520863232a7bae3e86bae147 Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Mon, 29 Aug 2016 01:28:02 +0000 Subject: [PATCH] bombshell-client updated to work with Python 3 and to have better logging --- bin/bombshell-client | 371 ++++++++++++++++++++++++++++++++----------- 1 file changed, 281 insertions(+), 90 deletions(-) diff --git a/bin/bombshell-client b/bin/bombshell-client index 98be6de..afd330a 100755 --- a/bin/bombshell-client +++ b/bin/bombshell-client @@ -1,33 +1,110 @@ -#!/usr/bin/python -u +#!/usr/bin/python3 -u -import cPickle -# Security note: -# -# If you look at cPickle usage in bombshell, it's only used to package up -# the command line at the initiator side, and then it is unpacked at the -# receiver side. Given that the initiator has already been given all -# permissions to run arbitrary programs on the receiver, there is no -# additional security risk posed by the use of cPickle. -# -# End security note. +import pickle import contextlib +import ctypes +import ctypes.util +import errno import fcntl import os import pipes -import Queue +import queue import select import signal import struct import subprocess import sys +import syslog import threading +import time +import traceback + + +MAX_MUX_READ = 128*1024 # 64*1024*1024 +PACKLEN = 8 +PACKFORMAT = "!HbIx" + +# From bits/fcntl.h +# Values for 'flags', can be OR'ed together +SPLICE_F_MOVE = 1 +SPLICE_F_NONBLOCK = 2 +SPLICE_F_MORE = 4 +SPLICE_F_GIFT = 8 + +def make_splice(): + '''Set up a splice(2) wrapper''' + + # Load libc + libc_name = ctypes.util.find_library('c') + libc = ctypes.CDLL(libc_name, use_errno=True) + + # Get a handle to the 'splice' call + c_splice = libc.splice + + # These should match for x86_64, might need some tweaking for other + # platforms... + c_loff_t = ctypes.c_uint64 + c_loff_t_p = ctypes.POINTER(c_loff_t) + + # ssize_t splice(int fd_in, loff_t *off_in, int fd_out, + # loff_t *off_out, size_t len, unsigned int flags) + c_splice.argtypes = [ + ctypes.c_int, c_loff_t_p, + ctypes.c_int, c_loff_t_p, + ctypes.c_size_t, + ctypes.c_uint + ] + c_splice.restype = ctypes.c_ssize_t + + # Clean-up closure names. Yup, useless nit-picking. + del libc + del libc_name + del c_loff_t_p + + # pylint: disable-msg=W0621,R0913 + def splice(fd_in, off_in, fd_out, off_out, len_, flags): + '''Wrapper for splice(2) + See the syscall documentation ('man 2 splice') for more information + about the arguments and return value. + `off_in` and `off_out` can be `None`, which is equivalent to `NULL`. + If the call to `splice` fails (i.e. returns -1), an `OSError` is raised + with the appropriate `errno`, unless the error is `EINTR`, which results + in the call to be retried. + ''' + + c_off_in = \ + ctypes.byref(c_loff_t(off_in)) if off_in is not None else None + c_off_out = \ + ctypes.byref(c_loff_t(off_out)) if off_out is not None else None + + # For handling EINTR... + while True: + res = c_splice(fd_in, c_off_in, fd_out, c_off_out, len_, flags) + + if res == -1: + errno_ = ctypes.get_errno() + + # Try again on EINTR + if errno_ == errno.EINTR: + continue + + raise IOError(errno_, os.strerror(errno_)) + + return res + + return splice + + +# Build and export wrapper +splice = make_splice() #pylint: disable-msg=C0103 +del make_splice @contextlib.contextmanager def mutexfile(filepath): - oldumask = os.umask(0077) + oldumask = os.umask(0o077) try: - f = file(filepath, "a") + f = open(filepath, "a") finally: os.umask(oldumask) fcntl.lockf(f.fileno(), fcntl.LOCK_EX) @@ -35,30 +112,50 @@ def mutexfile(filepath): f.close() +def openfdforappend(fd): + try: + return os.fdopen(fd, "ab", 0, closefd=False) + except IOError as e: + if e.errno != errno.ESPIPE: + raise + return os.fdopen(fd, "wb", 0, closefd=False) + + +def openfdforread(fd): + return os.fdopen(fd, "rb", 0, closefd=False) + + debug_lock = threading.Lock() debug_enabled = False +_startt = time.time() class LoggingEmu(): def __init__(self, prefix): self.prefix = prefix + syslog.openlog("bombshell-client.%s" % self.prefix) def debug(self, *a, **kw): if not debug_enabled: return - self._print(*a, **kw) + for p in ("copy", "write", "Unblocking"): + if a[0].startswith(p): + return + self._print(syslog.LOG_DEBUG, *a, **kw) def info(self, *a, **kw): - self._print(*a, **kw) + self._print(syslog.LOG_INFO, *a, **kw) def error(self, *a, **kw): - self._print(*a, **kw) - def _print(self, *a, **kw): + self._print(syslog.LOG_ERR, *a, **kw) + def _print(self, prio, *a, **kw): debug_lock.acquire() + global _startt + deltat = time.time() - _startt try: if len(a) == 1: string = a[0] else: string = a[0] % a[1:] - print >> sys.stderr, self.prefix, string + syslog.syslog(prio, ("%.3f " % deltat) + string) finally: debug_lock.release() -logging = LoggingEmu("master:" if "__file__" in globals() else "remote:") +logging = None def send_command(chan, cmd): @@ -68,7 +165,7 @@ def send_command(chan, cmd): chan: writable file-like object to send the command to cmd: command to send, iterable of strings """ - pickled = cPickle.dumps(cmd) + pickled = pickle.dumps(cmd) l = len(pickled) assert l < 1<<32, l chan.write(struct.pack("!I", l)) @@ -80,11 +177,26 @@ def send_command(chan, cmd): def recv_command(chan): l = struct.unpack("!I", sys.stdin.read(4))[0] pickled = chan.read(l) - cmd = cPickle.loads(pickled) + cmd = pickle.loads(pickled) logging.debug("Received command: %s", cmd) return cmd +def send_beacon(chan): + l = chan.write(b"1") + if l != 1: + raise Exception("Beacon could not be sent") + chan.flush() + logging.debug("Sent beacon") + + +def recv_beacon(chan): + m = chan.read(1) + if not m or len(m) != 1: + raise Exception("Beacon never received") + logging.debug("Received beacon") + + def send_confirmation(chan, retval, errmsg): chan.write(struct.pack("!H", retval)) l = len(errmsg) @@ -92,11 +204,11 @@ def send_confirmation(chan, retval, errmsg): chan.write(struct.pack("!I", l)) chan.write(errmsg) chan.flush() - logging.debug("Sent confirmation: %s %s", retval, errmsg) + logging.debug("Sent confirmation on channel %s: %s %s", chan, retval, errmsg) def recv_confirmation(chan): - logging.debug("Waiting for confirmation") + logging.debug("Waiting for confirmation on channel %s", chan) r = chan.read(2) if len(r) == 0: # This happens when the remote domain does not exist. @@ -113,12 +225,26 @@ def recv_confirmation(chan): return r, errmsg -class SignalSender(threading.Thread): +class MyThread(threading.Thread): + + def run(self): + try: + self._run() + except Exception as e: + logging.error("%s: unexpected exception", threading.currentThread()) + tb = traceback.format_exc() + logging.error("%s: traceback: %s", threading.currentThread(), tb) + logging.error("%s: exiting program", threading.currentThread()) + os._exit(16) + + +class SignalSender(MyThread): + def __init__(self, signals, sigqueue): """Handles signals by pushing them into a file-like object.""" threading.Thread.__init__(self) self.setDaemon(True) - self.queue = Queue.Queue() + self.queue = queue.Queue() self.sigqueue = sigqueue for sig in signals: signal.signal(sig, self.copy) @@ -139,7 +265,7 @@ class SignalSender(threading.Thread): logging.debug("Wrote signal %s to remote end", signum) -class Signaler(threading.Thread): +class Signaler(MyThread): def __init__(self, process, sigqueue): """Reads integers from a file-like object and relays that as kill().""" @@ -156,121 +282,153 @@ class Signaler(threading.Thread): break assert len(data) == 2 signum = struct.unpack("!H", data)[0] - logging.debug("Received relayed signal %s, sending to process", signum) + logging.debug("Received relayed signal %s, sending to process %s", signum, self.process.pid) self.process.send_signal(signum) logging.debug("End of signaler") def unblock(fobj): - fl = fcntl.fcntl(fobj, fcntl.F_GETFL) - fcntl.fcntl(fobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) + logging.debug("Unblocking file object %s", fobj) + os.set_blocking(fobj.fileno(), False) -class DataMultiplexer(threading.Thread): +def write(dst, buffer, l): + alreadywritten = 0 + mv = memoryview(buffer)[:l] + while len(mv): + writtenthisloop = dst.write(mv) + if writtenthisloop is None or writtenthisloop < 1: + raise Exception("copy: Failed to write any bytes") + mv = mv[writtenthisloop:] + alreadywritten = alreadywritten + writtenthisloop + logging.debug("write: Relayed %s bytes to sink %s", alreadywritten, dst) + + +def copy(src, dst, buffer, l): + alreadyread = 0 + mv = memoryview(buffer)[:l] + assert len(mv) == l, "Buffer object is too small: %s %s" % (len(mv), l) + while len(mv): + _, _, _ = select.select([src], (), ()) + readthisloop = src.readinto(mv) + if readthisloop is None or readthisloop < 1: + raise Exception("copy: Failed to read any bytes") + mv = mv[readthisloop:] + alreadyread = alreadyread + readthisloop + logging.debug("copy: Read %s bytes from %s", alreadyread, src) + return write(dst, buffer, l) + + +class DataMultiplexer(MyThread): def __init__(self, sources, sink): threading.Thread.__init__(self) self.setDaemon(True) self.sources = dict((s,num) for num, s in enumerate(sources)) self.sink = sink + for s in sources: unblock(s) - def run(self): - map(unblock, (s for s in self.sources)) + def _run(self): + logging.debug("mux: Started with sources %s and sink %s", self.sources, self.sink) + buffer = bytearray(MAX_MUX_READ) sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources)) assert not x, x while sources: logging.debug("mux: Sources that alarmed: %s", [self.sources[s] for s in sources]) for s in sources: n = self.sources[s] - data = s.read() - if data == "": - logging.debug("Received no bytes from source %s", n) + logging.debug("mux: Source %s (%s) is active", n, s) + readthisloop = s.readinto(buffer) + if readthisloop == 0: + logging.debug("mux: Received no bytes from source %s", n) del self.sources[s] - self.sink.write(struct.pack("!H", n)) - self.sink.write(struct.pack("b", False)) - self.sink.flush() - logging.debug("Informed sink about death of source %s", n) + header = struct.pack(PACKFORMAT, n, False, 0) + logging.debug("mux: Sending packet: %s" % header) + self.sink.write(header) + logging.debug("mux: Informed sink about death of source %s", n) continue - l = len(data) - logging.debug("Received %s bytes from source %s", l, n) - assert l < 1<<32 - self.sink.write(struct.pack("!H", n)) - self.sink.write(struct.pack("b", True)) - self.sink.write(struct.pack("!I", l)) - self.sink.write(data) - self.sink.flush() - logging.debug("Copied those %s bytes to sink", l) + l = readthisloop + logging.debug("mux: Received %s bytes from source %s", l, n) + header = struct.pack(PACKFORMAT, n, True, l) + logging.debug("mux: Sending packet: %s" % header) + self.sink.write(header) + write(self.sink, buffer, l) if not self.sources: break - sources, _, _ = select.select((s for s in self.sources), (), (s for s in self.sources)) + sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources)) assert not x, x - logging.debug("End of data multiplexer") + logging.debug("mux: End of data multiplexer") -class DataDemultiplexer(threading.Thread): +class DataDemultiplexer(MyThread): def __init__(self, source, sinks): threading.Thread.__init__(self) self.setDaemon(True) self.sinks = dict(enumerate(sinks)) self.source = source + unblock(source) def run(self): + logging.debug("demux: Started with source %s and sinks %s", self.source, self.sinks) + buffer = bytearray(MAX_MUX_READ) while self.sinks: r, _, x = select.select([self.source], (), [self.source]) assert not x, x logging.debug("demux: Source alarmed") for s in r: - n = s.read(2) - if n == "": - logging.debug("Received no bytes from source, closing all sinks") + logging.debug("demux: Source %s is active", s) + header = s.read(PACKLEN) + logging.debug("demux: received packet: %s" % header) + if header == "": + logging.debug("demux: Received no bytes from source, closing all sinks") for sink in self.sinks.values(): sink.close() self.sinks = [] break - assert len(n) == 2, data - n = struct.unpack("!H", n)[0] - - active = s.read(1) - assert len(active) == 1, active - active = struct.unpack("b", active)[0] - + n, active, l = struct.unpack(PACKFORMAT, header) if not active: - logging.debug("Source %s now inactive, closing corresponding sink", n) + logging.debug("demux: Source %s now inactive, closing corresponding sink %s", s, self.sinks[n]) self.sinks[n].close() del self.sinks[n] else: - l = s.read(4) - assert len(l) == 4, l - l = struct.unpack("!I", l)[0] - data = s.read(l) - assert len(data) == l, len(data) - logging.debug("Received %s bytes from source %s, relaying to corresponding sink", l, n) - self.sinks[n].write(data) - self.sinks[n].flush() - logging.debug("Relayed %s bytes to sink %s", l, n) - logging.debug("End of data demultiplexer") + logging.debug("demux: Source %s is sending %s bytes, relaying to corresponding sink", s, l) + copy(self.source, self.sinks[n], buffer, l) + logging.debug("demux: End of data demultiplexer") def main_master(): + global logging + logging = LoggingEmu("master") + global debug_enabled args = sys.argv[1:] if args[0] == "-d": args = args[1:] debug_enabled = True + logging.info("Started with arguments: %s", sys.argv[1:]) + remote_vm = args[0] remote_command = args[1:] assert remote_command - remote_helper_text = "\n".join([ - "exec python -u -c '", - open(__file__, "rb").read().replace("'", "'\\''"), - "'" + (" -d" if debug_enabled else ""), - "", +# if debug_enabled: +# remote_helper_text = b""" +#f=$(mktemp) +#echo '%s' > "$f" +#chmod +x "$f" +#exec "$f" -s -d +#""" % (open(__file__, "rb").read().replace(b"'", b"'\\''")) +# else: + remote_helper_text = b"\n".join([ + b"exec %s -u -c '" % bytes(sys.executable, "utf-8"), + open(__file__, "rb").read().replace(b"'", b"'\\''"), + b"'" + (b" -d" if debug_enabled else b""), + b"", ]) - saved_stderr = os.fdopen(os.dup(sys.stderr.fileno()), "a") + saved_stderr = openfdforappend(os.dup(sys.stderr.fileno())) with mutexfile(os.path.expanduser("~/.bombshell-lock")): try: @@ -280,14 +438,23 @@ def main_master(): stdout=subprocess.PIPE, close_fds=True, preexec_fn=os.setpgrp, + bufsize=0, ) - except OSError, e: + except OSError as e: logging.error("cannot launch qrexec-client-vm: %s", e) return 127 + logging.debug("Writing the helper text into the other side") p.stdin.write(remote_helper_text) p.stdin.flush() + try: + recv_beacon(p.stdout) + except Exception as e: + logging.error("%s", e) + p.kill() + return 124 + send_command(p.stdin, remote_command) confirmation, errmsg = recv_confirmation(p.stdout) if confirmation != 0: @@ -306,29 +473,46 @@ def main_master(): ) read_signals, write_signals = pairofpipes() signaler = SignalSender(handled_signals, write_signals) + signaler.setName("master signaler") signaler.start() muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin) + muxer.setName("master multiplexer") muxer.start() demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr]) + demuxer.setName("master demultiplexer") demuxer.start() retval = p.wait() + logging.info("Return code %s for qubes.VMShell proxy", retval) demuxer.join() + logging.info("Ending bombshell") return retval def pairofpipes(): read, write = os.pipe() - return os.fdopen(read, "rb"), os.fdopen(write, "wb") + return os.fdopen(read, "rb", 0), os.fdopen(write, "wb", 0) def main_remote(): + global logging + logging = LoggingEmu("remote") + global debug_enabled - if len(sys.argv) > 1 and sys.argv[1] == "-d": + if "-d" in sys.argv[1:]: debug_enabled = True + logging.info("Started with arguments: %s", sys.argv[1:]) + + try: + send_beacon(sys.stdout) + except Exception as e: + logging.error("%s", e) + p.kill() + return 124 + cmd = recv_command(sys.stdin) nicecmd = " ".join(pipes.quote(a) for a in cmd) try: @@ -339,39 +523,46 @@ def main_remote(): stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds=True, + bufsize=0, ) - send_confirmation(sys.stdout, 0, "") - except OSError, e: + send_confirmation(sys.stdout, 0, b"") + except OSError as e: msg = "cannot execute %s: %s" % (nicecmd, e) logging.error(msg) - send_confirmation(sys.stdout, 127, msg) + send_confirmation(sys.stdout, 127, bytes(msg, "utf-8")) sys.exit(0) - except BaseException, e: + except BaseException as e: msg = "cannot execute %s: %s" % (nicecmd, e) logging.error(msg) - send_confirmation(sys.stdout, 126, msg) + send_confirmation(sys.stdout, 126, bytes(msg, "utf-8")) sys.exit(0) signals_read, signals_written = pairofpipes() signaler = Signaler(p, signals_read) + signaler.setName("remote signaler") signaler.start() demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written]) + demuxer.setName("remote demultiplexer") demuxer.start() muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout) + muxer.setName("remote multiplexer") muxer.start() - logging.info("started %s", nicecmd) + logging.info("Started %s", nicecmd) retval = p.wait() - logging.info("return code %s for %s", retval, nicecmd) + logging.info("Return code %s for %s", retval, nicecmd) muxer.join() + logging.info("Ending bombshell") return retval -if "__file__" in locals(): +sys.stdin = openfdforread(sys.stdin.fileno()) +sys.stdout = openfdforappend(sys.stdout.fileno()) +if "__file__" in locals() and not ("-s" in sys.argv[1:2]): sys.exit(main_master()) else: sys.exit(main_remote())