mirror of
				https://github.com/django/django.git
				synced 2025-10-24 22:26:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			939 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			939 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import contextlib
 | |
| import os
 | |
| import py_compile
 | |
| import shutil
 | |
| import sys
 | |
| import tempfile
 | |
| import threading
 | |
| import time
 | |
| import types
 | |
| import weakref
 | |
| import zipfile
 | |
| import zoneinfo
 | |
| from importlib import import_module
 | |
| from pathlib import Path
 | |
| from subprocess import CompletedProcess
 | |
| from unittest import mock, skip, skipIf
 | |
| 
 | |
| import django.__main__
 | |
| from django.apps.registry import Apps
 | |
| from django.test import SimpleTestCase
 | |
| from django.test.utils import extend_sys_path
 | |
| from django.utils import autoreload
 | |
| from django.utils.autoreload import WatchmanUnavailable
 | |
| 
 | |
| from .test_module import __main__ as test_main
 | |
| from .test_module import main_module as test_main_module
 | |
| from .utils import on_macos_with_hfs
 | |
| 
 | |
| 
 | |
| class TestIterModulesAndFiles(SimpleTestCase):
 | |
|     def import_and_cleanup(self, name):
 | |
|         import_module(name)
 | |
|         self.addCleanup(lambda: sys.path_importer_cache.clear())
 | |
|         self.addCleanup(lambda: sys.modules.pop(name, None))
 | |
| 
 | |
|     def clear_autoreload_caches(self):
 | |
|         autoreload.iter_modules_and_files.cache_clear()
 | |
| 
 | |
|     def assertFileFound(self, filename):
 | |
|         # Some temp directories are symlinks. Python resolves these fully while
 | |
|         # importing.
 | |
|         resolved_filename = filename.resolve(strict=True)
 | |
|         self.clear_autoreload_caches()
 | |
|         # Test uncached access
 | |
|         self.assertIn(
 | |
|             resolved_filename, list(autoreload.iter_all_python_module_files())
 | |
|         )
 | |
|         # Test cached access
 | |
|         self.assertIn(
 | |
|             resolved_filename, list(autoreload.iter_all_python_module_files())
 | |
|         )
 | |
|         self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
 | |
| 
 | |
|     def assertFileNotFound(self, filename):
 | |
|         resolved_filename = filename.resolve(strict=True)
 | |
|         self.clear_autoreload_caches()
 | |
|         # Test uncached access
 | |
|         self.assertNotIn(
 | |
|             resolved_filename, list(autoreload.iter_all_python_module_files())
 | |
|         )
 | |
|         # Test cached access
 | |
|         self.assertNotIn(
 | |
|             resolved_filename, list(autoreload.iter_all_python_module_files())
 | |
|         )
 | |
|         self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
 | |
| 
 | |
|     def temporary_file(self, filename):
 | |
|         dirname = tempfile.mkdtemp()
 | |
|         self.addCleanup(shutil.rmtree, dirname)
 | |
|         return Path(dirname) / filename
 | |
| 
 | |
|     def test_paths_are_pathlib_instances(self):
 | |
|         for filename in autoreload.iter_all_python_module_files():
 | |
|             self.assertIsInstance(filename, Path)
 | |
| 
 | |
|     def test_file_added(self):
 | |
|         """
 | |
|         When a file is added, it's returned by iter_all_python_module_files().
 | |
|         """
 | |
|         filename = self.temporary_file("test_deleted_removed_module.py")
 | |
|         filename.touch()
 | |
| 
 | |
|         with extend_sys_path(str(filename.parent)):
 | |
|             self.import_and_cleanup("test_deleted_removed_module")
 | |
| 
 | |
|         self.assertFileFound(filename.absolute())
 | |
| 
 | |
|     def test_check_errors(self):
 | |
|         """
 | |
|         When a file containing an error is imported in a function wrapped by
 | |
|         check_errors(), gen_filenames() returns it.
 | |
|         """
 | |
|         filename = self.temporary_file("test_syntax_error.py")
 | |
|         filename.write_text("Ceci n'est pas du Python.")
 | |
| 
 | |
|         with extend_sys_path(str(filename.parent)):
 | |
|             try:
 | |
|                 with self.assertRaises(SyntaxError):
 | |
|                     autoreload.check_errors(import_module)("test_syntax_error")
 | |
|             finally:
 | |
|                 autoreload._exception = None
 | |
|         self.assertFileFound(filename)
 | |
| 
 | |
|     def test_check_errors_catches_all_exceptions(self):
 | |
|         """
 | |
|         Since Python may raise arbitrary exceptions when importing code,
 | |
|         check_errors() must catch Exception, not just some subclasses.
 | |
|         """
 | |
|         filename = self.temporary_file("test_exception.py")
 | |
|         filename.write_text("raise Exception")
 | |
|         with extend_sys_path(str(filename.parent)):
 | |
|             try:
 | |
|                 with self.assertRaises(Exception):
 | |
|                     autoreload.check_errors(import_module)("test_exception")
 | |
|             finally:
 | |
|                 autoreload._exception = None
 | |
|         self.assertFileFound(filename)
 | |
| 
 | |
|     def test_zip_reload(self):
 | |
|         """
 | |
|         Modules imported from zipped files have their archive location included
 | |
|         in the result.
 | |
|         """
 | |
|         zip_file = self.temporary_file("zip_import.zip")
 | |
|         with zipfile.ZipFile(str(zip_file), "w", zipfile.ZIP_DEFLATED) as zipf:
 | |
