mirror of
https://github.com/Rudd-O/ansible-qubes.git
synced 2025-03-01 14:22:33 +01:00
378 lines
10 KiB
Python
Executable File
378 lines
10 KiB
Python
Executable File
#!/usr/bin/python -u
|
|
|
|
import cPickle
|
|
# Security note:
|
|
#
|
|
# If you look at cPickle usage in bombshell, it's only used to package up
|
|
# the command line at the initiator side, and then it is unpacked at the
|
|
# receiver side. Given that the initiator has already been given all
|
|
# permissions to run arbitrary programs on the receiver, there is no
|
|
# additional security risk posed by the use of cPickle.
|
|
#
|
|
# End security note.
|
|
import contextlib
|
|
import fcntl
|
|
import os
|
|
import pipes
|
|
import Queue
|
|
import select
|
|
import signal
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
|
|
|
|
@contextlib.contextmanager
def mutexfile(filepath):
    """Hold an exclusive fcntl lock on ``filepath`` for the ``with`` body.

    The file is opened in append mode (so it is created when missing) while
    the process umask is temporarily set to 0077, then locked with
    ``fcntl.lockf(..., LOCK_EX)``.  The file is closed when the ``with``
    block is left, whether normally or through an exception.

    Args:
        filepath: path of the file to open and lock.

    Yields:
        None.
    """
    oldumask = os.umask(0o077)
    try:
        f = open(filepath, "a")
    finally:
        # Restore the caller's umask even when the open itself fails.
        os.umask(oldumask)
    try:
        fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
        yield
    finally:
        # Runs on every exit path, including an exception raised inside
        # the with body or by lockf(), so the descriptor is never leaked.
        f.close()
|
|
|
|
|
|
# Serializes writes to stderr between the threads of this process.
debug_lock = threading.Lock()
# Gates LoggingEmu.debug(); main_master() and main_remote() set it to True
# when they are given the -d flag.
debug_enabled = False


class LoggingEmu(object):
    """Tiny stand-in for the ``logging`` module API that writes to stderr.

    Every message is emitted as ``<prefix> <message>`` followed by a
    newline.  ``info`` and ``error`` always print; ``debug`` prints only
    while the module-level ``debug_enabled`` flag is true.

    Messages follow the ``logging`` calling convention: a lone argument is
    printed as is, otherwise the first argument is %-formatted with the
    remaining ones.  Keyword arguments are accepted and ignored.
    """

    def __init__(self, prefix):
        """Remember ``prefix``, which is prepended to every message."""
        self.prefix = prefix

    def debug(self, *a, **kw):
        """Print the message only when debugging is enabled."""
        if debug_enabled:
            self._print(*a, **kw)

    def info(self, *a, **kw):
        """Print the message unconditionally."""
        self._print(*a, **kw)

    def error(self, *a, **kw):
        """Print the message unconditionally."""
        self._print(*a, **kw)

    def _print(self, *a, **kw):
        """Format the message and write it to stderr under ``debug_lock``."""
        # A lone argument is not %-formatted, so it may contain literal
        # percent signs.
        message = a[0] if len(a) == 1 else a[0] % a[1:]
        with debug_lock:
            # One write per line, so a line is never emitted in pieces.
            sys.stderr.write("%s %s\n" % (self.prefix, message))


# The prefix tells the two ends apart: __file__ is defined when this source
# runs as a script, and absent when it is run through "python -c".
logging = LoggingEmu("master:" if "__file__" in globals() else "remote:")
|
|
|
|
|
|
def send_command(chan, cmd):
    """Pickle a command and write it to ``chan`` as one framed message.

    The frame is a 4-byte unsigned length in network byte order followed
    by the pickled payload; ``chan`` is flushed afterwards.

    Args:
        chan: writable file-like object to send the command to
        cmd: command to send, iterable of strings
    """
    payload = cPickle.dumps(cmd)
    size = len(payload)
    # The length header is a 32-bit field.
    assert size < 1 << 32, size
    header = struct.pack("!I", size)
    chan.write(header)
    chan.write(payload)
    chan.flush()
    logging.debug("Sent command: %s", cmd)
