import contextlib
import os
import py_compile
import shutil
import sys
import tempfile
import threading
import time
import types
import weakref
import zipfile
from importlib import import_module
from pathlib import Path
from subprocess import CompletedProcess
from unittest import mock, skip, skipIf

from django.apps.registry import Apps
from django.test import SimpleTestCase
from django.test.utils import extend_sys_path
from django.utils import autoreload
from django.utils.autoreload import WatchmanUnavailable

from .utils import on_macos_with_hfs


class TestIterModulesAndFiles(SimpleTestCase):
    def import_and_cleanup(self, name):
        import_module(name)
        self.addCleanup(lambda: sys.path_importer_cache.clear())
        self.addCleanup(lambda: sys.modules.pop(name, None))

    def clear_autoreload_caches(self):
        autoreload.iter_modules_and_files.cache_clear()

    def assertFileFound(self, filename):
        # Some temp directories are symlinks. Python resolves these fully while
        # importing.
        resolved_filename = filename.resolve()
        self.clear_autoreload_caches()
        # Test uncached access
        self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        # Test cached access
        self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)

    def assertFileNotFound(self, filename):
        resolved_filename = filename.resolve()
        self.clear_autoreload_caches()
        # Test uncached access
        self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        # Test cached access
        self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)

    def temporary_file(self, filename):
        dirname = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, dirname)
        return Path(dirname) / filename

    def test_paths_are_pathlib_instances(self):
        for filename in autoreload.iter_all_python_module_files():
            self.assertIsInstance(filename, Path)

    def test_file_added(self):
        """
        When a file is added, it's returned by iter_all_python_module_files().
        """
        filename = self.temporary_file('test_deleted_removed_module.py')
        filename.touch()

        with extend_sys_path(str(filename.parent)):
            self.import_and_cleanup('test_deleted_removed_module')

        self.assertFileFound(filename.absolute())

    def test_check_errors(self):
        """
        When a file containing an error is imported in a function wrapped by
        check_errors(), gen_filenames() returns it.
        """
        filename = self.temporary_file('test_syntax_error.py')
        filename.write_text("Ceci n'est pas du Python.")

        with extend_sys_path(str(filename.parent)):
            with self.assertRaises(SyntaxError):
                autoreload.check_errors(import_module)('test_syntax_error')
        self.assertFileFound(filename)

    def test_check_errors_catches_all_exceptions(self):
        """
        Since Python may raise arbitrary exceptions when importing code,
        check_errors() must catch Exception, not just some subclasses.
        """
        filename = self.temporary_file('test_exception.py')
        filename.write_text('raise Exception')
        with extend_sys_path(str(filename.parent)):
            with self.assertRaises(Exception):
                autoreload.check_errors(import_module)('test_exception')
        self.assertFileFound(filename)

    def test_zip_reload(self):
        """
        Modules imported from zipped files have their archive location included
        in the result.
        """
        zip_file = self.temporary_file('zip_import.zip')
        with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('test_zipped_file.py', '')

        with extend_sys_path(str(zip_file)):
            self.import_and_cleanup('test_zipped_file')
        self.assertFileFound(zip_file)

    def test_bytecode_conversion_to_source(self):
        """.pyc and .pyo files are included in the files list."""
        filename = self.temporary_file('test_compiled.py')
        filename.touch()
        compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc'))))
        filename.unlink()
        with extend_sys_path(str(compiled_file.parent)):
            self.import_and_cleanup('test_compiled')
        self.assertFileFound(compiled_file)

    def test_weakref_in_sys_module(self):
        """iter_all_python_module_file() ignores weakref modules."""
        time_proxy = weakref.proxy(time)
        sys.modules['time_proxy'] = time_proxy
        self.addCleanup(lambda: sys.modules.pop('time_proxy', None))
        list(autoreload.iter_all_python_module_files())  # No crash.

    def test_module_without_spec(self):
        module = types.ModuleType('test_module')
        del module.__spec__
        self.assertEqual(autoreload.iter_modules_and_files((module,), frozenset()), frozenset())

    def test_main_module_is_resolved(self):
        main_module = sys.modules['__main__']
        self.assertFileFound(Path(main_module.__file__))

    def test_main_module_without_file_is_not_resolved(self):
        fake_main = types.ModuleType('__main__')
        self.assertEqual(autoreload.iter_modules_and_files((fake_main,), frozenset()), frozenset())

    def test_path_with_embedded_null_bytes(self):
        for path in (
            'embedded_null_byte\x00.py',
            'di\x00rectory/embedded_null_byte.py',
        ):
            with self.subTest(path=path):
                self.assertEqual(
                    autoreload.iter_modules_and_files((), frozenset([path])),
                    frozenset(),
                )


