1
0
mirror of https://github.com/django/django.git synced 2025-06-03 18:49:12 +00:00

Fixed #34170 -- Implemented Heal The Breach (HTB) in GzipMiddleware.

This commit is contained in:
Andreas Pelme 2022-11-20 21:46:55 +01:00 committed by Mariusz Felisiak
parent a1bcdc94da
commit ab7a85ac29
5 changed files with 106 additions and 17 deletions

View File

@ -13,6 +13,8 @@ class GZipMiddleware(MiddlewareMixin):
on the Accept-Encoding header. on the Accept-Encoding header.
""" """
max_random_bytes = 100
def process_response(self, request, response): def process_response(self, request, response):
# It's not worth attempting to compress really short responses. # It's not worth attempting to compress really short responses.
if not response.streaming and len(response.content) < 200: if not response.streaming and len(response.content) < 200:
@ -31,11 +33,17 @@ class GZipMiddleware(MiddlewareMixin):
if response.streaming: if response.streaming:
# Delete the `Content-Length` header for streaming content, because # Delete the `Content-Length` header for streaming content, because
# we won't know the compressed size until we stream it. # we won't know the compressed size until we stream it.
response.streaming_content = compress_sequence(response.streaming_content) response.streaming_content = compress_sequence(
response.streaming_content,
max_random_bytes=self.max_random_bytes,
)
del response.headers["Content-Length"] del response.headers["Content-Length"]
else: else:
# Return the compressed content only if it's actually shorter. # Return the compressed content only if it's actually shorter.
compressed_content = compress_string(response.content) compressed_content = compress_string(
response.content,
max_random_bytes=self.max_random_bytes,
)
if len(compressed_content) >= len(response.content): if len(compressed_content) >= len(response.content):
return response return response
response.content = compressed_content response.content = compressed_content

View File

@ -1,4 +1,6 @@
import gzip
import re import re
import secrets
import unicodedata import unicodedata
from gzip import GzipFile from gzip import GzipFile
from gzip import compress as gzip_compress from gzip import compress as gzip_compress
@ -314,8 +316,23 @@ def phone2numeric(phone):
return "".join(char2number.get(c, c) for c in phone.lower()) return "".join(char2number.get(c, c) for c in phone.lower())
def compress_string(s): def _get_random_filename(max_random_bytes):
return gzip_compress(s, compresslevel=6, mtime=0) return b"a" * secrets.randbelow(max_random_bytes)
def compress_string(s, *, max_random_bytes=None):
compressed_data = gzip_compress(s, compresslevel=6, mtime=0)
if not max_random_bytes:
return compressed_data
compressed_view = memoryview(compressed_data)
header = bytearray(compressed_view[:10])
header[3] = gzip.FNAME
filename = _get_random_filename(max_random_bytes) + b"\x00"
return bytes(header) + filename + compressed_view[10:]
class StreamingBuffer(BytesIO): class StreamingBuffer(BytesIO):
@ -327,9 +344,12 @@ class StreamingBuffer(BytesIO):
# Like compress_string, but for iterators of strings. # Like compress_string, but for iterators of strings.
def compress_sequence(sequence): def compress_sequence(sequence, *, max_random_bytes=None):
buf = StreamingBuffer() buf = StreamingBuffer()
with GzipFile(mode="wb", compresslevel=6, fileobj=buf, mtime=0) as zfile: filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
with GzipFile(
filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
) as zfile:
# Output headers... # Output headers...
yield buf.read() yield buf.read()
for item in sequence: for item in sequence:

View File

