mirror of
https://github.com/django/django.git
synced 2025-10-31 09:41:08 +00:00
[1.8.x] Moved contrib.messages tests out of contrib.
Backport of b3cd9e0d07 from master
This commit is contained in:
@@ -1,392 +0,0 @@
|
||||
from unittest import skipUnless
|
||||
|
||||
from django import http
|
||||
from django.apps import apps
|
||||
from django.contrib.messages import constants, get_level, set_level, utils
|
||||
from django.contrib.messages.api import MessageFailure
|
||||
from django.contrib.messages.constants import DEFAULT_LEVELS
|
||||
from django.contrib.messages.storage import base, default_storage
|
||||
from django.contrib.messages.storage.base import Message
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import modify_settings, override_settings
|
||||
from django.utils.translation import ugettext_lazy
|
||||
|
||||
|
||||
def skipUnlessAuthIsInstalled(func):
|
||||
return skipUnless(
|
||||
apps.is_installed('django.contrib.auth'),
|
||||
"django.contrib.auth isn't installed")(func)
|
||||
|
||||
|
||||
def add_level_messages(storage):
|
||||
"""
|
||||
Adds 6 messages from different levels (including a custom one) to a storage
|
||||
instance.
|
||||
"""
|
||||
storage.add(constants.INFO, 'A generic info message')
|
||||
storage.add(29, 'Some custom level')
|
||||
storage.add(constants.DEBUG, 'A debugging message', extra_tags='extra-tag')
|
||||
storage.add(constants.WARNING, 'A warning')
|
||||
storage.add(constants.ERROR, 'An error')
|
||||
storage.add(constants.SUCCESS, 'This was a triumph.')
|
||||
|
||||
|
||||
class override_settings_tags(override_settings):
|
||||
def enable(self):
|
||||
super(override_settings_tags, self).enable()
|
||||
# LEVEL_TAGS is a constant defined in the
|
||||
# django.contrib.messages.storage.base module, so after changing
|
||||
# settings.MESSAGE_TAGS, we need to update that constant too.
|
||||
self.old_level_tags = base.LEVEL_TAGS
|
||||
base.LEVEL_TAGS = utils.get_level_tags()
|
||||
|
||||
def disable(self):
|
||||
super(override_settings_tags, self).disable()
|
||||
base.LEVEL_TAGS = self.old_level_tags
|
||||
|
||||
|
||||
class BaseTests(object):
|
||||
storage_class = default_storage
|
||||
levels = {
|
||||
'debug': constants.DEBUG,
|
||||
'info': constants.INFO,
|
||||
'success': constants.SUCCESS,
|
||||
'warning': constants.WARNING,
|
||||
'error': constants.ERROR,
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self.settings_override = override_settings_tags(
|
||||
TEMPLATES=[{
|
||||
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||
'DIRS': [],
|
||||
'APP_DIRS': True,
|
||||
'OPTIONS': {
|
||||
'context_processors': (
|
||||
'django.contrib.auth.context_processors.auth',
|
||||
'django.contrib.messages.context_processors.messages',
|
||||
),
|
||||
},
|
||||
}],
|
||||
ROOT_URLCONF='django.contrib.messages.tests.urls',
|
||||
MESSAGE_TAGS='',
|
||||
MESSAGE_STORAGE='%s.%s' % (self.storage_class.__module__,
|
||||
self.storage_class.__name__),
|
||||
SESSION_SERIALIZER='django.contrib.sessions.serializers.JSONSerializer',
|
||||
)
|
||||
self.settings_override.enable()
|
||||
|
||||
def tearDown(self):
|
||||
self.settings_override.disable()
|
||||
|
||||
def get_request(self):
|
||||
return http.HttpRequest()
|
||||
|
||||
def get_response(self):
|
||||
return http.HttpResponse()
|
||||
|
||||
def get_storage(self, data=None):
|
||||
"""
|
||||
Returns the storage backend, setting its loaded data to the ``data``
|
||||
argument.
|
||||
|
||||
This method avoids the storage ``_get`` method from getting called so
|
||||
that other parts of the storage backend can be tested independent of
|
||||
the message retrieval logic.
|
||||
"""
|
||||
storage = self.storage_class(self.get_request())
|
||||
storage._loaded_data = data or []
|
||||
return storage
|
||||
|
||||
def test_add(self):
|
||||
storage = self.get_storage()
|
||||
self.assertFalse(storage.added_new)
|
||||
storage.add(constants.INFO, 'Test message 1')
|
||||
self.assertTrue(storage.added_new)
|
||||
storage.add(constants.INFO, 'Test message 2', extra_tags='tag')
|
||||
self.assertEqual(len(storage), 2)
|
||||
|
||||
def test_add_lazy_translation(self):
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
|
||||
storage.add(constants.INFO, ugettext_lazy('lazy message'))
|
||||
storage.update(response)
|
||||
|
||||
storing = self.stored_messages_count(storage, response)
|
||||
self.assertEqual(storing, 1)
|
||||
|
||||
def test_no_update(self):
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
storage.update(response)
|
||||
storing = self.stored_messages_count(storage, response)
|
||||
self.assertEqual(storing, 0)
|
||||
|
||||
def test_add_update(self):
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
|
||||
storage.add(constants.INFO, 'Test message 1')
|
||||
storage.add(constants.INFO, 'Test message 1', extra_tags='tag')
|
||||
storage.update(response)
|
||||
|
||||
storing = self.stored_messages_count(storage, response)
|
||||
self.assertEqual(storing, 2)
|
||||
|
||||
def test_existing_add_read_update(self):
|
||||
storage = self.get_existing_storage()
|
||||
response = self.get_response()
|
||||
|
||||
storage.add(constants.INFO, 'Test message 3')
|
||||
list(storage) # Simulates a read
|
||||
storage.update(response)
|
||||
|
||||
storing = self.stored_messages_count(storage, response)
|
||||
self.assertEqual(storing, 0)
|
||||
|
||||
def test_existing_read_add_update(self):
|
||||
storage = self.get_existing_storage()
|
||||
response = self.get_response()
|
||||
|
||||
list(storage) # Simulates a read
|
||||
storage.add(constants.INFO, 'Test message 3')
|
||||
storage.update(response)
|
||||
|
||||
storing = self.stored_messages_count(storage, response)
|
||||
self.assertEqual(storing, 1)
|
||||
|
||||
@override_settings(MESSAGE_LEVEL=constants.DEBUG)
|
||||
def test_full_request_response_cycle(self):
|
||||
"""
|
||||
With the message middleware enabled, tests that messages are properly
|
||||
stored and then retrieved across the full request/redirect/response
|
||||
cycle.
|
||||
"""
|
||||
data = {
|
||||
'messages': ['Test message %d' % x for x in range(5)],
|
||||
}
|
||||
show_url = reverse('show_message')
|
||||
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
||||
add_url = reverse('add_message', args=(level,))
|
||||
response = self.client.post(add_url, data, follow=True)
|
||||
self.assertRedirects(response, show_url)
|
||||
self.assertIn('messages', response.context)
|
||||
messages = [Message(self.levels[level], msg) for msg in data['messages']]
|
||||
self.assertEqual(list(response.context['messages']), messages)
|
||||
for msg in data['messages']:
|
||||
self.assertContains(response, msg)
|
||||
|
||||
@override_settings(MESSAGE_LEVEL=constants.DEBUG)
|
||||
def test_with_template_response(self):
|
||||
data = {
|
||||
'messages': ['Test message %d' % x for x in range(5)],
|
||||
}
|
||||
show_url = reverse('show_template_response')
|
||||
for level in self.levels.keys():
|
||||
add_url = reverse('add_template_response', args=(level,))
|
||||
response = self.client.post(add_url, data, follow=True)
|
||||
self.assertRedirects(response, show_url)
|
||||
self.assertIn('messages', response.context)
|
||||
for msg in data['messages']:
|
||||
self.assertContains(response, msg)
|
||||
|
||||
# there shouldn't be any messages on second GET request
|
||||
response = self.client.get(show_url)
|
||||
for msg in data['messages']:
|
||||
self.assertNotContains(response, msg)
|
||||
|
||||
def test_context_processor_message_levels(self):
|
||||
show_url = reverse('show_template_response')
|
||||
response = self.client.get(show_url)
|
||||
|
||||
self.assertIn('DEFAULT_MESSAGE_LEVELS', response.context)
|
||||
self.assertEqual(response.context['DEFAULT_MESSAGE_LEVELS'], DEFAULT_LEVELS)
|
||||
|
||||
@override_settings(MESSAGE_LEVEL=constants.DEBUG)
|
||||
def test_multiple_posts(self):
|
||||
"""
|
||||
Tests that messages persist properly when multiple POSTs are made
|
||||
before a GET.
|
||||
"""
|
||||
data = {
|
||||
'messages': ['Test message %d' % x for x in range(5)],
|
||||
}
|
||||
show_url = reverse('show_message')
|
||||
messages = []
|
||||
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
||||
messages.extend(Message(self.levels[level], msg) for msg in data['messages'])
|
||||
add_url = reverse('add_message', args=(level,))
|
||||
self.client.post(add_url, data)
|
||||
response = self.client.get(show_url)
|
||||
self.assertIn('messages', response.context)
|
||||
self.assertEqual(list(response.context['messages']), messages)
|
||||
for msg in data['messages']:
|
||||
self.assertContains(response, msg)
|
||||
|
||||
@modify_settings(
|
||||
INSTALLED_APPS={'remove': 'django.contrib.messages'},
|
||||
MIDDLEWARE_CLASSES={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
|
||||
)
|
||||
@override_settings(
|
||||
MESSAGE_LEVEL=constants.DEBUG,
|
||||
TEMPLATES=[{
|
||||
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||
'DIRS': [],
|
||||
'APP_DIRS': True,
|
||||
}],
|
||||
)
|
||||
def test_middleware_disabled(self):
|
||||
"""
|
||||
Tests that, when the middleware is disabled, an exception is raised
|
||||
when one attempts to store a message.
|
||||
"""
|
||||
data = {
|
||||
'messages': ['Test message %d' % x for x in range(5)],
|
||||
}
|
||||
reverse('show_message')
|
||||
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
||||
add_url = reverse('add_message', args=(level,))
|
||||
self.assertRaises(MessageFailure, self.client.post, add_url,
|
||||
data, follow=True)
|
||||
|
||||
@modify_settings(
|
||||
INSTALLED_APPS={'remove': 'django.contrib.messages'},
|
||||
MIDDLEWARE_CLASSES={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
|
||||
)
|
||||
@override_settings(
|
||||
TEMPLATES=[{
|
||||
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||
'DIRS': [],
|
||||
'APP_DIRS': True,
|
||||
}],
|
||||
)
|
||||
def test_middleware_disabled_fail_silently(self):
|
||||
"""
|
||||
Tests that, when the middleware is disabled, an exception is not
|
||||
raised if 'fail_silently' = True
|
||||
"""
|
||||
data = {
|
||||
'messages': ['Test message %d' % x for x in range(5)],
|
||||
'fail_silently': True,
|
||||
}
|
||||
show_url = reverse('show_message')
|
||||
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
||||
add_url = reverse('add_message', args=(level,))
|
||||
response = self.client.post(add_url, data, follow=True)
|
||||
self.assertRedirects(response, show_url)
|
||||
self.assertNotIn('messages', response.context)
|
||||
|
||||
def stored_messages_count(self, storage, response):
|
||||
"""
|
||||
Returns the number of messages being stored after a
|
||||
``storage.update()`` call.
|
||||
"""
|
||||
raise NotImplementedError('This method must be set by a subclass.')
|
||||
|
||||
def test_get(self):
|
||||
raise NotImplementedError('This method must be set by a subclass.')
|
||||
|
||||
def get_existing_storage(self):
|
||||
return self.get_storage([Message(constants.INFO, 'Test message 1'),
|
||||
Message(constants.INFO, 'Test message 2',
|
||||
extra_tags='tag')])
|
||||
|
||||
def test_existing_read(self):
|
||||
"""
|
||||
Tests that reading the existing storage doesn't cause the data to be
|
||||
lost.
|
||||
"""
|
||||
storage = self.get_existing_storage()
|
||||
self.assertFalse(storage.used)
|
||||
# After iterating the storage engine directly, the used flag is set.
|
||||
data = list(storage)
|
||||
self.assertTrue(storage.used)
|
||||
# The data does not disappear because it has been iterated.
|
||||
self.assertEqual(data, list(storage))
|
||||
|
||||
def test_existing_add(self):
|
||||
storage = self.get_existing_storage()
|
||||
self.assertFalse(storage.added_new)
|
||||
storage.add(constants.INFO, 'Test message 3')
|
||||
self.assertTrue(storage.added_new)
|
||||
|
||||
def test_default_level(self):
|
||||
# get_level works even with no storage on the request.
|
||||
request = self.get_request()
|
||||
self.assertEqual(get_level(request), constants.INFO)
|
||||
|
||||
# get_level returns the default level if it hasn't been set.
|
||||
storage = self.get_storage()
|
||||
request._messages = storage
|
||||
self.assertEqual(get_level(request), constants.INFO)
|
||||
|
||||
# Only messages of sufficient level get recorded.
|
||||
add_level_messages(storage)
|
||||
self.assertEqual(len(storage), 5)
|
||||
|
||||
def test_low_level(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
request._messages = storage
|
||||
|
||||
self.assertTrue(set_level(request, 5))
|
||||
self.assertEqual(get_level(request), 5)
|
||||
|
||||
add_level_messages(storage)
|
||||
self.assertEqual(len(storage), 6)
|
||||
|
||||
def test_high_level(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
request._messages = storage
|
||||
|
||||
self.assertTrue(set_level(request, 30))
|
||||
self.assertEqual(get_level(request), 30)
|
||||
|
||||
add_level_messages(storage)
|
||||
self.assertEqual(len(storage), 2)
|
||||
|
||||
@override_settings(MESSAGE_LEVEL=29)
|
||||
def test_settings_level(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
|
||||
self.assertEqual(get_level(request), 29)
|
||||
|
||||
add_level_messages(storage)
|
||||
self.assertEqual(len(storage), 3)
|
||||
|
||||
def test_tags(self):
|
||||
storage = self.get_storage()
|
||||
storage.level = 0
|
||||
add_level_messages(storage)
|
||||
tags = [msg.tags for msg in storage]
|
||||
self.assertEqual(tags,
|
||||
['info', '', 'extra-tag debug', 'warning', 'error',
|
||||
'success'])
|
||||
|
||||
def test_level_tag(self):
|
||||
storage = self.get_storage()
|
||||
storage.level = 0
|
||||
add_level_messages(storage)
|
||||
tags = [msg.level_tag for msg in storage]
|
||||
self.assertEqual(tags,
|
||||
['info', '', 'debug', 'warning', 'error',
|
||||
'success'])
|
||||
|
||||
@override_settings_tags(MESSAGE_TAGS={
|
||||
constants.INFO: 'info',
|
||||
constants.DEBUG: '',
|
||||
constants.WARNING: '',
|
||||
constants.ERROR: 'bad',
|
||||
29: 'custom',
|
||||
}
|
||||
)
|
||||
def test_custom_tags(self):
|
||||
storage = self.get_storage()
|
||||
storage.level = 0
|
||||
add_level_messages(storage)
|
||||
tags = [msg.tags for msg in storage]
|
||||
self.assertEqual(tags,
|
||||
['info', 'custom', 'extra-tag', '', 'bad', 'success'])
|
||||
@@ -1,54 +0,0 @@
|
||||
from django.contrib import messages
|
||||
from django.test import RequestFactory, TestCase
|
||||
|
||||
|
||||
class DummyStorage(object):
|
||||
"""
|
||||
dummy message-store to test the api methods
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.store = []
|
||||
|
||||
def add(self, level, message, extra_tags=''):
|
||||
self.store.append(message)
|
||||
|
||||
|
||||
class ApiTest(TestCase):
|
||||
def setUp(self):
|
||||
self.rf = RequestFactory()
|
||||
self.request = self.rf.request()
|
||||
self.storage = DummyStorage()
|
||||
|
||||
def test_ok(self):
|
||||
msg = 'some message'
|
||||
|
||||
self.request._messages = self.storage
|
||||
messages.add_message(self.request, messages.DEBUG, msg)
|
||||
self.assertIn(msg, self.storage.store)
|
||||
|
||||
def test_request_is_none(self):
|
||||
msg = 'some message'
|
||||
|
||||
self.request._messages = self.storage
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
messages.add_message(None, messages.DEBUG, msg)
|
||||
|
||||
self.assertEqual([], self.storage.store)
|
||||
|
||||
def test_middleware_missing(self):
|
||||
msg = 'some message'
|
||||
|
||||
with self.assertRaises(messages.MessageFailure):
|
||||
messages.add_message(self.request, messages.DEBUG, msg)
|
||||
|
||||
self.assertEqual([], self.storage.store)
|
||||
|
||||
def test_middleware_missing_silently(self):
|
||||
msg = 'some message'
|
||||
|
||||
messages.add_message(self.request, messages.DEBUG, msg,
|
||||
fail_silently=True)
|
||||
|
||||
self.assertEqual([], self.storage.store)
|
||||
@@ -1,178 +0,0 @@
|
||||
import json
|
||||
|
||||
from django.contrib.messages import constants
|
||||
from django.contrib.messages.storage.base import Message
|
||||
from django.contrib.messages.storage.cookie import (
|
||||
CookieStorage, MessageDecoder, MessageEncoder,
|
||||
)
|
||||
from django.contrib.messages.tests.base import BaseTests
|
||||
from django.test import TestCase, override_settings
|
||||
from django.utils.safestring import SafeData, mark_safe
|
||||
|
||||
|
||||
def set_cookie_data(storage, messages, invalid=False, encode_empty=False):
|
||||
"""
|
||||
Sets ``request.COOKIES`` with the encoded data and removes the storage
|
||||
backend's loaded data cache.
|
||||
"""
|
||||
encoded_data = storage._encode(messages, encode_empty=encode_empty)
|
||||
if invalid:
|
||||
# Truncate the first character so that the hash is invalid.
|
||||
encoded_data = encoded_data[1:]
|
||||
storage.request.COOKIES = {CookieStorage.cookie_name: encoded_data}
|
||||
if hasattr(storage, '_loaded_data'):
|
||||
del storage._loaded_data
|
||||
|
||||
|
||||
def stored_cookie_messages_count(storage, response):
|
||||
"""
|
||||
Returns an integer containing the number of messages stored.
|
||||
"""
|
||||
# Get a list of cookies, excluding ones with a max-age of 0 (because
|
||||
# they have been marked for deletion).
|
||||
cookie = response.cookies.get(storage.cookie_name)
|
||||
if not cookie or cookie['max-age'] == 0:
|
||||
return 0
|
||||
data = storage._decode(cookie.value)
|
||||
if not data:
|
||||
return 0
|
||||
if data[-1] == CookieStorage.not_finished:
|
||||
data.pop()
|
||||
return len(data)
|
||||
|
||||
|
||||
@override_settings(SESSION_COOKIE_DOMAIN='.example.com', SESSION_COOKIE_SECURE=True, SESSION_COOKIE_HTTPONLY=True)
|
||||
class CookieTest(BaseTests, TestCase):
|
||||
storage_class = CookieStorage
|
||||
|
||||
def stored_messages_count(self, storage, response):
|
||||
return stored_cookie_messages_count(storage, response)
|
||||
|
||||
def test_get(self):
|
||||
storage = self.storage_class(self.get_request())
|
||||
# Set initial data.
|
||||
example_messages = ['test', 'me']
|
||||
set_cookie_data(storage, example_messages)
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), example_messages)
|
||||
|
||||
def test_cookie_setings(self):
|
||||
"""
|
||||
Ensure that CookieStorage honors SESSION_COOKIE_DOMAIN, SESSION_COOKIE_SECURE and SESSION_COOKIE_HTTPONLY
|
||||
Refs #15618 and #20972.
|
||||
"""
|
||||
# Test before the messages have been consumed
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
storage.add(constants.INFO, 'test')
|
||||
storage.update(response)
|
||||
self.assertIn('test', response.cookies['messages'].value)
|
||||
self.assertEqual(response.cookies['messages']['domain'], '.example.com')
|
||||
self.assertEqual(response.cookies['messages']['expires'], '')
|
||||
self.assertEqual(response.cookies['messages']['secure'], True)
|
||||
self.assertEqual(response.cookies['messages']['httponly'], True)
|
||||
|
||||
# Test deletion of the cookie (storing with an empty value) after the messages have been consumed
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
storage.add(constants.INFO, 'test')
|
||||
for m in storage:
|
||||
pass # Iterate through the storage to simulate consumption of messages.
|
||||
storage.update(response)
|
||||
self.assertEqual(response.cookies['messages'].value, '')
|
||||
self.assertEqual(response.cookies['messages']['domain'], '.example.com')
|
||||
self.assertEqual(response.cookies['messages']['expires'], 'Thu, 01-Jan-1970 00:00:00 GMT')
|
||||
|
||||
def test_get_bad_cookie(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
# Set initial (invalid) data.
|
||||
example_messages = ['test', 'me']
|
||||
set_cookie_data(storage, example_messages, invalid=True)
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), [])
|
||||
|
||||
def test_max_cookie_length(self):
|
||||
"""
|
||||
Tests that, if the data exceeds what is allowed in a cookie, older
|
||||
messages are removed before saving (and returned by the ``update``
|
||||
method).
|
||||
"""
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
|
||||
# When storing as a cookie, the cookie has constant overhead of approx
|
||||
# 54 chars, and each message has a constant overhead of about 37 chars
|
||||
# and a variable overhead of zero in the best case. We aim for a message
|
||||
# size which will fit 4 messages into the cookie, but not 5.
|
||||
# See also FallbackTest.test_session_fallback
|
||||
msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
|
||||
for i in range(5):
|
||||
storage.add(constants.INFO, str(i) * msg_size)
|
||||
unstored_messages = storage.update(response)
|
||||
|
||||
cookie_storing = self.stored_messages_count(storage, response)
|
||||
self.assertEqual(cookie_storing, 4)
|
||||
|
||||
self.assertEqual(len(unstored_messages), 1)
|
||||
self.assertEqual(unstored_messages[0].message, '0' * msg_size)
|
||||
|
||||
def test_json_encoder_decoder(self):
|
||||
"""
|
||||
Tests that a complex nested data structure containing Message
|
||||
instances is properly encoded/decoded by the custom JSON
|
||||
encoder/decoder classes.
|
||||
"""
|
||||
messages = [
|
||||
{
|
||||
'message': Message(constants.INFO, 'Test message'),
|
||||
'message_list': [Message(constants.INFO, 'message %s')
|
||||
for x in range(5)] + [{'another-message':
|
||||
Message(constants.ERROR, 'error')}],
|
||||
},
|
||||
Message(constants.INFO, 'message %s'),
|
||||
]
|
||||
encoder = MessageEncoder(separators=(',', ':'))
|
||||
value = encoder.encode(messages)
|
||||
decoded_messages = json.loads(value, cls=MessageDecoder)
|
||||
self.assertEqual(messages, decoded_messages)
|
||||
|
||||
def test_safedata(self):
|
||||
"""
|
||||
Tests that a message containing SafeData is keeping its safe status when
|
||||
retrieved from the message storage.
|
||||
"""
|
||||
def encode_decode(data):
|
||||
message = Message(constants.DEBUG, data)
|
||||
encoded = storage._encode(message)
|
||||
decoded = storage._decode(encoded)
|
||||
return decoded.message
|
||||
|
||||
storage = self.get_storage()
|
||||
|
||||
self.assertIsInstance(
|
||||
encode_decode(mark_safe("<b>Hello Django!</b>")), SafeData)
|
||||
self.assertNotIsInstance(
|
||||
encode_decode("<b>Hello Django!</b>"), SafeData)
|
||||
|
||||
def test_pre_1_5_message_format(self):
|
||||
"""
|
||||
For ticket #22426. Tests whether messages that were set in the cookie
|
||||
before the addition of is_safedata are decoded correctly.
|
||||
"""
|
||||
|
||||
# Encode the messages using the current encoder.
|
||||
messages = [Message(constants.INFO, 'message %s') for x in range(5)]
|
||||
encoder = MessageEncoder(separators=(',', ':'))
|
||||
encoded_messages = encoder.encode(messages)
|
||||
|
||||
# Remove the is_safedata flag from the messages in order to imitate
|
||||
# the behavior of before 1.5 (monkey patching).
|
||||
encoded_messages = json.loads(encoded_messages)
|
||||
for obj in encoded_messages:
|
||||
obj.pop(1)
|
||||
encoded_messages = json.dumps(encoded_messages, separators=(',', ':'))
|
||||
|
||||
# Decode the messages in the old format (without is_safedata)
|
||||
decoded_messages = json.loads(encoded_messages, cls=MessageDecoder)
|
||||
self.assertEqual(messages, decoded_messages)
|
||||
@@ -1,179 +0,0 @@
|
||||
from django.contrib.messages import constants
|
||||
from django.contrib.messages.storage.fallback import (
|
||||
CookieStorage, FallbackStorage,
|
||||
)
|
||||
from django.contrib.messages.tests.base import BaseTests
|
||||
from django.contrib.messages.tests.test_cookie import (
|
||||
set_cookie_data, stored_cookie_messages_count,
|
||||
)
|
||||
from django.contrib.messages.tests.test_session import (
|
||||
set_session_data, stored_session_messages_count,
|
||||
)
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
class FallbackTest(BaseTests, TestCase):
|
||||
storage_class = FallbackStorage
|
||||
|
||||
def get_request(self):
|
||||
self.session = {}
|
||||
request = super(FallbackTest, self).get_request()
|
||||
request.session = self.session
|
||||
return request
|
||||
|
||||
def get_cookie_storage(self, storage):
|
||||
return storage.storages[-2]
|
||||
|
||||
def get_session_storage(self, storage):
|
||||
return storage.storages[-1]
|
||||
|
||||
def stored_cookie_messages_count(self, storage, response):
|
||||
return stored_cookie_messages_count(self.get_cookie_storage(storage),
|
||||
response)
|
||||
|
||||
def stored_session_messages_count(self, storage, response):
|
||||
return stored_session_messages_count(self.get_session_storage(storage))
|
||||
|
||||
def stored_messages_count(self, storage, response):
|
||||
"""
|
||||
Return the storage totals from both cookie and session backends.
|
||||
"""
|
||||
total = (self.stored_cookie_messages_count(storage, response) +
|
||||
self.stored_session_messages_count(storage, response))
|
||||
return total
|
||||
|
||||
def test_get(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
cookie_storage = self.get_cookie_storage(storage)
|
||||
|
||||
# Set initial cookie data.
|
||||
example_messages = [str(i) for i in range(5)]
|
||||
set_cookie_data(cookie_storage, example_messages)
|
||||
|
||||
# Overwrite the _get method of the fallback storage to prove it is not
|
||||
# used (it would cause a TypeError: 'NoneType' object is not callable).
|
||||
self.get_session_storage(storage)._get = None
|
||||
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), example_messages)
|
||||
|
||||
def test_get_empty(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
|
||||
# Overwrite the _get method of the fallback storage to prove it is not
|
||||
# used (it would cause a TypeError: 'NoneType' object is not callable).
|
||||
self.get_session_storage(storage)._get = None
|
||||
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), [])
|
||||
|
||||
def test_get_fallback(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
cookie_storage = self.get_cookie_storage(storage)
|
||||
session_storage = self.get_session_storage(storage)
|
||||
|
||||
# Set initial cookie and session data.
|
||||
example_messages = [str(i) for i in range(5)]
|
||||
set_cookie_data(cookie_storage, example_messages[:4] +
|
||||
[CookieStorage.not_finished])
|
||||
set_session_data(session_storage, example_messages[4:])
|
||||
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), example_messages)
|
||||
|
||||
def test_get_fallback_only(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
cookie_storage = self.get_cookie_storage(storage)
|
||||
session_storage = self.get_session_storage(storage)
|
||||
|
||||
# Set initial cookie and session data.
|
||||
example_messages = [str(i) for i in range(5)]
|
||||
set_cookie_data(cookie_storage, [CookieStorage.not_finished],
|
||||
encode_empty=True)
|
||||
set_session_data(session_storage, example_messages)
|
||||
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), example_messages)
|
||||
|
||||
def test_flush_used_backends(self):
|
||||
request = self.get_request()
|
||||
storage = self.storage_class(request)
|
||||
cookie_storage = self.get_cookie_storage(storage)
|
||||
session_storage = self.get_session_storage(storage)
|
||||
|
||||
# Set initial cookie and session data.
|
||||
set_cookie_data(cookie_storage, ['cookie', CookieStorage.not_finished])
|
||||
set_session_data(session_storage, ['session'])
|
||||
|
||||
# When updating, previously used but no longer needed backends are
|
||||
# flushed.
|
||||
response = self.get_response()
|
||||
list(storage)
|
||||
storage.update(response)
|
||||
session_storing = self.stored_session_messages_count(storage, response)
|
||||
self.assertEqual(session_storing, 0)
|
||||
|
||||
def test_no_fallback(self):
|
||||
"""
|
||||
Confirms that:
|
||||
|
||||
(1) A short number of messages whose data size doesn't exceed what is
|
||||
allowed in a cookie will all be stored in the CookieBackend.
|
||||
|
||||
(2) If the CookieBackend can store all messages, the SessionBackend
|
||||
won't be written to at all.
|
||||
"""
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
|
||||
# Overwrite the _store method of the fallback storage to prove it isn't
|
||||
# used (it would cause a TypeError: 'NoneType' object is not callable).
|
||||
self.get_session_storage(storage)._store = None
|
||||
|
||||
for i in range(5):
|
||||
storage.add(constants.INFO, str(i) * 100)
|
||||
storage.update(response)
|
||||
|
||||
cookie_storing = self.stored_cookie_messages_count(storage, response)
|
||||
self.assertEqual(cookie_storing, 5)
|
||||
session_storing = self.stored_session_messages_count(storage, response)
|
||||
self.assertEqual(session_storing, 0)
|
||||
|
||||
def test_session_fallback(self):
|
||||
"""
|
||||
Confirms that, if the data exceeds what is allowed in a cookie,
|
||||
messages which did not fit are stored in the SessionBackend.
|
||||
"""
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
|
||||
# see comment in CookieText.test_cookie_max_length
|
||||
msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
|
||||
for i in range(5):
|
||||
storage.add(constants.INFO, str(i) * msg_size)
|
||||
storage.update(response)
|
||||
|
||||
cookie_storing = self.stored_cookie_messages_count(storage, response)
|
||||
self.assertEqual(cookie_storing, 4)
|
||||
session_storing = self.stored_session_messages_count(storage, response)
|
||||
self.assertEqual(session_storing, 1)
|
||||
|
||||
def test_session_fallback_only(self):
|
||||
"""
|
||||
Confirms that large messages, none of which fit in a cookie, are stored
|
||||
in the SessionBackend (and nothing is stored in the CookieBackend).
|
||||
"""
|
||||
storage = self.get_storage()
|
||||
response = self.get_response()
|
||||
|
||||
storage.add(constants.INFO, 'x' * 5000)
|
||||
storage.update(response)
|
||||
|
||||
cookie_storing = self.stored_cookie_messages_count(storage, response)
|
||||
self.assertEqual(cookie_storing, 0)
|
||||
session_storing = self.stored_session_messages_count(storage, response)
|
||||
self.assertEqual(session_storing, 1)
|
||||
@@ -1,19 +0,0 @@
|
||||
import unittest
|
||||
|
||||
from django import http
|
||||
from django.contrib.messages.middleware import MessageMiddleware
|
||||
|
||||
|
||||
class MiddlewareTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.middleware = MessageMiddleware()
|
||||
|
||||
def test_response_without_messages(self):
|
||||
"""
|
||||
Makes sure that the response middleware is tolerant of messages not
|
||||
existing on request.
|
||||
"""
|
||||
request = http.HttpRequest()
|
||||
response = http.HttpResponse()
|
||||
self.middleware.process_response(request, response)
|
||||
@@ -1,15 +0,0 @@
|
||||
from django.contrib.messages.tests.urls import ContactFormViewWithMsg
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
|
||||
@override_settings(ROOT_URLCONF='django.contrib.messages.tests.urls')
|
||||
class SuccessMessageMixinTests(TestCase):
|
||||
|
||||
def test_set_messages_success(self):
|
||||
author = {'name': 'John Doe',
|
||||
'slug': 'success-msg'}
|
||||
add_url = reverse('add_success_msg')
|
||||
req = self.client.post(add_url, author)
|
||||
self.assertIn(ContactFormViewWithMsg.success_message % author,
|
||||
req.cookies['messages'].value)
|
||||
@@ -1,53 +0,0 @@
|
||||
from django.contrib.messages import constants
|
||||
from django.contrib.messages.storage.base import Message
|
||||
from django.contrib.messages.storage.session import SessionStorage
|
||||
from django.contrib.messages.tests.base import BaseTests
|
||||
from django.test import TestCase
|
||||
from django.utils.safestring import SafeData, mark_safe
|
||||
|
||||
|
||||
def set_session_data(storage, messages):
|
||||
"""
|
||||
Sets the messages into the backend request's session and remove the
|
||||
backend's loaded data cache.
|
||||
"""
|
||||
storage.request.session[storage.session_key] = storage.serialize_messages(messages)
|
||||
if hasattr(storage, '_loaded_data'):
|
||||
del storage._loaded_data
|
||||
|
||||
|
||||
def stored_session_messages_count(storage):
|
||||
data = storage.deserialize_messages(storage.request.session.get(storage.session_key, []))
|
||||
return len(data)
|
||||
|
||||
|
||||
class SessionTest(BaseTests, TestCase):
|
||||
storage_class = SessionStorage
|
||||
|
||||
def get_request(self):
|
||||
self.session = {}
|
||||
request = super(SessionTest, self).get_request()
|
||||
request.session = self.session
|
||||
return request
|
||||
|
||||
def stored_messages_count(self, storage, response):
|
||||
return stored_session_messages_count(storage)
|
||||
|
||||
def test_get(self):
|
||||
storage = self.storage_class(self.get_request())
|
||||
# Set initial data.
|
||||
example_messages = ['test', 'me']
|
||||
set_session_data(storage, example_messages)
|
||||
# Test that the message actually contains what we expect.
|
||||
self.assertEqual(list(storage), example_messages)
|
||||
|
||||
def test_safedata(self):
|
||||
"""
|
||||
Tests that a message containing SafeData is keeping its safe status when
|
||||
retrieved from the message storage.
|
||||
"""
|
||||
storage = self.get_storage()
|
||||
|
||||
message = Message(constants.DEBUG, mark_safe("<b>Hello Django!</b>"))
|
||||
set_session_data(storage, [message])
|
||||
self.assertIsInstance(list(storage)[0].message, SafeData)
|
||||
@@ -1,80 +0,0 @@
|
||||
from django import forms
|
||||
from django.conf.urls import url
|
||||
from django.contrib import messages
|
||||
from django.contrib.messages.views import SuccessMessageMixin
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import HttpResponse, HttpResponseRedirect
|
||||
from django.template import engines
|
||||
from django.template.response import TemplateResponse
|
||||
from django.views.decorators.cache import never_cache
|
||||
from django.views.generic.edit import FormView
|
||||
|
||||
|
||||
TEMPLATE = """{% if messages %}
|
||||
<ul class="messages">
|
||||
{% for message in messages %}
|
||||
<li{% if message.tags %} class="{{ message.tags }}"{% endif %}>
|
||||
{{ message }}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
{% endif %}
|
||||
"""
|
||||
|
||||
|
||||
@never_cache
|
||||
def add(request, message_type):
|
||||
# don't default to False here, because we want to test that it defaults
|
||||
# to False if unspecified
|
||||
fail_silently = request.POST.get('fail_silently', None)
|
||||
for msg in request.POST.getlist('messages'):
|
||||
if fail_silently is not None:
|
||||
getattr(messages, message_type)(request, msg,
|
||||
fail_silently=fail_silently)
|
||||
else:
|
||||
getattr(messages, message_type)(request, msg)
|
||||
|
||||
show_url = reverse('show_message')
|
||||
return HttpResponseRedirect(show_url)
|
||||
|
||||
|
||||
@never_cache
|
||||
def add_template_response(request, message_type):
|
||||
for msg in request.POST.getlist('messages'):
|
||||
getattr(messages, message_type)(request, msg)
|
||||
|
||||
show_url = reverse('show_template_response')
|
||||
return HttpResponseRedirect(show_url)
|
||||
|
||||
|
||||
@never_cache
|
||||
def show(request):
|
||||
template = engines['django'].from_string(TEMPLATE)
|
||||
return HttpResponse(template.render(request=request))
|
||||
|
||||
|
||||
@never_cache
|
||||
def show_template_response(request):
|
||||
template = engines['django'].from_string(TEMPLATE)
|
||||
return TemplateResponse(request, template)
|
||||
|
||||
|
||||
class ContactForm(forms.Form):
|
||||
name = forms.CharField(required=True)
|
||||
slug = forms.SlugField(required=True)
|
||||
|
||||
|
||||
class ContactFormViewWithMsg(SuccessMessageMixin, FormView):
|
||||
form_class = ContactForm
|
||||
success_url = show
|
||||
success_message = "%(name)s was created successfully"
|
||||
|
||||
|
||||
urlpatterns = [
|
||||
url('^add/(debug|info|success|warning|error)/$', add, name='add_message'),
|
||||
url('^add/msg/$', ContactFormViewWithMsg.as_view(), name='add_success_msg'),
|
||||
url('^show/$', show, name='show_message'),
|
||||
url('^template_response/add/(debug|info|success|warning|error)/$',
|
||||
add_template_response, name='add_template_response'),
|
||||
url('^template_response/show/$', show_template_response, name='show_template_response'),
|
||||
]
|
||||
Reference in New Issue
Block a user