mirror of
				https://github.com/django/django.git
				synced 2025-10-25 06:36:07 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			314 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			314 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import sys
 | |
| from itertools import takewhile
 | |
| 
 | |
| from django.apps import apps
 | |
| from django.conf import settings
 | |
| from django.core.management.base import (
 | |
|     BaseCommand, CommandError, no_translations,
 | |
| )
 | |
| from django.db import DEFAULT_DB_ALIAS, connections, router
 | |
| from django.db.migrations import Migration
 | |
| from django.db.migrations.autodetector import MigrationAutodetector
 | |
| from django.db.migrations.loader import MigrationLoader
 | |
| from django.db.migrations.questioner import (
 | |
|     InteractiveMigrationQuestioner, MigrationQuestioner,
 | |
|     NonInteractiveMigrationQuestioner,
 | |
| )
 | |
| from django.db.migrations.state import ProjectState
 | |
| from django.db.migrations.utils import get_migration_name_timestamp
 | |
| from django.db.migrations.writer import MigrationWriter
 | |
| 
 | |
| 
 | |
| class Command(BaseCommand):
 | |
|     help = "Creates new migration(s) for apps."
 | |
| 
 | |
|     def add_arguments(self, parser):
 | |
|         parser.add_argument(
 | |
|             'args', metavar='app_label', nargs='*',
 | |
|             help='Specify the app label(s) to create migrations for.',
 | |
|         )
 | |
|         parser.add_argument(
 | |
|             '--dry-run', action='store_true', dest='dry_run',
 | |
|             help="Just show what migrations would be made; don't actually write them.",
 | |
|         )
 | |
|         parser.add_argument(
 | |
|             '--merge', action='store_true', dest='merge',
 | |
|             help="Enable fixing of migration conflicts.",
 | |
|         )
 | |
|         parser.add_argument(
 | |
|             '--empty', action='store_true', dest='empty',
 | |
|             help="Create an empty migration.",
 | |
|         )
 | |
|         parser.add_argument(
 | |
|             '--noinput', '--no-input', action='store_false', dest='interactive',
 | |
|             help='Tells Django to NOT prompt the user for input of any kind.',
 | |
|         )
 | |
|         parser.add_argument(
 | |
|             '-n', '--name', action='store', dest='name', default=None,
 | |
|             help="Use this name for migration file(s).",
 | |
|         )
 | |
|         parser.add_argument(
 | |
|             '--check', action='store_true', dest='check_changes',
 | |
|             help='Exit with a non-zero status if model changes are missing migrations.',
 | |
|         )
 | |
