mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			485 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			485 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #! -*- coding: utf-8 -*-
 | |
| from __future__ import unicode_literals
 | |
| 
 | |
| import base64
 | |
| import errno
 | |
| import hashlib
 | |
| import json
 | |
| import os
 | |
| import shutil
 | |
| import tempfile as sys_tempfile
 | |
| import unittest
 | |
| 
 | |
| from django.core.files import temp as tempfile
 | |
| from django.core.files.uploadedfile import SimpleUploadedFile
 | |
| from django.http.multipartparser import MultiPartParser
 | |
| from django.test import TestCase, client
 | |
| from django.test import override_settings
 | |
| from django.utils.encoding import force_bytes
 | |
| from django.utils.six import StringIO
 | |
| 
 | |
| from . import uploadhandler
 | |
| from .models import FileModel
 | |
| 
 | |
| 
 | |
# File name mixing ASCII digits, CJK characters and an accented Latin
# character; test_unicode_file_name uploads a file with this name.
UNICODE_FILENAME = 'test-0123456789_中文_Orléans.jpg'

# Scratch directory used as MEDIA_ROOT by the test classes below. It is
# created under DJANGO_TEST_TEMP_DIR when that variable is set; otherwise
# mkdtemp() receives dir=None and uses the platform's default temporary
# directory, so importing this module no longer dies with a bare KeyError.
MEDIA_ROOT = sys_tempfile.mkdtemp(dir=os.environ.get('DJANGO_TEST_TEMP_DIR'))

# Path inside MEDIA_ROOT that test_not_a_directory occupies with a plain file.
UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload')
 | |
| 
 | |
| 
 | |
