mirror of
				https://github.com/django/django.git
				synced 2025-10-31 01:25:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			394 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			394 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| from __future__ import unicode_literals
 | |
| 
 | |
| from datetime import datetime
 | |
| 
 | |
| from django.core import serializers
 | |
| from django.core.serializers import SerializerDoesNotExist
 | |
| from django.core.serializers.base import ProgressBar
 | |
| from django.db import connection, transaction
 | |
| from django.http import HttpResponse
 | |
| from django.test import (
 | |
|     SimpleTestCase, mock, override_settings, skipUnlessDBFeature,
 | |
| )
 | |
| from django.test.utils import Approximate
 | |
| from django.utils.functional import curry
 | |
| from django.utils.six import StringIO
 | |
| 
 | |
| from .models import (
 | |
|     Actor, Article, Author, AuthorProfile, BaseModel, Category, ComplexModel,
 | |
|     Movie, Player, ProxyBaseModel, ProxyProxyBaseModel, Score, Team,
 | |
| )
 | |
| 
 | |
| 
 | |
@override_settings(
    SERIALIZATION_MODULES={
        "json2": "django.core.serializers.json",
    }
)
class SerializerRegistrationTests(SimpleTestCase):
    """
    Tests for registering, unregistering and looking up serializers.

    The settings override maps the extra format name "json2" onto the JSON
    serializer module; several tests assert that this name shows up in the
    registry.
    """

    def setUp(self):
        # Replace the module-level registry with an empty dict for the
        # duration of each test; tearDown() puts the original back.
        self.old_serializers = serializers._serializers
        serializers._serializers = {}

    def tearDown(self):
        serializers._serializers = self.old_serializers

    def test_register(self):
        "Registering a new serializer populates the full registry. Refs #14823"
        serializers.register_serializer('json3', 'django.core.serializers.json')

        public_formats = serializers.get_public_serializer_formats()
        self.assertIn('json3', public_formats)
        self.assertIn('json2', public_formats)
        self.assertIn('xml', public_formats)

    def test_unregister(self):
        "Unregistering a serializer doesn't cause the registry to be repopulated. Refs #14823"
        serializers.unregister_serializer('xml')
        serializers.register_serializer('json3', 'django.core.serializers.json')

        public_formats = serializers.get_public_serializer_formats()

        self.assertNotIn('xml', public_formats)
        self.assertIn('json3', public_formats)

    def test_unregister_unknown_serializer(self):
        with self.assertRaises(SerializerDoesNotExist):
            serializers.unregister_serializer("nonsense")

    def test_builtin_serializers(self):
        "Requesting a list of serializer formats populates the registry"
        all_formats = set(serializers.get_serializer_formats())
        public_formats = set(serializers.get_public_serializer_formats())

        self.assertIn('xml', all_formats)
        self.assertIn('xml', public_formats)

        self.assertIn('json2', all_formats)
        self.assertIn('json2', public_formats)

        # 'python' is expected in the full list but not in the public one.
        self.assertIn('python', all_formats)
        self.assertNotIn('python', public_formats)

    def test_get_unknown_serializer(self):
        """
        #15889: get_serializer('nonsense') raises a SerializerDoesNotExist
        """
        with self.assertRaises(SerializerDoesNotExist):
            serializers.get_serializer("nonsense")

        # The same failure must also be catchable as a KeyError.
        with self.assertRaises(KeyError):
            serializers.get_serializer("nonsense")

        # SerializerDoesNotExist is instantiated with the nonexistent format
        with self.assertRaises(SerializerDoesNotExist) as cm:
            serializers.get_serializer("nonsense")
        self.assertEqual(cm.exception.args, ("nonsense",))

    def test_get_unknown_deserializer(self):
        with self.assertRaises(SerializerDoesNotExist):
            serializers.get_deserializer("nonsense")
 | |
| 
 | |
| 
 | |
