From 4194e012e6fc11facdba045d178635c663df1e3b Mon Sep 17 00:00:00 2001 From: Zhiwei Liang Date: Fri, 5 Jun 2026 17:24:03 -0400 Subject: [PATCH 1/3] Azure SDK migration for kv store Signed-off-by: Zhiwei Liang --- Cargo.lock | 517 ++++++++++++++++++++++++---- crates/key-value-azure/Cargo.toml | 12 +- crates/key-value-azure/src/lib.rs | 17 +- crates/key-value-azure/src/store.rs | 513 +++++++++++---------------- 4 files changed, 667 insertions(+), 392 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85ffa8c547..a67450dd39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,7 +40,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", "cipher", - "cpufeatures", + "cpufeatures 0.2.15", ] [[package]] @@ -547,6 +547,28 @@ dependencies = [ "zeroize", ] +[[package]] +name = "aws-lc-rs" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a2f9779ce85b93ab6170dd940ad0169b5766ff848247aff13bb788b832fe3f4" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "aws-runtime" version = "1.4.3" @@ -903,50 +925,95 @@ dependencies = [ [[package]] name = "azure_core" -version = "0.21.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +checksum = "fe6a26a7d374b440015cbbcbf2d9d8be5a133aa940599f5e5dc569504baa262e" dependencies = [ + "async-lock", "async-trait", - "base64 0.22.1", + "azure_core_macros", "bytes", - "dyn-clone", "futures", - "getrandom 0.2.15", "hmac", - "http-types", - "once_cell", - "paste", "pin-project", - "rand 0.8.5", - "reqwest 0.12.9", "rustc_version", "serde", "serde_json", "sha2", - "time", + "tokio", + "tracing", + "typespec", + "typespec_client_core", +] + +[[package]] +name = "azure_core_macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b52dba6a345f3ad2d42ff8d0d63df9d0994cfa29657bf18ffdbf149f78a4f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", "tracing", - "url", - "uuid", ] [[package]] name = "azure_data_cosmos" -version = "0.21.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa5603f2de38c21165a1b5dfed94d64b1ab265526b0686e8557c907a53a0ee2" +checksum = "e4abdf6256d507568b79a91278e71414af8c9e27293ac402983d6da4e88a9c54" dependencies = [ + "async-lock", "async-trait", - "azure_core 0.21.0", + "azure_core 1.0.0", + "azure_data_cosmos_driver", + "base64 0.22.1", + "futures", + "pin-project", + "reqwest 0.13.4", + "serde", + "serde_json", + "tracing", + "url", +] + +[[package]] +name = "azure_data_cosmos_driver" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d93b7fae81a83326ad44b73dbd70a6ade7ab9b6d4aff05133f68029563ad9d5" +dependencies = [ + "arc-swap", + "async-lock", + "async-trait", + "azure_core 1.0.0", + "azure_data_cosmos_macros", + "backtrace", + "base64 0.22.1", "bytes", + "crossbeam-epoch", "futures", + "h2 0.4.14", + "reqwest 0.13.4", "serde", "serde_json", - "thiserror 1.0.69", - "time", + "tokio", "tracing", "url", "uuid", + "windows 0.62.2", +] + +[[package]] +name = "azure_data_cosmos_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d950637b4229226b5d4a41d71d777b8193ebe86abcbf57e564d5a4bb95e7562" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -971,23 +1038,20 @@ dependencies = [ [[package]] name = "azure_identity" -version = "0.21.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ddd80344317c40c04b603807b63a5cefa532f1b43522e72f480a988141f744" +checksum = "32edf96b356ca7c51d7590c4925cc36efc3947a5da4468e8e0b25c56ecbb3de5" dependencies = [ "async-lock", - "async-process", "async-trait", - "azure_core 0.21.0", + "azure_core 1.0.0", "futures", - "oauth2", "pin-project", "serde", + "serde_json", "time", "tracing", - "tz-rs", "url", - "uuid", ] [[package]] @@ -1552,6 +1616,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.38" @@ -1903,6 +1978,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "cranelift-assembler-x64" version = "0.131.0" @@ -2341,12 +2425,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.11" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", - "serde", + "serde_core", ] [[package]] @@ -3507,7 +3591,7 @@ dependencies = [ "libc", "log", "rustversion", - "windows", + "windows 0.61.3", ] [[package]] @@ -3552,9 +3636,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" dependencies = [ "cfg-if", + "js-sys", "libc", - "r-efi", + "r-efi 5.2.0", "wasi 0.14.2+wasi-0.2.4", + "wasm-bindgen", +] + +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.1", + "wasip2", + "wasip3", ] [[package]] @@ -3891,9 +3991,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.6" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +checksum = "171fefbc92fe4a4de27e0698d6a5b392d6a0e333506bc49133760b3bcf948733" dependencies = [ "atomic-waker", "bytes", @@ -4208,7 +4308,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2 0.4.6", + "h2 0.4.14", "http 1.3.1", "http-body 1.0.1", "httparse", @@ -5315,6 +5415,12 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "mach2" version = "0.4.2" @@ -5779,9 +5885,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "521739c6d2bac4aa25192232afe6841231376b2b26d4d9fae5ecf8ca5772e441" [[package]] name = "num-integer" @@ -6804,7 +6910,7 @@ dependencies = [ "nix 0.30.1", "tokio", "tracing", - "windows", + "windows 0.61.3", ] [[package]] @@ -7030,37 +7136,44 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.5" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", + "cfg_aliases", "pin-project-lite", "quinn-proto", "quinn-udp", "rustc-hash", "rustls 0.23.37", - "socket2 0.5.7", - "thiserror 1.0.69", + "socket2 0.6.0", + "thiserror 2.0.17", "tokio", "tracing", + "web-time", ] [[package]] name = "quinn-proto" -version = "0.11.8" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ + "aws-lc-rs", "bytes", - "rand 0.8.5", + "getrandom 0.3.2", + "lru-slab", + "rand 0.9.4", "ring", "rustc-hash", "rustls 0.23.37", + "rustls-pki-types", "slab", - "thiserror 1.0.69", + "thiserror 2.0.17", "tinyvec", "tracing", + "web-time", ] [[package]] @@ -7092,6 +7205,12 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "radium" version = "0.7.0" @@ -7142,6 +7261,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -7199,6 +7329,12 @@ dependencies = [ "getrandom 0.3.2", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_distr" version = "0.5.1" @@ -7431,7 +7567,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.4.6", + "h2 0.4.14", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7482,10 +7618,12 @@ dependencies = [ "bytes", "futures-core", "futures-util", + "h2 0.4.14", "http 1.3.1", "http-body 1.0.1", "http-body-util", "hyper 1.8.1", + "hyper-rustls 0.27.3", "hyper-tls", "hyper-util", "js-sys", @@ -7493,13 +7631,17 @@ dependencies = [ "native-tls", "percent-encoding", "pin-project-lite", + "quinn", + "rustls 0.23.37", "rustls-pki-types", + "rustls-platform-verifier", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", "tower-http", @@ -7810,6 +7952,7 @@ version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -7880,6 +8023,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ + "web-time", "zeroize", ] @@ -7937,6 +8081,7 @@ version = "0.103.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -8220,14 +8365,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", - "ryu", "serde", + "serde_core", + "zmij", ] [[package]] @@ -8351,7 +8497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.15", "digest", ] @@ -8378,7 +8524,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.15", "digest", ] @@ -9237,11 +9383,10 @@ version = "4.1.0-pre0" dependencies = [ "anyhow", "async-trait", - "azure_core 0.21.0", + "azure_core 1.0.0", "azure_data_cosmos", - "azure_identity 0.21.0", + "azure_identity 1.0.0", "futures", - "reqwest 0.12.9", "serde", "spin-factor-key-value", "tokio", @@ -10224,9 +10369,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.36" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", @@ -10235,22 +10380,22 @@ dependencies = [ "num-conv", "num_threads", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.18" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", @@ -10333,9 +10478,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.48.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", @@ -10557,7 +10702,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", - "h2 0.4.6", + "h2 0.4.14", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -10623,12 +10768,17 @@ version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ + "async-compression", "bitflags 2.10.0", "bytes", + "futures-core", "futures-util", "http 1.3.1", "http-body 1.0.1", + "http-body-util", "pin-project-lite", + "tokio", + "tokio-util", "tower 0.5.2", "tower-layer", "tower-service", @@ -10758,6 +10908,57 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "typespec" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21666a31293beab8f41d38c2849ddbc342cd9c7cb4d71a9818868287a8934e53" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "serde", + "serde_json", + "url", +] + +[[package]] +name = "typespec_client_core" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924f0c734e0ac3b881ab99d032bd28fcc969d2bb73ef1b8dd4772fd8e518a382" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "dyn-clone", + "futures", + "pin-project", + "rand 0.10.1", + "reqwest 0.13.4", + "serde", + "serde_json", + "time", + "tokio", + "tracing", + "typespec", + "typespec_macros", + "url", + "uuid", +] + +[[package]] +name = "typespec_macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c608f4427943f8adb211abc95c87672b1b98847152783507d54e3246e502f60" +dependencies = [ + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.117", +] + [[package]] name = "tz-rs" version = "0.6.14" @@ -10980,11 +11181,14 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.0" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.4.2", + "js-sys", + "rand 0.10.1", + "wasm-bindgen", ] [[package]] @@ -11301,6 +11505,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.1+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen 0.46.0", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + [[package]] name = "wasite" version = "0.1.0" @@ -12341,11 +12563,23 @@ version = "0.61.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" dependencies = [ - "windows-collections", + "windows-collections 0.2.0", "windows-core 0.61.2", - "windows-future", + "windows-future 0.2.1", "windows-link 0.1.3", - "windows-numerics", + "windows-numerics 0.2.0", +] + +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections 0.3.2", + "windows-core 0.62.2", + "windows-future 0.3.2", + "windows-numerics 0.3.1", ] [[package]] @@ -12357,6 +12591,15 @@ dependencies = [ "windows-core 0.61.2", ] +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core 0.62.2", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -12379,6 +12622,19 @@ dependencies = [ "windows-strings 0.4.2", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + [[package]] name = "windows-future" version = "0.2.1" @@ -12387,14 +12643,25 @@ checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ "windows-core 0.61.2", "windows-link 0.1.3", - "windows-threading", + "windows-threading 0.1.0", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core 0.62.2", + "windows-link 0.2.1", + "windows-threading 0.2.1", ] [[package]] name = "windows-implement" -version = "0.60.0" +version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", @@ -12403,9 +12670,9 @@ dependencies = [ [[package]] name = "windows-interface" -version = "0.59.1" +version = "0.59.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", @@ -12434,6 +12701,16 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core 0.62.2", + "windows-link 0.2.1", +] + [[package]] name = "windows-registry" version = "0.2.0" @@ -12463,6 +12740,15 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows-strings" version = "0.1.0" @@ -12482,6 +12768,15 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows-sys" version = "0.45.0" @@ -12608,6 +12903,15 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -12822,6 +13126,32 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck 0.5.0", + "wit-parser 0.244.0", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -12831,6 +13161,37 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck 0.5.0", + "indexmap 2.14.0", + "prettyplease", + "syn 2.0.117", + "wasm-metadata 0.244.0", + "wit-bindgen-core", + "wit-component 0.244.0", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.117", + "wit-bindgen-core", + "wit-bindgen-rust", +] + [[package]] name = "wit-component" version = "0.224.1" @@ -13261,6 +13622,12 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + [[package]] name = "zstd" version = "0.13.2" diff --git a/crates/key-value-azure/Cargo.toml b/crates/key-value-azure/Cargo.toml index 14fac53ad2..b08ab5d1f6 100644 --- a/crates/key-value-azure/Cargo.toml +++ b/crates/key-value-azure/Cargo.toml @@ -11,19 +11,13 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -azure_core = "0.21.0" -azure_data_cosmos = "0.21.0" -azure_identity = "0.21.0" +azure_core = { version = "1.0", default-features = false } +azure_data_cosmos = { version = "0.34", features = ["key_auth"] } +azure_identity = "1.0" futures = { workspace = true } -reqwest = { version = "0.12", default-features = false } serde = { workspace = true } spin-factor-key-value = { path = "../factor-key-value" } tokio = { workspace = true } [lints] workspace = true - -[features] -# Enables reusing connections to the Azure Cosmos DB service. -connection-pooling = [] -default = ["connection-pooling"] diff --git a/crates/key-value-azure/src/lib.rs b/crates/key-value-azure/src/lib.rs index 82195fc5ab..54d61bd9ce 100644 --- a/crates/key-value-azure/src/lib.rs +++ b/crates/key-value-azure/src/lib.rs @@ -1,5 +1,6 @@ mod store; +use azure_data_cosmos::Region; use serde::Deserialize; use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore; @@ -32,8 +33,15 @@ pub struct AzureCosmosKeyValueRuntimeConfig { /// The Azure Cosmos DB database. database: String, /// The Azure Cosmos DB container where data is stored. - /// The CosmosDB container must be created with the default partition key, /id + /// The CosmosDB container must be created with the default partition + /// key path, /id container: String, + + /// Optional. The Azure region the spin application is running in (or the + /// closest Azure region to it), used as the proximity-sorting anchor + /// for the Azure SDK's region selection. When omitted, defaults to + /// East US. + region: Option, } impl MakeKeyValueStore for AzureKeyValueStore { @@ -51,13 +59,18 @@ impl MakeKeyValueStore for AzureKeyValueStore { Some(key) => KeyValueAzureCosmosAuthOptions::RuntimeConfigValues( KeyValueAzureCosmosRuntimeConfigOptions::new(key), ), - None => KeyValueAzureCosmosAuthOptions::Environmental, + None => KeyValueAzureCosmosAuthOptions::DeveloperTools, }; + let region = runtime_config + .region + .map(Region::from) + .unwrap_or(Region::EAST_US); KeyValueAzureCosmos::new( runtime_config.account, runtime_config.database, runtime_config.container, auth_options, + region, self.app_id.clone(), ) } diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index bdcef81f9e..1f3296bb80 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -1,20 +1,29 @@ -use anyhow::{Context, Result}; +use anyhow::Result; use async_trait::async_trait; +use azure_core::credentials::{Secret, TokenCredential}; +use azure_data_cosmos::query::FeedScope; +use azure_data_cosmos::{AccountReference, PatchInstructions, PatchOperation}; use azure_data_cosmos::{ - CosmosEntity, - prelude::{ - AuthorizationToken, CollectionClient, CosmosClient, CosmosClientBuilder, Operation, Query, - }, + CosmosClient, Precondition, Query, Region, RoutingStrategy, clients::ContainerClient, + options::ItemWriteOptions, }; -use futures::StreamExt; +use futures::TryStreamExt; use serde::{Deserialize, Serialize}; use spin_factor_key_value::{ - Cas, Error, Store, StoreManager, SwapError, log_cas_error, log_error, log_error_v3, v3, + Cas, Error, Store, StoreManager, SwapError, log_error, log_error_v3, v3, }; use std::sync::{Arc, Mutex}; pub struct KeyValueAzureCosmos { - client: CollectionClient, + /// Parameters for initializing the Cosmos DB client + account: String, + database: String, + container: String, + auth_options: KeyValueAzureCosmosAuthOptions, + region: Region, + + /// The Cosmos DB client + client: tokio::sync::OnceCell, /// An optional app id /// /// If provided, the store will handle multiple stores per container using a @@ -40,37 +49,11 @@ impl KeyValueAzureCosmosRuntimeConfigOptions { pub enum KeyValueAzureCosmosAuthOptions { /// Runtime Config values indicates the account and key have been specified directly RuntimeConfigValues(KeyValueAzureCosmosRuntimeConfigOptions), - /// Environmental indicates that the environment variables of the process should be used to - /// create the TokenCredential for the Cosmos client. This will use the Azure Rust SDK's - /// DefaultCredentialChain to derive the TokenCredential based on what environment variables - /// have been set. + /// Uses DeveloperToolsCredential when the runtime config omits `key`. /// - /// Service Principal with client secret: - /// - `AZURE_TENANT_ID`: ID of the service principal's Azure tenant. - /// - `AZURE_CLIENT_ID`: the service principal's client ID. - /// - `AZURE_CLIENT_SECRET`: one of the service principal's secrets. - /// - /// Service Principal with certificate: - /// - `AZURE_TENANT_ID`: ID of the service principal's Azure tenant. - /// - `AZURE_CLIENT_ID`: the service principal's client ID. - /// - `AZURE_CLIENT_CERTIFICATE_PATH`: path to a PEM or PKCS12 certificate file including the private key. - /// - `AZURE_CLIENT_CERTIFICATE_PASSWORD`: (optional) password for the certificate file. - /// - /// Workload Identity (Kubernetes, injected by the Workload Identity mutating webhook): - /// - `AZURE_TENANT_ID`: ID of the service principal's Azure tenant. - /// - `AZURE_CLIENT_ID`: the service principal's client ID. - /// - `AZURE_FEDERATED_TOKEN_FILE`: TokenFilePath is the path of a file containing a Kubernetes service account token. - /// - /// Managed Identity (User Assigned or System Assigned identities): - /// - `AZURE_CLIENT_ID`: (optional) if using a user assigned identity, this will be the client ID of the identity. - /// - /// Azure CLI: - /// - `AZURE_TENANT_ID`: (optional) use a specific tenant via the Azure CLI. - /// - /// Common across each: - /// - `AZURE_AUTHORITY_HOST`: (optional) the host for the identity provider. For example, for Azure public cloud the host defaults to "https://login.microsoftonline.com". - /// See also: https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/identity/README.md - Environmental, + /// Athenticated via developer tools only: Azure CLI (`az login`), + /// then Azure Developer CLI (`azd auth login`). + DeveloperTools, } impl KeyValueAzureCosmos { @@ -79,45 +62,67 @@ impl KeyValueAzureCosmos { database: String, container: String, auth_options: KeyValueAzureCosmosAuthOptions, + region: Region, app_id: Option, ) -> Result { - let token = match auth_options { - KeyValueAzureCosmosAuthOptions::RuntimeConfigValues(config) => { - AuthorizationToken::primary_key(config.key).map_err(log_error)? - } - KeyValueAzureCosmosAuthOptions::Environmental => { - AuthorizationToken::from_token_credential( - azure_identity::create_default_credential()?, - ) - } - }; - let cosmos_client = cosmos_client(account, token)?; - let database_client = cosmos_client.database_client(database); - let client = database_client.collection_client(container); - - Ok(Self { client, app_id }) + Ok(Self { + account, + database, + container, + auth_options, + region, + client: tokio::sync::OnceCell::new(), + app_id, + }) } } -fn cosmos_client(account: impl Into, token: AuthorizationToken) -> Result { - if cfg!(feature = "connection-pooling") { - let client = reqwest::ClientBuilder::new() - .build() - .context("failed to build reqwest client")?; - let transport_options = azure_core::TransportOptions::new(std::sync::Arc::new(client)); - Ok(CosmosClientBuilder::new(account, token) - .transport(transport_options) - .build()) - } else { - Ok(CosmosClient::new(account, token)) - } +async fn build_cosmos_client( + account: &str, + auth_options: &KeyValueAzureCosmosAuthOptions, + region: &Region, +) -> Result { + let endpoint: azure_data_cosmos::AccountEndpoint = + format!("https://{account}.documents.azure.com/") + .parse() + .map_err(log_error)?; + + let account_ref = match auth_options { + KeyValueAzureCosmosAuthOptions::RuntimeConfigValues(config) => { + AccountReference::with_authentication_key(endpoint, Secret::from(config.key.clone())) + } + KeyValueAzureCosmosAuthOptions::DeveloperTools => { + let credential: Arc = + azure_identity::DeveloperToolsCredential::new(None).map_err(log_error)?; + AccountReference::with_credential(endpoint, credential) + } + }; + + let routing_strategy = RoutingStrategy::ProximityTo(region.clone()); + + CosmosClient::builder() + .build(account_ref, routing_strategy) + .await + .map_err(log_error) } #[async_trait] impl StoreManager for KeyValueAzureCosmos { async fn get(&self, name: &str) -> Result, Error> { + let client = self + .client + .get_or_try_init(|| async { + return build_cosmos_client(&self.account, &self.auth_options, &self.region) + .await? + .database_client(&self.database) + .container_client(&self.container) + .await + .map_err(log_error); + }) + .await? + .clone(); Ok(Arc::new(AzureCosmosStore { - client: self.client.clone(), + client, store_id: self.app_id.as_ref().map(|i| format!("{i}/{name}")), })) } @@ -127,17 +132,16 @@ impl StoreManager for KeyValueAzureCosmos { } fn summary(&self, _store_name: &str) -> Option { - let database = self.client.database_client().database_name(); - let collection = self.client.collection_name(); Some(format!( - "Azure CosmosDB database: {database}, collection: {collection}" + "Azure CosmosDB database: {}, container: {}", + self.database, self.container )) } } #[derive(Clone)] struct AzureCosmosStore { - client: CollectionClient, + client: ContainerClient, /// An optional store id to use as a partition key for all operations. /// /// If the store ID is not set, the store will use `/id` (the row key) as @@ -151,8 +155,10 @@ struct AzureCosmosStore { #[async_trait] impl Store for AzureCosmosStore { async fn get(&self, key: &str, max_result_bytes: usize) -> Result>, Error> { - let pair = self.get_entity::(key).await?; - let value = pair.map(|p| p.value); + let value = self + .query_one::(self.get_query(key)) + .await? + .map(|p| p.value); // Currently there's no way to stream a single query result using the // `azure_data_cosmos` crate without buffering, so the damage (in terms @@ -184,44 +190,57 @@ impl Store for AzureCosmosStore { value: value.to_vec(), store_id: self.store_id.clone(), }; + + let partition_key = partition_key(self.store_id.as_deref(), key); self.client - .create_document(pair) - .is_upsert(true) + .upsert_item(partition_key, key, pair, None) .await .map_err(log_error)?; Ok(()) } async fn delete(&self, key: &str) -> Result<(), Error> { - let document_client = self - .client - .document_client(key, &self.store_id.clone().unwrap_or(key.to_string())) - .map_err(log_error)?; - if let Err(e) = document_client.delete_document().await { - if e.as_http_error().map(|e| e.status() != 404).unwrap_or(true) { - return Err(log_error(e)); - } + let partition_key = partition_key(self.store_id.as_deref(), key); + + match self.client.delete_item(partition_key, key, None).await { + Ok(_) => Ok(()), + Err(e) if e.status().is_not_found() => Ok(()), + Err(e) => Err(log_error(e)), } - Ok(()) } async fn exists(&self, key: &str) -> Result { + Ok(self + .query_one::(self.get_id_query(key)) + .await? + .is_some()) + } + + async fn get_keys(&self, max_result_bytes: usize) -> Result, Error> { let mut stream = self .client - .query_documents(Query::new(self.get_id_query(key))) - .query_cross_partition(true) - .max_item_count(1) - .into_stream::(); - - match stream.next().await { - Some(Ok(res)) => Ok(!res.results.is_empty()), - Some(Err(e)) => Err(log_error(e)), - None => Ok(false), + .query_items::( + Query::from(self.get_keys_query()), + FeedScope::full_container(), + None, + ) + .await + .map_err(log_error)?; + + let mut result = Vec::new(); + let mut byte_count = std::mem::size_of::>(); + + while let Some(key) = stream.try_next().await.map_err(log_error)? { + byte_count += std::mem::size_of::() + key.id.len(); + if byte_count > max_result_bytes { + return Err(Error::Other(format!( + "query result exceeds limit of {max_result_bytes} bytes" + ))); + } + result.push(key.id); } - } - async fn get_keys(&self, max_result_bytes: usize) -> Result, Error> { - self.get_keys(max_result_bytes).await + Ok(result) } async fn get_keys_async( @@ -234,25 +253,25 @@ impl Store for AzureCosmosStore { let (keys_tx, keys_rx) = tokio::sync::mpsc::channel(4); let (err_tx, err_rx) = tokio::sync::oneshot::channel(); - let query = self - .client - .query_documents(Query::new(self.get_keys_query())) - .query_cross_partition(true); + let client = self.client.clone(); + let query = self.get_keys_query(); let the_work = async move { - let mut stream = query.into_stream::(); - while let Some(resp) = stream.next().await { - let resp = resp.map_err(log_error_v3)?; - - if resp.results.iter().map(|(k, _)| k.id.len()).sum::() > max_result_bytes { + let mut stream = client + .query_items::(Query::from(query), FeedScope::full_container(), None) + .await + .map_err(log_error_v3)?; + + let mut byte_count = std::mem::size_of::>(); + while let Some(key) = stream.try_next().await.map_err(log_error_v3)? { + byte_count += std::mem::size_of::() + key.id.len(); + if byte_count > max_result_bytes { return Err(v3::Error::Other(format!( "query exceeds limit of {max_result_bytes} bytes" ))); } - for (key, _) in resp.results { - keys_tx.send(key.id).await.map_err(log_error_v3)?; - } + keys_tx.send(key.id).await.map_err(log_error_v3)?; } Ok(()) }; @@ -269,36 +288,30 @@ impl Store for AzureCosmosStore { keys: Vec, max_result_bytes: usize, ) -> Result>)>, Error> { - let stmt = Query::new(self.get_in_query(keys)); - let query = self + let mut stream = self .client - .query_documents(stmt) - .query_cross_partition(true); + .query_items::( + Query::from(self.get_in_query(keys)), + FeedScope::full_container(), + None, + ) + .await + .map_err(log_error)?; - let mut res = Vec::new(); - let mut stream = query.into_stream::(); + let mut results = Vec::new(); let mut byte_count = std::mem::size_of::>)>>(); - while let Some(resp) = stream.next().await { - let resp = resp.map_err(log_error)?.results; - byte_count += resp - .iter() - .map(|(pair, _)| { - std::mem::size_of::<(String, Option>)>() - + pair.id.len() - + pair.value.len() - }) - .sum::(); + while let Some(pair) = stream.try_next().await.map_err(log_error)? { + byte_count += + std::mem::size_of::<(String, Option>)>() + pair.id.len() + pair.value.len(); + if byte_count > max_result_bytes { return Err(Error::Other(format!( "query result exceeds limit of {max_result_bytes} bytes" ))); } - res.extend( - resp.into_iter() - .map(|(pair, _)| (pair.id, Some(pair.value))), - ); + results.push((pair.id, Some(pair.value))) } - Ok(res) + Ok(results) } async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { @@ -320,53 +333,34 @@ impl Store for AzureCosmosStore { /// The initial value for the item must be set through this interface, as this sets the /// number value if it does not exist. If the value was previously set using /// the `set` interface, this will fail due to a type mismatch. - // TODO: The function should parse the new value from the return response - // rather than sending an additional new request. However, the current SDK - // version does not support this. async fn increment(&self, key: String, delta: i64) -> Result { - let operations = vec![Operation::incr("/value", delta).map_err(log_error)?]; + let patch = + PatchInstructions::default().with_operation(PatchOperation::increment("/value", delta)); + let partition_key = partition_key(self.store_id.as_deref(), &key); + match self .client - .document_client(&key, &self.store_id.clone().unwrap_or(key.to_string())) - .map_err(log_error)? - .patch_document(operations) + .patch_item(partition_key.clone(), &key, patch, None) .await { - Err(e) => { - if e.as_http_error() - .map(|e| e.status() == 404) - .unwrap_or(false) + Err(e) if e.status().is_not_found() => { + let counter = Counter { + id: key.clone(), + value: delta, + store_id: self.store_id.clone(), + }; + match self + .client + .create_item(partition_key, &key, counter, None) + .await { - let counter = Counter { - id: key.clone(), - value: delta, - store_id: self.store_id.clone(), - }; - if let Err(e) = self.client.create_document(counter).is_upsert(false).await { - if e.as_http_error() - .map(|e| e.status()) - .unwrap_or(azure_core::StatusCode::Continue) - == 409 - { - // Conflict trying to create counter, retry increment - self.increment(key, delta).await?; - } else { - return Err(log_error(e)); - } - } - Ok(delta) - } else { - Err(log_error(e)) + Ok(_) => Ok(delta), + Err(e) if e.status().is_conflict() => self.increment(key, delta).await, + Err(e) => Err(log_error(e)), } } - Ok(_) => self - .get_entity::(key.as_ref()) - .await? - .map(|c| c.value) - .ok_or(Error::Other( - "increment returned an empty value after patching, which indicates a bug" - .to_string(), - )), + Err(e) => Err(log_error(e)), + Ok(response) => Ok(response.into_model::().map_err(log_error)?.value), } } @@ -387,63 +381,33 @@ impl Store for AzureCosmosStore { struct CompareAndSwap { key: String, - client: CollectionClient, + client: ContainerClient, bucket_rep: u32, - etag: Mutex>, + etag: Mutex>, store_id: Option, } -impl CompareAndSwap { - fn get_query(&self) -> String { - let mut query = format!("SELECT * FROM c WHERE c.id='{}'", self.key); - self.append_store_id(&mut query, true); - query - } - - fn append_store_id(&self, query: &mut String, condition_already_exists: bool) { - append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists); - } -} - #[async_trait] impl Cas for CompareAndSwap { /// `current` will fetch the current value for the key and store the etag for the record. The /// etag will be used to perform and optimistic concurrency update using the `if-match` header. async fn current(&self, max_result_bytes: usize) -> Result>, Error> { - let mut stream = self - .client - .query_documents(Query::new(self.get_query())) - .query_cross_partition(true) - .max_item_count(1) - .into_stream::(); - - let current_value: Option<(Vec, Option)> = match stream.next().await { - Some(r) => { - let r = r.map_err(log_error)?; - match r.results.first() { - Some((item, Some(attr))) => { - Some((item.clone().value, Some(attr.etag().to_string()))) - } - Some((item, None)) => Some((item.clone().value, None)), - _ => None, - } - } - None => None, - }; + let partition_key = partition_key(self.store_id.as_deref(), &self.key); + let result = self.client.read_item(partition_key, &self.key, None).await; - let value = match current_value { - Some((value, etag)) => { - self.etag.lock().unwrap().clone_from(&etag); - Some(value) + let value = match result { + Ok(response) => { + *self.etag.lock().unwrap() = response.headers().etag().cloned(); + Some(response.into_model::().map_err(log_error)?.value) + } + Err(e) if e.status().is_not_found() => { + *self.etag.lock().unwrap() = None; + None } - None => None, + Err(e) => return Err(log_error(e)), }; - // Currently there's no way to stream a single query result using the - // `azure_data_cosmos` crate without buffering, so the damage (in terms - // of host memory usage) is already done, but we can still enforce the - // limit: - if std::mem::size_of::>>() + value.as_ref().map(|v| v.len()).unwrap_or(0) + if std::mem::size_of::>>() + value.as_ref().map(Vec::len).unwrap_or(0) > max_result_bytes { Err(Error::Other(format!( @@ -463,31 +427,27 @@ impl Cas for CompareAndSwap { store_id: self.store_id.clone(), }; - let doc_client = self - .client - .document_client(&self.key, &pair.partition_key()) - .map_err(log_cas_error)?; + let partition_key = partition_key(self.store_id.as_deref(), &self.key); + let etag = self.etag.lock().unwrap().clone(); - let etag_value = self.etag.lock().unwrap().clone(); - match etag_value { + let response = match etag { Some(etag) => { - // attempt to replace the document if the etag matches - doc_client - .replace_document(pair) - .if_match_condition(azure_core::request_options::IfMatchCondition::Match(etag)) + let opts = + ItemWriteOptions::default().with_precondition(Precondition::IfMatch(etag)); + self.client + .replace_item(partition_key, &self.key, &pair, Some(opts)) .await - .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) - .map(drop) } None => { - // if we have no etag, then we assume the document does not yet exist and must insert; no upserts. self.client - .create_document(pair) + .create_item(partition_key, &self.key, &pair, None) .await - .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) - .map(drop) } - } + }; + + response + .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) + .map(drop) } async fn bucket_rep(&self) -> u32 { @@ -500,69 +460,33 @@ impl Cas for CompareAndSwap { } impl AzureCosmosStore { - async fn get_entity(&self, key: &str) -> Result, Error> + async fn query_one(&self, query: String) -> Result, Error> where - F: CosmosEntity + Send + Sync + serde::de::DeserializeOwned + Clone, + T: serde::de::DeserializeOwned + Send + 'static, { - let query = self - .client - .query_documents(Query::new(self.get_query(key))) - .query_cross_partition(true) - .max_item_count(1); - - // There can be no duplicated keys, so we create the stream and only take the first result. - let mut stream = query.into_stream::(); - let Some(res) = stream.next().await else { - return Ok(None); - }; - Ok(res - .map_err(log_error)? - .results - .first() - .map(|(p, _)| p.clone())) - } - - async fn get_keys(&self, max_result_bytes: usize) -> Result, Error> { - let query = self + let mut stream = self .client - .query_documents(Query::new(self.get_keys_query())) - .query_cross_partition(true); - let mut res = Vec::new(); - - let mut stream = query.into_stream::(); - let mut byte_count = std::mem::size_of::>(); - while let Some(resp) = stream.next().await { - let resp = resp.map_err(log_error)?.results; - byte_count += resp - .iter() - .map(|(key, _)| std::mem::size_of::() + key.id.len()) - .sum::(); - if byte_count > max_result_bytes { - return Err(Error::Other(format!( - "query result exceeds limit of {max_result_bytes} bytes" - ))); - } - res.extend(resp.into_iter().map(|(key, _)| key.id)); - } - - Ok(res) + .query_items(Query::from(query), FeedScope::full_container(), None) + .await + .map_err(log_error)?; + stream.try_next().await.map_err(log_error) } fn get_query(&self, key: &str) -> String { let mut query = format!("SELECT * FROM c WHERE c.id='{key}'"); - self.append_store_id(&mut query, true); + append_store_id_condition(&mut query, self.store_id.as_deref(), true); query } fn get_id_query(&self, key: &str) -> String { let mut query = format!("SELECT c.id, c.store_id FROM c WHERE c.id='{key}'"); - self.append_store_id(&mut query, true); + append_store_id_condition(&mut query, self.store_id.as_deref(), true); query } fn get_keys_query(&self) -> String { let mut query = "SELECT c.id, c.store_id FROM c".to_owned(); - self.append_store_id(&mut query, false); + append_store_id_condition(&mut query, self.store_id.as_deref(), false); query } @@ -574,13 +498,15 @@ impl AzureCosmosStore { .join(", "); let mut query = format!("SELECT * FROM c WHERE c.id IN ({in_clause})"); - self.append_store_id(&mut query, true); + append_store_id_condition(&mut query, self.store_id.as_deref(), true); query } +} - fn append_store_id(&self, query: &mut String, condition_already_exists: bool) { - append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists); - } +fn partition_key(store_id: Option<&str>, key: &str) -> String { + store_id + .map(str::to_string) + .unwrap_or_else(|| key.to_string()) } /// Appends an option store id condition to the query. @@ -601,7 +527,7 @@ fn append_store_id_condition( } } -// Pair structure for key value operations +/// Pair structure for key value operations #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Pair { pub id: String, @@ -609,16 +535,7 @@ pub struct Pair { #[serde(skip_serializing_if = "Option::is_none")] pub store_id: Option, } - -impl CosmosEntity for Pair { - type Entity = String; - - fn partition_key(&self) -> Self::Entity { - self.store_id.clone().unwrap_or_else(|| self.id.clone()) - } -} - -// Counter structure for increment operations +/// Counter structure for increment operations #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Counter { pub id: String, @@ -627,26 +544,10 @@ pub struct Counter { pub store_id: Option, } -impl CosmosEntity for Counter { - type Entity = String; - - fn partition_key(&self) -> Self::Entity { - self.store_id.clone().unwrap_or_else(|| self.id.clone()) - } -} - -// Key structure for operations with generic value types +/// Key structure for operations with generic value types #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Key { pub id: String, #[serde(skip_serializing_if = "Option::is_none")] pub store_id: Option, } - -impl CosmosEntity for Key { - type Entity = String; - - fn partition_key(&self) -> Self::Entity { - self.store_id.clone().unwrap_or_else(|| self.id.clone()) - } -} From a305727241765a0027d9f33c74c9d78775200698 Mon Sep 17 00:00:00 2001 From: Zhiwei Liang Date: Fri, 5 Jun 2026 17:37:32 -0400 Subject: [PATCH 2/3] Azure auth migration Signed-off-by: Zhiwei Liang --- crates/key-value-azure/src/auth.rs | 146 ++++++++++++++++++++++++++++ crates/key-value-azure/src/lib.rs | 36 +++++-- crates/key-value-azure/src/store.rs | 33 +------ 3 files changed, 181 insertions(+), 34 deletions(-) create mode 100644 crates/key-value-azure/src/auth.rs diff --git a/crates/key-value-azure/src/auth.rs b/crates/key-value-azure/src/auth.rs new file mode 100644 index 0000000000..7b3bc7e7a9 --- /dev/null +++ b/crates/key-value-azure/src/auth.rs @@ -0,0 +1,146 @@ +use anyhow::Result; +use azure_core::credentials::TokenCredential; +use std::sync::Arc; + +/// Azure Cosmos Key / Value runtime config literal options for authentication +#[derive(Clone, Debug)] +pub struct KeyValueAzureCosmosRuntimeConfigOptions { + pub(crate) key: String, +} + +impl KeyValueAzureCosmosRuntimeConfigOptions { + pub fn new(key: String) -> Self { + Self { key } + } +} + +/// Azure Cosmos Key / Value enumeration for the possible authentication options +#[derive(Clone, Debug)] +pub enum KeyValueAzureCosmosAuthOptions { + /// Runtime Config values indicates the account and key have been specified directly + RuntimeConfigValues(KeyValueAzureCosmosRuntimeConfigOptions), + /// An Azure AD token credential, used when the runtime config omits `key`. + /// + /// The specific credential is chosen by the operator via the `auth_type` + /// runtime-config field (defaulting to developer tools for local + /// development). There is deliberately no fallback *between* credential + /// types: `azure_identity` 1.0 removed `EnvironmentCredential` / + /// `DefaultAzureCredential` because silently trying a different identity + /// after one fails is a security footgun (Azure/azure-sdk-for-rust#2283). + /// This mirrors their recommended "specific credential" pattern. + AadCredential(AzureCredentialKind), +} + +/// The specific Azure AD credential to use when authenticating to Cosmos +/// without an account key. +/// +/// Each variant maps to exactly one `azure_identity` credential; the operator +/// names the one matching their deployment via the `auth_type` runtime-config +/// field. Modeled on `azure_identity`'s `specific_credential.rs` example. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub enum AzureCredentialKind { + /// Developer tools: Azure CLI (`az login`), then Azure Developer CLI + /// (`azd auth login`). Intended for local development; the default when + /// `auth_type` is omitted. + #[default] + DeveloperTools, + /// Managed identity (Azure VM, App Service, or AKS with managed identity). + /// + /// `client_id` optionally selects a *user-assigned* managed identity by its + /// client ID; when `None`, the SDK uses the *system-assigned* identity. + ManagedIdentity { client_id: Option }, + /// Workload identity (AKS federated token). + WorkloadIdentity, + /// Service principal authenticated with a client secret. Reads + /// `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET` from the + /// environment — the same variables the legacy SDK's `EnvironmentCredential` + /// used, so existing deployments keep working without config changes. + ServicePrincipal, +} + +impl AzureCredentialKind { + /// Parses the `auth_type` runtime-config value into a credential kind. + /// + /// `None` (the field omitted) defaults to [`AzureCredentialKind::DeveloperTools`], + /// intended for local development. An unrecognized value is an error rather + /// than a silent fallback. + /// + /// `client_id` selects a user-assigned managed identity by its client ID. It + /// only applies to `managed_identity`; with any other `auth_type` it is + /// ignored. + pub fn from_auth_type(auth_type: Option<&str>, client_id: Option) -> Result { + // Case-insensitive, but the value must be one of the canonical + // snake_case names: a non-canonical form (e.g. a space or hyphen + // separator) is rejected rather than silently normalized, so the + // accepted set matches exactly what the docs and the error below list. + // `client_id` is consumed only by the `managed_identity` arm; for any + // other auth type it is simply dropped. + match auth_type.map(|s| s.to_lowercase()).as_deref() { + None => Ok(Self::default()), + Some("developer_tools") => Ok(Self::DeveloperTools), + Some("managed_identity") => Ok(Self::ManagedIdentity { client_id }), + Some("workload_identity") => Ok(Self::WorkloadIdentity), + Some("service_principal") => Ok(Self::ServicePrincipal), + Some(other) => anyhow::bail!( + "unknown Azure Cosmos `auth_type` {other:?}; expected one of \ + \"managed_identity\", \"workload_identity\", \"service_principal\", \ + or \"developer_tools\" (or set `key` for account-key auth)" + ), + } + } + + /// Constructs the corresponding `azure_identity` token credential. + /// + /// This runs the credential's own setup (which may fail — e.g. if the + /// environment for workload identity or service principal is absent), so it + /// is called lazily when the Cosmos client is first built. + pub(crate) fn credential(&self) -> azure_core::Result> { + match self { + Self::DeveloperTools => Ok(azure_identity::DeveloperToolsCredential::new(None)?), + Self::ManagedIdentity { client_id } => { + // Pass options only when a user-assigned client ID was given; + // `None` keeps the SDK default of the system-assigned identity. + let options = + client_id + .as_ref() + .map(|id| azure_identity::ManagedIdentityCredentialOptions { + user_assigned_id: Some(azure_identity::UserAssignedId::ClientId( + id.clone(), + )), + ..Default::default() + }); + Ok(azure_identity::ManagedIdentityCredential::new(options)?) + } + Self::WorkloadIdentity => Ok(azure_identity::WorkloadIdentityCredential::new(None)?), + Self::ServicePrincipal => { + // azure_identity 1.0 removed the env-driven `EnvironmentCredential`, + // so read the same variables it used and pass them to + // `ClientSecretCredential` explicitly. A missing variable surfaces + // here (lazily, when the client is first built) as a clear error. + let tenant_id = service_principal_env("AZURE_TENANT_ID")?; + let client_id = service_principal_env("AZURE_CLIENT_ID")?; + let secret = service_principal_env("AZURE_CLIENT_SECRET")?; + Ok(azure_identity::ClientSecretCredential::new( + &tenant_id, + client_id, + secret.into(), + None, + )?) + } + } + } +} + +/// Reads a required Service Principal environment variable, mapping a missing +/// value to a clear credential error. +fn service_principal_env(name: &str) -> azure_core::Result { + std::env::var(name).map_err(|_| { + azure_core::Error::with_message( + azure_core::error::ErrorKind::Credential, + format!( + "Azure Cosmos `service_principal` auth requires the `{name}` \ + environment variable to be set" + ), + ) + }) +} diff --git a/crates/key-value-azure/src/lib.rs b/crates/key-value-azure/src/lib.rs index 54d61bd9ce..14c71e2911 100644 --- a/crates/key-value-azure/src/lib.rs +++ b/crates/key-value-azure/src/lib.rs @@ -1,12 +1,14 @@ +mod auth; mod store; use azure_data_cosmos::Region; use serde::Deserialize; use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore; -pub use store::{ - KeyValueAzureCosmos, KeyValueAzureCosmosAuthOptions, KeyValueAzureCosmosRuntimeConfigOptions, +pub use auth::{ + AzureCredentialKind, KeyValueAzureCosmosAuthOptions, KeyValueAzureCosmosRuntimeConfigOptions, }; +pub use store::KeyValueAzureCosmos; /// A key-value store that uses Azure Cosmos as the backend. pub struct AzureKeyValueStore { @@ -16,7 +18,7 @@ pub struct AzureKeyValueStore { impl AzureKeyValueStore { /// Creates a new `AzureKeyValueStore`. /// - /// When `app_id` is provided, the store will a partition key of `$app_id/$store_name`, + /// When `app_id` is provided, the store will use a partition key of `$app_id/$store_name`, /// otherwise the partition key will be `id`. pub fn new(app_id: Option) -> Self { Self { app_id } @@ -33,8 +35,8 @@ pub struct AzureCosmosKeyValueRuntimeConfig { /// The Azure Cosmos DB database. database: String, /// The Azure Cosmos DB container where data is stored. - /// The CosmosDB container must be created with the default partition - /// key path, /id + /// The container's partition key path must be `/id` (the default) — or + /// `/store_id` if the store is constructed with an `app_id`. container: String, /// Optional. The Azure region the spin application is running in (or the @@ -42,6 +44,23 @@ pub struct AzureCosmosKeyValueRuntimeConfig { /// for the Azure SDK's region selection. When omitted, defaults to /// East US. region: Option, + + /// Optional. When `key` is omitted, selects which Azure AD credential to + /// use: "managed_identity", "workload_identity", "service_principal", or + /// "developer_tools". When omitted, defaults to developer tools (Azure CLI + /// / azd), intended for local development. Ignored when `key` is set. + /// + /// "service_principal" reads `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and + /// `AZURE_CLIENT_SECRET` from the environment. + /// + /// There is intentionally no automatic fallback between credential types; + /// name the one matching your deployment. + auth_type: Option, + + /// Optional. Only used with `auth_type = "managed_identity"`: the client ID + /// of a user-assigned managed identity to authenticate. When omitted, the + /// system-assigned identity is used. Ignored with any other `auth_type`. + client_id: Option, } impl MakeKeyValueStore for AzureKeyValueStore { @@ -59,7 +78,12 @@ impl MakeKeyValueStore for AzureKeyValueStore { Some(key) => KeyValueAzureCosmosAuthOptions::RuntimeConfigValues( KeyValueAzureCosmosRuntimeConfigOptions::new(key), ), - None => KeyValueAzureCosmosAuthOptions::DeveloperTools, + None => { + KeyValueAzureCosmosAuthOptions::AadCredential(AzureCredentialKind::from_auth_type( + runtime_config.auth_type.as_deref(), + runtime_config.client_id, + )?) + } }; let region = runtime_config .region diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 1f3296bb80..ba3e4f174a 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -1,6 +1,6 @@ use anyhow::Result; use async_trait::async_trait; -use azure_core::credentials::{Secret, TokenCredential}; +use azure_core::credentials::Secret; use azure_data_cosmos::query::FeedScope; use azure_data_cosmos::{AccountReference, PatchInstructions, PatchOperation}; use azure_data_cosmos::{ @@ -14,6 +14,8 @@ use spin_factor_key_value::{ }; use std::sync::{Arc, Mutex}; +use crate::auth::KeyValueAzureCosmosAuthOptions; + pub struct KeyValueAzureCosmos { /// Parameters for initializing the Cosmos DB client account: String, @@ -32,30 +34,6 @@ pub struct KeyValueAzureCosmos { app_id: Option, } -/// Azure Cosmos Key / Value runtime config literal options for authentication -#[derive(Clone, Debug)] -pub struct KeyValueAzureCosmosRuntimeConfigOptions { - key: String, -} - -impl KeyValueAzureCosmosRuntimeConfigOptions { - pub fn new(key: String) -> Self { - Self { key } - } -} - -/// Azure Cosmos Key / Value enumeration for the possible authentication options -#[derive(Clone, Debug)] -pub enum KeyValueAzureCosmosAuthOptions { - /// Runtime Config values indicates the account and key have been specified directly - RuntimeConfigValues(KeyValueAzureCosmosRuntimeConfigOptions), - /// Uses DeveloperToolsCredential when the runtime config omits `key`. - /// - /// Athenticated via developer tools only: Azure CLI (`az login`), - /// then Azure Developer CLI (`azd auth login`). - DeveloperTools, -} - impl KeyValueAzureCosmos { pub fn new( account: String, @@ -91,9 +69,8 @@ async fn build_cosmos_client( KeyValueAzureCosmosAuthOptions::RuntimeConfigValues(config) => { AccountReference::with_authentication_key(endpoint, Secret::from(config.key.clone())) } - KeyValueAzureCosmosAuthOptions::DeveloperTools => { - let credential: Arc = - azure_identity::DeveloperToolsCredential::new(None).map_err(log_error)?; + KeyValueAzureCosmosAuthOptions::AadCredential(kind) => { + let credential = kind.credential().map_err(log_error)?; AccountReference::with_credential(endpoint, credential) } }; From 8504928513e5101043580a6fd7680bd0e0e0645c Mon Sep 17 00:00:00 2001 From: Zhiwei Liang Date: Sun, 7 Jun 2026 13:26:31 -0400 Subject: [PATCH 3/3] Migrating `get` to read_item; set partition key for `exists`; cleanup Signed-off-by: Zhiwei Liang --- crates/key-value-azure/src/store.rs | 50 +++++++++++------------------ 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index ba3e4f174a..f118e0833f 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -132,20 +132,18 @@ struct AzureCosmosStore { #[async_trait] impl Store for AzureCosmosStore { async fn get(&self, key: &str, max_result_bytes: usize) -> Result>, Error> { - let value = self - .query_one::(self.get_query(key)) - .await? - .map(|p| p.value); + let partition_key = partition_key(self.store_id.as_deref(), key); + let value = match self.client.read_item(partition_key, key, None).await { + Ok(response) => Some(response.into_model::().map_err(log_error)?.value), + Err(e) if e.status().is_not_found() => None, + Err(e) => return Err(log_error(e)), + }; - // Currently there's no way to stream a single query result using the - // `azure_data_cosmos` crate without buffering, so the damage (in terms - // of host memory usage) is already done, but we can still enforce the - // limit: - if std::mem::size_of::>>() + value.as_ref().map(|v| v.len()).unwrap_or(0) + if std::mem::size_of::>>() + value.as_ref().map(Vec::len).unwrap_or(0) > max_result_bytes { Err(Error::Other(format!( - "query result exceeds limit of {max_result_bytes} bytes" + "read result exceeds limit of {max_result_bytes} bytes" ))) } else { Ok(value) @@ -187,10 +185,16 @@ impl Store for AzureCosmosStore { } async fn exists(&self, key: &str) -> Result { - Ok(self - .query_one::(self.get_id_query(key)) - .await? - .is_some()) + let mut stream = self + .client + .query_items::( + Query::from(self.get_id_query(key)), + FeedScope::partition(partition_key(self.store_id.as_deref(), key)), + None, + ) + .await + .map_err(log_error)?; + Ok(stream.try_next().await.map_err(log_error)?.is_some()) } async fn get_keys(&self, max_result_bytes: usize) -> Result, Error> { @@ -437,24 +441,6 @@ impl Cas for CompareAndSwap { } impl AzureCosmosStore { - async fn query_one(&self, query: String) -> Result, Error> - where - T: serde::de::DeserializeOwned + Send + 'static, - { - let mut stream = self - .client - .query_items(Query::from(query), FeedScope::full_container(), None) - .await - .map_err(log_error)?; - stream.try_next().await.map_err(log_error) - } - - fn get_query(&self, key: &str) -> String { - let mut query = format!("SELECT * FROM c WHERE c.id='{key}'"); - append_store_id_condition(&mut query, self.store_id.as_deref(), true); - query - } - fn get_id_query(&self, key: &str) -> String { let mut query = format!("SELECT c.id, c.store_id FROM c WHERE c.id='{key}'"); append_store_id_condition(&mut query, self.store_id.as_deref(), true);