Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 124 additions & 18 deletions crates/lpm-cli/src/commands/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ fn record_timing_detail_ms(bucket: &mut u128, start: Option<Instant>) {

struct V2ReusablePrevalidation {
hits: HashMap<String, lpm_store::v2::ReusableObject>,
ready_links: HashMap<String, V2LinkTaskResult>,
candidate_count: usize,
concurrency: usize,
validation_timings: V2ReusableValidationTimings,
Expand All @@ -171,6 +172,13 @@ struct V2LinkTaskResult {

type V2LinkHandle = tokio::task::JoinHandle<Result<V2LinkTaskResult, LpmError>>;

struct V2PrevalidationCandidate {
key: String,
sri: String,
link_reuse_plan: Option<Arc<lpm_linker::v2::LinkPlanV2>>,
link_reuse_target: Option<lpm_linker::v2::V2Target>,
}

fn spawn_v2_link_task(
plan: std::sync::Arc<lpm_linker::v2::LinkPlanV2>,
target: lpm_linker::v2::V2Target,
Expand All @@ -191,8 +199,10 @@ fn spawn_v2_link_task(
async fn prevalidate_v2_reusable_objects(
packages: &[InstallPackage],
store_v2: Arc<lpm_store::v2::Store>,
link_reuse_plan: Option<Arc<lpm_linker::v2::LinkPlanV2>>,
link_reuse_targets: &HashMap<String, lpm_linker::v2::V2Target>,
) -> Result<V2ReusablePrevalidation, LpmError> {
let candidates: Vec<(String, String)> = packages
let candidates: Vec<V2PrevalidationCandidate> = packages
.iter()
.filter(|package| {
!matches!(
Expand All @@ -201,16 +211,21 @@ async fn prevalidate_v2_reusable_objects(
)
})
.filter_map(|package| {
Some((
install_pkg_key(package),
package.integrity.as_ref()?.clone(),
))
let key = install_pkg_key(package);
let target = link_reuse_targets.get(&key).cloned();
Some(V2PrevalidationCandidate {
key,
sri: package.integrity.as_ref()?.clone(),
link_reuse_plan: link_reuse_plan.clone(),
link_reuse_target: target,
})
})
.collect();

if candidates.is_empty() {
return Ok(V2ReusablePrevalidation {
hits: HashMap::new(),
ready_links: HashMap::new(),
candidate_count: 0,
concurrency: 0,
validation_timings: V2ReusableValidationTimings::default(),
Expand All @@ -219,34 +234,80 @@ async fn prevalidate_v2_reusable_objects(

let candidate_count = candidates.len();
let concurrency = v2_cache_check_concurrency(candidate_count);
let mut checks = futures::stream::iter(candidates.into_iter().map(|(key, sri)| {
let mut checks = futures::stream::iter(candidates.into_iter().map(|candidate| {
let store_v2 = Arc::clone(&store_v2);
tokio::task::spawn_blocking(move || {
store_v2
.reusable_object_with_timings(&sri)
.map(|(hit, timings)| (key, hit, timings))
let (hit, timings) = store_v2.reusable_object_with_timings(&candidate.sri)?;
if let (Some(_), Some(plan), Some(target)) = (
hit.as_ref(),
candidate.link_reuse_plan.as_ref(),
candidate.link_reuse_target.as_ref(),
) {
let start = Instant::now();
if let Some((materialized, _tree_integrity)) =
lpm_linker::v2::reuse_v2_one_if_valid(plan, target, &store_v2)?
{
return Ok::<V2PrevalidationCheck, LpmError>(V2PrevalidationCheck::ReadyLink {
key: candidate.key,
task: V2LinkTaskResult {
materialized,
freshly_populated: false,
ms: start.elapsed().as_millis(),
},
timings,
});
}
}
Ok::<V2PrevalidationCheck, LpmError>(V2PrevalidationCheck::Object {
key: candidate.key,
hit,
timings,
})
})
}))
.buffer_unordered(concurrency);

let mut hits = HashMap::with_capacity(candidate_count);
let mut ready_links = HashMap::new();
let mut validation_timings = V2ReusableValidationTimings::default();
while let Some(result) = checks.next().await {
let (key, hit, timings) = result
.map_err(|e| LpmError::Registry(format!("v2 cache check task panicked: {e}")))??;
validation_timings.record(timings, hit.is_some());
if let Some(hit) = hit {
hits.insert(key, hit);
match result
.map_err(|e| LpmError::Registry(format!("v2 cache check task panicked: {e}")))??
{
V2PrevalidationCheck::ReadyLink { key, task, timings } => {
validation_timings.record(timings, true);
ready_links.insert(key, task);
}
V2PrevalidationCheck::Object { key, hit, timings } => {
validation_timings.record(timings, hit.is_some());
if let Some(hit) = hit {
hits.insert(key, hit);
}
}
}
}
Ok(V2ReusablePrevalidation {
hits,
ready_links,
candidate_count,
concurrency,
validation_timings,
})
}

enum V2PrevalidationCheck {
ReadyLink {
key: String,
task: V2LinkTaskResult,
timings: lpm_store::v2::ReusableObjectCheckTimings,
},
Object {
key: String,
hit: Option<lpm_store::v2::ReusableObject>,
timings: lpm_store::v2::ReusableObjectCheckTimings,
},
}

fn btree_from_hash_map(map: &HashMap<String, String>) -> BTreeMap<String, String> {
map.iter()
.map(|(key, value)| (key.clone(), value.clone()))
Expand Down Expand Up @@ -3155,15 +3216,23 @@ async fn run_with_options_under_store_lock(
// short-circuits below and the fetch tasks further down. Drained
// at the link stage and folded into the LinkResult.
let mut v2_event_link_handles: Vec<V2LinkHandle> = Vec::new();
let mut v2_event_completed_link_results: Vec<V2LinkTaskResult> = Vec::new();
fetch_stage_timings.plan_ms = fetch_plan_start.elapsed().as_millis();
let v2_prevalidate_start = Instant::now();
let v2_reusable_prevalidation = if !force && v2_mode {
match store_v2_handle.as_ref() {
Some(store_v2) => {
prevalidate_v2_reusable_objects(&packages, std::sync::Arc::clone(store_v2)).await?
prevalidate_v2_reusable_objects(
&packages,
std::sync::Arc::clone(store_v2),
v2_plan.as_ref().map(std::sync::Arc::clone),
&v2_target_by_key,
)
.await?
}
None => V2ReusablePrevalidation {
hits: HashMap::new(),
ready_links: HashMap::new(),
candidate_count: 0,
concurrency: 0,
validation_timings: V2ReusableValidationTimings::default(),
Expand All @@ -3172,6 +3241,7 @@ async fn run_with_options_under_store_lock(
} else {
V2ReusablePrevalidation {
hits: HashMap::new(),
ready_links: HashMap::new(),
candidate_count: 0,
concurrency: 0,
validation_timings: V2ReusableValidationTimings::default(),
Expand All @@ -3181,8 +3251,13 @@ async fn run_with_options_under_store_lock(
fetch_stage_timings.v2_reusable_candidate_count =
v2_reusable_prevalidation.candidate_count as u64;
fetch_stage_timings.v2_reusable_concurrency = v2_reusable_prevalidation.concurrency as u64;
fetch_stage_timings.v2_reusable_hit_count = v2_reusable_prevalidation.hits.len() as u64;
fetch_stage_timings.v2_reusable_hit_count = v2_reusable_prevalidation
.hits
.len()
.saturating_add(v2_reusable_prevalidation.ready_links.len())
as u64;
fetch_stage_timings.v2_reusable_validation = v2_reusable_prevalidation.validation_timings;
let mut v2_ready_link_results = v2_reusable_prevalidation.ready_links;
let v2_reusable_objects = v2_reusable_prevalidation.hits;

//b: stale-entry cleanup runs once, up front — must
Expand Down Expand Up @@ -3279,6 +3354,21 @@ async fn run_with_options_under_store_lock(
// being a no-op (pre-4d drain) and broke under the wired-up
// 4d spec path because every package was downloaded twice.
let package_key = install_pkg_key(p);
if !force
&& v2_mode
&& !is_local_source
&& let Some(task) = v2_ready_link_results.remove(&package_key)
{
let classification_start = timing_detail_start(fetch_detail_timing_enabled);
cached += 1;
v2_event_completed_link_results.push(task);
record_timing_detail_ms(
&mut fetch_stage_timings.cache_classify_v2_reusable_hit_ms,
classification_start,
);
continue;
}

if !force
&& v2_mode
&& !is_local_source
Expand Down Expand Up @@ -4549,9 +4639,25 @@ async fn run_with_options_under_store_lock(
let plan = v2_plan
.as_ref()
.expect("v2_event_driven implies v2_plan is Some");
let mut materialized_all: Vec<MaterializedPackage> =
Vec::with_capacity(v2_event_link_handles.len());
let mut materialized_all: Vec<MaterializedPackage> = Vec::with_capacity(
v2_event_completed_link_results
.len()
.saturating_add(v2_event_link_handles.len()),
);
let mut linked_count = 0usize;
for task in v2_event_completed_link_results.drain(..) {
let package_display = timing_detail_mode
.trace()
.then(|| format!("{}@{}", task.materialized.name, task.materialized.version));
v2_link_task_timings.record(task.ms, task.freshly_populated);
if let Some(package_display) = package_display.as_deref() {
slow_package_timings.record_link_v2_one(package_display, task.ms);
}
if task.freshly_populated {
linked_count += 1;
}
materialized_all.push(task.materialized);
}
let link_await_start = Instant::now();
for h in v2_event_link_handles.drain(..) {
let task = h
Expand Down
76 changes: 70 additions & 6 deletions crates/lpm-cli/src/commands/install/tests/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ async fn prevalidate_v2_reusable_objects_returns_verified_registry_hits() {
pkg.integrity = Some(sri);
let key = install_pkg_key(&pkg);

let prevalidation = prevalidate_v2_reusable_objects(&[pkg], std::sync::Arc::new(store))
.await
.unwrap();
let prevalidation =
prevalidate_v2_reusable_objects(&[pkg], std::sync::Arc::new(store), None, &HashMap::new())
.await
.unwrap();

assert_eq!(prevalidation.candidate_count, 1);
assert_eq!(prevalidation.hits.len(), 1);
assert!(prevalidation.ready_links.is_empty());
assert!(prevalidation.concurrency >= 1);
assert_eq!(prevalidation.validation_timings.checked_count, 1);
assert_eq!(prevalidation.validation_timings.hit_count, 1);
Expand All @@ -80,6 +82,66 @@ async fn prevalidate_v2_reusable_objects_returns_verified_registry_hits() {
assert!(hit.tree_integrity.as_str().starts_with("sha256-"));
}

#[tokio::test]
async fn prevalidate_v2_reusable_objects_returns_ready_links_for_valid_link_entries() {
let dir = tempfile::tempdir().unwrap();
let project_dir = dir.path().join("project");
std::fs::create_dir_all(&project_dir).unwrap();
let store = lpm_store::v2::Store::at(dir.path().join("store"));
let tarball = build_minimal_tarball_with_pkg("ready-link", "1.0.0");
let (_, sri, _) = store.extract_object_from_bytes(&tarball, None).unwrap();
let target = lpm_linker::v2::V2Target {
target: LinkTarget {
name: "ready-link".to_string(),
version: "1.0.0".to_string(),
store_path: PathBuf::new(),
dependencies: Vec::new(),
aliases: HashMap::new(),
is_direct: true,
root_link_names: None,
wrapper_id: None,
materialization: lpm_linker::Materialization::CasBacked,
peers: Vec::new(),
patch_fingerprint: None,
},
source_sri: sri.clone(),
verified_object_tree_integrity: None,
};
let plan = std::sync::Arc::new(
lpm_linker::v2::link_v2_prepare(
&project_dir,
vec![target.clone()],
&store,
lpm_linker::LinkerMode::Isolated,
)
.unwrap(),
);
lpm_linker::v2::link_v2_one(&plan, &target, &store).unwrap();

let mut pkg = fake_pkg("ready-link", "1.0.0", true);
pkg.integrity = Some(sri);
let key = install_pkg_key(&pkg);
let mut targets = HashMap::new();
targets.insert(key.clone(), target);

let prevalidation =
prevalidate_v2_reusable_objects(&[pkg], std::sync::Arc::new(store), Some(plan), &targets)
.await
.unwrap();

assert_eq!(prevalidation.candidate_count, 1);
assert!(prevalidation.hits.is_empty());
assert_eq!(prevalidation.ready_links.len(), 1);
assert_eq!(prevalidation.validation_timings.checked_count, 1);
assert_eq!(prevalidation.validation_timings.hit_count, 1);
let ready = prevalidation
.ready_links
.get(&key)
.expect("valid existing link entry must be returned as ready");
assert_eq!(ready.materialized.name, "ready-link");
assert!(!ready.freshly_populated);
}

#[tokio::test]
async fn prevalidate_v2_reusable_objects_removes_tampered_registry_objects() {
let dir = tempfile::tempdir().unwrap();
Expand All @@ -95,12 +157,14 @@ async fn prevalidate_v2_reusable_objects_removes_tampered_registry_objects() {
let mut pkg = fake_pkg("tampered", "1.0.0", true);
pkg.integrity = Some(sri);

let prevalidation = prevalidate_v2_reusable_objects(&[pkg], std::sync::Arc::new(store))
.await
.unwrap();
let prevalidation =
prevalidate_v2_reusable_objects(&[pkg], std::sync::Arc::new(store), None, &HashMap::new())
.await
.unwrap();

assert_eq!(prevalidation.candidate_count, 1);
assert!(prevalidation.hits.is_empty());
assert!(prevalidation.ready_links.is_empty());
assert_eq!(prevalidation.validation_timings.checked_count, 1);
assert_eq!(prevalidation.validation_timings.miss_count, 1);
assert_eq!(prevalidation.validation_timings.removed_count, 1);
Expand Down
30 changes: 30 additions & 0 deletions crates/lpm-linker/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,36 @@ pub fn link_v2_one(
Ok((mat, entry.freshly_populated))
}

/// Return a materialized package for an already-valid link entry.
///
/// This is a warm-relink fast path for callers that can use an existing
/// `links/<graph-key>/` entry directly. It never populates from the object
/// tree; `Ok(None)` means the caller should use [`link_v2_one`], which keeps
/// the full object-validation and rebuild behavior.
pub fn reuse_v2_one_if_valid(
plan: &LinkPlanV2,
target: &V2Target,
store: &Store,
) -> Result<Option<(MaterializedPackage, VerifiedObjectTreeIntegrity)>, LpmError> {
let key = plan.key_map.get_for(&target.target).ok_or_else(|| {
LpmError::Store(format!(
"v2 linker: missing graph key for {}@{} (key map pre-pass failed)",
target.target.name, target.target.version
))
})?;
let Some(entry) = store.reusable_link_entry_from_snapshots(key, &target.source_sri)? else {
return Ok(None);
};
Ok(Some((
MaterializedPackage {
name: target.target.name.clone(),
version: target.target.version.clone(),
destination: entry.package_dir,
},
entry.tree_integrity,
)))
}

/// Result handle for [`link_v2_finalize`] — separated from
/// [`LinkResult`] so the caller assembles the final result with its
/// own `linked` / `materialized` counts (which the per-package phase
Expand Down
4 changes: 2 additions & 2 deletions crates/lpm-store/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ pub use link_meta::{
pub use platform::PlatformTuple;
pub use store::{
COMPAT_ISLAND_COMPLETE_FILENAME, CompatIslandKeyEntry, DepLink, LinkEntry, LinkEntryRequest,
ReusableObject, ReusableObjectCheckTimings, Store, StoreV2Paths, VerifiedObjectTreeIntegrity,
compat_island_key,
ReusableLinkEntry, ReusableObject, ReusableObjectCheckTimings, Store, StoreV2Paths,
VerifiedObjectTreeIntegrity, compat_island_key,
};
Loading
Loading