class TestCommonRoots(SimpleTestCase):
    def test_common_roots(self):
        paths = (
            Path('/first/second'),
            Path('/first/second/third'),
            Path('/first/'),
            Path('/root/first/'),
        )
        results = autoreload.common_roots(paths)
        self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')])


class TestSysPathDirectories(SimpleTestCase):
    def setUp(self):
        self._directory = tempfile.TemporaryDirectory()
        self.directory = Path(self._directory.name).resolve().absolute()
        self.file = self.directory / 'test'
        self.file.touch()

    def tearDown(self):
        self._directory.cleanup()

    def test_sys_paths_with_directories(self):
        with extend_sys_path(str(self.file)):
            paths = list(autoreload.sys_path_directories())
        self.assertIn(self.file.parent, paths)

    def test_sys_paths_non_existing(self):
        nonexistent_file = Path(self.directory.name) / 'does_not_exist'
        with extend_sys_path(str(nonexistent_file)):
            paths = list(autoreload.sys_path_directories())
        self.assertNotIn(nonexistent_file, paths)
        self.assertNotIn(nonexistent_file.parent, paths)

    def test_sys_paths_absolute(self):
        paths = list(autoreload.sys_path_directories())
        self.assertTrue(all(p.is_absolute() for p in paths))

    def test_sys_paths_directories(self):
        with extend_sys_path(str(self.directory)):
            paths = list(autoreload.sys_path_directories())
        self.assertIn(self.directory, paths)


class GetReloaderTests(SimpleTestCase):
    @mock.patch('django.utils.autoreload.WatchmanReloader')
    def test_watchman_unavailable(self, mocked_watchman):
        mocked_watchman.check_availability.side_effect = WatchmanUnavailable
        self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)

    @mock.patch.object(autoreload.WatchmanReloader, 'check_availability')
    def test_watchman_available(self, mocked_available):
        # If WatchmanUnavailable isn't raised, Watchman will be chosen.
        mocked_available.return_value = None
        result = autoreload.get_reloader()
        self.assertIsInstance(result, autoreload.WatchmanReloader)


class RunWithReloaderTests(SimpleTestCase):
    @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
    @mock.patch('django.utils.autoreload.get_reloader')
    def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
        mocked_get_reloader.side_effect = KeyboardInterrupt()
        autoreload.run_with_reloader(lambda: None)  # No exception

    @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'})
    @mock.patch('django.utils.autoreload.restart_with_reloader')
    def test_calls_sys_exit(self, mocked_restart_reloader):
        mocked_restart_reloader.return_value = 1
        with self.assertRaises(SystemExit) as exc:
            autoreload.run_with_reloader(lambda: None)
        self.assertEqual(exc.exception.code, 1)

    @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
    @mock.patch('django.utils.autoreload.start_django')
    @mock.patch('django.utils.autoreload.get_reloader')
    def test_calls_start_django(self, mocked_reloader, mocked_start_django):
        mocked_reloader.return_value = mock.sentinel.RELOADER
        autoreload.run_with_reloader(mock.sentinel.METHOD)
        self.assertEqual(mocked_start_django.call_count, 1)
        self.assertSequenceEqual(
            mocked_start_django.call_args[0],
            [mock.sentinel.RELOADER, mock.sentinel.METHOD]
        )


class StartDjangoTests(SimpleTestCase):
    @mock.patch('django.utils.autoreload.StatReloader')
    def test_watchman_becomes_unavailable(self, mocked_stat):
        mocked_stat.should_stop.return_value = True
        fake_reloader = mock.MagicMock()
        fake_reloader.should_stop = False
        fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()

        autoreload.start_django(fake_reloader, lambda: None)
        self.assertEqual(mocked_stat.call_count, 1)

    @mock.patch('django.utils.autoreload.ensure_echo_on')
    def test_echo_on_called(self, mocked_echo):
        fake_reloader = mock.MagicMock()
        autoreload.start_django(fake_reloader, lambda: None)
        self.assertEqual(mocked_echo.call_count, 1)

    @mock.patch('django.utils.autoreload.check_errors')
    def test_check_errors_called(self, mocked_check_errors):
        fake_method = mock.MagicMock(return_value=None)
        fake_reloader = mock.MagicMock()
        autoreload.start_django(fake_reloader, fake_method)
        self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])

    @mock.patch('threading.Thread')
    @mock.patch('django.utils.autoreload.check_errors')
    def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
        fake_reloader = mock.MagicMock()
        fake_main_func = mock.MagicMock()
        fake_thread = mock.MagicMock()
        mocked_check_errors.return_value = fake_main_func
        mocked_thread.return_value = fake_thread
        autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
        self.assertEqual(mocked_thread.call_count, 1)
        self.assertEqual(
            mocked_thread.call_args[1],
            {'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}, 'name': 'django-main-thread'}
        )
        self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True])
        self.assertTrue(fake_thread.start.called)