class SerializersTestBase(object):
    """
    Serializer test cases shared by every format under test.

    The format is selected through ``serializer_name``. The tests also read
    ``self.pkless_str`` and ``self.mapping_ordering_str`` and call
    ``self._validate_output()``, ``self._get_field_values()`` and
    ``self._get_pk_values()``; none of those are defined in this class, so
    the concrete test classes have to supply them.
    """
    serializer_name = None  # Serialization format name; subclasses override it

    @staticmethod
    def _comparison_value(value):
        # Applied to the expected pk in test_one_to_one_as_pk before it is
        # compared; this default hands the value back untouched.
        return value

    def setUp(self):
        # Fixture: three categories, two authors and two articles.
        sports_cat = Category.objects.create(name="Sports")
        music_cat = Category.objects.create(name="Music")
        op_ed_cat = Category.objects.create(name="Op-Ed")

        self.joe = Author.objects.create(name="Joe")
        self.jane = Author.objects.create(name="Jane")

        self.a1 = Article(
            author=self.jane,
            headline="Poker has no place on ESPN",
            pub_date=datetime(2006, 6, 16, 11, 0),
        )
        self.a1.save()
        self.a1.categories.set([sports_cat, op_ed_cat])

        self.a2 = Article(
            author=self.joe,
            headline="Time to reform copyright",
            pub_date=datetime(2006, 6, 16, 13, 0, 11, 345),
        )
        self.a2.save()
        self.a2.categories.set([music_cat, op_ed_cat])

    def test_serialize(self):
        """Serializing all articles yields output the format considers valid."""
        output = serializers.serialize(self.serializer_name, Article.objects.all())
        self.assertTrue(self._validate_output(output))

    def test_serializer_roundtrip(self):
        """Serialized articles deserialize back into two objects."""
        output = serializers.serialize(self.serializer_name, Article.objects.all())
        restored = list(serializers.deserialize(self.serializer_name, output))
        self.assertEqual(len(restored), 2)

    def test_serialize_to_stream(self):
        # Writing to a stream must produce the same text as returning a string.
        obj = ComplexModel(field1='first', field2='second', field3='third')
        obj.save_base(raw=True)

        for stream in (StringIO(), HttpResponse()):
            # Write the serialized object into the stream...
            serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream)

            # ...and also obtain it as a plain return value for comparison.
            string_data = serializers.serialize(self.serializer_name, [obj], indent=2)

            # An HttpResponse holds bytes, so decode before comparing.
            if isinstance(stream, StringIO):
                streamed = stream.getvalue()
            else:
                streamed = stream.content.decode('utf-8')
            self.assertEqual(string_data, streamed)

    def test_serialize_specific_fields(self):
        # Only the fields named in ``fields`` survive a round trip.
        obj = ComplexModel(field1='first', field2='second', field3='third')
        obj.save_base(raw=True)

        serialized_data = serializers.serialize(
            self.serializer_name, [obj], indent=2, fields=('field1', 'field3')
        )
        restored = next(serializers.deserialize(self.serializer_name, serialized_data))

        # field2 was left out of the serialized data, so it comes back empty.
        self.assertEqual(restored.object.field1, 'first')
        self.assertEqual(restored.object.field2, '')
        self.assertEqual(restored.object.field3, 'third')

    def test_altering_serialized_output(self):
        """
        Editing the serialized text and saving the deserialized
        objects changes what is stored.
        """
        old_headline = "Poker has no place on ESPN"
        new_headline = "Poker has no place on television"
        payload = serializers.serialize(self.serializer_name, Article.objects.all())
        payload = payload.replace(old_headline, new_headline)
        restored = list(serializers.deserialize(self.serializer_name, payload))

        # Nothing has been saved yet: only the old headline exists.
        self.assertTrue(Article.objects.filter(headline=old_headline))
        self.assertFalse(Article.objects.filter(headline=new_headline))

        for deserialized in restored:
            deserialized.save()

        # Once saved, the new headline has replaced the old one.
        self.assertTrue(Article.objects.filter(headline=new_headline))
        self.assertFalse(Article.objects.filter(headline=old_headline))

    def test_one_to_one_as_pk(self):
        """
        A custom primary key field (such as a OneToOneField) is not
        listed among the serialized fields; it stands in for the pk
        identifier instead.
        """
        AuthorProfile.objects.create(author=self.joe, date_of_birth=datetime(1970, 1, 1))
        payload = serializers.serialize(self.serializer_name, AuthorProfile.objects.all())
        self.assertFalse(self._get_field_values(payload, 'author'))

        for deserialized in serializers.deserialize(self.serializer_name, payload):
            self.assertEqual(deserialized.object.pk, self._comparison_value(self.joe.pk))

    def test_serialize_field_subset(self):
        """The ``fields`` argument limits which fields are emitted."""
        valid_fields = ('headline', 'pub_date')
        invalid_fields = ("author", "categories")
        payload = serializers.serialize(
            self.serializer_name, Article.objects.all(), fields=valid_fields
        )
        for name in invalid_fields:
            self.assertFalse(self._get_field_values(payload, name))

        for name in valid_fields:
            self.assertTrue(self._get_field_values(payload, name))

    def test_serialize_unicode(self):
        """Non-ASCII text survives serialization and deserialization."""
        actor_name = "Za\u017c\u00f3\u0142\u0107"
        movie_title = 'G\u0119\u015bl\u0105 ja\u017a\u0144'
        actor = Actor(name=actor_name)
        movie = Movie(title=movie_title, actor=actor)
        actor.save()
        movie.save()

        payload = serializers.serialize(self.serializer_name, [movie])
        self.assertEqual(self._get_field_values(payload, "title")[0], movie_title)
        self.assertEqual(self._get_field_values(payload, "actor")[0], actor_name)

        restored = list(serializers.deserialize(self.serializer_name, payload))
        restored_movie = restored[0].object
        self.assertEqual(restored_movie.title, movie_title)

    def test_serialize_progressbar(self):
        # The progress output must finish with a completely filled bar.
        fake_stdout = StringIO()
        serializers.serialize(
            self.serializer_name, Article.objects.all(),
            progress_output=fake_stdout, object_count=Article.objects.count()
        )
        full_bar = '[' + '.' * ProgressBar.progress_width + ']\n'
        self.assertTrue(fake_stdout.getvalue().endswith(full_bar))

    def test_serialize_superfluous_queries(self):
        """Serializing a ForeignKey must not trigger any query.

        #17602
        """
        actor = Actor(name='Actor name')
        actor.save()
        movie = Movie(title='Movie title', actor_id=actor.pk)
        movie.save()

        with self.assertNumQueries(0):
            serializers.serialize(self.serializer_name, [movie])

    def test_serialize_with_null_pk(self):
        """
        An unsaved object serializes with an empty pk and
        deserializes into an instance whose id is None.
        """
        unsaved = Category(name="Reference")
        payload = serializers.serialize(self.serializer_name, [unsaved])
        first_pk = self._get_pk_values(payload)[0]
        self.assertFalse(first_pk)

        restored = list(serializers.deserialize(self.serializer_name, payload))[0].object
        self.assertIsNone(restored.id)

    def test_float_serialization(self):
        """A float field keeps its value through a round trip."""
        score = Score(score=3.4)
        score.save()
        payload = serializers.serialize(self.serializer_name, [score])
        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertEqual(restored[0].object.score, Approximate(3.4, places=1))

    def test_deferred_field_serialization(self):
        # An instance loaded with a deferred field must deserialize as a
        # plain Author.
        author = Author.objects.create(name='Victor Hugo')
        author = Author.objects.defer('name').get(pk=author.pk)
        payload = serializers.serialize(self.serializer_name, [author])
        restored = list(serializers.deserialize(self.serializer_name, payload))
        # Compare __class__ directly: an isinstance() check would also accept
        # instances of a deferred-field class (e.g. Author_Deferred_name).
        self.assertEqual(restored[0].object.__class__, Author)

    def test_custom_field_serialization(self):
        """A custom field keeps its value through a round trip."""
        team_str = "Spartak Moskva"
        player = Player()
        player.name = "Soslan Djanaev"
        player.rank = 1
        player.team = Team(team_str)
        player.save()
        payload = serializers.serialize(self.serializer_name, Player.objects.all())
        team_values = self._get_field_values(payload, "team")
        self.assertTrue(team_values)
        self.assertEqual(team_values[0], team_str)

        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertEqual(restored[0].object.team.to_string(), player.team.to_string())

    def test_pre_1000ad_date(self):
        """Years below 1000 are written with leading zeros."""
        # Regression test for #12524.
        early_article = Article.objects.create(
            author=self.jane,
            headline="Nobody remembers the early years",
            pub_date=datetime(1, 2, 3, 4, 5, 6),
        )

        payload = serializers.serialize(self.serializer_name, [early_article])
        date_values = self._get_field_values(payload, "pub_date")
        # Either a 'T' or a space may separate date and time; normalise it.
        self.assertEqual(date_values[0].replace('T', ' '), "0001-02-03 04:05:06")

    def test_pkless_serialized_strings(self):
        """
        Serialized data that carries no primary keys can still
        be deserialized and saved.
        """
        restored = list(serializers.deserialize(self.serializer_name, self.pkless_str))
        for deserialized in restored:
            self.assertFalse(deserialized.object.id)
            deserialized.save()
        self.assertEqual(Category.objects.all().count(), 5)

    def test_deterministic_mapping_ordering(self):
        """Mappings such as ``fields`` come out in a deterministic order. (#24558)"""
        output = serializers.serialize(self.serializer_name, [self.a1], indent=2)
        category_pks = self.a1.categories.values_list('pk', flat=True)
        substitutions = {
            'article_pk': self.a1.pk,
            'author_pk': self.a1.author_id,
            'first_category_pk': category_pks[0],
            'second_category_pk': category_pks[1],
        }
        self.assertEqual(output, self.mapping_ordering_str % substitutions)

    def test_deserialize_force_insert(self):
        """A deserialized object passes ``force_insert`` on to ``save_base()``."""
        payload = serializers.serialize(self.serializer_name, [self.a1])
        deserialized = list(serializers.deserialize(self.serializer_name, payload))[0]
        with mock.patch('django.db.models.Model') as mock_model:
            deserialized.save(force_insert=False)
            mock_model.save_base.assert_called_with(
                deserialized.object, raw=True, using=None, force_insert=False
            )

    @skipUnlessDBFeature('can_defer_constraint_checks')
    def test_serialize_proxy_model(self):
        # Apart from the 'proxy' prefix in the output, proxy models must
        # serialize exactly like their base model.
        BaseModel.objects.create(parent_data=1)
        base_objects = BaseModel.objects.all()
        proxy_objects = ProxyBaseModel.objects.all()
        proxy_proxy_objects = ProxyProxyBaseModel.objects.all()
        base_data = serializers.serialize("json", base_objects)
        proxy_data = serializers.serialize("json", proxy_objects)
        proxy_proxy_data = serializers.serialize("json", proxy_proxy_objects)
        self.assertEqual(base_data, proxy_data.replace('proxy', ''))
        self.assertEqual(base_data, proxy_proxy_data.replace('proxy', ''))
 | |
