mirror of
https://github.com/gaschz/qubes-pass.git
synced 2025-03-01 14:22:31 +01:00
511 lines
16 KiB
Python
Executable File
511 lines
16 KiB
Python
Executable File
#!/usr/bin/python3 -u
|
|
|
|
import argparse
|
|
import base64
|
|
import getpass
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
|
|
try:
|
|
from pipes import quote
|
|
except ImportError:
|
|
from shlex import quote
|
|
|
|
|
|
# Restore the operating system's default SIGINT action so that Ctrl+C simply
# terminates the program instead of surfacing as a KeyboardInterrupt traceback.
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
|
|
|
|
|
# Human-readable usage summary, handed to argparse as the ``usage=`` text of
# the sub-command parser.  Command synopses are indented one level and their
# descriptions two levels so the help output reads as a nested list.
usage_string = (
    "\n".join(
        [
            "qvm-pass usage:",
            "",
            "    qvm-pass [-d <passvm>] [subcommand] [arguments...]",
            "",
            "subcommands (try qvm-pass subcommand --help for more):",
            "",
            "    [ls|list|show]",
            "        Retrieves the list of keys from the pass store.",
            "    [show] [-c] <key>",
            "        Retrieves a key from the pass store.",
            "    generate [-n] [-f] <key> [pass-length]",
            "        Generates a new key in the pass store with 25 characters",
            "        length by default, asking for confirmation before",
            "        overwriting an existing key unless -f is given.",
            "        The -n option excludes symbols from being used",
            "        during password generation.",
            "    get-or-generate [-n] <key> [pass-length]",
            "        Retrieves a key from the pass store; creates the key",
            "        with 25 characters length if it does not exist yet,",
            "        and returns the generated key on standard output.",
            "        The -n option excludes symbols from being used",
            "        during password generation.",
            "    insert [--echo,-e | --multiline,-m] [--force,-f] <key>",
            "        Creates a key in the pass store.",
            "    rm <key>",
            "        Removes a key from the pass store.",
            "    cp [-f] <key> <newkey>",
            "        Copies a key to another key in the pass store,",
            "        optionally forcefully.",
            "    mv [-f] <key> <newkey>",
            "        Moves a key to another key in the pass store,",
            "        optionally forcefully.",
            "    init <GPG ID> [GPG IDs...]",
            "        Initializes the pass store.",
        ]
    )
    + "\n"
)
|
|
|
|
|
|
# The following code was lifted from the pass program to support
# identical functionality.  It is prepended to every command line that
# pass_frontend_shell() hands to ``bash -c``.
#
# The lifted functions rely on a ``die`` helper and a ``$BASE64`` variable,
# so both are defined here; without them the clipboard save/restore logic
# silently degrades and error paths report "command not found".
shell_functions = """
X_SELECTION="${PASSWORD_STORE_X_SELECTION:-clipboard}"
CLIP_TIME="${PASSWORD_STORE_CLIP_TIME:-45}"
# Program used to (de)serialize clipboard contents; see clip() below.
BASE64="base64"

# Print an error message on standard error and abort the script.
die() {
    echo "$@" >&2
    exit 1
}

clip() {
    if [[ -n $WAYLAND_DISPLAY ]]; then
        local copy_cmd=( wl-copy )
        local paste_cmd=( wl-paste -n )
        if [[ $X_SELECTION == primary ]]; then
            copy_cmd+=( --primary )
            paste_cmd+=( --primary )
        fi
        local display_name="$WAYLAND_DISPLAY"
    elif [[ -n $DISPLAY ]]; then
        local copy_cmd=( xclip -selection "$X_SELECTION" )
        local paste_cmd=( xclip -o -selection "$X_SELECTION" )
        local display_name="$DISPLAY"
    else
        die "Error: No X11 or Wayland display detected"
    fi
    local sleep_argv0="password store sleep on display $display_name"

    # This base64 business is because bash cannot store binary data in a shell
    # variable. Specifically, it cannot store nulls nor (non-trivially) store
    # trailing new lines.
    pkill -f "^$sleep_argv0" 2>/dev/null && sleep 0.5
    local before="$("${paste_cmd[@]}" 2>/dev/null | $BASE64)"
    echo -n "$1" | "${copy_cmd[@]}" || die "Error: Could not copy data to the clipboard"
    (
        ( exec -a "$sleep_argv0" bash <<<"trap 'kill %1' TERM; sleep '$CLIP_TIME' & wait" )
        local now="$("${paste_cmd[@]}" | $BASE64)"
        [[ $now != $(echo -n "$1" | $BASE64) ]] && before="$now"

        # It might be nice to programmatically check to see if klipper exists,
        # as well as checking for other common clipboard managers. But for now,
        # this works fine -- if qdbus isn't there or if klipper isn't running,
        # this essentially becomes a no-op.
        #
        # Clipboard managers frequently write their history out in plaintext,
        # so we axe it here:
        qdbus org.kde.klipper /klipper org.kde.klipper.klipper.clearClipboardHistory &>/dev/null

        echo "$before" | $BASE64 -d | "${copy_cmd[@]}"
    ) >/dev/null 2>&1 & disown
    echo "Copied $2 to clipboard. Will clear in $CLIP_TIME seconds."
}

qrcode() {
    if [[ -n $DISPLAY || -n $WAYLAND_DISPLAY ]]; then
        if type feh >/dev/null 2>&1; then
            echo -n "$1" | qrencode --size 10 -o - | feh -x --title "pass: $2" -g +200+200 -
            return
        elif type gm >/dev/null 2>&1; then
            echo -n "$1" | qrencode --size 10 -o - | gm display -title "pass: $2" -geometry +200+200 -
            return
        elif type display >/dev/null 2>&1; then
            echo -n "$1" | qrencode --size 10 -o - | display -title "pass: $2" -geometry +200+200 -
            return
        fi
    fi
    echo -n "$1" | qrencode -t utf8
}
"""
|
|
|
|
|
|
def pass_frontend_shell(cmd):
    """Run one of the helper shell functions in a fresh bash process.

    *cmd* is a sequence of strings: the shell function name followed by its
    arguments.  Every element is shell-quoted, appended after the
    ``shell_functions`` preamble, and the resulting script is executed with
    ``bash -c``.  Returns bash's exit status.

    NOTE(review): the quoted arguments become part of the ``-c`` script text,
    i.e. of bash's own argument vector -- confirm that this is acceptable for
    secret data.
    """
    invocation = " ".join(map(quote, cmd))
    script = "\n\n".join((shell_functions, invocation))
    return subprocess.call(["bash", "-c", script])