| 
 | |
    @no_translations
    def handle(self, *app_labels, **options):
        """
        Run the command: validate the requested app labels, check the
        migration history and conflicts, then either merge conflicts, write
        empty migrations, or autodetect changes and write new migrations.

        Exits with status 2 if any given app label cannot be resolved, and
        with status 1 if ``--check`` was passed and changes were detected.
        Raises CommandError if conflicts exist and ``--merge`` was not given,
        or if ``--empty`` is used without any app label.
        """
        # Keep the parsed options on the instance; the helper methods
        # (write_migration_files, handle_merge) read them from there.
        self.verbosity = options['verbosity']
        self.interactive = options['interactive']
        self.dry_run = options['dry_run']
        self.merge = options['merge']
        self.empty = options['empty']
        self.migration_name = options['name']
        check_changes = options['check_changes']

        # Make sure the app they asked for exists
        app_labels = set(app_labels)
        bad_app_labels = set()
        for app_label in app_labels:
            try:
                apps.get_app_config(app_label)
            except LookupError:
                bad_app_labels.add(app_label)
        if bad_app_labels:
            # Report every bad label before exiting, not just the first one.
            for app_label in bad_app_labels:
                if '.' in app_label:
                    # A dotted value is reported with its last component as
                    # the suggested label.
                    self.stderr.write(
                        "'%s' is not a valid app label. Did you mean '%s'?" % (
                            app_label,
                            app_label.split('.')[-1],
                        )
                    )
                else:
                    self.stderr.write("App '%s' could not be found. Is it in INSTALLED_APPS?" % app_label)
            sys.exit(2)

        # Load the current graph state. Pass in None for the connection so
        # the loader doesn't try to resolve replaced migrations from DB.
        loader = MigrationLoader(None, ignore_no_migrations=True)

        # Raise an error if any migrations are applied before their dependencies.
        consistency_check_labels = {config.label for config in apps.get_app_configs()}
        # Non-default databases are only checked if database routers used.
        aliases_to_check = connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS]
        for alias in sorted(aliases_to_check):
            connection = connections[alias]
            # Skip connections using the dummy backend and connections to
            # which the router allows no model to be migrated.
            if (connection.settings_dict['ENGINE'] != 'django.db.backends.dummy' and any(
                    # At least one model must be migrated to the database.
                    router.allow_migrate(connection.alias, app_label, model_name=model._meta.object_name)
                    for app_label in consistency_check_labels
                    for model in apps.get_app_config(app_label).get_models()
            )):
                loader.check_consistent_history(connection)

        # Before anything else, see if there's conflicting apps and drop out
        # hard if there are any and they don't want to merge
        conflicts = loader.detect_conflicts()

        # If app_labels is specified, filter out conflicting migrations for unspecified apps
        if app_labels:
            conflicts = {
                app_label: conflict for app_label, conflict in conflicts.items()
                if app_label in app_labels
            }

        if conflicts and not self.merge:
            # Build "name1, name2 in app; ..." for the error message.
            name_str = "; ".join(
                "%s in %s" % (", ".join(names), app)
                for app, names in conflicts.items()
            )
            raise CommandError(
                "Conflicting migrations detected; multiple leaf nodes in the "
                "migration graph: (%s).\nTo fix them run "
                "'python manage.py makemigrations --merge'" % name_str
            )

        # If they want to merge and there's nothing to merge, then politely exit
        if self.merge and not conflicts:
            self.stdout.write("No conflicts detected to merge.")
            return

        # If they want to merge and there is something to merge, then
        # divert into the merge code
        if self.merge and conflicts:
            return self.handle_merge(loader, conflicts)

        # Pick the questioner according to --noinput / --no-input.
        if self.interactive:
            questioner = InteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run)
        else:
            questioner = NonInteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run)
        # Set up autodetector
        autodetector = MigrationAutodetector(
            loader.project_state(),
            ProjectState.from_apps(apps),
            questioner,
        )

        # If they want to make an empty migration, make one for each app
        if self.empty:
            if not app_labels:
                raise CommandError("You must supply at least one app label when using --empty.")
            # Make a fake changes() result we can pass to arrange_for_graph
            changes = {
                app: [Migration("custom", app)]
                for app in app_labels
            }
            changes = autodetector.arrange_for_graph(
                changes=changes,
                graph=loader.graph,
                migration_name=self.migration_name,
            )
            self.write_migration_files(changes)
            return

        # Detect changes
        changes = autodetector.changes(
            graph=loader.graph,
            trim_to_apps=app_labels or None,
            convert_apps=app_labels or None,
            migration_name=self.migration_name,
        )

        if not changes:
            # No changes? Tell them.
            if self.verbosity >= 1:
                if app_labels:
                    if len(app_labels) == 1:
                        # The set holds exactly one label here, so pop()
                        # returns that label.
                        self.stdout.write("No changes detected in app '%s'" % app_labels.pop())
                    else:
                        self.stdout.write("No changes detected in apps '%s'" % ("', '".join(app_labels)))
                else:
                    self.stdout.write("No changes detected")
        else:
            self.write_migration_files(changes)
            # With --check, the non-zero exit happens only after
            # write_migration_files() has run for the detected changes.
            if check_changes:
                sys.exit(1)
 | |
| 
 | |
|     def write_migration_files(self, changes):
 | |
|         """
 | |
|         Take a changes dict and write them out as migration files.
 | |
|         """
 | |
|         directory_created = {}
 | |
|         for app_label, app_migrations in changes.items():
 | |
|             if self.verbosity >= 1:
 | |
|                 self.stdout.write(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label) + "\n")
 | |
|             for migration in app_migrations:
 | |
|                 # Describe the migration
 | |
|                 writer = MigrationWriter(migration)
 | |
|                 if self.verbosity >= 1:
 | |
|                     # Display a relative path if it's below the current working
 | |
|                     # directory, or an absolute path otherwise.
 | |
|                     try:
 | |
|                         migration_string = os.path.relpath(writer.path)
 | |
|                     except ValueError:
 | |
|                         migration_string = writer.path
 | |
|                     if migration_string.startswith('..'):
 | |
|                         migration_string = writer.path
 | |
|                     self.stdout.write("  %s\n" % (self.style.MIGRATE_LABEL(migration_string),))
 | |
|                     for operation in migration.operations:
 | |
|                         self.stdout.write("    - %s\n" % operation.describe())
 | |
|                 if not self.dry_run:
 | |
|                     # Write the migrations file to the disk.
 | |
|                     migrations_directory = os.path.dirname(writer.path)
 | |
|                     if not directory_created.get(app_label):
 | |
|                         if not os.path.isdir(migrations_directory):
 | |
|                             os.mkdir(migrations_directory)
 | |
|                         init_path = os.path.join(migrations_directory, "__init__.py")
 | |
|                         if not os.path.isfile(init_path):
 | |
|                             open(init_path, "w").close()
 | |
|                         # We just do this once per app
 | |
|                         directory_created[app_label] = True
 | |
|                     migration_string = writer.as_string()
 | |
|                     with open(writer.path, "w", encoding='utf-8') as fh:
 | |
|                         fh.write(migration_string)
 | |
|                 elif self.verbosity == 3:
 | |
