#!/usr/bin/python3 -u
"""qvm-pass: a Qubes-RPC inter-VM client for the pass password manager.

Every operation is forwarded to the destination VM by running
``qrexec-client-vm`` against one of two RPC services (``ruddo.PassRead`` or
``ruddo.PassManage``).  Arguments are sent on the child's standard input,
one per line, each base64-encoded.

The destination VM is taken from ``-d``/``--dest-vm``, then from the
``QUBES_PASS_DOMAIN`` environment variable, then from the first line of
``/rw/config/pass-split-domain``.
"""

import argparse
import base64
import getpass
import os
import signal
import subprocess
import sys

try:
    from shlex import quote
except ImportError:
    # Kept only as a fallback; ``pipes`` no longer exists on recent Pythons.
    from pipes import quote


usage_text = "\n".join(
    [
        "qvm-pass usage:",
        "",
        "    qvm-pass [-d <vm>] [subcommand] [arguments...]",
        "",
        "subcommands (try qvm-pass subcommand --help for more):",
        "",
        "    [ls|list|show]",
        "        Retrieves the list of keys from the pass store.",
        "    [show] [-c] <key>",
        "        Retrieves a key from the pass store.",
        "    generate [-n] [-f] <key> [pass-length]",
        "        Generates a key in the pass store with 25 characters",
        "        length unless pass-length is given, and returns the",
        "        generated key on standard output.",
        "        The -n option excludes symbols from being used",
        "        during password generation.",
        "        The -f option overwrites an existing key without prompting.",
        "    get-or-generate [-n] <key> [pass-length]",
        "        Retrieves a key from the pass store; creates the key",
        "        with 25 characters length if it does not exist yet,",
        "        and returns the generated key on standard output.",
        "        The -n option excludes symbols from being used",
        "        during password generation.",
        "    insert [--echo,-e | --multiline,-m] [--force,-f] <key>",
        "        Creates a key in the pass store.",
        "    rm <key>",
        "        Removes a key from the pass store.",
        "    cp [-f] <key> <newkey>",
        "        Copies a key to another key in the pass store,",
        "        optionally forcefully.",
        "    mv [-f] <key> <newkey>",
        "        Moves a key to another key in the pass store,",
        "        optionally forcefully.",
        "    init [GPG IDs...]",
        "        Initializes the pass store.",
    ]
)


# The following code was lifted from the pass program to support
# identical functionality.
#
# BASE64 and die() are defined first because clip() relies on both; without
# them the clipboard contents are never saved/restored and error paths do
# not abort.
shell_functions = """
BASE64="base64"
X_SELECTION="${PASSWORD_STORE_X_SELECTION:-clipboard}"
CLIP_TIME="${PASSWORD_STORE_CLIP_TIME:-45}"

die() {
    echo "$@" >&2
    exit 1
}

clip() {
    if [[ -n $WAYLAND_DISPLAY ]]; then
        local copy_cmd=( wl-copy )
        local paste_cmd=( wl-paste -n )
        if [[ $X_SELECTION == primary ]]; then
            copy_cmd+=( --primary )
            paste_cmd+=( --primary )
        fi
        local display_name="$WAYLAND_DISPLAY"
    elif [[ -n $DISPLAY ]]; then
        local copy_cmd=( xclip -selection "$X_SELECTION" )
        local paste_cmd=( xclip -o -selection "$X_SELECTION" )
        local display_name="$DISPLAY"
    else
        die "Error: No X11 or Wayland display detected"
    fi
    local sleep_argv0="password store sleep on display $display_name"

    # This base64 business is because bash cannot store binary data in a shell
    # variable. Specifically, it cannot store nulls nor (non-trivally) store
    # trailing new lines.
    pkill -f "^$sleep_argv0" 2>/dev/null && sleep 0.5
    local before="$("${paste_cmd[@]}" 2>/dev/null | $BASE64)"
    echo -n "$1" | "${copy_cmd[@]}" || die "Error: Could not copy data to the clipboard"
    (
        ( exec -a "$sleep_argv0" bash <<<"trap 'kill %1' TERM; sleep '$CLIP_TIME' & wait" )
        local now="$("${paste_cmd[@]}" | $BASE64)"
        [[ $now != $(echo -n "$1" | $BASE64) ]] && before="$now"

        # It might be nice to programatically check to see if klipper exists,
        # as well as checking for other common clipboard managers. But for now,
        # this works fine -- if qdbus isn't there or if klipper isn't running,
        # this essentially becomes a no-op.
        #
        # Clipboard managers frequently write their history out in plaintext,
        # so we axe it here:
        qdbus org.kde.klipper /klipper org.kde.klipper.klipper.clearClipboardHistory &>/dev/null

        echo "$before" | $BASE64 -d | "${copy_cmd[@]}"
    ) >/dev/null 2>&1 & disown
    echo "Copied $2 to clipboard. Will clear in $CLIP_TIME seconds."
}

qrcode() {
    if [[ -n $DISPLAY || -n $WAYLAND_DISPLAY ]]; then
        if type feh >/dev/null 2>&1; then
            echo -n "$1" | qrencode --size 10 -o - | feh -x --title "pass: $2" -g +200+200 -
            return
        elif type gm >/dev/null 2>&1; then
            echo -n "$1" | qrencode --size 10 -o - | gm display -title "pass: $2" -geometry +200+200 -
            return
        elif type display >/dev/null 2>&1; then
            echo -n "$1" | qrencode --size 10 -o - | display -title "pass: $2" -geometry +200+200 -
            return
        fi
    fi
    echo -n "$1" | qrencode -t utf8
}
"""