|             zipf.writestr("test_zipped_file.py", "")
 | |
| 
 | |
|         with extend_sys_path(str(zip_file)):
 | |
|             self.import_and_cleanup("test_zipped_file")
 | |
|         self.assertFileFound(zip_file)
 | |
| 
 | |
|     def test_bytecode_conversion_to_source(self):
 | |
|         """.pyc and .pyo files are included in the files list."""
 | |
|         filename = self.temporary_file("test_compiled.py")
 | |
|         filename.touch()
 | |
|         compiled_file = Path(
 | |
|             py_compile.compile(str(filename), str(filename.with_suffix(".pyc")))
 | |
|         )
 | |
|         filename.unlink()
 | |
|         with extend_sys_path(str(compiled_file.parent)):
 | |
|             self.import_and_cleanup("test_compiled")
 | |
|         self.assertFileFound(compiled_file)
 | |
| 
 | |
|     def test_weakref_in_sys_module(self):
 | |
|         """iter_all_python_module_file() ignores weakref modules."""
 | |
|         time_proxy = weakref.proxy(time)
 | |
|         sys.modules["time_proxy"] = time_proxy
 | |
|         self.addCleanup(lambda: sys.modules.pop("time_proxy", None))
 | |
|         list(autoreload.iter_all_python_module_files())  # No crash.
 | |
| 
 | |
|     def test_module_without_spec(self):
 | |
|         module = types.ModuleType("test_module")
 | |
|         del module.__spec__
 | |
|         self.assertEqual(
 | |
|             autoreload.iter_modules_and_files((module,), frozenset()), frozenset()
 | |
|         )
 | |
| 
 | |
|     def test_main_module_is_resolved(self):
 | |
|         main_module = sys.modules["__main__"]
 | |
|         self.assertFileFound(Path(main_module.__file__))
 | |
| 
 | |
|     def test_main_module_without_file_is_not_resolved(self):
 | |
|         fake_main = types.ModuleType("__main__")
 | |
|         self.assertEqual(
 | |
|             autoreload.iter_modules_and_files((fake_main,), frozenset()), frozenset()
 | |
|         )
 | |
| 
 | |
|     def test_path_with_embedded_null_bytes(self):
 | |
|         for path in (
 | |
|             "embedded_null_byte\x00.py",
 | |
|             "di\x00rectory/embedded_null_byte.py",
 | |
|         ):
 | |
|             with self.subTest(path=path):
 | |
|                 self.assertEqual(
 | |
|                     autoreload.iter_modules_and_files((), frozenset([path])),
 | |
|                     frozenset(),
 | |
|                 )
 | |
| 
 | |
| 
 | |
| class TestChildArguments(SimpleTestCase):
 | |
|     @mock.patch.dict(sys.modules, {"__main__": django.__main__})
 | |
|     @mock.patch("sys.argv", [django.__main__.__file__, "runserver"])
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch("sys._xoptions", {})
 | |
|     def test_run_as_module(self):
 | |
|         self.assertEqual(
 | |
|             autoreload.get_child_arguments(),
 | |
|             [sys.executable, "-m", "django", "runserver"],
 | |
|         )
 | |
| 
 | |
|     @mock.patch.dict(sys.modules, {"__main__": test_main})
 | |
|     @mock.patch("sys.argv", [test_main.__file__, "runserver"])
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch("sys._xoptions", {})
 | |
|     def test_run_as_non_django_module(self):
 | |
|         self.assertEqual(
 | |
|             autoreload.get_child_arguments(),
 | |
|             [sys.executable, "-m", "utils_tests.test_module", "runserver"],
 | |
|         )
 | |
| 
 | |
|     @mock.patch.dict(sys.modules, {"__main__": test_main_module})
 | |
|     @mock.patch("sys.argv", [test_main.__file__, "runserver"])
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch("sys._xoptions", {})
 | |
|     def test_run_as_non_django_module_non_package(self):
 | |
|         self.assertEqual(
 | |
|             autoreload.get_child_arguments(),
 | |
|             [sys.executable, "-m", "utils_tests.test_module.main_module", "runserver"],
 | |
|         )
 | |
| 
 | |
|     @mock.patch("__main__.__spec__", None)
 | |
|     @mock.patch("sys.argv", [__file__, "runserver"])
 | |
|     @mock.patch("sys.warnoptions", ["error"])
 | |
|     @mock.patch("sys._xoptions", {})
 | |
|     def test_warnoptions(self):
 | |
|         self.assertEqual(
 | |
|             autoreload.get_child_arguments(),
 | |
|             [sys.executable, "-Werror", __file__, "runserver"],
 | |
|         )
 | |
| 
 | |
|     @mock.patch("sys.argv", [__file__, "runserver"])
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch("sys._xoptions", {"utf8": True, "a": "b"})
 | |
|     def test_xoptions(self):
 | |
|         self.assertEqual(
 | |
|             autoreload.get_child_arguments(),
 | |
|             [sys.executable, "-Xutf8", "-Xa=b", __file__, "runserver"],
 | |
|         )
 | |
| 
 | |
|     @mock.patch("__main__.__spec__", None)
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     def test_exe_fallback(self):
 | |
|         with tempfile.TemporaryDirectory() as tmpdir:
 | |
|             exe_path = Path(tmpdir) / "django-admin.exe"
 | |
|             exe_path.touch()
 | |
|             with mock.patch("sys.argv", [exe_path.with_suffix(""), "runserver"]):
 | |