@override_settings(MEDIA_ROOT=MEDIA_ROOT)
class FileUploadTests(TestCase):
    """
    Upload tests that push multipart request bodies through the test client
    and inspect what the views routed by ``file_uploads.urls`` send back.
    """
    urls = 'file_uploads.urls'

    @classmethod
    def setUpClass(cls):
        # Chain up first so class-level fixtures of the base classes are
        # never skipped (a no-op on plain unittest.TestCase).
        super(FileUploadTests, cls).setUpClass()
        if not os.path.isdir(MEDIA_ROOT):
            os.makedirs(MEDIA_ROOT)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(MEDIA_ROOT)
        super(FileUploadTests, cls).tearDownClass()

    def _post_raw_multipart(self, path, payload):
        """
        Send ``payload`` (a ``client.FakePayload``) as the raw body of a
        multipart POST to ``path`` and return the response.
        """
        environ = {
            'CONTENT_LENGTH': len(payload),
            'CONTENT_TYPE': client.MULTIPART_CONTENT,
            'PATH_INFO': path,
            'REQUEST_METHOD': 'POST',
            'wsgi.input': payload,
        }
        return self.client.request(**environ)

    def _json_body(self, response):
        """Decode the response content as UTF-8 JSON."""
        return json.loads(response.content.decode('utf-8'))

    def test_simple_upload(self):
        """Posting this very source file to /upload/ yields a 200."""
        with open(__file__, 'rb') as fp:
            post_data = {
                'name': 'Ringo',
                'file_field': fp,
            }
            response = self.client.post('/upload/', post_data)
        self.assertEqual(response.status_code, 200)

    def test_large_upload(self):
        """A 2 MiB and a 10 MiB file are posted together with their SHA-1 hashes."""
        tdir = tempfile.gettempdir()

        # The context manager closes (and thereby removes) both temporary
        # files even if an assertion below fails.
        with tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir) as file1, \
                tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir) as file2:
            file1.write(b'a' * (2 ** 21))
            file1.seek(0)

            file2.write(b'a' * (10 * 2 ** 20))
            file2.seek(0)

            post_data = {
                'name': 'Ringo',
                'file_field1': file1,
                'file_field2': file2,
            }

            # Iterate over a snapshot of the keys: the loop adds '<key>_hash'
            # entries to the very dict it is walking.
            for key in list(post_data):
                value = post_data[key]
                try:
                    digest = hashlib.sha1(value.read()).hexdigest()
                    value.seek(0)
                except AttributeError:
                    # Plain (non-file) values have no read(); hash their bytes.
                    digest = hashlib.sha1(force_bytes(value)).hexdigest()
                post_data[key + '_hash'] = digest

            response = self.client.post('/verify/', post_data)

        self.assertEqual(response.status_code, 200)

    def _test_base64_upload(self, content):
        """
        Upload ``content`` base64-encoded to /echo_content/ and check that
        the echoed 'file' value equals the original content.
        """
        payload = client.FakePayload("\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="test.txt"',
            'Content-Type: application/octet-stream',
            'Content-Transfer-Encoding: base64',
            '']))
        payload.write(b"\r\n" + base64.b64encode(force_bytes(content)) + b"\r\n")
        payload.write('--' + client.BOUNDARY + '--\r\n')

        response = self._post_raw_multipart("/echo_content/", payload)
        received = self._json_body(response)

        self.assertEqual(received['file'], content)

    def test_base64_upload(self):
        self._test_base64_upload("This data will be transmitted base64-encoded.")

    def test_big_base64_upload(self):
        self._test_base64_upload("Big data" * 68000)  # > 512Kb

    def test_unicode_file_name(self):
        """A file whose name contains non-ASCII characters can be posted."""
        tdir = sys_tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tdir, True)

        # This file contains Chinese symbols and an accented char in the name.
        with open(os.path.join(tdir, UNICODE_FILENAME), 'w+b') as file1:
            file1.write(b'b' * (2 ** 10))
            file1.seek(0)

            post_data = {
                'file_unicode': file1,
            }

            response = self.client.post('/unicode_name/', post_data)

        self.assertEqual(response.status_code, 200)

    def test_dangerous_file_names(self):
        """Uploaded file names should be sanitized before ever reaching the view."""
        # This test simulates possible directory traversal attacks by a
        # malicious uploader. We have to do some monkeybusiness here to
        # construct a malicious payload with an invalid file name (containing
        # os.sep or os.pardir). This is similar to what an attacker would need
        # to do when trying such an attack.
        scary_file_names = [
            "/tmp/hax0rd.txt",          # Absolute path, *nix-style.
            "C:\\Windows\\hax0rd.txt",  # Absolute path, win-style.
            "C:/Windows/hax0rd.txt",    # Absolute path, broken-style.
            "\\tmp\\hax0rd.txt",        # Absolute path, broken in a different way.
            "/tmp\\hax0rd.txt",         # Absolute path, broken by mixing.
            "subdir/hax0rd.txt",        # Descendant path, *nix-style.
            "subdir\\hax0rd.txt",       # Descendant path, win-style.
            "sub/dir\\hax0rd.txt",      # Descendant path, mixed.
            "../../hax0rd.txt",         # Relative path, *nix-style.
            "..\\..\\hax0rd.txt",       # Relative path, win-style.
            "../..\\hax0rd.txt"         # Relative path, mixed.
        ]

        payload = client.FakePayload()
        for i, name in enumerate(scary_file_names):
            payload.write('\r\n'.join([
                '--' + client.BOUNDARY,
                'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
                'Content-Type: application/octet-stream',
                '',
                'You got pwnd.\r\n'
            ]))
        payload.write('\r\n--' + client.BOUNDARY + '--\r\n')

        response = self._post_raw_multipart("/echo/", payload)

        # The filenames should have been sanitized by the time it got to the view.
        received = self._json_body(response)
        for i, name in enumerate(scary_file_names):
            got = received["file%s" % i]
            self.assertEqual(got, "hax0rd.txt", "Unsanitized result for %r" % name)

    def test_filename_overflow(self):
        """File names over 256 characters (dangerous on some platforms) get fixed up."""
        long_str = 'f' * 300
        cases = [
            # field name, filename, expected
            ('long_filename', '%s.txt' % long_str, '%s.txt' % long_str[:251]),
            ('long_extension', 'foo.%s' % long_str, '.%s' % long_str[:254]),
            ('no_extension', long_str, long_str[:255]),
            ('no_filename', '.%s' % long_str, '.%s' % long_str[:254]),
            ('long_everything', '%s.%s' % (long_str, long_str), '.%s' % long_str[:254]),
        ]
        payload = client.FakePayload()
        for name, filename, _ in cases:
            payload.write("\r\n".join([
                '--' + client.BOUNDARY,
                'Content-Disposition: form-data; name="{0}"; filename="{1}"',
                'Content-Type: application/octet-stream',
                '',
                'Oops.',
                ''
            ]).format(name, filename))
        payload.write('\r\n--' + client.BOUNDARY + '--\r\n')

        result = self._json_body(self._post_raw_multipart("/echo/", payload))
        for name, _, expected in cases:
            got = result[name]
            self.assertEqual(expected, got, 'Mismatch for {0}'.format(name))
            self.assertLess(len(got), 256,
                            "Got a long file name (%s characters)." % len(got))

    def test_content_type_extra(self):
        """Uploaded files may have content type parameters available."""
        tdir = tempfile.gettempdir()

        with tempfile.NamedTemporaryFile(suffix=".ctype_extra", dir=tdir) as no_content_type, \
                tempfile.NamedTemporaryFile(suffix=".ctype_extra", dir=tdir) as simple_file:
            no_content_type.write(b'something')
            no_content_type.seek(0)

            simple_file.write(b'something')
            simple_file.seek(0)
            # Only this file advertises a content type carrying a parameter.
            simple_file.content_type = 'text/plain; test-key=test_value'

            response = self.client.post('/echo_content_type_extra/', {
                'no_content_type': no_content_type,
                'simple_file': simple_file,
            })

        received = self._json_body(response)
        self.assertEqual(received['no_content_type'], {})
        self.assertEqual(received['simple_file'], {'test-key': 'test_value'})

    def test_truncated_multipart_handled_gracefully(self):
        """
        If passed an incomplete multipart message, MultiPartParser does not
        attempt to read beyond the end of the stream, and simply will handle
        the part that can be parsed gracefully.
        """
        payload_str = "\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="foo.txt"',
            'Content-Type: application/octet-stream',
            '',
            # NOTE(review): no line break separates the contents from the
            # closing boundary. The original spelled this through implicit
            # string-literal concatenation; the exact bytes are preserved.
            'file contents--' + client.BOUNDARY + '--',
            '',
        ])
        # Chop off the tail so the message ends mid-stream.
        payload = client.FakePayload(payload_str[:-10])
        got = self._json_body(self._post_raw_multipart('/echo/', payload))
        self.assertEqual(got, {})

    def test_empty_multipart_handled_gracefully(self):
        """
        If passed an empty multipart message, MultiPartParser will return
        an empty QueryDict.
        """
        r = {
            'CONTENT_LENGTH': 0,
            'CONTENT_TYPE': client.MULTIPART_CONTENT,
            'PATH_INFO': '/echo/',
            'REQUEST_METHOD': 'POST',
            'wsgi.input': client.FakePayload(b''),
        }
        got = self._json_body(self.client.request(**r))
        self.assertEqual(got, {})

    def test_custom_upload_handler(self):
        """/quota/ echoes a small upload but not one exceeding the quota."""
        with tempfile.NamedTemporaryFile() as smallfile, \
                tempfile.NamedTemporaryFile() as bigfile:
            # A small file (under the 5M quota)
            smallfile.write(b'a' * (2 ** 21))
            smallfile.seek(0)

            # A big file (over the quota)
            bigfile.write(b'a' * (10 * 2 ** 20))
            bigfile.seek(0)

            # Small file posting should work.
            response = self.client.post('/quota/', {'f': smallfile})
            got = self._json_body(response)
            self.assertIn('f', got)

            # Large files don't go through.
            response = self.client.post("/quota/", {'f': bigfile})
            got = self._json_body(response)
            self.assertNotIn('f', got)

    def test_broken_custom_upload_handler(self):
        """Posting to /quota/broken/ raises AttributeError."""
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'a' * (2 ** 21))
            f.seek(0)

            # AttributeError: You cannot alter upload handlers after the upload has been processed.
            with self.assertRaises(AttributeError):
                self.client.post('/quota/broken/', {'f': f})

    def test_fileupload_getlist(self):
        """/getlist_count/ reports one file for 'file1' and two for 'file2'."""
        with tempfile.NamedTemporaryFile() as file1, \
                tempfile.NamedTemporaryFile() as file2, \
                tempfile.NamedTemporaryFile() as file2a:
            file1.write(b'a' * (2 ** 23))
            file1.seek(0)

            file2.write(b'a' * (2 * 2 ** 18))
            file2.seek(0)

            file2a.write(b'a' * (5 * 2 ** 20))
            file2a.seek(0)

            response = self.client.post('/getlist_count/', {
                'file1': file1,
                'field1': 'test',
                'field2': 'test3',
                'field3': 'test5',
                'field4': 'test6',
                'field5': 'test7',
                'file2': (file2, file2a)
            })

        got = self._json_body(response)

        self.assertEqual(got.get('file1'), 1)
        self.assertEqual(got.get('file2'), 2)

    def test_fileuploads_closed_at_request_end(self):
        """Every parsed upload is closed once the request has finished."""
        new_temp = tempfile.NamedTemporaryFile
        with new_temp() as f1, new_temp() as f2a, new_temp() as f2b:
            response = self.client.post('/fd_closing/t/', {
                'file': f1,
                'file2': (f2a, f2b),
            })

        request = response.wsgi_request
        # Check that the files got actually parsed.
        self.assertTrue(hasattr(request, '_files'))

        uploaded = request._files['file']
        self.assertTrue(uploaded.closed)

        uploaded_list = request._files.getlist('file2')
        self.assertTrue(uploaded_list[0].closed)
        self.assertTrue(uploaded_list[1].closed)

    def test_no_parsing_triggered_by_fd_closing(self):
        """Closing uploads at request end must not itself parse the body."""
        new_temp = tempfile.NamedTemporaryFile
        with new_temp() as f1, new_temp() as f2a, new_temp() as f2b:
            response = self.client.post('/fd_closing/f/', {
                'file': f1,
                'file2': (f2a, f2b),
            })

        request = response.wsgi_request
        # Check that the fd closing logic doesn't trigger parsing of the stream
        self.assertFalse(hasattr(request, '_files'))

    def test_file_error_blocking(self):
        """
        The server should not block when there are upload errors (bug #8622).
        This can happen if something -- i.e. an exception handler -- tries to
        access POST while handling an error in parsing POST. This shouldn't
        cause an infinite loop!
        """
        class POSTAccessingHandler(client.ClientHandler):
            """A handler that'll access POST during an exception."""
            def handle_uncaught_exception(self, request, resolver, exc_info):
                ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
                request.POST  # evaluate
                return ret

        # Maybe this is a little more complicated than it needs to be; but if
        # the django.test.client.FakePayload.read() implementation changes then
        # this test would fail.  So we need to know exactly what kind of error
        # it raises when there is an attempt to read more than the available bytes:
        try:
            client.FakePayload(b'a').read(2)
        except Exception as err:
            reference_error = err
        else:
            # Fail with a clear message instead of a NameError further down.
            self.fail("FakePayload.read() did not raise when reading past the end.")

        # install the custom handler that tries to access request.POST
        self.client.handler = POSTAccessingHandler()

        with open(__file__, 'rb') as fp:
            post_data = {
                'name': 'Ringo',
                'file_field': fp,
            }
            try:
                self.client.post('/upload_errors/', post_data)
            except reference_error.__class__ as err:
                self.assertFalse(
                    str(err) == str(reference_error),
                    "Caught a repeated exception that'll cause an infinite loop in file uploads."
                )
            except Exception as err:
                # CustomUploadError is the error that should have been raised
                self.assertEqual(err.__class__, uploadhandler.CustomUploadError)

    def test_filename_case_preservation(self):
        """
        The storage backend shouldn't mess with the case of the filenames
        uploaded.
        """
        # Synthesize the contents of a file upload with a mixed case filename
        # so we don't have to carry such a file in the Django tests source code
        # tree.
        params = {'boundary': 'oUrBoUnDaRyStRiNg'}
        post_data = [
            '--%(boundary)s',
            'Content-Disposition: form-data; name="file_field"; filename="MiXeD_cAsE.txt"',
            'Content-Type: application/octet-stream',
            '',
            'file contents\n',
            '--%(boundary)s--\r\n',
        ]
        response = self.client.post(
            '/filename_case/',
            '\r\n'.join(post_data) % params,
            'multipart/form-data; boundary=%(boundary)s' % params
        )
        self.assertEqual(response.status_code, 200)
        # The response content is parsed as the primary key of the stored row.
        pk = int(response.content)
        obj = FileModel.objects.get(pk=pk)
        # The name of the file uploaded and the file stored in the server-side
        # shouldn't differ.
        self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt')
 | |
