Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,21 @@ observability:
# managed:
# enabled: true

# Cache backend availability. The in-process memory cache is always
# built; the shared redis cache is built iff `redis` is configured.
# Which backend serves a request is chosen per matched CachePolicy
# (its `backend` field, managed via the Admin API / control plane) —
# a policy asking for redis on a DP without `cache.redis` gets NO
# caching for its requests (cache_status = disabled), never a silent
# fallback to node-local memory.
cache:
backend: "memory" # memory | redis | qdrant
# Legacy knob — no longer selects a single global cache. Kept for
# config compatibility; `backend: "redis"` still requires the
# `redis` block below (validated at boot).
backend: "memory" # memory | redis
# redis:
# url: "redis://127.0.0.1:6379"
# mode: "single" # single | cluster | sentinel
# qdrant:
# url: "http://127.0.0.1:6333"
# collection: "aisix-semantic-cache"

# Models, API keys, provider keys, guardrails, cache policies, and
# observability exporters are NOT defined in this file. They are stored
Expand Down
10 changes: 7 additions & 3 deletions crates/aisix-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
//! and stores the response with `x-aisix-cache: miss`.
//!
//! Backends:
//! - [`MemoryCache`] (moka, in-process) — default, configured by
//! `cfg.cache.backend = "memory"`.
//! - Redis backend lands in a follow-up PR behind the `redis` feature.
//! - [`MemoryCache`] (moka, in-process) — always available.
//! - `RedisCache` (behind the `redis` feature) — built when the boot
//! config carries `cache.redis`.
//!
//! The proxy picks the backend per request from the matched
//! `CachePolicy.backend` (see `aisix-proxy::state::CacheBackends`);
//! the boot config only determines which instances exist.
//!
//! Streaming responses aren't cached at this layer — the upstream stream
//! has no terminal value to store.
Expand Down
11 changes: 11 additions & 0 deletions crates/aisix-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ impl Default for OtlpTracingConfig {
}
}

/// Boot-level cache backend availability (#519 B.8).
///
/// The in-process memory cache is always built; the redis cache is
/// built iff `redis` is set. Which instance serves a given request is
/// selected by the matched `CachePolicy.backend` (etcd-managed, per
/// policy) — NOT by this struct.
///
/// `backend` is a legacy knob kept parsing for config compatibility:
/// it no longer selects "the one global cache". Its only remaining
/// effect is fail-fast validation — `backend = "redis"` without a
/// `redis` block is rejected at boot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct CacheConfig {
Expand Down
16 changes: 10 additions & 6 deletions crates/aisix-core/src/models/cache_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ use serde::{Deserialize, Serialize};

use crate::resource::Resource;