|                 self.assertEqual(
 | |
|                     autoreload.get_child_arguments(), [exe_path, "runserver"]
 | |
|                 )
 | |
| 
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch.dict(sys.modules, {"__main__": django.__main__})
 | |
|     def test_use_exe_when_main_spec(self):
 | |
|         with tempfile.TemporaryDirectory() as tmpdir:
 | |
|             exe_path = Path(tmpdir) / "django-admin.exe"
 | |
|             exe_path.touch()
 | |
|             with mock.patch("sys.argv", [exe_path.with_suffix(""), "runserver"]):
 | |
|                 self.assertEqual(
 | |
|                     autoreload.get_child_arguments(), [exe_path, "runserver"]
 | |
|                 )
 | |
| 
 | |
|     @mock.patch("__main__.__spec__", None)
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch("sys._xoptions", {})
 | |
|     def test_entrypoint_fallback(self):
 | |
|         with tempfile.TemporaryDirectory() as tmpdir:
 | |
|             script_path = Path(tmpdir) / "django-admin-script.py"
 | |
|             script_path.touch()
 | |
|             with mock.patch(
 | |
|                 "sys.argv", [script_path.with_name("django-admin"), "runserver"]
 | |
|             ):
 | |
|                 self.assertEqual(
 | |
|                     autoreload.get_child_arguments(),
 | |
|                     [sys.executable, script_path, "runserver"],
 | |
|                 )
 | |
| 
 | |
|     @mock.patch("__main__.__spec__", None)
 | |
|     @mock.patch("sys.argv", ["does-not-exist", "runserver"])
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     def test_raises_runtimeerror(self):
 | |
|         msg = "Script does-not-exist does not exist."
 | |
|         with self.assertRaisesMessage(RuntimeError, msg):
 | |
|             autoreload.get_child_arguments()
 | |
| 
 | |
|     @mock.patch("sys.argv", [__file__, "runserver"])
 | |
|     @mock.patch("sys.warnoptions", [])
 | |
|     @mock.patch("sys._xoptions", {})
 | |
|     def test_module_no_spec(self):
 | |
|         module = types.ModuleType("test_module")
 | |
|         del module.__spec__
 | |
|         with mock.patch.dict(sys.modules, {"__main__": module}):
 | |
|             self.assertEqual(
 | |
|                 autoreload.get_child_arguments(),
 | |
|                 [sys.executable, __file__, "runserver"],
 | |
|             )
 | |
| 
 | |
| 
 | |
| class TestUtilities(SimpleTestCase):
 | |
|     def test_is_django_module(self):
 | |
|         for module, expected in ((zoneinfo, False), (sys, False), (autoreload, True)):
 | |
|             with self.subTest(module=module):
 | |
|                 self.assertIs(autoreload.is_django_module(module), expected)
 | |
| 
 | |
|     def test_is_django_path(self):
 | |
|         for module, expected in (
 | |
|             (zoneinfo.__file__, False),
 | |
|             (contextlib.__file__, False),
 | |
|             (autoreload.__file__, True),
 | |
|         ):
 | |
|             with self.subTest(module=module):
 | |
|                 self.assertIs(autoreload.is_django_path(module), expected)
 | |
| 
 | |
| 
 | |
| class TestCommonRoots(SimpleTestCase):
 | |
|     def test_common_roots(self):
 | |
|         paths = (
 | |
|             Path("/first/second"),
 | |
|             Path("/first/second/third"),
 | |
|             Path("/first/"),
 | |
|             Path("/root/first/"),
 | |
|         )
 | |
|         results = autoreload.common_roots(paths)
 | |
|         self.assertCountEqual(results, [Path("/first/"), Path("/root/first/")])
 | |
| 
 | |
| 
 | |
| class TestSysPathDirectories(SimpleTestCase):
 | |
|     def setUp(self):
 | |
|         _directory = tempfile.TemporaryDirectory()
 | |
|         self.addCleanup(_directory.cleanup)
 | |
|         self.directory = Path(_directory.name).resolve(strict=True).absolute()
 | |
|         self.file = self.directory / "test"
 | |
|         self.file.touch()
 | |
| 
 | |
|     def test_sys_paths_with_directories(self):
 | |
|         with extend_sys_path(str(self.file)):
 | |
|             paths = list(autoreload.sys_path_directories())
 | |
|         self.assertIn(self.file.parent, paths)
 | |
| 
 | |
|     def test_sys_paths_non_existing(self):
 | |
|         nonexistent_file = Path(self.directory.name) / "does_not_exist"
 | |
|         with extend_sys_path(str(nonexistent_file)):
 | |
|             paths = list(autoreload.sys_path_directories())
 | |
|         self.assertNotIn(nonexistent_file, paths)
 | |
|         self.assertNotIn(nonexistent_file.parent, paths)
 | |
| 
 | |
|     def test_sys_paths_absolute(self):
 | |
|         paths = list(autoreload.sys_path_directories())
 | |
|         self.assertTrue(all(p.is_absolute() for p in paths))
 | |
| 
 | |
|     def test_sys_paths_directories(self):
 | |
|         with extend_sys_path(str(self.directory)):
 | |
|             paths = list(autoreload.sys_path_directories())
 | |
|         self.assertIn(self.directory, paths)
 | |
| 
 | |
| 
 | |
| class GetReloaderTests(SimpleTestCase):
 | |
|     @mock.patch("django.utils.autoreload.WatchmanReloader")
 | |
|     def test_watchman_unavailable(self, mocked_watchman):
 | |
|         mocked_watchman.check_availability.side_effect = WatchmanUnavailable
 | |
|         self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)
 | |
| 
 | |
|     @mock.patch.object(autoreload.WatchmanReloader, "check_availability")
 | |
|     def test_watchman_available(self, mocked_available):
 | |
|         # If WatchmanUnavailable isn't raised, Watchman will be chosen.
 | |
|         mocked_available.return_value = None
 | |
|         result = autoreload.get_reloader()
 | |
|         self.assertIsInstance(result, autoreload.WatchmanReloader)
 | |
| 
 | |
| 
 | |
| class RunWithReloaderTests(SimpleTestCase):
 | |
|     @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: "true"})
 | |
|     @mock.patch("django.utils.autoreload.get_reloader")
 | |
|     def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
 | |
|         mocked_get_reloader.side_effect = KeyboardInterrupt()
 | |
|         autoreload.run_with_reloader(lambda: None)  # No exception
 | |
| 
 | |
|     @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: "false"})
 | |
|     @mock.patch("django.utils.autoreload.restart_with_reloader")
 | |
|     def test_calls_sys_exit(self, mocked_restart_reloader):
 | |
|         mocked_restart_reloader.return_value = 1
 | |
|         with self.assertRaises(SystemExit) as exc:
 | |
|             autoreload.run_with_reloader(lambda: None)
 | |
|         self.assertEqual(exc.exception.code, 1)
 | |
| 
 | |
|     @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: "true"})
 | |
|     @mock.patch("django.utils.autoreload.start_django")
 | |
|     @mock.patch("django.utils.autoreload.get_reloader")
 | |
|     def test_calls_start_django(self, mocked_reloader, mocked_start_django):
 | |
|         mocked_reloader.return_value = mock.sentinel.RELOADER
 | |
|         autoreload.run_with_reloader(mock.sentinel.METHOD)
 | |
|         self.assertEqual(mocked_start_django.call_count, 1)
 | |
|         self.assertSequenceEqual(
 | |
|             mocked_start_django.call_args[0],
 | |
|             [mock.sentinel.RELOADER, mock.sentinel.METHOD],
 | |
|         )
 | |
| 
 | |
| 
 | |
| class StartDjangoTests(SimpleTestCase):
 | |
|     @mock.patch("django.utils.autoreload.ensure_echo_on")
 | |
|     def test_echo_on_called(self, mocked_echo):
 | |
|         fake_reloader = mock.MagicMock()
 | |
|         autoreload.start_django(fake_reloader, lambda: None)
 | |
|         self.assertEqual(mocked_echo.call_count, 1)
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.check_errors")
 | |
|     def test_check_errors_called(self, mocked_check_errors):
 | |
|         fake_method = mock.MagicMock(return_value=None)
 | |
|         fake_reloader = mock.MagicMock()
 | |
|         autoreload.start_django(fake_reloader, fake_method)
 | |
|         self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])
 | |
| 
 | |
|     @mock.patch("threading.Thread")
 | |
|     @mock.patch("django.utils.autoreload.check_errors")
 | |
|     def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
 | |
|         fake_reloader = mock.MagicMock()
 | |
|         fake_main_func = mock.MagicMock()
 | |
|         fake_thread = mock.MagicMock()
 | |
|         mocked_check_errors.return_value = fake_main_func
 | |
|         mocked_thread.return_value = fake_thread
 | |
|         autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
 | |
|         self.assertEqual(mocked_thread.call_count, 1)
 | |
|         self.assertEqual(
 | |
|             mocked_thread.call_args[1],
 | |
|             {
 | |
|                 "target": fake_main_func,
 | |
|                 "args": (123,),
 | |
|                 "kwargs": {"abc": 123},
 | |
|                 "name": "django-main-thread",
 | |
|             },
 | |
|         )
 | |
|         self.assertIs(fake_thread.daemon, True)
 | |
|         self.assertTrue(fake_thread.start.called)
 | |
| 
 | |
| 
 | |
| class TestCheckErrors(SimpleTestCase):
 | |
|     def test_mutates_error_files(self):
 | |
|         fake_method = mock.MagicMock(side_effect=RuntimeError())
 | |
|         wrapped = autoreload.check_errors(fake_method)
 | |
|         with mock.patch.object(autoreload, "_error_files") as mocked_error_files:
 | |
|             try:
 | |
|                 with self.assertRaises(RuntimeError):
 | |
|                     wrapped()
 | |
|             finally:
 | |
|                 autoreload._exception = None
 | |
|         self.assertEqual(mocked_error_files.append.call_count, 1)
 | |
| 
 | |
| 
 | |
| class TestRaiseLastException(SimpleTestCase):
 | |
|     @mock.patch("django.utils.autoreload._exception", None)
 | |
|     def test_no_exception(self):
 | |
|         # Should raise no exception if _exception is None
 | |
|         autoreload.raise_last_exception()
 | |
| 
 | |
|     def test_raises_exception(self):
 | |
|         class MyException(Exception):
 | |
|             pass
 | |
| 
 | |
|         # Create an exception
 | |
|         try:
 | |
|             raise MyException("Test Message")
 | |
|         except MyException:
 | |
|             exc_info = sys.exc_info()
 | |
| 
 | |
|         with mock.patch("django.utils.autoreload._exception", exc_info):
 | |
|             with self.assertRaisesMessage(MyException, "Test Message"):
 | |
