1
0
mirror of https://github.com/django/django.git synced 2025-03-05 23:12:32 +00:00
django/django/db/utils.py
Nick Pope d3677043fc Added backward compatibility test for ConnectionHandler.databases property.
The ConnectionHandler.databases property is no longer used within
Django, but it is maintained for backward compatibility with 3rd party
packages that have used this private API in the past.
2022-05-13 12:01:07 +02:00

279 lines
9.1 KiB
Python

import pkgutil
from importlib import import_module
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
# For backwards compatibility with Django < 3.2
from django.utils.connection import ConnectionDoesNotExist # NOQA: F401
from django.utils.connection import BaseConnectionHandler
from django.utils.functional import cached_property
from django.utils.module_loading import import_string
# Alias of the database used when nothing more specific is selected. It is
# the key ConnectionHandler requires in a non-empty DATABASES setting and the
# value ConnectionRouter falls back to when no router or instance picks one.
DEFAULT_DB_ALIAS = "default"
# NOTE(review): presumably the key under which the Django version is stored
# in pickled state; it is not used in this module -- confirm against callers.
DJANGO_VERSION_PICKLE_KEY = "_django_version"
class Error(Exception):
    """
    Root of Django's database exception hierarchy.

    DatabaseErrorWrapper re-raises a backend exception as this class when it
    is a subclass of the backend's ``Error`` and of none of the more specific
    types.
    """
class InterfaceError(Error):
    """
    Django's counterpart of the backend's PEP-249 ``InterfaceError``.

    DatabaseErrorWrapper raises it in place of a backend exception of the
    same name.
    """
class DatabaseError(Error):
    """
    Django's counterpart of the backend's PEP-249 ``DatabaseError``.

    Base class of the more specific database errors defined in this module.
    """
class DataError(DatabaseError):
    """
    Django's counterpart of the backend's PEP-249 ``DataError``.

    DatabaseErrorWrapper does not set the wrapper's ``errors_occurred`` flag
    for this type.
    """
class OperationalError(DatabaseError):
    """
    Django's counterpart of the backend's PEP-249 ``OperationalError``.

    DatabaseErrorWrapper raises it in place of a backend exception of the
    same name.
    """
class IntegrityError(DatabaseError):
    """
    Django's counterpart of the backend's PEP-249 ``IntegrityError``.

    DatabaseErrorWrapper does not set the wrapper's ``errors_occurred`` flag
    for this type.
    """
class InternalError(DatabaseError):
    """
    Django's counterpart of the backend's PEP-249 ``InternalError``.

    DatabaseErrorWrapper raises it in place of a backend exception of the
    same name.
    """
class ProgrammingError(DatabaseError):
    """
    Django's counterpart of the backend's PEP-249 ``ProgrammingError``.

    DatabaseErrorWrapper raises it in place of a backend exception of the
    same name.
    """
class NotSupportedError(DatabaseError):
    """
    Django's counterpart of the backend's PEP-249 ``NotSupportedError``.

    DatabaseErrorWrapper raises it in place of a backend exception of the
    same name.
    """