| 
 | |
| 
 | |
@override_settings(MEDIA_ROOT=MEDIA_ROOT)
class DirectoryCreationTests(TestCase):
    """
    Tests for error handling during directory creation
    via _save_FIELD_file (ticket #6450)
    """
    @classmethod
    def setUpClass(cls):
        # Chain up first so class-level fixtures of the base classes are
        # never skipped (a no-op on plain unittest.TestCase).
        super(DirectoryCreationTests, cls).setUpClass()
        if not os.path.isdir(MEDIA_ROOT):
            os.makedirs(MEDIA_ROOT)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(MEDIA_ROOT)
        super(DirectoryCreationTests, cls).tearDownClass()

    def setUp(self):
        # A fresh, unsaved model instance for every test.
        self.obj = FileModel()

    def test_readonly_root(self):
        """Permission errors are not swallowed"""
        # Drop write permission on the media root for the duration of the
        # test; the cleanup restores it so later removal can succeed.
        os.chmod(MEDIA_ROOT, 0o500)
        self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
        # NOTE(review): if save() raises nothing at all the test passes
        # without asserting anything; presumably tolerated for environments
        # where the chmod above does not restrict writes -- confirm.
        try:
            self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
        except OSError as err:
            self.assertEqual(err.errno, errno.EACCES)
        except Exception:
            self.fail("OSError [Errno %s] not raised." % errno.EACCES)

    def test_not_a_directory(self):
        """The correct IOError is raised when the upload directory name exists but isn't a directory"""
        # Create a file with the upload directory name
        open(UPLOAD_TO, 'wb').close()
        self.addCleanup(os.remove, UPLOAD_TO)
        with self.assertRaises(IOError) as exc_info:
            self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
        # The test needs to be done on a specific string as IOError
        # is raised even without the patch (just not early enough)
        self.assertEqual(
            exc_info.exception.args[0],
            "%s exists and is not a directory." % UPLOAD_TO
        )
 | |
| 
 | |
| 
 | |
class MultiParserTests(unittest.TestCase):
    """Checks on MultiPartParser that do not go through the test client."""

    def test_empty_upload_handlers(self):
        """MultiPartParser can be instantiated with an empty handler list."""
        # Nothing gets parsed here; constructing the parser without an
        # error is the whole point of the test.
        meta = {
            'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
            'CONTENT_LENGTH': '1'
        }
        input_data = StringIO('x')
        upload_handlers = []
        MultiPartParser(meta, input_data, upload_handlers, 'utf-8')
 |