|
|
|
|
|
|
def recv_command(chan):
    """Read one framed, pickled command from ``chan`` and return it.

    Expects the framing produced by send_command(): a 4-byte unsigned
    length in network byte order followed by that many bytes of pickle.

    Args:
        chan: readable file-like object to receive the command from

    Returns:
        The unpickled command object.

    Raises:
        struct.error: if fewer than 4 header bytes could be read.
    """
    # Read the header from chan itself rather than from sys.stdin, so the
    # whole frame comes from the channel the caller passed in.
    header = chan.read(4)
    l = struct.unpack("!I", header)[0]
    pickled = chan.read(l)
    # SECURITY: unpickling can execute arbitrary code.  See the security
    # note at the top of this file for why that is accepted here; do not
    # reuse this function on data from an untrusted peer.
    cmd = cPickle.loads(pickled)
    logging.debug("Received command: %s", cmd)
    return cmd
|
|
|
|
|
|
def send_confirmation(chan, retval, errmsg):
    """Write a status code and an error message to ``chan`` and flush it.

    Wire format: ``retval`` as a 2-byte unsigned integer, the message
    length as a 4-byte unsigned integer (both in network byte order), then
    the message itself.

    Args:
        chan: writable file-like object
        retval: status code, must fit in 16 unsigned bits
        errmsg: message to send, may be empty
    """
    status = struct.pack("!H", retval)
    chan.write(status)
    msglen = len(errmsg)
    # The length header is a 32-bit field.
    assert msglen < 1 << 32
    chan.write(struct.pack("!I", msglen))
    chan.write(errmsg)
    chan.flush()
    logging.debug("Sent confirmation: %s %s", retval, errmsg)
|
|
|
|
|
|
def recv_confirmation(chan):
    """Read the status frame written by send_confirmation() from ``chan``.

    Args:
        chan: readable file-like object

    Returns:
        A ``(code, message)`` tuple.  When ``chan`` yields no bytes at all,
        the result is ``(125, "domain does not exist")``.
    """
    logging.debug("Waiting for confirmation")
    status = chan.read(2)
    if not status:
        # This happens when the remote domain does not exist.
        code, errmsg = 125, "domain does not exist"
        logging.debug("No confirmation: %s %s", code, errmsg)
        return code, errmsg
    assert len(status) == 2, status
    (code,) = struct.unpack("!H", status)
    size = chan.read(4)
    assert len(size) == 4, size
    (size,) = struct.unpack("!I", size)
    errmsg = chan.read(size)
    logging.debug("Received confirmation: %s %s", code, errmsg)
    return code, errmsg
|
|
|
|
|
|
class SignalSender(threading.Thread):
    """Daemon thread that forwards received signals as numbers on a stream.

    The signal handler only puts the signal number on an internal queue;
    the thread body takes numbers off that queue and writes each one to
    ``sigqueue`` as a 2-byte unsigned integer in network byte order.
    Putting ``None`` on the queue ends the thread.
    """

    def __init__(self, signals, sigqueue):
        """Install ``self.copy`` as the handler for each of ``signals``.

        Args:
            signals: iterable of signal numbers to handle
            sigqueue: writable file-like object that receives the numbers
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.sigqueue = sigqueue
        self.queue = Queue.Queue()
        for signum in signals:
            signal.signal(signum, self.copy)

    def copy(self, signum, frame):
        """Signal handler: queue ``signum`` for the thread to send."""
        self.queue.put(signum)
        logging.debug("Signal %s pushed to queue", signum)

    def run(self):
        """Write queued signal numbers to ``sigqueue`` until None arrives."""
        out = self.sigqueue
        while True:
            signum = self.queue.get()
            logging.debug("Dequeued signal %s", signum)
            if signum is None:
                return
            assert signum > 0
            out.write(struct.pack("!H", signum))
            out.flush()
            logging.debug("Wrote signal %s to remote end", signum)
|
|
|
|
|
|
class Signaler(threading.Thread):
    """Daemon thread that turns numbers read from a stream into signals.

    Each 2-byte unsigned integer (network byte order) read from
    ``sigqueue`` is passed to ``process.send_signal()``.  The thread ends
    when a read returns no data.
    """

    def __init__(self, process, sigqueue):
        """Reads integers from a file-like object and relays that as kill().

        Args:
            process: object with a ``send_signal(signum)`` method
            sigqueue: readable file-like object carrying signal numbers
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.sigqueue = sigqueue
        self.process = process

    def run(self):
        """Relay signal numbers until the stream reports end of data."""
        while True:
            raw = self.sigqueue.read(2)
            if not raw:
                logging.debug("Received no signal data")
                break
            assert len(raw) == 2
            (signum,) = struct.unpack("!H", raw)
            logging.debug("Received relayed signal %s, sending to process", signum)
            self.process.send_signal(signum)
        logging.debug("End of signaler")