@ -93,18 +93,33 @@ GZip middleware
.. class:: GZipMiddleware .. class:: GZipMiddleware
.. warning:: .. attribute:: max_random_bytes
Security researchers recently revealed that when compression techniques Defaults to 100. Subclass ``GZipMiddleware`` and override the attribute
(including ``GZipMiddleware``) are used on a website, the site may become to change the maximum number of random bytes that is included with
exposed to a number of possible attacks. Before using ``GZipMiddleware`` on compressed responses.
your site, you should consider very carefully whether you are subject to
these attacks. If you're in *any* doubt about whether you're affected, you
should avoid using ``GZipMiddleware``. For more details, see the `the BREACH
paper (PDF)`_ and `breachattack.com`_.
.. _the BREACH paper (PDF): https://www.breachattack.com/resources/BREACH%20-%20SSL,%20gone%20in%2030%20seconds.pdf .. note::
Security researchers revealed that when compression techniques (including
``GZipMiddleware``) are used on a website, the site may become exposed to a
number of possible attacks.
To mitigate attacks, Django implements a technique called *Heal The Breach
(HTB)*. It adds up to 100 bytes (see
:attr:`.max_random_bytes`) of random bytes to each response
to make the attacks less effective.
For more details, see the `BREACH paper (PDF)`_, `breachattack.com`_, and
the `Heal The Breach (HTB) paper`_.
.. _BREACH paper (PDF): https://www.breachattack.com/resources/BREACH%20-%20SSL,%20gone%20in%2030%20seconds.pdf
.. _breachattack.com: https://www.breachattack.com/ .. _breachattack.com: https://www.breachattack.com/
.. _Heal The Breach (HTB) paper: https://ieeexplore.ieee.org/document/9754554
.. versionchanged:: 4.2
Mitigation for the BREACH attack was added.
The ``django.middleware.gzip.GZipMiddleware`` compresses content for browsers The ``django.middleware.gzip.GZipMiddleware`` compresses content for browsers
that understand GZip compression (all modern browsers). that understand GZip compression (all modern browsers).

View File

@ -40,6 +40,16 @@ in the future.
.. _psycopg: https://www.psycopg.org/psycopg3/ .. _psycopg: https://www.psycopg.org/psycopg3/
.. _psycopg library: https://pypi.org/project/psycopg/ .. _psycopg library: https://pypi.org/project/psycopg/
Mitigation for the BREACH attack
--------------------------------
:class:`~django.middleware.gzip.GZipMiddleware` now includes a mitigation for
the BREACH attack. It will add up to 100 random bytes to gzip responses to make
BREACH attacks harder. Read more about the mitigation technique in the `Heal
The Breach (HTB) paper`_.
.. _Heal The Breach (HTB) paper: https://ieeexplore.ieee.org/document/9754554
Minor features Minor features
-------------- --------------

View File

@ -3,6 +3,7 @@ import random
import re import re
import struct import struct
from io import BytesIO from io import BytesIO
from unittest import mock
from urllib.parse import quote from urllib.parse import quote
from django.conf import settings from django.conf import settings
@ -978,12 +979,47 @@ class GZipMiddlewareTest(SimpleTestCase):
ConditionalGetMiddleware from recognizing conditional matches ConditionalGetMiddleware from recognizing conditional matches
on gzipped content). on gzipped content).
""" """
r1 = GZipMiddleware(self.get_response)(self.req)
r2 = GZipMiddleware(self.get_response)(self.req) class DeterministicGZipMiddleware(GZipMiddleware):
max_random_bytes = 0
r1 = DeterministicGZipMiddleware(self.get_response)(self.req)
r2 = DeterministicGZipMiddleware(self.get_response)(self.req)
self.assertEqual(r1.content, r2.content) self.assertEqual(r1.content, r2.content)
self.assertEqual(self.get_mtime(r1.content), 0) self.assertEqual(self.get_mtime(r1.content), 0)
self.assertEqual(self.get_mtime(r2.content), 0) self.assertEqual(self.get_mtime(r2.content), 0)
def test_random_bytes(self):
"""A random number of bytes is added to mitigate the BREACH attack."""
with mock.patch(
"django.utils.text.secrets.randbelow", autospec=True, return_value=3
):
r = GZipMiddleware(self.get_response)(self.req)
# The fourth byte of a gzip stream contains flags.
self.assertEqual(r.content[3], gzip.FNAME)
# A 3 byte filename "aaa" and a null byte are added.
self.assertEqual(r.content[10:14], b"aaa\x00")
self.assertEqual(self.decompress(r.content), self.compressible_string)
def test_random_bytes_streaming_response(self):
"""A random number of bytes is added to mitigate the BREACH attack."""
def get_stream_response(request):
resp = StreamingHttpResponse(self.sequence)
resp["Content-Type"] = "text/html; charset=UTF-8"
return resp
with mock.patch(
"django.utils.text.secrets.randbelow", autospec=True, return_value=3
):
r = GZipMiddleware(get_stream_response)(self.req)
content = b"".join(r)
# The fourth byte of a gzip stream contains flags.
self.assertEqual(content[3], gzip.FNAME)
# A 3 byte filename "aaa" and a null byte are added.
self.assertEqual(content[10:14], b"aaa\x00")
self.assertEqual(self.decompress(content), b"".join(self.sequence))
class ETagGZipMiddlewareTest(SimpleTestCase): class ETagGZipMiddlewareTest(SimpleTestCase):
""" """