#! /usr/bin/env python3 # Copyright © 2017, 2019 Red Hat, Inc # Copyright © 2020 Canonical Ltd # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # Authors: # Christian J. Kellner # Benjamin Berg # Marco Trevisan import unittest import time import subprocess import os import os.path import sys import tempfile import glob import pwd import re import shutil import socket import struct import dbusmock import gi gi.require_version('FPrint', '2.0') from gi.repository import GLib, Gio, FPrint from output_checker import OutputChecker import cairo import signal try: from subprocess import DEVNULL except ImportError: DEVNULL = open(os.devnull, 'wb') FPRINT_NAMESPACE = 'net.reactivated.Fprint' FPRINT_PATH = '/' + FPRINT_NAMESPACE.replace('.', '/') SERVICE_FILE = '/usr/share/dbus-1/system-services/{}.service'.format(FPRINT_NAMESPACE) class FprintDevicePermission: verify = FPRINT_NAMESPACE.lower() + '.device.verify' enroll = FPRINT_NAMESPACE.lower() + '.device.enroll' set_username = FPRINT_NAMESPACE.lower() + '.device.setusername' FINGERS_MAP = { "left-thumb": FPrint.Finger.LEFT_THUMB, "left-index-finger": FPrint.Finger.LEFT_INDEX, "left-middle-finger": FPrint.Finger.LEFT_MIDDLE, "left-ring-finger": FPrint.Finger.LEFT_RING, "left-little-finger": FPrint.Finger.LEFT_LITTLE, "right-thumb": FPrint.Finger.RIGHT_THUMB, "right-index-finger": FPrint.Finger.RIGHT_INDEX, "right-middle-finger": FPrint.Finger.RIGHT_MIDDLE, "right-ring-finger": FPrint.Finger.RIGHT_RING, "right-little-finger": FPrint.Finger.RIGHT_LITTLE, "any": FPrint.Finger.UNKNOWN, } def get_timeout(topic='default'): vals = { 'valgrind': { 'test': 300, 'device_sleep': 600, 'default': 20, 'daemon_start': 60, 'daemon_stop': 10, }, 'asan': { 'test': 120, 'default': 6, 'device_sleep': 400, 'daemon_start': 10, 'daemon_stop': 8, }, 'default': { 'test': 60, 'device_sleep': 100, 'default': 3, 'daemon_start': 5, 'daemon_stop': 2, } } if os.getenv('VALGRIND') is not None: lut = vals['valgrind'] elif os.getenv('ADDRESS_SANITIZER') is not None: lut = vals['asan'] else: lut = vals['default'] if topic not in lut: raise ValueError('invalid topic') return lut[topic] # Copied from libfprint tests class Connection: def __init__(self, addr): self.addr = addr def __enter__(self): self.con = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.con.connect(self.addr) return self.con def __exit__(self, exc_type, exc_val, exc_tb): self.con.close() del self.con # Speed up tests by only loading a 128x128px area from the center MAX_IMG_SIZE = 128 def load_image(img): png = cairo.ImageSurface.create_from_png(img) # Cairo wants 4 byte aligned rows, so just add a few pixel if necessary w = png.get_width() h = png.get_height() w = (w + 3) // 4 * 4 h = (h + 3) // 4 * 4 w_out = min(MAX_IMG_SIZE, w) h_out = min(MAX_IMG_SIZE, h) x = (w - w_out) // 2 y = (h - h_out) // 2 img = cairo.ImageSurface(cairo.Format.A8, w_out, h_out) cr = cairo.Context(img) cr.set_source_rgba(1, 1, 1, 1) cr.paint() cr.set_source_rgba(0, 0, 0, 0) cr.set_operator(cairo.OPERATOR_SOURCE) cr.set_source_surface(png, -x, -y) cr.paint() return img if hasattr(os.environ, 'TOPSRCDIR'): root = os.environ['TOPSRCDIR'] else: root = os.path.join(os.path.dirname(__file__), '..') imgdir = os.path.join(root, 'tests', 'prints') ctx = GLib.main_context_default() class FPrintdTest(dbusmock.DBusTestCase): @staticmethod def path_from_service_file(sf): with open(SERVICE_FILE) as f: for line in f: if not line.startswith('Exec='): continue return line.split('=', 1)[1].strip() return None @classmethod def setUpClass(cls): # Try to generate backtrace if meson kills as with SIGTERM def r(*args): raise KeyboardInterrupt() signal.signal(signal.SIGTERM, r) super().setUpClass() fprintd = None cls._polkitd = None cls._has_hotplug = FPrint.Device.find_property("removed") is not None if 'FPRINT_BUILD_DIR' in os.environ: print('Testing local build') build_dir = os.environ['FPRINT_BUILD_DIR'] fprintd = os.path.join(build_dir, 'fprintd') elif 'UNDER_JHBUILD' in os.environ: print('Testing JHBuild version') jhbuild_prefix = os.environ['JHBUILD_PREFIX'] fprintd = os.path.join(jhbuild_prefix, 'libexec', 'fprintd') else: print('Testing installed system binaries') fprintd = cls.path_from_service_file(SERVICE_FILE) assert fprintd is not None, 'failed to find daemon' cls.paths = {'daemon': fprintd } cls.tmpdir = tempfile.mkdtemp(prefix='libfprint-') cls.addClassCleanup(shutil.rmtree, cls.tmpdir) cls.sockaddr = os.path.join(cls.tmpdir, 'virtual-image.socket') os.environ[cls.socket_env] = cls.sockaddr cls.prints = {} for f in glob.glob(os.path.join(imgdir, '*.png')): n = os.path.basename(f)[:-4] cls.prints[n] = load_image(f) cls.start_system_bus() cls.dbus = Gio.DBusConnection.new_for_address_sync(os.environ['DBUS_SYSTEM_BUS_ADDRESS'], Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION | Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT, None, None) assert cls.dbus.is_closed() == False cls.addClassCleanup(cls.dbus.close) @classmethod def tearDownClass(cls): dbusmock.DBusTestCase.tearDownClass() del cls.dbus def daemon_start(self, driver='Virtual image device for debugging'): timeout = get_timeout('daemon_start') # seconds env = os.environ.copy() env['G_DEBUG'] = 'fatal-criticals' env['STATE_DIRECTORY'] = (self.state_dir + ':' + '/hopefully/a/state_dir_path/that/shouldnt/be/writable') env['RUNTIME_DIRECTORY'] = self.run_dir # The tests parses the debug output for suspend inhibitor debugging env['G_MESSAGES_DEBUG'] = 'all' argv = [self.paths['daemon'], '-t'] valgrind = os.getenv('VALGRIND') if valgrind is not None: argv.insert(0, 'valgrind') argv.insert(1, '--leak-check=full') if os.path.exists(valgrind): argv.insert(2, '--suppressions=%s' % valgrind) self.valgrind = True self.kill_daemon = False self.daemon_log = OutputChecker() self.addCleanup(self.daemon_log.force_close) self.daemon = subprocess.Popen(argv, env=env, stdout=self.daemon_log.fd, stderr=subprocess.STDOUT) self.daemon_log.writer_attached() #subprocess.Popen(['/usr/bin/dbus-monitor', '--system']) self.addCleanup(self.daemon_stop) timeout_count = timeout * 10 timeout_sleep = 0.1 while timeout_count > 0: time.sleep(timeout_sleep) timeout_count -= 1 try: self.manager = Gio.DBusProxy.new_sync(self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, FPRINT_NAMESPACE, FPRINT_PATH + '/Manager', FPRINT_NAMESPACE + '.Manager', None) devices = self.manager.GetDevices() # Find the virtual device, just in case it is a local run # and there is another usable sensor available locally for path in devices: dev = Gio.DBusProxy.new_sync(self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, FPRINT_NAMESPACE, path, FPRINT_NAMESPACE + '.Device', None) if driver in str(dev.get_cached_property('name')): self.device = dev self._device_cancellable = Gio.Cancellable() self.addCleanup(self._device_cancellable.cancel) break else: print('Did not find virtual device! Probably libfprint was build without the corresponding driver!') break except GLib.GError: pass else: timeout_time = timeout * 10 * timeout_sleep self.fail('daemon did not start in %d seconds' % timeout_time) def daemon_stop(self): if self.daemon: try: self.daemon.terminate() except OSError: pass try: self.daemon.wait(timeout=get_timeout('daemon_stop')) except subprocess.TimeoutExpired as e: if self.kill_daemon: self.daemon.kill() else: raise(e) self.daemon_log.assert_closed() if not self.kill_daemon: self.assertLess(self.daemon.returncode, 128) self.assertGreaterEqual(self.daemon.returncode, 0) self.daemon = None def polkitd_start(self): if self._polkitd: return if 'POLKITD_MOCK_PATH' in os.environ: polkitd_template = os.path.join(os.getenv('POLKITD_MOCK_PATH'), 'polkitd.py') else: polkitd_template = os.path.join(os.path.dirname(__file__), 'dbusmock/polkitd.py') print ('Using template from %s' % polkitd_template) self._polkitd, self._polkitd_obj = self.spawn_server_template( polkitd_template, {}, stdout=subprocess.PIPE) self.addCleanup(self._polkitd.stdout.close) self.addCleanup(self.stop_server, '_polkitd', '_polkitd_obj') return self._polkitd def stop_server(self, proc_attr, obj_attr): proc = getattr(self, proc_attr, None) if proc is None: return proc.terminate() try: proc.wait(timeout=1) except subprocess.TimeoutExpired as e: proc.kill() delattr(self, proc_attr) delattr(self, obj_attr) def polkitd_allow_all(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username, FprintDevicePermission.enroll, FprintDevicePermission.verify]) def get_current_user(self): return pwd.getpwuid(os.getuid()).pw_name def setUp(self): self.test_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.test_dir) self.state_dir = os.path.join(self.test_dir, 'state') self.run_dir = os.path.join(self.test_dir, 'run') self.device_id = 0 self._async_call_res = {} os.environ['FP_DRIVERS_ALLOWLIST'] = self.device_driver # TODO: Remove this when we depend on libfprint 1.94.7 os.environ['FP_DRIVERS_WHITELIST'] = self.device_driver # Always start fake polkitd because of # https://gitlab.freedesktop.org/polkit/polkit/-/merge_requests/95 self.polkitd_start() def assertFprintError(self, fprint_error): if isinstance(fprint_error, list) or isinstance(fprint_error, tuple): fprint_error = [ re.escape(e) for e in fprint_error ] fprint_error = '({})'.format('|'.join(fprint_error)) else: fprint_error = re.escape(fprint_error) return self.assertRaisesRegex(GLib.Error, re.escape('GDBus.Error:{}.Error.'.format(FPRINT_NAMESPACE)) + '{}:'.format(fprint_error)) def skipTestIfCanWrite(self, path): try: os.open(os.path.join(path, "testfile"), os.O_CREAT | os.O_WRONLY) self.skipTest('Permissions aren\'t respected (CI environment?)') except PermissionError: pass def get_print_file_path(self, user, finger): return os.path.join(self.state_dir, user, self.device_driver, str(self.device_id), str(int(finger))) def get_print_name_file_path(self, user, finger_name): return self.get_print_file_path(user, FINGERS_MAP[finger_name]) def get_finger_name(self, finger): return {v: k for k, v in FINGERS_MAP.items()}[finger] def set_print_not_writable(self, user, finger): # Replace the print with a directory, so that deletion via unlink will fail # But it is still listed, not using chmod as it won't work in CI environment print_file = self.get_print_file_path(user, finger) if os.path.exists(print_file): os.rename(print_file, print_file + '_renamed') self.addCleanup(os.rename, print_file + '_renamed', print_file) os.makedirs(print_file) self.addCleanup(os.rmdir, print_file) def assertFingerInStorage(self, user, finger): self.assertTrue(os.path.exists(self.get_print_file_path(user, finger))) def assertFingerNotInStorage(self, user, finger): self.assertFalse(os.path.exists(self.get_print_file_path(user, finger))) @property def finger_needed(self): return self.device.get_cached_property('finger-needed').unpack() @property def finger_present(self): return self.device.get_cached_property('finger-present').unpack() @property def num_enroll_stages(self): return self.device.get_cached_property('num-enroll-stages').unpack() # From libfprint tests def send_retry(self, retry_error=FPrint.DeviceRetry.TOO_SHORT, con=None): if con: con.sendall(struct.pack('ii', -1, retry_error)) return with Connection(self.sockaddr) as con: self.send_retry(retry_error, con) # From libfprint tests def send_error(self, error=FPrint.DeviceError.GENERAL, con=None): if con: con.sendall(struct.pack('ii', -2, error)) return with Connection(self.sockaddr) as con: self.send_error(error, con) # From libfprint tests def send_remove(self, con=None): if con: con.sendall(struct.pack('ii', -5, 0)) return with Connection(self.sockaddr) as con: self.send_remove(con=con) # From libfprint tests def send_image(self, image, con=None): if con: img = self.prints[image] mem = img.get_data() mem = mem.tobytes() self.assertEqual(len(mem), img.get_width() * img.get_height()) encoded_img = struct.pack('ii', img.get_width(), img.get_height()) encoded_img += mem con.sendall(encoded_img) return with Connection(self.sockaddr) as con: self.send_image(image, con) def send_finger_automatic(self, automatic, con=None, iterate=True): # Set whether finger on/off is reported around images if con: con.sendall(struct.pack('ii', -3, 1 if automatic else 0)) return with Connection(self.sockaddr) as con: self.send_finger_automatic(automatic, con=con, iterate=iterate) while iterate and ctx.pending(): ctx.iteration(False) def send_finger_report(self, has_finger, con=None, iterate=True): # Send finger on/off if con: con.sendall(struct.pack('ii', -4, 1 if has_finger else 0)) return with Connection(self.sockaddr) as con: self.send_finger_report(has_finger, con=con) while iterate and self.finger_present != has_finger: ctx.iteration(False) def send_sleep(self, con=None): self.skipTest('Not implemented for {}'.format(self.device_driver)) def set_keep_alive(self, value): self.skipTest('Not implemented for {}'.format(self.device_driver)) def _maybe_reduce_enroll_stages(self): pass def call_proxy_method_async(self, proxy, method, *args): def call_handler(proxy, res): nonlocal method if proxy not in self._async_call_res.keys(): self._async_call_res[proxy] = {} if method not in self._async_call_res[proxy].keys(): self._async_call_res[proxy][method] = [] try: ret = proxy.call_finish(res) except Exception as e: ret = e self._async_call_res[proxy][method].append(ret) self.device.call(method, GLib.Variant(*args), Gio.DBusCallFlags.NONE, -1, self._device_cancellable, call_handler) def call_device_method_async(self, method, *args): return self.call_proxy_method_async(self.device, method, *args) def wait_for_async_reply(self, proxy, method=None, expected_replies=1): if proxy in self._async_call_res: proxy_replies = self._async_call_res[proxy] if method and method in proxy_replies: proxy_replies[method] = [] else: proxy_replies = {} def get_replies(): nonlocal proxy, method return (self.get_all_async_replies(proxy=proxy) if not method else self.get_async_replies(proxy=proxy, method=method)) while len(get_replies()) != expected_replies: ctx.iteration(True) for res in get_replies(): if isinstance(res, Exception): raise res def wait_for_device_reply(self, method=None, expected_replies=1): return self.wait_for_async_reply(self.device, method=method, expected_replies=expected_replies) def wait_for_device_reply_relaxed(self, method=None, expected_replies=1, accepted_exceptions=[]): try: self.wait_for_device_reply(method=method, expected_replies=expected_replies) except GLib.Error as e: for ae in accepted_exceptions: if 'GDBus.Error:{}.Error.{}'.format(FPRINT_NAMESPACE, ae) in str(e): return raise(e) def get_async_replies(self, method=None, proxy=None): method_calls = self._async_call_res.get(proxy if proxy else self.device, {}) return method_calls.get(method, []) if method else method_calls def get_all_async_replies(self, proxy=None): method_calls = self.get_async_replies(proxy=proxy) all_replies = [] for method, replies in method_calls.items(): all_replies.extend(replies) return all_replies def gdbus_device_method_call_process(self, method, args=[]): proc = subprocess.Popen([ 'gdbus', 'call', '--system', '--dest', self.device.get_name(), '--object-path', self.device.get_object_path(), '--method', '{}.{}'.format(self.device.get_interface_name(), method), ] + args, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) self.addCleanup(proc.stdout.close) return proc def call_device_method_from_other_client(self, method, args=[]): try: proc = self.gdbus_device_method_call_process(method, args) proc.wait(timeout=5) if proc.returncode != 0: raise GLib.GError(proc.stdout.read()) return proc.stdout.read() except subprocess.TimeoutExpired as e: raise GLib.GError(e.output) class FPrintdVirtualImageDeviceBaseTests(FPrintdTest): socket_env = 'FP_VIRTUAL_IMAGE' device_driver = 'virtual_image' driver_name = 'Virtual image device for debugging' has_identification = True class FPrintdVirtualDeviceBaseTest(FPrintdVirtualImageDeviceBaseTests): def setUp(self): super().setUp() self.manager = None self.device = None fifo_path = os.path.join(self.tmpdir, 'logind_inhibit_fifo') os.mkfifo(fifo_path) self.addCleanup(os.unlink, fifo_path) self.logind_inhibit_fifo = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK | os.O_CLOEXEC) self.addCleanup(os.close, self.logind_inhibit_fifo) # EOF without a writer, BlockingIOError with a writer self.assertFalse(self.holds_inhibitor()) self.logind, self.logind_obj = self.spawn_server_template('logind', { }) self.logind_obj.AddMethod('org.freedesktop.login1.Manager', 'Inhibit', 'ssss', 'h', 'ret = os.open("%s", os.O_WRONLY)\n' % fifo_path + 'from gi.repository import GLib\n' + 'GLib.idle_add(lambda fd: os.close(fd), ret)') self.addCleanup(self.stop_server, 'logind', 'logind_obj') self.daemon_start(self.driver_name) self.wait_got_delay_inhibitor(timeout=5) if self.device is None: self.skipTest("Need {} device to run the test".format(self.device_driver)) self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username, FprintDevicePermission.enroll, FprintDevicePermission.verify]) def signal_cb(proxy, sender, signal, params): print(signal, params) if signal == 'EnrollStatus': self._abort = params[1] self._last_result = params[0] if not self._abort and self._last_result.startswith('enroll-'): # Exit wait loop, onto next enroll state (if any) self._abort = True elif self._abort: pass else: self._abort = True self._last_result = 'Unexpected signal values' print('Unexpected signal values') elif signal == 'VerifyFingerSelected': self._selected_finger = params[0] elif signal == 'VerifyStatus': self._abort = True self._last_result = params[0] self._verify_stopped = params[1] else: self._abort = True self._last_result = 'Unexpected signal' def property_cb(proxy, changed, invalidated): print('Changed properties', changed, 'invalidated', invalidated) self._changed_properties.append(changed.unpack()) signal_id = self.device.connect('g-signal', signal_cb) self.addCleanup(self.device.disconnect, signal_id) signal_id = self.device.connect('g-properties-changed', property_cb) self.addCleanup(self.device.disconnect, signal_id) self._changed_properties = [] def tearDown(self): self.device = None self.manager = None super().tearDown() def try_release(self): if not self.device: return try: self.device.Release() except GLib.GError as e: if not 'net.reactivated.Fprint.Error.ClaimDevice' in e.message: raise(e) def wait_for_result(self, expected=None, max_wait=-1): self._last_result = None self._verify_stopped = False self._selected_finger = None self._abort = False if max_wait > 0: def abort_timeout(): self._abort = True GLib.timeout_add(max_wait, abort_timeout) while not self._abort: ctx.iteration(True) self.assertTrue(self._abort) self._abort = False if expected is not None: self.assertEqual(self._last_result, expected) def holds_inhibitor(self): try: if os.read(self.logind_inhibit_fifo, 1) == b'': return False except BlockingIOError: return True raise AssertionError("logind inhibitor fifo in unexpected state") def wait_got_delay_inhibitor(self, timeout=0): self.daemon_log.check_line('Got delay inhibitor for sleep', timeout=timeout) self.assertTrue(self.holds_inhibitor()) def wait_released_delay_inhibitor(self, timeout=0): self.daemon_log.check_line('Released delay inhibitor for sleep', timeout=timeout) self.assertFalse(self.holds_inhibitor()) def enroll_image(self, img, device=None, finger='right-index-finger', expected_result='enroll-completed', claim_user=None, start=True, stop=True): if device is None: device = self.device if claim_user: device.Claim('(s)', claim_user) if device is self.device: self._maybe_reduce_enroll_stages() if start: device.EnrollStart('(s)', finger) while not self.finger_needed: ctx.iteration(False) self.assertTrue(self.finger_needed) if expected_result == 'enroll-duplicate': stages = 1 else: stages = self.num_enroll_stages for stage in range(stages): self.send_image(img) if stage < stages - 1: self.wait_for_result('enroll-stage-passed') else: self.wait_for_result(expected_result) self.assertFalse(self.finger_needed) if stop: device.EnrollStop() self.assertEqual(self._last_result, expected_result) self.assertFalse(self.finger_needed) if claim_user: device.Release() def enroll_multiple_images(self, images_override={}, return_index=-1): enroll_map = { 'left-thumb': 'whorl', 'right-index-finger': 'arch', 'left-little-finger': 'loop-right', } enroll_map.update(images_override) for finger, print in enroll_map.items(): self.enroll_image(print, finger=finger) enrolled = self.device.ListEnrolledFingers('(s)', 'testuser') self.assertCountEqual(enroll_map.keys(), enrolled) if return_index >= 0: return enroll_map[enrolled[return_index]] return (enrolled, enroll_map) def enroll_users_images(self, enroll_map={}, images_override={}, allow_duplicates=False): if not enroll_map: enroll_map = { 'test-user1': { 'left-thumb': 'whorl' }, 'test-user2': { 'right-index-finger': 'arch' }, 'test-user3': { 'left-little-finger': 'loop-right', 'left-thumb': 'tented_arch' }, } enroll_map.update(images_override) enrolled_prints = [] enrolled_prints_info = {} duplicates_prints_info = {} self.try_release() for user, print_map in enroll_map.items(): self.device.Claim('(s)', user) for finger, p in print_map.items(): if allow_duplicates and p in enrolled_prints: prints_infos = duplicates_prints_info.get(p, []) prints_infos.append((user, finger)) duplicates_prints_info[p] = prints_infos if self.has_identification: self.enroll_image(p, finger=finger, expected_result='enroll-duplicate') continue self.enroll_image(p, finger=finger) enrolled_prints.append(p) enrolled_prints_info[p] = (user, finger) self.device.Release() if allow_duplicates and duplicates_prints_info: # We can't just enroll duplicates prints, as fprint will check for # duplicates prints, so we've to handle this manually, copying the # actual prints for print_image, print_infos in duplicates_prints_info.items(): for print_info in print_infos: orig_username, orig_finger = enrolled_prints_info[print_image] dup_username, dup_finger = print_info dup_fp_finger = FINGERS_MAP[dup_finger] orig_path = self.get_print_name_file_path(orig_username, orig_finger) self.assertTrue(os.path.exists(orig_path)) with open(orig_path, mode='rb') as print_file: dup_print = FPrint.Print.deserialize(print_file.read()) dup_print.set_username(dup_username) dup_print.set_finger(dup_fp_finger) dup_path = self.get_print_name_file_path(dup_username, dup_finger) os.makedirs(os.path.dirname(dup_path), exist_ok=True) with open(dup_path, mode='wb') as new_print_file: new_print_file.write(dup_print.serialize()) print('Created ',dup_username,'duplicated',dup_finger, 'print in', dup_path) self.assertFingerInStorage(dup_username, dup_fp_finger) else: self.assertCountEqual(enrolled_prints, set(enrolled_prints)) for user in enroll_map: enrolled_fingers = enroll_map[user].keys() if enrolled_fingers: enrolled = self.device.ListEnrolledFingers('(s)', user) self.assertCountEqual(enrolled_fingers, enrolled) else: with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', user) return (enroll_map, enrolled_prints_info) def get_secondary_bus_and_device(self, claim=None): addr = os.environ['DBUS_SYSTEM_BUS_ADDRESS'] # Get a separate bus connection bus = Gio.DBusConnection.new_for_address_sync(addr, Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION | Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT, None, None) assert bus.is_closed() == False dev_path = self.device.get_object_path() dev = Gio.DBusProxy.new_sync(bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, FPRINT_NAMESPACE, dev_path, FPRINT_NAMESPACE + '.Device', None) if claim is not None: dev.Claim('(s)', claim) return bus, dev def assertVerifyMatch(self, selected_finger=None): self.wait_for_result(expected='verify-match') self.assertTrue(self._verify_stopped) if selected_finger: self.assertEqual(selected_finger, self._selected_finger) def assertVerifyNoMatch(self, selected_finger=None): self.wait_for_result(expected='verify-no-match') self.assertTrue(self._verify_stopped) if selected_finger: self.assertEqual(selected_finger, self._selected_finger) class FPrintdVirtualStorageDeviceBaseTest(FPrintdVirtualDeviceBaseTest): socket_env = 'FP_VIRTUAL_DEVICE_STORAGE' device_driver = 'virtual_device_storage' driver_name = 'Virtual device with storage and identification for debugging' enroll_stages = 2 def _send_command(self, con, command, *args): params = ' '.join(str(p) for p in args) con.sendall('{} {}'.format(command, params).encode('utf-8')) res = [] while True: r = con.recv(1024) if not r: break res.append(r) return b''.join(res) def send_command(self, command, *args): self.assertIn(command, ['INSERT', 'REMOVE', 'SCAN', 'ERROR', 'RETRY', 'FINGER', 'UNPLUG', 'SLEEP', 'SET_ENROLL_STAGES', 'SET_SCAN_TYPE', 'SET_CANCELLATION_ENABLED', 'LIST', 'IGNORED_COMMAND', 'SET_KEEP_ALIVE', 'CONT']) with Connection(self.sockaddr) as con: res = self._send_command(con, command, *args) return res def send_image(self, image, con=None): # This is meant to simulate the image scanning for image device self.send_command('SCAN', image) def send_error(self, error=FPrint.DeviceError.GENERAL, con=None): self.send_command('ERROR', int(error)) def send_retry(self, retry_error=FPrint.DeviceRetry.TOO_SHORT, con=None): self.send_command('RETRY', int(retry_error)) def send_remove(self, con=None): self.send_command('UNPLUG') def send_finger_automatic(self, automatic, con=None, iterate=True): if not automatic: return self.skipTest('Not implemented for {}'.format(self.device_driver)) def send_finger_report(self, has_finger, con=None, iterate=True): self.send_command('FINGER', 1 if has_finger else 0) while iterate and self.finger_present != has_finger: ctx.iteration(False) def send_sleep(self, timeout, con=None): self.assertGreaterEqual(timeout, 0) self.send_command('SLEEP', timeout) def set_keep_alive(self, value): self.send_command('SET_KEEP_ALIVE', 1 if value else 0) def enroll_print(self, nick, finger='right-index-finger', expected_result='enroll-completed'): # Using the name of the image as the print id super().enroll_image(img=nick, finger=finger, expected_result=expected_result) def _maybe_reduce_enroll_stages(self, stages=-1): # Reduce the number of default enroll stages, we can go a bit faster stages = stages if stages > 0 else self.enroll_stages if self.has_identification: stages += 1 # Adding the extra stage for duplicates-check if self.num_enroll_stages == stages: return device_stages = stages -1 if self.has_identification else stages self.send_command('SET_ENROLL_STAGES', device_stages) while self.num_enroll_stages != stages: ctx.iteration(True) self.assertIn({'num-enroll-stages': stages}, self._changed_properties) self._changed_properties.remove({'num-enroll-stages': stages}) self.assertEqual(self.num_enroll_stages, stages) def get_stored_prints(self): return self.send_command('LIST').decode('ascii').split('\n')[:-1] class FPrintdVirtualStorageDeviceTests(FPrintdVirtualStorageDeviceBaseTest): def setUp(self): super().setUp() self.device.Claim('(s)', 'testuser') def tearDown(self): self.try_release() super().tearDown() def test_garbage_collect(self): # We expect collection in this order garbage_collect = [ 'no-metadata-print', 'FP1-20201216-7-ABCDEFGH-testuser', 'FP1-20201217-7-12345678-testuser', ] for e in garbage_collect: self.send_command('INSERT', e) # Enroll a few prints that must not be touched, sort them in at various points enrolled_prints = { 'FP1-20000101-7-ABCDEFGH-testuser' : 'left-index-finger', 'FP1-20201231-7-ABCDEFGH-testuser' : 'right-index-finger', 'no-metadata-new' : 'left-middle-finger', } # Device supports listing, so no initial cleanup for i, f in enrolled_prints.items(): self.enroll_print(i, f) # The virtual device sends a trailing \n prints = self.get_stored_prints() self.assertEqual(set(prints), set(garbage_collect + list(enrolled_prints.keys()))) def trigger_garbagecollect(): self.send_image('some-other-print') self.send_command('ERROR', int(FPrint.DeviceError.DATA_FULL)) self.device.EnrollStart('(s)', 'right-thumb') self.device.EnrollStop() trigger_garbagecollect() prints = self.get_stored_prints() garbage_collect.pop() self.assertEqual(set(prints), set(garbage_collect + list(enrolled_prints.keys()))) trigger_garbagecollect() prints = self.get_stored_prints() garbage_collect.pop() self.assertEqual(set(prints), set(garbage_collect + list(enrolled_prints.keys()))) trigger_garbagecollect() prints = self.get_stored_prints() garbage_collect.pop() self.assertEqual(set(prints), set(garbage_collect + list(enrolled_prints.keys()))) def test_garbage_collect_on_duplicate(self): self._maybe_reduce_enroll_stages(stages=1) self.send_command('INSERT', 'stored-print') self.device.Release() self.device.Claim('(s)', 'testuser') self.assertEqual(self.get_stored_prints(), ['stored-print']) # Device supports listing, so no initial cleanup self.device.EnrollStart('(s)', 'right-thumb') self.send_image('stored-print') # During identify self.wait_for_result('enroll-stage-passed') self.assertFalse(self.get_stored_prints()) self.send_image('stored-print') self.wait_for_result('enroll-completed') # During enroll self.assertEqual(self.get_stored_prints(), ['stored-print']) self.device.EnrollStop() def test_garbage_collect_failed_on_duplicate(self): self._maybe_reduce_enroll_stages(stages=1) self.send_command('INSERT', 'stored-print') self.device.Release() self.device.Claim('(s)', 'testuser') self.assertEqual(self.get_stored_prints(), ['stored-print']) # Device supports listing, so no initial cleanup self.device.EnrollStart('(s)', 'right-thumb') self.send_image('stored-print') # During identify self.send_error(FPrint.DeviceError.PROTO) # During garbage collecting self.wait_for_result('enroll-duplicate') self.assertEqual(self.get_stored_prints(), ['stored-print']) self.device.EnrollStop() def test_delete(self): # We expect collection in this order garbage_prints = [ 'no-metadata-print', 'FP1-20201216-7-ABCDEFGH-testuser', 'FP1-20201217-7-12345678-testuser', 'FP1-20201216-7-ABCDEFGH-other', 'FP1-20201217-7-12345678-other', ] for e in garbage_prints: self.send_command('INSERT', e) # Enroll a few prints that will be deleted enrolled_prints = { 'FP1-20000101-7-ABCDEFGH-testuser' : 'left-index-finger', 'FP1-20201231-7-ABCDEFGH-testuser' : 'right-index-finger', 'no-metadata-new' : 'left-middle-finger', } for i, f in enrolled_prints.items(): self.enroll_print(i, f) # The virtual device sends a trailing \n prints = self.get_stored_prints() self.assertEqual(set(prints), set(garbage_prints + list(enrolled_prints.keys()))) # Now, delete all prints for the user self.device.DeleteEnrolledFingers2() # And verify they are all gone prints = self.get_stored_prints() self.assertEqual(set(prints), set(garbage_prints)) def test_local_storage_cleanup_data_error(self): # Enroll a print and delete it self.enroll_print('deleted-print', finger='left-thumb') self.send_command('REMOVE', 'deleted-print') # Note: would be thrown anyway by the storage device if we scan something self.send_error(FPrint.DeviceError.DATA_NOT_FOUND) self.device.VerifyStart('(s)', 'any') self.wait_for_result('verify-no-match') self.device.VerifyStop() # At this point, there is no print left with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'testuser') def test_local_storage_cleanup_no_match(self): # Enroll a print and delete it self.enroll_print('existing-print', finger='right-index-finger') self.enroll_print('deleted-print', finger='left-thumb') self.send_command('REMOVE', 'deleted-print') # We need to send a print that is known to the device self.send_image('other-print') self.device.VerifyStart('(s)', 'right-index-finger') self.wait_for_result('verify-no-match') self.device.VerifyStop() # At this point, the deleted print has disappeared self.assertEqual(set(self.device.ListEnrolledFingers('(s)', 'testuser')), {'right-index-finger'}) # Now, do the same thing, and the print will not be deleted self.enroll_print('deleted-print', finger='left-thumb') self.send_command('REMOVE', 'deleted-print') self.send_image('other-print') self.device.VerifyStart('(s)', 'right-index-finger') self.wait_for_result('verify-no-match') self.device.VerifyStop() # At this point, the deleted print is still there self.assertEqual(set(self.device.ListEnrolledFingers('(s)', 'testuser')), {'right-index-finger', 'left-thumb'}) def test_enroll_with_one_stage_only(self): self._maybe_reduce_enroll_stages(stages=1) self.enroll_print('FP1-20000101-7-ABCDEFGH-testuser', 'left-index-finger') self.assertEqual(self.device.ListEnrolledFingers('(s)', 'testuser'), ['left-index-finger']) def test_scan_type_changes(self): for scan_type in [FPrint.ScanType.PRESS, FPrint.ScanType.SWIPE]: scan_type = scan_type.value_nick self.send_command('SET_SCAN_TYPE', scan_type) while self.device.get_cached_property('scan-type').unpack() != scan_type: ctx.iteration(True) self.assertIn({'scan-type': scan_type}, self._changed_properties) self.assertEqual(self.device.get_cached_property('scan-type').unpack(), scan_type) class FPrintdVirtualStorageNoListDeviceTests(FPrintdVirtualStorageDeviceBaseTest): socket_env = 'FP_VIRTUAL_DEVICE_STORAGE_NO_LIST' def setUp(self): super().setUp() self.device.Claim('(s)', 'testuser') def tearDown(self): self.try_release() super().tearDown() def test_clear_storage(self): # We expect collection in this order garbage_collect = [ 'no-metadata-print', 'FP1-20201216-7-ABCDEFGH-testuser', 'FP1-20201217-7-12345678-testuser', ] for e in garbage_collect: self.send_command('INSERT', e) # Enroll print, return OK for storage clearing self.send_command('CONT', 0) self.enroll_print('print-1', 'left-index-finger') prints = self.get_stored_prints() self.assertEqual(set(prints), {'print-1'}) self.enroll_print('print-2', 'right-index-finger') prints = self.get_stored_prints() self.assertEqual(set(prints), {'print-1', 'print-2'}) class FPrintdVirtualNoStorageDeviceBaseTest(FPrintdVirtualStorageDeviceBaseTest): socket_env = 'FP_VIRTUAL_DEVICE' device_driver = 'virtual_device' driver_name = 'Virtual device for debugging' has_identification = False class FPrintdVirtualNoStorageDeviceTest(FPrintdVirtualNoStorageDeviceBaseTest): def check_verify_finger_match(self, image, expect_match, finger): self.device.VerifyStart('(s)', 'any') self.send_image(image) if expect_match: self.assertVerifyMatch(selected_finger=finger) else: self.assertVerifyNoMatch(selected_finger=finger) self.device.VerifyStop() def test_verify_any_finger_match_first_only(self): self.device.Claim('(s)', 'testuser') self.addCleanup(self.device.Release) enrolled, enroll_map = self.enroll_multiple_images() self.check_verify_finger_match(enroll_map[enrolled[0]], expect_match=True, finger=enrolled[0]) self.check_verify_finger_match(enroll_map[enrolled[1]], expect_match=False, finger=enrolled[0]) self.check_verify_finger_match(enroll_map[enrolled[2]], expect_match=False, finger=enrolled[0]) def test_verify_any_finger_no_match(self): self.device.Claim('(s)', 'testuser') self.addCleanup(self.device.Release) FPrintdVirtualDeviceClaimedTest.test_verify_any_finger_no_match(self, selected_finger=None) class FPrintdManagerTests(FPrintdVirtualDeviceBaseTest): def setUp(self): super().setUp() self._polkitd_obj.SetAllowed(['']) def test_manager_get_devices(self): self.assertListEqual(self.manager.GetDevices(), [ self.device.get_object_path() ]) def test_manager_get_default_device(self): self.assertEqual(self.manager.GetDefaultDevice(), self.device.get_object_path()) class FPrintdManagerPreStartTests(FPrintdVirtualImageDeviceBaseTests): def test_manager_get_no_devices(self): os.environ['FP_DRIVERS_ALLOWLIST'] = 'hopefully_no_existing_driver' # TODO: Remove this when we depend on libfprint 1.94.7 os.environ['FP_DRIVERS_WHITELIST'] = 'hopefully_no_existing_driver' self.daemon_start() self.assertListEqual(self.manager.GetDevices(), []) def test_manager_get_no_default_device(self): os.environ['FP_DRIVERS_ALLOWLIST'] = 'hopefully_no_existing_driver' # TODO: Remove this when we depend on libfprint 1.94.7 os.environ['FP_DRIVERS_WHITELIST'] = 'hopefully_no_existing_driver' self.daemon_start() with self.assertFprintError('NoSuchDevice'): self.manager.GetDefaultDevice() def test_manager_get_devices_on_name_appeared(self): self._appeared_name = None def on_name_appeared(connection, name, name_owner): self._appeared_name = name def on_name_vanished(connection, name): self._appeared_name = 'NAME_VANISHED' id = Gio.bus_watch_name_on_connection(self.dbus, FPRINT_NAMESPACE, Gio.BusNameWatcherFlags.NONE, on_name_appeared, on_name_vanished) self.addCleanup(Gio.bus_unwatch_name, id) self.daemon_start() while not self._appeared_name: ctx.iteration(True) self.assertEqual(self._appeared_name, FPRINT_NAMESPACE) try: appeared_device = self.dbus.call_sync( FPRINT_NAMESPACE, FPRINT_PATH + '/Manager', FPRINT_NAMESPACE + '.Manager', 'GetDefaultDevice', None, None, Gio.DBusCallFlags.NO_AUTO_START, 500, None) except GLib.GError as e: if FPRINT_NAMESPACE + '.Error.NoSuchDevice' in e.message: self.skipTest("Need virtual_image device to run the test") raise(e) self.assertIsNotNone(appeared_device) [dev_path] = appeared_device self.assertTrue(dev_path.startswith(FPRINT_PATH + '/Device/')) class FPrintdVirtualDeviceTest(FPrintdVirtualDeviceBaseTest): def test_name_property(self): self.assertEqual(self.device.get_cached_property('name').unpack(), self.driver_name) def test_enroll_stages_property(self): self.assertEqual(self.device.get_cached_property('num-enroll-stages').unpack(), 6) def test_scan_type(self): self.assertEqual(self.device.get_cached_property('scan-type').unpack(), 'swipe') def test_initial_finger_needed(self): self.assertFalse(self.finger_needed) def test_initial_finger_needed(self): self.assertFalse(self.finger_present) def test_allowed_claim_release_enroll(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username, FprintDevicePermission.enroll]) self.device.Claim('(s)', 'testuser') self.device.Release() def test_allowed_claim_release_verify(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username, FprintDevicePermission.verify]) self.device.Claim('(s)', 'testuser') self.device.Release() def test_allowed_claim_current_user(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) self.device.Claim('(s)', '') self.device.Release() self.device.Claim('(s)', self.get_current_user()) self.device.Release() def test_allowed_list_enrolled_fingers_empty_user(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) self.device.Claim('(s)', '') self.enroll_image('whorl', finger='left-thumb') self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) self.assertEqual(self.device.ListEnrolledFingers('(s)', ''), ['left-thumb']) self.assertEqual(self.device.ListEnrolledFingers('(s)', self.get_current_user()), ['left-thumb']) def test_allowed_list_enrolled_fingers_current_user(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) self.device.Claim('(s)', self.get_current_user()) self.enroll_image('whorl', finger='right-thumb') self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) self.assertEqual(self.device.ListEnrolledFingers('(s)', ''), ['right-thumb']) self.assertEqual(self.device.ListEnrolledFingers('(s)', self.get_current_user()), ['right-thumb']) def test_unallowed_claim(self): self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.Claim('(s)', 'testuser') self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username]) with self.assertFprintError('PermissionDenied'): self.device.Claim('(s)', 'testuser') self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) with self.assertFprintError('PermissionDenied'): self.device.Claim('(s)', 'testuser') self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) with self.assertFprintError('PermissionDenied'): self.device.Claim('(s)', 'testuser') def test_unallowed_enroll_with_verify_claim(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) self.device.Claim('(s)', '') with self.assertFprintError('PermissionDenied'): self.enroll_image('whorl', finger='right-thumb') def test_unallowed_delete_with_verify_claim(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) self.device.Claim('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_unallowed_delete2_with_verify_claim(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) self.device.Claim('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFingers2() def test_unallowed_delete_single_with_verify_claim(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) self.device.Claim('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFingers('(s)', 'right-thumb') def test_unallowed_verify_with_enroll_claim(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) self.device.Claim('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.VerifyStart('(s)', 'any') def test_unallowed_claim_current_user(self): self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.Claim('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.Claim('(s)', self.get_current_user()) def test_multiple_claims(self): self.device.Claim('(s)', 'testuser') with self.assertFprintError('AlreadyInUse'): self.device.Claim('(s)', 'testuser') self.device.Release() def test_always_allowed_release(self): self.device.Claim('(s)', 'testuser') self._polkitd_obj.SetAllowed(['']) self.device.Release() def test_unclaimed_release(self): with self.assertFprintError('ClaimDevice'): self.device.Release() def test_unclaimed_verify_start(self): with self.assertFprintError('ClaimDevice'): self.device.VerifyStart('(s)', 'any') def test_unclaimed_verify_stop(self): with self.assertFprintError('ClaimDevice'): self.device.VerifyStop() def test_unclaimed_enroll_start(self): with self.assertFprintError('ClaimDevice'): self.device.EnrollStart('(s)', 'left-index-finger') def test_unclaimed_enroll_stop(self): with self.assertFprintError('ClaimDevice'): self.device.EnrollStop() def test_unclaimed_delete_enrolled_fingers(self): self.enroll_image('whorl', claim_user='foo-user') self.device.DeleteEnrolledFingers('(s)', 'foo-user') def test_unclaimed_delete_enrolled_fingers_no_prints(self): with self.assertFprintError('NoEnrolledPrints'): self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_unclaimed_delete_enrolled_finger(self): with self.assertFprintError('ClaimDevice'): self.device.DeleteEnrolledFinger('(s)', 'left-index-finger') def test_unclaimed_delete_enrolled_fingers2(self): with self.assertFprintError('ClaimDevice'): self.device.DeleteEnrolledFingers2() def test_unclaimed_list_enrolled_fingers(self): self.enroll_image('whorl', finger='left-thumb', claim_user='testuser') self.assertEqual(self.device.ListEnrolledFingers('(s)', 'testuser'), ['left-thumb']) def test_unclaimed_list_enrolled_fingers_error(self): with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'testuser') def test_unclaimed_list_enrolled_fingers_ignores_invalid(self): print_path = self.get_print_file_path('testuser', FPrint.Finger.LEFT_INDEX) os.makedirs(os.path.dirname(print_path), exist_ok=True) with open(print_path, mode='wb') as new_print_file: new_print_file.write(b'I am an invalid print!') with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'testuser') def test_claim_device_open_fail(self): os.rename(self.tmpdir, self.tmpdir + '-moved') self.addCleanup(os.rename, self.tmpdir + '-moved', self.tmpdir) with self.assertFprintError('Internal'): self.device.Claim('(s)', 'testuser') def test_claim_from_other_client_is_released_when_vanished(self): self.call_device_method_from_other_client('Claim', ['testuser']) time.sleep(1) self.device.Claim('(s)', 'testuser') self.device.Release() def test_claim_disconnect(self): bus, dev = self.get_secondary_bus_and_device() def call_done(obj, result, user_data): # Ignore the callback (should be an error) pass # Do an async call to claim and immediately close dev.Claim('(s)', 'testuser', result_handler=call_done) # Ensure the call is on the wire, then close immediately bus.flush_sync() bus.close_sync() time.sleep(1) def test_enroll_running_disconnect(self): bus, dev = self.get_secondary_bus_and_device(claim='testuser') # Start an enroll and disconnect, without finishing/cancelling dev.EnrollStart('(s)', 'left-index-finger') # Ensure the call is on the wire, then close immediately bus.flush_sync() bus.close_sync() time.sleep(1) def test_enroll_done_disconnect(self): bus, dev = self.get_secondary_bus_and_device(claim='testuser') # Start an enroll and disconnect, without finishing/cancelling dev.EnrollStart('(s)', 'left-index-finger') # This works because we also receive the signals on the main connection stages = dev.get_cached_property('num-enroll-stages').unpack() for stage in range(stages): self.send_image('whorl') if stage < stages - 1: self.wait_for_result('enroll-stage-passed') else: self.wait_for_result('enroll-completed') bus.close_sync() time.sleep(1) def test_verify_running_disconnect(self): bus, dev = self.get_secondary_bus_and_device(claim='testuser') self.enroll_image('whorl', device=dev) # Start an enroll and disconnect, without finishing/cancelling dev.VerifyStart('(s)', 'right-index-finger') bus.close_sync() time.sleep(1) def test_verify_done_disconnect(self): bus, dev = self.get_secondary_bus_and_device(claim='testuser') self.enroll_image('whorl', device=dev) # Start an enroll and disconnect, without finishing/cancelling dev.VerifyStart('(s)', 'right-index-finger') self.send_image('whorl') # Wait for match and sleep a bit to give fprintd time to wrap up self.wait_for_result('verify-match') time.sleep(1) bus.close_sync() time.sleep(1) def test_identify_running_disconnect(self): bus, dev = self.get_secondary_bus_and_device(claim='testuser') self.enroll_image('whorl', device=dev) # Start an enroll and disconnect, without finishing/cancelling dev.VerifyStart('(s)', 'any') bus.close_sync() time.sleep(1) def test_identify_done_disconnect(self): bus, dev = self.get_secondary_bus_and_device(claim='testuser') self.enroll_image('whorl', device=dev) # Start an enroll and disconnect, without finishing/cancelling dev.VerifyStart('(s)', 'any') self.send_image('whorl') # Wait for match and sleep a bit to give fprintd time to wrap up self.wait_for_result('verify-match') time.sleep(1) bus.close_sync() time.sleep(1) def test_removal_during_enroll(self): if not self._has_hotplug: self.skipTest("libfprint is too old for hotplug") self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username, FprintDevicePermission.enroll]) self.device.Claim('(s)', 'testuser') self.device.EnrollStart('(s)', 'left-index-finger') # Now remove the device while we are enrolling, which will cause an error self.send_remove() self.wait_for_result(expected='enroll-unknown-error') # The device will still be there now until it is released devices = self.manager.GetDevices() self.assertIn(self.device.get_object_path(), devices) with self.assertFprintError('Internal'): self.device.Release() # And now it will be gone devices = self.manager.GetDevices() self.assertNotIn(self.device.get_object_path(), devices) def test_concourrent_claim(self): self.call_device_method_async('Claim', '(s)', ['']) self.call_device_method_async('Claim', '(s)', ['']) with self.assertFprintError('AlreadyInUse'): self.wait_for_device_reply(expected_replies=2) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_suspend_inhibit_unclaimed(self): self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [True]) self.daemon_log.check_line('Preparing devices for sleep', timeout=1) self.wait_released_delay_inhibitor(timeout=1) self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [False]) self.daemon_log.check_line('Preparing devices for resume', timeout=1) self.wait_got_delay_inhibitor(timeout=1) def test_suspend_inhibit_claimed(self): self.device.Claim('(s)', 'testuser') self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [True]) self.daemon_log.check_line('Preparing devices for sleep', timeout=1) self.wait_released_delay_inhibitor(timeout=1) self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [False]) self.daemon_log.check_line('Preparing devices for resume', timeout=1) self.wait_got_delay_inhibitor(timeout=1) self.device.Release() def test_suspend_inhibit_cancels_enroll(self): self.device.Claim('(s)', 'testuser') self.device.EnrollStart('(s)', 'right-thumb') # Now prepare for sleep, which will trigger an internal cancellation self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [True]) self.daemon_log.check_line('Preparing devices for sleep', timeout=1) self.wait_for_result(expected='enroll-unknown-error') self.wait_released_delay_inhibitor(timeout=1) self.assertEqual(os.read(self.logind_inhibit_fifo, 1), b'') self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [False]) self.daemon_log.check_line('Preparing devices for resume', timeout=1) self.wait_got_delay_inhibitor(timeout=1) self.device.Release() def test_suspend_prevents_enroll(self): self.device.Claim('(s)', 'testuser') # Now prepare for sleep, which will trigger an internal cancellation self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [True]) self.daemon_log.check_line('Preparing devices for sleep', timeout=1) self.wait_released_delay_inhibitor(timeout=1) self.device.EnrollStart('(s)', 'right-thumb') self.wait_for_result(expected='enroll-unknown-error') self.logind_obj.EmitSignal("", "PrepareForSleep", "b", [False]) self.daemon_log.check_line('Preparing devices for resume', timeout=1) self.wait_got_delay_inhibitor(timeout=1) self.device.Release() class FPrintdVirtualDeviceStorageTest(FPrintdVirtualStorageDeviceBaseTest, FPrintdVirtualDeviceTest): # Repeat the tests for the Virtual storage device def test_claim_error(self): self.device.Claim('(s)', self.get_current_user()) self.addCleanup(self.try_release) self.set_keep_alive(True) self.device.Release() self.send_error(FPrint.DeviceError.PROTO) with self.assertFprintError('Internal'): self.device.Claim('(s)', 'testuser') class FPrintdVirtualDeviceClaimedTest(FPrintdVirtualDeviceBaseTest): def setUp(self): super().setUp() self.device.Claim('(s)', 'testuser') def tearDown(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) self.try_release() super().tearDown() def test_any_finger_enroll_start(self): with self.assertFprintError('InvalidFingername'): self.device.EnrollStart('(s)', 'any') def test_wrong_finger_enroll_start(self): with self.assertFprintError('InvalidFingername'): self.device.EnrollStart('(s)', 'sixth-right-finger') def test_any_finger_delete_print(self): with self.assertFprintError('InvalidFingername'): self.device.DeleteEnrolledFinger('(s)', 'any') def test_wrong_finger_delete_print(self): with self.assertFprintError('InvalidFingername'): self.device.DeleteEnrolledFinger('(s)', 'sixth-left-finger') def test_delete_with_no_enrolled_prints(self): with self.assertFprintError('NoEnrolledPrints'): self.device.DeleteEnrolledFinger('(s)', 'left-index-finger') def test_verify_with_no_enrolled_prints(self): with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'any') def test_enroll_verify_list_delete(self): # This test can trigger a race in older libfprint, only run if we have # hotplug support, which coincides with the fixed release. if not self._has_hotplug: self.skipTest("libfprint is too old for hotplug") with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'testuser') with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'nottestuser') self.enroll_image('whorl') self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'nottestuser') self.assertEqual(self.device.ListEnrolledFingers('(s)', 'testuser'), ['right-index-finger']) # Finger is enrolled, try to verify it self.device.VerifyStart('(s)', 'any') while not self.finger_needed: ctx.iteration(True) self.assertTrue(self.finger_needed) self.assertFalse(self.finger_present) # Try a wrong print; will stop verification self.send_image('tented_arch') self.assertVerifyNoMatch() self.device.VerifyStop() self.device.VerifyStart('(s)', 'any') # Send a retry error (swipe too short); will not stop verification self.send_retry() self.wait_for_result() self.assertFalse(self._verify_stopped) self.assertEqual(self._last_result, 'verify-swipe-too-short') # Try the correct print; will stop verification self.send_image('whorl') self.wait_for_result() self.assertTrue(self._verify_stopped) self.assertEqual(self._last_result, 'verify-match') self.device.VerifyStop() self.assertEqual(self.device.ListEnrolledFingers('(s)', 'testuser'), ['right-index-finger']) # And delete the print(s) again self.device.DeleteEnrolledFingers('(s)', 'testuser') self.assertFingerNotInStorage('testuser', FPrint.Finger.RIGHT_INDEX) with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'testuser') def test_enroll_delete_storage_error(self): self.enroll_image('whorl') self.enroll_image('tented_arch', finger='left-index-finger') self.set_print_not_writable('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) with self.assertFprintError('PrintsNotDeleted'): self.device.DeleteEnrolledFingers('(s)', 'testuser') self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerNotInStorage('testuser', FPrint.Finger.LEFT_INDEX) def test_enroll_delete2(self): self.enroll_image('whorl') self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) # And delete the print(s) again using the new API self.device.DeleteEnrolledFingers2() self.assertFingerNotInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFalse(os.path.exists(os.path.join(self.state_dir, 'testuser'))) self.assertTrue(os.path.exists(self.state_dir)) def test_enroll_delete2_multiple(self): self.enroll_image('whorl') self.enroll_image('tented_arch', finger='left-index-finger') self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) self.device.DeleteEnrolledFingers2() self.assertFingerNotInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerNotInStorage('testuser', FPrint.Finger.LEFT_INDEX) def test_enroll_delete2_storage_error(self): self.enroll_image('whorl') self.enroll_image('tented_arch', finger='left-index-finger') self.set_print_not_writable('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) with self.assertFprintError('PrintsNotDeleted'): self.device.DeleteEnrolledFingers2() self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerNotInStorage('testuser', FPrint.Finger.LEFT_INDEX) def test_enroll_delete_single(self): self.enroll_image('whorl', finger='right-index-finger') self.enroll_image('tented_arch', finger='left-index-finger') self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) self.device.DeleteEnrolledFinger('(s)', 'right-index-finger') self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) self.assertFingerNotInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.device.DeleteEnrolledFinger('(s)', 'left-index-finger') self.assertFingerNotInStorage('testuser', FPrint.Finger.LEFT_INDEX) self.assertFingerNotInStorage('testuser', FPrint.Finger.RIGHT_INDEX) def test_enroll_delete_single_storage_error(self): self.enroll_image('whorl', finger='right-index-finger') self.enroll_image('tented_arch', finger='left-index-finger') self.set_print_not_writable('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) with self.assertFprintError('PrintsNotDeleted'): self.device.DeleteEnrolledFinger('(s)', 'right-index-finger') self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) self.set_print_not_writable('testuser', FPrint.Finger.LEFT_INDEX) with self.assertFprintError('PrintsNotDeleted'): self.device.DeleteEnrolledFinger('(s)', 'left-index-finger') self.assertFingerInStorage('testuser', FPrint.Finger.LEFT_INDEX) self.assertFingerInStorage('testuser', FPrint.Finger.RIGHT_INDEX) def test_enroll_invalid_storage_dir(self): # Directory will not exist yet os.makedirs(self.state_dir, mode=0o500) self.addCleanup(os.chmod, self.state_dir, mode=0o700) self.skipTestIfCanWrite(self.state_dir) self.enroll_image('whorl', expected_result='enroll-failed') def test_enroll_write_print_error(self): self.set_print_not_writable('testuser', FPrint.Finger.LEFT_THUMB) self.enroll_image('whorl', expected_result='enroll-failed', finger='left-thumb') def test_verify_invalid_storage_dir(self): self.enroll_image('whorl') os.chmod(self.state_dir, mode=0o000) self.addCleanup(os.chmod, self.state_dir, mode=0o700) self.skipTestIfCanWrite(self.state_dir) with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'any') def test_verify_read_print_error(self): self.enroll_image('whorl', finger='left-thumb') self.set_print_not_writable('testuser', FPrint.Finger.LEFT_THUMB) with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'any') def test_enroll_stop_cancels(self): self.device.EnrollStart('(s)', 'left-index-finger') self.device.EnrollStop() self.wait_for_result(expected='enroll-failed') def test_verify_stop_cancels(self): self.enroll_image('whorl') self.device.VerifyStart('(s)', 'any') self.device.VerifyStop() self.wait_for_result(expected='verify-no-match') def test_verify_finger_stop_cancels(self): self.enroll_image('whorl', finger='left-thumb') self.device.VerifyStart('(s)', 'left-thumb') self.device.VerifyStop() def test_busy_device_release_on_enroll(self): self.device.EnrollStart('(s)', 'left-index-finger') self.device.Release() self.wait_for_result(expected='enroll-failed') def test_busy_device_release_on_verify(self): self.enroll_image('whorl', finger='left-index-finger') self.device.VerifyStart('(s)', 'any') self.device.Release() self.wait_for_result(expected='verify-no-match') def test_busy_device_release_on_verify_finger(self): self.enroll_image('whorl', finger='left-middle-finger') self.device.VerifyStart('(s)', 'left-middle-finger') self.device.Release() self.wait_for_result(expected='verify-no-match') def test_enroll_stop_not_started(self): with self.assertFprintError('NoActionInProgress'): self.device.EnrollStop() def test_verify_stop_not_started(self): with self.assertFprintError('NoActionInProgress'): self.device.VerifyStop() def test_verify_finger_match(self): self.enroll_image('whorl', finger='left-thumb') self.device.VerifyStart('(s)', 'left-thumb') self.send_image('whorl') self.wait_for_result() self.assertTrue(self._verify_stopped) self.assertEqual(self._last_result, 'verify-match') self.assertEqual(self._selected_finger, 'left-thumb') self.device.VerifyStop() def test_verify_finger_no_match(self): self.enroll_image('whorl', finger='left-thumb') self.device.VerifyStart('(s)', 'left-thumb') self.send_image('tented_arch') self.assertVerifyNoMatch(selected_finger='left-thumb') self.device.VerifyStop() def test_verify_finger_no_match_restart(self): self.enroll_image('whorl', finger='left-thumb') self.device.VerifyStart('(s)', 'left-thumb') self.send_image('tented_arch') self.assertVerifyNoMatch(selected_finger='left-thumb') self.device.VerifyStop() # Immediately starting again after a no-match must work self.device.VerifyStart('(s)', 'left-thumb') self.send_image('whorl') self.wait_for_result() self.assertTrue(self._verify_stopped) self.assertEqual(self._last_result, 'verify-match') self.assertEqual(self._selected_finger, 'left-thumb') self.device.VerifyStop() def test_verify_wrong_finger_match(self): self.enroll_image('whorl', finger='left-thumb') self.device.VerifyStart('(s)', 'left-toe') self.send_image('whorl') self.wait_for_result() self.assertTrue(self._verify_stopped) self.assertEqual(self._last_result, 'verify-match') self.assertEqual(self._selected_finger, 'left-thumb') self.device.VerifyStop() def test_verify_wrong_finger_no_match(self): self.enroll_image('whorl', finger='right-thumb') self.device.VerifyStart('(s)', 'right-toe') self.send_image('tented_arch') self.assertVerifyNoMatch(selected_finger='right-thumb') self.device.VerifyStop() def test_verify_any_finger_match(self): second_image = self.enroll_multiple_images(return_index=1) self.device.VerifyStart('(s)', 'any') self.send_image(second_image) self.wait_for_result() self.assertTrue(self._verify_stopped) self.assertEqual(self._last_result, 'verify-match') self.assertEqual(self._selected_finger, 'any') self.device.VerifyStop() def test_verify_any_finger_no_match(self, selected_finger='any'): enrolled, _map = self.enroll_multiple_images() verify_image = 'tented_arch' self.assertNotIn(verify_image, enrolled) self.device.VerifyStart('(s)', 'any') self.send_image(verify_image) self.assertVerifyNoMatch(selected_finger) self.device.VerifyStop() def test_verify_any_finger_multiple_users(self): enroll_map, enrolled_prints_info = self.enroll_users_images() enrolled_users = list(enroll_map) for verifying_user in enrolled_users: self.device.Claim('(s)', verifying_user) for enrolled_user in enrolled_users: should_match = enrolled_user == verifying_user for finger, print in enroll_map[enrolled_user].items(): self.device.VerifyStart('(s)', 'any') self.send_image(print) if should_match: self.assertVerifyMatch() else: self.assertVerifyNoMatch() self.device.VerifyStop() self.device.Release() def test_enroll_users_duplicate_prints(self): _enroll_map, prints_info = self.enroll_users_images(enroll_map={ 'test-user1': {'left-thumb': 'whorl', 'right-thumb': 'whorl'}, 'test-user2': {'left-index-finger': 'whorl'}, 'test-user3': {'left-little-finger': 'tented_arch'}, }, allow_duplicates=True) self.assertEqual(prints_info, { 'whorl': ('test-user1', 'left-thumb'), 'tented_arch': ('test-user3', 'left-little-finger'), }) def test_verify_finger_not_enrolled(self): self.enroll_image('whorl', finger='left-thumb') with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'right-thumb') def test_verify_finger_not_enrolled_stops_verification(self): self.enroll_image('whorl', finger='left-thumb') with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'right-thumb') with self.assertFprintError('NoActionInProgress'): self.device.VerifyStop() def test_identify_finger_not_enrolled(self): with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'any') def test_identify_finger_not_enrolled_stops_verification(self): with self.assertFprintError('NoEnrolledPrints'): self.device.VerifyStart('(s)', 'any') with self.assertFprintError('NoActionInProgress'): self.device.VerifyStop() def test_unallowed_enroll_start(self): self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.EnrollStart('(s)', 'right-index-finger') self._polkitd_obj.SetAllowed([FprintDevicePermission.enroll]) self.enroll_image('whorl') def test_always_allowed_enroll_stop(self): self.device.EnrollStart('(s)', 'right-index-finger') self._polkitd_obj.SetAllowed(['']) self.device.EnrollStop() def test_unallowed_verify_start(self): self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.VerifyStart('(s)', 'any') def test_always_allowed_verify_stop(self): self.enroll_image('whorl') self.device.VerifyStart('(s)', 'any') self._polkitd_obj.SetAllowed(['']) self.device.VerifyStop() def test_list_enrolled_fingers_current_user(self): self.enroll_image('whorl') self._polkitd_obj.SetAllowed([FprintDevicePermission.verify]) with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', '') with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', self.get_current_user()) def test_unallowed_list_enrolled_fingers(self): self.enroll_image('whorl') self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.ListEnrolledFingers('(s)', 'testuser') self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username]) with self.assertFprintError('PermissionDenied'): self.device.ListEnrolledFingers('(s)', 'testuser') def test_unallowed_list_enrolled_fingers_current_user(self): self.enroll_image('whorl') self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.ListEnrolledFingers('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.ListEnrolledFingers('(s)', self.get_current_user()) self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username]) with self.assertFprintError('PermissionDenied'): self.device.ListEnrolledFingers('(s)', '') with self.assertFprintError('PermissionDenied'): self.device.ListEnrolledFingers('(s)', self.get_current_user()) def test_unallowed_delete_enrolled_fingers(self): self.enroll_image('whorl') self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFingers('(s)', 'testuser') self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username]) with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_unallowed_delete_enrolled_fingers2(self): self.enroll_image('whorl') self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFingers2() def test_unallowed_delete_enrolled_finger(self): self.enroll_image('whorl') self._polkitd_obj.SetAllowed(['']) with self.assertFprintError('PermissionDenied'): self.device.DeleteEnrolledFinger('(s)', 'left-little-finger') def test_delete_enrolled_fingers_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('DeleteEnrolledFingers', ['testuser']) def test_delete_enrolled_fingers2_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('DeleteEnrolledFingers2') def test_delete_enrolled_finger_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('DeleteEnrolledFinger', ['left-index-finger']) def test_release_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('Release') def test_enroll_start_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('EnrollStart', ['left-index-finger']) def test_verify_start_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('VerifyStart', ['any']) def test_verify_start_finger_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('VerifyStart', ['left-thumb']) def test_enroll_finger_status(self): self.assertFalse(self.finger_present) self.assertFalse(self.finger_needed) self.device.EnrollStart('(s)', 'right-middle-finger') self.assertEqual(self._changed_properties, []) while not self.finger_needed: ctx.iteration(False) self.assertIn({'finger-needed': True}, self._changed_properties) self.assertTrue(self.finger_needed) self.assertFalse(self.finger_present) self._changed_properties = [] self.send_finger_report(True) self.assertEqual([{'finger-present': True}], self._changed_properties) self.assertTrue(self.finger_needed) self.assertTrue(self.finger_present) self._changed_properties = [] self.send_finger_report(False) self.assertFalse(self.finger_present) self.assertTrue(self.finger_needed) self.assertEqual([{'finger-present': False}], self._changed_properties) self._changed_properties = [] self.device.EnrollStop() while self.finger_needed: ctx.iteration(False) self.assertFalse(self.finger_present) self.assertFalse(self.finger_needed) self.assertEqual([{'finger-needed': False}], self._changed_properties) def test_verify_finger_status(self): self.assertFalse(self.finger_present) self.assertFalse(self.finger_needed) self.assertEqual(self._changed_properties, []) self.enroll_image('whorl') self.assertIn({'finger-needed': True}, self._changed_properties) self.assertIn({'finger-needed': False}, self._changed_properties) self.assertFalse(self.finger_present) self.assertFalse(self.finger_needed) self._changed_properties = [] self.device.VerifyStart('(s)', 'any') self.assertEqual(self._changed_properties, []) while not self.finger_needed: ctx.iteration(False) self.assertIn({'finger-needed': True}, self._changed_properties) self.assertTrue(self.finger_needed) self.assertFalse(self.finger_present) self._changed_properties = [] self.send_finger_report(True) self.assertEqual([{'finger-present': True}], self._changed_properties) self.assertTrue(self.finger_needed) self.assertTrue(self.finger_present) self._changed_properties = [] self.send_finger_report(False) self.assertFalse(self.finger_present) self.assertTrue(self.finger_needed) self.assertEqual([{'finger-present': False}], self._changed_properties) self._changed_properties = [] self.device.VerifyStop() while self.finger_needed: ctx.iteration(False) self.assertFalse(self.finger_present) self.assertFalse(self.finger_needed) self.assertEqual([{'finger-needed': False}], self._changed_properties) def test_concourrent_enroll_start(self): self.call_device_method_async('EnrollStart', '(s)', ['left-little-finger']) self.call_device_method_async('EnrollStart', '(s)', ['left-thumb']) with self.assertFprintError('AlreadyInUse'): self.wait_for_device_reply(expected_replies=2) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_concourrent_verify_start(self): self.enroll_image('whorl', finger='left-thumb') self.call_device_method_async('VerifyStart', '(s)', ['any']) self.call_device_method_async('VerifyStart', '(s)', ['left-thumb']) with self.assertFprintError('AlreadyInUse'): self.wait_for_device_reply(expected_replies=2) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_concourrent_list_enrolled_fingers(self): self.enroll_image('whorl') self.call_device_method_async('ListEnrolledFingers', '(s)', ['testuser']) self.call_device_method_async('ListEnrolledFingers', '(s)', ['testuser']) # No failure is expected here since it's all sync self.wait_for_device_reply(expected_replies=2) self.assertEqual([(['right-index-finger'],), (['right-index-finger'],)], [ f.unpack() for f in self.get_all_async_replies() ]) def test_concourrent_delete_enrolled_fingers(self): self.enroll_image('whorl') self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) accepted_exceptions = ['NoEnrolledPrints'] if self.device_driver == 'virtual_device_storage': accepted_exceptions.append('AlreadyInUse') self.wait_for_device_reply_relaxed(expected_replies=2, accepted_exceptions=accepted_exceptions) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_concourrent_delete_enrolled_fingers_unclaimed(self): self.enroll_image('whorl') self.device.Release() self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) accepted_exceptions = ['NoEnrolledPrints'] if self.device_driver == 'virtual_device_storage': accepted_exceptions.append('AlreadyInUse') self.wait_for_device_reply_relaxed(expected_replies=2, accepted_exceptions=accepted_exceptions) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_concourrent_delete_enrolled_fingers2(self): self.enroll_image('whorl') self.call_device_method_async('DeleteEnrolledFingers2', '()', []) self.call_device_method_async('DeleteEnrolledFingers2', '()', []) accepted_exceptions = ['NoEnrolledPrints'] if self.device_driver == 'virtual_device_storage': accepted_exceptions.append('AlreadyInUse') self.wait_for_device_reply_relaxed(expected_replies=2, accepted_exceptions=accepted_exceptions) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_concourrent_delete_enrolled_finger(self): self.enroll_image('whorl', finger='left-thumb') self.enroll_image('tented_arch', finger='right-thumb') self.call_device_method_async('DeleteEnrolledFinger', '(s)', ['left-thumb']) self.call_device_method_async('DeleteEnrolledFinger', '(s)', ['right-thumb']) accepted_exceptions = [] if self.device_driver == 'virtual_device_storage': accepted_exceptions.append('AlreadyInUse') self.wait_for_device_reply_relaxed(expected_replies=2, accepted_exceptions=accepted_exceptions) if self.device_driver == 'virtual_device_storage': self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) else: self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], self.get_all_async_replies()) def test_concourrent_release(self): self.call_device_method_async('Release', '()', []) self.call_device_method_async('Release', '()', []) with self.assertFprintError(['AlreadyInUse', 'ClaimDevice']): self.wait_for_device_reply(expected_replies=2) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_already_claimed_same_user_delete_enrolled_fingers(self): self.enroll_image('whorl') self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_already_claimed_same_user_delete_enrolled_fingers_no_prints(self): with self.assertFprintError('NoEnrolledPrints'): self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_already_claimed_other_user_delete_enrolled_fingers(self): self.device.Release() self.enroll_image('whorl', claim_user='nottestuser') self.device.Claim('(s)', 'testuser') self.device.DeleteEnrolledFingers('(s)', 'nottestuser') def test_already_claimed_other_user_delete_enrolled_fingers_no_prints(self): with self.assertFprintError('NoEnrolledPrints'): self.device.DeleteEnrolledFingers('(s)', 'nottestuser') class FPrintdVirtualDeviceEnrollTests(FPrintdVirtualDeviceBaseTest): def setUp(self): super().setUp() self._abort = False self.device.Claim('(s)', 'testuser') self.device.EnrollStart('(s)', 'left-middle-finger') self.stop_on_teardown = True def tearDown(self): if self.stop_on_teardown: self.device.EnrollStop() self.device.Release() super().tearDown() def assertEnrollRetry(self, device_error, expected_error): self.send_retry(retry_error=device_error) self.wait_for_result(expected=expected_error) def assertEnrollError(self, device_error, expected_error): self.send_error(error=device_error) self.wait_for_result(expected=expected_error) def test_enroll_retry_general(self): self.assertEnrollRetry(FPrint.DeviceRetry.GENERAL, 'enroll-retry-scan') def test_enroll_retry_too_short(self): self.assertEnrollRetry(FPrint.DeviceRetry.TOO_SHORT, 'enroll-swipe-too-short') def test_enroll_retry_remove_finger(self): self.assertEnrollRetry(FPrint.DeviceRetry.REMOVE_FINGER, 'enroll-remove-and-retry') def test_enroll_retry_center_finger(self): self.assertEnrollRetry(FPrint.DeviceRetry.CENTER_FINGER, 'enroll-finger-not-centered') def test_enroll_error_general(self): self.assertEnrollError(FPrint.DeviceError.GENERAL, 'enroll-unknown-error') def test_enroll_error_not_supported(self): self.assertEnrollError(FPrint.DeviceError.NOT_SUPPORTED, 'enroll-unknown-error') def test_enroll_error_not_open(self): self.assertEnrollError(FPrint.DeviceError.NOT_OPEN, 'enroll-unknown-error') def test_enroll_error_already_open(self): self.assertEnrollError(FPrint.DeviceError.ALREADY_OPEN, 'enroll-unknown-error') def test_enroll_error_busy(self): self.assertEnrollError(FPrint.DeviceError.BUSY, 'enroll-unknown-error') def test_enroll_error_proto(self): self.assertEnrollError(FPrint.DeviceError.PROTO, 'enroll-disconnected') def test_enroll_error_data_invalid(self): self.assertEnrollError(FPrint.DeviceError.DATA_INVALID, 'enroll-unknown-error') def test_enroll_error_data_not_found(self): if self.has_identification: self.assertEnrollError( FPrint.DeviceError.DATA_NOT_FOUND, 'enroll-stage-passed') self.assertEnrollError(FPrint.DeviceError.DATA_NOT_FOUND, 'enroll-unknown-error') def test_enroll_error_data_full(self): self.assertEnrollError(FPrint.DeviceError.DATA_FULL, 'enroll-data-full') def test_enroll_already_enrolled_finger(self): self.enroll_image('whorl', start=False) # We can enroll a new image deleting the first self.device.EnrollStart('(s)', 'left-middle-finger') self.enroll_image('arch', start=False) self.stop_on_teardown = False # If we verify, 'arch' will match, 'whorl' will not match self.device.VerifyStart('(s)', 'any') self.send_image('whorl') self.assertVerifyNoMatch() self.device.VerifyStop() self.device.VerifyStart('(s)', 'any') self.send_image('arch') self.assertVerifyMatch() self.device.VerifyStop() def test_enroll_duplicate_image(self): self.enroll_image('whorl', finger='left-thumb', start=False) self.enroll_image('whorl', finger='right-thumb', stop=False, expected_result='enroll-duplicate' if self.has_identification else 'enroll-completed') def test_enroll_start_during_enroll(self): with self.assertFprintError('AlreadyInUse'): self.device.EnrollStart('(s)', 'left-thumb') def test_verify_start_during_enroll(self): self.enroll_image('whorl', start=False) self.device.EnrollStart('(s)', 'right-thumb') with self.assertFprintError('AlreadyInUse'): self.device.VerifyStart('(s)', 'any') def test_verify_stop_during_enroll(self): with self.assertFprintError('AlreadyInUse'): self.device.VerifyStop() def test_enroll_stop_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('EnrollStop') def test_delete_fingers_during_enroll(self): with self.assertFprintError('AlreadyInUse'): self.device.DeleteEnrolledFingers('(s)', '') def test_delete_fingers2_during_enroll(self): with self.assertFprintError('AlreadyInUse'): self.device.DeleteEnrolledFingers2() def test_delete_finger_during_enroll(self): with self.assertFprintError('AlreadyInUse'): self.device.DeleteEnrolledFinger('(s)', 'left-thumb') def test_enroll_concourrent_stop(self): self.stop_on_teardown = False self.call_device_method_async('EnrollStop', '()', []) self.call_device_method_async('EnrollStop', '()', []) with self.assertFprintError(['AlreadyInUse', 'NoActionInProgress']): self.wait_for_device_reply(method='EnrollStop', expected_replies=2) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) class FPrintdVirtualDeviceNoStorageEnrollTests(FPrintdVirtualNoStorageDeviceBaseTest, FPrintdVirtualDeviceEnrollTests): # Repeat the tests for the Virtual device (with no storage) pass class FPrintdVirtualDeviceStorageClaimedTest(FPrintdVirtualStorageDeviceBaseTest, FPrintdVirtualDeviceClaimedTest): # Repeat the tests for the Virtual storage device def test_release_waits_for_deletion(self): self.enroll_print('new-print') self.send_sleep(get_timeout('daemon_stop') * 1000 * 0.5) self.call_device_method_async('DeleteEnrolledFingers2', '()', []) self.wait_for_result(max_wait=100) self.call_device_method_async('Release', '()', []) with self.assertFprintError('Internal'): self.wait_for_device_reply(method='Release') self.assertFalse(self.get_async_replies( method='DeleteEnrolledFingers2')) def test_delete_enrolled_fingers_device_error(self): self.enroll_print('new-print') self.send_sleep(10) self.send_error(FPrint.DeviceError.BUSY) with self.assertFprintError('PrintsNotDeletedFromDevice'): self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_delete_enrolled_fingers2_device_error(self): self.enroll_print('new-print') self.send_sleep(10) self.send_error(FPrint.DeviceError.BUSY) with self.assertFprintError('PrintsNotDeletedFromDevice'): self.device.DeleteEnrolledFingers2() def test_delete_enrolled_finger_device_error(self): self.enroll_print('new-print', finger='left-thumb') self.send_sleep(10) self.send_error(FPrint.DeviceError.BUSY) with self.assertFprintError('PrintsNotDeletedFromDevice'): self.device.DeleteEnrolledFinger('(s)', 'left-thumb') def test_delete_enrolled_fingers_device_removed(self): self.enroll_print('deleted-print') self.send_command('REMOVE', 'deleted-print') with self.assertFprintError('PrintsNotDeletedFromDevice'): self.device.DeleteEnrolledFingers('(s)', 'testuser') def test_delete_enrolled_fingers2_device_removed(self): self.enroll_print('deleted-print') self.send_command('REMOVE', 'deleted-print') with self.assertFprintError('PrintsNotDeletedFromDevice'): self.device.DeleteEnrolledFingers2() def test_delete_enrolled_finger_device_removed(self): self.enroll_print('deleted-print', finger='left-thumb') self.send_command('REMOVE', 'deleted-print') with self.assertFprintError('PrintsNotDeletedFromDevice'): self.device.DeleteEnrolledFinger('(s)', 'left-thumb') def test_delete_enrolled_fingers_storage_error_has_higher_priority(self): self.enroll_print('deleted-print', finger='left-thumb') self.send_sleep(get_timeout('device_sleep')) self.send_error(FPrint.DeviceError.BUSY) self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) self.wait_for_result(max_wait=get_timeout('device_sleep') / 2) self.assertFalse(self.get_all_async_replies()) self.set_print_not_writable('testuser', FPrint.Finger.LEFT_THUMB) with self.assertFprintError('PrintsNotDeleted'): self.wait_for_device_reply() def test_delete_enrolled_fingers2_storage_error_has_higher_priority(self): self.enroll_print('deleted-print', finger='left-thumb') self.send_sleep(get_timeout('device_sleep')) self.send_error(FPrint.DeviceError.BUSY) self.call_device_method_async('DeleteEnrolledFingers2', '()', []) self.wait_for_result(max_wait=get_timeout('device_sleep') / 2) self.assertFalse(self.get_all_async_replies()) self.set_print_not_writable('testuser', FPrint.Finger.LEFT_THUMB) with self.assertFprintError('PrintsNotDeleted'): self.wait_for_device_reply() def test_delete_enrolled_finger_storage_error_has_higher_priority(self): self.enroll_print('deleted-print', finger='left-thumb') self.send_sleep(get_timeout('device_sleep')) self.send_error(FPrint.DeviceError.BUSY) self.call_device_method_async('DeleteEnrolledFinger', '(s)', ['left-thumb']) self.wait_for_result(max_wait=get_timeout('device_sleep') / 2) self.assertFalse(self.get_all_async_replies()) self.set_print_not_writable('testuser', FPrint.Finger.LEFT_THUMB) with self.assertFprintError('PrintsNotDeleted'): self.wait_for_device_reply() def test_release_error(self): self.send_error(FPrint.DeviceError.PROTO) with self.assertFprintError('Internal'): self.device.Release() def test_release_fails_while_closing(self): self.send_sleep(300) self.call_device_method_async('Release', '()', []) self.wait_for_result(max_wait=150) self.assertFalse(self.get_all_async_replies()) with self.assertFprintError('AlreadyInUse'): self.device.Release() self.wait_for_device_reply() self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): @classmethod def setUpClass(cls): super().setUpClass() cls.enroll_finger = 'left-middle-finger' cls.verify_finger = cls.enroll_finger cls.stop_on_teardown = True cls.releases_on_teardown = True def setUp(self): super().setUp() self.device.Claim('(s)', 'testuser') self.enroll_image('whorl', finger=self.enroll_finger) self.device.VerifyStart('(s)', self.verify_finger) def tearDown(self): if self.stop_on_teardown: self.device.VerifyStop() if self.releases_on_teardown: self.device.Release() super().tearDown() def assertVerifyRetry(self, device_error, expected_error): self.send_retry(retry_error=device_error) self.wait_for_result() self.assertFalse(self._verify_stopped) self.assertEqual(self._last_result, expected_error) def assertVerifyError(self, device_error, expected_error): self.send_error(error=device_error) self.wait_for_result() self.assertTrue(self._verify_stopped) self.assertEqual(self._last_result, expected_error) def test_verify_retry_general(self): self.assertVerifyRetry(FPrint.DeviceRetry.GENERAL, 'verify-retry-scan') def test_verify_retry_general_restarted(self): self.assertVerifyRetry(FPrint.DeviceRetry.GENERAL, 'verify-retry-scan') # Give fprintd time to re-start the request. We can't force the other # case (cancellation before restart happened), but we can force this one. time.sleep(1) def test_verify_retry_too_short(self): self.assertVerifyRetry(FPrint.DeviceRetry.TOO_SHORT, 'verify-swipe-too-short') def test_verify_retry_remove_finger(self): self.assertVerifyRetry(FPrint.DeviceRetry.REMOVE_FINGER, 'verify-remove-and-retry') def test_verify_retry_center_finger(self): self.assertVerifyRetry(FPrint.DeviceRetry.CENTER_FINGER, 'verify-finger-not-centered') def test_verify_error_general(self): self.assertVerifyError(FPrint.DeviceError.GENERAL, 'verify-unknown-error') def test_verify_error_not_supported(self): self.assertVerifyError(FPrint.DeviceError.NOT_SUPPORTED, 'verify-unknown-error') def test_verify_error_not_open(self): self.assertVerifyError(FPrint.DeviceError.NOT_OPEN, 'verify-unknown-error') def test_verify_error_already_open(self): self.assertVerifyError(FPrint.DeviceError.ALREADY_OPEN, 'verify-unknown-error') def test_verify_error_busy(self): self.assertVerifyError(FPrint.DeviceError.BUSY, 'verify-unknown-error') def test_verify_error_proto(self): self.assertVerifyError(FPrint.DeviceError.PROTO, 'verify-disconnected') def test_verify_error_data_invalid(self): self.assertVerifyError(FPrint.DeviceError.DATA_INVALID, 'verify-unknown-error') def test_verify_error_data_not_found(self): self.assertVerifyError(FPrint.DeviceError.DATA_NOT_FOUND, 'verify-no-match') def test_verify_error_data_full(self): self.assertVerifyError(FPrint.DeviceError.DATA_FULL, 'verify-unknown-error') def test_multiple_verify(self): self.send_image('tented_arch') self.assertVerifyNoMatch() self.device.VerifyStop() self.device.VerifyStart('(s)', self.verify_finger) self.send_image('whorl') self.assertVerifyMatch() def start_verify_with_delayed_stop(self, image): with Connection(self.sockaddr) as con: self.send_finger_automatic(False, con=con) self.send_finger_report(True, con=con) self.send_image(image, con=con) def test_multiple_verify_cancelled(self): self.start_verify_with_delayed_stop('tented_arch') self.assertVerifyNoMatch() self.device.VerifyStop() self.device.VerifyStart('(s)', self.verify_finger) self.send_finger_report(False) self.send_image('whorl') self.assertVerifyMatch() def test_verify_start_during_verify(self): with self.assertFprintError('AlreadyInUse'): self.device.VerifyStart('(s)', self.verify_finger) def test_enroll_start_during_verify(self): with self.assertFprintError('AlreadyInUse'): self.device.EnrollStart('(s)', 'right-thumb') def test_enroll_stop_during_verify(self): with self.assertFprintError('AlreadyInUse'): self.device.EnrollStop() def test_verify_stop_from_other_client(self): with self.assertFprintError('AlreadyInUse'): self.call_device_method_from_other_client('VerifyStop') def test_delete_fingers_during_verify(self): with self.assertFprintError('AlreadyInUse'): self.device.DeleteEnrolledFingers('(s)', '') def test_delete_fingers2_during_verify(self): with self.assertFprintError('AlreadyInUse'): self.device.DeleteEnrolledFingers2() def test_delete_finger_during_verify(self): with self.assertFprintError('AlreadyInUse'): self.device.DeleteEnrolledFinger('(s)', 'left-thumb') def test_verify_concourrent_stop(self): self.stop_on_teardown = False self.call_device_method_async('VerifyStop', '()', []) self.call_device_method_async('VerifyStop', '()', []) with self.assertFprintError(['AlreadyInUse', 'NoActionInProgress']): self.wait_for_device_reply(method='VerifyStop', expected_replies=2) self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_verify_error_ignored_after_report(self): if self.device_driver != 'virtual_image': self.skipTest('Relies on virtual_image driver specifics') with Connection(self.sockaddr) as con: self.send_finger_automatic(False, con=con) self.send_finger_report(True, con=con) self.send_image('whorl', con=con) self.assertVerifyMatch() self.assertTrue(self.finger_present) self.send_error(con=con) self.wait_for_result(max_wait=200) self.assertIsNone(self._last_result) self.assertFalse(self.finger_present) def test_verify_stop_waits_for_completion(self): self.stop_on_teardown = False self.start_verify_with_delayed_stop('tented_arch') self.assertVerifyNoMatch() self.call_device_method_async('VerifyStop', '()', []) def restart_verify(abort=False): self.call_device_method_async('VerifyStart', '(s)', [self.verify_finger]) with self.assertFprintError('AlreadyInUse'): self.wait_for_device_reply(method='VerifyStart') self.assertFalse(self.get_async_replies(method='VerifyStop')) self._abort = abort restart_verify() GLib.timeout_add(100, restart_verify) GLib.timeout_add(300, restart_verify, True) self.wait_for_result() def test_verify_stop_waits_for_completion_waiting_timeout(self): self.test_verify_stop_waits_for_completion() self.wait_for_device_reply(method='VerifyStop') self.assertTrue(self.get_async_replies(method='VerifyStop')) def test_verify_stop_waits_for_completion_is_stopped_by_release(self): # During the release here we're testing the case in which # while we're waiting for VerifyStop to return, Release stops the # verification, making the invocation to return self.releases_on_teardown = False self.test_verify_stop_waits_for_completion() self.assertFalse(self.get_async_replies(method='VerifyStop')) self.call_device_method_async('Release', '()', []) self.wait_for_device_reply(method='Release') self.assertTrue(self.get_async_replies(method='VerifyStop')) class FPrintdVirtualDeviceStorageVerificationUtils(object): def start_verify_with_delayed_stop(self, image, match=None): self.send_sleep(50) self.send_image(image) self.send_sleep(get_timeout('test') * 1000) def test_verify_error_ignored_after_report(self): self.send_sleep(50) self.send_image('whorl') self.send_sleep(0) self.send_error(FPrint.DeviceError.BUSY) self.assertVerifyMatch() self.wait_for_result(max_wait=200) self.assertIsNone(self._last_result) self.assertFalse(self.finger_present) class FPrintdVirtualDeviceStorageVerificationTests(FPrintdVirtualStorageDeviceBaseTest, FPrintdVirtualDeviceStorageVerificationUtils, FPrintdVirtualDeviceVerificationTests): # Repeat the tests for the Virtual storage device, with specific overrides pass class FPrintdVirtualDeviceNoStorageVerificationTests(FPrintdVirtualNoStorageDeviceBaseTest, FPrintdVirtualDeviceStorageVerificationTests): # Repeat the tests for the Virtual device (with no storage) pass class FPrintdVirtualDeviceIdentificationTests(FPrintdVirtualDeviceVerificationTests): '''This class will just repeat the tests of FPrintdVirtualDeviceVerificationTests but with 'any' finger parameter (leading to an identification, when possible under the hood). ''' @classmethod def setUpClass(cls): super().setUpClass() cls.verify_finger = 'any' class FPrintdVirtualDeviceStorageIdentificationTests(FPrintdVirtualStorageDeviceBaseTest, FPrintdVirtualDeviceStorageVerificationUtils, FPrintdVirtualDeviceIdentificationTests): # Repeat the tests for the Virtual storage device pass class FPrintdVirtualDeviceNoStorageIdentificationTests(FPrintdVirtualNoStorageDeviceBaseTest, FPrintdVirtualDeviceStorageIdentificationTests): # Repeat the tests for the Virtual device (with no storage) pass class FPrindConcurrentPolkitRequestsTest(FPrintdVirtualStorageDeviceBaseTest): def wait_for_hanging_clients(self): while not self._polkitd_obj.HaveHangingCalls(): pass self.assertTrue(self._polkitd_obj.HaveHangingCalls()) def start_hanging_gdbus_claim(self, user='testuser'): gdbus = self.gdbus_device_method_call_process('Claim', [user]) self.assertIsNone(gdbus.poll()) self.wait_for_hanging_clients() self.addCleanup(gdbus.kill) return gdbus def test_hanging_claim_does_not_block_new_claim_external_client(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.enroll ]) self._polkitd_obj.SimulateHang(True) self._polkitd_obj.SetDelay(0.5) gdbus = self.start_hanging_gdbus_claim() self._polkitd_obj.SimulateHang(False) self.device.Claim('(s)', self.get_current_user()) self.assertIsNone(gdbus.poll()) self._polkitd_obj.ReleaseHangingCalls() gdbus.wait() with self.assertFprintError('AlreadyInUse'): raise GLib.GError(gdbus.stdout.read()) self.device.Release() def test_hanging_claim_does_not_block_new_claim(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.enroll ]) self._polkitd_obj.SimulateHang(True) self._polkitd_obj.SetDelay(0.5) self.call_device_method_async('Claim', '(s)', ['']) self.wait_for_hanging_clients() self._polkitd_obj.SimulateHang(False) self.device.Claim('(s)', self.get_current_user()) self._polkitd_obj.ReleaseHangingCalls() with self.assertFprintError('AlreadyInUse'): self.wait_for_device_reply() self.device.Release() def test_hanging_claim_enroll_does_not_block_new_claim(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.enroll ]) self._polkitd_obj.SimulateHangActions([ FprintDevicePermission.enroll]) self._polkitd_obj.SetDelay(0.5) gdbus = self.start_hanging_gdbus_claim() self._polkitd_obj.SimulateHangActions(['']) self.device.Claim('(s)', self.get_current_user()) self.assertIsNone(gdbus.poll()) self._polkitd_obj.ReleaseHangingCalls() gdbus.wait() with self.assertFprintError('AlreadyInUse'): raise GLib.GError(gdbus.stdout.read()) self.device.Release() def test_hanging_claim_does_not_block_new_release(self): self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username]) self._polkitd_obj.SimulateHang(True) gdbus = self.gdbus_device_method_call_process('Claim', ['testuser']) self.addCleanup(gdbus.kill) self.wait_for_hanging_clients() with self.assertFprintError('ClaimDevice'): self.device.Release() self.assertIsNone(gdbus.poll()) def test_hanging_claim_does_not_block_list(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.enroll, FprintDevicePermission.verify]) self.device.Claim('(s)', '') self.enroll_image('whorl', finger='left-thumb') self.device.Release() self._polkitd_obj.SimulateHangActions([ FprintDevicePermission.set_username]) gdbus = self.start_hanging_gdbus_claim() self.assertEqual(self.device.ListEnrolledFingers('(s)', self.get_current_user()), ['left-thumb']) self.assertIsNone(gdbus.poll()) def test_hanging_claim_can_proceed_when_released(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.verify]) self._polkitd_obj.SimulateHangActions([ FprintDevicePermission.set_username]) gdbus = self.start_hanging_gdbus_claim() self._polkitd_obj.SimulateHangActions(['']) self.device.Claim('(s)', 'testuser') self.device.Release() self.assertIsNone(gdbus.poll()) self._polkitd_obj.ReleaseHangingCalls() gdbus.wait() self.assertEqual(gdbus.returncode, 0) def test_hanging_claim_does_not_block_empty_list(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.enroll, FprintDevicePermission.verify]) self._polkitd_obj.SimulateHangActions([ FprintDevicePermission.set_username]) gdbus = self.start_hanging_gdbus_claim() with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', self.get_current_user()) self.assertIsNone(gdbus.poll()) def test_hanging_claim_does_not_block_verification(self): self._polkitd_obj.SetAllowed([ FprintDevicePermission.set_username, FprintDevicePermission.enroll, FprintDevicePermission.verify]) self.device.Claim('(s)', '') self.enroll_image('whorl', finger='left-thumb') self.device.Release() self._polkitd_obj.SimulateHangActions([ FprintDevicePermission.set_username]) gdbus = self.start_hanging_gdbus_claim() self.device.Claim('(s)', '') self.device.VerifyStart('(s)', 'any') self.send_image('whorl') self.assertVerifyMatch(selected_finger='left-thumb') self.device.VerifyStop() self.device.Release() self.assertIsNone(gdbus.poll()) class FPrintdUtilsTest(FPrintdVirtualStorageDeviceBaseTest): @classmethod def setUpClass(cls): super().setUpClass() utils = { 'delete': None, 'enroll': None, 'list': None, 'verify': None, } for util in utils: util_bin = 'fprintd-{}'.format(util) if 'FPRINT_BUILD_DIR' in os.environ: print('Testing local build') build_dir = os.environ['FPRINT_BUILD_DIR'] path = os.path.join(build_dir, '../utils', util_bin) elif 'UNDER_JHBUILD' in os.environ: print('Testing JHBuild version') jhbuild_prefix = os.environ['JHBUILD_PREFIX'] path = os.path.join(jhbuild_prefix, 'bin', util_bin) else: # Assume it is in path utils[util] = util_bin continue assert os.path.exists(path), 'failed to find {} in {}'.format(util, path) utils[util] = path cls.utils = utils cls.utils_proc = {} def util_start(self, name, args=[]): env = os.environ.copy() env['G_DEBUG'] = 'fatal-criticals' env['STATE_DIRECTORY'] = self.state_dir env['RUNTIME_DIRECTORY'] = self.run_dir argv = [self.utils[name]] + args valgrind = os.getenv('VALGRIND') if valgrind is not None: argv.insert(0, 'valgrind') argv.insert(1, '--leak-check=full') if os.path.exists(valgrind): argv.insert(2, '--suppressions=%s' % valgrind) self.valgrind = True output = OutputChecker() self.utils_proc[name] = subprocess.Popen(argv, env=env, stdout=output.fd, stderr=subprocess.STDOUT) output.writer_attached() self.addCleanup(self.utils_proc[name].wait) self.addCleanup(self.utils_proc[name].terminate) self.addCleanup(output.assert_closed) return self.utils_proc[name], output def test_vanished_client_operation_is_cancelled(self): self.device.Claim('(s)', self.get_current_user()) self.enroll_image('whorl') self.device.Release() verify, output = self.util_start('verify') time.sleep(1) verify.terminate() self.assertLess(verify.wait(), 128) time.sleep(1) self.device.Claim('(s)', self.get_current_user()) self.device.Release() def test_delete_no_prints(self): delete, out = self.util_start('delete', [self.get_current_user()]) out.check_line('No fingerprints to delete on {}'.format( self.driver_name).encode('utf-8'), get_timeout()) self.assertEqual(delete.wait(), 0) def test_delete_already_claimed(self): self.device.Claim('(s)', self.get_current_user()) self.addCleanup(self.try_release) self.enroll_image('whorl') delete, out = self.util_start('delete', [self.get_current_user()]) out.check_line('{}.Error.AlreadyInUse'.format(FPRINT_NAMESPACE), get_timeout()) self.assertNotEqual(delete.wait(), 0) self.assertLess(delete.wait(), 128) def test_delete_error_claiming(self): self.device.Claim('(s)', self.get_current_user()) self.addCleanup(self.try_release) self.set_keep_alive(True) self.device.Release() self.send_error(FPrint.DeviceError.PROTO) delete, out = self.util_start('delete', [self.get_current_user()]) out.check_line('{}.Error.Internal'.format(FPRINT_NAMESPACE), get_timeout()) self.assertNotEqual(delete.wait(), 0) self.assertLess(delete.wait(), 128) def test_delete_error(self): self.device.Claim('(s)', self.get_current_user()) self.addCleanup(self.try_release) self.enroll_image('whorl') self.set_keep_alive(True) self.device.Release() self.send_command('IGNORED_COMMAND') # During claim self.send_error(FPrint.DeviceError.PROTO) # During delete delete, out = self.util_start('delete', [self.get_current_user()]) out.check_line('Failed to delete fingerprints', get_timeout()) self.assertNotEqual(delete.wait(), 0) self.assertLess(delete.wait(), 128) def test_delete_release_error(self): self.device.Claim('(s)', self.get_current_user()) self.addCleanup(self.try_release) self.set_keep_alive(True) self.device.Release() self.send_command('IGNORED_COMMAND') # During claim self.send_error(FPrint.DeviceError.PROTO) # During release delete, out = self.util_start('delete', [self.get_current_user()]) out.check_line('Release failed with error', get_timeout()) self.assertNotEqual(delete.wait(), 0) self.assertLess(delete.wait(), 128) def test_delete_single_finger(self): self.device.Claim('(s)', 'testuser') enrolled, enroll_map = self.enroll_multiple_images() self.addCleanup(self.try_release) self.device.Release() finger_name = enrolled[0] delete, out = self.util_start('delete', ['testuser', '-f', finger_name]) out.check_line('Using device {}'.format( self.device.get_object_path()), get_timeout()) out.check_line('Fingerprint {} of user {} deleted on {}'.format( finger_name, 'testuser', self.driver_name), get_timeout()) self.assertEqual(delete.wait(), 0) remaining = self.device.ListEnrolledFingers('(s)', 'testuser') self.assertNotIn(finger_name, remaining) self.assertCountEqual(enrolled[1:], remaining) def test_delete_multiple_users_single_finger(self): self.addCleanup(self.try_release) enroll_map, enrolled_prints_info = self.enroll_users_images() delete_args = [] for user, print_info in enroll_map.items(): for f in print_info: delete_args.append(user) delete_args.append('-f') delete_args.append(f) delete, out = self.util_start('delete', delete_args) out.check_line('Using device {}'.format( self.device.get_object_path()), get_timeout()) for user, print_info in enroll_map.items(): for f in print_info: out.check_line('Fingerprint {} of user {} deleted on {}'.format( f, user, self.driver_name), get_timeout()) self.assertEqual(delete.wait(), 0) with self.assertFprintError('NoEnrolledPrints'): self.device.ListEnrolledFingers('(s)', 'testuser') def test_enroll(self): self.device.Claim('(s)', self.get_current_user()) self.set_keep_alive(True) self.device.Release() # Open (no clear storage as list is supported) self.send_command('CONT') finger_name = self.get_finger_name(FPrint.Finger.LEFT_THUMB) enroll, out = self.util_start('enroll', [self.get_current_user(), '-f', finger_name]) out.check_line('Using device {}'.format( self.device.get_object_path()), get_timeout()) out.check_line('Enrolling {} finger.'.format(finger_name).encode('utf-8'), get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_retry(FPrint.DeviceRetry.TOO_SHORT) out.check_line('Enroll result: enroll-swipe-too-short', get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_retry(FPrint.DeviceRetry.CENTER_FINGER) out.check_line('Enroll result: enroll-finger-not-centered', get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-completed', get_timeout()) self.assertEqual(enroll.wait(), 0) def test_enroll_error(self): self.device.Claim('(s)', self.get_current_user()) self.set_keep_alive(True) self.device.Release() # Open (no clear storage as list is supported) self.send_command('CONT') finger_name = self.get_finger_name(FPrint.Finger.LEFT_MIDDLE) enroll, out = self.util_start('enroll', [self.get_current_user(), '-f', finger_name]) out.check_line('Using device {}'.format( self.device.get_object_path()), get_timeout()) out.check_line('Enrolling {} finger.'.format(finger_name).encode('utf-8'), get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_image('print-id') out.check_line('Enroll result: enroll-stage-passed', get_timeout()) self.send_retry(FPrint.DeviceRetry.TOO_SHORT) out.check_line('Enroll result: enroll-swipe-too-short', get_timeout()) self.send_error(FPrint.DeviceError.PROTO) out.check_line('Enroll result: enroll-disconnected', get_timeout()) self.assertNotEqual(enroll.wait(), 0) self.assertLess(enroll.wait(), 128) def test_enroll_error_invalid_finger(self): finger_name = 'eleventh-hand-finger' enroll, out = self.util_start('enroll', [self.get_current_user(), '-f', finger_name]) out.check_line('Invalid finger name \'{}\''.format(finger_name), get_timeout()) self.assertNotEqual(enroll.wait(), 0) self.assertLess(enroll.wait(), 128) def run_verify(self, finger, match, error=None): self.device.Claim('(s)', 'testuser') finger_name = self.get_finger_name(finger) override = {} if finger is FPrint.Finger.UNKNOWN else { finger_name: 'print-id', } enrolled, enroll_map = self.enroll_multiple_images( images_override=override) self.set_keep_alive(True) self.device.Release() verify, out = self.util_start( 'verify', ['-f', finger_name, 'testuser']) out.check_line('Using device {}'.format( self.device.get_object_path()), get_timeout()) out.check_line('Verify started!', get_timeout()) out.check_line('Verifying: {}'.format(finger_name), get_timeout()) if error: self.send_error(error) self.assertNotEqual(verify.wait(), 0) self.assertLess(verify.wait(), 128) out.check_line('Verify result: verify-disconnected (done)', get_timeout()) elif match: verify_finger = enrolled[0] if finger is FPrint.Finger.UNKNOWN else finger_name self.send_image(enroll_map[verify_finger]) out.check_line('Verify result: verify-match (done)', get_timeout()) self.assertEqual(verify.wait(), 0) else: if finger is FPrint.Finger.UNKNOWN: verify_image = 'another-print' else: enroll_map.pop(finger_name) verify_image = list(enroll_map.values())[0] self.send_image(verify_image) out.check_line('Verify result: verify-no-match (done)', get_timeout()) self.assertNotEqual(verify.wait(), 0) self.assertLess(verify.wait(), 128) def test_verify_match(self): self.run_verify(finger=FPrint.Finger.RIGHT_THUMB, match=True) def test_verify_no_match(self): self.run_verify(finger=FPrint.Finger.LEFT_MIDDLE, match=False) def test_verify_error(self): self.run_verify(finger=FPrint.Finger.RIGHT_THUMB, match=False, error=FPrint.DeviceError.PROTO) def test_verify_any_finger_match(self): self.run_verify(finger=FPrint.Finger.UNKNOWN, match=True) def test_verify_any_finger_no_match(self): self.run_verify(finger=FPrint.Finger.UNKNOWN, match=False) def test_verify_any_finger_error(self): self.run_verify(finger=FPrint.Finger.UNKNOWN, match=False, error=FPrint.DeviceError.PROTO) def list_tests(): import unittest_inspector return unittest_inspector.list_tests(sys.modules[__name__]) if __name__ == '__main__': if len(sys.argv) == 2 and sys.argv[1] == "list-tests": for machine, human in list_tests(): print("%s %s" % (machine, human), end="\n") sys.exit(0) prog = unittest.main(verbosity=2, exit=False) if prog.result.errors or prog.result.failures: sys.exit(1) # Translate to skip error if prog.result.testsRun == len(prog.result.skipped): sys.exit(77) sys.exit(0)