mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	Also removed try/except/fail antipattern that hides exceptions.
Backport of c9ae09addf from master
		
	
		
			
				
	
	
		
			872 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			872 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import base64
 | |
| import os
 | |
| import shutil
 | |
| import string
 | |
| import sys
 | |
| import tempfile
 | |
| import unittest
 | |
| from datetime import timedelta
 | |
| 
 | |
| from django.conf import settings
 | |
| from django.contrib.sessions.backends.base import UpdateError
 | |
| from django.contrib.sessions.backends.cache import SessionStore as CacheSession
 | |
| from django.contrib.sessions.backends.cached_db import \
 | |
|     SessionStore as CacheDBSession
 | |
| from django.contrib.sessions.backends.db import SessionStore as DatabaseSession
 | |
| from django.contrib.sessions.backends.file import SessionStore as FileSession
 | |
| from django.contrib.sessions.backends.signed_cookies import \
 | |
|     SessionStore as CookieSession
 | |
| from django.contrib.sessions.exceptions import InvalidSessionKey
 | |
| from django.contrib.sessions.middleware import SessionMiddleware
 | |
| from django.contrib.sessions.models import Session
 | |
| from django.contrib.sessions.serializers import (
 | |
|     JSONSerializer, PickleSerializer,
 | |
| )
 | |
| from django.core import management
 | |
| from django.core.cache import caches
 | |
| from django.core.cache.backends.base import InvalidCacheBackendError
 | |
| from django.core.exceptions import ImproperlyConfigured
 | |
| from django.http import HttpResponse
 | |
| from django.test import (
 | |
|     RequestFactory, TestCase, ignore_warnings, override_settings,
 | |
| )
 | |
| from django.test.utils import patch_logger
 | |
| from django.utils import six, timezone
 | |
| from django.utils.encoding import force_text
 | |
| from django.utils.six.moves import http_cookies
 | |
| 
 | |
| from .models import SessionStore as CustomDatabaseSession
 | |
| 
 | |
| 
 | |
| class SessionTestsMixin(object):
 | |
|     # This does not inherit from TestCase to avoid any tests being run with this
 | |
|     # class, which wouldn't work, and to allow different TestCase subclasses to
 | |
|     # be used.
 | |
| 
 | |
|     backend = None  # subclasses must specify
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.session = self.backend()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         # NB: be careful to delete any sessions created; stale sessions fill up
 | |
|         # the /tmp (with some backends) and eventually overwhelm it after lots
 | |
|         # of runs (think buildbots)
 | |
|         self.session.delete()
 | |
| 
 | |
|     def test_new_session(self):
 | |
|         self.assertFalse(self.session.modified)
 | |
|         self.assertFalse(self.session.accessed)
 | |
| 
 | |
|     def test_get_empty(self):
 | |
|         self.assertIsNone(self.session.get('cat'))
 | |
| 
 | |
|     def test_store(self):
 | |
|         self.session['cat'] = "dog"
 | |
|         self.assertTrue(self.session.modified)
 | |
|         self.assertEqual(self.session.pop('cat'), 'dog')
 | |
| 
 | |
|     def test_pop(self):
 | |
|         self.session['some key'] = 'exists'
 | |
|         # Need to reset these to pretend we haven't accessed it:
 | |
|         self.accessed = False
 | |
|         self.modified = False
 | |
| 
 | |
|         self.assertEqual(self.session.pop('some key'), 'exists')
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertTrue(self.session.modified)
 | |
|         self.assertIsNone(self.session.get('some key'))
 | |
| 
 | |
|     def test_pop_default(self):
 | |
|         self.assertEqual(self.session.pop('some key', 'does not exist'),
 | |
|                          'does not exist')
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertFalse(self.session.modified)
 | |
| 
 | |
|     def test_pop_default_named_argument(self):
 | |
|         self.assertEqual(self.session.pop('some key', default='does not exist'), 'does not exist')
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertFalse(self.session.modified)
 | |
| 
 | |
|     def test_pop_no_default_keyerror_raised(self):
 | |
|         with self.assertRaises(KeyError):
 | |
|             self.session.pop('some key')
 | |
| 
 | |
|     def test_setdefault(self):
 | |
|         self.assertEqual(self.session.setdefault('foo', 'bar'), 'bar')
 | |
|         self.assertEqual(self.session.setdefault('foo', 'baz'), 'bar')
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertTrue(self.session.modified)
 | |
| 
 | |
|     def test_update(self):
 | |
|         self.session.update({'update key': 1})
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertTrue(self.session.modified)
 | |
|         self.assertEqual(self.session.get('update key', None), 1)
 | |
| 
 | |
|     def test_has_key(self):
 | |
|         self.session['some key'] = 1
 | |