def pass_frontend_shell(cmd):
    """Run *cmd* (a list of words) in bash with the lifted pass helpers loaded.

    Each word is shell-quoted and appended after ``shell_functions``.
    Returns the exit status of the bash process.
    """
    script = shell_functions + "\n\n" + " ".join(quote(word) for word in cmd)
    return subprocess.call(["bash", "-c", script])


def clip(data, path):
    """Copy *data* to the clipboard, labelling it as *path*; return bash's status."""
    return pass_frontend_shell(["clip", data, path])


def qrcode(data, path):
    """Show *data* as a QR code titled after *path*; return bash's status."""
    return pass_frontend_shell(["qrcode", data, path])


# First-stage parser: only separates -d/--dest-vm and the leading positional
# words from everything else, so that a default subcommand can be injected
# before the real parser runs.
parser_for_discrimination = argparse.ArgumentParser(description="(nobody sees this)")
parser_for_discrimination.add_argument(
    "-d",
    "--dest-vm",
    type=str,
    help="Set the Qubes domain to operate with.",
    default=os.environ.get("QUBES_PASS_DOMAIN", ""),
)
parser_for_discrimination.add_argument(
    "arguments", nargs="*", help="the rest of the arguments"
)

# Second-stage parser: the real command line, with one subparser per command.
parser_for_subcommands = argparse.ArgumentParser(
    description="A Qubes-RPC inter-vm client for the pass password manager.",
    usage=usage_text,
)
parser_for_subcommands.add_argument(
    "-d",
    "--dest-vm",
    type=str,
    help="Set the Qubes domain to operate with.",
    default=os.environ.get("QUBES_PASS_DOMAIN", ""),
)

subparsers = parser_for_subcommands.add_subparsers(
    help="sub-command help (run subcommand with --help as first parameter)",
    required=False,
)

_parsers = {}


def _newcmd(name, desc, aliases=None):
    """Return the subparser for *name*, creating it on first use.

    The subparser records *name* as ``subcommand`` in the parsed namespace, so
    aliases resolve to the canonical command name.
    """
    if name not in _parsers:
        kwargs = {"aliases": aliases} if aliases else {}
        _parsers[name] = subparsers.add_parser(name, help=desc, **kwargs)
        _parsers[name].set_defaults(subcommand=name)
    return _parsers[name]


for cmd in [
    ("mv", "renames / moves a key in the store"),
    ("cp", "renames / copies a key in the store to a new location"),
]:
    p = _newcmd(*cmd)
    p.add_argument("original", help="original name of the key", type=str)
    p.add_argument("new", help="new name for the original key", type=str)

p = _newcmd("init", "initializes a new pass store if none exists")
p.add_argument(
    "gpgid", type=str, nargs="+", help="list of GPG IDs to initialize the store with"
)

p = _newcmd("rm", "removes a key in the store")
p.add_argument("key", help="name of the key to be removed", type=str)

p = _newcmd(
    "get-or-generate",
    "retrieves a key from the password store, generating one if it does not exist",
)
p.add_argument("key", help="name of the key to be retrieved / generated", type=str)

p = _newcmd("show", "shows existing password")
p.add_argument("key", help="name of the key to be retrieved", type=str, nargs="?")

