diff --git a/requirements-dev.txt b/requirements-dev.txt index 081be9e67..92a52479e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -52,7 +52,7 @@ iniconfig==2.0.0 # via pytest isort==5.13.2 # via pylint -jinja2==3.1.4 +jinja2==3.1.5 # via sphinx jmespath==1.0.1 # via @@ -84,7 +84,7 @@ pydocstyle==6.3.0 # via -r requirements-dev.in pyflakes==3.2.0 # via flake8 -pygments==2.18.0 +pygments==2.20.0 # via # -r requirements-dev.in # doc8 @@ -99,7 +99,7 @@ pytest-cov==5.0.0 # via -r requirements-test.in python-dateutil==2.9.0.post0 # via botocore -requests==2.32.3 +requests==2.32.4 # via # -r requirements-test.in # sphinx @@ -152,13 +152,13 @@ typing-extensions==4.12.2 # astroid # mypy # pylint -urllib3==1.26.20 +urllib3==2.5.0 # via # botocore # requests websocket-client==1.8.0 # via -r requirements-test.in -wheel==0.45.0 +wheel==0.46.2 # via -r requirements-dev.in # The following packages are considered to be unsafe in a requirements file: diff --git a/tests/aws/test_features.py b/tests/aws/test_features.py index 1e3353a3c..9ab5607e3 100644 --- a/tests/aws/test_features.py +++ b/tests/aws/test_features.py @@ -15,11 +15,10 @@ from chalice.deploy.deployer import ChaliceDeploymentError from chalice.config import DeployedResources - CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) -PROJECT_DIR = os.path.join(CURRENT_DIR, 'testapp') -APP_FILE = os.path.join(PROJECT_DIR, 'app.py') -RANDOM_APP_NAME = 'smoketest-%s' % str(uuid.uuid4())[:13] +PROJECT_DIR = os.path.join(CURRENT_DIR, "testapp") +APP_FILE = os.path.join(PROJECT_DIR, "app.py") +RANDOM_APP_NAME = "smoketest-%s" % str(uuid.uuid4())[:13] def retry(max_attempts, delay): @@ -30,9 +29,13 @@ def _wrapped_with_retry(*args, **kwargs): if result is not None: return result time.sleep(delay) - raise RuntimeError("Exhausted max retries of %s for function: %s" - % (max_attempts, function)) + raise RuntimeError( + "Exhausted max retries of %s for function: %s" + % (max_attempts, function) + ) + return _wrapped_with_retry + return _create_wrapped_retry_function @@ -51,8 +54,7 @@ class SmokeTestApplication(object): # stabilized. _NUM_SUCCESS = 3 - def __init__(self, deployed_values, stage_name, app_name, - app_dir, region): + def __init__(self, deployed_values, stage_name, app_name, app_dir, region): self._deployed_resources = DeployedResources(deployed_values) self.stage_name = stage_name self.app_name = app_name @@ -65,20 +67,22 @@ def __init__(self, deployed_values, stage_name, app_name, def url(self): return ( "https://{rest_api_id}.execute-api.{region}.amazonaws.com/" - "{api_gateway_stage}".format(rest_api_id=self.rest_api_id, - region=self._region, - api_gateway_stage='api') + "{api_gateway_stage}".format( + rest_api_id=self.rest_api_id, + region=self._region, + api_gateway_stage="api", + ) ) @property def rest_api_id(self): - return self._deployed_resources.resource_values( - 'rest_api')['rest_api_id'] + return self._deployed_resources.resource_values("rest_api")["rest_api_id"] @property def websocket_api_id(self): - return self._deployed_resources.resource_values( - 'websocket_api')['websocket_api_id'] + return self._deployed_resources.resource_values("websocket_api")[ + "websocket_api_id" + ] @property def websocket_connect_url(self): @@ -87,7 +91,7 @@ def websocket_connect_url(self): "{api_gateway_stage}".format( websocket_api_id=self.websocket_api_id, region=self._region, - api_gateway_stage='api', + api_gateway_stage="api", ) ) @@ -99,26 +103,26 @@ def get_json(self, url): pass def _get_json(self, url): - if not url.startswith('/'): - url = '/' + url - response = requests.get(self.url + url) + if not url.startswith("/"): + url = "/" + url + response = requests.get(self.url + url, timeout=10.0) response.raise_for_status() return response.json() @retry(max_attempts=10, delay=5) def get_response(self, url, headers=None): try: - return self._send_request('GET', url, headers=headers) + return self._send_request("GET", url, headers=headers) except InternalServerError: pass def _send_request(self, http_method, url, headers=None, data=None): kwargs = {} if headers is not None: - kwargs['headers'] = headers + kwargs["headers"] = headers if data is not None: - kwargs['data'] = data - response = requests.request(http_method, self.url + url, **kwargs) + kwargs["data"] = data + response = requests.request(http_method, self.url + url, **kwargs, timeout=10.0) if response.status_code >= 500: raise InternalServerError() return response @@ -126,21 +130,21 @@ def _send_request(self, http_method, url, headers=None, data=None): @retry(max_attempts=10, delay=5) def post_response(self, url, headers=None, data=None): try: - return self._send_request('POST', url, headers=headers, data=data) + return self._send_request("POST", url, headers=headers, data=data) except InternalServerError: pass @retry(max_attempts=10, delay=5) def put_response(self, url): try: - return self._send_request('PUT', url) + return self._send_request("PUT", url) except InternalServerError: pass @retry(max_attempts=10, delay=5) def options_response(self, url): try: - return self._send_request('OPTIONS', url) + return self._send_request("OPTIONS", url) except InternalServerError: pass @@ -149,9 +153,9 @@ def redeploy_once(self): # has already happened, this function is a noop. if self._has_redeployed: return - new_file = os.path.join(self.app_dir, 'app-redeploy.py') - original_app_py = os.path.join(self.app_dir, 'app.py') - shutil.move(original_app_py, original_app_py + '.bak') + new_file = os.path.join(self.app_dir, "app-redeploy.py") + original_app_py = os.path.join(self.app_dir, "app.py") + shutil.move(original_app_py, original_app_py + ".bak") shutil.copy(new_file, original_app_py) _deploy_app(self.app_dir) self._has_redeployed = True @@ -166,54 +170,50 @@ def _wait_for_stablize(self): # API Gateway to propagate all of its changes. # We're going to give it num_attempts to give us a # 200 response before failing. - return self.get_json('/') + return self.get_json("/") @pytest.fixture def apig_client(): s = botocore.session.get_session() - return s.create_client('apigateway') + return s.create_client("apigateway") -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def smoke_test_app(tmpdir_factory): # We can't use the monkeypatch fixture here because this is a module scope # fixture and monkeypatch is a function scoped fixture. - os.environ['APP_NAME'] = RANDOM_APP_NAME + os.environ["APP_NAME"] = RANDOM_APP_NAME tmpdir = str(tmpdir_factory.mktemp(RANDOM_APP_NAME)) OSUtils().copytree(PROJECT_DIR, tmpdir) _inject_app_name(tmpdir) application = _deploy_app(tmpdir) yield application _delete_app(application, tmpdir) - os.environ.pop('APP_NAME') + os.environ.pop("APP_NAME") def _inject_app_name(dirname): - config_filename = os.path.join(dirname, '.chalice', 'config.json') + config_filename = os.path.join(dirname, ".chalice", "config.json") with open(config_filename) as f: data = json.load(f) - data['app_name'] = RANDOM_APP_NAME - data['stages']['dev']['environment_variables']['APP_NAME'] = \ - RANDOM_APP_NAME - with open(config_filename, 'w') as f: + data["app_name"] = RANDOM_APP_NAME + data["stages"]["dev"]["environment_variables"]["APP_NAME"] = RANDOM_APP_NAME + with open(config_filename, "w") as f: f.write(json.dumps(data, indent=2)) def _deploy_app(temp_dirname): factory = CLIFactory(temp_dirname) - config = factory.create_config_obj( - chalice_stage_name='dev', - autogen_policy=True - ) + config = factory.create_config_obj(chalice_stage_name="dev", autogen_policy=True) session = factory.create_botocore_session() d = factory.create_default_deployer(session, config, UI()) - region = session.get_config_variable('region') + region = session.get_config_variable("region") deployed = _deploy_with_retries(d, config) application = SmokeTestApplication( region=region, deployed_values=deployed, - stage_name='dev', + stage_name="dev", app_name=RANDOM_APP_NAME, app_dir=temp_dirname, ) @@ -223,46 +223,43 @@ def _deploy_app(temp_dirname): @retry(max_attempts=10, delay=20) def _deploy_with_retries(deployer, config): try: - deployed_stages = deployer.deploy(config, 'dev') + deployed_stages = deployer.deploy(config, "dev") return deployed_stages except ChaliceDeploymentError as e: # API Gateway aggressively throttles deployments. # If we run into this case, we just wait and try # again. error_code = _get_error_code_from_exception(e) - if error_code != 'TooManyRequestsException': + if error_code != "TooManyRequestsException": raise def _get_error_code_from_exception(exception): - error_response = getattr(exception.original_error, 'response', None) + error_response = getattr(exception.original_error, "response", None) if error_response is None: return None - return error_response.get('Error', {}).get('Code') + return error_response.get("Error", {}).get("Code") def _delete_app(application, temp_dirname): factory = CLIFactory(temp_dirname) - config = factory.create_config_obj( - chalice_stage_name='dev', - autogen_policy=True - ) + config = factory.create_config_obj(chalice_stage_name="dev", autogen_policy=True) session = factory.create_botocore_session() d = factory.create_deletion_deployer(session, UI()) _deploy_with_retries(d, config) def test_returns_simple_response(smoke_test_app): - assert smoke_test_app.get_json('/') == {'hello': 'world'} + assert smoke_test_app.get_json("/") == {"hello": "world"} def test_can_have_nested_routes(smoke_test_app): - assert smoke_test_app.get_json('/a/b/c/d/e/f/g') == {'nested': True} + assert smoke_test_app.get_json("/a/b/c/d/e/f/g") == {"nested": True} def test_supports_path_params(smoke_test_app): - assert smoke_test_app.get_json('/path/foo') == {'path': 'foo'} - assert smoke_test_app.get_json('/path/bar') == {'path': 'bar'} + assert smoke_test_app.get_json("/path/foo") == {"path": "foo"} + assert smoke_test_app.get_json("/path/bar") == {"path": "bar"} def test_path_params_mapped_in_api(smoke_test_app, apig_client): @@ -274,13 +271,13 @@ def test_path_params_mapped_in_api(smoke_test_app, apig_client): # query the resources we've created in API gateway # and make sure requestParameters are present. rest_api_id = smoke_test_app.rest_api_id - response = apig_client.get_export(restApiId=rest_api_id, - stageName='api', - exportType='swagger') - swagger_doc = json.loads(response['body'].read()) - route_config = swagger_doc['paths']['/path/{name}']['get'] - assert route_config.get('parameters', {}) == [ - {'name': 'name', 'in': 'path', 'required': True, 'type': 'string'}, + response = apig_client.get_export( + restApiId=rest_api_id, stageName="api", exportType="swagger" + ) + swagger_doc = json.loads(response["body"].read()) + route_config = swagger_doc["paths"]["/path/{name}"]["get"] + assert route_config.get("parameters", {}) == [ + {"name": "name", "in": "path", "required": True, "type": "string"}, ] @@ -289,14 +286,12 @@ def test_single_doc_mapped_in_api(smoke_test_app, apig_client): # test_path_params_mapped_in_api() rest_api_id = smoke_test_app.rest_api_id doc_parts = apig_client.get_documentation_parts( - restApiId=rest_api_id, - type='METHOD', - path='/singledoc' + restApiId=rest_api_id, type="METHOD", path="/singledoc" ) - doc_props = json.loads(doc_parts['items'][0]['properties']) - assert 'summary' in doc_props - assert 'description' not in doc_props - assert doc_props['summary'] == 'Single line docstring.' + doc_props = json.loads(doc_parts["items"][0]["properties"]) + assert "summary" in doc_props + assert "description" not in doc_props + assert doc_props["summary"] == "Single line docstring." def test_multi_doc_mapped_in_api(smoke_test_app, apig_client): @@ -304,15 +299,13 @@ def test_multi_doc_mapped_in_api(smoke_test_app, apig_client): # test_path_params_mapped_in_api() rest_api_id = smoke_test_app.rest_api_id doc_parts = apig_client.get_documentation_parts( - restApiId=rest_api_id, - type='METHOD', - path='/multidoc' + restApiId=rest_api_id, type="METHOD", path="/multidoc" ) - doc_props = json.loads(doc_parts['items'][0]['properties']) - assert 'summary' in doc_props - assert 'description' in doc_props - assert doc_props['summary'] == 'Multi-line docstring.' - assert doc_props['description'] == 'And here is another line.' + doc_props = json.loads(doc_parts["items"][0]["properties"]) + assert "summary" in doc_props + assert "description" in doc_props + assert doc_props["summary"] == "Multi-line docstring." + assert doc_props["description"] == "And here is another line." @retry(max_attempts=18, delay=10) @@ -321,189 +314,194 @@ def _get_resource_id(apig_client, rest_api_id, path): # route. As far as I know this is the best way to get # this id. matches = [ - resource for resource in - apig_client.get_resources(restApiId=rest_api_id)['items'] - if resource['path'] == path + resource + for resource in apig_client.get_resources(restApiId=rest_api_id)["items"] + if resource["path"] == path ] if matches: - return matches[0]['id'] + return matches[0]["id"] def test_supports_post(smoke_test_app): - response = smoke_test_app.post_response('/post') + response = smoke_test_app.post_response("/post") response.raise_for_status() - assert response.json() == {'success': True} + assert response.json() == {"success": True} with pytest.raises(requests.HTTPError): # Only POST is supported. - response = smoke_test_app.get_response('/post') + response = smoke_test_app.get_response("/post") response.raise_for_status() def test_supports_put(smoke_test_app): - response = smoke_test_app.put_response('/put') + response = smoke_test_app.put_response("/put") response.raise_for_status() - assert response.json() == {'success': True} + assert response.json() == {"success": True} with pytest.raises(requests.HTTPError): # Only PUT is supported. - response = smoke_test_app.get_response('/put') + response = smoke_test_app.get_response("/put") response.raise_for_status() def test_supports_shared_routes(smoke_test_app): - response = smoke_test_app.get_json('/shared') - assert response == {'method': 'GET'} - response = smoke_test_app.post_response('/shared') - assert response.json() == {'method': 'POST'} + response = smoke_test_app.get_json("/shared") + assert response == {"method": "GET"} + response = smoke_test_app.post_response("/shared") + assert response.json() == {"method": "POST"} def test_can_read_json_body_on_post(smoke_test_app): response = smoke_test_app.post_response( - '/jsonpost', data=json.dumps({'hello': 'world'}), - headers={'Content-Type': 'application/json'}) + "/jsonpost", + data=json.dumps({"hello": "world"}), + headers={"Content-Type": "application/json"}, + ) response.raise_for_status() - assert response.json() == {'json_body': {'hello': 'world'}} + assert response.json() == {"json_body": {"hello": "world"}} def test_can_raise_bad_request(smoke_test_app): - response = smoke_test_app.get_response('/badrequest') + response = smoke_test_app.get_response("/badrequest") assert response.status_code == 400 - assert response.json()['Code'] == 'BadRequestError' - assert response.json()['Message'] == 'Bad request.' + assert response.json()["Code"] == "BadRequestError" + assert response.json()["Message"] == "Bad request." def test_can_raise_not_found(smoke_test_app): - response = smoke_test_app.get_response('/notfound') + response = smoke_test_app.get_response("/notfound") assert response.status_code == 404 - assert response.json()['Code'] == 'NotFoundError' + assert response.json()["Code"] == "NotFoundError" def test_unexpected_error_raises_500_in_prod_mode(smoke_test_app): # Can't use smoke_test_app.get_response() because we're explicitly # testing for a 500. - response = requests.get(smoke_test_app.url + '/arbitrary-error') + response = requests.get(smoke_test_app.url + "/arbitrary-error", timeout=10.0) assert response.status_code == 500 - assert response.json()['Code'] == 'InternalServerError' - assert 'internal server error' in response.json()['Message'] + assert response.json()["Code"] == "InternalServerError" + assert "internal server error" in response.json()["Message"] def test_can_route_multiple_methods_in_one_view(smoke_test_app): - response = smoke_test_app.get_response('/multimethod') + response = smoke_test_app.get_response("/multimethod") response.raise_for_status() - assert response.json()['method'] == 'GET' + assert response.json()["method"] == "GET" - response = smoke_test_app.post_response('/multimethod') + response = smoke_test_app.post_response("/multimethod") response.raise_for_status() - assert response.json()['method'] == 'POST' + assert response.json()["method"] == "POST" def test_form_encoded_content_type(smoke_test_app): - response = smoke_test_app.post_response('/formencoded', - data={'foo': 'bar'}) + response = smoke_test_app.post_response("/formencoded", data={"foo": "bar"}) response.raise_for_status() - assert response.json() == {'parsed': {'foo': ['bar']}} + assert response.json() == {"parsed": {"foo": ["bar"]}} def test_can_round_trip_binary(smoke_test_app): # xde xed xbe xef will fail unicode decoding because xbe is an invalid # start byte in utf-8. - bin_data = b'\xDE\xAD\xBE\xEF' + bin_data = b"\xde\xad\xbe\xef" response = smoke_test_app.post_response( - '/binary', - headers={'Content-Type': 'application/octet-stream', - 'Accept': 'application/octet-stream'}, - data=bin_data) + "/binary", + headers={ + "Content-Type": "application/octet-stream", + "Accept": "application/octet-stream", + }, + data=bin_data, + ) response.raise_for_status() assert response.content == bin_data def test_can_round_trip_binary_custom_content_type(smoke_test_app): - bin_data = b'\xDE\xAD\xBE\xEF' + bin_data = b"\xde\xad\xbe\xef" response = smoke_test_app.post_response( - '/custom-binary', - headers={'Content-Type': 'application/binary', - 'Accept': 'application/binary'}, - data=bin_data + "/custom-binary", + headers={"Content-Type": "application/binary", "Accept": "application/binary"}, + data=bin_data, ) assert response.content == bin_data def test_can_return_default_binary_data_to_a_browser(smoke_test_app): - base64encoded_response = b'3q2+7w==' - accept = 'text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8' - response = smoke_test_app.get_response( - '/get-binary', headers={'Accept': accept}) + base64encoded_response = b"3q2+7w==" + accept = "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8" + response = smoke_test_app.get_response("/get-binary", headers={"Accept": accept}) response.raise_for_status() assert response.content == base64encoded_response def _assert_contains_access_control_allow_methods(headers, methods): - actual_methods = headers['Access-Control-Allow-Methods'].split(',') + actual_methods = headers["Access-Control-Allow-Methods"].split(",") assert sorted(methods) == sorted(actual_methods), ( - 'The expected allowed methods does not match the actual allowed ' - 'methods for CORS.') + "The expected allowed methods does not match the actual allowed " + "methods for CORS." + ) def test_can_support_cors(smoke_test_app): - response = smoke_test_app.get_response('/cors') + response = smoke_test_app.get_response("/cors") response.raise_for_status() - assert response.headers['Access-Control-Allow-Origin'] == '*' + assert response.headers["Access-Control-Allow-Origin"] == "*" # Should also have injected an OPTIONs request. - response = smoke_test_app.options_response('/cors') + response = smoke_test_app.options_response("/cors") response.raise_for_status() headers = response.headers - assert headers['Access-Control-Allow-Origin'] == '*' - assert headers['Access-Control-Allow-Headers'] == ( - 'Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token,' - 'X-Api-Key') + assert headers["Access-Control-Allow-Origin"] == "*" + assert headers["Access-Control-Allow-Headers"] == ( + "Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token," "X-Api-Key" + ) _assert_contains_access_control_allow_methods( - headers, ['GET', 'POST', 'PUT', 'OPTIONS']) + headers, ["GET", "POST", "PUT", "OPTIONS"] + ) def test_can_support_custom_cors(smoke_test_app): - response = smoke_test_app.get_response('/custom_cors') + response = smoke_test_app.get_response("/custom_cors") response.raise_for_status() - expected_allow_origin = 'https://foo.example.com' - assert response.headers[ - 'Access-Control-Allow-Origin'] == expected_allow_origin + expected_allow_origin = "https://foo.example.com" + assert response.headers["Access-Control-Allow-Origin"] == expected_allow_origin # Should also have injected an OPTIONs request. - response = smoke_test_app.options_response('/custom_cors') + response = smoke_test_app.options_response("/custom_cors") response.raise_for_status() headers = response.headers - assert headers['Access-Control-Allow-Origin'] == expected_allow_origin - assert headers['Access-Control-Allow-Headers'] == ( - 'Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token,' - 'X-Api-Key,X-Special-Header') + assert headers["Access-Control-Allow-Origin"] == expected_allow_origin + assert headers["Access-Control-Allow-Headers"] == ( + "Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token," + "X-Api-Key,X-Special-Header" + ) _assert_contains_access_control_allow_methods( - headers, ['GET', 'POST', 'PUT', 'OPTIONS']) - assert headers['Access-Control-Max-Age'] == '600' - assert headers['Access-Control-Expose-Headers'] == 'X-Special-Header' - assert headers['Access-Control-Allow-Credentials'] == 'true' + headers, ["GET", "POST", "PUT", "OPTIONS"] + ) + assert headers["Access-Control-Max-Age"] == "600" + assert headers["Access-Control-Expose-Headers"] == "X-Special-Header" + assert headers["Access-Control-Allow-Credentials"] == "true" def test_to_dict_is_also_json_serializable(smoke_test_app): - assert 'headers' in smoke_test_app.get_json('/todict') + assert "headers" in smoke_test_app.get_json("/todict") def test_multifile_support(smoke_test_app): - response = smoke_test_app.get_json('/multifile') - assert response == {'message': 'success'} + response = smoke_test_app.get_json("/multifile") + assert response == {"message": "success"} def test_custom_response(smoke_test_app): - response = smoke_test_app.get_response('/custom-response') + response = smoke_test_app.get_response("/custom-response") response.raise_for_status() # Custom header - assert response.headers['Content-Type'] == 'text/plain' + assert response.headers["Content-Type"] == "text/plain" # Multi headers - assert response.headers['Set-Cookie'] == 'key=value, foo=bar' + assert response.headers["Set-Cookie"] == "key=value, foo=bar" # Custom status code assert response.status_code == 204 def test_api_key_required_fails_with_no_key(smoke_test_app): - response = smoke_test_app.get_response('/api-key-required') + response = smoke_test_app.get_response("/api-key-required") # Request should fail because we're not providing # an API key. assert response.status_code == 403 @@ -512,49 +510,47 @@ def test_api_key_required_fails_with_no_key(smoke_test_app): def test_can_handle_charset(smoke_test_app): # Should pass content type validation even with charset specified. response = smoke_test_app.get_response( - '/json-only', - headers={'Content-Type': 'application/json; charset=utf-8'} + "/json-only", headers={"Content-Type": "application/json; charset=utf-8"} ) assert response.status_code == 200 def test_can_use_builtin_custom_auth(smoke_test_app): - url = '/builtin-auth' + url = "/builtin-auth" # First time without an Auth header, we should fail. response = smoke_test_app.get_response(url) assert response.status_code == 401 # Now with the proper auth header, things should work. - response = smoke_test_app.get_response( - url, headers={'Authorization': 'yes'} - ) + response = smoke_test_app.get_response(url, headers={"Authorization": "yes"}) assert response.status_code == 200 - context = response.json()['context'] - assert 'authorizer' in context + context = response.json()["context"] + assert "authorizer" in context # The keyval context we added shuld also be in the authorizer # dict. - assert context['authorizer']['foo'] == 'bar' + assert context["authorizer"]["foo"] == "bar" def test_can_use_shared_auth(smoke_test_app): - response = smoke_test_app.get_response('/fake-profile') + response = smoke_test_app.get_response("/fake-profile") # GETs are allowed assert response.status_code == 200 # However, POSTs require auth. # This has the same auth config as /builtin-auth, # so we're testing the auth handler can be shared. - assert smoke_test_app.post_response('/fake-profile').status_code == 401 - response = smoke_test_app.post_response('/fake-profile', - headers={'Authorization': 'yes'}) + assert smoke_test_app.post_response("/fake-profile").status_code == 401 + response = smoke_test_app.post_response( + "/fake-profile", headers={"Authorization": "yes"} + ) assert response.status_code == 200 - context = response.json()['context'] - assert 'authorizer' in context - assert context['authorizer']['foo'] == 'bar' + context = response.json()["context"] + assert "authorizer" in context + assert context["authorizer"]["foo"] == "bar" def test_empty_raw_body(smoke_test_app): - response = smoke_test_app.post_response('/repr-raw-body') + response = smoke_test_app.post_response("/repr-raw-body") response.raise_for_status() - assert response.json() == {'repr-raw-body': ''} + assert response.json() == {"repr-raw-body": ""} def test_websocket_lifecycle(smoke_test_app): @@ -568,8 +564,8 @@ def test_websocket_lifecycle(smoke_test_app): ws.close() expected_second_response = [ - [mock.ANY, 'Hello, World 1'], - [mock.ANY, 'Hello, World 2'] + [mock.ANY, "Hello, World 1"], + [mock.ANY, "Hello, World 2"], ] assert expected_second_response == second_response assert second_response[0][0] != second_response[1][0] @@ -578,35 +574,34 @@ def test_websocket_lifecycle(smoke_test_app): @pytest.mark.on_redeploy def test_redeploy_no_change_view(smoke_test_app): smoke_test_app.redeploy_once() - assert smoke_test_app.get_json('/') == {'hello': 'world'} + assert smoke_test_app.get_json("/") == {"hello": "world"} @pytest.mark.on_redeploy def test_redeploy_changed_function(smoke_test_app): smoke_test_app.redeploy_once() - assert smoke_test_app.get_json('/a/b/c/d/e/f/g') == { - 'redeployed': True} + assert smoke_test_app.get_json("/a/b/c/d/e/f/g") == {"redeployed": True} @pytest.mark.on_redeploy def test_redeploy_new_function(smoke_test_app): smoke_test_app.redeploy_once() - assert smoke_test_app.get_json('/redeploy') == {'success': True} + assert smoke_test_app.get_json("/redeploy") == {"success": True} @pytest.mark.on_redeploy def test_redeploy_change_route_info(smoke_test_app): smoke_test_app.redeploy_once() # POST is no longer allowed: - assert smoke_test_app.post_response('/multimethod').status_code == 403 + assert smoke_test_app.post_response("/multimethod").status_code == 403 # But PUT is now allowed in the redeployed app.py - assert smoke_test_app.put_response('/multimethod').status_code == 200 + assert smoke_test_app.put_response("/multimethod").status_code == 200 @pytest.mark.on_redeploy def test_redeploy_view_deleted(smoke_test_app): smoke_test_app.redeploy_once() - response = smoke_test_app.get_response('/path/foo') + response = smoke_test_app.get_response("/path/foo") # Request should fail because it's not in the redeployed # app.py assert response.status_code == 403