|         self.session.modified = False
 | |
|         self.session.accessed = False
 | |
|         self.assertIn('some key', self.session)
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertFalse(self.session.modified)
 | |
| 
 | |
|     def test_values(self):
 | |
|         self.assertEqual(list(self.session.values()), [])
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.session['some key'] = 1
 | |
|         self.assertEqual(list(self.session.values()), [1])
 | |
| 
 | |
|     def test_iterkeys(self):
 | |
|         self.session['x'] = 1
 | |
|         self.session.modified = False
 | |
|         self.session.accessed = False
 | |
|         i = six.iterkeys(self.session)
 | |
|         self.assertTrue(hasattr(i, '__iter__'))
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertFalse(self.session.modified)
 | |
|         self.assertEqual(list(i), ['x'])
 | |
| 
 | |
|     def test_itervalues(self):
 | |
|         self.session['x'] = 1
 | |
|         self.session.modified = False
 | |
|         self.session.accessed = False
 | |
|         i = six.itervalues(self.session)
 | |
|         self.assertTrue(hasattr(i, '__iter__'))
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertFalse(self.session.modified)
 | |
|         self.assertEqual(list(i), [1])
 | |
| 
 | |
|     def test_iteritems(self):
 | |
|         self.session['x'] = 1
 | |
|         self.session.modified = False
 | |
|         self.session.accessed = False
 | |
|         i = six.iteritems(self.session)
 | |
|         self.assertTrue(hasattr(i, '__iter__'))
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertFalse(self.session.modified)
 | |
|         self.assertEqual(list(i), [('x', 1)])
 | |
| 
 | |
|     def test_clear(self):
 | |
|         self.session['x'] = 1
 | |
|         self.session.modified = False
 | |
|         self.session.accessed = False
 | |
|         self.assertEqual(list(self.session.items()), [('x', 1)])
 | |
|         self.session.clear()
 | |
|         self.assertEqual(list(self.session.items()), [])
 | |
|         self.assertTrue(self.session.accessed)
 | |
|         self.assertTrue(self.session.modified)
 | |
| 
 | |
|     def test_save(self):
 | |
|         self.session.save()
 | |
|         self.assertTrue(self.session.exists(self.session.session_key))
 | |
| 
 | |
|     def test_delete(self):
 | |
|         self.session.save()
 | |
|         self.session.delete(self.session.session_key)
 | |
|         self.assertFalse(self.session.exists(self.session.session_key))
 | |
| 
 | |
|     def test_flush(self):
 | |
|         self.session['foo'] = 'bar'
 | |
|         self.session.save()
 | |
|         prev_key = self.session.session_key
 | |
|         self.session.flush()
 | |
|         self.assertFalse(self.session.exists(prev_key))
 | |
|         self.assertNotEqual(self.session.session_key, prev_key)
 | |
|         self.assertIsNone(self.session.session_key)
 | |
|         self.assertTrue(self.session.modified)
 | |
|         self.assertTrue(self.session.accessed)
 | |
| 
 | |
|     def test_cycle(self):
 | |
|         self.session['a'], self.session['b'] = 'c', 'd'
 | |
|         self.session.save()
 | |
|         prev_key = self.session.session_key
 | |
|         prev_data = list(self.session.items())
 | |
|         self.session.cycle_key()
 | |
|         self.assertFalse(self.session.exists(prev_key))
 | |
|         self.assertNotEqual(self.session.session_key, prev_key)
 | |
|         self.assertEqual(list(self.session.items()), prev_data)
 | |
| 
 | |
|     def test_save_doesnt_clear_data(self):
 | |
|         self.session['a'] = 'b'
 | |
|         self.session.save()
 | |
|         self.assertEqual(self.session['a'], 'b')
 | |
| 
 | |
|     def test_invalid_key(self):
 | |
|         # Submitting an invalid session key (either by guessing, or if the db has
 | |
|         # removed the key) results in a new key being generated.
 | |
|         try:
 | |
|             session = self.backend('1')
 | |
|             session.save()
 | |
|             self.assertNotEqual(session.session_key, '1')
 | |
|             self.assertIsNone(session.get('cat'))
 | |
|             session.delete()
 | |
|         finally:
 | |
|             # Some backends leave a stale cache entry for the invalid
 | |
|             # session key; make sure that entry is manually deleted
 | |
|             session.delete('1')
 | |
| 
 | |
|     def test_session_key_empty_string_invalid(self):
 | |
|         """Falsey values (Such as an empty string) are rejected."""
 | |
|         self.session._session_key = ''
 | |
|         self.assertIsNone(self.session.session_key)
 | |
| 
 | |
|     def test_session_key_too_short_invalid(self):
 | |
