diff --git a/ansible/connection_plugins/qubes.py b/ansible/connection_plugins/qubes.py index 8c91617..85e5e25 100644 --- a/ansible/connection_plugins/qubes.py +++ b/ansible/connection_plugins/qubes.py @@ -13,12 +13,12 @@ DOCUMENTATION = """ short_description: Execute tasks in Qubes VMs. description: - - Use the qssh command to run commands in Qubes OS VMs. + - Use the qrun command to run commands in Qubes OS VMs. version_added: "2.0" requirements: - - qssh (Python script from ansible-qubes) + - qrun (Python script from ansible-qubes) options: management_proxy: @@ -32,19 +32,31 @@ DOCUMENTATION = """ """ import distutils.spawn +import inspect import traceback +import textwrap import os import shlex +import sys import subprocess import pipes from ansible import errors from ansible import utils -from ansible.utils.display import Display -display = Display() +from ansible.plugins.loader import connection_loader from ansible.plugins.connection import ConnectionBase from ansible.utils.vars import combine_vars from ansible.module_utils._text import to_bytes +from ansible.utils.path import unfrackpath from ansible import constants as C +try: + from __main__ import display +except ImportError: + from ansible.utils.display import Display + display = Display() +class x(object): + def vvvv(self, text, host=None): + print >> file("/tmp/log", "ab"), text, host +display = x() BUFSIZE = 1024*1024 @@ -54,6 +66,163 @@ CONNECTION_OPTIONS = { } +def encode_exception(exc, stream): + stream.write('{}\n'.format(len(exc.__class__.__name__)).encode('ascii')) + stream.write('{}'.format(exc.__class__.__name__).encode('ascii')) + for attr in "errno", "filename", "message", "strerror": + stream.write('{}\n'.format(len('{}'.format(getattr(exc, attr)))).encode('ascii')) + stream.write('{}'.format('{}'.format(getattr(exc, attr))).encode('ascii')) + + +def decode_exception(stream): + name_len = stream.readline(16) + name_len = int(name_len) + name = stream.read(name_len) + keys = ["errno", "filename", "message", "strerror"] + vals = dict((a, None) for a in keys) + for k in keys: + v_len = stream.readline(16) + v_len = int(v_len) + v = stream.read(v_len) + if v == 'None': + vals[k] = None + else: + try: + vals[k] = int(v) + except Exception: + vals[k] = v + if name == "IOError": + e = IOError() + elif name == "OSError": + e = OSError() + else: + raise TypeError("Exception %s cannot be decoded" % name) + for k, v in vals.items(): + setattr(e, k, v) + return e + + +def popen(cmd, in_data, outf=sys.stdout): + try: + p = subprocess.Popen( + cmd, shell=False, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + ) + out, err = p.communicate(in_data) + ret = p.wait() + outf.write('{}\n'.format('Y').encode('ascii')) + except (IOError, OSError) as e: + outf.write('{}\n'.format('N').encode('ascii')) + encode_exception(e, out) + outf.write('{}\n'.format(ret).encode('ascii')) + outf.write('{}\n'.format(len(out)).encode('ascii')) + outf.write(out) + outf.write('{}\n'.format(len(err)).encode('ascii')) + outf.write(err) + outf.flush() + + +def put(out_path): + try: + f = open(out_path, "wb") + sys.stdout.write('{}\n'.format('Y').encode('ascii')) + except (IOError, OSError) as e: + sys.stdout.write('{}\n'.format('N').encode('ascii')) + encode_exception(e, sys.stdout) + return + try: + while True: + chunksize = int(sys.stdin.readline(16).decode('ascii')) + if chunksize == 0: + break + chunk = sys.stdin.read(chunksize) + try: + f.write(chunk) + sys.stdout.write('{}\n'.format('Y').encode('ascii')) + except (IOError, OSError) as e: + sys.stdout.write('{}\n'.format('N').encode('ascii')) + encode_exception(e, sys.stdout) + return + try: + f.flush() + except (IOError, OSError) as e: + sys.stdout.write('{}\n'.format('N').encode('ascii')) + encode_exception(e, sys.stdout) + finally: + f.close() + + +def fetch(in_path, bufsize): + try: + f = open(in_path, "rb") + except (IOError, OSError) as e: + sys.stdout.write('{}\n'.format('N').encode('ascii')) + encode_exception(e, sys.stdout) + return + try: + while True: + try: + data = f.read(bufsize) + except (IOError, OSError) as e: + sys.stdout.write('{}\n'.format('N').encode('ascii')) + encode_exception(e, sys.stdout) + return + sys.stdout.write('{}\n'.format(len(data)).encode('ascii')) + if len(data) == 0: + sys.stdout.flush() + break + sys.stdout.write(data) + sys.stdout.flush() + finally: + f.close() + + +if __name__ == '__main__': + # FIXME: WRITE TESTS! + import StringIO + s = StringIO.StringIO() + try: + file("/doesnotexist") + except Exception as e: + encode_exception(e, s) + s.seek(0) + dec = decode_exception(s) + + +preamble = ''' +from __future__ import print_function +import sys, os, subprocess +sys.ps1 = '' +sys.ps2 = '' +sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0) if hasattr(sys.stdin, 'buffer') else sys.stdin +sys.stdout = sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout +''' +payload = '\n\n'.join( + inspect.getsource(x) + for x in (encode_exception, popen, put, fetch) +) + \ +r''' + +_ = sys.stdout.write(b'OK\n') +sys.stdout.flush() +''' + + +def _prefix_login_path(remote_path): + ''' Make sure that we put files into a standard path + + If a path is relative, then we need to choose where to put it. + ssh chooses $HOME but we aren't guaranteed that a home dir will + exist in any given chroot. So for now we're choosing "/" instead. + This also happens to be the former default. + + Can revisit using $HOME instead if it's a problem + ''' + if not remote_path.startswith(os.path.sep): + remote_path = os.path.join(os.path.sep, remote_path) + return os.path.normpath(remote_path) + + class QubesRPCError(subprocess.CalledProcessError): def __init__(self, returncode, cmd, output=None): @@ -71,13 +240,18 @@ class Connection(ConnectionBase): transport = CONNECTION_TRANSPORT connection_options = CONNECTION_OPTIONS documentation = DOCUMENTATION - has_pipelining = False become_from_methods = frozenset(["sudo"]) + has_pipelining = True transport_cmd = None + _transport = None def __init__(self, play_context, new_stdin, *args, **kwargs): super(Connection, self).__init__(play_context, new_stdin, *args, **kwargs) + display.vvvv("INSTANTIATING %s" % (os.getppid(),), host=play_context.remote_addr) + if 'transport_cmd' in kwargs: + self.transport_cmd = kwargs['transport_cmd'] + return self.transport_cmd = distutils.spawn.find_executable('qrun') if not self.transport_cmd: self.transport_cmd = os.path.join( @@ -91,106 +265,187 @@ class Connection(ConnectionBase): self.transport_cmd = None if not self.transport_cmd: raise errors.AnsibleError("qrun command not found in PATH") + self.transport_cmd = [self.transport_cmd] + display.vvvv("INSTANTIATED %s" % (os.getppid(),), host=play_context.remote_addr) def _connect(self): - '''Connect to the VM; nothing to do here ''' + '''Connect to the VM. + + Unlike in earlier editions of this program, in this edition the + program attempts to create a persistent Python session with the + machine it's trying to connect to, speeding up greatly the exec- + ution of Ansible modules against VMs, whether local or remote + via SSH. In other words, we have pipelining now. + ''' super(Connection, self)._connect() if not self._connected: - display.vvv("THIS IS A QUBES VM", host=self._play_context.remote_addr) + remote_cmd = [to_bytes(x, errors='surrogate_or_strict') for x in [ + # 'strace', '-s', '2048', '-o', '/tmp/log', + 'python', '-i', '-c', preamble + ]] + addr = self._play_context.remote_addr + proxy = self.get_option("management_proxy") + if proxy: + proxy = ["--proxy=%s" % proxy] if proxy else [] + addr = addr.split(".")[0] + else: + proxy = [] + cmd = self.transport_cmd + proxy + [addr] + remote_cmd + display.vvvv("CONNECT %s" % (cmd,), host=self._play_context.remote_addr) + self._transport = subprocess.Popen( + cmd, shell=False, stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + try: + self._transport.stdin.write(payload) + self._transport.stdin.flush() + ok = self._transport.stdout.readline(16) + if not ok.startswith("OK\n"): + raise errors.AnsibleError("the remote end of the Qubes connection was not ready") + except Exception: + self._abort_transport() + raise + display.vvvv("CONNECTED %s" % (cmd,), host=self._play_context.remote_addr) self._connected = True - def _produce_command(self, cmd): - addr = self._play_context.remote_addr - proxy = self.get_option("management_proxy") - if proxy: - proxy = ["--proxy=%s" % proxy] if proxy else [] - addr = addr.split(".")[0] - else: - proxy = [] - if isinstance(cmd, basestring): - cmd = shlex.split(cmd) - cmd = [self.transport_cmd] + proxy + [addr] + cmd - display.vvv("COMMAND %s" % (cmd,), host=self._play_context.remote_addr) - return cmd + def _abort_transport(self): + display.vvvv("ABORT", host=self._play_context.remote_addr) + if self._transport: + display.vvvv("ABORTING", host=self._play_context.remote_addr) + try: + self._transport.kill() + except Exception: + pass + display.vvvv("ABORTED", host=self._play_context.remote_addr) + self.close() + + def close(self): + '''Terminate the connection.''' + super(Connection, self).close() + display.vvvv("CLOSE %s" % (os.getppid(),), host=self._play_context.remote_addr) + if self._transport: + display.vvvv("CLOSING %s" % (os.getppid(),), host=self._play_context.remote_addr) + self._transport.stdin.close() + self._transport.stdout.close() + retcode = self._transport.wait() + self._transport = None + self._connected = False + display.vvvv("CLOSED %s" % (os.getppid(),), host=self._play_context.remote_addr) def exec_command(self, cmd, in_data=None, sudoable=False): '''Run a command on the VM.''' super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) - cmd = self._produce_command(cmd) - cmd = [to_bytes(i, errors='surrogate_or_strict') for i in cmd] - p = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = p.communicate(in_data) - return (p.returncode, stdout, stderr) + if isinstance(cmd, basestring): + cmd = shlex.split(cmd) + display.vvvv("EXEC %s" % cmd, host=self._play_context.remote_addr) + try: + payload = 'popen(%r, %r)\n' % (cmd, in_data) + self._transport.stdin.write(payload) + self._transport.stdin.flush() + yesno = self._transport.stdout.readline(2) + except Exception: + self._abort_transport() + raise + if yesno == "Y\n": + try: + retcode = self._transport.stdout.readline(16) + retcode = int(retcode) + if retcode > 65536 or retcode < -65535: + raise errors.AnsibleError("return code from remote end is outside the range: %s" % retcode) + stdout_len = self._transport.stdout.readline(16) + stdout_len = int(stdout_len) + if stdout_len > 1024*1024*1024 or stdout_len < 0: + raise errors.AnsibleError("stdout size from remote end is invalid: %s" % stdout_len) + stdout = self._transport.stdout.read(stdout_len) if stdout_len != 0 else '' + if len(stdout) != stdout_len: + raise errors.AnsibleError("stdout size from remote end does not match actual stdout length: %s != %s" % (stdout_len, len(stdout))) + stderr_len = self._transport.stdout.readline(16) + stderr_len = int(stderr_len) + if stdout_len > 1024*1024*1024 or stdout_len < 0: + raise errors.AnsibleError("stderr size from remote end is invalid: %s" % stderr_len) + stderr = self._transport.stdout.read(stderr_len) if stderr_len != 0 else '' + if len(stderr) != stderr_len: + raise errors.AnsibleError("stderr size from remote end does not match actual stderr length: %s != %s" % (stderr_len, len(stderr))) + return (retcode, stdout, stderr) + except Exception: + self._abort_transport() + raise + elif yesno == "N\n": + exc = decode_exception(self._transport.stdin) + raise exc + else: + self._abort_transport() + raise errors.AnsibleError("pass/fail from remote end is unexpected: %s" % yesno) def put_file(self, in_path, out_path): - ''' transfer a file from local to VM ''' + '''Transfer a file from local to VM.''' super(Connection, self).put_file(in_path, out_path) - display.vvv("PUT %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr) - - out_path = self._prefix_login_path(out_path) - try: - with open(in_path, 'rb') as in_file: + display.vvvv("PUT %s to %s" % (in_path, out_path), host=self._play_context.remote_addr) + out_path = _prefix_login_path(out_path) + payload = 'put(%r)\n' % (out_path,) + self._transport.stdin.write(payload) + self._transport.stdin.flush() + yesno = self._transport.stdout.readline(2) + if yesno == "Y\n": + pass + elif yesno == "N\n": + exc = decode_exception(self._transport.stdin) + raise exc + else: + self._abort_transport() + raise errors.AnsibleError("pass/fail from remote end is unexpected: %s" % yesno) + with open(in_path, 'rb') as in_file: + while True: + chunk = in_file.read(BUFSIZE) try: - cmd = self._produce_command(['dd','of=%s' % out_path, 'bs=%s' % BUFSIZE]) - p = subprocess.Popen(cmd, shell=False, stdin=in_file, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - except OSError: - raise errors.AnsibleError("chroot connection requires dd command in the chroot") - try: - stdout, stderr = p.communicate() - except: - traceback.print_exc() - raise errors.AnsibleError("failed to transfer file %s to %s" % (in_path, out_path)) - if p.returncode != 0: - raise errors.AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr)) - except IOError: - raise errors.AnsibleError("file or module does not exist at: %s" % in_path) - - def _prefix_login_path(self, remote_path): - ''' Make sure that we put files into a standard path - - If a path is relative, then we need to choose where to put it. - ssh chooses $HOME but we aren't guaranteed that a home dir will - exist in any given chroot. So for now we're choosing "/" instead. - This also happens to be the former default. - - Can revisit using $HOME instead if it's a problem - ''' - if not remote_path.startswith(os.path.sep): - remote_path = os.path.join(os.path.sep, remote_path) - return os.path.normpath(remote_path) + self._transport.stdin.write("%s\n" % len(chunk)) + self._transport.stdin.flush() + if len(chunk) == 0: + break + self._transport.stdin.write(chunk) + self._transport.stdin.flush() + except Exception: + self._abort_transport() + raise + yesno = self._transport.stdout.readline(2) + if yesno == "Y\n": + pass + elif yesno == "N\n": + exc = decode_exception(self._transport.stdin) + raise exc + else: + self._abort_transport() + raise errors.AnsibleError("pass/fail from remote end is unexpected: %s" % yesno) def fetch_file(self, in_path, out_path): - ''' fetch a file from VM to local ''' + '''Fetch a file from VM to local.''' super(Connection, self).fetch_file(in_path, out_path) - display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr) - - in_path = self._prefix_login_path(in_path) + display.vvvv("FETCH %s to %s" % (in_path, out_path), host=self._play_context.remote_addr) + in_path = _prefix_login_path(in_path) + out_file = open(out_path, "wb") try: - cmd = self._produce_command(['dd', 'if=%s' % in_path, 'bs=%s' % BUFSIZE]) - p = subprocess.Popen(cmd, shell=False, stdin=open(os.devnull), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - except OSError: - raise errors.AnsibleError("Qubes connection requires dd command in the chroot") - - with open(out_path, 'wb+') as out_file: - try: - chunk = p.stdout.read(BUFSIZE) - while chunk: - out_file.write(chunk) - chunk = p.stdout.read(BUFSIZE) - except: - traceback.print_exc() - raise errors.AnsibleError("failed to transfer file %s to %s" % (in_path, out_path)) - stdout, stderr = p.communicate() - if p.returncode != 0: - raise errors.AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr)) - - def close(self): - ''' terminate the connection; nothing to do here ''' - super(Connection, self).close() - self._connected = False + payload = 'fetch(%r, %r)\n' % (in_path, BUFSIZE) + self._transport.stdin.write(payload) + self._transport.stdin.flush() + while True: + chunk_len = self._transport.stdout.readline(16) + try: + chunk_len = int(chunk_len) + except Exception: + if chunk_len == "N\n": + exc = decode_exception(self._transport.stdin) + raise exc + else: + self._abort_transport() + raise errors.AnsibleError("chunk size from remote end is unexpected: %s" % chunk_len) + if chunk_len > BUFSIZE or chunk_len < 0: + raise errors.AnsibleError("chunk size from remote end is invalid: %s" % chunk_len) + if chunk_len == 0: + break + chunk = self._transport.stdout.read(chunk_len) + if len(chunk) != chunk_len: + raise errors.AnsibleError("stderr size from remote end does not match actual stderr length: %s != %s" % (chunk_len, len(chunk))) + out_file.write(chunk) + except Exception: + self._abort_transport() + raise diff --git a/ansible/connection_plugins/test_qubes.py b/ansible/connection_plugins/test_qubes.py new file mode 100644 index 0000000..dddea39 --- /dev/null +++ b/ansible/connection_plugins/test_qubes.py @@ -0,0 +1,81 @@ +import sys, os ; sys.path.append(os.path.dirname(__file__)) + +import StringIO +import qubes +import unittest +import tempfile + + +cases = [ + (['true'], '', 'Y\n0\n0\n0\n'), + (['false'], '', 'Y\n1\n0\n0\n'), + (['sh', '-c', 'echo yes'], '', 'Y\n0\n4\nyes\n0\n'), + (['sh', '-c', 'echo yes >&2'], '', 'Y\n0\n0\n4\nyes\n'), +] +cases_with_harness = [ + (['true'], '', 0, '', ''), + (['false'], '', 1, '', ''), + (['sh', '-c', 'echo yes'], '', 0, 'yes\n', ''), + (['sh', '-c', 'echo yes >&2'], '', 0, '', 'yes\n'), +] + + +class MockPlayContext(object): + shell = 'sh' + become = False + become_method = 'sudo' + remote_addr = '127.0.0.7' + + +def local_connection(): + c = qubes.Connection( + MockPlayContext(), None, + transport_cmd=['sh', '-c', '"$@"'] + ) + c._options = {"management_proxy": None} + return c + + +class TestBasicThings(unittest.TestCase): + + def test_popen(self): + for cmd, in_, out in cases: + outf = StringIO.StringIO() + qubes.popen(cmd, in_, outf=outf) + self.assertMultiLineEqual( + outf.getvalue(), + out + ) + + def test_exec_command_with_harness(self): + for cmd, in_, ret, out, err in cases_with_harness: + c = local_connection() + retcode, stdout, stderr = c.exec_command(cmd) + self.assertEqual(ret, retcode) + self.assertMultiLineEqual(out, stdout) + self.assertMultiLineEqual(err, stderr) + c.close() + self.assertEqual(c._transport, None) + + def test_fetch_file_with_harness(self): + in_text = "abcd" + with tempfile.NamedTemporaryFile() as x: + x.write(in_text) + x.flush() + with tempfile.NamedTemporaryFile() as y: + c = local_connection() + c.fetch_file(in_path=x.name, out_path=y.name) + out_text = y.read() + self.assertEqual(in_text, out_text) + + + def test_put_file_with_harness(self): + in_text = "abcd" + with tempfile.NamedTemporaryFile() as x: + x.write(in_text) + x.flush() + with tempfile.NamedTemporaryFile() as y: + c = local_connection() + c.put_file(in_path=x.name, out_path=y.name) + out_text = y.read() + self.assertEqual(in_text, out_text)