class DatabaseErrorWrapper:
    """
    Translate backend-specific database exceptions into Django's own.

    Works both as a context manager (``with wrapper: ...``) and as a
    decorator (``wrapper(func)``).
    """

    def __init__(self, wrapper):
        """
        Remember the database wrapper to translate exceptions for.

        ``wrapper`` must expose a ``Database`` attribute that defines the
        PEP-249 exception classes.
        """
        self.wrapper = wrapper

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Re-raise a backend exception as the matching Django exception.

        Nothing happens when the block exited cleanly or when the exception
        matches none of the backend's exception classes.
        """
        if exc_type is None:
            return
        # Subclasses are listed before their bases so that the first match
        # is the most specific one.
        django_exc_types = (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        )
        backend_module = self.wrapper.Database
        for django_exc_type in django_exc_types:
            backend_exc_type = getattr(backend_module, django_exc_type.__name__)
            if not issubclass(exc_type, backend_exc_type):
                continue
            translated = django_exc_type(*exc_value.args)
            # Data and integrity errors leave the connection usable, so
            # only the other types flag the wrapper.
            if django_exc_type not in (DataError, IntegrityError):
                self.wrapper.errors_occurred = True
            raise translated.with_traceback(traceback) from exc_value

    def __call__(self, func):
        """Return ``func`` wrapped so that it runs inside this context manager."""

        # @wraps is deliberately not applied here, for performance reasons.
        # Refs #21109.
        def inner(*args, **kwargs):
            with self:
                return func(*args, **kwargs)

        return inner
def load_backend(backend_name):
    """
    Import and return the ``base`` module of a database backend.

    ``backend_name`` is the fully qualified dotted path of the backend
    package. If the import fails and the name is not one of Django's
    built-in backends, ImproperlyConfigured is raised with the list of
    built-in backends; otherwise the original ImportError propagates.
    """
    # Old name of a backend that was renamed in Django 1.9.
    renamed_backends = {
        "django.db.backends.postgresql_psycopg2": "django.db.backends.postgresql",
    }
    backend_name = renamed_backends.get(backend_name, backend_name)
    try:
        return import_module(f"{backend_name}.base")
    except ImportError as e_user:
        # The backend could not be imported. Work out which backends ship
        # with Django so the error message can list them.
        import django.db.backends

        excluded = {"base", "dummy"}
        builtin_backends = []
        for _, name, ispkg in pkgutil.iter_modules(django.db.backends.__path__):
            if ispkg and name not in excluded:
                builtin_backends.append(name)
        qualified_builtins = [
            "django.db.backends.%s" % builtin for builtin in builtin_backends
        ]
        if backend_name in qualified_builtins:
            # A built-in backend failed to import: the problem lies
            # elsewhere, so let the original error through.
            raise
        available = ", ".join(repr(name) for name in sorted(builtin_backends))
        raise ImproperlyConfigured(
            "%r isn't an available database backend or couldn't be "
            "imported. Check the above exception. To use one of the "
            "built-in backends, use 'django.db.backends.XXX', where XXX "
            "is one of:\n"
            " %s" % (backend_name, available)
        ) from e_user
class ConnectionHandler(BaseConnectionHandler):
    """
    Connection handler driven by the ``DATABASES`` setting.

    Fills in default values for every configured database and creates the
    backend's ``DatabaseWrapper`` for a given alias.
    """

    settings_name = "DATABASES"
    # Database connections are truly thread-critical, so they must live in a
    # real thread local. Backends should guard their code from async contexts
    # with @async_unsafe, but this still hands such contexts their own
    # connections if it comes to that. Nothing cleans up after async
    # contexts, so that is avoided wherever possible.
    thread_critical = True

    def configure_settings(self, databases):
        """
        Return ``databases`` with every missing option set to its default.

        Starts from whatever the base handler returns for ``databases``. An
        empty mapping gets a default alias backed by the dummy engine; a
        non-empty mapping without the default alias raises
        ImproperlyConfigured.
        """
        databases = super().configure_settings(databases)
        dummy_engine = "django.db.backends.dummy"
        if databases == {}:
            databases[DEFAULT_DB_ALIAS] = {"ENGINE": dummy_engine}
        elif DEFAULT_DB_ALIAS not in databases:
            raise ImproperlyConfigured(
                f"You must define a '{DEFAULT_DB_ALIAS}' database."
            )
        elif databases[DEFAULT_DB_ALIAS] == {}:
            databases[DEFAULT_DB_ALIAS]["ENGINE"] = dummy_engine

        # Fill in the per-connection defaults.
        for db_settings in databases.values():
            db_settings.setdefault("ATOMIC_REQUESTS", False)
            db_settings.setdefault("AUTOCOMMIT", True)
            engine = db_settings.setdefault("ENGINE", dummy_engine)
            # A bare package prefix or an empty value means "no engine".
            if engine == "django.db.backends." or not engine:
                db_settings["ENGINE"] = dummy_engine
            db_settings.setdefault("CONN_MAX_AGE", 0)
            db_settings.setdefault("CONN_HEALTH_CHECKS", False)
            db_settings.setdefault("OPTIONS", {})
            db_settings.setdefault("TIME_ZONE", None)
            for option in ("NAME", "USER", "PASSWORD", "HOST", "PORT"):
                db_settings.setdefault(option, "")

            test_settings = db_settings.setdefault("TEST", {})
            test_defaults = {
                "CHARSET": None,
                "COLLATION": None,
                "MIGRATE": True,
                "MIRROR": None,
                "NAME": None,
            }
            for option, default in test_defaults.items():
                test_settings.setdefault(option, default)
        return databases

    @property
    def databases(self):
        """
        Return the handler's settings.

        Django itself no longer uses this private API; it is kept for
        backward compatibility because some 3rd party packages have relied
        on it in the past.
        """
        return self.settings

    def create_connection(self, alias):
        """Return the backend's ``DatabaseWrapper`` for the database ``alias``."""
        db_settings = self.settings[alias]
        backend_module = load_backend(db_settings["ENGINE"])
        return backend_module.DatabaseWrapper(db_settings, alias)