|
|
|
|
|
|
def unblock(fobj):
    """Put ``fobj`` into non-blocking mode.

    Reads the current file status flags with F_GETFL and writes them back
    with O_NONBLOCK added, leaving the other flags as they were.

    Args:
        fobj: anything ``fcntl.fcntl`` accepts as a file descriptor
    """
    current_flags = fcntl.fcntl(fobj, fcntl.F_GETFL)
    wanted_flags = current_flags | os.O_NONBLOCK
    fcntl.fcntl(fobj, fcntl.F_SETFL, wanted_flags)
|
|
|
|
|
|
class DataMultiplexer(threading.Thread):
    """Daemon thread that funnels several readable sources into one sink.

    Every source is identified on the wire by its position in the
    ``sources`` sequence given to the constructor.  Two kinds of frames
    are written to the sink:

      data:  source number ("!H"), flag 1 ("b"), length ("!I"), payload
      end:   source number ("!H"), flag 0 ("b")

    An "end" frame is written when a source returns no data on read; that
    source is then dropped.  The thread finishes once no sources remain.
    """

    def __init__(self, sources, sink):
        """Record the sources and the sink.

        Args:
            sources: sequence of readable file objects usable with select()
            sink: writable file-like object that receives the frames
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.sources = dict((s, num) for num, s in enumerate(sources))
        self.sink = sink

    def _select_readable(self):
        """Block until some remaining source is readable and return those.

        The exceptional-condition set of every select() call is checked,
        not only that of the first one.
        """
        pending = list(self.sources)
        readable, _, errored = select.select(pending, (), pending)
        assert not errored, errored
        return readable

    def run(self):
        """Copy data from the sources to the sink until all sources end."""
        # Non-blocking mode lets read() return whatever is available
        # instead of waiting for end of file.
        for s in self.sources:
            unblock(s)
        sources = self._select_readable()
        while sources:
            logging.debug("mux: Sources that alarmed: %s", [self.sources[s] for s in sources])
            for s in sources:
                n = self.sources[s]
                data = s.read()
                if not data:
                    logging.debug("Received no bytes from source %s", n)
                    del self.sources[s]
                    self.sink.write(struct.pack("!H", n))
                    self.sink.write(struct.pack("b", False))
                    self.sink.flush()
                    logging.debug("Informed sink about death of source %s", n)
                    continue
                l = len(data)
                logging.debug("Received %s bytes from source %s", l, n)
                # The length header is a 32-bit field.
                assert l < 1 << 32
                self.sink.write(struct.pack("!H", n))
                self.sink.write(struct.pack("b", True))
                self.sink.write(struct.pack("!I", l))
                self.sink.write(data)
                self.sink.flush()
                logging.debug("Copied those %s bytes to sink", l)
            if not self.sources:
                break
            sources = self._select_readable()
        logging.debug("End of data multiplexer")
|
|
|
|
|
|
class DataDemultiplexer(threading.Thread):
    """Daemon thread that splits one framed source into several sinks.

    Reads the frames produced by DataMultiplexer: a sink number ("!H") and
    a one-byte flag ("b").  A non-zero flag is followed by a length ("!I")
    and that many payload bytes, which are written to the numbered sink
    and flushed.  A zero flag closes the numbered sink and forgets it.
    When the source yields no data, all remaining sinks are closed.  The
    thread finishes once no sinks remain.
    """

    def __init__(self, source, sinks):
        """Record the source and number the sinks by position.

        Args:
            source: readable file object usable with select()
            sinks: sequence of writable file-like objects
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.sinks = dict(enumerate(sinks))
        self.source = source

    def run(self):
        """Relay frames from the source until every sink has been closed."""
        while self.sinks:
            r, _, x = select.select([self.source], (), [self.source])
            assert not x, x
            logging.debug("demux: Source alarmed")
            for s in r:
                n = s.read(2)
                if not n:
                    logging.debug("Received no bytes from source, closing all sinks")
                    for sink in self.sinks.values():
                        sink.close()
                    # Keep the attribute a dict; emptying it also ends
                    # the enclosing while loop.
                    self.sinks = {}
                    break
                # Report the short header itself; no payload variable
                # exists yet at this point.
                assert len(n) == 2, n
                n = struct.unpack("!H", n)[0]

                active = s.read(1)
                assert len(active) == 1, active
                active = struct.unpack("b", active)[0]

                if not active:
                    logging.debug("Source %s now inactive, closing corresponding sink", n)
                    self.sinks[n].close()
                    del self.sinks[n]
                else:
                    l = s.read(4)
                    assert len(l) == 4, l
                    l = struct.unpack("!I", l)[0]
                    data = s.read(l)
                    assert len(data) == l, len(data)
                    logging.debug("Received %s bytes from source %s, relaying to corresponding sink", l, n)
                    self.sinks[n].write(data)
                    self.sinks[n].flush()
                    logging.debug("Relayed %s bytes to sink %s", l, n)
        logging.debug("End of data demultiplexer")