|                 autoreload.raise_last_exception()
 | |
| 
 | |
|     def test_raises_custom_exception(self):
 | |
|         class MyException(Exception):
 | |
|             def __init__(self, msg, extra_context):
 | |
|                 super().__init__(msg)
 | |
|                 self.extra_context = extra_context
 | |
| 
 | |
|         # Create an exception.
 | |
|         try:
 | |
|             raise MyException("Test Message", "extra context")
 | |
|         except MyException:
 | |
|             exc_info = sys.exc_info()
 | |
| 
 | |
|         with mock.patch("django.utils.autoreload._exception", exc_info):
 | |
|             with self.assertRaisesMessage(MyException, "Test Message"):
 | |
|                 autoreload.raise_last_exception()
 | |
| 
 | |
|     def test_raises_exception_with_context(self):
 | |
|         try:
 | |
|             raise Exception(2)
 | |
|         except Exception as e:
 | |
|             try:
 | |
|                 raise Exception(1) from e
 | |
|             except Exception:
 | |
|                 exc_info = sys.exc_info()
 | |
| 
 | |
|         with mock.patch("django.utils.autoreload._exception", exc_info):
 | |
|             with self.assertRaises(Exception) as cm:
 | |
|                 autoreload.raise_last_exception()
 | |
|             self.assertEqual(cm.exception.args[0], 1)
 | |
|             self.assertEqual(cm.exception.__cause__.args[0], 2)
 | |
| 
 | |
| 
 | |
| class RestartWithReloaderTests(SimpleTestCase):
 | |
|     executable = "/usr/bin/python"
 | |
| 
 | |
|     def patch_autoreload(self, argv):
 | |
|         patch_call = mock.patch(
 | |
|             "django.utils.autoreload.subprocess.run",
 | |
|             return_value=CompletedProcess(argv, 0),
 | |
|         )
 | |
|         patches = [
 | |
|             mock.patch("django.utils.autoreload.sys.argv", argv),
 | |
|             mock.patch("django.utils.autoreload.sys.executable", self.executable),
 | |
|             mock.patch("django.utils.autoreload.sys.warnoptions", ["all"]),
 | |
|             mock.patch("django.utils.autoreload.sys._xoptions", {}),
 | |
|         ]
 | |
|         for p in patches:
 | |
|             p.start()
 | |
|             self.addCleanup(p.stop)
 | |
|         mock_call = patch_call.start()
 | |
|         self.addCleanup(patch_call.stop)
 | |
|         return mock_call
 | |
| 
 | |
|     def test_manage_py(self):
 | |
|         with tempfile.TemporaryDirectory() as temp_dir:
 | |
|             script = Path(temp_dir) / "manage.py"
 | |
|             script.touch()
 | |
|             argv = [str(script), "runserver"]
 | |
|             mock_call = self.patch_autoreload(argv)
 | |
|             with mock.patch("__main__.__spec__", None):
 | |
|                 autoreload.restart_with_reloader()
 | |
|             self.assertEqual(mock_call.call_count, 1)
 | |
|             self.assertEqual(
 | |
|                 mock_call.call_args[0][0],
 | |
|                 [self.executable, "-Wall"] + argv,
 | |
|             )
 | |
| 
 | |
|     def test_python_m_django(self):
 | |
|         main = "/usr/lib/pythonX.Y/site-packages/django/__main__.py"
 | |
|         argv = [main, "runserver"]
 | |
|         mock_call = self.patch_autoreload(argv)
 | |
|         with mock.patch("django.__main__.__file__", main):
 | |
|             with mock.patch.dict(sys.modules, {"__main__": django.__main__}):
 | |
|                 autoreload.restart_with_reloader()
 | |
|             self.assertEqual(mock_call.call_count, 1)
 | |
|             self.assertEqual(
 | |
|                 mock_call.call_args[0][0],
 | |
|                 [self.executable, "-Wall", "-m", "django"] + argv[1:],
 | |
|             )
 | |
| 
 | |
|     def test_propagates_unbuffered_from_parent(self):
 | |
|         for args in ("-u", "-Iuv"):
 | |
|             with self.subTest(args=args):
 | |
|                 with mock.patch.dict(os.environ, {}, clear=True):
 | |
|                     with tempfile.TemporaryDirectory() as d:
 | |
|                         script = Path(d) / "manage.py"
 | |
|                         script.touch()
 | |
|                         mock_call = self.patch_autoreload([str(script), "runserver"])
 | |
|                         with (
 | |
|                             mock.patch("__main__.__spec__", None),
 | |
|                             mock.patch.object(
 | |
|                                 autoreload.sys,
 | |
|                                 "orig_argv",
 | |
|                                 [self.executable, args, str(script), "runserver"],
 | |
|                             ),
 | |
|                         ):
 | |
|                             autoreload.restart_with_reloader()
 | |
|                     env = mock_call.call_args.kwargs["env"]
 | |
|                     self.assertEqual(env.get("PYTHONUNBUFFERED"), "1")
 | |
| 
 | |
|     def test_does_not_propagate_unbuffered_from_parent(self):
 | |
|         for args in (
 | |
|             "-Xdev",
 | |
|             "-Xfaulthandler",
 | |
|             "--user",
 | |
|             "-Wall",
 | |
|             "-Wdefault",
 | |
|             "-Wignore::UserWarning",
 | |
|         ):
 | |
|             with self.subTest(args=args):
 | |
|                 with mock.patch.dict(os.environ, {}, clear=True):
 | |
