From b7ec56995bb12f836472afa8a3c985a135c60a0d Mon Sep 17 00:00:00 2001 From: Jarvis Date: Thu, 11 Jun 2026 15:17:19 +0800 Subject: [PATCH 1/3] wip: per-policy cache backend dispatch (agent session cut mid-work, resume + verify) --- config.example.yaml | 15 +++- crates/aisix-cache/src/lib.rs | 10 ++- crates/aisix-core/src/config.rs | 11 +++ crates/aisix-core/src/models/cache_policy.rs | 16 ++-- crates/aisix-proxy/src/chat.rs | 55 ++++++++------ crates/aisix-proxy/src/lib.rs | 2 +- crates/aisix-proxy/src/state.rs | 77 ++++++++++++++++++-- crates/aisix-server/src/main.rs | 36 +++++---- 8 files changed, 165 insertions(+), 57 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index 87595caf..257d8ee0 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -77,14 +77,21 @@ observability: # managed: # enabled: true +# Cache backend availability. The in-process memory cache is always +# built; the shared redis cache is built iff `redis` is configured. +# Which backend serves a request is chosen per matched CachePolicy +# (its `backend` field, managed via the Admin API / control plane) — +# a policy asking for redis on a DP without `cache.redis` gets NO +# caching for its requests (cache_status = disabled), never a silent +# fallback to node-local memory. cache: - backend: "memory" # memory | redis | qdrant + # Legacy knob — no longer selects a single global cache. Kept for + # config compatibility; `backend: "redis"` still requires the + # `redis` block below (validated at boot). + backend: "memory" # memory | redis # redis: # url: "redis://127.0.0.1:6379" # mode: "single" # single | cluster | sentinel - # qdrant: - # url: "http://127.0.0.1:6333" - # collection: "aisix-semantic-cache" # Models, API keys, provider keys, guardrails, cache policies, and # observability exporters are NOT defined in this file. 
They are stored diff --git a/crates/aisix-cache/src/lib.rs b/crates/aisix-cache/src/lib.rs index 6a8f4b95..c44894d1 100644 --- a/crates/aisix-cache/src/lib.rs +++ b/crates/aisix-cache/src/lib.rs @@ -6,9 +6,13 @@ //! and stores the response with `x-aisix-cache: miss`. //! //! Backends: -//! - [`MemoryCache`] (moka, in-process) — default, configured by -//! `cfg.cache.backend = "memory"`. -//! - Redis backend lands in a follow-up PR behind the `redis` feature. +//! - [`MemoryCache`] (moka, in-process) — always available. +//! - `RedisCache` (behind the `redis` feature) — built when the boot +//! config carries `cache.redis`. +//! +//! The proxy picks the backend per request from the matched +//! `CachePolicy.backend` (see `aisix_proxy::state::CacheBackends`); +//! the boot config only determines which instances exist. //! //! Streaming responses aren't cached at this layer — the upstream stream //! has no terminal value to store. diff --git a/crates/aisix-core/src/config.rs b/crates/aisix-core/src/config.rs index 6f9d3d48..2e434f5c 100644 --- a/crates/aisix-core/src/config.rs +++ b/crates/aisix-core/src/config.rs @@ -517,6 +517,17 @@ impl Default for OtlpTracingConfig { } } +/// Boot-level cache backend availability (#519 B.8). +/// +/// The in-process memory cache is always built; the redis cache is +/// built iff `redis` is set. Which instance serves a given request is +/// selected by the matched `CachePolicy.backend` (etcd-managed, per +/// policy) — NOT by this struct. +/// +/// `backend` is a legacy knob, still parsed for config compatibility: +/// it no longer selects "the one global cache". Its only remaining +/// effect is fail-fast validation — `backend = "redis"` without a +/// `redis` block is rejected at boot.
#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields, default)] pub struct CacheConfig { diff --git a/crates/aisix-core/src/models/cache_policy.rs b/crates/aisix-core/src/models/cache_policy.rs index 7e3316df..9e1c4e09 100644 --- a/crates/aisix-core/src/models/cache_policy.rs +++ b/crates/aisix-core/src/models/cache_policy.rs @@ -15,9 +15,12 @@ use serde::{Deserialize, Serialize}; use crate::resource::Resource; -/// Cache backend choice. `Memory` is enforced by the DP today; -/// `Redis` is the kine-level wire-shape stub for the upcoming -/// shared-cluster backend (DP enforcement pending). +/// Cache backend choice. The DP selects the cache instance per +/// matched policy: `Memory` uses the in-process cache (always +/// available); `Redis` uses the shared redis cache iff the deployment +/// configured `cache.redis`. A `Redis` policy on a DP without redis +/// gets NO caching (`cache_status = disabled`) — never a silent +/// fallback to node-local memory. #[derive( Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema, )] @@ -47,9 +50,10 @@ pub struct CachePolicy { #[serde(default = "default_enabled")] pub enabled: bool, - /// Backend hint. `memory` is the only enforced backend today; - /// `redis` parses + persists but the DP currently falls back - /// to memory until that backend wires up. + /// Which cache instance serves requests matched by this policy. + /// `memory` always works; `redis` requires the DP to have + /// `cache.redis` configured — otherwise matching requests get no + /// caching at all (visible as `cache_status = disabled`). #[serde(default)] pub backend: CacheBackend, diff --git a/crates/aisix-proxy/src/chat.rs b/crates/aisix-proxy/src/chat.rs index dcd8ec56..212c455d 100644 --- a/crates/aisix-proxy/src/chat.rs +++ b/crates/aisix-proxy/src/chat.rs @@ -16,7 +16,7 @@ //! line. Errors surface via [`ProxyError`] which carries the right //! 
status, error type, and (for rate-limits) Retry-After. -use aisix_cache::CacheKey; +use aisix_cache::{Cache, CacheKey}; use aisix_core::AppliedGuardrail; use aisix_gateway::{BridgeContext, BridgeError, ChatFormat}; use aisix_guardrails::GuardrailVerdict; @@ -1256,13 +1256,13 @@ async fn dispatch( // // Match order: first enabled policy whose `parsed_applies_to()` // accepts (req.model, auth.entry.id) wins. We grab the WHOLE - // matching entry (not just `any`) so the post-call write below - // can use that policy's `ttl_seconds` via `put_with_ttl`. When - // multiple policies match the same request, the entry-table - // iteration order decides — that's an unspecified-but-stable - // tiebreak we'll formalise (probably "narrowest scope wins") in a - // follow-up if operators ever care. - let matched_policy_ttl = snapshot + // matching entry (not just `any`) so the backend selection and the + // post-call write below can use that policy's `backend` and + // `ttl_seconds`. When multiple policies match the same request, + // the entry-table iteration order decides — that's an + // unspecified-but-stable tiebreak we'll formalise (probably + // "narrowest scope wins") in a follow-up if operators ever care. + let matched_policy = snapshot .cache_policies .entries() .iter() @@ -1273,31 +1273,40 @@ async fn dispatch( .parsed_applies_to() .matches(&req.model, &auth.entry.id) }) - .map(|entry| Duration::from_secs(u64::from(entry.value.ttl_seconds))); - let cache_active_by_policy = matched_policy_ttl.is_some(); + .cloned(); + + // #519 B.8: the matched policy's `backend` selects the cache + // instance. `memory` always resolves; `redis` resolves only when + // the deployment configured `cache.redis` — otherwise caching is + // INACTIVE for this request (`cache_status = disabled`, warn once + // per policy inside `for_policy_backend`). 
Never fall back to + // node-local memory: the operator asked for a shared cache, and a + // silent memory stand-in would serve per-node answers while the + // dashboard claims redis semantics. + let policy_cache: Option<Arc<dyn Cache>> = match (matched_policy.as_ref(), state.cache.as_ref()) + { + (Some(entry), Some(backends)) => backends + .for_policy_backend(entry.value.backend, &entry.id, &entry.value.name) + .cloned(), + _ => None, + }; + let matched_policy_ttl = policy_cache.as_ref().and(matched_policy.as_ref()).map( + |entry| Duration::from_secs(u64::from(entry.value.ttl_seconds)), + ); // Cache lookup keyed on the *virtual* model name so a re-request // hits the cache regardless of which target served the original. - // Even with `cache_active_by_policy = false` we still build the - // key to keep the cache_status path uniform — `disabled` is the - // outcome when the gate is closed, but the request itself is - // shaped the same way. - let cache_key = state - .cache + let cache_key = policy_cache .as_ref() .map(|_| CacheKey::from_request(req).fingerprint()); - let cache_status = if cache_active_by_policy && state.cache.is_some() { + let cache_status = if policy_cache.is_some() { CacheStatus::Miss } else { CacheStatus::Disabled }; - if let (true, Some(cache), Some(key)) = ( - cache_active_by_policy, - state.cache.as_ref(), - cache_key.as_ref(), - ) { + if let (Some(cache), Some(key)) = (policy_cache.as_ref(), cache_key.as_ref()) { match cache.get(key).await { Ok(Some(cached)) => { reservation.commit_tokens(0); @@ -1694,7 +1703,7 @@ async fn dispatch( // per-entry support (defined via `Cache::put_with_ttl`'s default // impl) silently fall back to `put`.
if let (Some(ttl), Some(cache), Some(key)) = - (matched_policy_ttl, state.cache.as_ref(), cache_key.as_ref()) + (matched_policy_ttl, policy_cache.as_ref(), cache_key.as_ref()) { if let Err(err) = cache.put_with_ttl(key, upstream.clone(), ttl).await { tracing::warn!(error = %err, key = %key, "cache write failed"); diff --git a/crates/aisix-proxy/src/lib.rs b/crates/aisix-proxy/src/lib.rs index 8e7be618..80963e19 100644 --- a/crates/aisix-proxy/src/lib.rs +++ b/crates/aisix-proxy/src/lib.rs @@ -59,7 +59,7 @@ pub use error::{ErrorEnvelope, ProxyError}; pub use health::{ HealthTracker, LivezState, ModelRuntimeStatusTracker, RuntimeStatus, RuntimeStatusSnapshot, }; -pub use state::ProxyState; +pub use state::{CacheBackends, ProxyState}; use axum::extract::State; use axum::http::{header, HeaderValue, Request}; diff --git a/crates/aisix-proxy/src/state.rs b/crates/aisix-proxy/src/state.rs index 95afdf0a..1c10ea65 100644 --- a/crates/aisix-proxy/src/state.rs +++ b/crates/aisix-proxy/src/state.rs @@ -7,7 +7,7 @@ //! - the per-key [`Limiter`] — queried before each upstream call and //! finalised after the response completes //! - an `Arc<Metrics>` shared with the admin `/metrics` endpoint -//! - an `Arc<dyn Cache>` consulted before bridge dispatch (None disables +//! - the [`CacheBackends`] consulted before bridge dispatch (None disables //! caching for that ProxyState; tests use this to keep the cache off //! the hot path when they don't care about it) //! - the configured request-body size limit @@ -15,8 +15,10 @@ //! Cheap to clone: every field is either an `Arc` or a small Copy scalar.
use aisix_cache::{Cache, MemoryCache}; +use aisix_core::models::CacheBackend; use aisix_core::snapshot::SnapshotHandle; use aisix_core::{AisixSnapshot, ProxyConfig}; +use dashmap::DashSet; use aisix_gateway::Hub; use aisix_guardrails::LiveGuardrailIndex; use aisix_obs::{Metrics, OtlpHttpFanOut, UsageSink}; @@ -28,13 +30,76 @@ use crate::client_ip::ResolvedRealIp; use crate::health::{HealthTracker, LivezState, ModelRuntimeStatusTracker}; use crate::routing::RoutingRegistry; +/// The cache instances a DP deployment has available, selected per +/// request by the matched `CachePolicy.backend` (#519 B.8). +/// +/// The memory cache is always built (in-process, no config needed); +/// the redis cache exists iff the boot config carries `cache.redis`. +/// A policy that asks for `redis` on a deployment without one gets NO +/// caching for its requests (`cache_status = disabled`) — never a +/// silent fallback to node-local memory, which would lie about the +/// sharing semantics the operator picked. +#[derive(Clone)] +pub struct CacheBackends { + memory: Arc<dyn Cache>, + redis: Option<Arc<dyn Cache>>, + /// Policy ids already warned about an unavailable redis backend, + /// so the gate logs once per policy instead of once per request. + redis_warned: Arc<DashSet<String>>, +} + +impl CacheBackends { + pub fn new(memory: Arc<dyn Cache>, redis: Option<Arc<dyn Cache>>) -> Self { + Self { + memory, + redis, + redis_warned: Arc::new(DashSet::new()), + } + } + + /// Memory cache only — the default for self-hosted dev and tests. + pub fn memory_only() -> Self { + Self::new(Arc::new(MemoryCache::with_defaults()), None) + } + + /// Resolve the cache instance for a matched policy's `backend`. + /// + /// `Memory` always resolves. `Redis` resolves iff the deployment + /// configured one; otherwise caching is inactive for the request + /// and we warn once per policy id.
+ pub fn for_policy_backend( + &self, + backend: CacheBackend, + policy_id: &str, + policy_name: &str, + ) -> Option<&Arc<dyn Cache>> { + match backend { + CacheBackend::Memory => Some(&self.memory), + CacheBackend::Redis => { + let redis = self.redis.as_ref(); + if redis.is_none() && self.redis_warned.insert(policy_id.to_string()) { + tracing::warn!( + target: "aisix::cache", + policy_id = %policy_id, + policy_name = %policy_name, + "cache policy requests backend=redis but this DP has no \ + redis cache configured; caching is disabled for matching \ + requests (set `cache.redis` in the gateway config)" + ); + } + redis + } + } + } +} + #[derive(Clone)] pub struct ProxyState { pub snapshot: SnapshotHandle, pub hub: Arc<Hub>, pub limiter: Arc<Limiter>, pub metrics: Arc<Metrics>, - pub cache: Option<Arc<dyn Cache>>, + pub cache: Option<CacheBackends>, pub routing: Arc<RoutingRegistry>, /// Per-request guardrail index. Resolves the applicable chain from /// attachment scope + priority on each request. Rebuilds lazily @@ -79,7 +144,7 @@ impl ProxyState { hub, limiter: Arc::new(Limiter::new()), metrics: Arc::new(Metrics::new(false)), - cache: Some(Arc::new(MemoryCache::with_defaults())), + cache: Some(CacheBackends::memory_only()), routing: Arc::new(RoutingRegistry::new()), guardrail_index, budgets: Arc::new(BudgetClient::disabled()), @@ -107,7 +172,7 @@ impl ProxyState { hub, limiter, metrics: Arc::new(Metrics::new(false)), - cache: Some(Arc::new(MemoryCache::with_defaults())), + cache: Some(CacheBackends::memory_only()), routing: Arc::new(RoutingRegistry::new()), guardrail_index, budgets: Arc::new(BudgetClient::disabled()), @@ -123,13 +188,13 @@ impl ProxyState { /// Full constructor used by the server bootstrap — lets the same /// Metrics handle be shared with the admin `/metrics` endpoint and - /// lets the caller supply a configured Cache backend. + /// lets the caller supply the configured cache backends.
pub fn with_components( snapshot: SnapshotHandle, hub: Arc<Hub>, limiter: Arc<Limiter>, metrics: Arc<Metrics>, - cache: Option<Arc<dyn Cache>>, + cache: Option<CacheBackends>, cfg: &ProxyConfig, ) -> Self { let guardrail_index = LiveGuardrailIndex::new(snapshot.clone(), None); diff --git a/crates/aisix-server/src/main.rs b/crates/aisix-server/src/main.rs index bffa58bc..1d01e440 100644 --- a/crates/aisix-server/src/main.rs +++ b/crates/aisix-server/src/main.rs @@ -35,7 +35,7 @@ use aisix_provider_openai::OpenAiBridge; use aisix_provider_vertex::VertexBridge; use aisix_proxy::background::run_background_model_check_once; use aisix_proxy::budget::BudgetClient; -use aisix_proxy::ProxyState; +use aisix_proxy::{CacheBackends, ProxyState}; use aisix_ratelimit::Limiter; use clap::Parser; use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions}; @@ -378,23 +378,31 @@ async fn run(mut cfg: Config) -> anyhow::Result<()> { let hub = Arc::new(build_hub()); let limiter = Arc::new(Limiter::new()); let metrics = Arc::new(Metrics::new(true)); - // Cache backend selection. Memory by default; Redis when configured. - let cache: Option<Arc<dyn Cache>> = match cfg.cache.backend { - CacheBackend::Memory => Some(Arc::new(MemoryCache::with_defaults())), - CacheBackend::Redis => { - let url = cfg - .cache - .redis - .as_ref() - .map(|r| r.url.clone()) - .ok_or_else(|| anyhow::anyhow!("cache.backend = redis but cache.redis missing"))?; + // Cache backends (#519 B.8). The memory cache is always built + // (in-process, cheap); the redis cache is built iff `cache.redis` + // is configured. Which instance serves a request is selected by + // the matched CachePolicy's `backend` field at the proxy's cache + // gate — `cache.backend` no longer picks a single global cache. + // It still fails fast on the contradictory `backend = redis` + // without a `cache.redis` block, so old configs that relied on it + // surface the misconfiguration at boot instead of per request.
+ if cfg.cache.backend == CacheBackend::Redis && cfg.cache.redis.is_none() { + anyhow::bail!("cache.backend = redis but cache.redis missing"); + } + let redis_cache: Option<Arc<dyn Cache>> = match cfg.cache.redis.as_ref() { + Some(redis_cfg) => { tracing::info!(target: "aisix::cache", backend = "redis", "connecting cache backend"); - let redis = RedisCache::connect(&url) - .await - .map_err(|e| anyhow::anyhow!("redis cache connect failed (url={url}): {e}"))?; + let redis = RedisCache::connect(&redis_cfg.url).await.map_err(|e| { + anyhow::anyhow!("redis cache connect failed (url={}): {e}", redis_cfg.url) + })?; Some(Arc::new(redis) as Arc<dyn Cache>) } + None => None, }; + let cache = Some(CacheBackends::new( + Arc::new(MemoryCache::with_defaults()), + redis_cache, + )); let mut proxy_state = ProxyState::with_components( snapshot_handle.clone(), From 71996785c85d56219d5bbb521569f8cd2b5655d0 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Thu, 11 Jun 2026 15:54:28 +0800 Subject: [PATCH 2/3] feat(cache): respect per-policy backend; unavailable redis fails visible, not silent-memory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Complete the per-policy cache backend dispatch (api7/AISIX-Cloud#519 B.8) on top of the salvaged WIP: - rustfmt the WIP (import order in state.rs, two chat.rs hunks) - unit tests: redis policy w/o redis disables caching (both calls pay the upstream, no header, cache_status=disabled); redis policy with a redis instance dispatches to it and ONLY it (entry pinned on the instance by fingerprint, memory instance stays empty) - e2e: new cache-policy-backend spec — memory-only DP + backend=redis policy means every identical chat reaches the upstream (fails on origin/main, which silently served the repeat from node-local memory); real-redis positive path with two DPs sharing one redis (miss on A, hit on B without re-hitting the upstream) - docs synced: caching.md backend boundary rewritten, bootstrap-config cache section,
production-deployment, response-caching tutorial, roadmap status - schemas/resources/cache_policy.schema.json regenerated (dump-schema) --- crates/aisix-proxy/src/chat.rs | 15 +- crates/aisix-proxy/src/lib.rs | 166 +++++++++++ crates/aisix-proxy/src/state.rs | 2 +- docs/configuration/bootstrap-config.md | 12 +- docs/configuration/caching.md | 14 +- docs/operations/production-deployment.md | 9 +- docs/roadmap.md | 6 +- docs/tutorials/enable-response-caching.md | 2 +- schemas/resources/cache_policy.schema.json | 4 +- .../cases/cache-policy-backend-e2e.test.ts | 261 ++++++++++++++++++ 10 files changed, 460 insertions(+), 31 deletions(-) create mode 100644 tests/e2e/src/cases/cache-policy-backend-e2e.test.ts diff --git a/crates/aisix-proxy/src/chat.rs b/crates/aisix-proxy/src/chat.rs index 212c455d..b8f15563 100644 --- a/crates/aisix-proxy/src/chat.rs +++ b/crates/aisix-proxy/src/chat.rs @@ -1290,9 +1290,10 @@ async fn dispatch( .cloned(), _ => None, }; - let matched_policy_ttl = policy_cache.as_ref().and(matched_policy.as_ref()).map( - |entry| Duration::from_secs(u64::from(entry.value.ttl_seconds)), - ); + let matched_policy_ttl = policy_cache + .as_ref() + .and(matched_policy.as_ref()) + .map(|entry| Duration::from_secs(u64::from(entry.value.ttl_seconds))); // Cache lookup keyed on the *virtual* model name so a re-request // hits the cache regardless of which target served the original. @@ -1702,9 +1703,11 @@ async fn dispatch( // not the cache backend's global fallback. Backends without // per-entry support (defined via `Cache::put_with_ttl`'s default // impl) silently fall back to `put`. 
- if let (Some(ttl), Some(cache), Some(key)) = - (matched_policy_ttl, policy_cache.as_ref(), cache_key.as_ref()) - { + if let (Some(ttl), Some(cache), Some(key)) = ( + matched_policy_ttl, + policy_cache.as_ref(), + cache_key.as_ref(), + ) { if let Err(err) = cache.put_with_ttl(key, upstream.clone(), ttl).await { tracing::warn!(error = %err, key = %key, "cache write failed"); } diff --git a/crates/aisix-proxy/src/lib.rs b/crates/aisix-proxy/src/lib.rs index 80963e19..5e797420 100644 --- a/crates/aisix-proxy/src/lib.rs +++ b/crates/aisix-proxy/src/lib.rs @@ -534,6 +534,15 @@ mod tests { .insert(ResourceEntry::new(format!("cp-id-{name}"), policy, 1)); } + /// Policy seeder with an explicit `backend` — used by the #519 + /// B.8 tests that pin per-policy backend dispatch. + fn seed_cache_policy_with_backend(snap: &AisixSnapshot, name: &str, backend: &str) { + let cfg = format!(r#"{{"name": "{name}", "backend": "{backend}", "applies_to": "all"}}"#); + let policy: aisix_core::models::CachePolicy = serde_json::from_str(&cfg).unwrap(); + snap.cache_policies + .insert(ResourceEntry::new(format!("cp-id-{name}"), policy, 1)); + } + fn seed_snapshot_with_limits( model: &str, allowed: &[&str], @@ -3070,6 +3079,163 @@ data: [DONE]\n\n"; } } + /// #519 B.8: a `backend: "redis"` policy on a DP without a redis + /// cache must DISABLE caching for matching requests — both + /// identical calls reach the upstream, neither carries an + /// `x-aisix-cache` header, and telemetry reports + /// `cache_status = "disabled"`. The pre-fix behavior (silent + /// fallback to the node-local memory cache) would serve the + /// second call from cache and fail wiremock's `.expect(2)`. 
+ #[tokio::test] + async fn redis_backend_policy_without_redis_disables_caching() { + use aisix_obs::UsageSink; + + let upstream = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/chat/completions")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "id": "cmpl-up", + "model": "gpt-4o", + "choices": [{ + "index": 0, + "message": {"role": "assistant", "content": "fresh"}, + "finish_reason": "stop" + }], + "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} + }))) + .expect(2) // hard expectation: BOTH calls must pay the upstream + .mount(&upstream) + .await; + + let (tx, mut rx) = tokio::sync::mpsc::channel(8); + let hub = Arc::new(Hub::new()); + hub.register_specialized("openai", Arc::new(openai_test_bridge())); + let snap = seed_snapshot("my-gpt4", &["my-gpt4"], &upstream.uri()); + seed_cache_policy_with_backend(&snap, "redis-cache", "redis"); + // Default test state ships a memory cache but NO redis + // instance — exactly the deployment the policy mismatches. 
+ let state = build_state_with_cache(snap, hub).with_usage_sink(UsageSink::new(tx)); + + let body = serde_json::json!({ + "model": "my-gpt4", + "messages": [{"role": "user", "content": "hi"}] + }); + let make_req = || { + Request::builder() + .method("POST") + .uri("/v1/chat/completions") + .header("authorization", "Bearer sk-caller") + .header("content-type", "application/json") + .body(Body::from(body.to_string())) + .unwrap() + }; + + for _ in 0..2 { + let resp = run(build_router(state.clone()), make_req()).await; + assert_eq!(resp.status(), StatusCode::OK); + assert!( + resp.headers().get("x-aisix-cache").is_none(), + "redis policy without a redis backend must not emit x-aisix-cache", + ); + let event = tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()) + .await + .expect("usage event was never emitted") + .expect("sender dropped"); + assert_eq!( + event.cache_status, "disabled", + "unavailable backend must surface as cache_status=disabled", + ); + } + } + + /// #519 B.8 positive path: when the DP HAS a redis instance, a + /// `backend: "redis"` policy must dispatch to it — not to the + /// memory instance. A second MemoryCache stands in for redis + /// (instance dispatch is under test, not the redis wire + /// protocol): the second identical call is a cache hit, the + /// entry lives in the redis instance, and the memory instance + /// never saw the key. 
+ #[tokio::test] + async fn redis_backend_policy_dispatches_to_redis_instance() { + use aisix_cache::{Cache, CacheKey, MemoryCache}; + + let upstream = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/chat/completions")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "id": "cmpl-up", + "model": "gpt-4o", + "choices": [{ + "index": 0, + "message": {"role": "assistant", "content": "via-redis"}, + "finish_reason": "stop" + }], + "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} + }))) + .expect(1) // second call must be served from the redis instance + .mount(&upstream) + .await; + + let hub = Arc::new(Hub::new()); + hub.register_specialized("openai", Arc::new(openai_test_bridge())); + let snap = seed_snapshot("my-gpt4", &["my-gpt4"], &upstream.uri()); + seed_cache_policy_with_backend(&snap, "redis-cache", "redis"); + + let memory: Arc<dyn Cache> = Arc::new(MemoryCache::with_defaults()); + let redis_standin: Arc<dyn Cache> = Arc::new(MemoryCache::with_defaults()); + let mut state = build_state_with_cache(snap, hub); + state.cache = Some(CacheBackends::new( + memory.clone(), + Some(redis_standin.clone()), + )); + + let body = serde_json::json!({ + "model": "my-gpt4", + "messages": [{"role": "user", "content": "hi"}] + }); + let make_req = || { + Request::builder() + .method("POST") + .uri("/v1/chat/completions") + .header("authorization", "Bearer sk-caller") + .header("content-type", "application/json") + .body(Body::from(body.to_string())) + .unwrap() + }; + + let resp = run(build_router(state.clone()), make_req()).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers() + .get("x-aisix-cache") + .and_then(|v| v.to_str().ok()), + Some("miss"), + ); + + let resp = run(build_router(state), make_req()).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers() + .get("x-aisix-cache") + .and_then(|v| v.to_str().ok()), + Some("hit"), + ); + + // The entry must live in
the redis instance and ONLY there — + // a dispatch bug that wrote to the memory instance would + // still produce a "hit" above, so pin the instance directly. + let req: aisix_gateway::ChatFormat = serde_json::from_value(body).unwrap(); + let key = CacheKey::from_request(&req).fingerprint(); + assert!( + redis_standin.get(&key).await.unwrap().is_some(), + "cache entry must be written to the policy's redis backend", + ); + assert!( + memory.get(&key).await.unwrap().is_none(), + "memory instance must not be touched by a redis-backend policy", + ); + } + #[tokio::test] async fn applies_to_model_caches_matched_model() { // Counterpart to the negative test above: when the policy diff --git a/crates/aisix-proxy/src/state.rs b/crates/aisix-proxy/src/state.rs index 1c10ea65..6da4713a 100644 --- a/crates/aisix-proxy/src/state.rs +++ b/crates/aisix-proxy/src/state.rs @@ -18,11 +18,11 @@ use aisix_cache::{Cache, MemoryCache}; use aisix_core::models::CacheBackend; use aisix_core::snapshot::SnapshotHandle; use aisix_core::{AisixSnapshot, ProxyConfig}; -use dashmap::DashSet; use aisix_gateway::Hub; use aisix_guardrails::LiveGuardrailIndex; use aisix_obs::{Metrics, OtlpHttpFanOut, UsageSink}; use aisix_ratelimit::Limiter; +use dashmap::DashSet; use std::sync::Arc; use crate::budget::BudgetClient; diff --git a/docs/configuration/bootstrap-config.md b/docs/configuration/bootstrap-config.md index ffb1e8ec..29390bff 100644 --- a/docs/configuration/bootstrap-config.md +++ b/docs/configuration/bootstrap-config.md @@ -192,18 +192,20 @@ Bootstrap observability settings are process-wide. They are different from dynam ## `cache` -Use `cache` to choose the bootstrap cache backend. +Use `cache` to declare which cache backends the process builds at startup. + +The in-process memory cache is always built. The shared Redis cache is built only when the `redis` block is present. 
Which backend serves a given request is selected by the matched cache policy's `backend` field (a dynamic resource) — not by this section. Important fields: | Field | Description | Default | | --- | --- | --- | -| `backend` | which cache backend the process uses (`memory` or `redis`) | `memory` | -| `redis` | Redis connection block (`url`, optional `mode`); only consulted when `backend: redis` | none | +| `backend` | legacy knob — no longer selects a single global cache; `redis` without a `redis` block is rejected at boot | `memory` | +| `redis` | Redis connection block (`url`, optional `mode`); when present, the process builds the shared Redis cache | none | -`memory` is the default path. `redis` has runtime backend selection and connection logic, but the broader cache docs and support boundaries are still being expanded. +A cache policy that requests `backend: redis` on a process without `cache.redis` gets no caching for its requests (`cache_status = disabled`, one warning per policy in the gateway log) — never a silent fallback to memory. -Use bootstrap cache settings to decide whether the process has a cache backend available at all. Use dynamic cache policies to decide which requests actually participate in caching. +Use bootstrap cache settings to decide which cache backends the process has available. Use dynamic cache policies to decide which requests actually participate in caching, and on which backend. 
## `managed` diff --git a/docs/configuration/caching.md b/docs/configuration/caching.md index 6d74faa3..4d62e865 100644 --- a/docs/configuration/caching.md +++ b/docs/configuration/caching.md @@ -76,13 +76,12 @@ Current schema supports: - `memory` - `redis` -Current runtime boundary: +The proxy selects the cache instance per request from the matched policy's `backend` field: -- `memory` is the reliable default path -- bootstrap config can wire a Redis backend at process start -- the dynamic `CachePolicy.backend` field should still be treated conservatively because broader Redis support boundaries are still being expanded +- `memory` uses the in-process cache — always available +- `redis` uses the shared Redis cache — available only when the bootstrap config carries a `cache.redis` block -Note: the per-policy `backend` field is parsed and stored on the `CachePolicy` row but is not consulted by the runtime proxy — the proxy always uses the cache backend selected by bootstrap-config (`cache.backend`) regardless of what each policy specifies. The field is preserved for forward compatibility; do not depend on it to override the runtime backend. +A `redis` policy on a process without `cache.redis` gets no caching for its requests: responses carry no `x-aisix-cache` header, telemetry reports `cache_status = disabled`, and the gateway logs a warning once per policy. There is no silent fallback to the in-process memory cache — a memory stand-in would serve per-node answers while the policy claims shared-cache semantics. 
## Operator Guidance @@ -94,10 +93,11 @@ Note: the per-policy `backend` field is parsed and stored on the `CachePolicy` r ### Responses never show `x-aisix-cache` -Check both sides: +Check all three: -- a bootstrap cache backend must be available - an enabled cache policy must match the request +- the matched policy's `backend` must be available in the process — `redis` requires `cache.redis` in the bootstrap config +- look for the `cache policy requests backend=redis but this DP has no redis cache configured` warning in the gateway log ### A policy matches too broadly diff --git a/docs/operations/production-deployment.md b/docs/operations/production-deployment.md index 3ebfe164..dabe365d 100644 --- a/docs/operations/production-deployment.md +++ b/docs/operations/production-deployment.md @@ -38,14 +38,11 @@ For most teams, a solid first production baseline is: ## Cache Backend Choice -Current bootstrap cache backends are: +The process always builds the in-process memory cache. Add a `cache.redis` block to also build the shared Redis cache. Which backend serves a request is selected by the matched cache policy's `backend` field — a policy that requests `redis` on a process without `cache.redis` gets no caching for its requests (no silent fallback to memory). -- `memory` -- `redis` +The legacy `cache.backend` knob no longer selects a single global cache; `backend: redis` without `cache.redis.url` still fails at startup so misconfigurations surface early. -`memory` is the simplest production baseline. If you select `redis`, the bootstrap config must include `cache.redis.url` or startup will fail. - -That makes `memory` the lowest-risk default for first rollout. +`memory`-backed policies remain the simplest production baseline, making them the lowest-risk default for first rollout. 
## Managed Versus Standalone diff --git a/docs/roadmap.md b/docs/roadmap.md index 704139a3..3e260d13 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -45,11 +45,11 @@ Applies to: ### Redis-Backed Cache Policy Completion Current status: -- Cache policy schema supports `memory` and `redis` as backend hints. -- Current runtime behavior is still centered on `memory`, and Redis should not yet be treated as fully available customer behavior. +- The data plane enforces `CachePolicy.backend` per matched policy: `memory` uses the in-process cache; `redis` uses the shared Redis cache when the bootstrap config provides `cache.redis`, otherwise caching is disabled for matching requests (no silent memory fallback). +- Redis cluster/sentinel modes and broader support boundaries are still being expanded. Planned outcome: -- Clear, fully supported Redis-backed cache policy behavior. +- Clear, fully supported Redis-backed cache policy behavior across all Redis deployment modes. Applies to: - `AISIX AI Gateway` diff --git a/docs/tutorials/enable-response-caching.md b/docs/tutorials/enable-response-caching.md index 6a630d91..ba28af39 100644 --- a/docs/tutorials/enable-response-caching.md +++ b/docs/tutorials/enable-response-caching.md @@ -50,7 +50,7 @@ Field meanings (full reference in [Caching](../configuration/caching.md)): - `enabled: true` — the cache gate consults this policy on every request - `applies_to: "all"` — matches every request. For targeted rollouts use `model:` or `api_key:`. - `ttl_seconds: 3600` — cache entry lifetime hint. Defaults to `3600` if omitted. -- `backend: "memory"` is the default. The standalone gateway enforces `memory`; `redis` parses and persists but currently falls back to memory until the backend wires up. +- `backend: "memory"` is the default and uses the in-process cache. 
`redis` uses the shared Redis cache and requires `cache.redis` in the gateway's bootstrap config — without it, requests matching the policy get no caching at all (no silent fallback to memory). Wait for the snapshot to propagate: diff --git a/schemas/resources/cache_policy.schema.json b/schemas/resources/cache_policy.schema.json index 4634f06e..b35aa4cf 100644 --- a/schemas/resources/cache_policy.schema.json +++ b/schemas/resources/cache_policy.schema.json @@ -13,7 +13,7 @@ "type": "string" }, "backend": { - "description": "Backend hint. `memory` is the only enforced backend today; `redis` parses + persists but the DP currently falls back to memory until that backend wires up.", + "description": "Which cache instance serves requests matched by this policy. `memory` always works; `redis` requires the DP to have `cache.redis` configured — otherwise matching requests get no caching at all (visible as `cache_status = disabled`).", "default": "memory", "allOf": [ { @@ -40,7 +40,7 @@ }, "definitions": { "CacheBackend": { - "description": "Cache backend choice. `Memory` is enforced by the DP today; `Redis` is the kine-level wire-shape stub for the upcoming shared-cluster backend (DP enforcement pending).", + "description": "Cache backend choice. The DP selects the cache instance per matched policy: `Memory` uses the in-process cache (always available); `Redis` uses the shared redis cache iff the deployment configured `cache.redis`. 
A `Redis` policy on a DP without redis gets NO caching (`cache_status = disabled`) — never a silent fallback to node-local memory.", "type": "string", "enum": [ "memory", diff --git a/tests/e2e/src/cases/cache-policy-backend-e2e.test.ts b/tests/e2e/src/cases/cache-policy-backend-e2e.test.ts new file mode 100644 index 00000000..17740bd0 --- /dev/null +++ b/tests/e2e/src/cases/cache-policy-backend-e2e.test.ts @@ -0,0 +1,261 @@ +import { createHash, randomUUID } from "node:crypto"; +import { connect } from "node:net"; +import { afterAll, beforeAll, describe, expect, test } from "vitest"; +import { + AdminClient, + EtcdClient, + spawnApp, + startOpenAiUpstream, + waitConfigPropagation, + type OpenAiUpstream, + type SpawnedApp, +} from "../harness/index.js"; + +// E2E: per-policy cache backend dispatch (api7/AISIX-Cloud#519 B.8). +// +// `CachePolicy.backend` selects which cache instance serves a request: +// - `memory` → the in-process cache, always available; +// - `redis` → the shared redis cache, available iff the bootstrap +// config carries `cache.redis`. +// +// A `redis` policy on a memory-only DP must DISABLE caching for its +// matching requests — every identical call pays the upstream and no +// `x-aisix-cache` header is emitted. The pre-fix behavior silently +// fell back to the node-local memory cache, which would serve the +// second call from cache while the policy claims shared semantics. + +const CALLER_PLAINTEXT = "sk-cache-backend-e2e-caller"; +const CALLER_KEY_HASH = createHash("sha256") + .update(CALLER_PLAINTEXT) + .digest("hex"); + +const REDIS_URL = + process.env.AISIX_E2E_REDIS ?? "redis://127.0.0.1:6379"; + +/** RESP-level PING so the redis-positive suite can skip honestly when + * no redis is reachable (CI provisions redis:7-alpine on :6379). */ +async function redisPing(url: string): Promise<boolean> { + const m = /^redis:\/\/(?:[^@/]*@)?([^:/]+)(?::(\d+))?/.exec(url); + if (!m) return false; + const host = m[1]; + const port = m[2] ? 
Number(m[2]) : 6379; + return new Promise((resolve) => { + const sock = connect({ host, port }, () => sock.write("PING\r\n")); + const done = (ok: boolean) => { + sock.destroy(); + resolve(ok); + }; + sock.once("data", (buf) => done(buf.toString().startsWith("+PONG"))); + sock.once("error", () => done(false)); + sock.setTimeout(1000, () => done(false)); + }); +} + +interface SeededApp { + app: SpawnedApp; + admin: AdminClient; +} + +/** + * Seed one DP with: + * - a model `<modelAlias>` plus a `redis`-backend policy scoped to it + * (the subject under test), and + * - a canary model `<canaryAlias>` plus a `memory`-backend policy + * scoped to it, created AFTER the redis policy. + * + * etcd delivers watch events in revision order, so once the canary + * policy is observable (its responses carry `x-aisix-cache`), the + * earlier redis policy is in the snapshot too. Without that positive + * signal, the "no caching happened" assertions below could pass + * vacuously while the policy simply hadn't propagated yet. + */ +async function seedApp( + app: SpawnedApp, + upstreamBase: string, + modelAlias: string, + canaryAlias: string, +): Promise<SeededApp> { + const admin = new AdminClient(app.adminUrl, app.adminKey); + const pk = await admin.createProviderKey({ + display_name: `${modelAlias}-pk`, + secret: "sk-mock", + api_base: `${upstreamBase}/v1`, + }); + for (const alias of [modelAlias, canaryAlias]) { + await admin.createModel({ + display_name: alias, + provider: "openai", + model_name: "gpt-4o-mini", + provider_key_id: pk.id, + }); + } + await admin.createApiKey({ + key_hash: CALLER_KEY_HASH, + allowed_models: [modelAlias, canaryAlias], + }); + // Order matters: redis policy FIRST, canary memory policy SECOND. 
+ await admin.json("POST", "/admin/v1/cache_policies", { + name: `${modelAlias}-redis-policy`, + enabled: true, + backend: "redis", + applies_to: `model:${modelAlias}`, + }); + await admin.json("POST", "/admin/v1/cache_policies", { + name: `${canaryAlias}-canary-policy`, + enabled: true, + backend: "memory", + applies_to: `model:${canaryAlias}`, + }); + return { app, admin }; +} + +function chatRequest( + proxyUrl: string, + model: string, + prompt: string, +): Promise<Response> { + return fetch(`${proxyUrl}/v1/chat/completions`, { + method: "POST", + headers: { + authorization: `Bearer ${CALLER_PLAINTEXT}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + model, + messages: [{ role: "user", content: prompt }], + }), + }); +} + +/** Wait until the canary policy (created after the redis policy) is + * live: a canary chat carries `x-aisix-cache` (miss or hit). */ +async function waitCanaryPolicyLive( + proxyUrl: string, + canaryAlias: string, +): Promise<void> { + await waitConfigPropagation(async () => { + try { + const resp = await chatRequest(proxyUrl, canaryAlias, "canary-probe"); + await resp.text(); + return resp.status === 200 && resp.headers.get("x-aisix-cache") !== null; + } catch { + return false; + } + }); +} + +describe("cache policy backend=redis on a memory-only DP disables caching", () => { + let app: SpawnedApp | undefined; + let upstream: OpenAiUpstream | undefined; + let etcdReachable = false; + + beforeAll(async () => { + etcdReachable = await new EtcdClient().ping(); + if (!etcdReachable) return; + + upstream = await startOpenAiUpstream(); + // Default harness config: `cache.backend = memory`, no + // `cache.redis` block — a memory-only DP. 
+ app = await spawnApp(); + await seedApp(app, upstream.baseUrl, "cache-redis-only", "cache-canary"); + await waitCanaryPolicyLive(app.proxyUrl, "cache-canary"); + }); + + afterAll(async () => { + await app?.exit(); + await upstream?.close(); + }); + + test("identical requests ALL reach the upstream; no x-aisix-cache header", async (ctx) => { + if (!etcdReachable || !app || !upstream) { + ctx.skip(); + return; + } + + const baseline = upstream.receivedRequests.length; + const prompt = `redis-unavailable ${randomUUID()}`; + + const first = await chatRequest(app.proxyUrl, "cache-redis-only", prompt); + expect(first.status).toBe(200); + expect(first.headers.get("x-aisix-cache")).toBeNull(); + await first.text(); + expect(upstream.receivedRequests.length).toBe(baseline + 1); + + // Pre-fix, this second identical call was served from the + // node-local memory cache (`x-aisix-cache: hit`, upstream count + // unchanged). With per-policy dispatch it must pay the upstream. + const second = await chatRequest(app.proxyUrl, "cache-redis-only", prompt); + expect(second.status).toBe(200); + expect(second.headers.get("x-aisix-cache")).toBeNull(); + await second.text(); + expect(upstream.receivedRequests.length).toBe(baseline + 2); + }); +}); + +describe("cache policy backend=redis with a configured redis is shared across DPs", () => { + let appA: SpawnedApp | undefined; + let appB: SpawnedApp | undefined; + let upstream: OpenAiUpstream | undefined; + let infraReady = false; + + beforeAll(async () => { + infraReady = + (await new EtcdClient().ping()) && (await redisPing(REDIS_URL)); + if (!infraReady) return; + + upstream = await startOpenAiUpstream(); + const redisExtra = { + cache: { backend: "memory", redis: { url: REDIS_URL } }, + }; + // Two DP instances sharing one redis. A hit on the instance that + // never served the original request proves the entry really lives + // in redis — an (incorrect) memory-cache write could only produce + // hits on the same instance. 
+ appA = await spawnApp({ extra: redisExtra }); + appB = await spawnApp({ extra: redisExtra }); + await seedApp(appA, upstream.baseUrl, "cache-redis-shared", "canary-a"); + await seedApp(appB, upstream.baseUrl, "cache-redis-shared", "canary-b"); + await waitCanaryPolicyLive(appA.proxyUrl, "canary-a"); + await waitCanaryPolicyLive(appB.proxyUrl, "canary-b"); + }); + + afterAll(async () => { + await appA?.exit(); + await appB?.exit(); + await upstream?.close(); + }); + + test("miss on DP A, hit on DP B without re-hitting the upstream", async (ctx) => { + if (!infraReady || !appA || !appB || !upstream) { + ctx.skip(); + return; + } + + const baseline = upstream.receivedRequests.length; + // Unique per run — redis outlives the test, identical prompts + // from a previous run would already be cached. + const prompt = `redis-shared ${randomUUID()}`; + + const first = await chatRequest(appA.proxyUrl, "cache-redis-shared", prompt); + expect(first.status).toBe(200); + expect(first.headers.get("x-aisix-cache")).toBe("miss"); + const firstBody = (await first.json()) as { + choices: Array<{ message: { content: string } }>; + }; + expect(upstream.receivedRequests.length).toBe(baseline + 1); + + const second = await chatRequest(appB.proxyUrl, "cache-redis-shared", prompt); + expect(second.status).toBe(200); + expect(second.headers.get("x-aisix-cache")).toBe("hit"); + const secondBody = (await second.json()) as { + choices: Array<{ message: { content: string } }>; + }; + expect(upstream.receivedRequests.length).toBe(baseline + 1); + + // The replay must be byte-equivalent content — DP B never talked + // to the upstream for this fingerprint. 
+ expect(secondBody.choices[0]?.message.content).toBe( + firstBody.choices[0]?.message.content, + ); + }); +}); From bc10a686741716005f93e8b0a9620fb2e3507652 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Thu, 11 Jun 2026 16:05:44 +0800 Subject: [PATCH 3/3] review: keep credentials-bearing redis URL out of the connect error --- crates/aisix-server/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/aisix-server/src/main.rs b/crates/aisix-server/src/main.rs index 1d01e440..62f30ba2 100644 --- a/crates/aisix-server/src/main.rs +++ b/crates/aisix-server/src/main.rs @@ -393,7 +393,10 @@ async fn run(mut cfg: Config) -> anyhow::Result<()> { Some(redis_cfg) => { tracing::info!(target: "aisix::cache", backend = "redis", "connecting cache backend"); let redis = RedisCache::connect(&redis_cfg.url).await.map_err(|e| { - anyhow::anyhow!("redis cache connect failed (url={}): {e}", redis_cfg.url) + // Deliberately no URL in the message: redis URLs carry + // credentials (redis://user:pass@host) and this error + // lands in logs that may ship to centralized sinks. + anyhow::anyhow!("redis cache connect failed (cache.redis.url): {e}") })?; Some(Arc::new(redis) as Arc) }