mirror of
				https://github.com/django/django.git
				synced 2025-10-25 06:36:07 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			417 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			417 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from datetime import date
 | |
| from decimal import Decimal
 | |
| 
 | |
| from django.core.exceptions import FieldDoesNotExist
 | |
| from django.db.models.query import RawQuerySet
 | |
| from django.test import TestCase, skipUnlessDBFeature
 | |
| 
 | |
| from .models import (
 | |
|     Author,
 | |
|     Book,
 | |
|     BookFkAsPk,
 | |
|     Coffee,
 | |
|     FriendlyAuthor,
 | |
|     MixedCaseIDColumn,
 | |
|     Reviewer,
 | |
| )
 | |
| 
 | |
| 
 | |
| class RawQueryTests(TestCase):
 | |
|     @classmethod
 | |
|     def setUpTestData(cls):
 | |
|         cls.a1 = Author.objects.create(
 | |
|             first_name="Joe", last_name="Smith", dob=date(1950, 9, 20)
 | |
|         )
 | |
|         cls.a2 = Author.objects.create(
 | |
|             first_name="Jill", last_name="Doe", dob=date(1920, 4, 2)
 | |
|         )
 | |
|         cls.a3 = Author.objects.create(
 | |
|             first_name="Bob", last_name="Smith", dob=date(1986, 1, 25)
 | |
|         )
 | |
|         cls.a4 = Author.objects.create(
 | |
|             first_name="Bill", last_name="Jones", dob=date(1932, 5, 10)
 | |
|         )
 | |
|         cls.b1 = Book.objects.create(
 | |
|             title="The awesome book",
 | |
|             author=cls.a1,
 | |
|             paperback=False,
 | |
|             opening_line=(
 | |
|                 "It was a bright cold day in April and the clocks were striking "
 | |
|                 "thirteen."
 | |
|             ),
 | |
|         )
 | |
|         cls.b2 = Book.objects.create(
 | |
|             title="The horrible book",
 | |
|             author=cls.a1,
 | |
|             paperback=True,
 | |
|             opening_line=(
 | |
|                 "On an evening in the latter part of May a middle-aged man "
 | |
|                 "was walking homeward from Shaston to the village of Marlott, "
 | |
|                 "in the adjoining Vale of Blakemore, or Blackmoor."
 | |
|             ),
 | |
|         )
 | |
|         cls.b3 = Book.objects.create(
 | |
|             title="Another awesome book",
 | |
|             author=cls.a1,
 | |
|             paperback=False,
 | |
|             opening_line="A squat gray building of only thirty-four stories.",
 | |
|         )
 | |
|         cls.b4 = Book.objects.create(
 | |
|             title="Some other book",
 | |
|             author=cls.a3,
 | |
|             paperback=True,
 | |
|             opening_line="It was the day my grandmother exploded.",
 | |
|         )
 | |
|         cls.c1 = Coffee.objects.create(brand="dunkin doughnuts")
 | |
|         cls.c2 = Coffee.objects.create(brand="starbucks")
 | |
|         cls.r1 = Reviewer.objects.create()
 | |
|         cls.r2 = Reviewer.objects.create()
 | |
|         cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4)
 | |
| 
 | |
|     def assertSuccessfulRawQuery(
 | |
|         self,
 | |
|         model,
 | |
|         query,
 | |
|         expected_results,
 | |
|         expected_annotations=(),
 | |
|         params=[],
 | |
|         translations=None,
 | |
|     ):
 | |
|         """
 | |
|         Execute the passed query against the passed model and check the output
 | |
|         """
 | |
|         results = list(
 | |
|             model.objects.raw(query, params=params, translations=translations)
 | |
|         )
 | |
|         self.assertProcessed(model, results, expected_results, expected_annotations)
 | |
|         self.assertAnnotations(results, expected_annotations)
 | |
| 
 | |
|     def assertProcessed(self, model, results, orig, expected_annotations=()):
 | |
|         """
 | |
|         Compare the results of a raw query against expected results
 | |
|         """
 | |
|         self.assertEqual(len(results), len(orig))
 | |
|         for index, item in enumerate(results):
 | |
|             orig_item = orig[index]
 | |
|             for annotation in expected_annotations:
 | |
|                 setattr(orig_item, *annotation)
 | |
| 
 | |
|             for field in model._meta.fields:
 | |
|                 # All values on the model are equal
 | |