|                     # Alternatively, makemigrations --dry-run --verbosity 3
 | |
|                     # will output the migrations to stdout rather than saving
 | |
|                     # the file to the disk.
 | |
|                     self.stdout.write(self.style.MIGRATE_HEADING(
 | |
|                         "Full migrations file '%s':" % writer.filename) + "\n"
 | |
|                     )
 | |
|                     self.stdout.write("%s\n" % writer.as_string())
 | |
| 
 | |
|     def handle_merge(self, loader, conflicts):
 | |
|         """
 | |
|         Handles merging together conflicted migrations interactively,
 | |
|         if it's safe; otherwise, advises on how to fix it.
 | |
|         """
 | |
|         if self.interactive:
 | |
|             questioner = InteractiveMigrationQuestioner()
 | |
|         else:
 | |
|             questioner = MigrationQuestioner(defaults={'ask_merge': True})
 | |
| 
 | |
|         for app_label, migration_names in conflicts.items():
 | |
|             # Grab out the migrations in question, and work out their
 | |
|             # common ancestor.
 | |
|             merge_migrations = []
 | |
|             for migration_name in migration_names:
 | |
|                 migration = loader.get_migration(app_label, migration_name)
 | |
|                 migration.ancestry = [
 | |
|                     mig for mig in loader.graph.forwards_plan((app_label, migration_name))
 | |
|                     if mig[0] == migration.app_label
 | |
|                 ]
 | |
|                 merge_migrations.append(migration)
 | |
| 
 | |
|             def all_items_equal(seq):
 | |
|                 return all(item == seq[0] for item in seq[1:])
 | |
| 
 | |
|             merge_migrations_generations = zip(*(m.ancestry for m in merge_migrations))
 | |
|             common_ancestor_count = sum(1 for common_ancestor_generation
 | |
|                                         in takewhile(all_items_equal, merge_migrations_generations))
 | |
|             if not common_ancestor_count:
 | |
|                 raise ValueError("Could not find common ancestor of %s" % migration_names)
 | |
|             # Now work out the operations along each divergent branch
 | |
|             for migration in merge_migrations:
 | |
|                 migration.branch = migration.ancestry[common_ancestor_count:]
 | |
|                 migrations_ops = (loader.get_migration(node_app, node_name).operations
 | |
|                                   for node_app, node_name in migration.branch)
 | |
|                 migration.merged_operations = sum(migrations_ops, [])
 | |
|             # In future, this could use some of the Optimizer code
 | |
|             # (can_optimize_through) to automatically see if they're
 | |
|             # mergeable. For now, we always just prompt the user.
 | |
|             if self.verbosity > 0:
 | |
|                 self.stdout.write(self.style.MIGRATE_HEADING("Merging %s" % app_label))
 | |
|                 for migration in merge_migrations:
 | |
|                     self.stdout.write(self.style.MIGRATE_LABEL("  Branch %s" % migration.name))
 | |
|                     for operation in migration.merged_operations:
 | |
|                         self.stdout.write("    - %s\n" % operation.describe())
 | |
|             if questioner.ask_merge(app_label):
 | |
|                 # If they still want to merge it, then write out an empty
 | |
|                 # file depending on the migrations needing merging.
 | |
|                 numbers = [
 | |
|                     MigrationAutodetector.parse_number(migration.name)
 | |
|                     for migration in merge_migrations
 | |
|                 ]
 | |
|                 try:
 | |
|                     biggest_number = max(x for x in numbers if x is not None)
 | |
|                 except ValueError:
 | |
|                     biggest_number = 1
 | |
|                 subclass = type("Migration", (Migration,), {
 | |
|                     "dependencies": [(app_label, migration.name) for migration in merge_migrations],
 | |
|                 })
 | |
|                 migration_name = "%04i_%s" % (
 | |
|                     biggest_number + 1,
 | |
|                     self.migration_name or ("merge_%s" % get_migration_name_timestamp())
 | |
|                 )
 | |
|                 new_migration = subclass(migration_name, app_label)
 | |
|                 writer = MigrationWriter(new_migration)
 | |
| 
 | |
|                 if not self.dry_run:
 | |
|                     # Write the merge migrations file to the disk
 | |
|                     with open(writer.path, "w", encoding='utf-8') as fh:
 | |
|                         fh.write(writer.as_string())
 | |
|                     if self.verbosity > 0:
 | |
|                         self.stdout.write("\nCreated new merge migration %s" % writer.path)
 | |
|                 elif self.verbosity == 3:
 | |
|                     # Alternatively, makemigrations --merge --dry-run --verbosity 3
 | |
|                     # will output the merge migrations to stdout rather than saving
 | |
|                     # the file to the disk.
 | |
|                     self.stdout.write(self.style.MIGRATE_HEADING(
 | |
|                         "Full merge migrations file '%s':" % writer.filename) + "\n"
 | |
|                     )
 | |
|                     self.stdout.write("%s\n" % writer.as_string())
 |