|
|
|
|
|
|
def clip(data, path):
    """Invoke the shell ``clip`` function with *data* and *path*.

    Returns the exit status reported by pass_frontend_shell().
    """
    shell_call = ["clip", data, path]
    return pass_frontend_shell(shell_call)
|
|
|
|
|
|
def qrcode(data, path):
    """Invoke the shell ``qrcode`` function with *data* and *path*.

    Returns the exit status reported by pass_frontend_shell().
    """
    shell_call = ["qrcode", data, path]
    return pass_frontend_shell(shell_call)
|
|
|
|
|
|
# First-pass parser: it only understands -d/--dest-vm and collects every
# positional word into ``arguments``.  The main script uses it (via
# parse_known_args) to decide which sub-command to hand to the real parser.
parser_for_discrimination = argparse.ArgumentParser(description="(nobody sees this)")
parser_for_discrimination.add_argument(
    "-d",
    "--dest-vm",
    default=os.environ.get("QUBES_PASS_DOMAIN", ""),
    help="Set the Qubes domain to operate with.",
    type=str,
)
parser_for_discrimination.add_argument(
    "arguments",
    help="the rest of the arguments",
    nargs="*",
)
|
|
|
|
|
|
# Second-pass parser: the one that actually understands the sub-commands.
# Its usage text is the hand-written usage_string.
parser_for_subcommands = argparse.ArgumentParser(
    usage=usage_string,
    description="A Qubes-RPC inter-vm client for the pass password manager.",
)
parser_for_subcommands.add_argument(
    "-d",
    "--dest-vm",
    default=os.environ.get("QUBES_PASS_DOMAIN", ""),
    help="Set the Qubes domain to operate with.",
    type=str,
)

# Container action that the individual sub-command parsers are attached to.
subparsers = parser_for_subcommands.add_subparsers(
    required=False,
    help="sub-command help (run subcommand with --help as first parameter)",
)

# Sub-command name -> parser, filled in by _newcmd().
_parsers = dict()
|
|
|
|
|
|
def _newcmd(name, desc, aliases=None):
    """Return the sub-parser for *name*, creating it on first request.

    A new parser is registered under ``subparsers`` with *desc* as its help
    text and, when given, *aliases* as alternative names.  Its ``subcommand``
    default is set to *name*, so an alias resolves to the canonical name.
    Parsers are cached in ``_parsers``; later calls return the cached one.
    """
    try:
        return _parsers[name]
    except KeyError:
        pass
    extra = {"aliases": aliases} if aliases else {}
    parser = subparsers.add_parser(name, help=desc, **extra)
    parser.set_defaults(subcommand=name)
    _parsers[name] = parser
    return parser