|         """Strings shorter than 8 characters are rejected."""
 | |
|         self.session._session_key = '1234567'
 | |
|         self.assertIsNone(self.session.session_key)
 | |
| 
 | |
|     def test_session_key_valid_string_saved(self):
 | |
|         """Strings of length 8 and up are accepted and stored."""
 | |
|         self.session._session_key = '12345678'
 | |
|         self.assertEqual(self.session.session_key, '12345678')
 | |
| 
 | |
|     def test_session_key_is_read_only(self):
 | |
|         def set_session_key(session):
 | |
|             session.session_key = session._get_new_session_key()
 | |
|         with self.assertRaises(AttributeError):
 | |
|             set_session_key(self.session)
 | |
| 
 | |
|     # Custom session expiry
 | |
|     def test_default_expiry(self):
 | |
|         # A normal session has a max age equal to settings
 | |
|         self.assertEqual(self.session.get_expiry_age(), settings.SESSION_COOKIE_AGE)
 | |
| 
 | |
|         # So does a custom session with an idle expiration time of 0 (but it'll
 | |
|         # expire at browser close)
 | |
|         self.session.set_expiry(0)
 | |
|         self.assertEqual(self.session.get_expiry_age(), settings.SESSION_COOKIE_AGE)
 | |
| 
 | |
|     def test_custom_expiry_seconds(self):
 | |
|         modification = timezone.now()
 | |
| 
 | |
|         self.session.set_expiry(10)
 | |
| 
 | |
|         date = self.session.get_expiry_date(modification=modification)
 | |
|         self.assertEqual(date, modification + timedelta(seconds=10))
 | |
| 
 | |
|         age = self.session.get_expiry_age(modification=modification)
 | |
|         self.assertEqual(age, 10)
 | |
| 
 | |
|     def test_custom_expiry_timedelta(self):
 | |
|         modification = timezone.now()
 | |
| 
 | |
|         # Mock timezone.now, because set_expiry calls it on this code path.
 | |
|         original_now = timezone.now
 | |
|         try:
 | |
|             timezone.now = lambda: modification
 | |
|             self.session.set_expiry(timedelta(seconds=10))
 | |
|         finally:
 | |
|             timezone.now = original_now
 | |
| 
 | |
|         date = self.session.get_expiry_date(modification=modification)
 | |
|         self.assertEqual(date, modification + timedelta(seconds=10))
 | |
| 
 | |
|         age = self.session.get_expiry_age(modification=modification)
 | |
|         self.assertEqual(age, 10)
 | |
| 
 | |
|     def test_custom_expiry_datetime(self):
 | |
|         modification = timezone.now()
 | |
| 
 | |
|         self.session.set_expiry(modification + timedelta(seconds=10))
 | |
| 
 | |
|         date = self.session.get_expiry_date(modification=modification)
 | |
|         self.assertEqual(date, modification + timedelta(seconds=10))
 | |
| 
 | |
|         age = self.session.get_expiry_age(modification=modification)
 | |
|         self.assertEqual(age, 10)
 | |
| 
 | |
|     def test_custom_expiry_reset(self):
 | |
|         self.session.set_expiry(None)
 | |
|         self.session.set_expiry(10)
 | |
|         self.session.set_expiry(None)
 | |
|         self.assertEqual(self.session.get_expiry_age(), settings.SESSION_COOKIE_AGE)
 | |
| 
 | |
|     def test_get_expire_at_browser_close(self):
 | |
|         # Tests get_expire_at_browser_close with different settings and different
 | |
|         # set_expiry calls
 | |
|         with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=False):
 | |
|             self.session.set_expiry(10)
 | |
|             self.assertFalse(self.session.get_expire_at_browser_close())
 | |
| 
 | |
|             self.session.set_expiry(0)
 | |
|             self.assertTrue(self.session.get_expire_at_browser_close())
 | |
| 
 | |
|             self.session.set_expiry(None)
 | |
|             self.assertFalse(self.session.get_expire_at_browser_close())
 | |
| 
 | |
|         with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=True):
 | |
|             self.session.set_expiry(10)
 | |
|             self.assertFalse(self.session.get_expire_at_browser_close())
 | |
| 
 | |
|             self.session.set_expiry(0)
 | |
|             self.assertTrue(self.session.get_expire_at_browser_close())
 | |
| 
 | |
|             self.session.set_expiry(None)
 | |
|             self.assertTrue(self.session.get_expire_at_browser_close())
 | |
| 
 | |
|     def test_decode(self):
 | |
|         # Ensure we can decode what we encode
 | |
|         data = {'a test key': 'a test value'}
 | |
|         encoded = self.session.encode(data)
 | |
|         self.assertEqual(self.session.decode(encoded), data)
 | |