|                     with tempfile.TemporaryDirectory() as d:
 | |
|                         script = Path(d) / "manage.py"
 | |
|                         script.touch()
 | |
|                         mock_call = self.patch_autoreload([str(script), "runserver"])
 | |
|                         with (
 | |
|                             mock.patch("__main__.__spec__", None),
 | |
|                             mock.patch.object(
 | |
|                                 autoreload.sys,
 | |
|                                 "orig_argv",
 | |
|                                 [self.executable, args, str(script), "runserver"],
 | |
|                             ),
 | |
|                         ):
 | |
|                             autoreload.restart_with_reloader()
 | |
|                     env = mock_call.call_args.kwargs["env"]
 | |
|                     self.assertIsNone(env.get("PYTHONUNBUFFERED"))
 | |
| 
 | |
| 
 | |
| class ReloaderTests(SimpleTestCase):
 | |
|     RELOADER_CLS = None
 | |
| 
 | |
|     def setUp(self):
 | |
|         _tempdir = tempfile.TemporaryDirectory()
 | |
|         self.tempdir = Path(_tempdir.name).resolve(strict=True).absolute()
 | |
|         self.existing_file = self.ensure_file(self.tempdir / "test.py")
 | |
|         self.nonexistent_file = (self.tempdir / "does_not_exist.py").absolute()
 | |
|         self.reloader = self.RELOADER_CLS()
 | |
|         self.addCleanup(self.reloader.stop)
 | |
|         self.addCleanup(_tempdir.cleanup)
 | |
| 
 | |
|     def ensure_file(self, path):
 | |
|         path.parent.mkdir(exist_ok=True, parents=True)
 | |
|         path.touch()
 | |
|         # On Linux and Windows updating the mtime of a file using touch() will
 | |
|         # set a timestamp value that is in the past, as the time value for the
 | |
|         # last kernel tick is used rather than getting the correct absolute
 | |
|         # time.
 | |
|         # To make testing simpler set the mtime to be the observed time when
 | |
|         # this function is called.
 | |
|         self.set_mtime(path, time.time())
 | |
|         return path.absolute()
 | |
| 
 | |
|     def set_mtime(self, fp, value):
 | |
|         os.utime(str(fp), (value, value))
 | |
| 
 | |
|     def increment_mtime(self, fp, by=1):
 | |
|         current_time = time.time()
 | |
|         self.set_mtime(fp, current_time + by)
 | |
| 
 | |
|     @contextlib.contextmanager
 | |
|     def tick_twice(self):
 | |
|         ticker = self.reloader.tick()
 | |
|         next(ticker)
 | |
|         yield
 | |
|         next(ticker)
 | |
| 
 | |
| 
 | |
| class IntegrationTests:
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_glob(self, mocked_modules, notify_mock):
 | |
|         non_py_file = self.ensure_file(self.tempdir / "non_py_file")
 | |
|         self.reloader.watch_dir(self.tempdir, "*.py")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(non_py_file)
 | |
|             self.increment_mtime(self.existing_file)
 | |
|         self.assertEqual(notify_mock.call_count, 1)
 | |
|         self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_multiple_globs(self, mocked_modules, notify_mock):
 | |
|         self.ensure_file(self.tempdir / "x.test")
 | |
|         self.reloader.watch_dir(self.tempdir, "*.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "*.test")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(self.existing_file)
 | |
|         self.assertEqual(notify_mock.call_count, 1)
 | |
|         self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_overlapping_globs(self, mocked_modules, notify_mock):
 | |
|         self.reloader.watch_dir(self.tempdir, "*.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "*.p*")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(self.existing_file)
 | |
|         self.assertEqual(notify_mock.call_count, 1)
 | |
|         self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_glob_recursive(self, mocked_modules, notify_mock):
 | |
|         non_py_file = self.ensure_file(self.tempdir / "dir" / "non_py_file")
 | |
|         py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.py")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(non_py_file)
 | |
|             self.increment_mtime(py_file)
 | |
|         self.assertEqual(notify_mock.call_count, 1)
 | |
|         self.assertCountEqual(notify_mock.call_args[0], [py_file])
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
 | |
|         non_py_file = self.ensure_file(self.tempdir / "dir" / "test.txt")
 | |
|         py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.txt")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.py")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(non_py_file)
 | |
|             self.increment_mtime(py_file)
 | |
|         self.assertEqual(notify_mock.call_count, 2)
 | |
|         self.assertCountEqual(
 | |
|             notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)]
 | |
|         )
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_nested_glob_recursive(self, mocked_modules, notify_mock):
 | |
|         inner_py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.py")
 | |
|         self.reloader.watch_dir(inner_py_file.parent, "**/*.py")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(inner_py_file)
 | |
|         self.assertEqual(notify_mock.call_count, 1)
 | |
|         self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
 | |
|     @mock.patch(
 | |
|         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
 | |
|     )
 | |
|     def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
 | |
|         py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.p*")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.py*")
 | |
|         with self.tick_twice():
 | |
|             self.increment_mtime(py_file)
 | |
|         self.assertEqual(notify_mock.call_count, 1)
 | |
|         self.assertCountEqual(notify_mock.call_args[0], [py_file])
 | |
| 
 | |
| 
 | |
| class BaseReloaderTests(ReloaderTests):
 | |
|     RELOADER_CLS = autoreload.BaseReloader
 | |
| 
 | |
|     def test_watch_dir_with_unresolvable_path(self):
 | |
