mirror of
				https://github.com/django/django.git
				synced 2025-10-25 06:36:07 +00:00 
			
		
		
		
	Fixed #30509 -- Made FileResponse better handle buffers and non-zero file offsets.
This commit is contained in:
		
				
					committed by
					
						 Carlton Gibson
						Carlton Gibson
					
				
			
			
				
	
			
			
			
						parent
						
							3ac4764397
						
					
				
				
					commit
					dc724c5bf9
				
			| @@ -1,4 +1,5 @@ | ||||
| import datetime | ||||
| import io | ||||
| import json | ||||
| import mimetypes | ||||
| import os | ||||
| @@ -437,6 +438,7 @@ class FileResponse(StreamingHttpResponse): | ||||
|     def __init__(self, *args, as_attachment=False, filename='', **kwargs): | ||||
|         self.as_attachment = as_attachment | ||||
|         self.filename = filename | ||||
|         self._no_default_content_type_set = 'content_type' not in kwargs or kwargs['content_type'] is None | ||||
|         super().__init__(*args, **kwargs) | ||||
|  | ||||
|     def _set_streaming_content(self, value): | ||||
| @@ -456,29 +458,38 @@ class FileResponse(StreamingHttpResponse): | ||||
|         Set some common response headers (Content-Length, Content-Type, and | ||||
|         Content-Disposition) based on the `filelike` response content. | ||||
|         """ | ||||
|         encoding_map = { | ||||
|             'bzip2': 'application/x-bzip', | ||||
|             'gzip': 'application/gzip', | ||||
|             'xz': 'application/x-xz', | ||||
|         } | ||||
|         filename = getattr(filelike, 'name', None) | ||||
|         filename = filename if (isinstance(filename, str) and filename) else self.filename | ||||
|         if os.path.isabs(filename): | ||||
|             self.headers['Content-Length'] = os.path.getsize(filelike.name) | ||||
|         filename = getattr(filelike, 'name', '') | ||||
|         filename = filename if isinstance(filename, str) else '' | ||||
|         seekable = hasattr(filelike, 'seek') and (not hasattr(filelike, 'seekable') or filelike.seekable()) | ||||
|         if hasattr(filelike, 'tell'): | ||||
|             if seekable: | ||||
|                 initial_position = filelike.tell() | ||||
|                 filelike.seek(0, io.SEEK_END) | ||||
|                 self.headers['Content-Length'] = filelike.tell() - initial_position | ||||
|                 filelike.seek(initial_position) | ||||
|             elif hasattr(filelike, 'getbuffer'): | ||||
|             self.headers['Content-Length'] = filelike.getbuffer().nbytes | ||||
|                 self.headers['Content-Length'] = filelike.getbuffer().nbytes - filelike.tell() | ||||
|             elif os.path.exists(filename): | ||||
|                 self.headers['Content-Length'] = os.path.getsize(filename) - filelike.tell() | ||||
|         elif seekable: | ||||
|             self.headers['Content-Length'] = sum(iter(lambda: len(filelike.read(self.block_size)), 0)) | ||||
|             filelike.seek(-int(self.headers['Content-Length']), io.SEEK_END) | ||||
|  | ||||
|         if self.headers.get('Content-Type', '').startswith('text/html'): | ||||
|         filename = os.path.basename(self.filename or filename) | ||||
|         if self._no_default_content_type_set: | ||||
|             if filename: | ||||
|                 content_type, encoding = mimetypes.guess_type(filename) | ||||
|                 # Encoding isn't set to prevent browsers from automatically | ||||
|                 # uncompressing files. | ||||
|                 content_type = encoding_map.get(encoding, content_type) | ||||
|                 content_type = { | ||||
|                     'bzip2': 'application/x-bzip', | ||||
|                     'gzip': 'application/gzip', | ||||
|                     'xz': 'application/x-xz', | ||||
|                 }.get(encoding, content_type) | ||||
|                 self.headers['Content-Type'] = content_type or 'application/octet-stream' | ||||
|             else: | ||||
|                 self.headers['Content-Type'] = 'application/octet-stream' | ||||
|  | ||||
|         filename = self.filename or os.path.basename(filename) | ||||
|         if filename: | ||||
|             disposition = 'attachment' if self.as_attachment else 'inline' | ||||
|             try: | ||||
|   | ||||
| @@ -18,17 +18,71 @@ class UnseekableBytesIO(io.BytesIO): | ||||
| class FileResponseTests(SimpleTestCase): | ||||
|     def test_content_length_file(self): | ||||
|         response = FileResponse(open(__file__, 'rb')) | ||||
|         self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__))) | ||||
|         response.close() | ||||
|         self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__))) | ||||
|  | ||||
|     def test_content_length_buffer(self): | ||||
|         response = FileResponse(io.BytesIO(b'binary content')) | ||||
|         self.assertEqual(response.headers['Content-Length'], '14') | ||||
|  | ||||
|     def test_content_length_nonzero_starting_position_file(self): | ||||
|         file = open(__file__, 'rb') | ||||
|         file.seek(10) | ||||
|         response = FileResponse(file) | ||||
|         response.close() | ||||
|         self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__) - 10)) | ||||
|  | ||||
|     def test_content_length_nonzero_starting_position_buffer(self): | ||||
|         test_tuples = ( | ||||
|             ('BytesIO', io.BytesIO), | ||||
|             ('UnseekableBytesIO', UnseekableBytesIO), | ||||
|         ) | ||||
|         for buffer_class_name, BufferClass in test_tuples: | ||||
|             with self.subTest(buffer_class_name=buffer_class_name): | ||||
|                 buffer = BufferClass(b'binary content') | ||||
|                 buffer.seek(10) | ||||
|                 response = FileResponse(buffer) | ||||
|                 self.assertEqual(response.headers['Content-Length'], '4') | ||||
|  | ||||
|     def test_content_length_nonzero_starting_position_file_seekable_no_tell(self): | ||||
|         class TestFile: | ||||
|             def __init__(self, path, *args, **kwargs): | ||||
|                 self._file = open(path, *args, **kwargs) | ||||
|  | ||||
|             def read(self, n_bytes=-1): | ||||
|                 return self._file.read(n_bytes) | ||||
|  | ||||
|             def seek(self, offset, whence=io.SEEK_SET): | ||||
|                 return self._file.seek(offset, whence) | ||||
|  | ||||
|             def seekable(self): | ||||
|                 return True | ||||
|  | ||||
|             @property | ||||
|             def name(self): | ||||
|                 return self._file.name | ||||
|  | ||||
|             def close(self): | ||||
|                 if self._file: | ||||
|                     self._file.close() | ||||
|                     self._file = None | ||||
|  | ||||
|             def __enter__(self): | ||||
|                 return self | ||||
|  | ||||
|             def __exit__(self, e_type, e_val, e_tb): | ||||
|                 self.close() | ||||
|  | ||||
|         file = TestFile(__file__, 'rb') | ||||
|         file.seek(10) | ||||
|         response = FileResponse(file) | ||||
|         response.close() | ||||
|         self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__) - 10)) | ||||
|  | ||||
|     def test_content_type_file(self): | ||||
|         response = FileResponse(open(__file__, 'rb')) | ||||
|         self.assertIn(response.headers['Content-Type'], ['text/x-python', 'text/plain']) | ||||
|         response.close() | ||||
|         self.assertIn(response.headers['Content-Type'], ['text/x-python', 'text/plain']) | ||||
|  | ||||
|     def test_content_type_buffer(self): | ||||
|         response = FileResponse(io.BytesIO(b'binary content')) | ||||
| @@ -38,8 +92,14 @@ class FileResponseTests(SimpleTestCase): | ||||
|         response = FileResponse(io.BytesIO(b'binary content'), content_type='video/webm') | ||||
|         self.assertEqual(response.headers['Content-Type'], 'video/webm') | ||||
|  | ||||
|     def test_content_type_buffer_explicit_default(self): | ||||
|         response = FileResponse(io.BytesIO(b'binary content'), content_type='text/html') | ||||
|         self.assertEqual(response.headers['Content-Type'], 'text/html') | ||||
|  | ||||
|     def test_content_type_buffer_named(self): | ||||
|         test_tuples = ( | ||||
|             (__file__, ['text/x-python', 'text/plain']), | ||||
|             (__file__ + 'nosuchfile', ['application/octet-stream']), | ||||
|             ('test_fileresponse.py', ['text/x-python', 'text/plain']), | ||||
|             ('test_fileresponse.pynosuchfile', ['application/octet-stream']), | ||||
|         ) | ||||
| @@ -59,15 +119,16 @@ class FileResponseTests(SimpleTestCase): | ||||
|             (False, 'inline'), | ||||
|             (True, 'attachment'), | ||||
|         ) | ||||
|         for (filename, header_filename), (as_attachment, header_disposition)\ | ||||
|                 in itertools.product(filenames, dispositions): | ||||
|         for (filename, header_filename), (as_attachment, header_disposition) in itertools.product( | ||||
|             filenames, dispositions | ||||
|         ): | ||||
|             with self.subTest(filename=filename, disposition=header_disposition): | ||||
|                 response = FileResponse(open(__file__, 'rb'), filename=filename, as_attachment=as_attachment) | ||||
|                 response.close() | ||||
|                 self.assertEqual( | ||||
|                     response.headers['Content-Disposition'], | ||||
|                     '%s; filename="%s"' % (header_disposition, header_filename), | ||||
|                 ) | ||||
|                 response.close() | ||||
|  | ||||
|     def test_content_disposition_buffer(self): | ||||
|         response = FileResponse(io.BytesIO(b'binary content')) | ||||
| @@ -89,7 +150,7 @@ class FileResponseTests(SimpleTestCase): | ||||
|                 filename='custom_name.py', | ||||
|             ) | ||||
|             self.assertEqual( | ||||
|                 response.headers['Content-Disposition'], '%s; filename="custom_name.py"' % header_disposition | ||||
|                 response.headers['Content-Disposition'], '%s; filename="custom_name.py"' % header_disposition, | ||||
|             ) | ||||
|  | ||||
|     def test_response_buffer(self): | ||||
| @@ -108,6 +169,15 @@ class FileResponseTests(SimpleTestCase): | ||||
|                 response = FileResponse(buffer) | ||||
|                 self.assertEqual(list(response), [b'tent']) | ||||
|  | ||||
|     def test_buffer_explicit_absolute_filename(self): | ||||
|         """ | ||||
|         Headers are set correctly with a buffer when an absolute filename is | ||||
|         provided. | ||||
|         """ | ||||
|         response = FileResponse(io.BytesIO(b'binary content'), filename=__file__) | ||||
|         self.assertEqual(response.headers['Content-Length'], '14') | ||||
|         self.assertEqual(response.headers['Content-Disposition'], 'inline; filename="test_fileresponse.py"') | ||||
|  | ||||
|     @skipIf(sys.platform == 'win32', "Named pipes are Unix-only.") | ||||
|     def test_file_from_named_pipe_response(self): | ||||
|         with tempfile.TemporaryDirectory() as temp_dir: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user