diff --git a/aiohttp/web_response.py b/aiohttp/web_response.py index f231ca5d289..050093b0d68 100644 --- a/aiohttp/web_response.py +++ b/aiohttp/web_response.py @@ -343,9 +343,30 @@ async def _start_compression(self, request: "BaseRequest") -> None: return # Encoding comparisons should be case-insensitive # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1 - accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower() + accepted_codings = set() + for header_value in request.headers.getall(hdrs.ACCEPT_ENCODING, ()): + for coding_part in header_value.split(","): + token_and_params = [ + part.strip(" \t") for part in coding_part.split(";") + ] + token = token_and_params[0].lower() + if not token: + continue + qvalue = 1.0 + for param in token_and_params[1:]: + if not param: + continue + key, sep, value = param.partition("=") + if key.strip(" \t").lower() == "q" and sep: + try: + qvalue = float(value) + except ValueError: + qvalue = 0.0 + break + if qvalue > 0: + accepted_codings.add(token) for value, coding in CONTENT_CODINGS.items(): - if value in accept_encoding: + if value in accepted_codings: await self._do_start_compression(coding) return @@ -579,13 +600,6 @@ def __init__( real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset body = text.encode(charset) text = None - elif hdrs.CONTENT_TYPE in real_headers: - if content_type is not None or charset is not None: - raise ValueError( - "passing both Content-Type header and " - "content_type or charset params " - "is forbidden" - ) elif content_type is not None: if charset is not None: content_type += "; charset=" + charset diff --git a/tests/test_web_response.py b/tests/test_web_response.py index daa9de46bc9..bc01070bdb1 100644 --- a/tests/test_web_response.py +++ b/tests/test_web_response.py @@ -488,6 +488,44 @@ async def test_compression_default_coding() -> None: assert msg.filter is not None +@pytest.mark.usefixtures("parametrize_zlib_backend") +async def test_invalid_token_not_matched() -> None: + req = make_request("GET", "/", headers=CIMultiDict({hdrs.ACCEPT_ENCODING: "xgzip"})) + resp = web.StreamResponse() + resp.enable_compression() + + msg = await resp.prepare(req) + assert msg is not None + assert not msg.enable_compression.called # type: ignore[attr-defined] + assert resp.headers.get(hdrs.CONTENT_ENCODING) is None + + +@pytest.mark.usefixtures("parametrize_zlib_backend") +async def test_valid_token_still_matched() -> None: + req = make_request("GET", "/", headers=CIMultiDict({hdrs.ACCEPT_ENCODING: "gzip"})) + resp = web.StreamResponse() + resp.enable_compression() + + msg = await resp.prepare(req) + assert msg is not None + msg.enable_compression.assert_called_with("gzip", None) # type: ignore[attr-defined] + assert "gzip" == resp.headers.get(hdrs.CONTENT_ENCODING) + + +@pytest.mark.usefixtures("parametrize_zlib_backend") +async def test_q_zero_not_selected() -> None: + req = make_request( + "GET", "/", headers=CIMultiDict({hdrs.ACCEPT_ENCODING: "gzip;q=0"}) + ) + resp = web.StreamResponse() + resp.enable_compression() + + msg = await resp.prepare(req) + assert msg is not None + assert not msg.enable_compression.called # type: ignore[attr-defined] + assert resp.headers.get(hdrs.CONTENT_ENCODING) is None + + @pytest.mark.usefixtures("parametrize_zlib_backend") async def test_force_compression_deflate() -> None: req = make_request( @@ -1114,16 +1152,19 @@ def test_ctor_both_charset_param_and_header_with_text() -> None: ) -def test_ctor_both_content_type_param_and_header() -> None: - with pytest.raises(ValueError): - web.Response( - headers={"Content-Type": "application/json"}, content_type="text/html" - ) +def test_ctor_content_type_param_and_header_without_text() -> None: + resp = web.Response( + headers={"Content-Type": "application/json"}, content_type="text/html" + ) + assert resp.content_type == "text/html" -def test_ctor_both_charset_param_and_header() -> None: - with pytest.raises(ValueError): - web.Response(headers={"Content-Type": "application/json"}, charset="koi8-r") + +def test_ctor_charset_param_and_header_without_text() -> None: + resp = web.Response(headers={"Content-Type": "application/json"}, charset="koi8-r") + + assert resp.content_type == "application/json" + assert resp.charset is None async def test_assign_nonbyteish_body() -> None: