#!/usr/bin/python3 __author__ = 'Martin Pitt ' __copyright__ = '(C) 2012 Canonical Ltd.' __license__ = 'LGPL v2 or later' # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General # Public License along with this library; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, # Boston, MA 02110-1301, USA. import os import os.path import sys import unittest import subprocess import tempfile import tarfile import zipfile import time import shutil import fcntl import re import locale from glob import glob from gi.repository import GLib, Gio def find_alternative(cmds): '''Find command in cmds array and return the found alternative''' for cmd in cmds: if subprocess.call(['which', cmd], stdout=subprocess.PIPE) == 0: return cmd in_testbed = os.path.exists('/home/gvfs_sandbox_marker') samba_running = subprocess.call(['pidof', 'smbd'], stdout=subprocess.PIPE) == 0 httpd_cmd = find_alternative(['apache2', 'httpd', 'apachectl']) have_httpd = httpd_cmd is not None sshd_path = subprocess.check_output(['which', 'sshd'], universal_newlines=True).strip() local_ip = subprocess.check_output("ip -4 addr | sed -nr '/127\.0\.0/ n; " "/inet / { s/^.*inet ([0-9.]+).*$/\\1/; p; q }'", shell=True, universal_newlines=True) # when running in the build tree, check whether Dav backend is enabled have_dav_backend = True if 'GVFS_MOUNTABLE_DIR' in os.environ: have_dav_backend = os.path.exists(os.path.join(os.environ['GVFS_MOUNTABLE_DIR'], 'dav.localmount')) my_dir = os.path.dirname(os.path.abspath(__file__)) # we need this flag to check if we can test error messages locale.setlocale(locale.LC_ALL, '') lc = locale.getlocale(locale.LC_MESSAGES)[0] english_messages = not lc or lc.startswith('en_') # http://sg.danny.cz/sg/sdebug26.html PTYPE_DISK = 0 PTYPE_CDROM = 5 # local D-BUS daemon if we don't run under gvfs-testbed dbus_daemon = None class GvfsTestCase(unittest.TestCase): '''Gvfs tests base class. Provide some utility functions and a temporary work dir. ''' def setUp(self): self.workdir = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.workdir) def run(self, result=None): '''Show dbus daemon output on failed tests''' if result: orig_err_fail = len(result.errors) + len(result.failures) super().run(result) # always read the logs, so that we only get the ones relevant to this # particular test case if dbus_daemon: dbus_out = dbus_daemon.stdout.read() dbus_err = dbus_daemon.stderr.read() if result and len(result.errors) + len(result.failures) > orig_err_fail: print('\n----- dbus stdout -----\n%s\n----- dbus stderr -----\n%s\n' % (dbus_out and dbus_out.decode('UTF-8') or '', dbus_err and dbus_err.decode('UTF-8') or '')) def program_code_out_err(self, argv): '''Return (exitcode, stdout, stderr) from a program call.''' prog = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) (out, err) = prog.communicate() return (prog.returncode, out, err) def program_out_err(self, argv): '''Return (stdout, stderr) from a program call.''' (code, out, err) = self.program_code_out_err(argv) self.assertEqual(code, 0, err) return (out, err) def program_out_success(self, argv): '''Return stdout from a successful program call.''' (out, err) = self.program_out_err(argv) self.assertEqual(err, '', err) return out @classmethod def root_command(klass, command): '''Run a shell command string as root. This only works when running under gvfs-testbed. Return (code, stdout, stderr). ''' assert in_testbed, 'root_command() only works under gvfs-testbed' rootsh = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # set reasonable path that includes /sbin rootsh.stdin.write(b'export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\n') (out, err) = rootsh.communicate(command.encode('UTF-8')) return (rootsh.returncode, out.decode('UTF-8'), err.decode('UTF-8')) @classmethod def root_command_success(klass, command): '''root_command() for commands that should succeed without errors.''' (code, out, err) = klass.root_command(command) if code != 0: raise SystemError('command "%s" failed with code %i:\n%s' % (command, code, err)) if err: raise SystemError('command "%s" produced error:\n%s' % (command, err)) def unmount(self, uri): self.program_out_success(['gvfs-mount', '-u', uri]) timeout = 5 while timeout > 0: (out, err) = self.program_out_err(['gvfs-mount', '-li']) if uri not in out: break timeout -= 1 time.sleep(1) else: self.fail('gvfs-mount -u %s failed' % uri) @classmethod def quote(klass, path): '''Quote a path for GIO URLs''' return path.replace('%', '%25').replace('/', '%2F').replace(':', '%3A') def wait_for_gvfs_mount_user_prompt(self, popen): '''Wait for a gvfs-mount Popen process to show an User auth prompt''' empty_timeout = 50 while True: r = popen.stdout.read(1000) #print(' wait_for_gvfs_mount_user_prompt: got "%s"' % str(r)) if r and (b'User' in r or b'Domain' in r): break self.assertGreater(empty_timeout, 0, 'timed out waiting for auth prompt') empty_timeout -= 1 time.sleep(0.1) def mount_api(self, gfile, mount_op=None): '''Mount a Gio.File using the Gio API This times out after 30 seconds. Return True on success or a GLib.GError object from the mount call. ''' self.cb_result = None def mount_done(obj, result, main_loop): ml.quit() try: success = obj.mount_enclosing_volume_finish(result) self.cb_result = (obj, success) except GLib.GError as e: self.cb_result = (obj, e) ml = GLib.MainLoop() gfile.mount_enclosing_volume(Gio.MountMountFlags.NONE, mount_op, None, mount_done, ml) # ensure we are timing out GLib.timeout_add_seconds(30, lambda data: ml.quit(), None) ml.run() self.assertNotEqual(self.cb_result, None, 'operation timed out') self.assertEqual(self.cb_result[0], gfile) return self.cb_result[1] def unmount_api(self, gfile): '''Umount a mounted Gio.File using the Gio API This times out after 5 seconds. ''' self.cb_result = None def unmount_done(obj, result, main_loop): success = obj.unmount_with_operation_finish(result) self.cb_result = (obj, success) main_loop.quit() mount = gfile.find_enclosing_mount(None) self.assertNotEqual(mount, None) ml = GLib.MainLoop() mount.unmount_with_operation(Gio.MountUnmountFlags.NONE, None, None, unmount_done, ml) # ensure we are timing out GLib.timeout_add_seconds(5, lambda data: ml.quit(), None) ml.run() self.assertNotEqual(self.cb_result, None, 'operation timed out') self.assertEqual(self.cb_result[0], mount) self.assertTrue(self.cb_result[1]) def make_mountop(self, user, password): '''Create a Gio.MountOperation from given credentials On the first ask_password signal this sends the password, and aborts the second request (for tests that use wrong credentials). ''' def pwd_cb(op, message, default_user, default_domain, flags, data): # first call: send correct result if op.get_username(): op.reply(Gio.MountOperationResult.HANDLED) # subsequent calls: abort op.set_username('') op.reply(Gio.MountOperationResult.ABORTED) mo = Gio.MountOperation.new() mo.set_username(user) mo.set_password(password) mo.connect('ask_password', pwd_cb, None) return mo class Programs(GvfsTestCase): '''Test gvfs-* programs''' def test_gvfs_info_filesystem(self): '''gvfs-info --filesystem''' out = self.program_out_success(['gvfs-info', '-f', '/']) self.assertTrue('filesystem::size:' in out, out) self.assertTrue('filesystem::type:' in out, out) class ArchiveMounter(GvfsTestCase): def add_files(self, add_fn): '''Add test files to an archive''' p = os.path.join(self.workdir, 'hello.txt') with open(p, 'w') as f: f.write('hello\n') add_fn(p, 'hello.txt') p = os.path.join(self.workdir, 'bye.txt') with open(p, 'w') as f: f.write('bye\n') add_fn(p, 'stuff/bye.txt') def test_tar(self): '''archive:// for tar''' tar_path = os.path.join(self.workdir, 'stuff.tar') tf = tarfile.open(tar_path, 'w') self.add_files(tf.add) tf.close() self.do_test_for_archive(tar_path) def test_tar_gz(self): '''archive:// for tar.gz''' tar_path = os.path.join(self.workdir, 'stuff.tar.gz') tf = tarfile.open(tar_path, 'w:gz') self.add_files(tf.add) tf.close() self.do_test_for_archive(tar_path) def test_tar_bz2(self): '''archive:// for tar.bz2''' tar_path = os.path.join(self.workdir, 'stuff.tar.bz2') tf = tarfile.open(tar_path, 'w:bz2') self.add_files(tf.add) tf.close() self.do_test_for_archive(tar_path) def test_zip(self): '''archive:// for .zip''' zip_path = os.path.join(self.workdir, 'stuff.zip') zf = zipfile.ZipFile(zip_path, 'w') self.add_files(zf.write) zf.close() self.do_test_for_archive(zip_path) def test_iso_rr(self): '''archive:// for RockRidge .iso''' iso = os.path.join(self.workdir, 'bogus-cd.iso') with open(iso, 'wb') as f: subprocess.check_call(['bzip2', '-cd', os.path.join(my_dir, 'files', 'bogus-cd.iso.bz2')], stdout=f) self.do_test_for_archive(iso) def test_iso_joliet(self): '''archive:// for Joliet .iso''' iso = os.path.join(self.workdir, 'joliet.iso') with open(iso, 'wb') as f: subprocess.check_call(['bzip2', '-cd', os.path.join(my_dir, 'files', 'joliet.iso.bz2')], stdout=f) self.do_test_for_archive(iso) def do_test_for_archive(self, path): # mount it; yes, gvfs expects double quoting uri = 'archive://' + self.quote(self.quote('file://' + path)) subprocess.check_call(['gvfs-mount', uri]) # appears in gvfs-mount list (out, err) = self.program_out_err(['gvfs-mount', '-li']) try: self.assertTrue('Mount(0)' in out, out) self.assertTrue('%s -> %s' % (os.path.basename(path), uri) in out, out) # check gvfs-info out = self.program_out_success(['gvfs-info', uri]) self.assertTrue('standard::content-type: inode/directory' in out, out) self.assertTrue('access::can-read: TRUE' in out, out) # check gvfs-cat out = self.program_out_success(['gvfs-cat', uri + '/hello.txt']) self.assertEqual(out, 'hello\n') out = self.program_out_success(['gvfs-cat', uri + '/stuff/bye.txt']) self.assertEqual(out, 'bye\n') finally: self.unmount(uri) def test_api(self): '''archive:// with Gio API''' tar_path = os.path.join(self.workdir, 'stuff.tar') tf = tarfile.open(tar_path, 'w') tf.add(__file__, 'gvfs-test.py') tf.close() uri = 'archive://' + self.quote(self.quote('file://' + tar_path)) gfile = Gio.File.new_for_uri(uri) # not mounted yet, should fail self.assertRaises(GLib.GError, gfile.query_info, '*', 0, None) self.assertEqual(self.mount_api(gfile), True) try: info = gfile.query_info('*', 0, None) self.assertEqual(info.get_content_type(), 'inode/directory') self.assertEqual(info.get_file_type(), Gio.FileType.DIRECTORY) self.assertTrue('stuff.tar' in info.get_display_name(), info.get_display_name()) self.assertEqual(info.get_attribute_boolean('access::can-read'), True) finally: self.unmount_api(gfile) @unittest.skipUnless(os.getenv('XDG_RUNTIME_DIR'), 'No $XDG_RUNTIME_DIR available') class Sftp(GvfsTestCase): def setUp(self): '''Run ssh server''' super().setUp() self.assertTrue(os.path.exists(os.path.expanduser('~/.ssh/id_rsa')), 'This test needs an existing ~/.ssh/id_rsa') # find sftp-server for dir in ['/usr/local/lib/openssh', '/usr/lib/openssh', '/usr/local/libexec/openssh', '/usr/libexec/openssh', '/usr/lib/misc', '/usr/lib64/misc']: sftp_server = os.path.join(dir, 'sftp-server') if os.path.exists(sftp_server): break else: self.fail('Cannot locate OpenSSH sftp-server program, please update tests for your distribution') # look for authorized_keys in a temporary dir, to avoid having to mess # with the actual user files when not calling this through gvfs-testbed # (unfortunately ssh doesn't consider $HOME); NB we cannot use # self.workdir as ssh refuses files in /tmp. self.authorized_keys = os.path.join(os.environ['XDG_RUNTIME_DIR'], 'gvfs_test_authorized_keys') # generate sshd configuration; note that we must ensure that the # private key is not world-readable, so we need to copy it shutil.copy(os.path.join(my_dir, 'files', 'ssh_host_rsa_key'), self.workdir) os.chmod(os.path.join(self.workdir, 'ssh_host_rsa_key'), 0o600) shutil.copy(os.path.join(my_dir, 'files', 'ssh_host_rsa_key.pub'), self.workdir) self.sshd_config = os.path.join(self.workdir, 'sshd_config') with open(self.sshd_config, 'w') as f: f.write('''Port 22222 HostKey %(workdir)s/ssh_host_rsa_key UsePrivilegeSeparation no AuthorizedKeysFile %(authorized_keys)s UsePam no Subsystem sftp %(sftp_server)s ''' % {'workdir': self.workdir, 'sftp_server': sftp_server, 'authorized_keys': self.authorized_keys, }) self.sshd = subprocess.Popen([sshd_path, '-Dde', '-f', self.sshd_config], universal_newlines=True, stderr=subprocess.PIPE) def tearDown(self): if self.sshd.returncode is None: self.sshd.terminate() self.sshd.wait() super().tearDown() def run(self, result=None): '''Show sshd log output on failed tests''' if result: orig_err_fail = len(result.errors) + len(result.failures) super().run(result) if result and len(result.errors) + len(result.failures) > orig_err_fail and hasattr(self, 'sshd'): print('\n----- sshd log -----\n%s\n------\n' % self.sshd.stderr.read()) def test_rsa(self): '''sftp://localhost with RSA authentication''' # accept our key for localhost logins shutil.copy(os.path.expanduser('~/.ssh/id_rsa.pub'), self.authorized_keys) # mount it uri = 'sftp://localhost:22222' subprocess.check_call(['gvfs-mount', uri]) self.do_mount_check(uri) # if we are in the testbed, then ssh defaults to # "StrictHostKeyChecking ask", and a connection attempt should fail; # otherwise this is client-configurable behaviour which cannot be # temporarily overridden @unittest.skipUnless(in_testbed, 'not running under gvfs-testbed') @unittest.skipUnless(local_ip, 'not having any non-localhost IP') def test_unknown_host(self): '''sftp:// with RSA authentication for unknown host''' # accept our key for localhost logins shutil.copy(os.path.expanduser('~/.ssh/id_rsa.pub'), self.authorized_keys) # try to mount it; should fail as it's an unknown host uri = 'sftp://%s:22222' % local_ip (code, out, err) = self.program_code_out_err(['gvfs-mount', uri]) self.assertNotEqual(code, 0) # there is nothing in our testbed which would show or answer the # dialog if english_messages: self.assertTrue('Login dialog cancelled' in err, err) def do_mount_check(self, uri): with open('stuff.txt', 'w') as f: f.write('moo!') # appears in gvfs-mount list (out, err) = self.program_out_err(['gvfs-mount', '-li']) try: self.assertRegex(out, 'Mount\(\d+\):.*localhost -> %s' % uri) # check gvfs-info out = self.program_out_success(['gvfs-info', uri]) self.assertRegex(out, 'display name: / .* localhost') self.assertTrue('type: directory' in out, out) self.assertTrue('access::can-read: TRUE' in out, out) # check gvfs-ls out = self.program_out_success(['gvfs-ls', uri + '/home']) self.assertTrue('%s\n' % os.environ['USER'] in out, out) # check gvfs-cat out = self.program_out_success(['gvfs-cat', uri + '/etc/passwd']) self.assertTrue('root:' in out, out) finally: self.unmount(uri) class Ftp(GvfsTestCase): def setUp(self): '''Launch FTP server''' super().setUp() with open(os.path.join(self.workdir, 'myfile.txt'), 'w') as f: f.write('hello world\n') os.mkdir(os.path.join(self.workdir, 'mydir')) secret_path = os.path.join(self.workdir, 'mydir', 'onlyme.txt') with open(secret_path, 'w') as f: f.write('secret\n') os.chmod(secret_path, 0o600) self.ftpd = subprocess.Popen(['twistd', '-n', 'ftp', '-p', '2121', '-r', self.workdir, '--auth', 'memory:testuser:pwd1'], stdout=subprocess.PIPE) # give ftp server some time to start up time.sleep(0.5) def tearDown(self): '''Shut down FTP server''' self.ftpd.terminate() self.ftpd.wait() super().tearDown() def test_anonymous_cli(self): '''ftp:// anonymous (CLI)''' uri = 'ftp://anonymous@localhost:2121' subprocess.check_call(['gvfs-mount', uri]) self.do_mount_check_cli(uri, True) def test_authenticated_cli(self): '''ftp:// authenticated (CLI)''' uri = 'ftp://localhost:2121' mount = subprocess.Popen(['gvfs-mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # wrong user name self.wait_for_gvfs_mount_user_prompt(mount) mount.stdin.write(b'eve\nh4ck\n') # wrong password name self.wait_for_gvfs_mount_user_prompt(mount) mount.stdin.write(b'testuser\nh4ck\n') # correct credentials self.wait_for_gvfs_mount_user_prompt(mount) (out, err) = mount.communicate(b'testuser\npwd1\n') self.assertEqual(mount.returncode, 0) self.assertEqual(err, b'') # in test bed, there is nothing interesting in /home/testuser/, and # without the test bed we do not know what's in the folder, so skip # gvfs-ls check self.do_mount_check_cli(uri, False) def do_mount_check_cli(self, uri, check_contents): # appears in gvfs-mount list (out, err) = self.program_out_err(['gvfs-mount', '-li']) try: self.assertRegex(out, 'Mount\(\d+\):.* -> ftp://([a-z0-9]+@)?localhost:2121') # check gvfs-info out = self.program_out_success(['gvfs-info', uri]) self.assertRegex(out, 'display name: / .* localhost', out) self.assertTrue('type: directory' in out, out) # check gvfs-ls if check_contents: out = self.program_out_success(['gvfs-ls', uri]) self.assertEqual(set(out.split()), set(['myfile.txt', 'mydir'])) out = self.program_out_success(['gvfs-ls', uri + '/mydir']) self.assertEqual(out, 'onlyme.txt\n') # check gvfs-cat out = self.program_out_success(['gvfs-cat', uri + '/myfile.txt']) self.assertEqual(out, 'hello world\n') finally: self.unmount(uri) def test_anonymous_api(self): '''ftp:// anonymous (API)''' uri = 'ftp://anonymous@localhost:2121' gfile = Gio.File.new_for_uri(uri) self.assertEqual(self.mount_api(gfile), True) try: self.do_mount_check_api(gfile, True) finally: self.unmount_api(gfile) def test_authenticated_api(self): '''ftp:// authenticated (API)''' uri = 'ftp://localhost:2121' gfile = Gio.File.new_for_uri(uri) # no password supplied res = self.mount_api(gfile) self.assertTrue(isinstance(res, GLib.GError), res) # wrong username res = self.mount_api(gfile, self.make_mountop('eve', 'h4ck')) self.assertTrue(isinstance(res, GLib.GError)) # wrong password res = self.mount_api(gfile, self.make_mountop('testuser', 'h4ck')) self.assertTrue(isinstance(res, GLib.GError)) # correct credentials res = self.mount_api(gfile, self.make_mountop('testuser', 'pwd1')) self.assertEqual(res, True) try: self.do_mount_check_api(gfile, False) finally: self.unmount_api(gfile) def do_mount_check_api(self, gfile, check_contents): info = gfile.query_info('*', 0, None) self.assertEqual(info.get_content_type(), 'inode/directory') self.assertEqual(info.get_file_type(), Gio.FileType.DIRECTORY) self.assertTrue('localhost' in info.get_display_name(), info.get_display_name()) #FIXME: this is actually supposed to be true! #self.assertEqual(info.get_attribute_boolean('access::can-read'), True) if check_contents: # check available files enum = gfile.enumerate_children('*', Gio.FileQueryInfoFlags.NONE, None) files = set() while True: info = enum.next_file(None) if info is None: break files.add(info.get_name()) self.assertEqual(files, set(['myfile.txt', 'mydir'])) gfile_myfile = Gio.File.new_for_uri(gfile.get_uri() + '/myfile.txt') (success, contents, etags) = gfile_myfile.load_contents(None) self.assertTrue(success) self.assertEqual(contents, b'hello world\n') class Smb(GvfsTestCase): def setUp(self): '''start local smbd as user if we are not in test bed''' super().setUp() # create a few test files if in_testbed: pubdir = os.path.expanduser('~/public') privdir = os.path.expanduser('~/private') else: pubdir = os.path.join(self.workdir, 'public') privdir = os.path.join(self.workdir, 'private') if not os.path.exists(pubdir): # only run this once os.mkdir(pubdir) os.makedirs(os.path.join(privdir, 'mydir')) with open(os.path.join(pubdir, 'myfile.txt'), 'w') as f: f.write('hello world\n') secret_path = os.path.join(privdir, 'mydir', 'onlyme.txt') with open(secret_path, 'w') as f: f.write('secret\n') os.chmod(secret_path, 0o600) if in_testbed: return # smbpasswd file with password "foo" smbpasswd = os.path.join(self.workdir, 'smbpasswd') with open(smbpasswd, 'w') as f: f.write(os.environ['USER']) f.write(':2:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX:AC8E657F83DF82BEEA5D43BDAF7800CC:[U ]:LCT-507C14C7:\n') # create local samba configuration smbdir = os.path.join(self.workdir, 'samba') os.mkdir(smbdir, 0o755) smbconf = os.path.join(self.workdir, 'smb.conf') with open(smbconf, 'w') as f: f.write('''[global] workgroup = TESTGROUP interfaces = lo 127.0.0.0/8 smb ports = 1445 log level = 2 map to guest = Bad User passdb backend = smbpasswd smb passwd file = %(workdir)s/smbpasswd lock directory = %(workdir)s/samba state directory = %(workdir)s/samba cache directory = %(workdir)s/samba pid directory = %(workdir)s/samba private dir = %(workdir)s/samba ncalrpc dir = %(workdir)s/samba [public] path = %(workdir)s/public guest ok = yes [private] path = %(workdir)s/private read only = no ''' % {'workdir': self.workdir}) # start smbd self.smbd = subprocess.Popen(['smbd', '-iFS', '-s', smbconf], universal_newlines=True, stdout=subprocess.PIPE) time.sleep(1) def tearDown(self): # stop smbd if hasattr(self, 'smbd') and self.smbd.returncode is None: self.smbd.terminate() self.smbd.wait() super().tearDown() def run(self, result=None): '''Show smbd log output on failed tests''' if result: orig_err_fail = len(result.errors) + len(result.failures) super().run(result) if hasattr(self, 'smbd'): if result and len(result.errors) + len(result.failures) > orig_err_fail: print('\n----- smbd log -----\n%s\n------\n' % self.smbd.stdout.read()) def test_anonymous(self): '''smb:// anonymous''' uri = 'smb://%s/public' % os.uname()[1] # ensure that this does not ask for any credentials mount = subprocess.Popen(['gvfs-mount', uri]) timeout = 50 while timeout > 0: time.sleep(0.1) timeout -= 1 if mount.poll() is not None: self.assertEqual(mount.returncode, 0, 'gvfs-mount %s failed' % uri) break else: mount.terminate() self.fail('timed out waiting for gvfs-mount %s' % uri) mount.wait() self.do_mount_check(uri, False) def test_authenticated(self): '''smb:// authenticated''' uri = 'smb://%s@%s/private' % (os.environ['USER'], os.uname()[1]) mount = subprocess.Popen(['gvfs-mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # correct credentials self.wait_for_gvfs_mount_user_prompt(mount) # default domain, password (out, err) = mount.communicate(b'\nfoo\n') self.assertEqual(mount.returncode, 0, err) #self.assertEqual(err, b'') # we get some warnings self.do_mount_check(uri, True) def do_mount_check(self, uri, writable): sharename = uri.split('/')[-1] # appears in gvfs-mount list (out, err) = self.program_out_err(['gvfs-mount', '-li']) try: self.assertRegex(out, 'Mount\(0\): %s .* smb://.*/%s' % (sharename, sharename)) # check gvfs-info out = self.program_out_success(['gvfs-info', uri]) self.assertTrue('display name: ' + sharename in out, out) self.assertTrue('type: directory' in out, out) # check gvfs-ls and gvfs-cat out = self.program_out_success(['gvfs-ls', uri]) if sharename == 'public': self.assertEqual(out, 'myfile.txt\n') out = self.program_out_success(['gvfs-cat', uri + '/myfile.txt']) self.assertEqual(out, 'hello world\n') else: self.assertEqual(out, 'mydir\n') self.assertEqual(self.program_out_success(['gvfs-ls', uri + '/mydir']), 'onlyme.txt\n') out = self.program_out_success(['gvfs-cat', uri + '/mydir/onlyme.txt']) self.assertEqual(out, 'secret\n') if writable: # should be writable self.program_out_success(['gvfs-copy', '/etc/passwd', uri + '/newfile.txt']) out = self.program_out_success(['gvfs-cat', uri + '/newfile.txt']) with open('/etc/passwd') as f: self.assertEqual(out, f.read()) else: # should not be writable (code, out, err) = self.program_code_out_err( ['gvfs-copy', '/etc/passwd', uri + '/newfile.txt']) self.assertNotEqual(code, 0) self.assertEqual(out, '') self.assertNotEqual(err, '') finally: self.unmount(uri) @unittest.skipUnless(in_testbed, 'not running under gvfs-testbed') @unittest.skipIf(os.path.exists('/sys/module/scsi_debug'), 'scsi_debug is already loaded') class Drive(GvfsTestCase): @classmethod def setUpClass(klass): '''Load scsi_debug''' klass.root_command_success('modprobe scsi_debug add_host=0 dev_size_mb=64') @classmethod def tearDownClass(klass): # remove scsi_debug; might need a few tries while being busy timeout = 10 while timeout > 0: (code, out, err) = klass.root_command('rmmod -v scsi_debug') if code == 0: break if 'in use' in err: time.sleep(0.2) else: break if code != 0: raise SystemError('cannot rmmod scsi_debug: ' + err) @classmethod def get_devices(klass): '''Return current set of device names from scsi_debug''' devs = [] for dir in glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block'): try: devs += os.listdir(dir) except OSError: # TOCTOU, might change underneath us pass return set(devs) @classmethod def create_host(klass, ptype): '''Create a new SCSI host. Return device name. ''' orig_devs = klass.get_devices() klass.root_command_success('echo %i > /sys/bus/pseudo/drivers/scsi_debug/ptype' % ptype) klass.root_command_success('echo 1 > /sys/bus/pseudo/drivers/scsi_debug/add_host') timeout = 1000 while timeout >= 0: devs = klass.get_devices() if devs - orig_devs: break time.sleep(0.2) timeout -= 1 else: raise SystemError('timed out waiting for new device') new_devs = devs - orig_devs assert len(new_devs) == 1 return new_devs.pop() @classmethod def remove_device(klass, device): '''Remove given device name.''' klass.root_command_success('echo 1 > /sys/block/%s/device/delete' % device) def load_image(self, fname): '''Install a test image on the scsi_debug drive This must be a bzip2'ed file in test/files/. ''' # we cannot write to a scsi_debug CD drive, so write it into it in hard # disk mode dev = self.create_host(PTYPE_DISK) # put test.iso onto disk img = os.path.join(my_dir, 'files', fname) self.root_command_success('bzip2 -cd %s > /dev/%s; sync' % (img, dev)) # leave the actual device creation to the individual tests; all devices # created henceforth will default to the image contents self.remove_device(dev) while dev in self.get_devices(): time.sleep(0.2) # flush volume monitor output ctx = GLib.MainContext().default() while ctx.iteration(False): pass self.monitor.stdout.readall() def setUp(self): self.mock_polkit = None self.monitor = subprocess.Popen(['gvfs-mount', '-oi'], stdout=subprocess.PIPE) # set monitor stdout to non-blocking fl = fcntl.fcntl(self.monitor.stdout, fcntl.F_GETFL) fcntl.fcntl(self.monitor.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) # wait until monitor is ready while 'Monitoring events' not in self.get_monitor_output(): time.sleep(0.1) def tearDown(self): for dev in self.get_devices(): self.remove_device(dev) self.monitor.terminate() self.monitor.wait() self.stop_polkit() def test_cdrom(self): '''drive mount: cdrom''' self.load_image('bogus-cd.iso.bz2') dev = self.create_host(PTYPE_CDROM) # check that gvfs monitor picks up the new drive out = self.get_monitor_output() self.assertRegex(out, 'Drive connected:') self.assertRegex(out, '\[drive-optical\]') self.assertRegex(out, '\[media-optical-cd\]') self.assertRegex(out, 'unix-device:.*/dev/%s' % dev) self.assertRegex(out, 'has_media=1') self.assertRegex(out, 'Volume added:\s+.*bogus-cd') self.assertRegex(out, "label:\s+'bogus-cd") self.assertRegex(out, 'can_mount=1') self.assertRegex(out, 'should_automount=1') self.assertRegex(out, 'themed icons:.*media-optical') # user is not on any local session in the sandbox, so mounting ought to # fail (code, out, err) = self.program_code_out_err(['gvfs-mount', '-d', '/dev/' + dev]) self.assertNotEqual(code, 0) self.assertRegex(err, 'Not authorized') # tell polkit to do allow removable (but not internal) storage self.start_polkit(['org.freedesktop.udisks2.filesystem-mount']) # now mounting should succeed (out, err) = self.program_out_err(['gvfs-mount', '-d', '/dev/' + dev]) # should appear as Mount (out, err) = self.program_out_err(['gvfs-mount', '-li']) self.assertEqual(err.strip(), '') match = re.search('Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out) self.assertTrue(match, 'no Mount found in gvfs-mount -li output:\n' + out) # unmount it again self.unmount(match.group(1)) def test_cdrom_api(self): '''drive mount: cdrom with Gio API''' self.load_image('bogus-cd.iso.bz2') self.start_polkit(['org.freedesktop.udisks2.filesystem-mount']) self.bogus_volume = None # add CD and wait for it to appear in the monitor def volume_added(vm, v, main_loop): if v.get_name() == 'bogus-cd': self.bogus_volume = v main_loop.quit() vm = Gio.VolumeMonitor.get() ml = GLib.MainLoop() vm.connect('volume-added', volume_added, ml) dev = self.create_host(PTYPE_CDROM) timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None) ml.run() self.assertNotEqual(self.bogus_volume, None, 'timed out waiting for bogus-cd volume') ml.get_context().find_source_by_id(timeout_id).destroy() # check properties ids = self.bogus_volume.enumerate_identifiers() self.assertTrue('unix-device' in ids, ids) self.assertTrue('label' in ids, ids) self.assertEqual(self.bogus_volume.get_identifier('unix-device'), '/dev/' + dev) self.assertEqual(self.bogus_volume.get_identifier('label'), 'bogus-cd') self.assertEqual(self.bogus_volume.get_mount(), None) # mount it self.cb_result = None def mount_done(obj, result, main_loop): main_loop.quit() try: success = obj.mount_finish(result) self.cb_result = (obj, success) except GLib.GError as e: self.cb_result = (obj, e) self.bogus_volume.mount(Gio.MountMountFlags.NONE, None, None, mount_done, ml) timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None) ml.run() self.assertNotEqual(self.cb_result, None, 'timed out waiting for bogus-cd mount') ml.get_context().find_source_by_id(timeout_id).destroy() self.assertEqual(self.cb_result[1], True) self.assertEqual(self.cb_result[0], self.bogus_volume) # get Mount object mount = self.bogus_volume.get_mount() self.assertNotEqual(mount, None) self.assertEqual(mount.get_name(), 'bogus-cd') p = mount.get_root().get_path() self.assertTrue(os.path.isdir(p), p) self.assertTrue(os.path.isfile(os.path.join(p, 'hello.txt'))) self.assertTrue('/media/' in p, p) self.assertEqual(mount.get_volume(), self.bogus_volume) # unmount self.cb_result = None def unmount_done(obj, result, main_loop): main_loop.quit() try: success = obj.unmount_with_operation_finish(result) self.cb_result = (obj, success) except GLib.GError as e: self.cb_result = (obj, e) mount.unmount_with_operation(Gio.MountUnmountFlags.NONE, None, None, unmount_done, ml) timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None) ml.run() self.assertEqual(self.cb_result[1], True) self.assertEqual(self.bogus_volume.get_mount(), None) def test_system_partition(self): '''drive mount: system partition''' self.load_image('vfat.img.bz2') dev = self.create_host(PTYPE_DISK) # check that gvfs monitor picks up the new drive out = self.get_monitor_output() self.assertRegex(out, 'Drive connected:') self.assertRegex(out, '\[drive-harddisk\]') self.assertRegex(out, 'unix-device:.*/dev/%s' % dev) self.assertRegex(out, 'has_media=1') self.assertRegex(out, 'Volume added:\s+.*testvfat') self.assertRegex(out, "label:\s+'testvfat") self.assertRegex(out, 'should_automount=0') self.assertRegex(out, 'themed icons:.*harddisk') # should fail with only allowing the user to mount removable storage self.start_polkit(['org.freedesktop.udisks2.filesystem-mount']) (code, out, err) = self.program_code_out_err(['gvfs-mount', '-d', '/dev/' + dev]) self.assertNotEqual(code, 0) self.assertRegex(err, 'Not authorized') # should succeed with allowing the user to mount system storage self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system']) (out, err) = self.program_out_err(['gvfs-mount', '-d', '/dev/' + dev]) # should appear as Mount (out, err) = self.program_out_err(['gvfs-mount', '-li']) self.assertEqual(err.strip(), '') match = re.search('Mount\(\d+\): testvfat -> (file://.*/media/.*/testvfat)', out) self.assertTrue(match, 'no Mount found in gvfs-mount -li output:\n' + out) # unmount it again self.unmount(match.group(1)) def test_system_partition_api(self): '''drive mount: system partition with Gio API''' self.load_image('vfat.img.bz2') self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system']) self.volume = None # add partition and wait for it to appear in the monitor def volume_added(vm, v, main_loop): if v.get_name() == 'testvfat': self.volume = v main_loop.quit() vm = Gio.VolumeMonitor.get() ml = GLib.MainLoop() vm.connect('volume-added', volume_added, ml) dev = self.create_host(PTYPE_DISK) timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None) ml.run() self.assertNotEqual(self.volume, None, 'timed out waiting for volume') ml.get_context().find_source_by_id(timeout_id).destroy() # check properties ids = self.volume.enumerate_identifiers() self.assertTrue('unix-device' in ids, ids) self.assertTrue('label' in ids, ids) self.assertTrue('uuid' in ids, ids) self.assertEqual(self.volume.get_identifier('unix-device'), '/dev/' + dev) self.assertEqual(self.volume.get_identifier('label'), 'testvfat') self.assertEqual(self.volume.get_identifier('uuid'), 'F3C1-6301') def test_media_player(self): '''drive mount: media player''' self.load_image('bogus-cd.iso.bz2') def cleanup(): rootsh = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE) rootsh.communicate(b'''rm /run/udev/rules.d/40-scsi_debug-fake-mediaplayer.rules pkill --signal HUP udevd || pkill --signal HUP systemd-udevd ''') # create udev rule to turn it into a music player self.addCleanup(cleanup) rootsh = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE) rootsh.communicate(b'''export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin echo 'SUBSYSTEM=="block", ATTRS{model}=="scsi_debug*", ENV{ID_MEDIA_PLAYER}="MockTune"' > /run/udev/rules.d/40-scsi_debug-fake-mediaplayer.rules sync pkill --signal HUP udevd || pkill --signal HUP systemd-udevd ''') dev = self.create_host(PTYPE_DISK) # check that gvfs monitor picks up the new volume out = self.get_monitor_output() self.assertRegex(out, 'Volume added:\s+.*bogus-cd') self.assertRegex(out, "label:\s+'bogus-cd") self.assertTrue('should_automount=0' in out, out) self.assertRegex(out, 'themed icons:.*harddisk') # mount it self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system']) (out, err) = self.program_out_err(['gvfs-mount', '-d', '/dev/' + dev]) # should appear as Mount (out, err) = self.program_out_err(['gvfs-mount', '-li']) self.assertEqual(err.strip(), '') match = re.search('Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out) self.assertTrue(match, 'no Mount found in gvfs-mount -li output:\n' + out) # should have media player content self.assertRegex(out, 'x_content_types:.*x-content/audio-player') # unmount it again self.unmount(match.group(1)) def get_monitor_output(self): '''Wait for gvfs monitor to output something, and return it''' empty_timeout = 50 while True: out = self.monitor.stdout.readall() if out: break else: empty_timeout -= 1 self.assertGreater(empty_timeout, 0, 'timed out waiting for monitor output') time.sleep(0.1) # wait a bit more to see whether we catch some stragglers time.sleep(0.2) out2 = self.monitor.stdout.readall() if out2: out += out2 return out.decode() def start_polkit(self, actions): '''Start mock polkit with list of allowed actions.''' self.stop_polkit() self.mock_polkit = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE) self.mock_polkit.stdin.write(('set -e\n/home/test_polkitd.py -r -a %s\n' % ','.join(actions)).encode('ASCII')) # wait until it started up if actions: timeout = 50 while timeout > 0: try: out = subprocess.check_output(['pkcheck', '--action-id', actions[0], '--process', '1'], stderr=subprocess.PIPE) if b'test=test' in out: break except subprocess.CalledProcessError: pass time.sleep(0.1) timeout -= 1 else: self.fail('timed out waiting for test_polkitd.py') else: # we can only cross fingers here, as we do not have an action to verify time.sleep(0.5) self.assertEqual(self.mock_polkit.poll(), None, 'mock polkitd unexpectedly terminated') def stop_polkit(self): '''Stop mock polkit, if it is running.''' if self.mock_polkit: # for some reason, terminating the shell doesn't terminate the # polkitd running in it, so kill that separately self.root_command('kill `pidof -x /home/test_polkitd.py`') self.mock_polkit.terminate() self.mock_polkit.wait() self.mock_polkit = None @unittest.skipUnless(have_httpd, 'Apache httpd not installed') @unittest.skipUnless(have_dav_backend, 'Dav backend not enabled') class Dav(GvfsTestCase): '''Test WebDAV backend''' @classmethod def setUpClass(klass): '''Set up Apache httpd sandbox''' klass.mod_dir = klass.get_httpd_module_dir() klass.httpd_sandbox = tempfile.mkdtemp() klass.public_dir = os.path.join(klass.httpd_sandbox, 'public') os.mkdir(klass.public_dir) with open(os.path.join(klass.public_dir, 'hello.txt'), 'w') as f: f.write('hi\n') klass.secret_dir = os.path.join(klass.httpd_sandbox, 'secret') os.mkdir(klass.secret_dir) with open(os.path.join(klass.secret_dir, 'restricted.txt'), 'w') as f: f.write('dont tell anyone\n') # test:s3kr1t with open(os.path.join(klass.httpd_sandbox, 'htpasswd'), 'w') as f: f.write('test:$apr1$t0B4mfkT$Tr8ip333/ZR/7xrRBuxjI.\n') # some distros have some extra modules which we need to load modules = '' for m in ['authn_core', 'authz_core', 'authz_user', 'auth_basic', 'authn_file', 'mpm_prefork', 'unixd', 'dav', 'dav_fs', 'ssl']: if os.path.exists(os.path.join(klass.mod_dir, 'mod_%s.so' % m)): modules += 'LoadModule %s_module %s/mod_%s.so\n' % (m, klass.mod_dir, m) with open(os.path.join(klass.httpd_sandbox, 'apache2.conf'), 'w') as f: f.write('''Listen localhost:8088 Listen localhost:4443 %(modules)s DocumentRoot . ServerName localhost PidFile apache.pid LogLevel debug ErrorLog error_log DAVLockDB DAVLock SSLEngine on SSLCertificateFile %(mydir)s/files/testcert.pem SSLCertificateKeyFile %(mydir)s/files/testcert.pem Dav On Dav On AuthType Basic AuthName DAV AuthUserFile htpasswd Require valid-user ''' % {'mod_dir': klass.mod_dir, 'root': klass.httpd_sandbox, 'modules': modules, 'mydir': my_dir}) # start server try: subprocess.check_call([httpd_cmd, '-d', klass.httpd_sandbox, '-f', 'apache2.conf', '-k', 'start']) except subprocess.CalledProcessError as e: error_log = os.path.join(klass.httpd_sandbox, 'error_log') if os.path.exists(error_log): with open(error_log) as f: print('---- apache http error log ----\n%s\n---------\n' % f.read()) raise @classmethod def tearDownClass(klass): '''Stop httpd server and remove sandbox''' subprocess.call([httpd_cmd, '-d', klass.httpd_sandbox, '-f', 'apache2.conf', '-k', 'stop']) shutil.rmtree(klass.httpd_sandbox) @classmethod def get_httpd_module_dir(klass): '''Return module directory for Apache httpd. Unfortunately this is highly distro/platform specific, so try to determine it from apxs2 or apachectl/apache2. ''' # if we have apxs2 installed, use this try: apxs2 = subprocess.Popen(['apxs2', '-q', 'LIBEXECDIR'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out = apxs2.communicate()[0].strip() assert apxs2.returncode == 0, 'apxs2 -V failed' return out except OSError: # Look for apxs instead try: apxs2 = subprocess.Popen(['apxs', '-q', 'LIBEXECDIR'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out = apxs2.communicate()[0].strip() assert apxs2.returncode == 0, 'apxs2 -V failed' return out except OSError: print('[no apxs2, falling back]') pass # fall back to looking for modules in HTTPD_ROOT/modules/ ctl = subprocess.Popen([httpd_cmd, '-V'], stdout=subprocess.PIPE, universal_newlines=True) out = ctl.communicate()[0] assert ctl.returncode == 0, httpd_cmd + ' -V failed' m = re.search('\sHTTPD_ROOT="([^"]+)"\s', out) assert m, httpd_cmd + ' -V does not show HTTPD_ROOT' mod_dir = os.path.join(m.group(1), 'modules') assert os.path.isdir(mod_dir), \ '%s does not exist, cannot determine httpd module path' % mod_dir return mod_dir def test_http_noauth(self): '''dav://localhost without credentials''' uri = 'dav://localhost:8088/public' subprocess.check_call(['gvfs-mount', uri]) self.do_mount_check(uri, 'hello.txt', 'hi\n') def test_https_noauth(self): '''davs://localhost without credentials''' uri = 'davs://localhost:4443/public' subprocess.check_call(['gvfs-mount', uri]) self.do_mount_check(uri, 'hello.txt', 'hi\n') def test_http_auth(self): '''dav://localhost with credentials''' uri = 'dav://localhost:8088/secret' mount = subprocess.Popen(['gvfs-mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # wrong password self.wait_for_gvfs_mount_user_prompt(mount) mount.stdin.write(b'test\nh4ck\n') # correct password (out, err) = mount.communicate(b's3kr1t\n') self.assertEqual(mount.returncode, 0) self.assertEqual(err, b'') self.do_mount_check(uri, 'restricted.txt', 'dont tell anyone\n') def test_https_auth(self): '''davs://localhost with credentials''' uri = 'davs://localhost:4443/secret' mount = subprocess.Popen(['gvfs-mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # wrong password self.wait_for_gvfs_mount_user_prompt(mount) mount.stdin.write(b'test\nh4ck\n') # correct password (out, err) = mount.communicate(b's3kr1t\n') self.assertEqual(mount.returncode, 0) self.assertEqual(err, b'') self.do_mount_check(uri, 'restricted.txt', 'dont tell anyone\n') def do_mount_check(self, uri, testfile, content): # appears in gvfs-mount list (out, err) = self.program_out_err(['gvfs-mount', '-li']) try: self.assertRegex(out, 'Mount\(\d+\):.* -> davs?://([a-z0-9]+@)?localhost') # check gvfs-info out = self.program_out_success(['gvfs-info', uri]) self.assertRegex(out, 'id::filesystem: dav') self.assertTrue('type: directory' in out, out) # check gvfs-ls out = self.program_out_success(['gvfs-ls', uri]) self.assertEqual(out.strip(), testfile) # check gvfs-cat out = self.program_out_success(['gvfs-cat', uri + '/' + testfile]) self.assertEqual(out, content) # create a new file self.program_out_success(['gvfs-copy', uri + '/' + testfile, uri + '/foo']) out = self.program_out_success(['gvfs-cat', uri + '/foo']) self.assertEqual(out, content) # remove it again self.program_out_success(['gvfs-rm', uri + '/foo']) out = self.program_out_success(['gvfs-ls', uri]) self.assertFalse('foo' in out.split(), out) finally: self.unmount(uri) class Trash(GvfsTestCase): def setUp(self): super().setUp() self.gfile_trash = Gio.File.new_for_uri('trash://') # double-check that we are really running with a temporary # $XDG_DATA_HOME and that gvfs respects it, or under gvfs-testbed self.assertEqual(self.files_in_trash(), set()) self.my_file = None def tearDown(self): if self.my_file: if os.path.exists(self.my_file): os.unlink(self.my_file) # clean up the trash, for predictable test cases for f in self.files_in_trash(): #print('cleaning up trash:///' + f) subprocess.call(['gvfs-rm', 'trash:///' + f]) super().tearDown() def files_in_trash(self): files = set() enum = self.gfile_trash.enumerate_children('*', Gio.FileQueryInfoFlags.NONE, None) while True: info = enum.next_file(None) if info is None: break files.add(info.get_name()) return files def test_file_in_home_cli(self): '''trash:// deletion, attributes, restoring for a file in $HOME (CLI)''' # create test file self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') # trash it del_time = time.time() subprocess.check_call(['gvfs-trash', self.my_file]) # should now be gone self.assertFalse(os.path.exists(self.my_file)) # and be in the trash self.assertEqual(self.files_in_trash(), set(['hello_gvfs_tests.txt'])) out = self.program_out_success(['gvfs-info', 'trash:///hello_gvfs_tests.txt']) # has proper original path self.assertTrue('trash::orig-path: ' + self.my_file in out, out) # has proper deletion time m = re.search('trash::deletion-date: (.*)\n', out) self.assertNotEqual(m, None) recorded_time = time.mktime(time.strptime(m.group(1), '%Y-%m-%dT%H:%M:%S')) self.assertLess(abs(recorded_time - del_time), 2.0) # is saved in home trash, not by-device trash data_home = os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share')) self.assertTrue('standard::target-uri: file://' + data_home in out, out) def test_file_in_home_api(self): '''trash:// deletion, attributes, restoring for a file in $HOME (API)''' # create test file self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') gfile = Gio.File.new_for_path(self.my_file) self.assertTrue(gfile.trash(None)) # should now be gone self.assertFalse(os.path.exists(self.my_file)) # and be in the trash self.assertEqual(self.files_in_trash(), set(['hello_gvfs_tests.txt'])) def test_deletion_with_same_path(self): '''trash:// deletion of two files with the same path''' # create test file self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') gfile = Gio.File.new_for_path(self.my_file) self.assertTrue(gfile.trash(None)) self.assertFalse(os.path.exists(self.my_file)) # and re-create/re-trash it again self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('bye bye\n') gfile = Gio.File.new_for_path(self.my_file) self.assertTrue(gfile.trash(None)) self.assertFalse(os.path.exists(self.my_file)) # should have two trash entries now with tame original path enum = self.gfile_trash.enumerate_children('*', Gio.FileQueryInfoFlags.NONE, None) count = 0 while True: info = enum.next_file(None) if info is None: break count += 1 self.assertEqual(info.get_attribute_byte_string('trash::orig-path'), self.my_file) self.assertEqual(count, 2) def test_file_in_system(self): '''trash:// deletion for system location This either should work if /tmp/ is a partition on its own writable to the user (such as a tmpfs), or fail gracefully without deleting the file. ''' # create test file self.my_file = os.path.join(self.workdir, 'hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') # try to trash it trash = subprocess.Popen(['gvfs-trash', self.my_file], stderr=subprocess.PIPE) trash.communicate() if trash.returncode == 0: self.assertFalse(os.path.exists(self.my_file)) self.assertTrue(os.path.exists('/tmp/.Trash-%i/files/hello_gvfs_tests.txt' % os.getuid())) else: # file should still be there self.assertTrue(os.path.exists(self.my_file)) def start_dbus(): '''Run a local D-BUS daemon under temporary XDG directories Return temporary XDG home directory. ''' global dbus_daemon # use temporary config/data/runtime directories; NB that these need to be # in g_get_home_dir(), otherwise you can't trash files as this doesn't work # across fs boundaries # if/once https://bugzilla.gnome.org/show_bug.cgi?id=142568 gets fixed, we # can put it into a proper temp dir again temp_home = tempfile.mkdtemp(prefix='gvfs_test', dir=GLib.get_home_dir()) os.environ['XDG_CONFIG_HOME'] = os.path.join(temp_home, 'config') os.environ['XDG_DATA_HOME'] = os.path.join(temp_home, 'data') # run local D-BUS if os.path.exists('session.conf'): dbus_conf = 'session.conf' else: dbus_conf = os.path.join(os.path.dirname(__file__), 'session.conf') env = os.environ.copy() env['G_MESSAGES_DEBUG'] = 'all' env['GVFS_DEBUG'] = 'all' env['GVFS_SMB_DEBUG'] = '6' env['GVFS_HTTP_DEBUG'] = 'all' env['LIBSMB_PROG'] = "nc localhost 1445" dbus_daemon = subprocess.Popen( ['dbus-daemon', '--config-file', dbus_conf, '--print-address=1'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) addr = dbus_daemon.stdout.readline().decode() os.environ['DBUS_SESSION_BUS_ADDRESS'] = addr # set dbus output to nonblocking flags = fcntl.fcntl(dbus_daemon.stdout, fcntl.F_GETFL) fcntl.fcntl(dbus_daemon.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) flags = fcntl.fcntl(dbus_daemon.stderr, fcntl.F_GETFL) fcntl.fcntl(dbus_daemon.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK) return temp_home if __name__ == '__main__': # do not break tests due to translations try: del os.environ['LANGUAGE'] except KeyError: pass os.environ['LC_ALL'] = 'C' if not in_testbed: temp_home = start_dbus() try: unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) finally: if not in_testbed: dbus_daemon.terminate() dbus_daemon.wait() shutil.rmtree(temp_home)