1
0
mirror of https://github.com/django/django.git synced 2025-10-24 06:06:09 +00:00

Added a ManyToManyField(db_constraint=False) option, this allows not creating constraints on the intermediary models.

This commit is contained in:
Alex Gaynor
2013-03-07 11:24:51 -08:00
parent 4cccb85e29
commit bbbd698c7a
5 changed files with 65 additions and 22 deletions

View File

@@ -12,13 +12,12 @@ from django.db import (backend, connection, connections, DEFAULT_DB_ALIAS,
from django.db.backends.signals import connection_created
from django.db.backends.postgresql_psycopg2 import version as pg_version
from django.db.models import Sum, Avg, Variance, StdDev
from django.db.utils import ConnectionHandler, DatabaseError
from django.db.utils import ConnectionHandler
from django.test import (TestCase, skipUnlessDBFeature, skipIfDBFeature,
TransactionTestCase)
from django.test.utils import override_settings, str_prefix
from django.utils import six
from django.utils import six, unittest
from django.utils.six.moves import xrange
from django.utils import unittest
from . import models
@@ -52,7 +51,7 @@ class OracleChecks(unittest.TestCase):
convert_unicode = backend.convert_unicode
cursor = connection.cursor()
cursor.callproc(convert_unicode('DBMS_SESSION.SET_IDENTIFIER'),
[convert_unicode('_django_testing!'),])
[convert_unicode('_django_testing!')])
@unittest.skipUnless(connection.vendor == 'oracle',
"No need to check Oracle cursor semantics")
@@ -72,7 +71,7 @@ class OracleChecks(unittest.TestCase):
c = connection.cursor()
c.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
long_str = ''.join([six.text_type(x) for x in xrange(4000)])
c.execute('INSERT INTO ltext VALUES (%s)',[long_str])
c.execute('INSERT INTO ltext VALUES (%s)', [long_str])
c.execute('SELECT text FROM ltext')
row = c.fetchone()
self.assertEqual(long_str, row[0].read())
@@ -99,6 +98,7 @@ class OracleChecks(unittest.TestCase):
c.execute(query)
self.assertEqual(c.fetchone()[0], 1)
class MySQLTests(TestCase):
@unittest.skipUnless(connection.vendor == 'mysql',
"Test valid only for MySQL")
@@ -117,7 +117,7 @@ class MySQLTests(TestCase):
found_reset = False
for sql in statements:
found_reset = found_reset or 'ALTER TABLE' in sql
if connection.mysql_version < (5,0,13):
if connection.mysql_version < (5, 0, 13):
self.assertTrue(found_reset)
else:
self.assertFalse(found_reset)
@@ -182,6 +182,7 @@ class LastExecutedQueryTest(TestCase):
self.assertEqual(connection.queries[-1]['sql'],
str_prefix("QUERY = %(_)s\"SELECT strftime('%%Y', 'now');\" - PARAMS = ()"))
class ParameterHandlingTest(TestCase):
def test_bad_parameter_count(self):
"An executemany call with too many/not enough parameters will raise an exception (Refs #12612)"
@@ -191,8 +192,9 @@ class ParameterHandlingTest(TestCase):
connection.ops.quote_name('root'),
connection.ops.quote_name('square')
))
self.assertRaises(Exception, cursor.executemany, query, [(1,2,3),])
self.assertRaises(Exception, cursor.executemany, query, [(1,),])
self.assertRaises(Exception, cursor.executemany, query, [(1, 2, 3)])
self.assertRaises(Exception, cursor.executemany, query, [(1,)])
# Unfortunately, the following tests would be a good test to run on all
# backends, but it breaks MySQL hard. Until #13711 is fixed, it can't be run
@@ -240,6 +242,7 @@ class LongNameTest(TestCase):
for statement in connection.ops.sql_flush(no_style(), tables, sequences):
cursor.execute(statement)
class SequenceResetTest(TestCase):
def test_generic_relation(self):
"Sequence names are correct when resetting generic relations (Ref #13941)"
@@ -257,6 +260,7 @@ class SequenceResetTest(TestCase):
obj = models.Post.objects.create(name='New post', text='goodbye world')
self.assertTrue(obj.pk > 10)
class PostgresVersionTest(TestCase):
def assert_parses(self, version_string, version):
self.assertEqual(pg_version._parse_version(version_string), version)
@@ -291,6 +295,7 @@ class PostgresVersionTest(TestCase):
conn = OlderConnectionMock()
self.assertEqual(pg_version.get_version(conn), 80300)
class PostgresNewConnectionTest(TestCase):
"""
#17062: PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
@@ -338,17 +343,18 @@ class ConnectionCreatedSignalTest(TestCase):
@skipUnlessDBFeature('test_db_allows_multiple_connections')
def test_signal(self):
data = {}
def receiver(sender, connection, **kwargs):
data["connection"] = connection
connection_created.connect(receiver)
connection.close()
cursor = connection.cursor()
connection.cursor()
self.assertTrue(data["connection"].connection is connection.connection)
connection_created.disconnect(receiver)
data.clear()
cursor = connection.cursor()
connection.cursor()
self.assertTrue(data == {})
@@ -443,7 +449,7 @@ class BackendTestCase(TestCase):
old_password = connection.settings_dict['PASSWORD']
connection.settings_dict['PASSWORD'] = "françois"
try:
cursor = connection.cursor()
connection.cursor()
except DatabaseError:
# As password is probably wrong, a database exception is expected
pass
@@ -470,6 +476,7 @@ class BackendTestCase(TestCase):
with self.assertRaises(DatabaseError):
cursor.execute(query)
# We don't make these tests conditional because that means we would need to
# check and differentiate between:
# * MySQL+InnoDB, MySQL+MYISAM (something we currently can't do).
@@ -477,7 +484,6 @@ class BackendTestCase(TestCase):
# on or not, something that would be controlled by runtime support and user
# preference.
# verify if its type is django.database.db.IntegrityError.
class FkConstraintsTests(TransactionTestCase):
def setUp(self):
@@ -581,6 +587,7 @@ class ThreadTests(TestCase):
connections_dict = {}
connection.cursor()
connections_dict[id(connection)] = connection
def runner():
# Passing django.db.connection between threads doesn't work while
# connections[DEFAULT_DB_ALIAS] does.
@@ -602,7 +609,7 @@ class ThreadTests(TestCase):
# Finish by closing the connections opened by the other threads (the
# connection opened in the main thread will automatically be closed on
# teardown).
for conn in connections_dict.values() :
for conn in connections_dict.values():
if conn is not connection:
conn.close()
@@ -616,6 +623,7 @@ class ThreadTests(TestCase):
connections_dict = {}
for conn in connections.all():
connections_dict[id(conn)] = conn
def runner():
from django.db import connections
for conn in connections.all():
@@ -682,6 +690,7 @@ class ThreadTests(TestCase):
"""
# First, without explicitly enabling the connection for sharing.
exceptions = set()
def runner1():
def runner2(other_thread_connection):
try:
@@ -699,6 +708,7 @@ class ThreadTests(TestCase):
# Then, with explicitly enabling the connection for sharing.
exceptions = set()
def runner1():
def runner2(other_thread_connection):
try:
@@ -746,3 +756,14 @@ class DBConstraintTestCase(TransactionTestCase):
with self.assertRaises(models.Object.DoesNotExist):
ref.obj
def test_many_to_many(self):
obj = models.Object.objects.create()
obj.related_objects.create()
self.assertEqual(models.Object.objects.count(), 2)
self.assertEqual(obj.related_objects.count(), 1)
intermediary_model = models.Object._meta.get_field_by_name("related_objects")[0].rel.through
intermediary_model.objects.create(from_object_id=obj.id, to_object_id=12345)
self.assertEqual(obj.related_objects.count(), 1)
self.assertEqual(intermediary_model.objects.count(), 2)