|         path = Path("unresolvable_directory")
 | |
|         with mock.patch.object(Path, "absolute", side_effect=FileNotFoundError):
 | |
|             self.reloader.watch_dir(path, "**/*.mo")
 | |
|         self.assertEqual(list(self.reloader.directory_globs), [])
 | |
| 
 | |
|     def test_watch_with_glob(self):
 | |
|         self.reloader.watch_dir(self.tempdir, "*.py")
 | |
|         watched_files = list(self.reloader.watched_files())
 | |
|         self.assertIn(self.existing_file, watched_files)
 | |
| 
 | |
|     def test_watch_files_with_recursive_glob(self):
 | |
|         inner_file = self.ensure_file(self.tempdir / "test" / "test.py")
 | |
|         self.reloader.watch_dir(self.tempdir, "**/*.py")
 | |
|         watched_files = list(self.reloader.watched_files())
 | |
|         self.assertIn(self.existing_file, watched_files)
 | |
|         self.assertIn(inner_file, watched_files)
 | |
| 
 | |
|     def test_run_loop_catches_stopiteration(self):
 | |
|         def mocked_tick():
 | |
|             yield
 | |
| 
 | |
|         with mock.patch.object(self.reloader, "tick", side_effect=mocked_tick) as tick:
 | |
|             self.reloader.run_loop()
 | |
|         self.assertEqual(tick.call_count, 1)
 | |
| 
 | |
|     def test_run_loop_stop_and_return(self):
 | |
|         def mocked_tick(*args):
 | |
|             yield
 | |
|             self.reloader.stop()
 | |
|             return  # Raises StopIteration
 | |
| 
 | |
|         with mock.patch.object(self.reloader, "tick", side_effect=mocked_tick) as tick:
 | |
|             self.reloader.run_loop()
 | |
| 
 | |
|         self.assertEqual(tick.call_count, 1)
 | |
| 
 | |
|     def test_wait_for_apps_ready_checks_for_exception(self):
 | |
|         app_reg = Apps()
 | |
|         app_reg.ready_event.set()
 | |
|         # thread.is_alive() is False if it's not started.
 | |
|         dead_thread = threading.Thread()
 | |
|         self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))
 | |
| 
 | |
|     def test_wait_for_apps_ready_without_exception(self):
 | |
|         app_reg = Apps()
 | |
|         app_reg.ready_event.set()
 | |
|         thread = mock.MagicMock()
 | |
|         thread.is_alive.return_value = True
 | |
|         self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))
 | |
| 
 | |
| 
 | |
| def skip_unless_watchman_available():
 | |
|     try:
 | |
|         autoreload.WatchmanReloader.check_availability()
 | |
|     except WatchmanUnavailable as e:
 | |
|         return skip("Watchman unavailable: %s" % e)
 | |
|     return lambda func: func
 | |
| 
 | |
| 
 | |
| @skip_unless_watchman_available()
 | |
| class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
 | |
|     RELOADER_CLS = autoreload.WatchmanReloader
 | |
| 
 | |
|     def setUp(self):
 | |
|         super().setUp()
 | |
|         # Shorten the timeout to speed up tests.
 | |
|         self.reloader.client_timeout = int(os.environ.get("DJANGO_WATCHMAN_TIMEOUT", 2))
 | |
| 
 | |
|     def test_watch_glob_ignores_non_existing_directories_two_levels(self):
 | |
|         with mock.patch.object(self.reloader, "_subscribe") as mocked_subscribe:
 | |
|             self.reloader._watch_glob(self.tempdir / "does_not_exist" / "more", ["*"])
 | |
|         self.assertFalse(mocked_subscribe.called)
 | |
| 
 | |
|     def test_watch_glob_uses_existing_parent_directories(self):
 | |
|         with mock.patch.object(self.reloader, "_subscribe") as mocked_subscribe:
 | |
|             self.reloader._watch_glob(self.tempdir / "does_not_exist", ["*"])
 | |