|
|
|
|
|
|
# ---- Sub-command declarations -------------------------------------------
# Positional arguments are declared per command first; options shared by
# several commands are attached afterwards in the loops below.

# mv and cp take the same pair of positional arguments.
for name, desc in (
    ("mv", "renames / moves a key in the store"),
    ("cp", "renames / copies a key in the store to a new location"),
):
    p = _newcmd(name, desc)
    p.add_argument("original", type=str, help="original name of the key")
    p.add_argument("new", type=str, help="new name for the original key")

p = _newcmd("init", "initializes a new pass store if none exists")
p.add_argument(
    "gpgid",
    nargs="+",
    type=str,
    help="list of GPG IDs to initialize the store with",
)

p = _newcmd("rm", "removes a key in the store")
p.add_argument("key", type=str, help="name of the key to be removed")

p = _newcmd(
    "get-or-generate",
    "retrieves a key from the password store, generating one if it does not exist",
)
p.add_argument("key", type=str, help="name of the key to be retrieved / generated")

p = _newcmd("show", "shows existing password")
# The key is optional: "show" without a key behaves like "ls".
p.add_argument("key", type=str, nargs="?", help="name of the key to be retrieved")

p = _newcmd("ls", "lists passwords", aliases=["list"])

p = _newcmd("generate", "generates a key in the password store")
p.add_argument("key", type=str, help="name of the key to be generated")

p = _newcmd("insert", "inserts a new key into the pass store")
p.add_argument("key", type=str, help="name of the key to be inserted")

# Password generation parameters.
for name in ("get-or-generate", "generate"):
    _parsers[name].add_argument(
        "pass_length",
        nargs="?",
        type=int,
        default=25,
        help="number of characters in generated password",
    )
    _parsers[name].add_argument(
        "-n",
        "--no-symbols",
        action="store_true",
        default=False,
        help="no symbols in generated password",
    )

# Alternative output channels for commands that yield a password.
for name in ("show", "get-or-generate", "generate"):
    _parsers[name].add_argument(
        "-c",
        "--clip",
        action="store_true",
        default=False,
        help="copy password to clipboard instead of displaying onscreen",
    )
    _parsers[name].add_argument(
        "-q",
        "--qrcode",
        action="store_true",
        default=False,
        help="display password as QR code",
    )

# Commands that may overwrite or remove existing entries.
for name in ("mv", "cp", "rm", "insert", "generate"):
    _parsers[name].add_argument(
        "-f",
        "--force",
        action="store_true",
        default=False,
        help="force overwriting / removing passwords instead of prompting",
    )

# Input modes that only apply to insert.
for name in ("insert",):
    _parsers[name].add_argument(
        "-m",
        "--multiline",
        action="store_true",
        default=False,
        help="accept multi-line input, ending it with Ctrl+D (EOF)",
    )
    _parsers[name].add_argument(
        "-e",
        "--echo",
        action="store_true",
        default=False,
        help="echo the password to the console during entry",
    )
|
|
|
|
|
|
# add_subparsers() already returned the sub-parser action, so reuse it instead
# of searching the parser's private ``_actions`` list for an instance of the
# private ``argparse._SubParsersAction`` class.
known_subparsers = subparsers
# Every name (including aliases such as "list") that selects a sub-command.
subcommands = known_subparsers.choices.keys()
|
|
|
|
|
|
def usage(string, *args):
    """Report a usage error and terminate the program.

    *string* is printed to standard error, %-formatted with *args* when any
    are given, followed by the full help of the sub-command parser.  The
    process then exits with status 2; this function never returns.
    """
    message = string % args if args else string
    print(message, file=sys.stderr)
    parser_for_subcommands.print_help(sys.stderr)
    sys.exit(2)