class TestCheckErrors(SimpleTestCase):
    def test_mutates_error_files(self):
        fake_method = mock.MagicMock(side_effect=RuntimeError())
        wrapped = autoreload.check_errors(fake_method)
        with mock.patch.object(autoreload, '_error_files') as mocked_error_files:
            with self.assertRaises(RuntimeError):
                wrapped()
        self.assertEqual(mocked_error_files.append.call_count, 1)


class TestRaiseLastException(SimpleTestCase):
    @mock.patch('django.utils.autoreload._exception', None)
    def test_no_exception(self):
        # Should raise no exception if _exception is None
        autoreload.raise_last_exception()

    def test_raises_exception(self):
        class MyException(Exception):
            pass

        # Create an exception
        try:
            raise MyException('Test Message')
        except MyException:
            exc_info = sys.exc_info()

        with mock.patch('django.utils.autoreload._exception', exc_info):
            with self.assertRaisesMessage(MyException, 'Test Message'):
                autoreload.raise_last_exception()

    def test_raises_custom_exception(self):
        class MyException(Exception):
            def __init__(self, msg, extra_context):
                super().__init__(msg)
                self.extra_context = extra_context
        # Create an exception.
        try:
            raise MyException('Test Message', 'extra context')
        except MyException:
            exc_info = sys.exc_info()

        with mock.patch('django.utils.autoreload._exception', exc_info):
            with self.assertRaisesMessage(MyException, 'Test Message'):
                autoreload.raise_last_exception()

    def test_raises_exception_with_context(self):
        try:
            raise Exception(2)
        except Exception as e:
            try:
                raise Exception(1) from e
            except Exception:
                exc_info = sys.exc_info()

        with mock.patch('django.utils.autoreload._exception', exc_info):
            with self.assertRaises(Exception) as cm:
                autoreload.raise_last_exception()
            self.assertEqual(cm.exception.args[0], 1)
            self.assertEqual(cm.exception.__cause__.args[0], 2)


class RestartWithReloaderTests(SimpleTestCase):
    executable = '/usr/bin/python'

    def patch_autoreload(self, argv):
        patch_call = mock.patch('django.utils.autoreload.subprocess.run', return_value=CompletedProcess(argv, 0))
        patches = [
            mock.patch('django.utils.autoreload.sys.argv', argv),
            mock.patch('django.utils.autoreload.sys.executable', self.executable),
            mock.patch('django.utils.autoreload.sys.warnoptions', ['all']),
        ]
        for p in patches:
            p.start()
            self.addCleanup(p.stop)
        mock_call = patch_call.start()
        self.addCleanup(patch_call.stop)
        return mock_call

    def test_manage_py(self):
        argv = ['./manage.py', 'runserver']
        mock_call = self.patch_autoreload(argv)
        autoreload.restart_with_reloader()
        self.assertEqual(mock_call.call_count, 1)
        self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall'] + argv)

    def test_python_m_django(self):
        main = '/usr/lib/pythonX.Y/site-packages/django/__main__.py'
        argv = [main, 'runserver']
        mock_call = self.patch_autoreload(argv)
        with mock.patch('django.__main__.__file__', main):
            autoreload.restart_with_reloader()
            self.assertEqual(mock_call.call_count, 1)
            self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:])