/// Cache backend choice. `Memory` is enforced by the DP today;
/// `Redis` is the kine-level wire-shape stub for the upcoming
/// shared-cluster backend (DP enforcement pending).
/// Cache backend choice. The DP selects the cache instance per
/// matched policy: `Memory` uses the in-process cache (always
/// available); `Redis` uses the shared redis cache iff the deployment
/// configured `cache.redis`. A `Redis` policy on a DP without redis
/// gets NO caching (`cache_status = disabled`) — never a silent
/// fallback to node-local memory.
#[derive(
Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
Expand Down Expand Up @@ -47,9 +50,10 @@ pub struct CachePolicy {
#[serde(default = "default_enabled")]
pub enabled: bool,

/// Backend hint. `memory` is the only enforced backend today;
/// `redis` parses + persists but the DP currently falls back
/// to memory until that backend wires up.
/// Which cache instance serves requests matched by this policy.
/// `memory` always works; `redis` requires the DP to have
/// `cache.redis` configured — otherwise matching requests get no
/// caching at all (visible as `cache_status = disabled`).
#[serde(default)]
pub backend: CacheBackend,

Expand Down
60 changes: 36 additions & 24 deletions crates/aisix-proxy/src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! line. Errors surface via [`ProxyError`] which carries the right
//! status, error type, and (for rate-limits) Retry-After.

use aisix_cache::CacheKey;
use aisix_cache::{Cache, CacheKey};
use aisix_core::AppliedGuardrail;
use aisix_gateway::{BridgeContext, BridgeError, ChatFormat};
use aisix_guardrails::GuardrailVerdict;
Expand Down Expand Up @@ -1256,13 +1256,13 @@ async fn dispatch(
//
// Match order: first enabled policy whose `parsed_applies_to()`
// accepts (req.model, auth.entry.id) wins. We grab the WHOLE
// matching entry (not just `any`) so the post-call write below
// can use that policy's `ttl_seconds` via `put_with_ttl`. When
// multiple policies match the same request, the entry-table
// iteration order decides — that's an unspecified-but-stable
// tiebreak we'll formalise (probably "narrowest scope wins") in a
// follow-up if operators ever care.
let matched_policy_ttl = snapshot
// matching entry (not just `any`) so the backend selection and the
// post-call write below can use that policy's `backend` and
// `ttl_seconds`. When multiple policies match the same request,
// the entry-table iteration order decides — that's an
// unspecified-but-stable tiebreak we'll formalise (probably
// "narrowest scope wins") in a follow-up if operators ever care.
let matched_policy = snapshot
.cache_policies
.entries()
.iter()
Expand All @@ -1273,31 +1273,41 @@ async fn dispatch(
.parsed_applies_to()
.matches(&req.model, &auth.entry.id)
})
.cloned();

// #519 B.8: the matched policy's `backend` selects the cache
// instance. `memory` always resolves; `redis` resolves only when
// the deployment configured `cache.redis` — otherwise caching is
// INACTIVE for this request (`cache_status = disabled`, warn once
// per policy inside `for_policy_backend`). Never fall back to
// node-local memory: the operator asked for a shared cache, and a
// silent memory stand-in would serve per-node answers while the
// dashboard claims redis semantics.
let policy_cache: Option<Arc<dyn Cache>> = match (matched_policy.as_ref(), state.cache.as_ref())
{
(Some(entry), Some(backends)) => backends
.for_policy_backend(entry.value.backend, &entry.id, &entry.value.name)
.cloned(),
_ => None,
};
let matched_policy_ttl = policy_cache
.as_ref()
.and(matched_policy.as_ref())
.map(|entry| Duration::from_secs(u64::from(entry.value.ttl_seconds)));
let cache_active_by_policy = matched_policy_ttl.is_some();

// Cache lookup keyed on the *virtual* model name so a re-request
// hits the cache regardless of which target served the original.
// The key is only computed when the matched policy resolved to a
// usable cache instance. When no policy matched, or its backend is
// unavailable on this DP, the gate is closed: no key is built and
// the request is reported with `cache_status = disabled`.
let cache_key = state
.cache
let cache_key = policy_cache
.as_ref()
.map(|_| CacheKey::from_request(req).fingerprint());

let cache_status = if cache_active_by_policy && state.cache.is_some() {
let cache_status = if policy_cache.is_some() {
CacheStatus::Miss
} else {
CacheStatus::Disabled
};

if let (true, Some(cache), Some(key)) = (
cache_active_by_policy,
state.cache.as_ref(),
cache_key.as_ref(),
) {
if let (Some(cache), Some(key)) = (policy_cache.as_ref(), cache_key.as_ref()) {
match cache.get(key).await {
Ok(Some(cached)) => {
reservation.commit_tokens(0);
Expand Down Expand Up @@ -1693,9 +1703,11 @@ async fn dispatch(
// not the cache backend's global fallback. Backends without
// per-entry support (defined via `Cache::put_with_ttl`'s default
// impl) silently fall back to `put`.
if let (Some(ttl), Some(cache), Some(key)) =
(matched_policy_ttl, state.cache.as_ref(), cache_key.as_ref())
{
if let (Some(ttl), Some(cache), Some(key)) = (
matched_policy_ttl,
policy_cache.as_ref(),
cache_key.as_ref(),
) {
if let Err(err) = cache.put_with_ttl(key, upstream.clone(), ttl).await {
tracing::warn!(error = %err, key = %key, "cache write failed");
}
Expand Down
168 changes: 167 additions & 1 deletion crates/aisix-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub use error::{ErrorEnvelope, ProxyError};
pub use health::{
HealthTracker, LivezState, ModelRuntimeStatusTracker, RuntimeStatus, RuntimeStatusSnapshot,
};
pub use state::ProxyState;
pub use state::{CacheBackends, ProxyState};

use axum::extract::State;
use axum::http::{header, HeaderValue, Request};
Expand Down Expand Up @@ -534,6 +534,15 @@ mod tests {
.insert(ResourceEntry::new(format!("cp-id-{name}"), policy, 1));
}

/// Policy seeder with an explicit `backend` — used by the #519
/// B.8 tests that pin per-policy backend dispatch.
///
/// Deserializes a `CachePolicy` from a JSON document carrying `name`,
/// `backend` and `applies_to: "all"`, then inserts it into
/// `snap.cache_policies` under the id `cp-id-{name}`.
///
/// Panics if the document does not deserialize into a `CachePolicy`
/// (e.g. an unknown `backend` value).
fn seed_cache_policy_with_backend(snap: &AisixSnapshot, name: &str, backend: &str) {
    // Build the document with `json!` instead of string formatting so
    // `name` / `backend` are JSON-escaped; a quote or backslash in
    // either value can no longer corrupt the fixture.
    let cfg = serde_json::json!({
        "name": name,
        "backend": backend,
        "applies_to": "all",
    });
    let policy: aisix_core::models::CachePolicy =
        serde_json::from_value(cfg).expect("cache policy fixture must deserialize");
    snap.cache_policies
        .insert(ResourceEntry::new(format!("cp-id-{name}"), policy, 1));
}

fn seed_snapshot_with_limits(
model: &str,
allowed: &[&str],
Expand Down Expand Up @@ -3070,6 +3079,163 @@ data: [DONE]\n\n";
}
}

/// #519 B.8 — negative path. A policy with `backend: "redis"` on a DP
/// that has no redis cache must leave caching OFF for the requests it
/// matches:
///
/// - two identical calls both reach the upstream,
/// - no response carries an `x-aisix-cache` header,
/// - each usage event reports `cache_status = "disabled"`.
///
/// Were the proxy to fall back silently to the node-local memory
/// cache, the second call would be a cache hit and wiremock's
/// `.expect(2)` would fail.
#[tokio::test]
async fn redis_backend_policy_without_redis_disables_caching() {
    use aisix_obs::UsageSink;

    let upstream = MockServer::start().await;
    let upstream_reply = serde_json::json!({
        "id": "cmpl-up",
        "model": "gpt-4o",
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": "fresh"},
            "finish_reason": "stop"
        }],
        "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}
    });
    Mock::given(method("POST"))
        .and(path("/chat/completions"))
        .respond_with(ResponseTemplate::new(200).set_body_json(upstream_reply))
        // Hard expectation: neither call may be answered from a cache.
        .expect(2)
        .mount(&upstream)
        .await;

    let (usage_tx, mut usage_rx) = tokio::sync::mpsc::channel(8);
    let hub = Arc::new(Hub::new());
    hub.register_specialized("openai", Arc::new(openai_test_bridge()));
    let snap = seed_snapshot("my-gpt4", &["my-gpt4"], &upstream.uri());
    seed_cache_policy_with_backend(&snap, "redis-cache", "redis");
    // The stock test state carries a memory cache and no redis
    // instance, which is precisely the mismatch under test.
    let state = build_state_with_cache(snap, hub).with_usage_sink(UsageSink::new(usage_tx));

    let payload = serde_json::json!({
        "model": "my-gpt4",
        "messages": [{"role": "user", "content": "hi"}]
    });
    let build_request = || {
        Request::builder()
            .method("POST")
            .uri("/v1/chat/completions")
            .header("authorization", "Bearer sk-caller")
            .header("content-type", "application/json")
            .body(Body::from(payload.to_string()))
            .unwrap()
    };

    for _attempt in 1..=2 {
        let resp = run(build_router(state.clone()), build_request()).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let cache_header = resp.headers().get("x-aisix-cache");
        assert!(
            cache_header.is_none(),
            "redis policy without a redis backend must not emit x-aisix-cache",
        );

        // Bound the wait so a missing usage event fails fast rather
        // than hanging the test.
        let received =
            tokio::time::timeout(std::time::Duration::from_millis(500), usage_rx.recv()).await;
        let event = received
            .expect("usage event was never emitted")
            .expect("sender dropped");
        assert_eq!(
            event.cache_status, "disabled",
            "unavailable backend must surface as cache_status=disabled",
        );
    }
}

/// #519 B.8 — positive path. With a redis instance present on the DP,
/// a `backend: "redis"` policy must be served by that instance and not
/// by the memory instance.
///
/// A second `MemoryCache` plays the role of redis, because the test
/// targets instance dispatch rather than the redis wire protocol. It
/// checks that the second identical call is a cache hit, that the
/// entry is stored in the redis stand-in, and that the memory
/// instance never received the key.
#[tokio::test]
async fn redis_backend_policy_dispatches_to_redis_instance() {
    use aisix_cache::{Cache, CacheKey, MemoryCache};

    let upstream = MockServer::start().await;
    let upstream_reply = serde_json::json!({
        "id": "cmpl-up",
        "model": "gpt-4o",
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": "via-redis"},
            "finish_reason": "stop"
        }],
        "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}
    });
    Mock::given(method("POST"))
        .and(path("/chat/completions"))
        .respond_with(ResponseTemplate::new(200).set_body_json(upstream_reply))
        // Only the first call may reach the upstream; the second must
        // come out of the redis instance.
        .expect(1)
        .mount(&upstream)
        .await;

    let hub = Arc::new(Hub::new());
    hub.register_specialized("openai", Arc::new(openai_test_bridge()));
    let snap = seed_snapshot("my-gpt4", &["my-gpt4"], &upstream.uri());
    seed_cache_policy_with_backend(&snap, "redis-cache", "redis");

    let memory: Arc<dyn Cache> = Arc::new(MemoryCache::with_defaults());
    let redis_standin: Arc<dyn Cache> = Arc::new(MemoryCache::with_defaults());
    let mut state = build_state_with_cache(snap, hub);
    let backends = CacheBackends::new(Arc::clone(&memory), Some(Arc::clone(&redis_standin)));
    state.cache = Some(backends);

    let body = serde_json::json!({
        "model": "my-gpt4",
        "messages": [{"role": "user", "content": "hi"}]
    });
    let build_request = || {
        Request::builder()
            .method("POST")
            .uri("/v1/chat/completions")
            .header("authorization", "Bearer sk-caller")
            .header("content-type", "application/json")
            .body(Body::from(body.to_string()))
            .unwrap()
    };

    // First call: nothing cached yet, so the response is a miss.
    let first = run(build_router(state.clone()), build_request()).await;
    assert_eq!(first.status(), StatusCode::OK);
    let first_cache_header = first
        .headers()
        .get("x-aisix-cache")
        .and_then(|v| v.to_str().ok());
    assert_eq!(first_cache_header, Some("miss"));

    // Second identical call: must be answered from the cache.
    let second = run(build_router(state), build_request()).await;
    assert_eq!(second.status(), StatusCode::OK);
    let second_cache_header = second
        .headers()
        .get("x-aisix-cache")
        .and_then(|v| v.to_str().ok());
    assert_eq!(second_cache_header, Some("hit"));

    // A "hit" alone does not prove correct dispatch: writing to the
    // memory instance by mistake would also produce one. Inspect both
    // instances directly to pin where the entry actually landed.
    let req: aisix_gateway::ChatFormat = serde_json::from_value(body).unwrap();
    let key = CacheKey::from_request(&req).fingerprint();

    let stored_in_redis = redis_standin.get(&key).await.unwrap();
    assert!(
        stored_in_redis.is_some(),
        "cache entry must be written to the policy's redis backend",
    );

    let stored_in_memory = memory.get(&key).await.unwrap();
    assert!(
        stored_in_memory.is_none(),
        "memory instance must not be touched by a redis-backend policy",
    );
}

#[tokio::test]
async fn applies_to_model_caches_matched_model() {
// Counterpart to the negative test above: when the policy
Expand Down
Loading
Loading