| 
 | |
| 
 | |
class SerializersTransactionTestBase(object):
    """
    Serializer tests that manage their own transaction.

    Reads ``self.serializer_name`` and ``self.fwd_ref_str``; neither is
    defined in this class, so the concrete test classes have to supply them.
    """

    available_apps = ['serializers']

    @skipUnlessDBFeature('supports_forward_references')
    def test_forward_refs(self):
        """
        Serialized data may refer to an object id before the
        object itself appears in the data.
        """
        # Forward references can only be exercised inside a transaction, with
        # constraint checks switched off while the objects are being saved.
        with transaction.atomic():
            deserialized_objects = serializers.deserialize(self.serializer_name, self.fwd_ref_str)
            with connection.constraint_checks_disabled():
                for deserialized in deserialized_objects:
                    deserialized.save()

        # Exactly one row of each model must exist afterwards.
        for model_cls in (Category, Author, Article):
            self.assertEqual(model_cls.objects.all().count(), 1)
        article = Article.objects.all()[0]
        self.assertEqual(article.categories.all().count(), 1)
        self.assertEqual(article.author.name, "Agnes")
 | |
| 
 | |
| 
 | |
def register_tests(test_class, method_name, test_func, exclude=None):
    """
    Attach a generated test method to ``test_class`` for every registered
    serializer format, so each registered serializer is tested automatically.

    ``method_name`` is a %-format template that receives the format name.
    ``test_func`` is curried with the format name as its first argument.
    Formats are skipped when their serializer is a ``BadSerializer``
    instance, when the format is 'geojson', or when it is listed in
    ``exclude``.
    """
    def _should_test(name):
        # Conditions are checked in the same order as a single chained "and".
        if isinstance(serializers.get_serializer(name), serializers.BadSerializer):
            return False
        if name == 'geojson':
            return False
        return exclude is None or name not in exclude

    # Build the complete list first, then attach the methods.
    selected = [name for name in serializers.get_serializer_formats() if _should_test(name)]
    for format_ in selected:
        setattr(test_class, method_name % format_, curry(test_func, format_))
 |