class ReloaderTests(SimpleTestCase):
    RELOADER_CLS = None

    def setUp(self):
        self._tempdir = tempfile.TemporaryDirectory()
        self.tempdir = Path(self._tempdir.name).resolve().absolute()
        self.existing_file = self.ensure_file(self.tempdir / 'test.py')
        self.nonexistent_file = (self.tempdir / 'does_not_exist.py').absolute()
        self.reloader = self.RELOADER_CLS()

    def tearDown(self):
        self._tempdir.cleanup()
        self.reloader.stop()

    def ensure_file(self, path):
        path.parent.mkdir(exist_ok=True, parents=True)
        path.touch()
        # On Linux and Windows updating the mtime of a file using touch() will set a timestamp
        # value that is in the past, as the time value for the last kernel tick is used rather
        # than getting the correct absolute time.
        # To make testing simpler set the mtime to be the observed time when this function is
        # called.
        self.set_mtime(path, time.time())
        return path.absolute()

    def set_mtime(self, fp, value):
        os.utime(str(fp), (value, value))

    def increment_mtime(self, fp, by=1):
        current_time = time.time()
        self.set_mtime(fp, current_time + by)

    @contextlib.contextmanager
    def tick_twice(self):
        ticker = self.reloader.tick()
        next(ticker)
        yield
        next(ticker)