|
|
|
|
|
|
# RPC names handed to qrexec-client-vm by send_args(): pass_read() uses the
# first one, pass_manage() the second.
PASS_READ = "ruddo.PassRead"
PASS_MANAGE = "ruddo.PassManage"
|
|
|
|
|
|
def send_args(rpc, *args, **kwargs):
    """Call the qrexec service *rpc* on ``opts.dest_vm`` and send it *args*.

    Each positional argument (``str`` or ``bytes``; strings are encoded as
    UTF-8) is base64-encoded and written to the service's standard input on
    a line of its own.

    Keyword arguments:
        return_stdout: when true, the service's standard output is captured
            and the function returns ``(exit_status, output_bytes)``;
            otherwise only the exit status is returned.
        Any other keyword argument is forwarded to ``subprocess.Popen``
        (callers use ``stdout=`` / ``stderr=`` to silence the child).
    """
    # Only capture stdout when the caller really asked for it; piping it
    # without ever reading it would discard output and could stall the child.
    return_stdout = kwargs.pop("return_stdout", False)
    if return_stdout:
        kwargs["stdout"] = subprocess.PIPE

    cmd = [
        "/usr/lib/qubes/qrexec-client-vm",
        "--no-filter-escape-chars-stdout",
        "--no-filter-escape-chars-stderr",
        opts.dest_vm,
        rpc,
    ]

    payload = b"".join(
        base64.b64encode(arg.encode("utf-8") if isinstance(arg, str) else arg) + b"\n"
        for arg in args
    )

    p = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs)
    # communicate() feeds stdin, closes it, drains any piped output and waits
    # for the child, tolerating a child that exits before reading its input.
    out, _unused_err = p.communicate(payload)

    if return_stdout:
        return p.returncode, out
    return p.returncode
|
|
|
|
|
|
def pass_read(*args, **kwargs):
    """Send *args* to the PASS_READ service; see send_args() for the result."""
    rpc_args = (PASS_READ,) + args
    return send_args(*rpc_args, **kwargs)
|
|
|
|
|
|
def pass_manage(*args, **kwargs):
    """Send *args* to the PASS_MANAGE service; see send_args() for the result."""
    rpc_args = (PASS_MANAGE,) + args
    return send_args(*rpc_args, **kwargs)
|
|
|
|
|
|
# -------------------- Begin ----------------------

arguments = sys.argv[1:]
if "--help" in arguments or "-h" in arguments or "-?" in arguments:
    # argparse prints the relevant help and exits by itself for -h/--help.
    parser_for_subcommands.parse_known_args(arguments)
    # Still running means argparse did not recognise the request (it does not
    # know "-?"), so show the help explicitly rather than exiting silently.
    parser_for_subcommands.print_help(sys.stderr)
    sys.exit(os.EX_USAGE)

# First pass: strip -d/--dest-vm and find out which positional words exist.
global_opts, args = parser_for_discrimination.parse_known_args(arguments)

if len(global_opts.arguments) == 0:
    # No positional argument at all: list the store.
    global_opts.arguments = ["ls"] + global_opts.arguments + args
elif len(global_opts.arguments) == 1 and global_opts.arguments[0] not in subcommands:
    # A single word that is not a sub-command is taken as a key to show.
    global_opts.arguments = ["show"] + global_opts.arguments + args
else:
    global_opts.arguments = global_opts.arguments + args

# Second pass: full parse with the sub-command now explicit.
opts = parser_for_subcommands.parse_args(global_opts.arguments)

if not global_opts.dest_vm:
    # Neither -d nor QUBES_PASS_DOMAIN was given; fall back to the config file.
    try:
        with open("/rw/config/pass-split-domain") as domain:
            # readline() yields "" for an empty file, which falls through to
            # the usage error below instead of raising IndexError.
            global_opts.dest_vm = domain.readline().strip()
    except FileNotFoundError:
        pass
    if not global_opts.dest_vm:
        usage(
            "error: the QUBES_PASS_DOMAIN variable is not defined."
            " Either create /rw/config/pass-split-domain with the VM containing"
            " your pass setup, set the environment variable yourself,"
            " or pass -d on the command line.\n",
        )
# send_args() reads the target domain from opts.
opts.dest_vm = global_opts.dest_vm
|
|
|
|
def _confirmed(prompt):
    """Write *prompt* to stderr and return True only for a y/Y answer."""
    sys.stderr.write(prompt)
    answer = sys.stdin.readline().strip()
    return bool(answer) and answer[0] in "yY"


if opts.subcommand == "ls" or (opts.subcommand == "show" and opts.key is None):
    # User requested ls, or no argument, or show with no argument.
    sys.exit(pass_read("list"))
elif opts.subcommand == "show":
    # User requested a password, or show with an argument.
    if opts.clip or opts.qrcode:
        # Capture the secret instead of letting it reach the terminal.
        ret, stdout = pass_read("get", opts.key, return_stdout=True)
        if ret != 0:
            sys.exit(ret)
        stdout = stdout.decode("utf-8")
        if opts.clip:
            sys.exit(clip(stdout, opts.key))
        elif opts.qrcode:
            sys.exit(qrcode(stdout, opts.key))
    else:
        sys.exit(pass_read("get", opts.key))
