import sys
import traceback
from io import BytesIO
from unittest import TestCase, mock
from wsgiref import simple_server

from django.core.servers.basehttp import get_internal_wsgi_application
from django.core.signals import request_finished
from django.test import RequestFactory, override_settings

from .views import FILE_RESPONSE_HOLDER

# If data is too large, socket will choke, so write chunks no larger than 32MB
# at a time. The rationale behind the 32MB can be found in #5596#comment:4.
MAX_SOCKET_CHUNK_SIZE = 32 * 1024 * 1024  # 32 MB


class ServerHandler(simple_server.ServerHandler):
    error_status = "500 INTERNAL SERVER ERROR"

    def write(self, data):
        """'write()' callable as specified by PEP 3333"""

        assert isinstance(data, bytes), "write() argument must be bytestring"

        if not self.status:
            raise AssertionError("write() before start_response()")

        elif not self.headers_sent:
            # Before the first output, send the stored headers
            self.bytes_sent = len(data)  # make sure we know content-length
            self.send_headers()
        else:
            self.bytes_sent += len(data)

        # XXX check Content-Length and truncate if too many bytes written?
        data = BytesIO(data)
        for chunk in iter(lambda: data.read(MAX_SOCKET_CHUNK_SIZE), b""):
            self._write(chunk)
            self._flush()

    def error_output(self, environ, start_response):
        super().error_output(environ, start_response)
        return ["\n".join(traceback.format_exception(*sys.exc_info()))]


class DummyHandler:
    def log_request(self, *args, **kwargs):
        pass


class FileWrapperHandler(ServerHandler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_handler = DummyHandler()
        self._used_sendfile = False

    def sendfile(self):
        self._used_sendfile = True
        return True


def wsgi_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello World!"]


def wsgi_app_file_wrapper(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return environ["wsgi.file_wrapper"](BytesIO(b"foo"))


class WSGIFileWrapperTests(TestCase):
    """
    The wsgi.file_wrapper works for the builtin server.

    Tests for #9659: wsgi.file_wrapper in the builtin server.
    We need to mock a couple of handlers and keep track of what
    gets called when using a couple kinds of WSGI apps.
    """

    def test_file_wrapper_uses_sendfile(self):
        env = {"SERVER_PROTOCOL": "HTTP/1.0"}
        handler = FileWrapperHandler(BytesIO(), BytesIO(), BytesIO(), env)
        handler.run(wsgi_app_file_wrapper)
        self.assertTrue(handler._used_sendfile)
        self.assertEqual(handler.stdout.getvalue(), b"")
        self.assertEqual(handler.stderr.getvalue(), b"")

    def test_file_wrapper_no_sendfile(self):
        env = {"SERVER_PROTOCOL": "HTTP/1.0"}
        handler = FileWrapperHandler(BytesIO(), BytesIO(), BytesIO(), env)
        handler.run(wsgi_app)
        self.assertFalse(handler._used_sendfile)
        self.assertEqual(handler.stdout.getvalue().splitlines()[-1], b"Hello World!")
        self.assertEqual(handler.stderr.getvalue(), b"")

    @override_settings(ROOT_URLCONF="builtin_server.urls")
    def test_file_response_closing(self):
        """
        View returning a FileResponse properly closes the file and http
        response when file_wrapper is used.
        """
        env = RequestFactory().get("/fileresponse/").environ
        handler = FileWrapperHandler(BytesIO(), BytesIO(), BytesIO(), env)
        handler.run(get_internal_wsgi_application())
        # Sendfile is used only when file_wrapper has been used.
        self.assertTrue(handler._used_sendfile)
        # Fetch the original response object.
        self.assertIn("response", FILE_RESPONSE_HOLDER)
        response = FILE_RESPONSE_HOLDER["response"]
        # The response and file buffers are closed.
        self.assertIs(response.closed, True)
        buf1, buf2 = FILE_RESPONSE_HOLDER["buffers"]
        self.assertIs(buf1.closed, True)
        self.assertIs(buf2.closed, True)
        FILE_RESPONSE_HOLDER.clear()

    @override_settings(ROOT_URLCONF="builtin_server.urls")
    def test_file_response_call_request_finished(self):
        env = RequestFactory().get("/fileresponse/").environ
        handler = FileWrapperHandler(BytesIO(), BytesIO(), BytesIO(), env)
        with mock.MagicMock() as signal_handler:
            request_finished.connect(signal_handler)
            handler.run(get_internal_wsgi_application())
            self.assertEqual(signal_handler.call_count, 1)


class WriteChunkCounterHandler(ServerHandler):
    """
    Server handler that counts the number of chunks written after headers were
    sent. Used to make sure large response body chunking works properly.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_handler = DummyHandler()
        self.headers_written = False
        self.write_chunk_counter = 0

    def send_headers(self):
        super().send_headers()
        self.headers_written = True

    def _write(self, data):
        if self.headers_written:
            self.write_chunk_counter += 1
        self.stdout.write(data)


def send_big_data_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    # Return a blob of data that is 1.5 times the maximum chunk size.
    return [b"x" * (MAX_SOCKET_CHUNK_SIZE + MAX_SOCKET_CHUNK_SIZE // 2)]


class ServerHandlerChunksProperly(TestCase):
    """
    The ServerHandler chunks data properly.

    Tests for #18972: The logic that performs the math to break data into
    32MB (MAX_SOCKET_CHUNK_SIZE) chunks was flawed, BUT it didn't actually
    cause any problems.
    """

    def test_chunked_data(self):
        env = {"SERVER_PROTOCOL": "HTTP/1.0"}
        handler = WriteChunkCounterHandler(None, BytesIO(), BytesIO(), env)
        handler.run(send_big_data_app)
        self.assertEqual(handler.write_chunk_counter, 2)