| 
 | |
|     def test_decode_failure_logged_to_security(self):
 | |
|         bad_encode = base64.b64encode(b'flaskdj:alkdjf')
 | |
|         with patch_logger('django.security.SuspiciousSession', 'warning') as calls:
 | |
|             self.assertEqual({}, self.session.decode(bad_encode))
 | |
|             # check that the failed decode is logged
 | |
|             self.assertEqual(len(calls), 1)
 | |
|             self.assertIn('corrupted', calls[0])
 | |
| 
 | |
|     def test_actual_expiry(self):
 | |
|         # this doesn't work with JSONSerializer (serializing timedelta)
 | |
|         with override_settings(SESSION_SERIALIZER='django.contrib.sessions.serializers.PickleSerializer'):
 | |
|             self.session = self.backend()  # reinitialize after overriding settings
 | |
| 
 | |
|             # Regression test for #19200
 | |
|             old_session_key = None
 | |
|             new_session_key = None
 | |
|             try:
 | |
|                 self.session['foo'] = 'bar'
 | |
|                 self.session.set_expiry(-timedelta(seconds=10))
 | |
|                 self.session.save()
 | |
|                 old_session_key = self.session.session_key
 | |
|                 # With an expiry date in the past, the session expires instantly.
 | |
|                 new_session = self.backend(self.session.session_key)
 | |
|                 new_session_key = new_session.session_key
 | |
|                 self.assertNotIn('foo', new_session)
 | |
|             finally:
 | |
|                 self.session.delete(old_session_key)
 | |
|                 self.session.delete(new_session_key)
 | |
| 
 | |
|     def test_session_load_does_not_create_record(self):
 | |
|         """
 | |
|         Loading an unknown session key does not create a session record.
 | |
| 
 | |
|         Creating session records on load is a DOS vulnerability.
 | |
|         """
 | |
|         session = self.backend('someunknownkey')
 | |
|         session.load()
 | |
| 
 | |
|         self.assertFalse(session.exists(session.session_key))
 | |
|         # provided unknown key was cycled, not reused
 | |
|         self.assertNotEqual(session.session_key, 'someunknownkey')
 | |
| 
 | |
|     def test_session_save_does_not_resurrect_session_logged_out_in_other_context(self):
 | |
|         """
 | |
|         Sessions shouldn't be resurrected by a concurrent request.
 | |
|         """
 | |
|         # Create new session.
 | |
|         s1 = self.backend()
 | |
|         s1['test_data'] = 'value1'
 | |
|         s1.save(must_create=True)
 | |
| 
 | |
|         # Logout in another context.
 | |
|         s2 = self.backend(s1.session_key)
 | |
|         s2.delete()
 | |
| 
 | |
|         # Modify session in first context.
 | |
|         s1['test_data'] = 'value2'
 | |
|         with self.assertRaises(UpdateError):
 | |
|             # This should throw an exception as the session is deleted, not
 | |
|             # resurrect the session.
 | |
|             s1.save()
 | |
| 
 | |
|         self.assertEqual(s1.load(), {})
 | |
| 
 | |
| 
 | |
| class DatabaseSessionTests(SessionTestsMixin, TestCase):
 | |
| 
 | |
|     backend = DatabaseSession
 | |
|     session_engine = 'django.contrib.sessions.backends.db'
 | |
| 
 | |
|     @property
 | |
|     def model(self):
 | |
|         return self.backend.get_model_class()
 | |
| 
 | |
|     def test_session_str(self):
 | |
|         "Session repr should be the session key."
 | |
|         self.session['x'] = 1
 | |
|         self.session.save()
 | |
| 
 | |
|         session_key = self.session.session_key
 | |
|         s = self.model.objects.get(session_key=session_key)
 | |
| 
 | |
|         self.assertEqual(force_text(s), session_key)
 | |
| 
 | |
|     def test_session_get_decoded(self):
 | |
|         """
 | |
|         Test we can use Session.get_decoded to retrieve data stored
 | |
|         in normal way
 | |
|         """
 | |
|         self.session['x'] = 1
 | |
|         self.session.save()
 | |
| 
 | |
|         s = self.model.objects.get(session_key=self.session.session_key)
 | |
| 
 | |
|         self.assertEqual(s.get_decoded(), {'x': 1})
 | |
| 
 | |
|     def test_sessionmanager_save(self):
 | |
|         """
 | |
|         Test SessionManager.save method
 | |
|         """
 | |
|         # Create a session
 | |
|         self.session['y'] = 1
 | |
|         self.session.save()
 | |
| 
 | |
|         s = self.model.objects.get(session_key=self.session.session_key)
 | |
|         # Change it
 | |
|         self.model.objects.save(s.session_key, {'y': 2}, s.expire_date)
 | |