|
|
|
|
|
|
def main_master():
    """Run a command in another VM and relay its input, output and signals.

    Command line: ``[-d] <vm> <command> [args...]``; ``-d`` turns on debug
    logging here and on the remote side.

    Starts ``qrexec-client-vm <vm> qubes.VMShell``, feeds it a shell line
    that execs ``python -u -c`` on this file's own source, sends the
    command and waits for the remote confirmation.  It then relays local
    stdin and the handled signals to the remote side, and remote output
    to local stdout and to a duplicate of stderr.

    Returns:
        os.EX_USAGE when the VM name or the command is missing;
        127 when qrexec-client-vm cannot be launched;
        the remote confirmation code when it is not zero;
        otherwise the exit status of the qrexec-client-vm process.
    """
    global debug_enabled
    args = sys.argv[1:]
    if args and args[0] == "-d":
        args = args[1:]
        debug_enabled = True

    # Report a missing VM name or command as a usage error instead of
    # failing with IndexError / AssertionError.
    if len(args) < 2:
        logging.error("usage: %s [-d] <vm> <command> [args...]", sys.argv[0])
        return os.EX_USAGE

    remote_vm = args[0]
    remote_command = args[1:]

    # The remote helper is this very file, passed to "python -c" inside
    # single quotes; embedded single quotes are escaped for the shell.
    with open(__file__, "rb") as source:
        helper_source = source.read()
    remote_helper_text = "\n".join([
        "exec python -u -c '",
        helper_source.replace("'", "'\\''"),
        "'" + (" -d" if debug_enabled else ""),
        "",
    ])

    saved_stderr = os.fdopen(os.dup(sys.stderr.fileno()), "a")

    # NOTE(review): the lock is taken to cover launching qrexec-client-vm
    # and the handshake only; confirm that extent against upstream.
    with mutexfile(os.path.expanduser("~/.bombshell-lock")):
        try:
            p = subprocess.Popen(
                ["qrexec-client-vm", remote_vm, "qubes.VMShell"],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                close_fds=True,
                preexec_fn=os.setpgrp,
            )
        except OSError as e:
            logging.error("cannot launch qrexec-client-vm: %s", e)
            return 127

        p.stdin.write(remote_helper_text)
        p.stdin.flush()

        send_command(p.stdin, remote_command)
        confirmation, errmsg = recv_confirmation(p.stdout)
        if confirmation != 0:
            logging.error("remote: %s", errmsg)
            return confirmation

    handled_signals = (
        signal.SIGINT,
        signal.SIGABRT,
        signal.SIGALRM,
        signal.SIGTERM,
        signal.SIGUSR1,
        signal.SIGUSR2,
        signal.SIGTSTP,
        signal.SIGCONT,
    )
    # Signals travel through a local pipe so they can be multiplexed with
    # stdin onto the single stream going to the remote side.
    read_signals, write_signals = pairofpipes()
    signaler = SignalSender(handled_signals, write_signals)
    signaler.start()

    # Channel 0 carries stdin, channel 1 carries signal numbers.
    muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin)
    muxer.start()

    # Channel 0 goes to stdout, channel 1 to the duplicate of stderr.
    demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr])
    demuxer.start()

    retval = p.wait()
    # Let the remaining remote output be relayed before returning.
    demuxer.join()
    return retval