|                 self.assertEqual(
 | |
|                     getattr(item, field.attname), getattr(orig_item, field.attname)
 | |
|                 )
 | |
|                 # This includes checking that they are the same type
 | |
|                 self.assertEqual(
 | |
|                     type(getattr(item, field.attname)),
 | |
|                     type(getattr(orig_item, field.attname)),
 | |
|                 )
 | |
| 
 | |
|     def assertNoAnnotations(self, results):
 | |
|         """
 | |
|         The results of a raw query contain no annotations
 | |
|         """
 | |
|         self.assertAnnotations(results, ())
 | |
| 
 | |
|     def assertAnnotations(self, results, expected_annotations):
 | |
|         """
 | |
|         The passed raw query results contain the expected annotations
 | |
|         """
 | |
|         if expected_annotations:
 | |
|             for index, result in enumerate(results):
 | |
|                 annotation, value = expected_annotations[index]
 | |
|                 self.assertTrue(hasattr(result, annotation))
 | |
|                 self.assertEqual(getattr(result, annotation), value)
 | |
| 
 | |
|     def test_rawqueryset_repr(self):
 | |
|         queryset = RawQuerySet(raw_query="SELECT * FROM raw_query_author")
 | |
|         self.assertEqual(
 | |
|             repr(queryset), "<RawQuerySet: SELECT * FROM raw_query_author>"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             repr(queryset.query), "<RawQuery: SELECT * FROM raw_query_author>"
 | |
|         )
 | |
| 
 | |
|     def test_simple_raw_query(self):
 | |
|         """
 | |
|         Basic test of raw query with a simple database query
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_author"
 | |
|         authors = Author.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Author, query, authors)
 | |
| 
 | |
|     def test_raw_query_lazy(self):
 | |
|         """
 | |
|         Raw queries are lazy: they aren't actually executed until they're
 | |
|         iterated over.
 | |
|         """
 | |
|         q = Author.objects.raw("SELECT * FROM raw_query_author")
 | |
|         self.assertIsNone(q.query.cursor)
 | |
|         list(q)
 | |
|         self.assertIsNotNone(q.query.cursor)
 | |
| 
 | |
|     def test_FK_raw_query(self):
 | |
|         """
 | |
|         Test of a simple raw query against a model containing a foreign key
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_book"
 | |
|         books = Book.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Book, query, books)
 | |
| 
 | |
|     def test_db_column_handler(self):
 | |
|         """
 | |
|         Test of a simple raw query against a model containing a field with
 | |
|         db_column defined.
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_coffee"
 | |
|         coffees = Coffee.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Coffee, query, coffees)
 | |
| 
 | |
|     def test_pk_with_mixed_case_db_column(self):
 | |
|         """
 | |
|         A raw query with a model that has a pk db_column with mixed case.
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_mixedcaseidcolumn"
 | |
|         queryset = MixedCaseIDColumn.objects.all()
 | |
|         self.assertSuccessfulRawQuery(MixedCaseIDColumn, query, queryset)
 | |
| 
 | |
|     def test_order_handler(self):
 | |
|         """Raw query tolerates columns being returned in any order."""
 | |
|         selects = (
 | |
|             ("dob, last_name, first_name, id"),
 | |
|             ("last_name, dob, first_name, id"),
 | |
|             ("first_name, last_name, dob, id"),
 | |
|         )
 | |
| 
 | |
|         for select in selects:
 | |
|             query = "SELECT %s FROM raw_query_author" % select
 | |
|             authors = Author.objects.all()
 | |
|             self.assertSuccessfulRawQuery(Author, query, authors)
 | |
| 
 | |
|     def test_translations(self):
 | |
|         """
 | |
|         Test of raw query's optional ability to translate unexpected result
 | |
|         column names to specific model fields
 | |
|         """
 | |
|         query = (
 | |
|             "SELECT first_name AS first, last_name AS last, dob, id "
 | |
|             "FROM raw_query_author"
 | |
|         )
 | |
|         translations = {"first": "first_name", "last": "last_name"}
 | |
|         authors = Author.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)
 | |
| 
 | |
|     def test_params(self):
 | |
|         """
 | |
|         Test passing optional query parameters
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_author WHERE first_name = %s"
 | |
|         author = Author.objects.all()[2]
 | |
|         params = [author.first_name]
 | |
|         qset = Author.objects.raw(query, params=params)
 | |
|         results = list(qset)
 | |
|         self.assertProcessed(Author, results, [author])
 | |
|         self.assertNoAnnotations(results)
 | |
|         self.assertEqual(len(results), 1)
 | |
|         self.assertIsInstance(repr(qset), str)
 | |
| 
 | |
|     def test_params_none(self):
 | |
|         query = "SELECT * FROM raw_query_author WHERE first_name like 'J%'"
 | |
|         qset = Author.objects.raw(query, params=None)
 | |
|         self.assertEqual(len(qset), 2)
 | |
| 
 | |
|     def test_escaped_percent(self):
 | |
|         query = "SELECT * FROM raw_query_author WHERE first_name like 'J%%'"
 | |
|         qset = Author.objects.raw(query)
 | |
|         self.assertEqual(len(qset), 2)
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_paramstyle_pyformat")
 | |
|     def test_pyformat_params(self):
 | |
|         """
 | |
|         Test passing optional query parameters
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_author WHERE first_name = %(first)s"
 | |
|         author = Author.objects.all()[2]
 | |
|         params = {"first": author.first_name}
 | |
|         qset = Author.objects.raw(query, params=params)
 | |
|         results = list(qset)
 | |
|         self.assertProcessed(Author, results, [author])
 | |
|         self.assertNoAnnotations(results)
 | |
|         self.assertEqual(len(results), 1)
 | |
|         self.assertIsInstance(repr(qset), str)
 | |
| 
 | |
|     def test_query_representation(self):
 | |
|         """
 | |
|         Test representation of raw query with parameters
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_author WHERE last_name = %(last)s"
 | |
|         qset = Author.objects.raw(query, {"last": "foo"})
 | |
|         self.assertEqual(
 | |
|             repr(qset),
 | |
|             "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>",
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             repr(qset.query),
 | |
|             "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>",
 | |
|         )
 | |
| 
 | |
|         query = "SELECT * FROM raw_query_author WHERE last_name = %s"
 | |
|         qset = Author.objects.raw(query, {"foo"})
 | |
|         self.assertEqual(
 | |
|             repr(qset),
 | |
|             "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>",
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             repr(qset.query),
 | |
|             "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>",
 | |
|         )
 | |
| 
 | |
|     def test_many_to_many(self):
 | |
|         """
 | |
|         Test of a simple raw query against a model containing a m2m field
 | |
|         """
 | |
|         query = "SELECT * FROM raw_query_reviewer"
 | |
|         reviewers = Reviewer.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Reviewer, query, reviewers)
 | |
| 
 | |
|     def test_extra_conversions(self):
 | |
|         """Extra translations are ignored."""
 | |
|         query = "SELECT * FROM raw_query_author"
 | |
|         translations = {"something": "else"}
 | |
|         authors = Author.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)
 | |
| 
 | |
|     def test_missing_fields(self):
 | |
|         query = "SELECT id, first_name, dob FROM raw_query_author"
 | |
|         for author in Author.objects.raw(query):
 | |
|             self.assertIsNotNone(author.first_name)
 | |
|             # last_name isn't given, but it will be retrieved on demand
 | |
|             self.assertIsNotNone(author.last_name)
 | |
| 
 | |
|     def test_missing_fields_without_PK(self):
 | |
|         query = "SELECT first_name, dob FROM raw_query_author"
 | |
|         msg = "Raw query must include the primary key"
 | |
|         with self.assertRaisesMessage(FieldDoesNotExist, msg):
 | |
|             list(Author.objects.raw(query))
 | |
| 
 | |
|     def test_annotations(self):
 | |
|         query = (
 | |
|             "SELECT a.*, count(b.id) as book_count "
 | |
|             "FROM raw_query_author a "
 | |
|             "LEFT JOIN raw_query_book b ON a.id = b.author_id "
 | |
|             "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id"
 | |
|         )
 | |
|         expected_annotations = (
 | |
|             ("book_count", 3),
 | |
|             ("book_count", 0),
 | |
|             ("book_count", 1),
 | |
|             ("book_count", 0),
 | |
|         )
 | |
|         authors = Author.objects.order_by("pk")
 | |
|         self.assertSuccessfulRawQuery(Author, query, authors, expected_annotations)
 | |
| 
 | |
|     def test_white_space_query(self):
 | |
|         query = "    SELECT * FROM raw_query_author"
 | |
|         authors = Author.objects.all()
 | |
|         self.assertSuccessfulRawQuery(Author, query, authors)
 | |
| 
 | |
|     def test_multiple_iterations(self):
 | |
|         query = "SELECT * FROM raw_query_author"
 | |
|         normal_authors = Author.objects.all()
 | |
|         raw_authors = Author.objects.raw(query)
 | |
| 
 | |
|         # First Iteration
 | |
|         first_iterations = 0
 | |
|         for index, raw_author in enumerate(raw_authors):
 | |
|             self.assertEqual(normal_authors[index], raw_author)
 | |
|             first_iterations += 1
 | |
| 
 | |
|         # Second Iteration
 | |
|         second_iterations = 0
 | |
|         for index, raw_author in enumerate(raw_authors):
 | |
|             self.assertEqual(normal_authors[index], raw_author)
 | |
|             second_iterations += 1
 | |
| 
 | |
|         self.assertEqual(first_iterations, second_iterations)
 | |
| 
 | |
|     def test_get_item(self):
 | |
|         # Indexing on RawQuerySets
 | |
|         query = "SELECT * FROM raw_query_author ORDER BY id ASC"
 | |
|         third_author = Author.objects.raw(query)[2]
 | |
|         self.assertEqual(third_author.first_name, "Bob")
 | |
| 
 | |
|         first_two = Author.objects.raw(query)[0:2]
 | |
|         self.assertEqual(len(first_two), 2)
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             Author.objects.raw(query)["test"]
 | |
| 
 | |
|     def test_inheritance(self):
 | |
|         f = FriendlyAuthor.objects.create(
 | |
|             first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28)
 | |
|         )
 | |
|         query = "SELECT * FROM raw_query_friendlyauthor"
 | |
|         self.assertEqual([o.pk for o in FriendlyAuthor.objects.raw(query)], [f.pk])
 | |
| 
 | |
|     def test_query_count(self):
 | |
|         self.assertNumQueries(
 | |
|             1, list, Author.objects.raw("SELECT * FROM raw_query_author")
 | |
|         )
 | |
| 
 | |
|     def test_subquery_in_raw_sql(self):
 | |
|         list(
 | |
|             Book.objects.raw(
 | |
|                 "SELECT id FROM "
 | |
|                 "(SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq"
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     def test_db_column_name_is_used_in_raw_query(self):
 | |
|         """
 | |
