Qubes plugin updated to support pipelining.

This commit is contained in:
Rudd-O 2018-07-16 01:14:59 +00:00
parent 3638d046ba
commit fc52d990eb
2 changed files with 426 additions and 90 deletions

View File

@ -13,12 +13,12 @@ DOCUMENTATION = """
short_description: Execute tasks in Qubes VMs. short_description: Execute tasks in Qubes VMs.
description: description:
- Use the qssh command to run commands in Qubes OS VMs. - Use the qrun command to run commands in Qubes OS VMs.
version_added: "2.0" version_added: "2.0"
requirements: requirements:
- qssh (Python script from ansible-qubes) - qrun (Python script from ansible-qubes)
options: options:
management_proxy: management_proxy:
@ -32,19 +32,31 @@ DOCUMENTATION = """
""" """
import distutils.spawn import distutils.spawn
import inspect
import traceback import traceback
import textwrap
import os import os
import shlex import shlex
import sys
import subprocess import subprocess
import pipes import pipes
from ansible import errors from ansible import errors
from ansible import utils from ansible import utils
from ansible.utils.display import Display from ansible.plugins.loader import connection_loader
display = Display()
from ansible.plugins.connection import ConnectionBase from ansible.plugins.connection import ConnectionBase
from ansible.utils.vars import combine_vars from ansible.utils.vars import combine_vars
from ansible.module_utils._text import to_bytes from ansible.module_utils._text import to_bytes
from ansible.utils.path import unfrackpath
from ansible import constants as C from ansible import constants as C
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
class x(object):
def vvvv(self, text, host=None):
print >> file("/tmp/log", "ab"), text, host
display = x()
BUFSIZE = 1024*1024 BUFSIZE = 1024*1024
@ -54,6 +66,163 @@ CONNECTION_OPTIONS = {
} }
def encode_exception(exc, stream):
stream.write('{}\n'.format(len(exc.__class__.__name__)).encode('ascii'))
stream.write('{}'.format(exc.__class__.__name__).encode('ascii'))
for attr in "errno", "filename", "message", "strerror":
stream.write('{}\n'.format(len('{}'.format(getattr(exc, attr)))).encode('ascii'))
stream.write('{}'.format('{}'.format(getattr(exc, attr))).encode('ascii'))
def decode_exception(stream):
name_len = stream.readline(16)
name_len = int(name_len)
name = stream.read(name_len)
keys = ["errno", "filename", "message", "strerror"]
vals = dict((a, None) for a in keys)
for k in keys:
v_len = stream.readline(16)
v_len = int(v_len)
v = stream.read(v_len)
if v == 'None':
vals[k] = None
else:
try:
vals[k] = int(v)
except Exception:
vals[k] = v
if name == "IOError":
e = IOError()
elif name == "OSError":
e = OSError()
else:
raise TypeError("Exception %s cannot be decoded" % name)
for k, v in vals.items():
setattr(e, k, v)
return e
def popen(cmd, in_data, outf=sys.stdout):
try:
p = subprocess.Popen(
cmd, shell=False, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
out, err = p.communicate(in_data)
ret = p.wait()
outf.write('{}\n'.format('Y').encode('ascii'))
except (IOError, OSError) as e:
outf.write('{}\n'.format('N').encode('ascii'))
encode_exception(e, out)
outf.write('{}\n'.format(ret).encode('ascii'))
outf.write('{}\n'.format(len(out)).encode('ascii'))
outf.write(out)
outf.write('{}\n'.format(len(err)).encode('ascii'))
outf.write(err)
outf.flush()
def put(out_path):
try:
f = open(out_path, "wb")
sys.stdout.write('{}\n'.format('Y').encode('ascii'))
except (IOError, OSError) as e:
sys.stdout.write('{}\n'.format('N').encode('ascii'))
encode_exception(e, sys.stdout)
return
try:
while True:
chunksize = int(sys.stdin.readline(16).decode('ascii'))
if chunksize == 0:
break
chunk = sys.stdin.read(chunksize)
try:
f.write(chunk)
sys.stdout.write('{}\n'.format('Y').encode('ascii'))
except (IOError, OSError) as e:
sys.stdout.write('{}\n'.format('N').encode('ascii'))
encode_exception(e, sys.stdout)
return
try:
f.flush()
except (IOError, OSError) as e:
sys.stdout.write('{}\n'.format('N').encode('ascii'))
encode_exception(e, sys.stdout)
finally:
f.close()
def fetch(in_path, bufsize):
try:
f = open(in_path, "rb")
except (IOError, OSError) as e:
sys.stdout.write('{}\n'.format('N').encode('ascii'))
encode_exception(e, sys.stdout)
return
try:
while True:
try:
data = f.read(bufsize)
except (IOError, OSError) as e:
sys.stdout.write('{}\n'.format('N').encode('ascii'))
encode_exception(e, sys.stdout)
return
sys.stdout.write('{}\n'.format(len(data)).encode('ascii'))
if len(data) == 0:
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
finally:
f.close()
if __name__ == '__main__':
# FIXME: WRITE TESTS!
import StringIO
s = StringIO.StringIO()
try:
file("/doesnotexist")
except Exception as e:
encode_exception(e, s)
s.seek(0)
dec = decode_exception(s)
preamble = '''
from __future__ import print_function
import sys, os, subprocess
sys.ps1 = ''
sys.ps2 = ''
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0) if hasattr(sys.stdin, 'buffer') else sys.stdin
sys.stdout = sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout
'''
payload = '\n\n'.join(
inspect.getsource(x)
for x in (encode_exception, popen, put, fetch)
) + \
r'''
_ = sys.stdout.write(b'OK\n')
sys.stdout.flush()
'''
def _prefix_login_path(remote_path):
''' Make sure that we put files into a standard path
If a path is relative, then we need to choose where to put it.
ssh chooses $HOME but we aren't guaranteed that a home dir will
exist in any given chroot. So for now we're choosing "/" instead.
This also happens to be the former default.
Can revisit using $HOME instead if it's a problem
'''
if not remote_path.startswith(os.path.sep):
remote_path = os.path.join(os.path.sep, remote_path)
return os.path.normpath(remote_path)
class QubesRPCError(subprocess.CalledProcessError): class QubesRPCError(subprocess.CalledProcessError):
def __init__(self, returncode, cmd, output=None): def __init__(self, returncode, cmd, output=None):
@ -71,13 +240,18 @@ class Connection(ConnectionBase):
transport = CONNECTION_TRANSPORT transport = CONNECTION_TRANSPORT
connection_options = CONNECTION_OPTIONS connection_options = CONNECTION_OPTIONS
documentation = DOCUMENTATION documentation = DOCUMENTATION
has_pipelining = False
become_from_methods = frozenset(["sudo"]) become_from_methods = frozenset(["sudo"])
has_pipelining = True
transport_cmd = None transport_cmd = None
_transport = None
def __init__(self, play_context, new_stdin, *args, **kwargs): def __init__(self, play_context, new_stdin, *args, **kwargs):
super(Connection, self).__init__(play_context, new_stdin, *args, **kwargs) super(Connection, self).__init__(play_context, new_stdin, *args, **kwargs)
display.vvvv("INSTANTIATING %s" % (os.getppid(),), host=play_context.remote_addr)
if 'transport_cmd' in kwargs:
self.transport_cmd = kwargs['transport_cmd']
return
self.transport_cmd = distutils.spawn.find_executable('qrun') self.transport_cmd = distutils.spawn.find_executable('qrun')
if not self.transport_cmd: if not self.transport_cmd:
self.transport_cmd = os.path.join( self.transport_cmd = os.path.join(
@ -91,106 +265,187 @@ class Connection(ConnectionBase):
self.transport_cmd = None self.transport_cmd = None
if not self.transport_cmd: if not self.transport_cmd:
raise errors.AnsibleError("qrun command not found in PATH") raise errors.AnsibleError("qrun command not found in PATH")
self.transport_cmd = [self.transport_cmd]
display.vvvv("INSTANTIATED %s" % (os.getppid(),), host=play_context.remote_addr)
def _connect(self): def _connect(self):
'''Connect to the VM; nothing to do here ''' '''Connect to the VM.
Unlike in earlier editions of this program, in this edition the
program attempts to create a persistent Python session with the
machine it's trying to connect to, speeding up greatly the exec-
ution of Ansible modules against VMs, whether local or remote
via SSH. In other words, we have pipelining now.
'''
super(Connection, self)._connect() super(Connection, self)._connect()
if not self._connected: if not self._connected:
display.vvv("THIS IS A QUBES VM", host=self._play_context.remote_addr) remote_cmd = [to_bytes(x, errors='surrogate_or_strict') for x in [
# 'strace', '-s', '2048', '-o', '/tmp/log',
'python', '-i', '-c', preamble
]]
addr = self._play_context.remote_addr
proxy = self.get_option("management_proxy")
if proxy:
proxy = ["--proxy=%s" % proxy] if proxy else []
addr = addr.split(".")[0]
else:
proxy = []
cmd = self.transport_cmd + proxy + [addr] + remote_cmd
display.vvvv("CONNECT %s" % (cmd,), host=self._play_context.remote_addr)
self._transport = subprocess.Popen(
cmd, shell=False, stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
try:
self._transport.stdin.write(payload)
self._transport.stdin.flush()
ok = self._transport.stdout.readline(16)
if not ok.startswith("OK\n"):
raise errors.AnsibleError("the remote end of the Qubes connection was not ready")
except Exception:
self._abort_transport()
raise
display.vvvv("CONNECTED %s" % (cmd,), host=self._play_context.remote_addr)
self._connected = True self._connected = True
def _produce_command(self, cmd): def _abort_transport(self):
addr = self._play_context.remote_addr display.vvvv("ABORT", host=self._play_context.remote_addr)
proxy = self.get_option("management_proxy") if self._transport:
if proxy: display.vvvv("ABORTING", host=self._play_context.remote_addr)
proxy = ["--proxy=%s" % proxy] if proxy else [] try:
addr = addr.split(".")[0] self._transport.kill()
else: except Exception:
proxy = [] pass
if isinstance(cmd, basestring): display.vvvv("ABORTED", host=self._play_context.remote_addr)
cmd = shlex.split(cmd) self.close()
cmd = [self.transport_cmd] + proxy + [addr] + cmd
display.vvv("COMMAND %s" % (cmd,), host=self._play_context.remote_addr) def close(self):
return cmd '''Terminate the connection.'''
super(Connection, self).close()
display.vvvv("CLOSE %s" % (os.getppid(),), host=self._play_context.remote_addr)
if self._transport:
display.vvvv("CLOSING %s" % (os.getppid(),), host=self._play_context.remote_addr)
self._transport.stdin.close()
self._transport.stdout.close()
retcode = self._transport.wait()
self._transport = None
self._connected = False
display.vvvv("CLOSED %s" % (os.getppid(),), host=self._play_context.remote_addr)
def exec_command(self, cmd, in_data=None, sudoable=False): def exec_command(self, cmd, in_data=None, sudoable=False):
'''Run a command on the VM.''' '''Run a command on the VM.'''
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
cmd = self._produce_command(cmd) if isinstance(cmd, basestring):
cmd = [to_bytes(i, errors='surrogate_or_strict') for i in cmd] cmd = shlex.split(cmd)
p = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE, display.vvvv("EXEC %s" % cmd, host=self._play_context.remote_addr)
stdout=subprocess.PIPE, try:
stderr=subprocess.PIPE) payload = 'popen(%r, %r)\n' % (cmd, in_data)
stdout, stderr = p.communicate(in_data) self._transport.stdin.write(payload)
return (p.returncode, stdout, stderr) self._transport.stdin.flush()
yesno = self._transport.stdout.readline(2)
except Exception:
self._abort_transport()
raise
if yesno == "Y\n":
try:
retcode = self._transport.stdout.readline(16)
retcode = int(retcode)
if retcode > 65536 or retcode < -65535:
raise errors.AnsibleError("return code from remote end is outside the range: %s" % retcode)
stdout_len = self._transport.stdout.readline(16)
stdout_len = int(stdout_len)
if stdout_len > 1024*1024*1024 or stdout_len < 0:
raise errors.AnsibleError("stdout size from remote end is invalid: %s" % stdout_len)
stdout = self._transport.stdout.read(stdout_len) if stdout_len != 0 else ''
if len(stdout) != stdout_len:
raise errors.AnsibleError("stdout size from remote end does not match actual stdout length: %s != %s" % (stdout_len, len(stdout)))
stderr_len = self._transport.stdout.readline(16)
stderr_len = int(stderr_len)
if stdout_len > 1024*1024*1024 or stdout_len < 0:
raise errors.AnsibleError("stderr size from remote end is invalid: %s" % stderr_len)
stderr = self._transport.stdout.read(stderr_len) if stderr_len != 0 else ''
if len(stderr) != stderr_len:
raise errors.AnsibleError("stderr size from remote end does not match actual stderr length: %s != %s" % (stderr_len, len(stderr)))
return (retcode, stdout, stderr)
except Exception:
self._abort_transport()
raise
elif yesno == "N\n":
exc = decode_exception(self._transport.stdin)
raise exc
else:
self._abort_transport()
raise errors.AnsibleError("pass/fail from remote end is unexpected: %s" % yesno)
def put_file(self, in_path, out_path): def put_file(self, in_path, out_path):
''' transfer a file from local to VM ''' '''Transfer a file from local to VM.'''
super(Connection, self).put_file(in_path, out_path) super(Connection, self).put_file(in_path, out_path)
display.vvv("PUT %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr) display.vvvv("PUT %s to %s" % (in_path, out_path), host=self._play_context.remote_addr)
out_path = _prefix_login_path(out_path)
out_path = self._prefix_login_path(out_path) payload = 'put(%r)\n' % (out_path,)
try: self._transport.stdin.write(payload)
with open(in_path, 'rb') as in_file: self._transport.stdin.flush()
yesno = self._transport.stdout.readline(2)
if yesno == "Y\n":
pass
elif yesno == "N\n":
exc = decode_exception(self._transport.stdin)
raise exc
else:
self._abort_transport()
raise errors.AnsibleError("pass/fail from remote end is unexpected: %s" % yesno)
with open(in_path, 'rb') as in_file:
while True:
chunk = in_file.read(BUFSIZE)
try: try:
cmd = self._produce_command(['dd','of=%s' % out_path, 'bs=%s' % BUFSIZE]) self._transport.stdin.write("%s\n" % len(chunk))
p = subprocess.Popen(cmd, shell=False, stdin=in_file, self._transport.stdin.flush()
stdout=subprocess.PIPE, if len(chunk) == 0:
stderr=subprocess.PIPE) break
except OSError: self._transport.stdin.write(chunk)
raise errors.AnsibleError("chroot connection requires dd command in the chroot") self._transport.stdin.flush()
try: except Exception:
stdout, stderr = p.communicate() self._abort_transport()
except: raise
traceback.print_exc() yesno = self._transport.stdout.readline(2)
raise errors.AnsibleError("failed to transfer file %s to %s" % (in_path, out_path)) if yesno == "Y\n":
if p.returncode != 0: pass
raise errors.AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr)) elif yesno == "N\n":
except IOError: exc = decode_exception(self._transport.stdin)
raise errors.AnsibleError("file or module does not exist at: %s" % in_path) raise exc
else:
def _prefix_login_path(self, remote_path): self._abort_transport()
''' Make sure that we put files into a standard path raise errors.AnsibleError("pass/fail from remote end is unexpected: %s" % yesno)
If a path is relative, then we need to choose where to put it.
ssh chooses $HOME but we aren't guaranteed that a home dir will
exist in any given chroot. So for now we're choosing "/" instead.
This also happens to be the former default.
Can revisit using $HOME instead if it's a problem
'''
if not remote_path.startswith(os.path.sep):
remote_path = os.path.join(os.path.sep, remote_path)
return os.path.normpath(remote_path)
def fetch_file(self, in_path, out_path): def fetch_file(self, in_path, out_path):
''' fetch a file from VM to local ''' '''Fetch a file from VM to local.'''
super(Connection, self).fetch_file(in_path, out_path) super(Connection, self).fetch_file(in_path, out_path)
display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr) display.vvvv("FETCH %s to %s" % (in_path, out_path), host=self._play_context.remote_addr)
in_path = _prefix_login_path(in_path)
in_path = self._prefix_login_path(in_path) out_file = open(out_path, "wb")
try: try:
cmd = self._produce_command(['dd', 'if=%s' % in_path, 'bs=%s' % BUFSIZE]) payload = 'fetch(%r, %r)\n' % (in_path, BUFSIZE)
p = subprocess.Popen(cmd, shell=False, stdin=open(os.devnull), self._transport.stdin.write(payload)
stdout=subprocess.PIPE, self._transport.stdin.flush()
stderr=subprocess.PIPE) while True:
except OSError: chunk_len = self._transport.stdout.readline(16)
raise errors.AnsibleError("Qubes connection requires dd command in the chroot") try:
chunk_len = int(chunk_len)
with open(out_path, 'wb+') as out_file: except Exception:
try: if chunk_len == "N\n":
chunk = p.stdout.read(BUFSIZE) exc = decode_exception(self._transport.stdin)
while chunk: raise exc
out_file.write(chunk) else:
chunk = p.stdout.read(BUFSIZE) self._abort_transport()
except: raise errors.AnsibleError("chunk size from remote end is unexpected: %s" % chunk_len)
traceback.print_exc() if chunk_len > BUFSIZE or chunk_len < 0:
raise errors.AnsibleError("failed to transfer file %s to %s" % (in_path, out_path)) raise errors.AnsibleError("chunk size from remote end is invalid: %s" % chunk_len)
stdout, stderr = p.communicate() if chunk_len == 0:
if p.returncode != 0: break
raise errors.AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr)) chunk = self._transport.stdout.read(chunk_len)
if len(chunk) != chunk_len:
def close(self): raise errors.AnsibleError("stderr size from remote end does not match actual stderr length: %s != %s" % (chunk_len, len(chunk)))
''' terminate the connection; nothing to do here ''' out_file.write(chunk)
super(Connection, self).close() except Exception:
self._connected = False self._abort_transport()
raise