|         # Clear cache, so that it will be retrieved from DB
 | |
|         del self.session._session_cache
 | |
|         self.assertEqual(self.session['y'], 2)
 | |
| 
 | |
|     def test_clearsessions_command(self):
 | |
|         """
 | |
|         Test clearsessions command for clearing expired sessions.
 | |
|         """
 | |
|         self.assertEqual(0, self.model.objects.count())
 | |
| 
 | |
|         # One object in the future
 | |
|         self.session['foo'] = 'bar'
 | |
|         self.session.set_expiry(3600)
 | |
|         self.session.save()
 | |
| 
 | |
|         # One object in the past
 | |
|         other_session = self.backend()
 | |
|         other_session['foo'] = 'bar'
 | |
|         other_session.set_expiry(-3600)
 | |
|         other_session.save()
 | |
| 
 | |
|         # Two sessions are in the database before clearsessions...
 | |
|         self.assertEqual(2, self.model.objects.count())
 | |
|         with override_settings(SESSION_ENGINE=self.session_engine):
 | |
|             management.call_command('clearsessions')
 | |
|         # ... and one is deleted.
 | |
|         self.assertEqual(1, self.model.objects.count())
 | |
| 
 | |
| 
 | |
| @override_settings(USE_TZ=True)
 | |
| class DatabaseSessionWithTimeZoneTests(DatabaseSessionTests):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class CustomDatabaseSessionTests(DatabaseSessionTests):
 | |
|     backend = CustomDatabaseSession
 | |
|     session_engine = 'sessions_tests.models'
 | |
| 
 | |
|     def test_extra_session_field(self):
 | |
|         # Set the account ID to be picked up by a custom session storage
 | |
|         # and saved to a custom session model database column.
 | |
|         self.session['_auth_user_id'] = 42
 | |
|         self.session.save()
 | |
| 
 | |
|         # Make sure that the customized create_model_instance() was called.
 | |
|         s = self.model.objects.get(session_key=self.session.session_key)
 | |
|         self.assertEqual(s.account_id, 42)
 | |
| 
 | |
|         # Make the session "anonymous".
 | |
|         self.session.pop('_auth_user_id')
 | |
|         self.session.save()
 | |
| 
 | |
|         # Make sure that save() on an existing session did the right job.
 | |
|         s = self.model.objects.get(session_key=self.session.session_key)
 | |
|         self.assertIsNone(s.account_id)
 | |
| 
 | |
| 
 | |
| class CacheDBSessionTests(SessionTestsMixin, TestCase):
 | |
| 
 | |
|     backend = CacheDBSession
 | |
| 
 | |
|     def test_exists_searches_cache_first(self):
 | |
|         self.session.save()
 | |
|         with self.assertNumQueries(0):
 | |
|             self.assertTrue(self.session.exists(self.session.session_key))
 | |
| 
 | |
|     # Some backends might issue a warning
 | |
|     @ignore_warnings(module="django.core.cache.backends.base")
 | |
|     def test_load_overlong_key(self):
 | |
|         self.session._session_key = (string.ascii_letters + string.digits) * 20
 | |
|         self.assertEqual(self.session.load(), {})
 | |
| 
 | |
|     @override_settings(SESSION_CACHE_ALIAS='sessions')
 | |
|     def test_non_default_cache(self):
 | |
|         # 21000 - CacheDB backend should respect SESSION_CACHE_ALIAS.
 | |
|         with self.assertRaises(InvalidCacheBackendError):
 | |
|             self.backend()
 | |
| 
 | |
| 
 | |
| @override_settings(USE_TZ=True)
 | |
| class CacheDBSessionWithTimeZoneTests(CacheDBSessionTests):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| # Don't need DB flushing for these tests, so can use unittest.TestCase as base class
 | |
| class FileSessionTests(SessionTestsMixin, unittest.TestCase):
 | |
| 
 | |
|     backend = FileSession
 | |
| 
 | |
|     def setUp(self):
 | |
|         # Do file session tests in an isolated directory, and kill it after we're done.
 | |
|         self.original_session_file_path = settings.SESSION_FILE_PATH
 | |
|         self.temp_session_store = settings.SESSION_FILE_PATH = tempfile.mkdtemp()
 | |
|         # Reset the file session backend's internal caches
 | |
|         if hasattr(self.backend, '_storage_path'):
 | |
|             del self.backend._storage_path
 | |
|         super(FileSessionTests, self).setUp()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(FileSessionTests, self).tearDown()
 | |
|         settings.SESSION_FILE_PATH = self.original_session_file_path
 | |
|         shutil.rmtree(self.temp_session_store)
 | |
| 
 | |
