ansible-qubes/bin/bombshell-client

435 lines
12 KiB
Python
Executable File

#!/usr/bin/python3 -u
import base64
import pickle
import contextlib
import ctypes
import ctypes.util
import errno
import fcntl
import os
import pipes
try:
import queue
except ImportError:
import Queue as queue
import select
import signal
import struct
import subprocess
import sys
import syslog
import threading
import time
import traceback
# Size in bytes of the scratch buffer the multiplexer and demultiplexer
# allocate, and therefore the largest payload carried by a single packet.
MAX_MUX_READ = 128 * 1024  # 64*1024*1024
# Packet header layout, network byte order: unsigned short channel number,
# signed char "active" flag, unsigned int payload length, one pad byte.
PACKFORMAT = "!HbIx"
# Number of bytes occupied by one packed header.
PACKLEN = struct.calcsize(PACKFORMAT)
@contextlib.contextmanager
def mutexfile(filepath):
    """Hold an exclusive lock on *filepath* for the duration of a with-block.

    The file is opened in append mode (creating it if needed) while the
    umask is temporarily set to 0o077, so a newly created lock file is only
    accessible to its owner.  The previous umask is restored right after the
    open, whether or not it succeeded.

    The context manager yields nothing.  The file is closed (which drops the
    lock) when the block exits, including when the block raises.
    """
    oldumask = os.umask(0o077)
    try:
        f = open(filepath, "a")
    finally:
        os.umask(oldumask)
    try:
        # Blocks until no other process holds the lock.
        fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
        yield
    finally:
        # Without this guard an exception thrown into the generator at the
        # yield would skip the close and leave the descriptor (and lock) open.
        f.close()
def unset_cloexec(fd):
    """Clear the close-on-exec flag on file descriptor *fd*.

    The other descriptor flags are left as they were.
    """
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    flags &= ~fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, flags)
def openfdforappend(fd):
    """Wrap descriptor *fd* in an unbuffered binary file object for writing.

    Append mode is tried first.  If that fails with ESPIPE, the descriptor is
    reopened in plain write mode instead; any other error propagates.  The
    close-on-exec flag is cleared on the descriptor before the file object
    is returned.
    """
    try:
        stream = os.fdopen(fd, "ab", 0)
    except IOError as exc:
        if exc.errno != errno.ESPIPE:
            raise
        # ESPIPE ("illegal seek"): fall back to non-append write mode.
        stream = os.fdopen(fd, "wb", 0)
    unset_cloexec(stream.fileno())
    return stream
def openfdforread(fd):
    """Wrap descriptor *fd* in an unbuffered binary file object for reading.

    The close-on-exec flag is cleared on the descriptor before the file
    object is returned.
    """
    stream = os.fdopen(fd, "rb", 0)
    unset_cloexec(stream.fileno())
    return stream
# Reference instant; log lines are prefixed with the seconds elapsed since it.
_startt = time.time()
# Debug-level messages are discarded unless this is set to True.
debug_enabled = False
# Held while a log line is formatted and handed to syslog.
debug_lock = threading.Lock()
class LoggingEmu():
    """Small logger look-alike that sends every message to syslog.

    Messages are given as a format string followed by optional positional
    arguments, which are applied with the ``%`` operator.  Keyword arguments
    are accepted and ignored.
    """

    def __init__(self, prefix):
        """Open syslog with the identity ``bombshell-client.<prefix>``."""
        self.prefix = prefix
        syslog.openlog("bombshell-client.%s" % self.prefix)

    def debug(self, *a, **kw):
        """Log at LOG_DEBUG, but only when the module-level debug flag is on."""
        if debug_enabled:
            self._print(syslog.LOG_DEBUG, *a, **kw)

    def info(self, *a, **kw):
        """Log at LOG_INFO."""
        self._print(syslog.LOG_INFO, *a, **kw)

    def error(self, *a, **kw):
        """Log at LOG_ERR."""
        self._print(syslog.LOG_ERR, *a, **kw)

    def _print(self, prio, *a, **kw):
        """Format the message and emit it at priority *prio* under the lock.

        The emitted line starts with the number of seconds since the module
        was loaded, to three decimals.
        """
        with debug_lock:
            elapsed = time.time() - _startt
            template = a[0]
            # A lone argument is taken verbatim, never %-interpolated.
            message = template if len(a) == 1 else template % a[1:]
            syslog.syslog(prio, ("%.3f " % elapsed) + message)
