mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	Migrations respected Field.db_parameters()['check'], but
DatabaseCreation was still using just Field.db_type().
Backport of 14c8456 from master
		
	
		
			
				
	
	
		
			111 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			111 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import unicode_literals
 | |
| 
 | |
| import re
 | |
| import unittest
 | |
| 
 | |
| from django.apps import apps
 | |
| from django.core.management.color import no_style
 | |
| from django.core.management.sql import (sql_create, sql_delete, sql_indexes,
 | |
|     sql_destroy_indexes, sql_all)
 | |
| from django.db import connections, DEFAULT_DB_ALIAS, router
 | |
| from django.test import TestCase
 | |
| from django.utils import six
 | |
| 
 | |
| # See also initial_sql_regress for 'custom_sql_for_model' tests
 | |
| 
 | |
| 
 | |
class SQLCommandsTestCase(TestCase):
    """Tests for several functions in django/core/management/sql.py"""

    def count_ddl(self, output, cmd):
        """Return how many statements in ``output`` start with ``cmd``."""
        return sum(1 for statement in output if statement.startswith(cmd))

    def test_sql_create(self):
        # Walks the generated statements in order and checks that every
        # table named in a REFERENCES clause was created by an earlier
        # CREATE TABLE statement.
        app_config = apps.get_app_config('commands_sql')
        output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])

        tables = set()
        create_table_re = re.compile(r'^create table .(?P<table>[\w_]+).*', re.IGNORECASE)
        reference_re = re.compile(r'.* references .(?P<table>[\w_]+).*', re.IGNORECASE)
        for statement in output:
            create_table = create_table_re.match(statement)
            if create_table:
                # Lower since Oracle's table names are upper cased.
                tables.add(create_table.group('table').lower())
                continue
            reference = reference_re.match(statement)
            if reference:
                # Lower since Oracle's table names are upper cased.
                table = reference.group('table').lower()
                self.assertIn(
                    table, tables, "The table %s is referenced before its creation." % table
                )

        self.assertEqual(tables, {
            'commands_sql_comment', 'commands_sql_book', 'commands_sql_book_comments'
        })

    @unittest.skipUnless(
        'PositiveIntegerField' in connections[DEFAULT_DB_ALIAS].creation.data_type_check_constraints,
        'Backend does not have checks.'
    )
    def test_sql_create_check(self):
        """Regression test for #23416 -- Check that db_params['check'] is respected."""
        app_config = apps.get_app_config('commands_sql')
        output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
        # A single statement containing a CHECK clause is enough to pass.
        if not any('CHECK' in statement for statement in output):
            self.fail("'CHECK' not found in output %s" % output)

    def test_sql_delete(self):
        app_config = apps.get_app_config('commands_sql')
        output = sql_delete(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
        drop_tables = [o for o in output if o.startswith('DROP TABLE')]
        self.assertEqual(len(drop_tables), 3)
        # Lower so that Oracle's upper case table names won't break
        sql = drop_tables[-1].lower()
        six.assertRegex(self, sql, r'^drop table .commands_sql_comment.*')

    def test_sql_indexes(self):
        app_config = apps.get_app_config('commands_sql')
        output = sql_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
        # PostgreSQL creates one additional index for CharField
        self.assertIn(self.count_ddl(output, 'CREATE INDEX'), [3, 4])

    def test_sql_destroy_indexes(self):
        app_config = apps.get_app_config('commands_sql')
        output = sql_destroy_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
        # PostgreSQL creates one additional index for CharField
        self.assertIn(self.count_ddl(output, 'DROP INDEX'), [3, 4])

    def test_sql_all(self):
        app_config = apps.get_app_config('commands_sql')
        output = sql_all(app_config, no_style(), connections[DEFAULT_DB_ALIAS])

        self.assertEqual(self.count_ddl(output, 'CREATE TABLE'), 3)
        # PostgreSQL creates one additional index for CharField
        self.assertIn(self.count_ddl(output, 'CREATE INDEX'), [3, 4])
 | |
| 
 | |
| 
 | |
class TestRouter(object):
    """Database router stub whose ``allow_migrate`` refuses every request."""

    def allow_migrate(self, db, model):
        """Return False regardless of the ``db`` and ``model`` given."""
        return False
 | |
| 
 | |
| 
 | |
class SQLCommandsRouterTestCase(TestCase):
    """Runs the sql_* helpers with a router that refuses every migration."""

    def setUp(self):
        # Remember the active routers, then install the always-refusing stub.
        self._old_routers = router.routers
        router.routers = [TestRouter()]

    def tearDown(self):
        # Put back whatever routers were active before setUp ran.
        router.routers = self._old_routers

    def test_router_honored(self):
        app_config = apps.get_app_config('commands_sql')
        commands = (sql_all, sql_create, sql_delete, sql_indexes, sql_destroy_indexes)
        for sql_command in commands:
            # Only sql_delete is called with the extra close_connection flag.
            extra_kwargs = {}
            if sql_command is sql_delete:
                extra_kwargs['close_connection'] = False
            output = sql_command(
                app_config, no_style(), connections[DEFAULT_DB_ALIAS], **extra_kwargs
            )
            failure_message = "%s command is not honoring routers" % sql_command.__name__
            self.assertEqual(len(output), 0, failure_message)
 |