qubes-pass/bin/qvm-pass
Beto HydroxyButyrate e1aae10b77
qrexec-client-vm was stripping the escape chars on output.
The server was not aware of the TERM setting, so I made it
explicit, based on common assumptions.  For both gnome-term and
Xterm, the output now matches that of the pass command.
2021-03-02 20:55:32 +10:00

322 lines
12 KiB
Python
Executable File

#!/usr/bin/python3 -u
import argparse
import base64
import getpass
import os
import signal
import subprocess
import sys
signal.signal(signal.SIGINT, signal.SIG_DFL)
usage = "\n".join([
"qvm-pass usage:",
"",
" qvm-pass [-d <passvm>] [subcommand] [arguments...]",
"",
"subcommands:",
"",
" <no subcommand>",
" Retrieves the list of keys from the pass store.",
" <key>",
" Retrieves a key from the pass store.",
" generate [-n] [-f] <key> [pass-length]",
" Retrieves a key from the pass store; creates the key",
" with 25 characters length if it does not exist yet,",
" and returns the generated key on standard output.",
" The -n option excludes symbols from being used",
" during password generation.",
" get-or-generate [-n] <key> [pass-length]",
" Retrieves a key from the pass store; creates the key",
" with 25 characters length if it does not exist yet,",
" and returns the generated key on standard output.",
" The -n option excludes symbols from being used",
" during password generation.",
" insert [--echo,-e | --multiline,-m] [--force,-f] <key>",
" Creates a key in the pass store.",
" rm <key>",
" Removes a key from the pass store.",
" cp [-f] <key> <newkey>",
" Copies a key to another key in the pass store,",
" optionally forcefully.",
" mv [-f] <key> <newkey>",
" Moves a key to another key in the pass store,",
" optionally forcefully.",
" init <GPD ID> [GPG IDs...]",
" Initializes the pass store.",
])
force = 0
multiline = 0
echo = 0
nosymbols = 0
parser_for_discrimination = argparse.ArgumentParser(
description="(nobody sees this)"#
)
parser_for_discrimination.add_argument("-d", "--dest-vm", type=str,
help="Set the Qubes domain to operate with.",
default=os.environ.get('QUBES_PASS_DOMAIN', ""))
parser_for_subcommands = argparse.ArgumentParser(
description="A Qubes-RPC inter-vm client for the pass password manager.",
usage=usage
)
parser_for_subcommands.add_argument("-d", "--dest-vm", type=str,
help="Set the Qubes domain to operate with.",
default=os.environ.get('QUBES_PASS_DOMAIN', ""))
subparsers = parser_for_subcommands.add_subparsers(
help='sub-command help (run subcommand with --help as first parameter)',
required=False,
)
_parsers = {}
def _newcmd(name, desc):
if name not in _parsers:
_parsers[name] = subparsers.add_parser(name, help=desc)
_parsers[name].set_defaults(subcommand=name)
return _parsers[name]
for cmd in [("mv", "renames / moves a key in the store"),
("cp", "renames / copies a key in the store to a new location")]:
p = _newcmd(*cmd)
p.add_argument("original", help="original name of the key", type=str)
p.add_argument("new", help="new name for the original key", type=str)
p = _newcmd("init", "initializes a new pass store if none exists")
p.add_argument("gpgid", type=str, nargs="+",
help="list of GPG IDs to initialize the store with")
p = _newcmd("rm", "removes a key in the store")
p.add_argument("key", help="name of the key to be removed", type=str)
p = _newcmd("get-or-generate",
"retrieves a key from the password store, generating one if it does not exist")
p.add_argument("key", help="name of the key to be retrieved / generated", type=str)
p = _newcmd("generate",
"generates a key in the password store")
p.add_argument("key", help="name of the key to be generated", type=str)
p = _newcmd("insert",
"inserts a new key into the pass store")
p.add_argument("key", help="name of the key to be inserted", type=str)
for p in ["get-or-generate", "generate"]:
_parsers[p].add_argument("pass_length", type=int, nargs='?',
help="number of characters in generated password",
default=25)
_parsers[p].add_argument("-n", "--no-symbols", action="store_true",
help="no symbols in generated password",
default=False)
for p in ["mv", "cp", "rm", "insert", "generate"]:
_parsers[p].add_argument("-f", "--force", action="store_true",
help="force overwriting / removing passwords instead of prompting",
default=False)
for p in ["insert"]:
_parsers[p].add_argument("-m", "--multiline", action="store_true",
help="accept multi-line input, ending it with Ctrl+D (EOF)",
default=False)
_parsers[p].add_argument("-e", "--echo", action="store_true",
help="echo the password to the console during entry",
default=False)
def usage(string, *args):
if args:
string = string % args
print(string, file=sys.stderr)
parser_for_subcommands.print_help(sys.stderr)
sys.exit(2)
PASS_READ = "ruddo.PassRead"
PASS_MANAGE = "ruddo.PassManage"
def send_args(rpc, *args, **kwargs):
cmd = ['/usr/lib/qubes/qrexec-client-vm',
'--no-filter-escape-chars-stdout',
'--no-filter-escape-chars-stderr',
opts.dest_vm, rpc]
# print(cmd, file=sys.stderr)
return_stdout = kwargs.get("return_stdout", False)
if "return_stdout" in kwargs:
del kwargs["return_stdout"]
kwargs['stdout'] = subprocess.PIPE
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs)
for arg in args:
# print(arg, file=sys.stderr)
if isinstance(arg, str):
arg = base64.b64encode(arg.encode("utf-8")) + b"\n"
else:
arg = base64.b64encode(arg) + b"\n"
p.stdin.write(arg)
if return_stdout:
out, unused_err = p.communicate('')
p.stdin.close()
if return_stdout:
return p.wait(), out
else:
return p.wait()
def pass_read(*args, **kwargs):
return send_args(PASS_READ, *args, **kwargs)
def pass_manage(*args, **kwargs):
return send_args(PASS_MANAGE, *args, **kwargs)
if not any(x in sys.argv[1:] for x in ['--help', '-h', '-?']):
opts, args = parser_for_discrimination.parse_known_args()
if not opts.dest_vm:
try:
with open("/rw/config/pass-split-domain") as domain:
opts.dest_vm = domain.readlines()[0].strip()
except FileNotFoundError:
pass
if not opts.dest_vm:
usage("error: the QUBES_PASS_DOMAIN variable is not defined."
" Either create /rw/config/pass-split-domain with the VM containing"
" your pass setup, set the environment variable yourself,"
" or pass -d on the command line.",)
show = len(args) > 0 and (args[0] == "--" or args[0] == "ls" or args[0] == "list" or args[0] == "show")
if len(args) == 0 or (len(args) == 1 and show):
sys.exit(pass_read("list"))
elif len(args) == 1 or (len(args) == 2 and show):
sys.exit(pass_read("get", args[-1]))
opts = parser_for_subcommands.parse_args()
args = None
if not opts.dest_vm:
try:
with open("/rw/config/pass-split-domain") as domain:
opts.dest_vm = domain.readlines()[0].strip()
except FileNotFoundError:
pass
if not opts.dest_vm:
usage("error: the QUBES_PASS_DOMAIN variable is not defined."
" Either create /rw/config/pass-split-domain with the VM containing"
" your pass setup, set the environment variable yourself,"
" or pass -d on the command line.",)
if opts.subcommand == "get-or-list":
if opts.key:
sys.exit(pass_read("get", opts.key))
else:
sys.exit(pass_read("list"))
elif opts.subcommand in ("mv", "cp"):
if not opts.force and sys.stdin.isatty():
with open(os.devnull, "w") as null:
if pass_read("get", opts.new, stdout=null, stderr=null) == 0:
sys.stderr.write("%s: overwrite %s? " % (opts.subcommand, opts.new))
sys.stdin.read(1)
sys.exit(pass_manage(opts.subcommand, opts.original, opts.new, str(int(opts.force))))
elif opts.subcommand == "init":
sys.exit(pass_manage(opts.subcommand, *opts.gpgid))
elif opts.subcommand == "rm":
if not opts.force and sys.stdin.isatty():
with open(os.devnull, "w") as null:
if pass_read("get", opts.key, stdout=null, stderr=null) == 0:
sys.stderr.write("Are you sure you would like to delete %s? [y/N] " % (opts.key,))
ans = sys.stdin.readline().strip()
if ans and ans[0] in "yY":
pass
else:
sys.exit(1)
sys.exit(pass_manage(opts.subcommand, opts.key))
elif opts.subcommand == "get-or-generate":
with open(os.devnull, "w") as null:
ret, stdout = pass_read("get", opts.key, return_stdout=True, stderr=null)
if ret == 8:
# Not there.
with open(os.devnull, "w") as null:
ret = pass_manage("generate", opts.key, str(int(opts.no_symbols)), str(int(opts.pass_length)), stdout=null)
if ret != 0:
sys.exit(ret)
sys.exit(pass_read("get", opts.key))
elif ret == 0:
# There.
sys.stdout.buffer.write(stdout)
sys.exit(ret)
else:
# Woops.
sys.exit(ret)
elif opts.subcommand == "generate":
doit = lambda: sys.exit(pass_manage(opts.subcommand, opts.key, str(int(opts.no_symbols)), str(int(opts.pass_length))))
with open(os.devnull, "w") as null:
ret = pass_read("get", opts.key, stdout=null, stderr=null)
if ret == 8:
# Not there.
doit()
elif ret == 0:
# There:
if not opts.force and sys.stdin.isatty():
sys.stderr.write("An entry already exists for %s. Overwrite it? [y/N] " % (opts.key,))
ans = sys.stdin.readline().strip()
if ans and ans[0] in "yY":
doit()
else:
sys.exit(1)
else:
doit()
else:
sys.exit(ret)
elif opts.subcommand == "insert":
if not opts.force and sys.stdin.isatty():
with open(os.devnull, "w") as null:
ret = pass_read("get", opts.key, stdout=null, stderr=null)
if ret == 0:
# There. Confirm.
sys.stderr.write("An entry already exists for %s. Overwrite it? [y/N] " % (opts.key,))
ans = sys.stdin.readline().strip()
if ans and ans[0] in "yY":
pass
else:
sys.exit(1)
elif ret == 8:
# Not there. Fall through.
pass
else:
sys.exit(ret)
if opts.multiline:
print("Enter contents of %s and press Ctrl+D when finished:\n" % (opts.key, ), file=sys.stderr)
contents = sys.stdin.buffer.read()
else:
def promptpw(string):
if sys.stdin.isatty():
if opts.echo:
sys.stderr.write(string)
pw = sys.stdin.buffer.readline()
else:
pw = getpass.getpass(string)
else:
pw = sys.stdin.buffer.readline()
if not sys.stdin.isatty():
print()
if pw and pw[-1] == b"\n":
pw = pw[:-1]
return pw
contents = promptpw("Enter password for %s: " % (opts.key,))
pw2 = promptpw("Retype password for %s: " % (opts.key,))
if contents != pw2:
if sys.stdin.isatty():
print("Error: the entered passwords do not match.", file=sys.stderr)
sys.exit(1)
sys.exit(pass_manage(opts.subcommand, opts.key, str(int(opts.multiline)), contents))
else:
assert 0, "not reached"