|         self.assertSequenceEqual(
 | |
|             mocked_subscribe.call_args[0],
 | |
|             [
 | |
|                 self.tempdir,
 | |
|                 "glob-parent-does_not_exist:%s" % self.tempdir,
 | |
|                 ["anyof", ["match", "does_not_exist/*", "wholename"]],
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     def test_watch_glob_multiple_patterns(self):
 | |
|         with mock.patch.object(self.reloader, "_subscribe") as mocked_subscribe:
 | |
|             self.reloader._watch_glob(self.tempdir, ["*", "*.py"])
 | |
|         self.assertSequenceEqual(
 | |
|             mocked_subscribe.call_args[0],
 | |
|             [
 | |
|                 self.tempdir,
 | |
|                 "glob:%s" % self.tempdir,
 | |
|                 ["anyof", ["match", "*", "wholename"], ["match", "*.py", "wholename"]],
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     def test_watched_roots_contains_files(self):
 | |
|         paths = self.reloader.watched_roots([self.existing_file])
 | |
|         self.assertIn(self.existing_file.parent, paths)
 | |
| 
 | |
|     def test_watched_roots_contains_directory_globs(self):
 | |
|         self.reloader.watch_dir(self.tempdir, "*.py")
 | |
|         paths = self.reloader.watched_roots([])
 | |
|         self.assertIn(self.tempdir, paths)
 | |
| 
 | |
|     def test_watched_roots_contains_sys_path(self):
 | |
|         with extend_sys_path(str(self.tempdir)):
 | |
|             paths = self.reloader.watched_roots([])
 | |
|         self.assertIn(self.tempdir, paths)
 | |
| 
 | |
|     def test_check_server_status(self):
 | |
|         self.assertTrue(self.reloader.check_server_status())
 | |
| 
 | |
|     def test_check_server_status_raises_error(self):
 | |
|         with mock.patch.object(self.reloader.client, "query") as mocked_query:
 | |
|             mocked_query.side_effect = Exception()
 | |
|             with self.assertRaises(autoreload.WatchmanUnavailable):
 | |
|                 self.reloader.check_server_status()
 | |
| 
 | |
|     @mock.patch("pywatchman.client")
 | |
|     def test_check_availability(self, mocked_client):
 | |
|         mocked_client().capabilityCheck.side_effect = Exception()
 | |
|         with self.assertRaisesMessage(
 | |
|             WatchmanUnavailable, "Cannot connect to the watchman service"
 | |
|         ):
 | |
|             self.RELOADER_CLS.check_availability()
 | |
| 
 | |
|     @mock.patch("pywatchman.client")
 | |
|     def test_check_availability_lower_version(self, mocked_client):
 | |
|         mocked_client().capabilityCheck.return_value = {"version": "4.8.10"}
 | |
|         with self.assertRaisesMessage(
 | |
|             WatchmanUnavailable, "Watchman 4.9 or later is required."
 | |
|         ):
 | |
|             self.RELOADER_CLS.check_availability()
 | |
| 
 | |
|     def test_pywatchman_not_available(self):
 | |
|         with mock.patch.object(autoreload, "pywatchman") as mocked:
 | |
|             mocked.__bool__.return_value = False
 | |
|             with self.assertRaisesMessage(
 | |
|                 WatchmanUnavailable, "pywatchman not installed."
 | |
|             ):
 | |
|                 self.RELOADER_CLS.check_availability()
 | |
| 
 | |
|     def test_update_watches_raises_exceptions(self):
 | |
|         class TestException(Exception):
 | |
|             pass
 | |
| 
 | |
|         with mock.patch.object(self.reloader, "_update_watches") as mocked_watches:
 | |
|             with mock.patch.object(
 | |
|                 self.reloader, "check_server_status"
 | |
|             ) as mocked_server_status:
 | |
|                 mocked_watches.side_effect = TestException()
 | |
|                 mocked_server_status.return_value = True
 | |
|                 with self.assertRaises(TestException):
 | |
|                     self.reloader.update_watches()
 | |
|                 self.assertIsInstance(
 | |
|                     mocked_server_status.call_args[0][0], TestException
 | |
|                 )
 | |
| 
 | |
|     @mock.patch.dict(os.environ, {"DJANGO_WATCHMAN_TIMEOUT": "10"})
 | |
|     def test_setting_timeout_from_environment_variable(self):
 | |
|         self.assertEqual(self.RELOADER_CLS().client_timeout, 10)
 | |
| 
 | |
| 
 | |
| @skipIf(on_macos_with_hfs(), "These tests do not work with HFS+ as a filesystem")
 | |
| class StatReloaderTests(ReloaderTests, IntegrationTests):
 | |
|     RELOADER_CLS = autoreload.StatReloader
 | |
| 
 | |
|     def setUp(self):
 | |
|         super().setUp()
 | |
|         # Shorten the sleep time to speed up tests.
 | |
|         self.reloader.SLEEP_TIME = 0.01
 | |
| 
 | |
|     @mock.patch("django.utils.autoreload.StatReloader.notify_file_changed")
 | |
|     def test_tick_does_not_trigger_twice(self, mock_notify_file_changed):
 | |
|         with mock.patch.object(
 | |
|             self.reloader, "watched_files", return_value=[self.existing_file]
 | |
|         ):
 | |
|             ticker = self.reloader.tick()
 | |
|             next(ticker)
 | |
|             self.increment_mtime(self.existing_file)
 | |
|             next(ticker)
 | |
|             next(ticker)
 | |
|             self.assertEqual(mock_notify_file_changed.call_count, 1)
 | |
| 
 | |
|     def test_snapshot_files_ignores_missing_files(self):
 | |
|         with mock.patch.object(
 | |
|             self.reloader, "watched_files", return_value=[self.nonexistent_file]
 | |
|         ):
 | |
|             self.assertEqual(dict(self.reloader.snapshot_files()), {})
 | |
| 
 | |
|     def test_snapshot_files_updates(self):
 | |
|         with mock.patch.object(
 | |
|             self.reloader, "watched_files", return_value=[self.existing_file]
 | |
|         ):
 | |
|             snapshot1 = dict(self.reloader.snapshot_files())
 | |
|             self.assertIn(self.existing_file, snapshot1)
 | |
|             self.increment_mtime(self.existing_file)
 | |
|             snapshot2 = dict(self.reloader.snapshot_files())
 | |
|             self.assertNotEqual(
 | |
|                 snapshot1[self.existing_file], snapshot2[self.existing_file]
 | |
|             )
 | |
| 
 | |
|     def test_snapshot_files_with_duplicates(self):
 | |
|         with mock.patch.object(
 | |
|             self.reloader,
 | |
|             "watched_files",
 | |
|             return_value=[self.existing_file, self.existing_file],
 | |
|         ):
 | |
|             snapshot = list(self.reloader.snapshot_files())
 | |
|             self.assertEqual(len(snapshot), 1)
 | |
|             self.assertEqual(snapshot[0][0], self.existing_file)
 |