p = _newcmd("ls", "lists passwords", aliases=["list"])

p = _newcmd("generate", "generates a key in the password store")
p.add_argument("key", help="name of the key to be generated", type=str)

p = _newcmd("insert", "inserts a new key into the pass store")
p.add_argument("key", help="name of the key to be inserted", type=str)

for p in ["get-or-generate", "generate"]:
    _parsers[p].add_argument(
        "pass_length",
        type=int,
        nargs="?",
        help="number of characters in generated password",
        default=25,
    )
    _parsers[p].add_argument(
        "-n",
        "--no-symbols",
        action="store_true",
        help="no symbols in generated password",
        default=False,
    )

for p in ["show", "get-or-generate", "generate"]:
    _parsers[p].add_argument(
        "-c",
        "--clip",
        action="store_true",
        help="copy password to clipboard instead of displaying onscreen",
        default=False,
    )
    _parsers[p].add_argument(
        "-q",
        "--qrcode",
        action="store_true",
        help="display password as QR code",
        default=False,
    )

for p in ["mv", "cp", "rm", "insert", "generate"]:
    _parsers[p].add_argument(
        "-f",
        "--force",
        action="store_true",
        help="force overwriting / removing passwords instead of prompting",
        default=False,
    )

for p in ["insert"]:
    _parsers[p].add_argument(
        "-m",
        "--multiline",
        action="store_true",
        help="accept multi-line input, ending it with Ctrl+D (EOF)",
        default=False,
    )
    _parsers[p].add_argument(
        "-e",
        "--echo",
        action="store_true",
        help="echo the password to the console during entry",
        default=False,
    )

# add_subparsers() already returns the sub-parsers action, so there is no need
# to dig it out of the parser's private action list.
known_subparsers = subparsers
subcommands = known_subparsers.choices.keys()


def usage(string, *args):
    """Print *string* (``%``-formatted with *args* if given) and the help text
    to standard error, then exit with status 2."""
    if args:
        string = string % args
    print(string, file=sys.stderr)
    parser_for_subcommands.print_help(sys.stderr)
    sys.exit(2)


PASS_READ = "ruddo.PassRead"
PASS_MANAGE = "ruddo.PassManage"

# Exit status of a "get" request that the callers below treat as
# "the key does not exist".
_RET_KEY_MISSING = 8

# File whose first line names the destination VM when neither -d nor
# QUBES_PASS_DOMAIN is given.
_DOMAIN_CONFIG = "/rw/config/pass-split-domain"

# Parsed command line; assigned by main() and read by the helpers below.
opts = None


def _encode_arg(arg):
    """Return *arg* (str or bytes) as one base64-encoded, newline-terminated line."""
    raw = arg.encode("utf-8") if isinstance(arg, str) else arg
    return base64.b64encode(raw) + b"\n"


def send_args(rpc, *args, **kwargs):
    """Invoke the RPC service *rpc* on ``opts.dest_vm`` with *args*.

    Each argument is sent on the child's standard input as a base64 line.
    Remaining keyword arguments are passed to ``subprocess.Popen``, except
    ``return_stdout``: when true, the child's standard output is captured.

    Returns the child's exit status, or ``(status, stdout_bytes)`` when
    ``return_stdout`` is true.
    """
    cmd = [
        "/usr/lib/qubes/qrexec-client-vm",
        "--no-filter-escape-chars-stdout",
        "--no-filter-escape-chars-stderr",
        opts.dest_vm,
        rpc,
    ]
    return_stdout = kwargs.pop("return_stdout", False)
    if return_stdout:
        kwargs["stdout"] = subprocess.PIPE
    payload = b"".join(_encode_arg(arg) for arg in args)
    child = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs)
    # communicate() writes the payload, closes stdin, drains stdout (if piped)
    # and waits, tolerating a child that exits before reading its input.
    out, _unused_err = child.communicate(payload)
    if return_stdout:
        return child.returncode, out
    return child.returncode


def pass_read(*args, **kwargs):
    """Call the read-only RPC service; see send_args() for the arguments."""
    return send_args(PASS_READ, *args, **kwargs)


def pass_manage(*args, **kwargs):
    """Call the store-modifying RPC service; see send_args() for the arguments."""
    return send_args(PASS_MANAGE, *args, **kwargs)