|         Regression test that ensures the `column` attribute on the field is
 | |
|         used to generate the list of fields included in the query, as opposed
 | |
|         to the `attname`. This is important when the primary key is a
 | |
|         ForeignKey field because `attname` and `column` are not necessarily the
 | |
|         same.
 | |
|         """
 | |
|         b = BookFkAsPk.objects.create(book=self.b1)
 | |
|         self.assertEqual(
 | |
|             list(
 | |
|                 BookFkAsPk.objects.raw(
 | |
|                     "SELECT not_the_default FROM raw_query_bookfkaspk"
 | |
|                 )
 | |
|             ),
 | |
|             [b],
 | |
|         )
 | |
| 
 | |
|     def test_decimal_parameter(self):
 | |
|         c = Coffee.objects.create(brand="starbucks", price=20.5)
 | |
|         qs = Coffee.objects.raw(
 | |
|             "SELECT * FROM raw_query_coffee WHERE price >= %s", params=[Decimal(20)]
 | |
|         )
 | |
|         self.assertEqual(list(qs), [c])
 | |
| 
 | |
|     def test_result_caching(self):
 | |
|         with self.assertNumQueries(1):
 | |
|             books = Book.objects.raw("SELECT * FROM raw_query_book")
 | |
|             list(books)
 | |
|             list(books)
 | |
| 
 | |
|     def test_iterator(self):
 | |
|         with self.assertNumQueries(2):
 | |
|             books = Book.objects.raw("SELECT * FROM raw_query_book")
 | |
|             list(books.iterator())
 | |
|             list(books.iterator())
 | |
| 
 | |
|     def test_bool(self):
 | |
|         self.assertIs(bool(Book.objects.raw("SELECT * FROM raw_query_book")), True)
 | |
|         self.assertIs(
 | |
|             bool(Book.objects.raw("SELECT * FROM raw_query_book WHERE id = 0")), False
 | |
|         )
 | |
| 
 | |
|     def test_len(self):
 | |
|         self.assertEqual(len(Book.objects.raw("SELECT * FROM raw_query_book")), 4)
 | |
|         self.assertEqual(
 | |
|             len(Book.objects.raw("SELECT * FROM raw_query_book WHERE id = 0")), 0
 | |
|         )
 |