diff options
author | Thomas Haller <thaller@redhat.com> | 2018-06-04 20:33:35 +0200 |
---|---|---|
committer | Thomas Haller <thaller@redhat.com> | 2018-06-05 20:08:03 +0200 |
commit | f3dddcff2aa81428988480b289b148e09cd623e3 (patch) | |
tree | d89ed9b9ad09a84953f41b4c37af781ac6c0ccb5 /tools | |
parent | 780af4cffba8b1517cbba06b0cb97e7e1d4c1f2d (diff) |
clients/tests: verify connections in test-networkmanager-service.py using libnm
The real NetworkManager service has a clear understanding how a valid
connection looks like. This is what nm_connection_verify() returns.
Let also our stub-service verify connections the same way.
Note that this is cumbersome, because the stub service uses python's
dbus module, while libnm only accepts creating NMConnection instances
from GVariant. Thus, we need to a cumbersome conversion first.
It would be better if test-networkmanager-service.py would also expose
normalized connections on D-Bus. But that requires the inverse converion
from GVariant to python dbus.
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/test-networkmanager-service.py | 235 |
1 files changed, 183 insertions, 52 deletions
diff --git a/tools/test-networkmanager-service.py b/tools/test-networkmanager-service.py index 3152aa5a7..19e0fdcc1 100755 --- a/tools/test-networkmanager-service.py +++ b/tools/test-networkmanager-service.py @@ -22,6 +22,7 @@ import random import collections import uuid import hashlib +import collections ############################################################################### @@ -79,6 +80,57 @@ class Util: r = tuple(Util.pseudorandom_stream(seed, 6)) return '%02X:%02X:%02X:%02X:%02X:%02X' % r + @staticmethod + def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + @staticmethod + def variant_from_dbus(val): + if isinstance(val, (dbus.String, str)): + return GLib.Variant('s', str(val)) + if isinstance(val, dbus.UInt32): + return GLib.Variant('u', int(val)) + if isinstance(val, dbus.Boolean): + return GLib.Variant('b', bool(val)) + if isinstance(val, dbus.Byte): + return GLib.Variant('y', int(val)) + if isinstance(val, dbus.Array): + try: + if val.signature == 's': + return GLib.Variant('as', [Util.variant_from_dbus(x) for x in val]) + if val.signature == 'b': + return GLib.Variant('ab', [Util.variant_from_dbus(x) for x in val]) + if val.signature == 'y': + return GLib.Variant('ay', [int(x) for x in val]) + if val.signature == 'u': + return GLib.Variant('au', [Util.variant_from_dbus(x) for x in val]) + if val.signature == 'ay': + return GLib.Variant('aay', [Util.variant_from_dbus(x) for x in val]) + if val.signature == 'au': + return GLib.Variant('aau', [Util.variant_from_dbus(x) for x in val]) + if val.signature == 'a{sv}': + return GLib.Variant('aa{sv}', [(str(k), Util.variant_from_dbus(v)) for k, v in val]) + if val.signature == '(ayuay)': + return GLib.Variant('a(ayuay)', [Util.variant_from_dbus(x) for x in val]) + if val.signature == '(ayuayu)': + return GLib.Variant('a(ayuayu)', [Util.variant_from_dbus(x) for x in val]) + except Exception as e: + raise Exception("Cannot convert array element to type '%s': %s" % (val.signature, e.message)) + if isinstance(val, dbus.Dictionary): + if val.signature == 'ss': + return GLib.Variant('a{ss}', collections.OrderedDict([(str(k), str(v)) for k, v in val.items()])) + if val.signature == 'sv': + return GLib.Variant('a{sv}', collections.OrderedDict([(str(k), Util.variant_from_dbus(v)) for k, v in val.items()])) + if val.signature == 'sa{sv}': + c = collections.OrderedDict([ + (str(key1), + collections.OrderedDict([(str(key2), Util.variant_from_dbus(arr2)) for key2, arr2 in arr1.items()]) + ) for key1, arr1 in val.items() + ]) + return GLib.Variant('a{sa{sv}}', c) + + raise Exception("Unsupported type for value '%s'" % (repr(val))) + ############################################################################### IFACE_DBUS = 'org.freedesktop.DBus' @@ -148,6 +200,118 @@ class BusErr: class UserCanceledException(dbus.DBusException): _dbus_error_name = IFACE_AGENT_MANAGER + '.UserCanceled' + @staticmethod + def from_nmerror(e): + try: + domain, code = (e.domain, e.code) + except: + return None + if domain == GLib.quark_to_string(NM.ConnectionError.quark()): + if code == NM.ConnectionError.MISSINGSETTING: + return BusErr.MissingSettingException(e.message) + if code == NM.ConnectionError.INVALIDPROPERTY: + return BusErr.InvalidPropertyException(e.message) + return None + + @staticmethod + def raise_nmerror(e): + e2 = BusErr.from_nmerror(e) + if e2 is not None: + raise e2 + raise e + +############################################################################### + +class NmUtil: + + @staticmethod + def con_hash_to_connection(con_hash, do_verify = False, do_normalize = False): + + x_con = [] + for v_setting_name, v_setting in list(con_hash.items()): + if isinstance(v_setting_name, (dbus.String, str)): + v_setting_name = str(v_setting_name) + else: + raise Exception("Expected string dict, but got '%s' key" % (v_setting_name)) + x_setting = [] + for v_property_name, v_value in list(v_setting.items()): + if isinstance(v_property_name, (dbus.String, str)): + v_property_name = str(v_property_name) + else: + raise Exception("Expected string dict, but got '%s' subkey under %s (%s)" % (v_property_name, v_setting_name, repr(con_hash))) + try: + v = Util.variant_from_dbus(v_value) + except Exception as e: + raise Exception("Unsupported value %s.%s = %s (%s)" % (v_setting_name, v_property_name, v_value, str(e))) + x_setting.append((v_property_name, v)) + + x_con.append((v_setting_name, collections.OrderedDict(x_setting))) + + x_con = GLib.Variant('a{sa{sv}}', collections.OrderedDict(x_con)) + + assert GLib.Variant.equal(x_con, Util.variant_from_dbus(con_hash)) + + try: + con = NM.SimpleConnection.new_from_dbus(x_con) + except: + if do_verify: + raise + return None + + if do_normalize: + try: + con.normalize() + except: + if do_verify: + raise + + if do_verify: + con.verify() + + return con + + @staticmethod + def con_hash_verify(con_hash, do_verify_strict = True): + if NM.SETTING_CONNECTION_SETTING_NAME not in con_hash: + raise BusErr.MissingSettingException('connection: setting is required') + s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] + if NM.SETTING_CONNECTION_TYPE not in s_con: + raise BusErr.MissingPropertyException('connection.type: property is required') + if NM.SETTING_CONNECTION_UUID not in s_con: + raise BusErr.MissingPropertyException('connection.uuid: property is required') + if NM.SETTING_CONNECTION_ID not in s_con: + raise BusErr.MissingPropertyException('connection.id: property is required') + + if not do_verify_strict: + return; + t = s_con[NM.SETTING_CONNECTION_TYPE] + if t not in [ NM.SETTING_WIRED_SETTING_NAME, + NM.SETTING_WIRELESS_SETTING_NAME, + NM.SETTING_VLAN_SETTING_NAME, + NM.SETTING_WIMAX_SETTING_NAME ]: + raise BusErr.InvalidPropertyException('connection.type: unsupported connection type "%s"' % (t)) + + try: + con_nm = NmUtil.con_hash_to_connection(con_hash, do_verify = True, do_normalize = True) + except Exception as e: + BusErr.raise_nmerror(e) + + @staticmethod + def con_hash_get_id(con_hash): + if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: + s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] + if NM.SETTING_CONNECTION_ID in s_con: + return s_con[NM.SETTING_CONNECTION_ID] + return None + + @staticmethod + def con_hash_get_uuid(con_hash): + if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: + s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] + if NM.SETTING_CONNECTION_UUID in s_con: + return s_con[NM.SETTING_CONNECTION_UUID] + return None + ############################################################################### class ExportedObj(dbus.service.Object): @@ -1071,12 +1235,12 @@ class NetworkManager(ExportedObj): gl.settings.auto_remove_next_connection() @dbus.service.method(dbus_interface=IFACE_TEST, in_signature='a{sa{sv}}b', out_signature='o') - def AddConnection(self, con_hash, verify_connection): - return gl.settings.add_connection(con_hash, verify_connection) + def AddConnection(self, con_hash, do_verify_strict): + return gl.settings.add_connection(con_hash, do_verify_strict) @dbus.service.method(dbus_interface=IFACE_TEST, in_signature='sa{sa{sv}}b', out_signature='') - def UpdateConnection(self, path, con_hash, verify_connection): - return gl.settings.update_connection(con_hash, path, verify_connection) + def UpdateConnection(self, path, con_hash, do_verify_strict): + return gl.settings.update_connection(con_hash, path, do_verify_strict) @dbus.service.method(dbus_interface=IFACE_TEST, in_signature='ba{ss}', out_signature='') def ConnectionSetVisible(self, vis, selector_args): @@ -1095,7 +1259,7 @@ class NetworkManager(ExportedObj): PRP_CONNECTION_UNSAVED = 'Unsaved' class Connection(ExportedObj): - def __init__(self, path_counter, con_hash, verify_connection=True): + def __init__(self, path_counter, con_hash, do_verify_strict=True): path = "/org/freedesktop/NetworkManager/Settings/Connection/%s" % (path_counter) @@ -1105,12 +1269,12 @@ class Connection(ExportedObj): if s_con is None: s_con = {} con_hash[NM.SETTING_CONNECTION_SETTING_NAME] = s_con - if self.get_id(con_hash) is None: + if NmUtil.con_hash_get_id(con_hash) is None: s_con[NM.SETTING_CONNECTION_ID] = 'connection-%s' % (path_counter) - if self.get_uuid(con_hash) is None: + if NmUtil.con_hash_get_uuid(con_hash) is None: s_con[NM.SETTING_CONNECTION_UUID] = str(uuid.uuid3(uuid.NAMESPACE_URL, path)) - self.verify(con_hash, verify_strict=verify_connection) + NmUtil.con_hash_verify(con_hash, do_verify_strict=do_verify_strict) self.path = path self.con_hash = con_hash @@ -1122,51 +1286,18 @@ class Connection(ExportedObj): self.dbus_interface_add(IFACE_CONNECTION, props) - def get_id(self, con_hash=None): - if con_hash is None: - con_hash = self.con_hash - if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: - s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] - if NM.SETTING_CONNECTION_ID in s_con: - return s_con[NM.SETTING_CONNECTION_ID] - return None + def get_id(self): + return NmUtil.con_hash_get_id(self.con_hash) - def get_uuid(self, con_hash=None): - if con_hash is None: - con_hash = self.con_hash - if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: - s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] - if NM.SETTING_CONNECTION_UUID in s_con: - return s_con[NM.SETTING_CONNECTION_UUID] - return None + def get_uuid(self): + return NmUtil.con_hash_get_uuid(self.con_hash) - def verify(self, con_hash=None, verify_strict=True): - if con_hash is None: - con_hash = self.con_hash; - if NM.SETTING_CONNECTION_SETTING_NAME not in con_hash: - raise BusErr.MissingSettingException('connection: setting is required') - s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] - if NM.SETTING_CONNECTION_TYPE not in s_con: - raise BusErr.MissingPropertyException('connection.type: property is required') - if NM.SETTING_CONNECTION_UUID not in s_con: - raise BusErr.MissingPropertyException('connection.uuid: property is required') - if NM.SETTING_CONNECTION_ID not in s_con: - raise BusErr.MissingPropertyException('connection.id: property is required') - - if not verify_strict: - return; - t = s_con[NM.SETTING_CONNECTION_TYPE] - if t not in [ NM.SETTING_WIRED_SETTING_NAME, - NM.SETTING_WIRELESS_SETTING_NAME, - NM.SETTING_VLAN_SETTING_NAME, - NM.SETTING_WIMAX_SETTING_NAME ]: - raise BusErr.InvalidPropertyException('connection.type: unsupported connection type "%s"' % (t)) + def update_connection(self, con_hash, do_verify_strict): - def update_connection(self, con_hash, verify_connection): - self.verify(con_hash, verify_strict=verify_connection) + NmUtil.con_hash_verify(con_hash, do_verify_strict = do_verify_strict) old_uuid = self.get_uuid() - new_uuid = self.get_uuid(con_hash) + new_uuid = NmUtil.con_hash_get_uuid(con_hash) if old_uuid != new_uuid: raise BusErr.InvalidPropertyException('connection.uuid: cannot change the uuid from %s to %s' % (old_uuid, new_uuid)) @@ -1255,9 +1386,9 @@ class Settings(ExportedObj): def AddConnection(self, con_hash): return self.add_connection(con_hash) - def add_connection(self, con_hash, verify_connection=True): + def add_connection(self, con_hash, do_verify_strict=True): self.c_counter += 1 - con_inst = Connection(self.c_counter, con_hash, verify_connection) + con_inst = Connection(self.c_counter, con_hash, do_verify_strict) uuid = con_inst.get_uuid() if uuid in [c.get_uuid() for c in self.connections.values()]: @@ -1274,10 +1405,10 @@ class Settings(ExportedObj): return con_inst.path - def update_connection(self, con_hash, path=None, verify_connection=True): + def update_connection(self, con_hash, path=None, do_verify_strict=True): if path not in self.connections: raise BusErr.UnknownConnectionException('Connection not found') - self.connections[path].update_connection(con_hash, verify_connection) + self.connections[path].update_connection(con_hash, do_verify_strict) def delete_connection(self, con_inst): del self.connections[con_inst.path] |