mirror of
https://github.com/Rudd-O/qubes-network-server.git
synced 2025-03-01 14:22:35 +01:00
113 lines
4.1 KiB
Python
113 lines
4.1 KiB
Python
import qubes.ext
|
|
|
|
|
|
def l(text, *parms):
    """Debug logger: write ``nsext: <message>`` to standard error.

    Logging is currently switched off by the unconditional ``return``
    below, so every call is a no-op that returns ``None``.  Remove that
    line to get the messages back while debugging.

    ``text`` is the message; when extra positional arguments are given
    they are interpolated into it with the ``%`` operator.
    """
    return  # This exists only to debug.
    import sys

    message = text % parms if parms else text
    print("nsext:", message, file=sys.stderr)
    sys.stderr.flush()
|
|
|
|
|
|
# Trace that this module has been imported, via the debug logger l()
# defined earlier in this file.
l("loaded")
|
|
|
|
|
|
class QubesNetworkServerExtension(qubes.ext.Extension):
    """Qubes extension that publishes per-VM routing methods to NetVMs.

    For every IP address of a VM, the key ``/qubes-routing-method/<ip>`` is
    written to (or removed from) the Qubes DB of that VM's NetVM.  The value
    is ``forward`` when the VM's ``routing-method`` feature (checked with
    template fallback) is ``forward``, and ``masquerade`` otherwise.
    """

    def shutdown_routing_for_vm(self, netvm, appvm):
        """Remove the routing entries of ``appvm`` from ``netvm``.

        Thin wrapper around :meth:`reload_routing_for_vm` with
        ``shutdown=True``.
        """
        l("shutdown routing for vm %s %s", netvm, appvm)
        self.reload_routing_for_vm(netvm, appvm, True)

    def reload_routing_for_vm(self, netvm, appvm, shutdown=False):
        """Reload the routing method for the VM.

        Does nothing when ``netvm`` reports that it is not running.
        Otherwise handles the IPv4 address (``appvm.ip``) and the IPv6
        address (``appvm.ip6``) in turn, skipping whichever is ``None``.

        If ``shutdown`` is true the entries are removed instead of written.
        """
        l("reload routing for vm %s %s shutdown %s", netvm, appvm, shutdown)
        if not netvm.is_running():
            return
        for addr_family in (4, 6):
            ip = appvm.ip6 if addr_family == 6 else appvm.ip
            if ip is None:
                continue
            # report routing method
            self.setup_forwarding_for_vm(netvm, appvm, ip, remove=shutdown)

    def setup_forwarding_for_vm(self, netvm, appvm, ip, remove=False):
        """
        Record in Qubes DB that the passed VM may be meant to have traffic
        forwarded to and from it, rather than masqueraded from it and blocked
        to it.

        The relevant incantation on the command line to assign the forwarding
        behavior is `qvm-features <VM> routing-method forward`.  If the feature
        is set on the TemplateVM upon which the VM is based, then that counts
        as the forwarding method for the VM as well.

        The counterpart code in qubes-firewall handles setting up the NetVM
        with the proper networking configuration to permit forwarding without
        masquerading behavior.

        If `remove` is True, then we remove the respective routing method from
        the Qubes DB instead.

        Nothing is done when `ip` is None.
        """
        l("setup forwarding for vm vm %s %s %s remove %s", netvm, appvm, ip, remove)
        if ip is None:
            return
        # Any value other than "forward" (including an unset feature, which
        # defaults to "masquerade") is published as "masquerade".
        routing_method = appvm.features.check_with_template(
            "routing-method", "masquerade"
        )
        base_file = "/qubes-routing-method/{}".format(ip)
        if remove:
            netvm.untrusted_qdb.rm(base_file)
        elif routing_method == "forward":
            netvm.untrusted_qdb.write(base_file, "forward")
        else:
            netvm.untrusted_qdb.write(base_file, "masquerade")

    @qubes.ext.handler(
        "domain-feature-set:routing-method",
        "domain-feature-delete:routing-method",
    )
    def on_routing_method_changed(self, vm, ignored_feature, **kwargs):
        """Republish the routing method when the feature is set or deleted.

        The reload is skipped only when the event carries an ``oldvalue``
        equal to its ``value``, i.e. when nothing actually changed, or when
        the VM has no NetVM.
        """
        # pylint: disable=no-self-use,unused-argument
        l("routing method changed %s", vm)
        if "oldvalue" not in kwargs or kwargs.get("oldvalue") != kwargs.get("value"):
            if vm.netvm:
                self.reload_routing_for_vm(vm.netvm, vm)

    @qubes.ext.handler("domain-qdb-create")
    def on_domain_qdb_create(self, vm, event, **kwargs):
        """Fills the QubesDB with firewall entries."""
        # pylint: disable=unused-argument
        l("domain create %s %s", vm, event)
        if vm.netvm:
            self.reload_routing_for_vm(vm.netvm, vm)

    @qubes.ext.handler("domain-start")
    def on_domain_started(self, vm, event, **kwargs):
        """Publish routing entries for every VM connected to the started VM.

        ``AttributeError`` is deliberately ignored: presumably it covers VMs
        that have no ``connected_vms`` attribute -- TODO confirm.
        """
        # pylint: disable=unused-argument
        l("domain started %s %s", vm, event)
        try:
            for downstream_vm in vm.connected_vms:
                self.reload_routing_for_vm(vm, downstream_vm)
        except AttributeError:
            pass

    @qubes.ext.handler("domain-shutdown")
    def on_domain_shutdown(self, vm, event, **kwargs):
        """Remove routing entries that involve the VM that shut down.

        Two sets of entries are handled: those of VMs connected to ``vm``
        (where ``vm`` is the NetVM), and those of ``vm`` itself in its own
        NetVM.  In both cases :meth:`reload_routing_for_vm` skips the work
        if the NetVM involved reports that it is not running.

        ``AttributeError`` is deliberately ignored, mirroring
        :meth:`on_domain_started`.
        """
        # pylint: disable=unused-argument
        l("domain shutdown %s %s", vm, event)
        try:
            # The connected VMs belong to the VM that shut down, not to this
            # extension object; reading them from ``self`` always raised
            # AttributeError, which silently skipped this loop.
            for downstream_vm in vm.connected_vms:
                self.shutdown_routing_for_vm(vm, downstream_vm)
        except AttributeError:
            pass
        if vm.netvm:
            self.shutdown_routing_for_vm(vm.netvm, vm)

    @qubes.ext.handler("net-domain-connect")
    def on_net_domain_connect(self, vm, event):
        """Publish the routing entries of ``vm`` in its NetVM, if it has one."""
        # pylint: disable=unused-argument
        l("domain connect %s %s", vm, event)
        if vm.netvm:
            self.reload_routing_for_vm(vm.netvm, vm)
|