def _normalize_arguments(positional, extra):
    """Return the argument list for the subcommand parser.

    *positional* are the leading words recognised by the first-stage parser,
    *extra* everything it did not recognise.  With no positional word the
    command defaults to ``ls``; with a single word that is not a subcommand it
    defaults to ``show <word>``.  *extra* is appended exactly once.
    """
    if not positional:
        return ["ls"] + extra
    if len(positional) == 1 and positional[0] not in subcommands:
        return ["show"] + positional + extra
    return positional + extra


def _resolve_dest_vm(dest_vm, config_path=_DOMAIN_CONFIG):
    """Return *dest_vm*, falling back to the first line of *config_path*.

    Returns an empty string when neither source yields a name (missing or
    empty file).
    """
    if dest_vm:
        return dest_vm
    try:
        with open(config_path) as domain:
            return domain.readline().strip()
    except FileNotFoundError:
        return ""


def _strip_trailing_newline(data):
    """Return the bytes *data* without a single trailing newline, if present."""
    return data[:-1] if data.endswith(b"\n") else data


def _confirm(prompt):
    """Write *prompt* to standard error and return True on a y/Y answer."""
    sys.stderr.write(prompt)
    sys.stderr.flush()
    answer = sys.stdin.readline().strip()
    return bool(answer) and answer[0] in "yY"


def _lookup_status(key):
    """Return the exit status of fetching *key* with all output discarded."""
    with open(os.devnull, "w") as null:
        return pass_read("get", key, stdout=null, stderr=null)


def _exit_presenting(secret, key):
    """Exit after sending *secret* (bytes) to the clipboard or QR helper.

    Must only be called when ``opts.clip`` or ``opts.qrcode`` is set; the
    clipboard takes precedence when both are.
    """
    text = secret.decode("utf-8")
    if opts.clip:
        sys.exit(clip(text, key))
    sys.exit(qrcode(text, key))


def _prompt_password(prompt):
    """Read one password line and return it as bytes without its newline.

    On a terminal the prompt is shown and input is hidden unless ``--echo``
    was given; otherwise one raw line is read from standard input.
    """
    interactive = sys.stdin.isatty()
    if interactive and not opts.echo:
        # getpass() already drops the newline; encode so that every path
        # yields bytes.
        return getpass.getpass(prompt).encode("utf-8")
    if interactive:
        sys.stderr.write(prompt)
        sys.stderr.flush()
    entered = sys.stdin.buffer.readline()
    if not interactive:
        print()
    return _strip_trailing_newline(entered)


def _cmd_list():
    """List the keys of the store and exit with the service's status."""
    sys.exit(pass_read("list"))


def _cmd_show():
    """Show, clip or QR-encode a key; without a key, list the store."""
    if opts.key is None:
        _cmd_list()
    if not (opts.clip or opts.qrcode):
        sys.exit(pass_read("get", opts.key))
    ret, stdout = pass_read("get", opts.key, return_stdout=True)
    if ret != 0:
        sys.exit(ret)
    _exit_presenting(stdout, opts.key)


def _cmd_move_or_copy():
    """Move or copy a key, pausing first if the target already exists."""
    if not opts.force and sys.stdin.isatty():
        if _lookup_status(opts.new) == 0:
            sys.stderr.write("%s: overwrite %s? " % (opts.subcommand, opts.new))
            sys.stderr.flush()
            # NOTE(review): the reply is read but not inspected, so the
            # request is sent whatever the user types -- confirm whether the
            # service is expected to enforce the non-forced case.
            sys.stdin.read(1)
    sys.exit(
        pass_manage(opts.subcommand, opts.original, opts.new, str(int(opts.force)))
    )


def _cmd_init():
    """Initialize the store with the given GPG IDs."""
    sys.exit(pass_manage(opts.subcommand, *opts.gpgid))


def _cmd_rm():
    """Remove a key, asking for confirmation on a terminal unless forced."""
    if not opts.force and sys.stdin.isatty():
        if _lookup_status(opts.key) == 0 and not _confirm(
            "Are you sure you would like to delete %s? [y/N] " % (opts.key,)
        ):
            sys.exit(1)
    sys.exit(pass_manage(opts.subcommand, opts.key))