class ConnectionRouter:
    """
    Ask a chain of database routers, in order, which database to use.

    Routers that lack a given method are skipped, and each query has a
    fallback answer for when no router gives one.
    """

    def __init__(self, routers=None):
        """
        Store the routers to consult; ``None`` means that
        settings.DATABASE_ROUTERS is used instead.
        """
        self._routers = routers

    @cached_property
    def routers(self):
        """
        Return the list of router objects.

        String entries are imported as dotted paths and instantiated without
        arguments; any other entry is used as is.
        """
        if self._routers is None:
            self._routers = settings.DATABASE_ROUTERS
        return [
            import_string(entry)() if isinstance(entry, str) else entry
            for entry in self._routers
        ]

    def _router_func(action):
        # Build the method that asks each router's ``action`` method in turn.
        def _route_db(self, model, **hints):
            for router in self.routers:
                try:
                    method = getattr(router, action)
                except AttributeError:
                    # This router has no such method; try the next one.
                    continue
                chosen_db = method(model, **hints)
                if chosen_db:
                    return chosen_db
            # No router answered: use the hinted instance's database if it
            # has one, otherwise the default alias.
            instance = hints.get("instance")
            if instance is None or not instance._state.db:
                return DEFAULT_DB_ALIAS
            return instance._state.db

        return _route_db

    db_for_read = _router_func("db_for_read")
    db_for_write = _router_func("db_for_write")

    def allow_relation(self, obj1, obj2, **hints):
        """
        Return the first non-None router verdict on relating ``obj1`` and
        ``obj2``; without one, allow the relation only when both objects are
        on the same database.
        """
        for router in self.routers:
            try:
                method = router.allow_relation
            except AttributeError:
                # This router has no such method; try the next one.
                continue
            verdict = method(obj1, obj2, **hints)
            if verdict is not None:
                return verdict
        return obj1._state.db == obj2._state.db

    def allow_migrate(self, db, app_label, **hints):
        """
        Return the first non-None router verdict on migrating ``app_label``
        on ``db``; without one, allow the migration.
        """
        for router in self.routers:
            try:
                method = router.allow_migrate
            except AttributeError:
                # This router has no such method; try the next one.
                pass
            else:
                verdict = method(db, app_label, **hints)
                if verdict is not None:
                    return verdict
        return True

    def allow_migrate_model(self, db, model):
        """Return whether ``model`` may be migrated on ``db``."""
        meta = model._meta
        return self.allow_migrate(
            db, meta.app_label, model_name=meta.model_name, model=model
        )

    def get_migratable_models(self, app_config, db, include_auto_created=False):
        """Return app models allowed to be migrated on provided db."""
        candidates = app_config.get_models(include_auto_created=include_auto_created)
        return [
            candidate
            for candidate in candidates
            if self.allow_migrate_model(db, candidate)
        ]