diff --git a/Cargo.lock b/Cargo.lock index 056a8d6335603..4c811ec78aa23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,22 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "antithesis_sdk" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18dbd97a5b6c21cc9176891cf715f7f0c273caf3959897f43b9bd1231939e675" +dependencies = [ + "libc", + "libloading", + "linkme", + "once_cell", + "rand 0.8.5", + "rustc_version_runtime", + "serde", + "serde_json", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -5120,6 +5136,26 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "linkme" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83272d46373fb8decca684579ac3e7c8f3d71d4cc3aa693df8759e260ae41cf" +dependencies = [ + "linkme-impl", +] + +[[package]] +name = "linkme-impl" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32d59e20403c7d08fe62b4376edfe5c7fb2ef1e6b1465379686d0f21c8df444b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -5779,6 +5815,7 @@ dependencies = [ name = "mz-catalog" version = "0.0.0" dependencies = [ + "antithesis_sdk", "anyhow", "async-trait", "base64 0.22.1", @@ -7170,8 +7207,9 @@ dependencies = [ [[package]] name = "mz-persist-client" -version = "26.26.0-dev.0" +version = "26.25.0-dev.0" dependencies = [ + "antithesis_sdk", "anyhow", "arrayvec 0.7.6", "arrow", @@ -7956,6 +7994,7 @@ dependencies = [ name = "mz-storage" version = "0.0.0" dependencies = [ + "antithesis_sdk", "anyhow", "arrow", "arrow-ipc", @@ -10698,6 +10737,16 @@ dependencies = [ "semver", ] +[[package]] +name = "rustc_version_runtime" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dd18cd2bae1820af0b6ad5e54f4a51d0f3fcc53b05f845675074efcc7af071d" +dependencies = [ + "rustc_version", + "semver", +] + [[package]] name = "rustix" version = "0.38.44" diff --git a/Cargo.toml b/Cargo.toml index 1f71ea92e6c7b..e58fdd3b30874 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -265,6 +265,7 @@ ahash = { version = "0.8.12", default-features = false } aho-corasick = "1.1.4" allocation-counter = "0" anyhow = "1.0.102" +antithesis_sdk = "0.2.8" array-concat = "0.5.5" arrayvec = "0.7.6" arrow = { version = "57", default-features = false } diff --git a/bin/ci-builder b/bin/ci-builder index 066bf273130a9..6d53be5cad2f5 100755 --- a/bin/ci-builder +++ b/bin/ci-builder @@ -18,6 +18,9 @@ set -euo pipefail NIGHTLY_RUST_DATE=2026-05-06 +# Allow overriding the container runtime (e.g. MZ_DEV_CI_BUILDER_RUNTIME=podman). +DOCKER="${MZ_DEV_CI_BUILDER_RUNTIME:-docker}" + workdir=$(pwd) cd "$(dirname "$0")/.." @@ -128,10 +131,14 @@ gid=$(id -g) [[ "$gid" -lt 500 ]] && gid=$uid build() { + local cache_args=() + if [[ "$DOCKER" != "podman" ]]; then + cache_args+=(--cache-from=materialize/ci-builder:"$cache_tag") + cache_args+=(--cache-to=type=inline,mode=max) + fi # shellcheck disable=SC2086 # intentional splitting of build args string - docker buildx build --pull \ - --cache-from=materialize/ci-builder:"$cache_tag" \ - --cache-to=type=inline,mode=max \ + "$DOCKER" buildx build --pull \ + "${cache_args[@]}" \ $docker_build_args \ --tag materialize/ci-builder:"$tag" \ --tag ghcr.io/materializeinc/materialize/ci-builder:"$tag" \ @@ -181,13 +188,13 @@ case "$cmd" in build "$@" ;; exists) - docker manifest inspect "$image_registry"/ci-builder:"$tag" &> /dev/null + "$DOCKER" manifest inspect "$image_registry"/ci-builder:"$tag" &> /dev/null ;; tag) echo "$tag" ;; push) - docker login ghcr.io -u materialize-bot --password "$GITHUB_GHCR_TOKEN" + "$DOCKER" login ghcr.io -u materialize-bot --password "$GITHUB_GHCR_TOKEN" build --push "$@" ;; run) @@ -274,6 +281,7 @@ case "$cmd" in --env AZURE_SERVICE_ACCOUNT_PASSWORD --env AZURE_SERVICE_ACCOUNT_TENANT --env GCP_SERVICE_ACCOUNT_JSON + --env ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON --env GITHUB_TOKEN --env GITHUB_GHCR_TOKEN --env GPG_KEY @@ -372,20 +380,26 @@ case "$cmd" in ) fi if [[ "$(uname -s)" = Linux ]]; then - args+=( - --user "$(id -u):$(stat -c %g /var/run/docker.sock)" - ) + if [[ "${MZ_DEV_CI_BUILDER_RUNTIME:-docker}" == "podman" ]]; then + args+=(--userns=keep-id) + else + args+=( + --user "$(id -u):$(stat -c %g /var/run/docker.sock)" + ) + fi if [[ $secrets == "true" ]]; then # Allow Docker-in-Docker by mounting the Docker socket in the # container. Host networking allows us to see ports created by # containers that we launch. args+=( - --volume "/var/run/docker.sock:/var/run/docker.sock" --network host --env "DOCKER_TLS_VERIFY=${DOCKER_TLS_VERIFY-}" --env "DOCKER_HOST=${DOCKER_HOST-}" ) + if [[ -S /var/run/docker.sock ]]; then + args+=(--volume "/var/run/docker.sock:/var/run/docker.sock") + fi # Forward Docker configuration too, if available. docker_dir=${DOCKER_CONFIG:-$HOME/.docker} @@ -431,14 +445,22 @@ case "$cmd" in image="$image_registry/ci-builder:$tag" # Try downloading the image a few times in case of registry flakiness if [[ "${CI:-}" ]]; then - if ! docker inspect "$image" > /dev/null 2>&1; then - docker pull "$image" || (sleep 3 && docker pull "$image") || (sleep 3 && docker pull "$image") || sleep 3 + if ! "$DOCKER" inspect "$image" > /dev/null 2>&1; then + "$DOCKER" pull "$image" || (sleep 3 && "$DOCKER" pull "$image") || (sleep 3 && "$DOCKER" pull "$image") || sleep 3 fi fi - docker run "${args[@]}" "$image" eatmydata "${docker_command[@]}" + if [[ "$DOCKER" == "podman" ]]; then + # --userns=keep-id already maps the host UID/GID into the + # container, so autouseradd is unnecessary. Override the + # entrypoint to skip it. + args+=(--entrypoint eatmydata) + "$DOCKER" run "${args[@]}" "$image" "${docker_command[@]}" + else + "$DOCKER" run "${args[@]}" "$image" eatmydata "${docker_command[@]}" + fi ;; root-shell) - docker exec --interactive --tty --user 0:0 "$(<"$cid_file")" eatmydata ci/builder/root-shell.sh + "$DOCKER" exec --interactive --tty --user 0:0 "$(<"$cid_file")" eatmydata ci/builder/root-shell.sh ;; *) printf "unknown command %q\n" "$cmd" diff --git a/ci/builder/Dockerfile b/ci/builder/Dockerfile index b3022ff3ea683..eb6b71be277a4 100644 --- a/ci/builder/Dockerfile +++ b/ci/builder/Dockerfile @@ -10,7 +10,7 @@ # Stage 1: Build a minimum CI Builder image that we can use for the initial # steps like `mkpipeline` and `Build`, as well as any tests that are self # contained and use other Docker images. -FROM ubuntu:noble-20260410 AS ci-builder-min +FROM ubuntu:noble-20260210.1 AS ci-builder-min WORKDIR /workdir @@ -399,8 +399,13 @@ ENV CARGO_HOME=/cargo RUN mkdir /cargo && chmod 777 /cargo VOLUME /cargo +# Antithesis coverage instrumentation library (used when --antithesis is passed) +RUN curl -sSL https://antithesis.com/assets/instrumentation/libvoidstar.so \ + -o /usr/lib/libvoidstar.so \ + && ldconfig + # Stage 3: Build a lightweight CI Builder image for console/playwright jobs. -FROM ubuntu:noble-20260410 AS ci-builder-console +FROM ubuntu:noble-20260324 AS ci-builder-console ARG ARCH_GCC ARG ARCH_GO diff --git a/ci/mkpipeline.py b/ci/mkpipeline.py index 79fcb7bd2a0c9..d6be6018c7532 100644 --- a/ci/mkpipeline.py +++ b/ci/mkpipeline.py @@ -121,6 +121,12 @@ def main() -> int: type=Sanitizer, choices=Sanitizer, ) + parser.add_argument( + "--antithesis", + action="store_true", + default=ui.env_is_truthy("CI_ANTITHESIS"), + help="enable Antithesis coverage instrumentation", + ) parser.add_argument( "--priority", type=int, @@ -166,6 +172,7 @@ def get_hashes(arch: Arch) -> tuple[str, bool]: arch=arch, coverage=args.coverage, sanitizer=args.sanitizer, + antithesis=args.antithesis, ) deps = repo.resolve_dependencies(image for image in repo if image.publish) check = deps.check() @@ -209,6 +216,7 @@ def fetch_hashes() -> None: args.coverage, args.sanitizer, lto, + args.antithesis, ) trim_ci_glue_exempt_steps(pipeline) else: @@ -218,9 +226,11 @@ def fetch_hashes() -> None: args.coverage, args.sanitizer, lto, + args.antithesis, ) truncate_skip_length(pipeline) handle_sanitizer_skip(pipeline, args.sanitizer) + handle_antithesis_skip(pipeline, args.antithesis) increase_agents_timeouts(pipeline, args.sanitizer, args.coverage) prioritize_pipeline(pipeline, args.priority) switch_jobs_to_aws(pipeline, args.priority) @@ -240,6 +250,7 @@ def fetch_hashes() -> None: args.coverage, args.sanitizer, lto, + args.antithesis, ) add_nightly_deploy_dependency(pipeline, args.pipeline) remove_dependencies_on_prs(pipeline, args.pipeline, hash_check) @@ -328,6 +339,21 @@ def handle_sanitizer_skip(pipeline: Any, sanitizer: Sanitizer) -> None: step["skip"] = True +def handle_antithesis_skip(pipeline: Any, antithesis: bool) -> None: + if antithesis: + pipeline.setdefault("env", {})["CI_ANTITHESIS"] = "1" + + for step in steps(pipeline): + if step.get("antithesis") == "skip": + step["skip"] = True + + else: + + for step in steps(pipeline): + if step.get("antithesis") == "only": + step["skip"] = True + + def increase_agents_timeouts( pipeline: Any, sanitizer: Sanitizer, coverage: bool ) -> None: @@ -711,6 +737,7 @@ def trim_tests_pipeline( coverage: bool, sanitizer: Sanitizer, lto: bool, + antithesis: bool = False, ) -> None: """Trim pipeline steps whose inputs have not changed in this branch. @@ -731,6 +758,7 @@ def trim_tests_pipeline( profile=mzbuild.Profile.RELEASE if lto else mzbuild.Profile.OPTIMIZED, coverage=coverage, sanitizer=sanitizer, + antithesis=antithesis, ) deps = repo.resolve_dependencies(image for image in repo) @@ -917,6 +945,7 @@ def add_cargo_test_dependency( coverage: bool, sanitizer: Sanitizer, lto: bool, + antithesis: bool = False, ) -> None: """Cargo Test normally doesn't have to wait for the build to complete, but it requires a few images (ubuntu-base, postgres), which are rarely changed. So only add a dependency when those images are not on Dockerhub yet.""" if pipeline_name not in ("test", "nightly"): @@ -933,6 +962,7 @@ def add_cargo_test_dependency( profile=mzbuild.Profile.RELEASE if lto else mzbuild.Profile.OPTIMIZED, coverage=coverage, sanitizer=sanitizer, + antithesis=antithesis, ) composition = Composition(repo, name="cargo-test") deps = composition.dependencies @@ -1090,6 +1120,8 @@ def remove_mz_specific_keys(pipeline: Any) -> None: del step["coverage"] if "sanitizer" in step: del step["sanitizer"] + if "antithesis" in step: + del step["antithesis"] if "ci_glue_exempt" in step: del step["ci_glue_exempt"] if ( diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 64e0809e2ef7e..90deba8d15c45 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -65,6 +65,29 @@ steps: branches: "main" skip: "currently broken" + - id: build-x86_64-antithesis + label: ":rust: Build x86_64 (Antithesis)" + # Regenerate the antithesis compose YAML before building so the + # `antithesis-config` image's fingerprint captures the same + # materialized fingerprint we're about to publish — otherwise + # Antithesis would try to pull a stale `materialized:mzbuild-…` + # whenever the committed YAML lagged behind source changes. + command: bin/ci-builder run stable ci/test/build-antithesis.sh + inputs: + - "*" + depends_on: [] + timeout_in_minutes: 90 + agents: + queue: l-builder-linux-x86_64 + env: + CI_ANTITHESIS: "1" + # Antithesis-flavored images get distinct mzbuild fingerprints, so + # they coexist with regular GHCR tags. The build is x86_64-only — + # Antithesis runs amd64 sandboxes. + sanitizer: skip + coverage: skip + antithesis: skip + - id: build-rust-latest-beta label: "Build with Latest Rust Beta" command: bin/ci-builder run stable ci/test/rust-beta-build.sh diff --git a/ci/test/build-antithesis.sh b/ci/test/build-antithesis.sh new file mode 100755 index 0000000000000..da288aff7ec34 --- /dev/null +++ b/ci/test/build-antithesis.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +# +# build-antithesis.sh — antithesis-flavored build + Antithesis-registry push. +# +# 1. Write a `.env` into every `test/antithesis/configs//` so each +# per-group `antithesis-config-` image bakes in compose refs +# that point at the Antithesis GCP Artifact Registry (where we mirror +# to). The .env content is the same across groups (only materialized +# + antithesis-workload refs matter), but each config image's +# fingerprint includes its own .env copy as an input, so the per-group +# images track the materialized fingerprint transitively. +# 2. Run the standard `ci.test.build` to compile antithesis-flavored Rust +# binaries and build the docker images (pushed to GHCR via mzbuild). +# 3. `docker login` the Antithesis GCP Artifact Registry using +# `ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON` (a service account scoped to +# `materialize-storage@molten-verve-216720.iam.gserviceaccount.com` — +# kept distinct from `GCP_SERVICE_ACCOUNT_JSON` which is used elsewhere +# for unrelated GCP integrations). +# 4. Retag + push `materialized`, `antithesis-workload`, and every +# `antithesis-config-` to the Antithesis registry. Public images +# referenced by the composes (postgres, minio, kafka stack) stay on +# their upstream registries — Antithesis can reach those directly. + +set -euo pipefail + +: "${CI_ANTITHESIS:?build-antithesis.sh expects CI_ANTITHESIS=1}" + +# GCP Artifact Registry path for Antithesis. Tags pushed under +# $ANTITHESIS_REGISTRY/:mzbuild-. +ANTITHESIS_REGISTRY="${ANTITHESIS_REGISTRY:-us-central1-docker.pkg.dev/molten-verve-216720/materialize-repository}" + +# Workload groups whose per-group docker-compose.yaml + .env must be +# written before `ci.test.build` runs (the `antithesis-config-` +# mzbuild images COPY the .env into their build context). Read from +# the manifest so adding a group to test/antithesis/groups.yaml only +# requires editing that file. +# +# Two non-obvious points about this load: +# - The array is `workload_groups`, not `GROUPS`. `GROUPS` is a magic +# bash variable holding the current user's supplementary group IDs; +# `mapfile -t GROUPS < …` silently fails to overwrite it and `set +# -e` then kills the script with no error message. Don't rename. +# - The pyactivate call writes to a temp file rather than feeding +# `mapfile` via process substitution. Process substitution masks +# non-zero exits from the inner command (bash gotcha), so a +# pyactivate failure would otherwise leave `workload_groups` empty +# and the for-loop below would silently iterate zero times. +groups_tmp=$(mktemp) +trap 'rm -f "$groups_tmp"' EXIT +bin/pyactivate -c " +import sys +sys.path.insert(0, 'test/antithesis') +from groups import load_manifest +for name in sorted(load_manifest().groups): + print(name) +" > "$groups_tmp" +mapfile -t workload_groups < "$groups_tmp" +echo "--- Antithesis workload groups: ${workload_groups[*]}" + +for group in "${workload_groups[@]}"; do + config_dir="test/antithesis/configs/$group" + echo "--- Writing $config_dir/.env (registry: $ANTITHESIS_REGISTRY)" + bin/pyactivate test/antithesis/export-env.py \ + --group="$group" \ + --registry "$ANTITHESIS_REGISTRY" \ + > "$config_dir/.env" +done + +echo "--- Building antithesis-flavored mzbuild images" +bin/pyactivate -m ci.test.build + +echo "--- Authenticating to Antithesis registry" +if [[ -z "${ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON:-}" ]]; then + echo "ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON is unset — pushing to the Antithesis registry will fail." >&2 + echo "Provision it as a Buildkite-agent env var (see bin/ci-builder env-forwarding)." >&2 + exit 1 +fi +echo "$ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON" \ + | docker login -u _json_key --password-stdin "https://${ANTITHESIS_REGISTRY%%/*}" + +echo "--- Pushing Materialize-built images to the Antithesis registry" +bin/pyactivate test/antithesis/push-antithesis.py --registry "$ANTITHESIS_REGISTRY" diff --git a/ci/test/build.py b/ci/test/build.py index d91e82ffe2734..d026b6dd3a8e5 100755 --- a/ci/test/build.py +++ b/ci/test/build.py @@ -34,18 +34,41 @@ def main() -> None: set_build_status("pending") coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED") sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")] + antithesis = ui.env_is_truthy("CI_ANTITHESIS") repo = mzbuild.Repository( Path("."), coverage=coverage, sanitizer=sanitizer, + antithesis=antithesis, image_registry="materialize", ) # Build and push any images that are not already available on Docker Hub, # so they are accessible to other build agents. print("--- Acquiring mzbuild images") - deps = repo.resolve_dependencies(image for image in repo if image.publish) + if antithesis: + # Antithesis only consumes a small set of mzbuild images; + # everything else in the repo (balancerd, sqllogictest, ...) is + # wasted CI time for this pipeline. `resolve_dependencies` walks + # `depends_on` transitively, so anything materialized actually + # needs still comes along. The per-group `antithesis-workload-*` + # and `antithesis-config-*` images are discovered by walking + # the mzbuild tree under test/antithesis/workloads/ and + # test/antithesis/configs/ respectively. Keep this list in + # sync with SHARED_IMAGES + manifest groups in + # push-antithesis.py. + antithesis_images = ["materialized"] + [ + name + for name in repo.images + if name.startswith("antithesis-workload-") + or name.startswith("antithesis-config-") + ] + deps = repo.resolve_dependencies( + repo.images[name] for name in antithesis_images + ) + else: + deps = repo.resolve_dependencies(image for image in repo if image.publish) deps.ensure(pre_build=lambda images: upload_debuginfo(repo, images)) set_build_status("success") annotate_buildkite_with_tags(repo.rd.arch, deps) diff --git a/ci/test/lint-main/checks/check-antithesis-compose.sh b/ci/test/lint-main/checks/check-antithesis-compose.sh new file mode 100755 index 0000000000000..f451383a39fff --- /dev/null +++ b/ci/test/lint-main/checks/check-antithesis-compose.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +# +# check-antithesis-compose.sh — ensure every test/antithesis/configs//docker-compose.yaml +# is in sync with test/antithesis/mzcompose.py + test/antithesis/groups.yaml. +# +# Image refs in the committed YAMLs are `${MATERIALIZED_IMAGE}` style +# placeholders (resolved from `.env` at compose-parse time), so the files +# are stable across materialized source changes. A plain diff catches any +# composition (services/ports/env/deps) drift. + +set -euo pipefail + +cd "$(dirname "$0")/../../../.." + +. misc/shlib/shlib.bash + +check_antithesis_compose() { + local rc=0 + + mapfile -t groups < <(bin/pyactivate -c " +import sys +sys.path.insert(0, 'test/antithesis') +from groups import load_manifest +for name in sorted(load_manifest().groups): + print(name) +") + + for group in "${groups[@]}"; do + local committed="test/antithesis/configs/$group/docker-compose.yaml" + local generated + generated=$(mktemp) + + bin/pyactivate test/antithesis/export-compose.py --group="$group" > "$generated" + + if ! diff -u "$committed" "$generated"; then + echo + echo "$committed is out of sync with test/antithesis/mzcompose.py + groups.yaml." + echo "Regenerate with:" + echo " bin/pyactivate test/antithesis/export-compose.py --group=$group > $committed" + rc=1 + fi + + rm -f "$generated" + done + + return $rc +} + +try check_antithesis_compose + +try_status_report diff --git a/ci/test/lint-main/checks/check-pipeline.sh b/ci/test/lint-main/checks/check-pipeline.sh index baed7ae9a717c..95da47ae547c8 100755 --- a/ci/test/lint-main/checks/check-pipeline.sh +++ b/ci/test/lint-main/checks/check-pipeline.sh @@ -28,6 +28,7 @@ unset CI_TEST_IDS unset CI_TEST_SELECTION unset CI_SANITIZER unset CI_COVERAGE_ENABLED +unset CI_ANTITHESIS unset CI_WAITING_FOR_BUILD pids=() diff --git a/misc/python/materialize/data_ingest/executor.py b/misc/python/materialize/data_ingest/executor.py index 9892eed185e27..d1716adf50813 100644 --- a/misc/python/materialize/data_ingest/executor.py +++ b/misc/python/materialize/data_ingest/executor.py @@ -8,6 +8,7 @@ # by the Apache License, Version 2.0. import json +import os import random import time from textwrap import dedent @@ -620,8 +621,15 @@ def __init__( def create(self, logging_exe: Any | None = None) -> None: self.logging_exe = logging_exe + # MZ_DATA_INGEST_PG_HOST lets callers reach the upstream postgres + # by a non-127.0.0.1 hostname. mzcompose's default test shape + # binds postgres's 5432 to 127.0.0.1 on the host, so direct + # localhost access works. Container-to-container topologies + # (Antithesis) need to reach postgres by service name instead, + # since bare-port forwarding doesn't traverse the workload + # container's loopback. self.pg_conn = psycopg.connect( - host="127.0.0.1", + host=os.environ.get("MZ_DATA_INGEST_PG_HOST", "127.0.0.1"), user="postgres", password="postgres", port=self.ports["postgres"], diff --git a/misc/python/materialize/mzbuild.py b/misc/python/materialize/mzbuild.py index f653b84abc4a9..08ca9bb43c943 100644 --- a/misc/python/materialize/mzbuild.py +++ b/misc/python/materialize/mzbuild.py @@ -187,6 +187,7 @@ def __init__( sanitizer: Sanitizer, image_registry: str, image_prefix: str, + antithesis: bool = False, ): self.root = root self.arch = arch @@ -196,6 +197,7 @@ def __init__( self.cargo_workspace = cargo.Workspace(root) self.image_registry = image_registry self.image_prefix = image_prefix + self.antithesis = antithesis def build( self, @@ -471,13 +473,21 @@ def __init__(self, rd: RepositoryDetails, path: Path, config: dict[str, Any]): def run(self, prep: Any) -> None: super().run(prep) + source = Path(self.source) for src in self.inputs(): - dst = self.path / self.destination / src + rel = Path(src).relative_to(source) + dst = self.path / self.destination / rel dst.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(self.rd.root / self.source / src, dst) + shutil.copy(self.rd.root / src, dst) def inputs(self) -> set[str]: - return set(git.expand_globs(self.rd.root / self.source, self.matching)) + # Return repo-root-relative paths so that `ResolvedImage.fingerprint` + # (which resolves each input as `rd.root / rel_path`) can lstat them. + source = Path(self.source) + return { + str(source / p) + for p in git.expand_globs(self.rd.root / self.source, self.matching) + } class CargoPreImage(PreImage): @@ -513,6 +523,8 @@ def extra(self) -> str: flags += "optimized" if self.rd.coverage: flags += "coverage" + if self.rd.antithesis: + flags += ["antithesis"] if self.rd.sanitizer != Sanitizer.none: flags += self.rd.sanitizer.value flags.sort() @@ -547,15 +559,14 @@ def generate_cargo_build_command( examples: list[str], features: list[str] | None = None, ) -> list[str]: - rustflags = ( - rustc_flags.coverage - if rd.coverage - else ( - rustc_flags.sanitizer[rd.sanitizer] - if rd.sanitizer != Sanitizer.none - else ["--cfg=tokio_unstable"] - ) - ) + if rd.antithesis: + rustflags = rustc_flags.antithesis + elif rd.coverage: + rustflags = rustc_flags.coverage + elif rd.sanitizer != Sanitizer.none: + rustflags = rustc_flags.sanitizer[rd.sanitizer] + else: + rustflags = ["--cfg=tokio_unstable"] cflags = ( [ f"--target={target(rd.arch)}", @@ -568,8 +579,8 @@ def generate_cargo_build_command( if rd.sanitizer != Sanitizer.none else [] ) - extra_env = ( - { + if rd.sanitizer != Sanitizer.none: + extra_env = { "CFLAGS": " ".join(cflags), "CXXFLAGS": " ".join(cflags), "LDFLAGS": " ".join(cflags), @@ -582,9 +593,8 @@ def generate_cargo_build_command( "PATH": f"/sanshim:/opt/x-tools/{target(rd.arch)}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", "TSAN_OPTIONS": "report_bugs=0", # build-scripts fail } - if rd.sanitizer != Sanitizer.none - else {} - ) + else: + extra_env = {} cargo_build = rd.build( "build", channel=None, rustflags=rustflags, extra_env=extra_env @@ -672,7 +682,11 @@ def copy(src: Path, relative_dst: Path) -> None: exe_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy(src, exe_path) - if self.strip: + if self.rd.antithesis: + # Antithesis needs full debug symbols for symbolization. + # Don't strip anything. + pass + elif self.strip: # The debug information is large enough that it slows down CI, # since we're packaging these binaries up into Docker images and # shipping them around. @@ -945,6 +959,7 @@ def _build_locked( "ARCH_GCC": str(self.image.rd.arch), "ARCH_GO": self.image.rd.arch.go_str(), "CI_SANITIZER": str(self.image.rd.sanitizer), + "ANTITHESIS": "1" if self.image.rd.antithesis else "", } f = self.write_dockerfile() @@ -1416,6 +1431,7 @@ def __init__( sanitizer: Sanitizer = Sanitizer.none, image_registry: str = image_registry(), image_prefix: str = "", + antithesis: bool = False, ): self.rd = RepositoryDetails( root, @@ -1425,6 +1441,7 @@ def __init__( sanitizer, image_registry, image_prefix, + antithesis=antithesis, ) self.images: dict[str, Image] = {} self.compositions: dict[str, Path] = {} @@ -1517,6 +1534,12 @@ def install_arguments(parser: argparse.ArgumentParser) -> None: default="", help="a prefix to apply to all Docker image names", ) + parser.add_argument( + "--antithesis", + help="whether to enable Antithesis coverage instrumentation", + default=ui.env_is_truthy("CI_ANTITHESIS"), + action="store_true", + ) @classmethod def from_arguments(cls, root: Path, args: argparse.Namespace) -> "Repository": @@ -1544,6 +1567,7 @@ def from_arguments(cls, root: Path, args: argparse.Namespace) -> "Repository": image_registry=args.image_registry, image_prefix=args.image_prefix, arch=args.arch, + antithesis=args.antithesis, ) @property diff --git a/misc/python/materialize/mzcompose/services/clusterd.py b/misc/python/materialize/mzcompose/services/clusterd.py index e07ca490a5355..bffe3ddc3e470 100644 --- a/misc/python/materialize/mzcompose/services/clusterd.py +++ b/misc/python/materialize/mzcompose/services/clusterd.py @@ -28,7 +28,7 @@ def __init__( options: list[str] = [], restart: str = "no", stop_grace_period: str = "120s", - scratch_directory: str = "/scratch", + scratch_directory: str | None = "/scratch", volumes: list[str] = [], workers: int = 1, process_names: list[str] = [], @@ -68,7 +68,13 @@ def __init__( f"CLUSTERD_STORAGE_TIMELY_CONFIG={storage_timely_config}", ] - options = ["clusterd", f"--scratch-directory={scratch_directory}", *options] + # `scratch_directory=None` omits the CLI flag entirely. clusterd + # treats this as "no scratch" — RocksDB switches to its in-memory + # env (`Env::mem_env()`), matching the production deployment shape + # where cluster replicas have no scratch disk attached. + options = ["clusterd", *options] + if scratch_directory is not None: + options.insert(1, f"--scratch-directory={scratch_directory}") config: ServiceConfig = {} diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index a693d07fecf7e..629855bc4bdbf 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -89,6 +89,7 @@ ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, Complexity, Scenario, + is_fault_shaped, ) from materialize.sqlsmith import known_errors @@ -510,11 +511,15 @@ def refill_sqlsmith(self, exe: Executor) -> None: # json library used or the interaction with Python reading from # it. Ignore for now return - except: - if exe.db.scenario not in ( - Scenario.Kill, - Scenario.BackupRestore, - Scenario.ZeroDowntimeDeploy, + except Exception as exc: + if ( + exe.db.scenario + not in ( + Scenario.Kill, + Scenario.BackupRestore, + Scenario.ZeroDowntimeDeploy, + ) + or not is_fault_shaped(exc) ): raise finally: @@ -1496,7 +1501,6 @@ def __init__( BOOLEAN_FLAG_VALUES ) self.flags_with_values["enable_eager_delta_joins"] = BOOLEAN_FLAG_VALUES - self.flags_with_values["enable_public_metrics_endpoint"] = BOOLEAN_FLAG_VALUES self.flags_with_values["persist_batch_structured_key_lower_len"] = [ "0", "1", @@ -1809,9 +1813,6 @@ def __init__( "kafka_retry_backoff_max", "kafka_reconnect_backoff", "kafka_reconnect_backoff_max", - "kafka_sink_message_max_bytes", - "kafka_sink_batch_size", - "kafka_sink_batch_num_messages", "pg_source_validate_timeline", "sql_server_source_validate_restore_history", "oidc_issuer", @@ -1991,7 +1992,7 @@ def run(self, exe: Executor) -> bool: return False role_id = exe.db.role_id exe.db.role_id += 1 - role = Role(role_id) + role = Role(role_id, name_scope=exe.db.name_scope) role.create(exe) exe.db.roles.append(role) return True @@ -2030,6 +2031,13 @@ def run(self, exe: Executor) -> bool: class CreateClusterAction(Action): def run(self, exe: Executor) -> bool: + # In existing-cluster mode the Database wraps a pre-existing + # (caller-supplied) cluster, typically bootstrapped by the + # Antithesis compose, and we have no allocator for additional + # clusters tied to other pool members. Skip — the wrapped + # cluster is the entire test surface. + if exe.db.existing_cluster_name is not None: + return False with exe.db.lock: if len(exe.db.clusters) >= MAX_CLUSTERS: return False @@ -2041,6 +2049,7 @@ def run(self, exe: Executor) -> bool: size=self.rng.choice(["1", "2"]), replication_factor=self.rng.choice([1, 2]), introspection_interval="1s", + name_scope=exe.db.name_scope, ) cluster.create(exe) exe.db.clusters.append(cluster) @@ -2174,6 +2183,11 @@ def run(self, exe: Executor) -> bool: with exe.db.lock: # Keep cluster 0 with 1 replica for sources/sinks unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed] + # Pre-existing (pool) clusters: the framework didn't create them + # and won't mutate them. Skip. + unmanaged_clusters = [ + c for c in unmanaged_clusters if not c.is_pool_backed + ] if not unmanaged_clusters: return False cluster = self.rng.choice(unmanaged_clusters) @@ -2197,6 +2211,10 @@ def run(self, exe: Executor) -> bool: with exe.db.lock: # Keep cluster 0 with 1 replica for sources/sinks unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed] + # Pre-existing (pool) clusters: same reasoning as above. Skip. + unmanaged_clusters = [ + c for c in unmanaged_clusters if not c.is_pool_backed + ] if not unmanaged_clusters: return False cluster = self.rng.choice(unmanaged_clusters) @@ -2677,10 +2695,11 @@ def run(self, exe: Executor) -> bool: ) source.create(exe) exe.db.kafka_sources.append(source) - except: - if exe.db.scenario not in ( - Scenario.Kill, - Scenario.ZeroDowntimeDeploy, + except Exception as exc: + if ( + exe.db.scenario + not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy) + or not is_fault_shaped(exc) ): raise return True @@ -2752,10 +2771,11 @@ def run(self, exe: Executor) -> bool: ) source.create(exe) exe.db.mysql_sources.append(source) - except: - if exe.db.scenario not in ( - Scenario.Kill, - Scenario.ZeroDowntimeDeploy, + except Exception as exc: + if ( + exe.db.scenario + not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy) + or not is_fault_shaped(exc) ): raise return True @@ -2827,10 +2847,11 @@ def run(self, exe: Executor) -> bool: ) source.create(exe) exe.db.postgres_sources.append(source) - except: - if exe.db.scenario not in ( - Scenario.Kill, - Scenario.ZeroDowntimeDeploy, + except Exception as exc: + if ( + exe.db.scenario + not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy) + or not is_fault_shaped(exc) ): raise return True @@ -2904,10 +2925,11 @@ def run(self, exe: Executor) -> bool: ) source.create(exe) exe.db.sql_server_sources.append(source) - except: - if exe.db.scenario not in ( - Scenario.Kill, - Scenario.ZeroDowntimeDeploy, + except Exception as exc: + if ( + exe.db.scenario + not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy) + or not is_fault_shaped(exc) ): raise return True diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py index bad0b4081bbde..7034be033dae8 100644 --- a/misc/python/materialize/parallel_workload/database.py +++ b/misc/python/materialize/parallel_workload/database.py @@ -7,6 +7,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +import dataclasses import random import threading import uuid @@ -885,18 +886,56 @@ def __str__(self) -> str: class Role: role_id: int lock: threading.Lock - - def __init__(self, role_id: int): + # Inserted between `role` and `{role_id}` in the generated name. Empty by + # default (giving the historical `role0` shape). When set, gives + # `role{name_scope}{role_id}` — used by callers like the Antithesis + # parallel-driver where many concurrent Database instances against one + # materialize would otherwise collide on the same `role0..roleN` names. + name_scope: str + + def __init__(self, role_id: int, name_scope: str = ""): self.role_id = role_id self.lock = threading.Lock() + self.name_scope = name_scope def __str__(self) -> str: + # Format: `role[-{name_scope}-]{role_id}`. The bracketed segment is + # only present when seed-scoping is on, so the historical `role0` + # shape (which non-Antithesis consumers parse) is preserved. + # Scoped names need identifier-quoting because dashes aren't valid + # in an unquoted identifier; unscoped names stay bare to match the + # original SQL the framework emits. + if self.name_scope: + return identifier(f"role-{self.name_scope}-{self.role_id}") return f"role{self.role_id}" def create(self, exe: Executor) -> None: exe.execute(f"CREATE ROLE {self}") +@dataclasses.dataclass(frozen=True) +class ClusterdPoolMember: + """Address+config of one external clusterd container the SUT will host an + unmanaged cluster replica on. + + Used by the Antithesis compose bootstrap (see test/antithesis/mzcompose.py) + to build the CREATE CLUSTER REPLICAS clause for each long-lived pool + cluster: one cluster per pool member, with this member as its sole + replica. After bootstrap the framework only references the cluster by + name (`existing_cluster_name`); pool members aren't passed into + `Database` directly. + + Default ports match clusterd's defaults; override per environment. + """ + + host: str + storagectl_port: int = 2100 + computectl_port: int = 2101 + compute_port: int = 2102 + storage_port: int = 2103 + workers: int = 4 + + class ClusterReplica: replica_id: int size: str @@ -904,7 +943,12 @@ class ClusterReplica: rename: int lock: threading.Lock - def __init__(self, replica_id: int, size: str, cluster: "Cluster"): + def __init__( + self, + replica_id: int, + size: str, + cluster: "Cluster", + ): self.replica_id = replica_id self.size = size self.cluster = cluster @@ -935,6 +979,18 @@ class Cluster: introspection_interval: str rename: int lock: threading.Lock + # Inserted between `cluster` and `-{cluster_id}` in the generated name. + # Empty by default (giving the historical `cluster-N` shape). When set, + # gives `cluster{name_scope}-N` — used by callers like the Antithesis + # parallel-driver, where many concurrent Database instances against one + # materialize would otherwise collide on the same `cluster-N` names. + name_scope: str + # When set, the cluster represents a pre-existing cluster the framework + # did not create and must not drop. `name()` returns this literally + # (bypassing cluster_id / rename / name_scope), and `create()` / `drop()` + # are no-ops. The replicas list is empty in this mode — the framework + # doesn't model the pre-existing replicas because it never touches them. + pre_existing_name: str | None def __init__( self, @@ -943,27 +999,66 @@ def __init__( size: str, replication_factor: int, introspection_interval: str, + name_scope: str = "", + pre_existing_name: str | None = None, ): self.cluster_id = cluster_id self.managed = managed self.size = size - self.replicas = [ - ClusterReplica(i, size, self) for i in range(replication_factor) - ] + self.pre_existing_name = pre_existing_name + if pre_existing_name is not None: + # Pre-existing cluster: framework only models its name. The actual + # replicas live in materialize's catalog from the bootstrap step + # that created the cluster (see test/antithesis/mzcompose.py). + # Empty replicas list flips `is_pool_backed` to True, which is + # what the action classes use to skip DDL on this cluster. + self.managed = False + self.replicas = [] + else: + self.replicas = [ + ClusterReplica(i, size, self) for i in range(replication_factor) + ] self.replica_id = len(self.replicas) self.introspection_interval = introspection_interval self.rename = 0 self.lock = threading.Lock() + self.name_scope = name_scope + + @property + def is_pool_backed(self) -> bool: + """True for clusters the framework didn't create itself and won't + mutate (replica count, drop). Currently set when `pre_existing_name` + was passed in. Action classes that would CREATE/ALTER/DROP REPLICA + check this and bail.""" + return self.pre_existing_name is not None def name(self) -> str: + # Pre-existing clusters: name is fixed by the caller (typically a + # pool-cluster the Antithesis compose bootstrapped). Don't apply + # naughtify / name_scope / rename — they don't apply to objects we + # didn't create. + if self.pre_existing_name is not None: + return self.pre_existing_name + # Format: `cluster[-{name_scope}]-{cluster_id}[-{rename}]`. The + # bracketed `-{name_scope}` segment is only present when seed- + # scoping is on, so the historical `cluster-0` / `cluster-0-1` + # shapes (which non-Antithesis consumers parse) are preserved. + prefix = ( + f"cluster-{self.name_scope}" if self.name_scope else "cluster" + ) if self.rename: - return naughtify(f"cluster-{self.cluster_id}-{self.rename}") - return naughtify(f"cluster-{self.cluster_id}") + return naughtify(f"{prefix}-{self.cluster_id}-{self.rename}") + return naughtify(f"{prefix}-{self.cluster_id}") def __str__(self) -> str: return identifier(self.name()) def create(self, exe: Executor) -> None: + # Pre-existing cluster: the SUT already has it (bootstrapped at + # compose-up). The framework's only responsibility for the cluster + # is to use its name; never DDL it. + if self.pre_existing_name is not None: + return query = f"CREATE CLUSTER {self} " if self.managed: query += f"SIZE = '{self.size}', REPLICATION FACTOR = {len(self.replicas)}, INTROSPECTION INTERVAL = '{self.introspection_interval}'" @@ -1025,12 +1120,35 @@ def __init__( complexity: Complexity, scenario: Scenario, naughty_identifiers: bool, + # When True, top-level objects whose names are not schema-qualified + # (clusters and roles) are scoped by the database seed so concurrent + # Database instances against one materialize don't collide. Off by + # default; opted into by the Antithesis parallel-driver where many + # invocations share the SUT. Tables / schemas / views are already + # qualified by DB.name() which includes the seed, so they don't + # need this. + seed_scoped_names: bool = False, + # When set, the Database runs against a pre-existing cluster the + # framework didn't create and won't drop. CreateClusterAction is + # disabled in this mode; the single initial cluster wraps the + # supplied name. Used by the Antithesis parallel-driver to bind + # each invocation to one of the long-lived pool clusters that the + # compose creates at bootstrap (see test/antithesis/mzcompose.py). + existing_cluster_name: str | None = None, ): self.host = host self.ports = ports self.complexity = complexity self.scenario = scenario self.seed = seed + self.seed_scoped_names = seed_scoped_names + self.existing_cluster_name = existing_cluster_name + # The bare seed (no leading/trailing punctuation) used by Cluster / + # Role / etc. to assemble their scoped names. Empty when seed-scoping + # is off, in which case those classes fall back to their historical + # `cluster-N` / `role0` shapes. See Cluster.name() and Role.__str__() + # for how the seed gets inlaid. + self.name_scope = seed if seed_scoped_names else "" set_naughty_identifiers(naughty_identifiers) self.s3_path = 0 @@ -1064,21 +1182,46 @@ def __init__( ) self.views.append(view) self.view_id = len(self.views) - self.roles = [Role(i) for i in range(rng.randint(0, MAX_INITIAL_ROLES))] - self.role_id = len(self.roles) - # At least one storage cluster required for WebhookSources - self.clusters = [ - Cluster( - i, - managed=rng.choice([True, False]), - size=rng.choice( - ["scale=1,workers=1", "scale=1,workers=4", "scale=2,workers=2"] - ), - replication_factor=1, - introspection_interval="1s", - ) - for i in range(rng.randint(1, MAX_INITIAL_CLUSTERS)) + self.roles = [ + Role(i, name_scope=self.name_scope) + for i in range(rng.randint(0, MAX_INITIAL_ROLES)) ] + self.role_id = len(self.roles) + # At least one storage cluster required for WebhookSources. + # In existing-cluster mode the framework's sole initial cluster + # wraps a pre-existing cluster (typically a pool cluster the + # Antithesis compose bootstrapped). The wrapper's create()/drop() + # are no-ops; CreateClusterAction / CreateClusterReplicaAction / + # DropClusterReplicaAction are also disabled for it. + if existing_cluster_name is not None: + self.clusters = [ + Cluster( + 0, + # managed / size / replication_factor are ignored when + # `pre_existing_name` is set — the wrapper never emits + # CREATE CLUSTER. + managed=False, + size="", + replication_factor=1, + introspection_interval="1s", + name_scope=self.name_scope, + pre_existing_name=existing_cluster_name, + ) + ] + else: + self.clusters = [ + Cluster( + i, + managed=rng.choice([True, False]), + size=rng.choice( + ["scale=1,workers=1", "scale=1,workers=4", "scale=2,workers=2"] + ), + replication_factor=1, + introspection_interval="1s", + name_scope=self.name_scope, + ) + for i in range(rng.randint(1, MAX_INITIAL_CLUSTERS)) + ] self.cluster_id = len(self.clusters) self.indexes = set() self.webhook_sources = [ diff --git a/misc/python/materialize/parallel_workload/settings.py b/misc/python/materialize/parallel_workload/settings.py index 39894cd0dc536..e9398503e6165 100644 --- a/misc/python/materialize/parallel_workload/settings.py +++ b/misc/python/materialize/parallel_workload/settings.py @@ -10,6 +10,9 @@ import random from enum import Enum +import psycopg +import requests + class Complexity(Enum): Read = "read" @@ -38,6 +41,75 @@ def _missing_(cls, value): return cls(random.choice([elem.value for elem in cls])) +# Message substrings produced by connection drops, DNS partitions, broker +# transport failures, and Mz-restart admission control. Used by +# `is_fault_shaped()` to decide whether a Scenario.Kill / ZeroDowntimeDeploy +# swallow site should tolerate `exc` or re-raise it. Anything not in this +# list is treated as a real correctness signal worth surfacing. +_FAULT_SHAPED_MSG_PATTERNS: tuple[str, ...] = ( + # psycopg connection-drop wordings + "server closed the connection unexpectedly", + "the connection is lost", + "EOF detected", + "Cursor closed", + # libc / socket + "connection refused", + "connection reset", + "broken pipe", + "could not connect to server", + "Failed to resolve hostname", + "failed to lookup address information", + "Temporary failure in name resolution", + "Multiple connection attempts failed", + "connection timeout", + # Materialize / persist admission control: S3-backed persist surfaces + # HTTP 429s via reqwest error chains from src/persist/src/location.rs. + "TooManyRequests", + # Postgres source visiting its own restart window + "terminating connection due to administrator command", + # Kafka transport during broker fault windows + "BrokerTransportFailure", + "Meta data fetch error", + # HTTP / WS + "Remote end closed connection without response", + "Connection aborted", + "Connection broken: IncompleteRead", + "Connection to remote host was lost", + "socket is already closed", + "WS connect", +) + + +def is_fault_shaped(exc: BaseException) -> bool: + """True if `exc` looks like a fault-injection / kill-test artifact + rather than a SUT correctness bug. + + Used by `Scenario.Kill` / `Scenario.ZeroDowntimeDeploy` tolerance + sites in `action.py` and `executor.py`: matching exceptions get + swallowed (the kill-thread can plausibly produce them); everything + else re-raises so real bugs surface. + + The bare `except:` shape that this function replaces previously + swallowed *every* exception under those scenarios, including + AssertionError / KeyError / TypeError from framework bugs and + actual SUT misbehavior. Narrow it to the shapes a kill / fault + actually produces. + """ + # Operational connection-error types — always tolerated regardless + # of the surface message (drivers don't promise a stable wording). + if isinstance( + exc, + ( + psycopg.OperationalError, + psycopg.InterfaceError, + requests.exceptions.ConnectionError, + ), + ): + return True + msg = getattr(exc, "msg", None) or str(exc) + return any(p in msg for p in _FAULT_SHAPED_MSG_PATTERNS) + + ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS = { # Uses a lot of memory, hard to predict how much "memory_limiter_interval": "0", diff --git a/misc/python/materialize/rustc_flags.py b/misc/python/materialize/rustc_flags.py index 6353f83d3b68a..f6aac45573e14 100644 --- a/misc/python/materialize/rustc_flags.py +++ b/misc/python/materialize/rustc_flags.py @@ -25,6 +25,20 @@ ] +# Flags to enable Antithesis coverage instrumentation. +# Requires libvoidstar.so at /usr/lib/ (installed in ci-builder and +# the materialized Docker image). +# See: https://antithesis.com/docs/using_antithesis/sdk/rust/instrumentation/ +antithesis = [ + "-Ccodegen-units=1", + "-Cpasses=sancov-module", + "-Cllvm-args=-sanitizer-coverage-level=3", + "-Cllvm-args=-sanitizer-coverage-trace-pc-guard", + "-Clink-args=-Wl,--build-id", + "-lvoidstar", +] + + class Sanitizer(Enum): """What sanitizer to use""" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 6704bd79d8b06..3553217de30ed 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -10,6 +10,7 @@ publish = false workspace = true [dependencies] +antithesis_sdk.workspace = true anyhow.workspace = true async-trait.workspace = true base64.workspace = true diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index c93830e38d7e3..83d560c98004c 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -17,6 +17,7 @@ use std::str::FromStr; use std::sync::{Arc, LazyLock}; use std::time::{Duration, Instant}; +use antithesis_sdk::assert_always_greater_than; use async_trait::async_trait; use differential_dataflow::lattice::Lattice; use futures::{FutureExt, StreamExt}; @@ -41,6 +42,7 @@ use mz_repr::Diff; use mz_storage_client::controller::PersistEpoch; use mz_storage_types::StorageDiff; use mz_storage_types::sources::SourceData; +use serde_json::json; use sha2::Digest; use timely::progress::{Antichain, Timestamp as TimelyTimestamp}; use tracing::{debug, info, warn}; @@ -145,6 +147,21 @@ impl FenceableToken { current_token, fence_token, } => { + // The two `assert!` calls below are the natural placement + // for an Antithesis `assert_always!` covering the + // FenceableToken state-machine invariant. They are not + // wrapped today because Materialize does not run multiple + // concurrent environmentd processes against the same + // catalog shard, so the `Fenced` state is unreachable in + // every supported topology — including the Antithesis + // topology in this repo. Wrapping them would create + // assertions Antithesis cannot exercise, which is dead + // weight in coverage reports. If we ever ship multi- + // environmentd (e.g. for a 0DT-preflight Antithesis run), + // convert these to `assert_always!` with distinct + // messages so a violation becomes a reportable property + // failure rather than a panic. See the + // `epoch-fencing-prevents-split-brain` catalog entry. assert!( fence_token > current_token, "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}" @@ -1182,12 +1199,43 @@ impl UnopenedPersistCatalogState { "fencing previous catalogs" ); if matches!(self.mode, Mode::Writable) { + // Snapshot the prior durable epoch so the post-CaS anchor + // below can verify monotonicity. Captured before the write + // because `compare_and_append` may call `sync()` which + // reads new state into `self.fenceable_token`. + let prior_durable_epoch = self + .fenceable_token + .token() + .map(|t| t.epoch.get()) + .unwrap_or(0); match self .compare_and_append(fence_updates.clone(), commit_ts) .await { Ok(upper) => { commit_ts = upper; + // Antithesis anchor for `epoch-fencing-prevents- + // split-brain`: after our fence-token CaS commits, + // the freshly-minted epoch we just persisted must + // be strictly greater than the prior durable + // epoch. A regression here would mean a future + // lower-epoch writer would not be fenced out by + // the write we just made, opening the split-brain + // window the catalog is supposed to close. + let new_epoch = current_fenceable_token + .token() + .expect("freshly minted Unfenced token always has a current_token") + .epoch + .get(); + assert_always_greater_than!( + new_epoch, + prior_durable_epoch, + "catalog fencing: new durable epoch did not strictly increase after fence-token CaS", + &json!({ + "prior_durable_epoch": prior_durable_epoch, + "new_epoch": new_epoch, + }) + ); } Err(CompareAndAppendError::Fence(e)) => return Err(e.into()), Err(e @ CompareAndAppendError::UpperMismatch { .. }) => { diff --git a/src/materialized/ci/Dockerfile b/src/materialized/ci/Dockerfile index 18686251a7b07..e06aaf6bad0cf 100644 --- a/src/materialized/ci/Dockerfile +++ b/src/materialized/ci/Dockerfile @@ -20,6 +20,17 @@ COPY materialized entrypoint.sh /usr/local/bin/ USER root RUN ln -s /usr/local/bin/materialized /usr/local/bin/environmentd \ && ln -s /usr/local/bin/materialized /usr/local/bin/clusterd + +# Antithesis instrumentation (conditional on --build-arg ANTITHESIS=1) +ARG ANTITHESIS +RUN if [ -n "$ANTITHESIS" ]; then \ + curl -sSL https://antithesis.com/assets/instrumentation/libvoidstar.so \ + -o /usr/lib/libvoidstar.so \ + && ldconfig \ + && mkdir -p /symbols \ + && ln -s /usr/local/bin/materialized /symbols/materialized; \ + fi + USER materialize ENTRYPOINT ["tini", "--", "entrypoint.sh"] diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 5441fec177038..0d2b068964372 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-persist-client" description = "Client for Materialize pTVC durability system" -version = "26.26.0-dev.0" +version = "26.25.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false @@ -28,6 +28,7 @@ name = "benches" harness = false [dependencies] +antithesis_sdk.workspace = true anyhow.workspace = true arrayvec.workspace = true arrow.workspace = true diff --git a/src/persist-client/src/internal/apply.rs b/src/persist-client/src/internal/apply.rs index a48982ff77eb9..5085b24b3d6fb 100644 --- a/src/persist-client/src/internal/apply.rs +++ b/src/persist-client/src/internal/apply.rs @@ -15,6 +15,9 @@ use std::ops::ControlFlow::{self, Break, Continue}; use std::sync::Arc; use std::time::Instant; +use antithesis_sdk::assert_always_greater_than; +use serde_json::json; + use crate::cache::{LockingTypedState, StateCache}; use crate::error::{CodecMismatch, InvalidUsage}; use crate::internal::gc::GcReq; @@ -598,6 +601,21 @@ where } } + // Antithesis-reportable form of the broader `persist-cas-monotonicity` + // catalog property: SeqNo must strictly increase across any committed + // state transition. The narrower equality check below (next == seqno) + // still panics on violation and stays in place to catch skip/regress + // in the same call. + assert_always_greater_than!( + new_state.seqno().0, + expected.0, + "persist: state seqno did not strictly increase across CaS apply", + &json!({ + "expected_prev": expected.0, + "computed_next": new_state.seqno().0, + "cmd": cmd.name, + }) + ); assert_eq!( expected.next(), new_state.seqno(), diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index f96d9991511dc..2e7f4f4a37ab7 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -15,6 +15,7 @@ bench = false [dependencies] anyhow.workspace = true +antithesis_sdk.workspace = true async-stream.workspace = true async-trait.workspace = true aws-credential-types.workspace = true diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 60ab8b8928058..2f6e8d28f960e 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; +use antithesis_sdk::{assert_always, assert_unreachable}; use anyhow::anyhow; use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Hashable}; @@ -52,6 +53,7 @@ use rdkafka::statistics::Statistics; use rdkafka::topic_partition_list::Offset; use rdkafka::{ClientContext, Message, TopicPartitionList}; use serde::{Deserialize, Serialize}; +use serde_json::json; use timely::PartialOrder; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; @@ -273,7 +275,13 @@ fn render_reader<'scope>( .iter() .map(|(_name, kind)| kind.clone()) .collect::>(), - _ => panic!("unexpected source export details: {:?}", details), + _ => { + assert_unreachable!( + "kafka: unexpected source export details", + &json!({"source_id": id.to_string()}) + ); + panic!("unexpected source export details: {:?}", details) + } }; let statistics = config @@ -888,6 +896,11 @@ fn render_reader<'scope>( } } // We can now put them back + assert_always!( + reader.partition_consumers.is_empty(), + "kafka: partition_consumers not drained at shutdown", + &json!({"remaining": reader.partition_consumers.len()}) + ); assert!(reader.partition_consumers.is_empty()); reader.partition_consumers = consumers; @@ -1139,6 +1152,20 @@ impl KafkaSourceReader { // Given the explicit consumer to partition assignment, we should never receive a message // for a partition for which we have no metadata + let partition_known = self + .last_offsets + .get(output_index) + .map(|m| m.contains_key(&partition)) + .unwrap_or(false); + assert_always!( + partition_known, + "kafka: partition missing from last_offsets", + &json!({ + "source_id": self.id.to_string(), + "partition": partition, + "output_index": output_index, + }) + ); assert!( self.last_offsets .get(output_index) @@ -1190,6 +1217,13 @@ fn construct_source_message( ) { let pid = msg.partition(); let Ok(offset) = u64::try_from(msg.offset()) else { + assert_unreachable!( + "kafka: negative offset from non-error message", + &json!({ + "partition": msg.partition(), + "raw_offset": msg.offset(), + }) + ); panic!( "got negative offset ({}) from otherwise non-error'd kafka message", msg.offset() diff --git a/src/storage/src/source/mysql/replication/partitions.rs b/src/storage/src/source/mysql/replication/partitions.rs index c4a6a9ba743bc..7aef48cb3c2c8 100644 --- a/src/storage/src/source/mysql/replication/partitions.rs +++ b/src/storage/src/source/mysql/replication/partitions.rs @@ -11,6 +11,8 @@ use std::collections::BTreeMap; +use antithesis_sdk::assert_unreachable; +use serde_json::json; use timely::progress::Antichain; use uuid::Uuid; @@ -92,6 +94,14 @@ impl GtidReplicationPartitions { // should only see GTID transaction-ids // in a monotonic order for each source, starting at that upper. if active_part.timestamp() > new_part.timestamp() { + assert_unreachable!( + "mysql: BinlogGtidMonotonicityViolation — received out-of-order GTID from multithreaded replica", + &json!({ + "source_uuid": source_id.to_string(), + "active_timestamp": format!("{:?}", active_part.timestamp()), + "new_timestamp": format!("{:?}", new_part.timestamp()), + }) + ); let err = DefiniteError::BinlogGtidMonotonicityViolation( source_id.to_string(), new_part.timestamp().clone(), diff --git a/src/storage/src/source/reclock.rs b/src/storage/src/source/reclock.rs index d4ab5ac4b312b..745115e5dbf72 100644 --- a/src/storage/src/source/reclock.rs +++ b/src/storage/src/source/reclock.rs @@ -10,11 +10,13 @@ /// The `ReclockOperator` observes the progress of a stream that is /// timestamped with some source time `FromTime` and generates bindings that describe how the /// collection should evolve in target time `IntoTime`. +use antithesis_sdk::assert_reachable; use differential_dataflow::consolidation; use differential_dataflow::lattice::Lattice; use mz_persist_client::error::UpperMismatch; use mz_repr::Diff; use mz_storage_client::util::remap_handle::RemapHandle; +use serde_json::json; use timely::order::PartialOrder; use timely::progress::Timestamp; use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain}; @@ -128,6 +130,12 @@ where upper: self.upper.clone(), }; + // Tracks whether append_batch hit an UpperMismatch during this mint + // invocation. If true and we still exit the while loop normally, + // we've exercised the retry path covered by the catalog property + // `reclock-mint-eventually-succeeds`. + let mut cas_retry_count: u64 = 0; + while *self.upper == [IntoTime::minimum()] || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper) && PartialOrder::less_than(&self.upper, &new_into_upper) @@ -159,12 +167,28 @@ where let new_batch = match self.append_batch(updates, &new_into_upper).await { Ok(trace_batch) => trace_batch, - Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await, + Err(UpperMismatch { current, .. }) => { + cas_retry_count = cas_retry_count.saturating_add(1); + self.sync(current.borrow()).await + } }; batch.updates.extend(new_batch.updates); batch.upper = new_batch.upper; } + // Reachability anchor for `reclock-mint-eventually-succeeds`: this + // line fires only when a CaS UpperMismatch was observed and the + // mint loop nonetheless terminated. That's the path the catalog + // wants Antithesis to observe at least once per run; reaching it + // is the signal, so the marker is unconditional `assert_reachable!` + // rather than `assert_sometimes!(true, …)`. + if cas_retry_count > 0 { + assert_reachable!( + "reclock: mint completed after at least one compare_and_append UpperMismatch", + &json!({"cas_retry_count": cas_retry_count}) + ); + } + batch } diff --git a/src/storage/src/source/reclock/compat.rs b/src/storage/src/source/reclock/compat.rs index a260e2dfcf060..607bbc4c5e680 100644 --- a/src/storage/src/source/reclock/compat.rs +++ b/src/storage/src/source/reclock/compat.rs @@ -15,6 +15,7 @@ use std::rc::Rc; use std::sync::Arc; use std::time::Duration; +use antithesis_sdk::assert_unreachable; use anyhow::Context; use differential_dataflow::lattice::Lattice; use fail::fail_point; @@ -33,6 +34,7 @@ use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader}; use mz_storage_types::StorageDiff; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::sources::{SourceData, SourceTimestamp}; +use serde_json::json; use timely::order::{PartialOrder, TotalOrder}; use timely::progress::Timestamp; use timely::progress::frontier::Antichain; @@ -303,7 +305,13 @@ where *self.shared_write_frontier.borrow_mut() = new_upper; return result; } - Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"), + Err(invalid_use) => { + assert_unreachable!( + "reclock: compare_and_append InvalidUsage", + &json!({"error": invalid_use.to_string()}) + ); + panic!("compare_and_append failed: {invalid_use}") + } } } diff --git a/src/storage/src/upsert/types.rs b/src/storage/src/upsert/types.rs index 2bf8270aa2c95..57a4b85033563 100644 --- a/src/storage/src/upsert/types.rs +++ b/src/storage/src/upsert/types.rs @@ -88,11 +88,13 @@ use std::num::Wrapping; use std::sync::Arc; use std::time::Instant; +use antithesis_sdk::{assert_always, assert_unreachable}; use bincode::Options; use itertools::Itertools; use mz_ore::error::ErrorExt; use mz_repr::{Diff, GlobalId}; use serde::{Serialize, de::DeserializeOwned}; +use serde_json::json; use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics}; use crate::statistics::SourceStatistics; @@ -294,6 +296,10 @@ impl StateValue { match self { Self::Value(value) => value, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: into_decoded on Consolidating StateValue", + &json!({"accessor": "into_decoded"}) + ); panic!("called `into_decoded without calling `ensure_decoded`") } } @@ -366,6 +372,10 @@ impl StateValue { }), }), StateValue::Consolidating(_) => { + assert_unreachable!( + "upsert: into_provisional_value on Consolidating StateValue", + &json!({"accessor": "into_provisional_value"}) + ); panic!("called `into_provisional_value` without calling `ensure_decoded`") } } @@ -400,6 +410,10 @@ impl StateValue { }), }), StateValue::Consolidating(_) => { + assert_unreachable!( + "upsert: into_provisional_tombstone on Consolidating StateValue", + &json!({"accessor": "into_provisional_tombstone"}) + ); panic!("called `into_provisional_tombstone` without calling `ensure_decoded`") } } @@ -413,6 +427,10 @@ impl StateValue { _ => None, }, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: provisional_order on Consolidating StateValue", + &json!({"accessor": "provisional_order"}) + ); panic!("called `provisional_order` without calling `ensure_decoded`") } } @@ -427,6 +445,10 @@ impl StateValue { _ => value.finalized.as_ref(), }, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: provisional_value_ref on Consolidating StateValue", + &json!({"accessor": "provisional_value_ref"}) + ); panic!("called `provisional_value_ref` without calling `ensure_decoded`") } } @@ -437,6 +459,10 @@ impl StateValue { match self { Self::Value(v) => v.finalized, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: into_finalized_value on Consolidating StateValue", + &json!({"accessor": "into_finalized_value"}) + ); panic!("called `into_finalized_value` without calling `ensure_decoded`") } } @@ -577,7 +603,13 @@ impl StateValue { *acc ^= val; } } - _ => panic!("`merge_update_state` called with non-consolidating state"), + _ => { + assert_unreachable!( + "upsert: merge_update_state on non-Consolidating state", + &json!({"site": "merge_update_state"}) + ); + panic!("`merge_update_state` called with non-consolidating state") + } } } @@ -618,29 +650,61 @@ impl StateValue { }) .expect("invalid upsert state"); // Truncation is fine (using `as`) as this is just a checksum + let want_checksum = seahash::hash(value) as i64; + assert_always!( + consolidating.checksum_sum.0 == want_checksum, + "upsert: consolidating checksum_sum mismatch (diff_sum=1)", + &json!({ + "source_id": source_id.to_string(), + "checksum_sum": consolidating.checksum_sum.0, + "expected_seahash": want_checksum, + }) + ); assert_eq!( - consolidating.checksum_sum.0, - // Hash the value, not the full buffer, which may have extra 0's - seahash::hash(value) as i64, + consolidating.checksum_sum.0, want_checksum, "invalid upsert state: checksum_sum does not match, state: {}, {}", - consolidating, - source_id, + consolidating, source_id, ); *self = Self::finalized_value(bincode_opts.deserialize(value).unwrap()); } 0 => { + assert_always!( + consolidating.len_sum.0 == 0, + "upsert: consolidating len_sum nonzero (diff_sum=0)", + &json!({ + "source_id": source_id.to_string(), + "len_sum": consolidating.len_sum.0, + }) + ); assert_eq!( consolidating.len_sum.0, 0, "invalid upsert state: len_sum is non-0, state: {}, {}", consolidating, source_id, ); + assert_always!( + consolidating.checksum_sum.0 == 0, + "upsert: consolidating checksum_sum nonzero (diff_sum=0)", + &json!({ + "source_id": source_id.to_string(), + "checksum_sum": consolidating.checksum_sum.0, + }) + ); assert_eq!( consolidating.checksum_sum.0, 0, "invalid upsert state: checksum_sum is non-0, state: {}, {}", consolidating, source_id, ); + let all_zero = consolidating.value_xor.iter().all(|&x| x == 0); + assert_always!( + all_zero, + "upsert: consolidating value_xor nonzero (diff_sum=0)", + &json!({ + "source_id": source_id.to_string(), + "value_xor_len": consolidating.value_xor.len(), + }) + ); assert!( - consolidating.value_xor.iter().all(|&x| x == 0), + all_zero, "invalid upsert state: value_xor not all 0s with 0 diff. \ Non-zero positions: {:?}, state: {}, {}", consolidating @@ -669,6 +733,15 @@ impl StateValue { ), Err(_) => "Err(UpsertValueError)".to_string(), }); + assert_unreachable!( + "upsert: consolidating diff_sum not in {0,1}", + &json!({ + "source_id": source_id.to_string(), + "diff_sum": other, + "value_byte_len": value_byte_len, + "decodable": decode_ok, + }) + ); panic!( "invalid upsert state: non 0/1 diff_sum: {}, state: {}, {}, \ key: {:?}, value_byte_len: {:?}, decodable: {:?}", @@ -1059,6 +1132,10 @@ where }); if completed && self.snapshot_completed { + assert_unreachable!( + "upsert: snapshot completion called twice", + &json!({"site": "consolidate_chunk"}) + ); panic!("attempted completion of already completed upsert snapshot") } diff --git a/src/storage/src/upsert_continual_feedback.rs b/src/storage/src/upsert_continual_feedback.rs index a4669d3a80099..5fb562a7aa08a 100644 --- a/src/storage/src/upsert_continual_feedback.rs +++ b/src/storage/src/upsert_continual_feedback.rs @@ -14,6 +14,7 @@ use std::cmp::Reverse; use std::fmt::Debug; use std::sync::Arc; +use antithesis_sdk::{assert_always, assert_unreachable}; use differential_dataflow::hashable::Hashable; use differential_dataflow::{AsCollection, VecCollection}; use indexmap::map::Entry; @@ -23,6 +24,7 @@ use mz_storage_types::errors::{DataflowError, EnvelopeError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; +use serde_json::json; use std::convert::Infallible; use timely::container::CapacityContainerBuilder; use timely::dataflow::StreamVec; @@ -623,6 +625,11 @@ fn stage_input( } stash.extend(data.drain(..).map(|((key, value, order), time, diff)| { + assert_always!( + diff.is_positive(), + "upsert: input diff positive (cf v1)", + &json!({"diff": diff.into_inner()}) + ); assert!(diff.is_positive(), "invalid upsert input"); (time, key, Reverse(order), value) })); @@ -797,6 +804,10 @@ where let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) { command_state } else { + assert_unreachable!( + "upsert: key missing from commands_state (cf v1)", + &json!({"source_id": source_config.id.to_string()}) + ); panic!("key missing from commands_state"); }; diff --git a/src/storage/src/upsert_continual_feedback_v2.rs b/src/storage/src/upsert_continual_feedback_v2.rs index 32de9e3770086..8560ffd614603 100644 --- a/src/storage/src/upsert_continual_feedback_v2.rs +++ b/src/storage/src/upsert_continual_feedback_v2.rs @@ -65,6 +65,7 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; +use antithesis_sdk::{assert_always, assert_unreachable}; use differential_dataflow::difference::{IsZero, Semigroup}; use differential_dataflow::hashable::Hashable; use differential_dataflow::lattice::Lattice; @@ -81,6 +82,7 @@ use mz_storage_types::errors::{DataflowError, EnvelopeError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; +use serde_json::json; use std::convert::Infallible; use timely::container::CapacityContainerBuilder; use timely::dataflow::StreamVec; @@ -312,6 +314,11 @@ where AsyncEvent::Data(cap, data) => { let mut pushed_any = false; for ((key, value, from_time), ts, diff) in data { + assert_always!( + diff.is_positive(), + "upsert: input diff positive (cf v2)", + &json!({"diff": diff.into_inner()}) + ); assert!(diff.is_positive(), "invalid upsert input"); if PartialOrder::less_equal(&input_upper, &resume_upper) && !resume_upper.less_equal(&ts) @@ -480,7 +487,13 @@ where (Some(a), Some(b)) => std::cmp::min(a, b).clone(), (Some(a), None) => a.clone(), (None, Some(b)) => b.clone(), - (None, None) => unreachable!(), + (None, None) => { + assert_unreachable!( + "upsert: cf v2 join produced (None, None)", + &json!({"site": "min_ts join"}) + ); + unreachable!() + } }; cap.downgrade(&min_ts); } else { diff --git a/src/testdrive/src/action/sql.rs b/src/testdrive/src/action/sql.rs index 02b4f2cdf7c54..fd29e9d3f5cf0 100644 --- a/src/testdrive/src/action/sql.rs +++ b/src/testdrive/src/action/sql.rs @@ -10,7 +10,7 @@ use std::ascii; use std::error::Error; use std::fmt::{self, Display, Formatter, Write as _}; -use std::time::SystemTime; +use std::time::{Instant, SystemTime}; use anyhow::{Context, bail}; use itertools::Itertools; @@ -94,13 +94,17 @@ pub async fn run_sql(mut cmd: SqlCommand, state: &mut State) -> Result { - let now = SystemTime::now(); - let epoch = SystemTime::UNIX_EPOCH; - let ts = now.duration_since(epoch).unwrap().as_secs_f64(); - let delay = now.duration_since(start).unwrap().as_secs_f64(); + let ts = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + let delay = start.elapsed().as_secs_f64(); println!("rows match; continuing at ts {ts}, took {delay}s"); (state, Ok(())) } diff --git a/test/antithesis/AGENTS.md b/test/antithesis/AGENTS.md new file mode 100644 index 0000000000000..b93956df1ea94 --- /dev/null +++ b/test/antithesis/AGENTS.md @@ -0,0 +1,21 @@ +Files relevant to running Materialize under Antithesis. + +Use the `antithesis-setup` skill to scaffold and manage this directory. Use the `antithesis-research` skill to analyze the system and build a property catalog. Use the `antithesis-workload` skill to implement assertions and test commands. + +**mzcompose.py** +Source of truth for the Antithesis topology. Standard mzcompose composition: services (`postgres-metadata`, `minio`, `redpanda`, `materialized`, `workload`), dependencies, env, ports. The generated `config/docker-compose.yaml` is derived from this. + +**export-compose.py** +Renders `mzcompose.py` into a flat docker-compose YAML that Antithesis can consume. Images are emitted as `ghcr.io/materializeinc/materialize/:mzbuild-` refs that Antithesis pulls directly from public GHCR. + +**workload/** +Mzbuild image (`antithesis-workload`) for the Python test driver. Dockerfile, entrypoint, and test-template scripts (`test/*.sh`) live here. Test command files must be prefixed with one of `parallel_driver_`, `singleton_driver_`, `serial_driver_`, `first_`, `eventually_`, `finally_`, `anytime_`; files prefixed with `helper_` are ignored by Test Composer. + +**config/** +Mzbuild image (`antithesis-config`) — a `FROM scratch` container holding the generated `docker-compose.yaml`. This is the image Antithesis points at to bring up the environment. + +**scratchbook/** +Antithesis scratchbook: system analysis, property catalog, topology plans, per-property evidence files (in `scratchbook/properties/`), property relationship maps, persistent integration notes. Keep up to date as Antithesis-related decisions change. + +**setup-complete.sh** (in `workload/`) +Inject this script into a Dockerfile to notify Antithesis that setup is complete. Should only run once the system under test is ready for testing — Antithesis will not run test commands until it receives this event. diff --git a/test/antithesis/Makefile b/test/antithesis/Makefile new file mode 100644 index 0000000000000..0c9251a65fce0 --- /dev/null +++ b/test/antithesis/Makefile @@ -0,0 +1,186 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Local-dev helper for the Materialize Antithesis harness. +# +# The harness emits one docker-compose.yaml per workload group (see +# `test/antithesis/groups.yaml`). Use `GROUP=` to pick one; +# defaults to `combined` (kitchen-sink) for back-compat. Valid groups: +# kafka, pg-cdc, mysql-cdc, sql-server-cdc, parallel-workload, upsert-stress, combined. +# +# Targets: +# make build GROUP=kafka # regen this group's compose YAML, acquire images +# make up GROUP=kafka # build + bring up the stack +# make down GROUP=kafka # tear down (preserves volumes) +# make smoke GROUP=kafka # build + up + smoke test +# make test GROUP=kafka # smoke test against a running stack +# make clean GROUP=kafka # tear down + remove volumes +# +# make build-local GROUP=kafka # build for local dev (no --antithesis flavor) +# make up-local GROUP=kafka # build-local + bring up the stack +# +# make build-all # build every group sequentially +# +# The `-local` targets are for validating the workload + drivers without +# the Antithesis platform. They build the plain (non-antithesis-flavored) +# images, which (a) don't need libvoidstar.so locally and (b) cover all +# images including new transitive deps (e.g. testdrive) that CI doesn't +# yet publish under the antithesis flavor. + +SHELL := /usr/bin/env bash +.SHELLFLAGS := -eu -o pipefail -c + +GROUP ?= combined +PROJECT := materialize-antithesis-$(GROUP) +REPO_ROOT := $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/../..) +ALL_GROUPS := kafka pg-cdc mysql-cdc sql-server-cdc parallel-workload upsert-stress combined + +# Pick podman if available, else docker. +ifndef RUNTIME + RUNTIME := $(shell command -v podman >/dev/null 2>&1 && echo podman || (command -v docker >/dev/null 2>&1 && echo docker || echo none)) +endif +ifeq ($(RUNTIME),none) + $(error neither podman nor docker found in PATH; set RUNTIME=docker or install podman) +endif +ifeq ($(RUNTIME),podman) + export MZ_DEV_CI_BUILDER_RUNTIME := podman +endif + +CONFIG_DIR := $(REPO_ROOT)/test/antithesis/configs/$(GROUP) +COMPOSE_FILE := $(CONFIG_DIR)/docker-compose.yaml +ENV_FILE := $(CONFIG_DIR)/.env +COMPOSE := $(RUNTIME) compose -p $(PROJECT) --env-file $(ENV_FILE) -f $(COMPOSE_FILE) +PSQL := $(COMPOSE) exec materialized psql -h localhost -p 6875 -U materialize + +# mzbuild images we need built locally. Third-party images (postgres, minio, +# kafka, …) are pulled by `docker compose` from their upstream registries. +# Per-group `antithesis-config-` is built by `mzimage acquire` +# transitively, since it's a no-op FROM-scratch image referencing the +# materialized + antithesis-workload fingerprints. +MZBUILD_IMAGES := materialized antithesis-workload-$(GROUP) + +# The upsert-stress group's compose adds `upsert-hammer-{i}` services +# backed by a dedicated `antithesis-upsert-hammer` image (kept tiny + +# free of any /opt/antithesis/ directory so Antithesis doesn't pick it +# up as a Test Composer target). Only acquire it when the active +# group actually references it; other groups would just be paying the +# build cost for no consumer. +ifeq ($(GROUP),upsert-stress) + MZBUILD_IMAGES += antithesis-upsert-hammer +endif + +# --------------------------------------------------------------------------- +# Build +# --------------------------------------------------------------------------- +.PHONY: build export-compose export-env acquire-images build-all + +build: export-compose export-env acquire-images + +export-compose: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-compose.py --group=$(GROUP) > $(COMPOSE_FILE) + @echo "Wrote $(COMPOSE_FILE)" + +export-env: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-env.py --group=$(GROUP) > $(ENV_FILE) + @echo "Wrote $(ENV_FILE)" + +acquire-images: + @# Force `--arch x86_64` to match what `export-env.py` writes into the + @# `.env` file. The Antithesis platform itself runs amd64-only — both + @# `export-env.py` and `export-compose.py` pin `arch=Arch.X86_64` — so + @# the fingerprints baked into the compose YAML are always for x86_64. + @# Without this flag, `bin/mzimage acquire` defaults to the host arch + @# (aarch64 on Apple Silicon), producing a different fingerprint than + @# the one the compose YAML references; the resulting image doesn't + @# match the compose's `image:` tag and the local stack fails to pull. + @# Also: aarch64 cross-compile of `--antithesis` builds needs an aarch64 + @# `libvoidstar.so` which isn't published — x86_64 is the only flavor + @# Antithesis ships. + @for image in $(MZBUILD_IMAGES); do \ + echo "--- Acquiring $$image (--antithesis --arch x86_64)"; \ + cd $(REPO_ROOT) && bin/mzimage acquire "$$image" --antithesis --arch x86_64; \ + done + +build-all: + @for g in $(ALL_GROUPS); do \ + echo "==> Building group: $$g"; \ + $(MAKE) build GROUP=$$g; \ + done + +# --------------------------------------------------------------------------- +# Local (non-antithesis) targets +# --------------------------------------------------------------------------- +# +# Build and run the same compose topology without the Antithesis flavor. +# Used for validating the workload + drivers locally before pushing to CI. +# Plain (non-antithesis) mzbuild images: +# * don't need libvoidstar.so installed in the cross-sysroot +# * cover all transitive deps (e.g. testdrive), unlike the antithesis +# flavor which CI only publishes for materialized + antithesis-workload +# + antithesis-config-. +# The fault-orchestrator service is a no-op outside Antithesis (its +# pause_faults.sh exits cleanly when ANTITHESIS_STOP_FAULTS is unset), so +# the topology behaves like a regular docker-compose stack. + +.PHONY: build-local export-compose-local export-env-local acquire-images-local up-local + +build-local: export-compose-local export-env-local acquire-images-local + +export-compose-local: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-compose.py \ + --group=$(GROUP) --no-antithesis > $(COMPOSE_FILE) + @echo "Wrote $(COMPOSE_FILE) (host arch)" + +export-env-local: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-env.py \ + --group=$(GROUP) --no-antithesis > $(ENV_FILE) + @echo "Wrote $(ENV_FILE) (non-antithesis)" + +acquire-images-local: + @# Use the host arch (no `--arch` flag) so the resulting workload image + @# runs natively. On Apple Silicon, running the x86_64 testdrive binary + @# under Docker's rosetta/qemu emulation segfaults inside the + @# foundationdb client init — native aarch64 sidesteps that entirely. + @# `export-env.py --no-antithesis` mirrors the same logic and emits + @# host-arch fingerprints to the .env file. + @for image in $(MZBUILD_IMAGES); do \ + echo "--- Acquiring $$image (plain, host arch)"; \ + cd $(REPO_ROOT) && bin/mzimage acquire "$$image"; \ + done + +up-local: build-local + $(COMPOSE) up -d + +# --------------------------------------------------------------------------- +# Up / Down +# --------------------------------------------------------------------------- +.PHONY: up down clean + +up: build + $(COMPOSE) up -d + +down: + $(COMPOSE) down + +clean: down + $(COMPOSE) down -v --remove-orphans 2>/dev/null || true + +# --------------------------------------------------------------------------- +# Test +# --------------------------------------------------------------------------- +.PHONY: test smoke + +test: + $(PSQL) -c "CREATE TABLE IF NOT EXISTS smoke_test (k INT, v TEXT)" + $(PSQL) -c "INSERT INTO smoke_test VALUES (1, 'hello'), (2, 'world')" + $(PSQL) -c "SELECT * FROM smoke_test ORDER BY k" + $(PSQL) -c "DROP TABLE smoke_test" + +smoke: up test + @echo "[smoke ($(GROUP))] passed" diff --git a/test/antithesis/README.md b/test/antithesis/README.md new file mode 100644 index 0000000000000..8f38b2982123d --- /dev/null +++ b/test/antithesis/README.md @@ -0,0 +1,680 @@ +# Antithesis POC + +This directory contains everything needed to run Materialize under +[Antithesis](https://antithesis.com/) — a deterministic-hypervisor fuzzer that +injects container-level faults (kill / pause / partition / drop / dup) at +randomized cadence and replays interesting histories. + +If you're just here to **make something happen locally**: + +```sh +make up-local GROUP=kafka # build + bring up the Kafka workload group +make smoke GROUP=kafka # build + up + a one-shot psql smoke test +make down GROUP=kafka # tear down (preserves volumes) +make clean GROUP=kafka # tear down + remove volumes +make build-all # regenerate compose YAML for every group +``` + +Valid `GROUP` values: `kafka`, `pg-cdc`, `mysql-cdc`, `sql-server-cdc`, +`parallel-workload`, `upsert-stress`, `combined` (kitchen-sink, default). + +If you're here to **understand how this is wired up**, read on. + +--- + +## Table of contents + +1. [What Antithesis is and what we use it for](#what-antithesis-is) +2. [Architecture at a glance](#architecture-at-a-glance) +3. [Workload groups](#workload-groups) +4. [Directory layout](#directory-layout) +5. [Test command conventions](#test-command-conventions) +6. [Local dev loop](#local-dev-loop) +7. [CI / push to the Antithesis registry](#ci--push-to-the-antithesis-registry) +8. [Authoring a new property / driver](#authoring-a-new-property--driver) +9. [Authoring a new workload group](#authoring-a-new-workload-group) +10. [Conventions that are easy to violate](#conventions-that-are-easy-to-violate) +11. [Next test frameworks to port](#next-test-frameworks-to-port) + +--- + +## What Antithesis is + +Antithesis runs the system under test (SUT) on a deterministic hypervisor that +records every nondeterministic decision (network, scheduling, IO) so a failure +can be replayed bit-for-bit. On top of that it provides: + +- **Fault injection** — random kill / pause / partition / packet drop / dup at + the container layer. We don't need any code in the SUT to participate. +- **Test Composer** — a directory of executable files (`/opt/antithesis/test/v1/