|
|
|
|
|
|
def pairofpipes():
    """Create an OS pipe and wrap both ends in binary file objects.

    Returns:
        A ``(reader, writer)`` tuple: the read end opened with mode "rb"
        and the write end opened with mode "wb".
    """
    read_fd, write_fd = os.pipe()
    reader = os.fdopen(read_fd, "rb")
    writer = os.fdopen(write_fd, "wb")
    return reader, writer
|
|
|
|
|
|
def main_remote():
    """Run the command received on stdin and relay its I/O and signals.

    Reads one command with recv_command(), starts it with pipes on stdin,
    stdout and stderr, and reports the outcome with send_confirmation()
    on stdout: 0 when the process started, 127 when starting it raised
    OSError, 126 for any other failure.  After a failed start the function
    exits the interpreter with status 0, since the failure code has
    already been sent in the confirmation.

    Once the command runs, frames arriving on stdin are demultiplexed to
    the command's stdin (channel 0) and to the signal relay (channel 1),
    and the command's stdout and stderr are multiplexed onto stdout as
    channels 0 and 1.

    Returns:
        The value of ``wait()`` on the command's process.
    """
    global debug_enabled
    if len(sys.argv) > 1 and sys.argv[1] == "-d":
        debug_enabled = True

    cmd = recv_command(sys.stdin)
    nicecmd = " ".join(pipes.quote(a) for a in cmd)
    try:
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
        )
    except OSError as e:
        msg = "cannot execute %s: %s" % (nicecmd, e)
        logging.error(msg)
        send_confirmation(sys.stdout, 127, msg)
        sys.exit(0)
    except BaseException as e:
        msg = "cannot execute %s: %s" % (nicecmd, e)
        logging.error(msg)
        send_confirmation(sys.stdout, 126, msg)
        sys.exit(0)
    else:
        # Sent outside the try body: a failure while writing the success
        # confirmation must not be reported as "cannot execute", nor be
        # followed by a second confirmation frame.
        send_confirmation(sys.stdout, 0, "")

    signals_read, signals_written = pairofpipes()

    signaler = Signaler(p, signals_read)
    signaler.start()

    demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written])
    demuxer.start()

    muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout)
    muxer.start()

    logging.info("started %s", nicecmd)

    retval = p.wait()
    logging.info("return code %s for %s", retval, nicecmd)
    # Let the remaining output be relayed before returning.
    muxer.join()
    return retval
|
|
|
|
|
|
# Entry point.  __file__ is defined when this source runs as a script (the
# master side) and absent when the same source is run through "python -c",
# which is how main_master() starts the remote side.
sys.exit(main_master() if "__file__" in locals() else main_remote())
|