View File

@ -0,0 +1,81 @@
import sys, os ; sys.path.append(os.path.dirname(__file__))
import StringIO
import qubes
import unittest
import tempfile
cases = [
(['true'], '', 'Y\n0\n0\n0\n'),
(['false'], '', 'Y\n1\n0\n0\n'),
(['sh', '-c', 'echo yes'], '', 'Y\n0\n4\nyes\n0\n'),
(['sh', '-c', 'echo yes >&2'], '', 'Y\n0\n0\n4\nyes\n'),
]
cases_with_harness = [
(['true'], '', 0, '', ''),
(['false'], '', 1, '', ''),
(['sh', '-c', 'echo yes'], '', 0, 'yes\n', ''),
(['sh', '-c', 'echo yes >&2'], '', 0, '', 'yes\n'),
]
class MockPlayContext(object):
shell = 'sh'
become = False
become_method = 'sudo'
remote_addr = '127.0.0.7'
def local_connection():
c = qubes.Connection(
MockPlayContext(), None,
transport_cmd=['sh', '-c', '"$@"']
)
c._options = {"management_proxy": None}
return c
class TestBasicThings(unittest.TestCase):
def test_popen(self):
for cmd, in_, out in cases:
outf = StringIO.StringIO()
qubes.popen(cmd, in_, outf=outf)
self.assertMultiLineEqual(
outf.getvalue(),
out
)
def test_exec_command_with_harness(self):
for cmd, in_, ret, out, err in cases_with_harness:
c = local_connection()
retcode, stdout, stderr = c.exec_command(cmd)
self.assertEqual(ret, retcode)
self.assertMultiLineEqual(out, stdout)
self.assertMultiLineEqual(err, stderr)
c.close()
self.assertEqual(c._transport, None)
def test_fetch_file_with_harness(self):
in_text = "abcd"
with tempfile.NamedTemporaryFile() as x:
x.write(in_text)
x.flush()
with tempfile.NamedTemporaryFile() as y:
c = local_connection()
c.fetch_file(in_path=x.name, out_path=y.name)
out_text = y.read()
self.assertEqual(in_text, out_text)
def test_put_file_with_harness(self):
in_text = "abcd"
with tempfile.NamedTemporaryFile() as x:
x.write(in_text)
x.flush()
with tempfile.NamedTemporaryFile() as y:
c = local_connection()
c.put_file(in_path=x.name, out_path=y.name)
out_text = y.read()
self.assertEqual(in_text, out_text)