|     @override_settings(
 | |
|         SESSION_FILE_PATH="/if/this/directory/exists/you/have/a/weird/computer")
 | |
|     def test_configuration_check(self):
 | |
|         del self.backend._storage_path
 | |
|         # Make sure the file backend checks for a good storage dir
 | |
|         with self.assertRaises(ImproperlyConfigured):
 | |
|             self.backend()
 | |
| 
 | |
|     def test_invalid_key_backslash(self):
 | |
|         # Ensure we don't allow directory-traversal.
 | |
|         # This is tested directly on _key_to_file, as load() will swallow
 | |
|         # a SuspiciousOperation in the same way as an IOError - by creating
 | |
|         # a new session, making it unclear whether the slashes were detected.
 | |
|         with self.assertRaises(InvalidSessionKey):
 | |
|             self.backend()._key_to_file("a\\b\\c")
 | |
| 
 | |
|     def test_invalid_key_forwardslash(self):
 | |
|         # Ensure we don't allow directory-traversal
 | |
|         with self.assertRaises(InvalidSessionKey):
 | |
|             self.backend()._key_to_file("a/b/c")
 | |
| 
 | |
|     @override_settings(
 | |
|         SESSION_ENGINE="django.contrib.sessions.backends.file",
 | |
|         SESSION_COOKIE_AGE=0,
 | |
|     )
 | |
|     def test_clearsessions_command(self):
 | |
|         """
 | |
|         Test clearsessions command for clearing expired sessions.
 | |
|         """
 | |
|         storage_path = self.backend._get_storage_path()
 | |
|         file_prefix = settings.SESSION_COOKIE_NAME
 | |
| 
 | |
|         def count_sessions():
 | |
|             return len([
 | |
|                 session_file for session_file in os.listdir(storage_path)
 | |
|                 if session_file.startswith(file_prefix)
 | |
|             ])
 | |
| 
 | |
|         self.assertEqual(0, count_sessions())
 | |
| 
 | |
|         # One object in the future
 | |
|         self.session['foo'] = 'bar'
 | |
|         self.session.set_expiry(3600)
 | |
|         self.session.save()
 | |
| 
 | |
|         # One object in the past
 | |
|         other_session = self.backend()
 | |
|         other_session['foo'] = 'bar'
 | |
|         other_session.set_expiry(-3600)
 | |
|         other_session.save()
 | |
| 
 | |
|         # One object in the present without an expiry (should be deleted since
 | |
|         # its modification time + SESSION_COOKIE_AGE will be in the past when
 | |
|         # clearsessions runs).
 | |
|         other_session2 = self.backend()
 | |
|         other_session2['foo'] = 'bar'
 | |
|         other_session2.save()
 | |
| 
 | |
|         # Three sessions are in the filesystem before clearsessions...
 | |
|         self.assertEqual(3, count_sessions())
 | |
|         management.call_command('clearsessions')
 | |
|         # ... and two are deleted.
 | |
|         self.assertEqual(1, count_sessions())
 | |
| 
 | |
| 
 | |
| class CacheSessionTests(SessionTestsMixin, unittest.TestCase):
 | |
| 
 | |
|     backend = CacheSession
 | |
| 
 | |
|     # Some backends might issue a warning
 | |
|     @ignore_warnings(module="django.core.cache.backends.base")
 | |
|     def test_load_overlong_key(self):
 | |
|         self.session._session_key = (string.ascii_letters + string.digits) * 20
 | |
|         self.assertEqual(self.session.load(), {})
 | |
| 
 | |
|     def test_default_cache(self):
 | |
|         self.session.save()
 | |
|         self.assertIsNotNone(caches['default'].get(self.session.cache_key))
 | |
| 
 | |
|     @override_settings(CACHES={
 | |
|         'default': {
 | |
|             'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
 | |
|         },
 | |
|         'sessions': {
 | |
|             'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
 | |
|             'LOCATION': 'session',
 | |
|         },
 | |
|     }, SESSION_CACHE_ALIAS='sessions')
 | |
|     def test_non_default_cache(self):
 | |
|         # Re-initialize the session backend to make use of overridden settings.
 | |
|         self.session = self.backend()
 | |
| 
 | |
|         self.session.save()
 | |
|         self.assertIsNone(caches['default'].get(self.session.cache_key))
 | |
|         self.assertIsNotNone(caches['sessions'].get(self.session.cache_key))
 | |
| 
 | |
|     def test_create_and_save(self):
 | |
|         self.session = self.backend()
 | |
|         self.session.create()
 | |
|         self.session.save()
 | |
|         self.assertIsNotNone(caches['default'].get(self.session.cache_key))
 | |
| 
 | |
| 
 | |
| class SessionMiddlewareTests(TestCase):
 | |
