From 650838255d4026202be78cfdbbd91d2fbaf15a47 Mon Sep 17 00:00:00 2001 From: Schnepper Date: Wed, 1 Dec 2021 12:32:52 -0800 Subject: [PATCH 1/6] When we have an error receiving the subjob result, do not mark the build as failed immediately. --- app/master/build.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/master/build.py b/app/master/build.py index 3a910a5..743830e 100644 --- a/app/master/build.py +++ b/app/master/build.py @@ -190,8 +190,7 @@ def complete_subjob(self, subjob_id, payload=None): self._mark_subjob_complete(subjob_id) except Exception: - self._logger.exception('Error while completing subjob; marking build as failed.') - self.mark_failed('Error occurred while completing subjob {}.'.format(subjob_id)) + self._logger.exception('Error while processing subjob {} payload'.format(subjob_id)) raise def _parse_payload_for_atom_exit_code(self, subjob_id): From 51e8efc67365c1b3c064e22c40397dc7f33d1b65 Mon Sep 17 00:00:00 2001 From: Schnepper Date: Wed, 1 Dec 2021 12:33:43 -0800 Subject: [PATCH 2/6] When creating a tarfile result on the client - verify the tarfile after creating it. --- app/util/fs.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/util/fs.py b/app/util/fs.py index de354b9..e1a88aa 100644 --- a/app/util/fs.py +++ b/app/util/fs.py @@ -121,6 +121,11 @@ def tar_directories(target_dirs_to_archive_paths, tarfile_path): for dir_path, archive_name in target_dirs_to_archive_paths.items(): target_dir = os.path.normpath(dir_path) tar.add(target_dir, arcname=archive_name) + # Verify that the tarfile is readable + with tarfile.open(tarfile_path) as tar: + for member in tar.getmembers(): + with tar.extractfile(member.name) as target: + data = target.read() def zip_directory(target_dir: str, archive_filename: str) -> str: From af7bdeaab55a00afa4981e9c9b554da4c1003136 Mon Sep 17 00:00:00 2001 From: Schnepper Date: Wed, 1 Dec 2021 12:55:47 -0800 Subject: [PATCH 3/6] Retry POST & PUT on timeout --- app/util/network.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/util/network.py b/app/util/network.py index b016c29..46897c9 100644 --- a/app/util/network.py +++ b/app/util/network.py @@ -51,7 +51,7 @@ def get(self, *args, **kwargs): return self._request('GET', *args, **kwargs) # todo: may be a bad idea to retry -- what if post was successful but just had a response error? - @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError,)) + @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError,requests.Timeout), initial_delay=1.0) def post(self, *args, **kwargs): """ Send a POST request to a url. Arguments to this method, unless otherwise documented below in _request(), are @@ -75,7 +75,7 @@ def post_with_digest(self, url, request_params, secret, error_on_failure=False): error_on_failure=error_on_failure) # todo: may be a bad idea to retry -- what if put was successful but just had a response error? - @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError,)) + @retry_on_exception_exponential_backoff(exceptions=(requests.ConnectionError,requests.Timeout)) def put(self, *args, **kwargs): """ Send a PUT request to a url. Arguments to this method, unless otherwise documented below in _request(), are From 074e33becade182bcc7c98bd32dbe6e4d7f7c79d Mon Sep 17 00:00:00 2001 From: Schnepper Date: Wed, 1 Dec 2021 13:02:44 -0800 Subject: [PATCH 4/6] Retry POST of subjob results from worker to manager when we don't have a 200 response --- app/slave/cluster_slave.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/app/slave/cluster_slave.py b/app/slave/cluster_slave.py index e82cf7c..1d30d0b 100644 --- a/app/slave/cluster_slave.py +++ b/app/slave/cluster_slave.py @@ -359,13 +359,15 @@ def _execute_subjob(self, build_id, subjob_id, executor, atomic_commands): files = {'file': ('payload', open(results_file, 'rb'), 'application/x-compressed')} self._idle_executors.put(executor) # work is done; mark executor as idle - resp = self._network.post(results_url, data=data, files=files) - if resp.ok: - self._logger.info('Build {}, Subjob {} completed and sent results to master.', build_id, subjob_id) - else: - self._logger.error( - ('Build {}, Subjob {} encountered an error when sending results to master.' - '\n\tStatus Code {}\n\t{}').format(build_id, subjob_id, resp.status_code, resp.text)) + for attempt in range(3): + resp = self._network.post(results_url, data=data, files=files) + if resp.status_code == 200: + self._logger.info('Build {}, Subjob {} completed and sent results to master.', build_id, subjob_id) + break + else: + self._logger.error( + ('Build {}, Subjob {} encountered an error when sending results to master.' + '\n\tStatus Code {} attempt {}\n\t{}').format(build_id, subjob_id, resp.status_code, attempt+1, resp.text)) def _notify_master_of_state_change(self, new_state): """ From 20d791bca3ede24dd731db2c6ea1e5c4d878a5d8 Mon Sep 17 00:00:00 2001 From: Schnepper Date: Wed, 1 Dec 2021 13:39:37 -0800 Subject: [PATCH 5/6] Check that received subjob results is a properly formatted tarfile --- app/util/fs.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/util/fs.py b/app/util/fs.py index e1a88aa..b08a9db 100644 --- a/app/util/fs.py +++ b/app/util/fs.py @@ -83,6 +83,9 @@ def extract_tar(archive_file, target_dir=None, delete=False): if not target_dir: target_dir, _ = os.path.split(archive_file) # default to same directory as tar file + if not tarfile.is_tarfile(archive_file): + raise Exception("Not a tarfile: {}".format(archive_file)) + try: with tarfile.open(archive_file, 'r:gz') as f: f.extractall(target_dir) From cee3ab64f90e6b1009a2a2703e428e719d9ef66d Mon Sep 17 00:00:00 2001 From: Schnepper Date: Tue, 7 Dec 2021 16:57:53 -0800 Subject: [PATCH 6/6] Revert "When creating a tarfile result on the client - verify the tarfile after creating it." This reverts commit 51e8efc67365c1b3c064e22c40397dc7f33d1b65. --- app/util/fs.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/app/util/fs.py b/app/util/fs.py index b08a9db..f0bc4b4 100644 --- a/app/util/fs.py +++ b/app/util/fs.py @@ -124,11 +124,6 @@ def tar_directories(target_dirs_to_archive_paths, tarfile_path): for dir_path, archive_name in target_dirs_to_archive_paths.items(): target_dir = os.path.normpath(dir_path) tar.add(target_dir, arcname=archive_name) - # Verify that the tarfile is readable - with tarfile.open(tarfile_path) as tar: - for member in tar.getmembers(): - with tar.extractfile(member.name) as target: - data = target.read() def zip_directory(target_dir: str, archive_filename: str) -> str: