mirror of
				https://github.com/Rudd-O/ansible-qubes.git
				synced 2025-10-31 11:38:55 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			355 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			355 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/python -u
 | |
| 
 | |
| import cPickle
 | |
| import fcntl
 | |
| import os
 | |
| import pipes
 | |
| import Queue
 | |
| import select
 | |
| import signal
 | |
| import struct
 | |
| import subprocess
 | |
| import sys
 | |
| import threading
 | |
| 
 | |
| 
 | |
| debug_lock = threading.Lock()
 | |
| debug_enabled = False
 | |
| class LoggingEmu():
 | |
|     def __init__(self, prefix):
 | |
|         self.prefix = prefix
 | |
|     def debug(self, *a, **kw):
 | |
|         if not debug_enabled:
 | |
|             return
 | |
|         self._print(*a, **kw)
 | |
|     def info(self, *a, **kw):
 | |
|         self._print(*a, **kw)
 | |
|     def error(self, *a, **kw):
 | |
|         self._print(*a, **kw)
 | |
|     def _print(self, *a, **kw):
 | |
|         debug_lock.acquire()
 | |
|         try:
 | |
|             if len(a) == 1:
 | |
|                 string = a[0]
 | |
|             else:
 | |
|                 string = a[0] % a[1:]
 | |
|             print >> sys.stderr, self.prefix, string
 | |
|         finally:
 | |
|             debug_lock.release()
 | |
| logging = LoggingEmu("master:" if "__file__" in globals() else "remote:")
 | |
| 
 | |
| 
 | |
| def send_command(chan, cmd):
 | |
|   """Sends a command over the wire.
 | |
| 
 | |
|   Args:
 | |
|     chan: writable file-like object to send the command to
 | |
|     cmd: command to send, iterable of strings
 | |
|   """
 | |
|   pickled = cPickle.dumps(cmd)
 | |
|   l = len(pickled)
 | |
|   assert l < 1<<32, l
 | |
|   chan.write(struct.pack("!I", l))
 | |
|   chan.write(pickled)
 | |
|   chan.flush()
 | |
|   logging.debug("Sent command: %s", cmd)
 | |
| 
 | |
| 
 | |
| def recv_command(chan):
 | |
|   l = struct.unpack("!I", sys.stdin.read(4))[0]
 | |
|   pickled = chan.read(l)
 | |
|   cmd = cPickle.loads(pickled)
 | |
|   logging.debug("Received command: %s", cmd)
 | |
|   return cmd
 | |
| 
 | |
| 
 | |
| def send_confirmation(chan, retval, errmsg):
 | |
|   chan.write(struct.pack("!H", retval))
 | |
|   l = len(errmsg)
 | |
|   assert l < 1<<32
 | |
|   chan.write(struct.pack("!I", l))
 | |
|   chan.write(errmsg)
 | |
|   chan.flush()
 | |
|   logging.debug("Sent confirmation: %s %s", retval, errmsg)
 | |
| 
 | |
| 
 | |
| def recv_confirmation(chan):
 | |
|   logging.debug("Waiting for confirmation")
 | |
|   r = chan.read(2)
 | |
|   if len(r) == 0:
 | |
|       # This happens when the remote domain does not exist.
 | |
|       r, errmsg = 125, "domain does not exist"
 | |
|       logging.debug("No confirmation: %s %s", r, errmsg)
 | |
|       return r, errmsg
 | |
|   assert len(r) == 2, r
 | |
|   r = struct.unpack("!H", r)[0]
 | |
|   l = chan.read(4)
 | |
|   assert len(l) == 4, l
 | |
|   l = struct.unpack("!I", l)[0]
 | |
|   errmsg = chan.read(l)
 | |
|   logging.debug("Received confirmation: %s %s", r, errmsg)
 | |
|   return r, errmsg
 | |
| 
 | |
| 
 | |
| class SignalSender(threading.Thread):
 | |
|   def __init__(self, signals, sigqueue):
 | |
|     """Handles signals by pushing them into a file-like object."""
 | |
|     threading.Thread.__init__(self)
 | |
|     self.setDaemon(True)
 | |
|     self.queue = Queue.Queue()
 | |
|     self.sigqueue = sigqueue
 | |
|     for sig in signals:
 | |
|       signal.signal(sig, self.copy)
 | |
| 
 | |
|   def copy(self, signum, frame):
 | |
|     self.queue.put(signum)
 | |
|     logging.debug("Signal %s pushed to queue", signum)
 | |
| 
 | |
|   def run(self):
 | |
|     while True:
 | |
|       signum = self.queue.get()
 | |
|       logging.debug("Dequeued signal %s", signum)
 | |
|       if signum is None:
 | |
|         break
 | |
|       assert signum > 0
 | |
|       self.sigqueue.write(struct.pack("!H", signum))
 | |
|       self.sigqueue.flush()
 | |
|       logging.debug("Wrote signal %s to remote end", signum)
 | |
| 
 | |
| 
 | |
| class Signaler(threading.Thread):
 | |
| 
 | |
|   def __init__(self, process, sigqueue):
 | |
|     """Reads integers from a file-like object and relays that as kill()."""
 | |
|     threading.Thread.__init__(self)
 | |
|     self.setDaemon(True)
 | |
|     self.process = process
 | |
|     self.sigqueue = sigqueue
 | |
| 
 | |
|   def run(self):
 | |
|     while True:
 | |
|         data = self.sigqueue.read(2)
 | |
|         if len(data) == 0:
 | |
|             logging.debug("Received no signal data")
 | |
|             break
 | |
|         assert len(data) == 2
 | |
|         signum = struct.unpack("!H", data)[0]
 | |
|         logging.debug("Received relayed signal %s, sending to process", signum)
 | |
|         self.process.send_signal(signum)
 | |
|     logging.debug("End of signaler")
 | |
| 
 | |
| 
 | |
| def unblock(fobj):
 | |
|   fl = fcntl.fcntl(fobj, fcntl.F_GETFL)
 | |
|   fcntl.fcntl(fobj, fcntl.F_SETFL, fl | os.O_NONBLOCK)
 | |
| 
 | |
| 
 | |
| class DataMultiplexer(threading.Thread):
 | |
| 
 | |
|   def __init__(self, sources, sink):
 | |
|     threading.Thread.__init__(self)
 | |
|     self.setDaemon(True)
 | |
|     self.sources = dict((s,num) for num, s in enumerate(sources))
 | |
|     self.sink = sink
 | |
| 
 | |
|   def run(self):
 | |
|     map(unblock, (s for s in self.sources))
 | |
|     sources, _, x = select.select((s for s in self.sources), (), (s for s in self.sources))
 | |
|     assert not x, x
 | |
|     while sources:
 | |
|       logging.debug("mux: Sources that alarmed: %s", [self.sources[s] for s in sources])
 | |
|       for s in sources:
 | |
|         n = self.sources[s]
 | |
|         data = s.read()
 | |
|         if data == "":
 | |
|           logging.debug("Received no bytes from source %s", n)
 | |
|           del self.sources[s]
 | |
|           self.sink.write(struct.pack("!H", n))
 | |
|           self.sink.write(struct.pack("b", False))
 | |
|           self.sink.flush()
 | |
|           logging.debug("Informed sink about death of source %s", n)
 | |
|           continue
 | |
|         l = len(data)
 | |
|         logging.debug("Received %s bytes from source %s", l, n)
 | |
|         assert l < 1<<32
 | |
|         self.sink.write(struct.pack("!H", n))
 | |
|         self.sink.write(struct.pack("b", True))
 | |
|         self.sink.write(struct.pack("!I", l))
 | |
|         self.sink.write(data)
 | |
|         self.sink.flush()
 | |
|         logging.debug("Copied those %s bytes to sink", l)
 | |
|       if not self.sources:
 | |
|         break
 | |
|       sources, _, _ = select.select((s for s in self.sources), (), (s for s in self.sources))
 | |
|       assert not x, x
 | |
|     logging.debug("End of data multiplexer")
 | |
| 
 | |
| 
 | |
| class DataDemultiplexer(threading.Thread):
 | |
| 
 | |
|   def __init__(self, source, sinks):
 | |
|     threading.Thread.__init__(self)
 | |
|     self.setDaemon(True)
 | |
|     self.sinks = dict(enumerate(sinks))
 | |
|     self.source = source
 | |
| 
 | |
|   def run(self):
 | |
|     while self.sinks:
 | |
|       r, _, x = select.select([self.source], (), [self.source])
 | |
|       assert not x, x
 | |
|       logging.debug("demux: Source alarmed")
 | |
|       for s in r:
 | |
|         n = s.read(2)
 | |
|         if n == "":
 | |
|           logging.debug("Received no bytes from source, closing all sinks")
 | |
|           for sink in self.sinks.values():
 | |
|               sink.close()
 | |
|           self.sinks = []
 | |
|           break
 | |
|         assert len(n) == 2, data
 | |
|         n = struct.unpack("!H", n)[0]
 | |
| 
 | |
|         active = s.read(1)
 | |
|         assert len(active) == 1, active
 | |
|         active = struct.unpack("b", active)[0]
 | |
| 
 | |
|         if not active:
 | |
|           logging.debug("Source %s now inactive, closing corresponding sink", n)
 | |
|           self.sinks[n].close()
 | |
|           del self.sinks[n]
 | |
|         else:
 | |
|           l = s.read(4)
 | |
|           assert len(l) == 4, l
 | |
|           l = struct.unpack("!I", l)[0]
 | |
|           data = s.read(l)
 | |
|           assert len(data) == l, len(data)
 | |
|           logging.debug("Received %s bytes from source %s, relaying to corresponding sink", l, n)
 | |
|           self.sinks[n].write(data)
 | |
|           self.sinks[n].flush()
 | |
|           logging.debug("Relayed %s bytes to sink %s", l, n)
 | |
|     logging.debug("End of data demultiplexer")
 | |
| 
 | |
| 
 | |
| def main_master():
 | |
|   global debug_enabled
 | |
|   args = sys.argv[1:]
 | |
|   if args[0] == "-d":
 | |
|       args = args[1:]
 | |
|       debug_enabled = True
 | |
| 
 | |
|   remote_vm = args[0]
 | |
|   remote_command = args[1:]
 | |
|   assert remote_command
 | |
| 
 | |
|   remote_helper_text = "\n".join([
 | |
|       "exec python -u -c '",
 | |
|       open(__file__, "rb").read().replace("'", "'\\''"),
 | |
|       "'" + (" -d" if debug_enabled else ""),
 | |
|       "",
 | |
|   ])
 | |
| 
 | |
|   saved_stderr = os.fdopen(os.dup(sys.stderr.fileno()), "a")
 | |
| 
 | |
|   try:
 | |
|     p = subprocess.Popen(
 | |
|       ["qrexec-client-vm", remote_vm, "qubes.VMShell"],
 | |
|       stdin=subprocess.PIPE,
 | |
|       stdout=subprocess.PIPE,
 | |
|       close_fds=True,
 | |
|       preexec_fn=os.setpgrp,
 | |
|     )
 | |
|   except OSError, e:
 | |
|     logging.error("cannot launch qrexec-client-vm: %s", e)
 | |
|     return 127
 | |
| 
 | |
|   p.stdin.write(remote_helper_text)
 | |
|   p.stdin.flush()
 | |
| 
 | |
|   send_command(p.stdin, remote_command)
 | |
|   confirmation, errmsg = recv_confirmation(p.stdout)
 | |
|   if confirmation != 0:
 | |
|     logging.error("remote: %s", errmsg)
 | |
|     return confirmation
 | |
| 
 | |
|   handled_signals = (
 | |
|       signal.SIGINT,
 | |
|       signal.SIGABRT,
 | |
|       signal.SIGALRM,
 | |
|       signal.SIGTERM,
 | |
|       signal.SIGUSR1,
 | |
|       signal.SIGUSR2,
 | |
|       signal.SIGTSTP,
 | |
|       signal.SIGCONT,
 | |
|   )
 | |
|   read_signals, write_signals = pairofpipes()
 | |
|   signaler = SignalSender(handled_signals, write_signals)
 | |
|   signaler.start()
 | |
| 
 | |
|   muxer = DataMultiplexer([sys.stdin, read_signals], p.stdin)
 | |
|   muxer.start()
 | |
| 
 | |
|   demuxer = DataDemultiplexer(p.stdout, [sys.stdout, saved_stderr])
 | |
|   demuxer.start()
 | |
| 
 | |
|   retval = p.wait()
 | |
|   demuxer.join()
 | |
|   return retval
 | |
| 
 | |
| 
 | |
| def pairofpipes():
 | |
|   read, write = os.pipe()
 | |
|   return os.fdopen(read, "rb"), os.fdopen(write, "wb")
 | |
| 
 | |
| 
 | |
| def main_remote():
 | |
|   global debug_enabled
 | |
|   if len(sys.argv) > 1 and sys.argv[1] == "-d":
 | |
|       debug_enabled =  True
 | |
| 
 | |
|   cmd = recv_command(sys.stdin)
 | |
|   nicecmd = " ".join(pipes.quote(a) for a in cmd)
 | |
|   try:
 | |
|     p = subprocess.Popen(
 | |
|       cmd,
 | |
| #      ["strace", "-s4096", "-ff"] + cmd,
 | |
|       stdin = subprocess.PIPE,
 | |
|       stdout = subprocess.PIPE,
 | |
|       stderr = subprocess.PIPE,
 | |
|       close_fds=True,
 | |
|     )
 | |
|     send_confirmation(sys.stdout, 0, "")
 | |
|   except OSError, e:
 | |
|     msg = "cannot execute %s: %s" % (nicecmd, e)
 | |
|     logging.error(msg)
 | |
|     send_confirmation(sys.stdout, 127, msg)
 | |
|     sys.exit(0)
 | |
|   except BaseException, e:
 | |
|     msg = "cannot execute %s: %s" % (nicecmd, e)
 | |
|     logging.error(msg)
 | |
|     send_confirmation(sys.stdout, 126, msg)
 | |
|     sys.exit(0)
 | |
| 
 | |
|   signals_read, signals_written = pairofpipes()
 | |
| 
 | |
|   signaler = Signaler(p, signals_read)
 | |
|   signaler.start()
 | |
| 
 | |
|   demuxer = DataDemultiplexer(sys.stdin, [p.stdin, signals_written])
 | |
|   demuxer.start()
 | |
| 
 | |
|   muxer = DataMultiplexer([p.stdout, p.stderr], sys.stdout)
 | |
|   muxer.start()
 | |
| 
 | |
|   logging.info("started %s", nicecmd)
 | |
| 
 | |
|   retval = p.wait()
 | |
|   logging.info("return code %s for %s", retval, nicecmd)
 | |
|   muxer.join()
 | |
|   return retval
 | |
| 
 | |
| 
 | |
| if "__file__" in locals():
 | |
|     sys.exit(main_master())
 | |
| else:
 | |
|     sys.exit(main_remote())
 | 