class IntegrationTests:
    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_glob(self, mocked_modules, notify_mock):
        non_py_file = self.ensure_file(self.tempdir / 'non_py_file')
        self.reloader.watch_dir(self.tempdir, '*.py')
        with self.tick_twice():
            self.increment_mtime(non_py_file)
            self.increment_mtime(self.existing_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_multiple_globs(self, mocked_modules, notify_mock):
        self.ensure_file(self.tempdir / 'x.test')
        self.reloader.watch_dir(self.tempdir, '*.py')
        self.reloader.watch_dir(self.tempdir, '*.test')
        with self.tick_twice():
            self.increment_mtime(self.existing_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_overlapping_globs(self, mocked_modules, notify_mock):
        self.reloader.watch_dir(self.tempdir, '*.py')
        self.reloader.watch_dir(self.tempdir, '*.p*')
        with self.tick_twice():
            self.increment_mtime(self.existing_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_glob_recursive(self, mocked_modules, notify_mock):
        non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file')
        py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        with self.tick_twice():
            self.increment_mtime(non_py_file)
            self.increment_mtime(py_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [py_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
        non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt')
        py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.txt')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        with self.tick_twice():
            self.increment_mtime(non_py_file)
            self.increment_mtime(py_file)
        self.assertEqual(notify_mock.call_count, 2)
        self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_nested_glob_recursive(self, mocked_modules, notify_mock):
        inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        self.reloader.watch_dir(inner_py_file.parent, '**/*.py')
        with self.tick_twice():
            self.increment_mtime(inner_py_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
        py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.p*')
        self.reloader.watch_dir(self.tempdir, '**/*.py*')
        with self.tick_twice():
            self.increment_mtime(py_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [py_file])


class BaseReloaderTests(ReloaderTests):
    RELOADER_CLS = autoreload.BaseReloader

    def test_watch_dir_with_unresolvable_path(self):
        path = Path('unresolvable_directory')
        with mock.patch.object(Path, 'absolute', side_effect=FileNotFoundError):
            self.reloader.watch_dir(path, '**/*.mo')
        self.assertEqual(list(self.reloader.directory_globs), [])

    def test_watch_with_glob(self):
        self.reloader.watch_dir(self.tempdir, '*.py')
        watched_files = list(self.reloader.watched_files())
        self.assertIn(self.existing_file, watched_files)

    def test_watch_files_with_recursive_glob(self):
        inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        watched_files = list(self.reloader.watched_files())
        self.assertIn(self.existing_file, watched_files)
        self.assertIn(inner_file, watched_files)

    def test_run_loop_catches_stopiteration(self):
        def mocked_tick():
            yield

        with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
            self.reloader.run_loop()
        self.assertEqual(tick.call_count, 1)

    def test_run_loop_stop_and_return(self):
        def mocked_tick(*args):
            yield
            self.reloader.stop()
            return  # Raises StopIteration

        with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
            self.reloader.run_loop()

        self.assertEqual(tick.call_count, 1)

    def test_wait_for_apps_ready_checks_for_exception(self):
        app_reg = Apps()
        app_reg.ready_event.set()
        # thread.is_alive() is False if it's not started.
        dead_thread = threading.Thread()
        self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))

    def test_wait_for_apps_ready_without_exception(self):
        app_reg = Apps()
        app_reg.ready_event.set()
        thread = mock.MagicMock()
        thread.is_alive.return_value = True
        self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))


def skip_unless_watchman_available():
    try:
        autoreload.WatchmanReloader.check_availability()
    except WatchmanUnavailable as e:
        return skip('Watchman unavailable: %s' % e)
    return lambda func: func


@skip_unless_watchman_available()
class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
    RELOADER_CLS = autoreload.WatchmanReloader

    def setUp(self):
        super().setUp()
        # Shorten the timeout to speed up tests.
        self.reloader.client_timeout = 0.1

    def test_watch_glob_ignores_non_existing_directories_two_levels(self):
        with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
            self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*'])
        self.assertFalse(mocked_subscribe.called)

    def test_watch_glob_uses_existing_parent_directories(self):
        with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
            self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*'])
        self.assertSequenceEqual(
            mocked_subscribe.call_args[0],
            [
                self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir,
                ['anyof', ['match', 'does_not_exist/*', 'wholename']]
            ]
        )

    def test_watch_glob_multiple_patterns(self):
        with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
            self.reloader._watch_glob(self.tempdir, ['*', '*.py'])
        self.assertSequenceEqual(
            mocked_subscribe.call_args[0],
            [
                self.tempdir, 'glob:%s' % self.tempdir,
                ['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']]
            ]
        )

    def test_watched_roots_contains_files(self):
        paths = self.reloader.watched_roots([self.existing_file])
        self.assertIn(self.existing_file.parent, paths)

    def test_watched_roots_contains_directory_globs(self):
        self.reloader.watch_dir(self.tempdir, '*.py')
        paths = self.reloader.watched_roots([])
        self.assertIn(self.tempdir, paths)

    def test_watched_roots_contains_sys_path(self):
        with extend_sys_path(str(self.tempdir)):
            paths = self.reloader.watched_roots([])
        self.assertIn(self.tempdir, paths)

    def test_check_server_status(self):
        self.assertTrue(self.reloader.check_server_status())

    def test_check_server_status_raises_error(self):
        with mock.patch.object(self.reloader.client, 'query') as mocked_query:
            mocked_query.side_effect = Exception()
            with self.assertRaises(autoreload.WatchmanUnavailable):
                self.reloader.check_server_status()

    @mock.patch('pywatchman.client')
    def test_check_availability(self, mocked_client):
        mocked_client().capabilityCheck.side_effect = Exception()
        with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'):
            self.RELOADER_CLS.check_availability()

    @mock.patch('pywatchman.client')
    def test_check_availability_lower_version(self, mocked_client):
        mocked_client().capabilityCheck.return_value = {'version': '4.8.10'}
        with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'):
            self.RELOADER_CLS.check_availability()

    def test_pywatchman_not_available(self):
        with mock.patch.object(autoreload, 'pywatchman') as mocked:
            mocked.__bool__.return_value = False
            with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'):
                self.RELOADER_CLS.check_availability()

    def test_update_watches_raises_exceptions(self):
        class TestException(Exception):
            pass

        with mock.patch.object(self.reloader, '_update_watches') as mocked_watches:
            with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status:
                mocked_watches.side_effect = TestException()
                mocked_server_status.return_value = True
                with self.assertRaises(TestException):
                    self.reloader.update_watches()
                self.assertIsInstance(mocked_server_status.call_args[0][0], TestException)

    @mock.patch.dict(os.environ, {'DJANGO_WATCHMAN_TIMEOUT': '10'})
    def test_setting_timeout_from_environment_variable(self):
        self.assertEqual(self.RELOADER_CLS.client_timeout, 10)


@skipIf(on_macos_with_hfs(), "These tests do not work with HFS+ as a filesystem")
class StatReloaderTests(ReloaderTests, IntegrationTests):
    RELOADER_CLS = autoreload.StatReloader

    def setUp(self):
        super().setUp()
        # Shorten the sleep time to speed up tests.
        self.reloader.SLEEP_TIME = 0.01

    @mock.patch('django.utils.autoreload.StatReloader.notify_file_changed')
    def test_tick_does_not_trigger_twice(self, mock_notify_file_changed):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
            ticker = self.reloader.tick()
            next(ticker)
            self.increment_mtime(self.existing_file)
            next(ticker)
            next(ticker)
            self.assertEqual(mock_notify_file_changed.call_count, 1)

    def test_snapshot_files_ignores_missing_files(self):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistent_file]):
            self.assertEqual(dict(self.reloader.snapshot_files()), {})

    def test_snapshot_files_updates(self):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
            snapshot1 = dict(self.reloader.snapshot_files())
            self.assertIn(self.existing_file, snapshot1)
            self.increment_mtime(self.existing_file)
            snapshot2 = dict(self.reloader.snapshot_files())
            self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file])

    def test_snapshot_files_with_duplicates(self):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file, self.existing_file]):
            snapshot = list(self.reloader.snapshot_files())
            self.assertEqual(len(snapshot), 1)
            self.assertEqual(snapshot[0][0], self.existing_file)