def _generate_and_exit():
    """Ask the service to generate ``opts.key`` and exit.

    With ``-c``/``-q`` the generator's output is captured and discarded, and
    the new key is read back to be clipped or QR-encoded.  A failed generation
    exits with its status instead of presenting a stale key.
    """
    request = (
        "generate",
        opts.key,
        str(int(opts.no_symbols)),
        str(int(opts.pass_length)),
    )
    if not (opts.clip or opts.qrcode):
        sys.exit(pass_manage(*request))
    ret, _unused_out = pass_manage(*request, return_stdout=True)
    if ret != 0:
        sys.exit(ret)
    ret, stdout = pass_read("get", opts.key, return_stdout=True)
    if ret != 0:
        sys.exit(ret)
    _exit_presenting(stdout, opts.key)


def _cmd_generate():
    """Handle both ``generate`` and ``get-or-generate``."""
    with open(os.devnull, "w") as null:
        ret, stdout = pass_read("get", opts.key, return_stdout=True, stderr=null)
    if ret == _RET_KEY_MISSING:
        # Not there.
        _generate_and_exit()
    if ret != 0:
        # Woops.
        sys.exit(ret)
    if opts.subcommand == "get-or-generate":
        if opts.clip or opts.qrcode:
            _exit_presenting(stdout, opts.key)
        sys.stdout.buffer.write(stdout)
        sys.exit(ret)
    # generate: the key exists, so overwriting needs consent on a terminal.
    if not opts.force and sys.stdin.isatty():
        if not _confirm(
            "An entry already exists for %s. Overwrite it? [y/N] " % (opts.key,)
        ):
            sys.exit(1)
    _generate_and_exit()


def _cmd_insert():
    """Insert a key whose contents are read from standard input."""
    if not opts.force and sys.stdin.isatty():
        ret = _lookup_status(opts.key)
        if ret == 0:
            # There.  Confirm.
            if not _confirm(
                "An entry already exists for %s. Overwrite it? [y/N] " % (opts.key,)
            ):
                sys.exit(1)
        elif ret != _RET_KEY_MISSING:
            sys.exit(ret)
        # Otherwise: not there.  Fall through.
    if opts.multiline:
        print(
            "Enter contents of %s and press Ctrl+D when finished:\n" % (opts.key,),
            file=sys.stderr,
        )
        contents = sys.stdin.buffer.read()
    else:
        contents = _prompt_password("Enter password for %s: " % (opts.key,))
        retyped = _prompt_password("Retype password for %s: " % (opts.key,))
        if contents != retyped:
            # NOTE(review): a mismatch always aborts; the message is only
            # shown on a terminal -- confirm this matches the intended
            # behavior for piped input.
            if sys.stdin.isatty():
                print("Error: the entered passwords do not match.", file=sys.stderr)
            sys.exit(1)
    sys.exit(pass_manage(opts.subcommand, opts.key, str(int(opts.multiline)), contents))


_HANDLERS = {
    "ls": _cmd_list,
    "show": _cmd_show,
    "mv": _cmd_move_or_copy,
    "cp": _cmd_move_or_copy,
    "init": _cmd_init,
    "rm": _cmd_rm,
    "get-or-generate": _cmd_generate,
    "generate": _cmd_generate,
    "insert": _cmd_insert,
}


def main(argv=None):
    """Parse *argv* (default: ``sys.argv[1:]``) and run the chosen subcommand.

    Always terminates through ``sys.exit``.
    """
    global opts

    # Die quietly on Ctrl+C instead of raising KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    arguments = list(sys.argv[1:] if argv is None else argv)
    if any(flag in arguments for flag in ("--help", "-h", "-?")):
        # argparse prints the relevant help and exits for --help / -h; the
        # explicit exit below covers "-?", which argparse does not know.
        parser_for_subcommands.parse_known_args(arguments)
        sys.exit(os.EX_USAGE)

    global_opts, extra = parser_for_discrimination.parse_known_args(arguments)
    opts = parser_for_subcommands.parse_args(
        _normalize_arguments(global_opts.arguments, extra)
    )

    dest_vm = _resolve_dest_vm(global_opts.dest_vm)
    if not dest_vm:
        usage(
            "error: the QUBES_PASS_DOMAIN variable is not defined."
            " Either create /rw/config/pass-split-domain with the VM containing"
            " your pass setup, set the environment variable yourself,"
            " or pass -d on the command line.\n",
        )
    opts.dest_vm = dest_vm

    handler = _HANDLERS.get(opts.subcommand)
    if handler is None:
        raise AssertionError("not reached")
    handler()


if __name__ == "__main__":
    main()