| 
 | |
|     @override_settings(SESSION_COOKIE_SECURE=True)
 | |
|     def test_secure_session_cookie(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Simulate a request the modifies the session
 | |
|         middleware.process_request(request)
 | |
|         request.session['hello'] = 'world'
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
|         self.assertTrue(
 | |
|             response.cookies[settings.SESSION_COOKIE_NAME]['secure'])
 | |
| 
 | |
|     @override_settings(SESSION_COOKIE_HTTPONLY=True)
 | |
|     def test_httponly_session_cookie(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Simulate a request the modifies the session
 | |
|         middleware.process_request(request)
 | |
|         request.session['hello'] = 'world'
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
|         self.assertTrue(
 | |
|             response.cookies[settings.SESSION_COOKIE_NAME]['httponly'])
 | |
|         self.assertIn(
 | |
|             http_cookies.Morsel._reserved['httponly'],
 | |
|             str(response.cookies[settings.SESSION_COOKIE_NAME])
 | |
|         )
 | |
| 
 | |
|     @override_settings(SESSION_COOKIE_HTTPONLY=False)
 | |
|     def test_no_httponly_session_cookie(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Simulate a request the modifies the session
 | |
|         middleware.process_request(request)
 | |
|         request.session['hello'] = 'world'
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
|         self.assertFalse(response.cookies[settings.SESSION_COOKIE_NAME]['httponly'])
 | |
| 
 | |
|         self.assertNotIn(http_cookies.Morsel._reserved['httponly'],
 | |
|                          str(response.cookies[settings.SESSION_COOKIE_NAME]))
 | |
| 
 | |
|     def test_session_save_on_500(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Horrible error')
 | |
|         response.status_code = 500
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Simulate a request the modifies the session
 | |
|         middleware.process_request(request)
 | |
|         request.session['hello'] = 'world'
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
| 
 | |
|         # Check that the value wasn't saved above.
 | |
|         self.assertNotIn('hello', request.session.load())
 | |
| 
 | |
|     def test_session_update_error_redirect(self):
 | |
|         path = '/foo/'
 | |
|         request = RequestFactory().get(path)
 | |
|         response = HttpResponse()
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         request.session = DatabaseSession()
 | |
|         request.session.save(must_create=True)
 | |
|         request.session.delete()
 | |
| 
 | |
|         # Handle the response through the middleware. It will try to save the
 | |
|         # deleted session which will cause an UpdateError that's caught and
 | |
|         # results in a redirect to the original page.
 | |
|         response = middleware.process_response(request, response)
 | |
| 
 | |
|         # Check that the response is a redirect.
 | |
|         self.assertEqual(response.status_code, 302)
 | |
|         self.assertEqual(response['Location'], path)
 | |
| 
 | |
|     def test_session_delete_on_end(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Before deleting, there has to be an existing cookie
 | |
|         request.COOKIES[settings.SESSION_COOKIE_NAME] = 'abc'
 | |
| 
 | |
|         # Simulate a request that ends the session
 | |
|         middleware.process_request(request)
 | |
|         request.session.flush()
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
| 
 | |
|         # Check that the cookie was deleted, not recreated.
 | |
|         # A deleted cookie header looks like:
 | |
|         #  Set-Cookie: sessionid=; expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/
 | |
|         self.assertEqual(
 | |
|             'Set-Cookie: {}={}; expires=Thu, 01-Jan-1970 00:00:00 GMT; '
 | |
|             'Max-Age=0; Path=/'.format(
 | |
|                 settings.SESSION_COOKIE_NAME,
 | |
|                 '""' if sys.version_info >= (3, 5) else '',
 | |
|             ),
 | |
|             str(response.cookies[settings.SESSION_COOKIE_NAME])
 | |
|         )
 | |
| 
 | |
|     @override_settings(SESSION_COOKIE_DOMAIN='.example.local', SESSION_COOKIE_PATH='/example/')
 | |
|     def test_session_delete_on_end_with_custom_domain_and_path(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Before deleting, there has to be an existing cookie
 | |
|         request.COOKIES[settings.SESSION_COOKIE_NAME] = 'abc'
 | |
| 
 | |
|         # Simulate a request that ends the session
 | |
|         middleware.process_request(request)
 | |
|         request.session.flush()
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
| 
 | |
|         # Check that the cookie was deleted, not recreated.
 | |
|         # A deleted cookie header with a custom domain and path looks like:
 | |
|         #  Set-Cookie: sessionid=; Domain=.example.local;
 | |
|         #              expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0;
 | |
|         #              Path=/example/
 | |
|         self.assertEqual(
 | |
|             'Set-Cookie: {}={}; Domain=.example.local; expires=Thu, '
 | |
|             '01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/example/'.format(
 | |
|                 settings.SESSION_COOKIE_NAME,
 | |
|                 '""' if sys.version_info >= (3, 5) else '',
 | |
|             ),
 | |
|             str(response.cookies[settings.SESSION_COOKIE_NAME])
 | |
|         )
 | |
| 
 | |
|     def test_flush_empty_without_session_cookie_doesnt_set_cookie(self):
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Simulate a request that ends the session
 | |
|         middleware.process_request(request)
 | |
|         request.session.flush()
 | |
| 
 | |
|         # Handle the response through the middleware
 | |
|         response = middleware.process_response(request, response)
 | |
| 
 | |
|         # A cookie should not be set.
 | |
|         self.assertEqual(response.cookies, {})
 | |
|         # The session is accessed so "Vary: Cookie" should be set.
 | |
|         self.assertEqual(response['Vary'], 'Cookie')
 | |
| 
 | |
|     def test_empty_session_saved(self):
 | |
|         """
 | |
|         If a session is emptied of data but still has a key, it should still
 | |
|         be updated.
 | |
|         """
 | |
|         request = RequestFactory().get('/')
 | |
|         response = HttpResponse('Session test')
 | |
|         middleware = SessionMiddleware()
 | |
| 
 | |
|         # Set a session key and some data.
 | |
|         middleware.process_request(request)
 | |
|         request.session['foo'] = 'bar'
 | |
|         # Handle the response through the middleware.
 | |
|         response = middleware.process_response(request, response)
 | |
|         self.assertEqual(tuple(request.session.items()), (('foo', 'bar'),))
 | |
|         # A cookie should be set, along with Vary: Cookie.
 | |
|         self.assertIn(
 | |
|             'Set-Cookie: sessionid=%s' % request.session.session_key,
 | |
|             str(response.cookies)
 | |
|         )
 | |
|         self.assertEqual(response['Vary'], 'Cookie')
 | |
| 
 | |
|         # Empty the session data.
 | |
|         del request.session['foo']
 | |
|         # Handle the response through the middleware.
 | |
|         response = HttpResponse('Session test')
 | |
|         response = middleware.process_response(request, response)
 | |
|         self.assertEqual(dict(request.session.values()), {})
 | |
|         session = Session.objects.get(session_key=request.session.session_key)
 | |
|         self.assertEqual(session.get_decoded(), {})
 | |
|         # While the session is empty, it hasn't been flushed so a cookie should
 | |
|         # still be set, along with Vary: Cookie.
 | |
|         self.assertGreater(len(request.session.session_key), 8)
 | |
|         self.assertIn(
 | |
|             'Set-Cookie: sessionid=%s' % request.session.session_key,
 | |
|             str(response.cookies)
 | |
|         )
 | |
|         self.assertEqual(response['Vary'], 'Cookie')
 | |
| 
 | |
| 
 | |
| # Don't need DB flushing for these tests, so can use unittest.TestCase as base class
 | |
| class CookieSessionTests(SessionTestsMixin, unittest.TestCase):
 | |
| 
 | |
|     backend = CookieSession
 | |
| 
 | |
|     def test_save(self):
 | |
|         """
 | |
|         This test tested exists() in the other session backends, but that
 | |
|         doesn't make sense for us.
 | |
|         """
 | |
|         pass
 | |
| 
 | |
|     def test_cycle(self):
 | |
|         """
 | |
|         This test tested cycle_key() which would create a new session
 | |
|         key for the same session data. But we can't invalidate previously
 | |
|         signed cookies (other than letting them expire naturally) so
 | |
|         testing for this behavior is meaningless.
 | |
|         """
 | |
|         pass
 | |
| 
 | |
|     @unittest.expectedFailure
 | |
|     def test_actual_expiry(self):
 | |
|         # The cookie backend doesn't handle non-default expiry dates, see #19201
 | |
|         super(CookieSessionTests, self).test_actual_expiry()
 | |
| 
 | |
|     def test_unpickling_exception(self):
 | |
|         # signed_cookies backend should handle unpickle exceptions gracefully
 | |
|         # by creating a new session
 | |
|         self.assertEqual(self.session.serializer, JSONSerializer)
 | |
|         self.session.save()
 | |
| 
 | |
|         self.session.serializer = PickleSerializer
 | |
|         self.session.load()
 | |
| 
 | |
|     @unittest.skip("Cookie backend doesn't have an external store to create records in.")
 | |
|     def test_session_load_does_not_create_record(self):
 | |
|         pass
 | |
| 
 | |
|     @unittest.skip("CookieSession is stored in the client and there is no way to query it.")
 | |
|     def test_session_save_does_not_resurrect_session_logged_out_in_other_context(self):
 | |
|         pass
 |