# Module-wide logger placeholder; main_master() and main_remote() rebind it
# to a LoggingEmu instance before anything is logged.
logging = None
def send_confirmation(chan, retval, errmsg):
    """Write a confirmation record to *chan* and flush it.

    The record is: *retval* as a big-endian unsigned 16-bit integer, the
    length of *errmsg* as a big-endian unsigned 32-bit integer, then
    *errmsg* itself (a bytes-like object).
    """
    status_field = struct.pack("!H", retval)
    chan.write(status_field)
    size = len(errmsg)
    # The length field is 32 bits wide.
    assert size < 1 << 32
    size_field = struct.pack("!I", size)
    chan.write(size_field)
    chan.write(errmsg)
    chan.flush()
    logging.debug("Sent confirmation on channel %s: %s %s", chan, retval, errmsg)
def _read_exactly(chan, count):
    """Read *count* bytes from *chan*, looping over short reads.

    Returns fewer than *count* bytes only if the stream stops producing data
    (end of file) first.
    """
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = chan.read(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_confirmation(chan):
    """Read one confirmation record from *chan*.

    The record layout is a big-endian unsigned 16-bit status, a big-endian
    unsigned 32-bit message length, then the message bytes.

    Returns a ``(status, message)`` tuple.  If the stream is at end of file
    before any byte arrives, returns ``(125, "domain does not exist")``; note
    that in this one case the message is a ``str`` rather than ``bytes``.

    An unbuffered stream may hand back fewer bytes than requested in a single
    read, so every field is read in a loop until complete.
    """
    logging.debug("Waiting for confirmation on channel %s", chan)
    r = _read_exactly(chan, 2)
    if len(r) == 0:
        # This happens when the remote domain does not exist.
        r, errmsg = 125, "domain does not exist"
        logging.debug("No confirmation: %s %s", r, errmsg)
        return r, errmsg
    assert len(r) == 2, r
    r = struct.unpack("!H", r)[0]
    l = _read_exactly(chan, 4)
    assert len(l) == 4, l
    l = struct.unpack("!I", l)[0]
    errmsg = _read_exactly(chan, l)
    logging.debug("Received confirmation: %s %s", r, errmsg)
    return r, errmsg
class MyThread(threading.Thread):
    """Thread base class that terminates the whole program on failure.

    Subclasses implement ``_run()``.  If ``_run()`` raises an ``Exception``,
    the error and its traceback are logged and the process exits immediately
    with status 124 via ``os._exit`` (no cleanup handlers run).
    """

    def run(self):
        """Execute ``_run()``, aborting the process if it raises."""
        try:
            self._run()
        except Exception:
            me = threading.current_thread()
            logging.error("%s: unexpected exception", me)
            tb = traceback.format_exc()
            logging.error("%s: traceback: %s", me, tb)
            logging.error("%s: exiting program", me)
            os._exit(124)
class SignalSender(MyThread):
    """Relay signals received by this process into a file-like object.

    Each signal number is written to ``sigqueue`` as a big-endian unsigned
    16-bit integer.  Putting ``None`` on ``self.queue`` stops the thread.
    """

    def __init__(self, signals, sigqueue):
        """Handles signals by pushing them into a file-like object.

        Installs ``self.copy`` as the handler for every signal in *signals*.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        # SimpleQueue.put() is documented as reentrant and thus safe to call
        # from a signal handler; fall back to Queue where it is unavailable.
        queue_factory = getattr(queue, "SimpleQueue", queue.Queue)
        self.queue = queue_factory()
        self.sigqueue = sigqueue
        for sig in signals:
            signal.signal(sig, self.copy)

    def copy(self, signum, frame):
        """Signal handler: hand *signum* over to the relay thread."""
        # Deliberately no logging here: the logger takes a non-reentrant
        # lock, so a signal arriving while the interrupted code is itself
        # logging would deadlock.  The dequeue is logged in _run() instead.
        self.queue.put(signum)

    def _run(self):
        """Write queued signal numbers to ``sigqueue`` until ``None`` arrives."""
        while True:
            signum = self.queue.get()
            logging.debug("Dequeued signal %s", signum)
            if signum is None:
                break
            assert signum > 0
            self.sigqueue.write(struct.pack("!H", signum))
            self.sigqueue.flush()
            logging.debug("Wrote signal %s to remote end", signum)
class Signaler(MyThread):
    """Deliver signal numbers read from a byte stream to a child process.

    Each signal is expected on ``sigqueue`` as a big-endian unsigned 16-bit
    integer.  The thread ends when the stream reports end of file.
    """

    def __init__(self, process, sigqueue):
        """Reads integers from a file-like object and relays that as kill()."""
        threading.Thread.__init__(self)
        self.daemon = True
        self.process = process
        self.sigqueue = sigqueue

    def _run(self):
        """Relay signals until ``sigqueue`` is exhausted."""
        while True:
            data = self.sigqueue.read(2)
            if len(data) == 0:
                logging.debug("Received no signal data")
                break
            assert len(data) == 2
            signum = struct.unpack("!H", data)[0]
            logging.debug("Received relayed signal %s, sending to process %s", signum, self.process.pid)
            try:
                self.process.send_signal(signum)
            except Exception as e:
                # Best effort: a failed delivery is logged, not fatal.
                logging.error("Failed to relay signal %s to process %s: %s", signum, self.process.pid, e)
        logging.debug("End of signaler")
def write(dst, buffer, l):
    """Write the first *l* bytes of *buffer* to *dst*, retrying short writes.

    *dst* must have a ``write()`` that returns the number of bytes it
    accepted.  An unbuffered stream may accept only part of the data, so the
    remainder is resubmitted until everything has been written.

    Raises ``Exception`` if a ``write()`` call reports that it wrote nothing
    (returns ``None`` or a value below 1).  Returns ``None``.
    """
    mv = memoryview(buffer)[:l]
    while len(mv):
        # Use the count reported by the stream, not the amount submitted;
        # otherwise the tail of a partially written chunk is silently lost.
        writtenthisloop = dst.write(mv)
        if writtenthisloop is None or writtenthisloop < 1:
            raise Exception("copy: Failed to write any bytes")
        mv = mv[writtenthisloop:]
def copy(src, dst, buffer, l):
    """Read exactly *l* bytes from *src* into *buffer*, then write them to *dst*.

    *buffer* is used as scratch space and must hold at least *l* bytes.
    Before every read the function waits in ``select`` until *src* is
    readable.  Raises ``Exception`` if a read yields no bytes.  Returns
    whatever ``write()`` returns.
    """
    window = memoryview(buffer)[:l]
    assert len(window) == l, "Buffer object is too small: %s %s" % (len(window), l)
    filled = 0
    while filled < l:
        select.select([src], (), ())
        # Read straight into the not-yet-filled tail of the buffer.
        got = src.readinto(window[filled:])
        if got is None or got < 1:
            raise Exception("copy: Failed to read any bytes")
        filled += got
    return write(dst, buffer, l)
class DataMultiplexer(MyThread):
    """Funnel several readable sources into one sink as framed packets.

    Every source is numbered by its position in *sources*.  Data read from a
    source is written to the sink as a PACKFORMAT header (source number,
    active flag set, payload length) followed by the payload.  When a source
    reaches end of file, a header with the active flag cleared and length 0
    is sent and the source is dropped.  The thread ends once no sources
    remain.
    """

    def __init__(self, sources, sink):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sources = {s: num for num, s in enumerate(sources)}
        self.sink = sink

    def _run(self):
        logging.debug("mux: Started with sources %s and sink %s", self.sources, self.sink)
        buffer = bytearray(MAX_MUX_READ)
        while self.sources:
            # Snapshot the keys: entries are deleted from the dict below.
            watched = list(self.sources)
            sources, _, x = select.select(watched, (), watched)
            assert not x, x
            for s in sources:
                n = self.sources[s]
                logging.debug("mux: Source %s (%s) is active", n, s)
                readthisloop = s.readinto(buffer)
                if readthisloop == 0:
                    logging.debug("mux: Received no bytes from source %s, signaling peer to close corresponding source", n)
                    del self.sources[s]
                    header = struct.pack(PACKFORMAT, n, False, 0)
                    self.sink.write(header)
                    continue
                l = readthisloop
                header = struct.pack(PACKFORMAT, n, True, l)
                self.sink.write(header)
                write(self.sink, buffer, l)
        logging.debug("mux: End of data multiplexer")
class DataDemultiplexer(MyThread):
    """Split a framed packet stream from one source across several sinks.

    Sinks are numbered by their position in *sinks*.  Each packet starts with
    a PACKFORMAT header (sink number, active flag, payload length).  An
    active packet's payload is copied to the numbered sink; an inactive
    packet closes and forgets that sink.  End of file on the source closes
    all remaining sinks.  The thread ends once no sinks remain.
    """

    def __init__(self, source, sinks):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sinks = dict(enumerate(sinks))
        self.source = source

    def _read_header(self):
        """Read one packet header from the source.

        Loops over short reads, since an unbuffered stream may deliver a
        header in pieces.  Returns ``b""`` at end of file; a header cut off
        midway is returned truncated.
        """
        header = b""
        while len(header) < PACKLEN:
            chunk = self.source.read(PACKLEN - len(header))
            if not chunk:
                break
            header += chunk
        return header

    def _run(self):
        logging.debug("demux: Started with source %s and sinks %s", self.source, self.sinks)
        buffer = bytearray(MAX_MUX_READ)
        while self.sinks:
            r, _, x = select.select([self.source], (), [self.source])
            assert not x, x
            for s in r:
                header = self._read_header()
                # Truthiness works for bytes; comparing against "" never
                # matches the bytes that a binary stream returns.
                if not header:
                    logging.debug("demux: Received no bytes from source, closing all sinks")
                    for sink in self.sinks.values():
                        sink.close()
                    self.sinks = {}
                    break
                # A truncated header makes struct.unpack raise here.
                n, active, l = struct.unpack(PACKFORMAT, header)
                if not active:
                    logging.debug("demux: Source %s now inactive, closing corresponding sink %s", s, self.sinks[n])
                    self.sinks[n].close()
                    del self.sinks[n]
                else:
                    copy(self.source, self.sinks[n], buffer, l)
        logging.debug("demux: End of data demultiplexer")
def main_master():
    """Run the local (master) side: start the remote helper and proxy I/O.

    Command line: ``[-d] REMOTE_VM COMMAND [ARG...]``.  ``-d`` turns on debug
    logging here and on the remote side.

    The function launches ``qrexec-client-vm REMOTE_VM qubes.VMShell``, feeds
    it a shell line that re-executes this script's own source with the
    pickled command as argument, and waits for the remote confirmation.
    It then multiplexes local stdin and handled signals to the remote side
    and demultiplexes the remote output onto local stdout and stderr.

    Returns the value to use as exit status: 127 if qrexec-client-vm cannot
    be launched, the remote's non-zero confirmation code if the command could
    not be started, otherwise the return code of the qrexec-client-vm proxy.
    """
    global logging
    global debug_enabled

    # pipes.quote is shlex.quote on Python 3, and the pipes module was
    # removed in Python 3.13; fall back only where shlex.quote is missing.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    logging = LoggingEmu("master")
    logging.info("Started with arguments: %s", sys.argv[1:])

    args = sys.argv[1:]
    if args[0] == "-d":
        args = args[1:]
        debug_enabled = True

    remote_vm = args[0]
    remote_command = args[1:]
    assert remote_command

    def anypython(exe):
        # Shell snippet: use this interpreter's path on the remote side if it
        # is executable there, otherwise plain "python".
        return "` test -x %s && echo %s || echo python`" % (quote(exe),
                                                            quote(exe))

    # Read our own source with the file closed deterministically.
    with open(__file__, "r") as script:
        helper_source = script.read()

    remote_helper_text = b"exec "
    remote_helper_text += bytes(anypython(sys.executable), "utf-8")
    remote_helper_text += bytes(" -u -c ", "utf-8")
    remote_helper_text += bytes(quote(helper_source), "ascii")
    remote_helper_text += b" -d " if debug_enabled else b" "
    remote_helper_text += base64.b64encode(pickle.dumps(remote_command, 2))
    remote_helper_text += b"\n"

    # Keep a private copy of stderr to receive the remote command's stderr.
    saved_stderr = openfdforappend(os.dup(sys.stderr.fileno()))

    # NOTE(review): block extent reconstructed from flattened source; the
    # lock is assumed to cover launch through confirmation -- confirm.
    with mutexfile(os.path.expanduser("~/.bombshell-lock")):
        try:
            p = subprocess.Popen(
                ["qrexec-client-vm", remote_vm, "qubes.VMShell"],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                close_fds=True,
                preexec_fn=os.setpgrp,
                bufsize=0,
            )
        except OSError as e:
            logging.error("cannot launch qrexec-client-vm: %s", e)
            return 127

        logging.debug("Writing the helper text into the other side")
        p.stdin.write(remote_helper_text)
        p.stdin.flush()

        confirmation, errmsg = recv_confirmation(p.stdout)
        if confirmation != 0:
            logging.error("remote: %s", errmsg)
            return confirmation

    handled_signals = (
        signal.SIGINT,
        signal.SIGABRT,
        signal.SIGALRM,
        signal.SIGTERM,
        signal.SIGUSR1,
        signal.SIGUSR2,
        signal.SIGTSTP,
        signal.SIGCONT,
    )
    read_signals, write_signals = pairofpipes()
    signaler = SignalSender(handled_signals, write_signals)
    signaler.name = "master signaler"
    signaler.start()

    muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin)
    muxer.name = "master multiplexer"
    muxer.start()

    demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr])
    demuxer.name = "master demultiplexer"
    demuxer.start()

    retval = p.wait()
    logging.info("Return code %s for qubes.VMShell proxy", retval)
    # Let the remaining remote output drain before returning.
    demuxer.join()
    logging.info("Ending bombshell")
    return retval
def pairofpipes():
    """Create an OS pipe and return ``(reader, writer)``.

    Both ends are unbuffered binary file objects.
    """
    rfd, wfd = os.pipe()
    reader = os.fdopen(rfd, "rb", 0)
    writer = os.fdopen(wfd, "wb", 0)
    return reader, writer
def main_remote():
    """Run the remote side: execute the requested command and proxy its I/O.

    The command arrives as a base64-encoded pickle in ``sys.argv[1]``, or in
    ``sys.argv[2]`` when ``-d`` (debug logging) is present.

    A confirmation record is written to stdout: status 0 once the command has
    been started, 127 with a message on ``OSError``, 126 with a message on any
    other failure.  In the two failure cases the process then exits with
    status 0.  On success the command's stdout and stderr are multiplexed to
    our stdout, and our stdin is demultiplexed into the command's stdin and a
    signal relay.

    Returns the command's return code.
    """
    global logging
    global debug_enabled

    # pipes.quote is shlex.quote on Python 3, and the pipes module was
    # removed in Python 3.13; fall back only where shlex.quote is missing.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    logging = LoggingEmu("remote")
    logging.info("Started with arguments: %s", sys.argv[1:])

    if "-d" in sys.argv[1:]:
        debug_enabled = True
        cmd = sys.argv[2]
    else:
        cmd = sys.argv[1]

    # SECURITY NOTE(review): unpickling can execute arbitrary code.  The
    # payload comes from whoever invoked this helper, who is handing us a
    # command to run anyway -- confirm that no less-trusted party can
    # supply this argument.
    cmd = pickle.loads(base64.b64decode(cmd))
    logging.debug("Received command: %s", cmd)

    nicecmd = " ".join(quote(a) for a in cmd)
    try:
        p = subprocess.Popen(
            cmd,
            # ["strace", "-s4096", "-ff"] + cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
            bufsize=0,
        )
        send_confirmation(sys.stdout, 0, b"")
    except OSError as e:
        msg = "cannot execute %s: %s" % (nicecmd, e)
        logging.error(msg)
        send_confirmation(sys.stdout, 127, bytes(msg, "utf-8"))
        sys.exit(0)
    except BaseException as e:
        # Deliberately broad: the peer is blocked waiting for a confirmation
        # record, so one is sent whatever went wrong.
        msg = "cannot execute %s: %s" % (nicecmd, e)
        logging.error(msg)
        send_confirmation(sys.stdout, 126, bytes(msg, "utf-8"))
        sys.exit(0)

    signals_read, signals_written = pairofpipes()

    signaler = Signaler(p, signals_read)
    signaler.name = "remote signaler"
    signaler.start()

    demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written])
    demuxer.name = "remote demultiplexer"
    demuxer.start()

    muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout)
    muxer.name = "remote multiplexer"
    muxer.start()

    logging.info("Started %s", nicecmd)

    retval = p.wait()
    logging.info("Return code %s for %s", retval, nicecmd)
    # Let the command's remaining output drain before returning.
    muxer.join()
    logging.info("Ending bombshell")
    return retval
# Replace the standard streams with unbuffered binary file objects whose
# descriptors are not close-on-exec.
sys.stdin = openfdforread(sys.stdin.fileno())
sys.stdout = openfdforappend(sys.stdout.fileno())

# Role selection: act as the remote side when there is no __file__ (as when
# the source is handed to the interpreter with -c, which is how main_master
# starts the remote helper) or when the first argument is -s; otherwise act
# as the master.
if "__file__" not in locals() or "-s" in sys.argv[1:2]:
    sys.exit(main_remote())
else:
    sys.exit(main_master())