1
0
mirror of https://github.com/django/django.git synced 2025-10-25 14:46:09 +00:00

[1.2.X] Fixed #14199 -- Added a missing table creation statement in the db cache backend cull implementation, and added tests for cache culling. Thanks to Tim for the report.

Backport of r13678 from trunk.

git-svn-id: http://code.djangoproject.com/svn/django/branches/releases/1.2.X@13679 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Russell Keith-Magee
2010-08-31 00:45:35 +00:00
parent 4d8af2faa3
commit 7a1b694189
2 changed files with 27 additions and 3 deletions

View File

@@ -124,6 +124,7 @@ class CacheClass(BaseCache):
if self._cull_frequency == 0:
self.clear()
else:
table = connections[db].ops.quote_name(self._table)
cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
[connections[db].ops.value_to_db_datetime(now)])
cursor.execute("SELECT COUNT(*) FROM %s" % table)

View File

@@ -352,21 +352,41 @@ class BaseCacheTests(object):
self.assertEqual(self.cache.get('key3'), 'sausage')
self.assertEqual(self.cache.get('key4'), 'lobster bisque')
def perform_cull_test(self, initial_count, final_count):
"""This is implemented as a utility method, because only some of the backends
implement culling. The culling algorithm also varies slightly, so the final
number of entries will vary between backends"""
# Create initial cache key entries. This will overflow the cache, causing a cull
for i in range(1, initial_count):
self.cache.set('cull%d' % i, 'value', 1000)
count = 0
# Count how many keys are left in the cache.
for i in range(1, initial_count):
if self.cache.has_key('cull%d' % i):
count = count + 1
self.assertEqual(count, final_count)
class DBCacheTests(unittest.TestCase, BaseCacheTests):
def setUp(self):
# Spaces are used in the table name to ensure quoting/escaping is working
self._table_name = 'test cache table'
management.call_command('createcachetable', self._table_name, verbosity=0, interactive=False)
self.cache = get_cache('db://%s' % self._table_name)
self.cache = get_cache('db://%s?max_entries=30' % self._table_name)
def tearDown(self):
from django.db import connection
cursor = connection.cursor()
cursor.execute('DROP TABLE %s' % connection.ops.quote_name(self._table_name))
def test_cull(self):
self.perform_cull_test(50, 29)
class LocMemCacheTests(unittest.TestCase, BaseCacheTests):
def setUp(self):
self.cache = get_cache('locmem://')
self.cache = get_cache('locmem://?max_entries=30')
def test_cull(self):
self.perform_cull_test(50, 29)
# memcached backend isn't guaranteed to be available.
# To check the memcached backend, the test settings file will
@@ -383,7 +403,7 @@ class FileBasedCacheTests(unittest.TestCase, BaseCacheTests):
"""
def setUp(self):
self.dirname = tempfile.mkdtemp()
self.cache = get_cache('file://%s' % self.dirname)
self.cache = get_cache('file://%s?max_entries=30' % self.dirname)
def test_hashing(self):
"""Test that keys are hashed into subdirectories correctly"""
@@ -406,6 +426,9 @@ class FileBasedCacheTests(unittest.TestCase, BaseCacheTests):
self.assert_(not os.path.exists(os.path.dirname(keypath)))
self.assert_(not os.path.exists(os.path.dirname(os.path.dirname(keypath))))
def test_cull(self):
self.perform_cull_test(50, 28)
class CacheUtils(unittest.TestCase):
"""TestCase for django.utils.cache functions."""