elif opts.subcommand in ("mv", "cp"):
    if not opts.force and sys.stdin.isatty():
        # Probe the destination quietly; status 0 means it already exists.
        with open(os.devnull, "w") as null:
            exists = pass_read("get", opts.new, stdout=null, stderr=null) == 0
        # Honour the answer: anything but an explicit yes aborts the operation.
        if exists and not _confirmed(
            "%s: overwrite %s? [y/N] " % (opts.subcommand, opts.new)
        ):
            sys.exit(1)
    sys.exit(
        pass_manage(opts.subcommand, opts.original, opts.new, str(int(opts.force)))
    )
elif opts.subcommand == "init":
    sys.exit(pass_manage(opts.subcommand, *opts.gpgid))
elif opts.subcommand == "rm":
    if not opts.force and sys.stdin.isatty():
        with open(os.devnull, "w") as null:
            exists = pass_read("get", opts.key, stdout=null, stderr=null) == 0
        if exists and not _confirmed(
            "Are you sure you would like to delete %s? [y/N] " % (opts.key,)
        ):
            sys.exit(1)
    sys.exit(pass_manage(opts.subcommand, opts.key))
elif opts.subcommand in ("get-or-generate", "generate"):

    def doit():
        """Generate the password and deliver it as requested; never returns."""
        generate_args = (
            "generate",
            opts.key,
            str(int(opts.no_symbols)),
            str(int(opts.pass_length)),
        )
        if not (opts.clip or opts.qrcode):
            # Plain mode: the service's output goes straight to our stdout.
            sys.exit(pass_manage(*generate_args))
        # Clipboard / QR mode: swallow the service's output, and stop right
        # here if generation failed instead of reading back a stale entry.
        ret, _unused_out = pass_manage(*generate_args, return_stdout=True)
        if ret != 0:
            sys.exit(ret)
        ret, stdout = pass_read("get", opts.key, return_stdout=True)
        if ret != 0:
            sys.exit(ret)
        if opts.clip:
            sys.exit(clip(stdout.decode("utf-8"), opts.key))
        sys.exit(qrcode(stdout.decode("utf-8"), opts.key))

    with open(os.devnull, "w") as null:
        ret, stdout = pass_read("get", opts.key, return_stdout=True, stderr=null)

    if ret == 8:
        # Not there.
        doit()
    elif ret == 0:
        if opts.subcommand == "get-or-generate":
            # The key exists: hand out the stored value.
            if opts.clip:
                sys.exit(clip(stdout.decode("utf-8"), opts.key))
            elif opts.qrcode:
                sys.exit(qrcode(stdout.decode("utf-8"), opts.key))
            else:
                sys.stdout.buffer.write(stdout)
                sys.exit(ret)
        else:  # generate
            if not opts.force and sys.stdin.isatty():
                if _confirmed(
                    "An entry already exists for %s. Overwrite it? [y/N] " % (opts.key,)
                ):
                    doit()
                else:
                    sys.exit(1)
            else:
                doit()
    else:
        # Woops.
        sys.exit(ret)
elif opts.subcommand == "insert":
    if not opts.force and sys.stdin.isatty():
        with open(os.devnull, "w") as null:
            ret = pass_read("get", opts.key, stdout=null, stderr=null)
        if ret == 0:
            # There. Confirm.
            if not _confirmed(
                "An entry already exists for %s. Overwrite it? [y/N] " % (opts.key,)
            ):
                sys.exit(1)
        elif ret == 8:
            # Not there. Fall through.
            pass
        else:
            sys.exit(ret)
    if opts.multiline:
        print(
            "Enter contents of %s and press Ctrl+D when finished:\n" % (opts.key,),
            file=sys.stderr,
        )
        contents = sys.stdin.buffer.read()
    else:

        def promptpw(string):
            """Read one password line; returns str (getpass) or bytes (stdin)."""
            if sys.stdin.isatty():
                if opts.echo:
                    sys.stderr.write(string)
                    pw = sys.stdin.buffer.readline()
                else:
                    pw = getpass.getpass(string)
            else:
                pw = sys.stdin.buffer.readline()
            if not sys.stdin.isatty():
                print()
            # Drop the line terminator left by readline().  Indexing bytes
            # yields an int, so compare with endswith() rather than pw[-1].
            if isinstance(pw, bytes) and pw.endswith(b"\n"):
                pw = pw[:-1]
            return pw

        contents = promptpw("Enter password for %s: " % (opts.key,))
        pw2 = promptpw("Retype password for %s: " % (opts.key,))
        if sys.stdin.isatty() and contents != pw2:
            print("Error: the entered passwords do not match.", file=sys.stderr)
            sys.exit(1)
    sys.exit(pass_manage(opts.subcommand, opts.key, str(int(opts.multiline)), contents))
else:
    # Raised explicitly so the guard survives ``python -O``.
    raise AssertionError("not reached")
|