diff --git a/src/groups.rs b/src/groups.rs index bce8683..31d7aa7 100644 --- a/src/groups.rs +++ b/src/groups.rs @@ -172,7 +172,6 @@ async fn refresh_group(group_id: web::Path) -> AppResult "refreshed_files": json!(Vec::::new()), // Initialize empty "all_files": json!(Vec::::new()) // Initialize empty }); - let mut refreshed_files_vec = Vec::new(); let mut all_files_vec: Vec = Vec::new(); if repo.can_write() { @@ -292,46 +291,17 @@ async fn refresh_group(group_id: web::Path) -> AppResult }; repo_info["all_files"] = json!(all_files_vec.clone()); - // For each file, check if it needs to be refreshed - for file_name in &all_files_vec { - match repo.get_file_hash(file_name).await { - Ok(file_hash) => { - if !group.has_hash(&file_hash).await? { - log_debug!( - TAG, - "File {} hash {} not found locally. Downloading...", - file_name, - file_hash - ); - match group.download_hash_from_peers(&file_hash).await { - Ok(_) => { - log_debug!( - TAG, - "Successfully downloaded file hash {} for {}", - file_hash, - file_name - ); - refreshed_files_vec.push(file_name.clone()); - } - Err(e) => { - log_debug!( - TAG, - "Error downloading file {} hash {}: {}", - file_name, - file_hash, - e - ); - // Optionally add to a list of files that failed to download - } - } - } - } - Err(e) => { - log_debug!(TAG, "Error getting hash for file {}: {}", file_name, e); - } - } - } - repo_info["refreshed_files"] = json!(refreshed_files_vec); + // Keep refresh metadata-only. Downloading every missing file body here + // can block later file discovery behind one slow or failing transfer. + // `refreshed_files` is retained for API compatibility; file bodies are + // now refreshed only by the explicit media endpoints. + log_debug!( + TAG, + "Repo {} refresh discovered {} files; body downloads are deferred to media endpoint.", + repo.id(), + all_files_vec.len() + ); + repo_info["refreshed_files"] = json!(Vec::::new()); } Ok(Err(e)) => { log_debug!(TAG, "Error getting repo hash for {}: {}", repo.id(), e); diff --git a/src/lib.rs b/src/lib.rs index 8d57264..5990b92 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1355,10 +1355,8 @@ mod tests { .as_array() .expect("refreshed_files should be an array"); assert!( - refreshed_files.is_empty() - || (refreshed_files.len() == 1 - && refreshed_files[0].as_str() == Some(file_name)), - "First refresh should report either no-op or one refreshed expected file, got {refreshed_files:?}" + refreshed_files.is_empty(), + "Refresh should discover metadata without downloading media bodies, got {refreshed_files:?}" ); let all_files = repo_data["all_files"] @@ -1371,35 +1369,37 @@ mod tests { "all_files should contain the uploaded file" ); - // Verify file is accessible after refresh - let get_file_req = test::TestRequest::get() + let list_files_req = test::TestRequest::get() .uri(&format!( - "/api/groups/{}/repos/{}/media/{}", + "/api/groups/{}/repos/{}/media", group.id(), - refreshed_repo_id, - file_name + refreshed_repo_id )) .to_request(); - let get_file_resp = test::call_service(&app, get_file_req).await; + let list_files_resp = test::call_service(&app, list_files_req).await; assert!( - get_file_resp.status().is_success(), - "File should be accessible after refresh" + list_files_resp.status().is_success(), + "File list should be accessible after metadata-only refresh" ); - let got_content = test::read_body(get_file_resp).await; - assert_eq!( - got_content.to_vec(), - file_content.to_vec(), - "File content should match after refresh" + let list_files_data: FilesResponse = test::read_body_json(list_files_resp).await; + let listed_file = list_files_data + .files + .iter() + .find(|file| file.name == file_name) + .expect("File list should include metadata for the uploaded file"); + assert!( + !listed_file.is_downloaded, + "Refresh should not mark the file body as downloaded before explicit media GET" ); - // Test second refresh - should be no-op since all files are present + // Test second refresh before downloading the file body. It should remain metadata-only. let refresh_req2 = test::TestRequest::post() .uri(&format!("/api/groups/{}/refresh", group.id())) .to_request(); let refresh_resp2 = test::call_service(&app, refresh_req2).await; assert!( refresh_resp2.status().is_success(), - "Second refresh should succeed" + "Second refresh should succeed before explicit media GET" ); let refresh_data2: serde_json::Value = test::read_body_json(refresh_resp2).await; @@ -1427,7 +1427,77 @@ mod tests { .expect("refreshed_files should be an array"); assert!( refreshed_files2.is_empty(), - "No files should be refreshed on second call since all are present" + "Repeated refresh should still not download media bodies, got {refreshed_files2:?}" + ); + + let list_files_after_second_refresh_req = test::TestRequest::get() + .uri(&format!( + "/api/groups/{}/repos/{}/media", + group.id(), + refreshed_repo_id + )) + .to_request(); + let list_files_after_second_refresh_resp = + test::call_service(&app, list_files_after_second_refresh_req).await; + assert!( + list_files_after_second_refresh_resp.status().is_success(), + "File list should still be accessible after repeated metadata-only refresh" + ); + let list_files_after_second_refresh_data: FilesResponse = + test::read_body_json(list_files_after_second_refresh_resp).await; + let listed_file_after_second_refresh = list_files_after_second_refresh_data + .files + .iter() + .find(|file| file.name == file_name) + .expect("File list should still include metadata for the uploaded file"); + assert!( + !listed_file_after_second_refresh.is_downloaded, + "Repeated refresh should not mark the file body as downloaded before explicit media GET" + ); + + // Verify file is accessible after refresh + let get_file_req = test::TestRequest::get() + .uri(&format!( + "/api/groups/{}/repos/{}/media/{}", + group.id(), + refreshed_repo_id, + file_name + )) + .to_request(); + let get_file_resp = test::call_service(&app, get_file_req).await; + assert!( + get_file_resp.status().is_success(), + "File should be accessible after refresh" + ); + let got_content = test::read_body(get_file_resp).await; + assert_eq!( + got_content.to_vec(), + file_content.to_vec(), + "File content should match after refresh" + ); + + let list_files_after_get_req = test::TestRequest::get() + .uri(&format!( + "/api/groups/{}/repos/{}/media", + group.id(), + refreshed_repo_id + )) + .to_request(); + let list_files_after_get_resp = test::call_service(&app, list_files_after_get_req).await; + assert!( + list_files_after_get_resp.status().is_success(), + "File list should still be accessible after explicit media GET" + ); + let list_files_after_get_data: FilesResponse = + test::read_body_json(list_files_after_get_resp).await; + let listed_file_after_get = list_files_after_get_data + .files + .iter() + .find(|file| file.name == file_name) + .expect("File list should still include the uploaded file"); + assert!( + listed_file_after_get.is_downloaded, + "Explicit media GET should mark the file body as downloaded locally" ); // Clean up both backends - secondary first, then main diff --git a/src/media.rs b/src/media.rs index 3407fc1..c310caf 100644 --- a/src/media.rs +++ b/src/media.rs @@ -12,6 +12,108 @@ use futures::Stream; use futures::StreamExt; use serde_json::json; use std::io; +use std::time::{Duration, Instant}; + +const MEDIA_DOWNLOAD_MAX_ATTEMPTS: u32 = 3; +const MEDIA_DOWNLOAD_PER_PEER_TIMEOUT: Duration = Duration::from_secs(18); +const MEDIA_DOWNLOAD_OVERALL_TIMEOUT: Duration = Duration::from_secs(55); +const MEDIA_DOWNLOAD_INITIAL_BACKOFF: Duration = Duration::from_millis(500); + +// TODO: fold this media-specific overall budget into save-dweb-backend's +// Group::download_hash_from_peers once the backend crate exposes that knob. +async fn download_hash_for_media( + group: &save_dweb_backend::group::Group, + hash: &iroh_blobs::Hash, +) -> AppResult<()> { + let mut last_error = None; + let started = Instant::now(); + + for attempt in 1..=MEDIA_DOWNLOAD_MAX_ATTEMPTS { + let mut peer_repos = group.list_peer_repos().await; + if peer_repos.is_empty() { + return Err(anyhow::anyhow!("Cannot download hash. No other peers found").into()); + } + + let peer_count = peer_repos.len(); + peer_repos.rotate_left((attempt as usize - 1) % peer_count); + + for peer_repo in peer_repos { + let peer_id = peer_repo.id().to_string(); + + let Some(remaining) = MEDIA_DOWNLOAD_OVERALL_TIMEOUT.checked_sub(started.elapsed()) + else { + let detail = format!( + "Timed out downloading hash {hash} after overall {}s media budget", + MEDIA_DOWNLOAD_OVERALL_TIMEOUT.as_secs() + ); + log_info!(TAG, "{}", detail); + let last_detail = match last_error { + Some(error) => format!("{detail}; last peer error: {error}"), + None => detail, + }; + return Err(anyhow::anyhow!( + "Unable to download hash {} from any peer after {} media attempts; last error: {}", + hash, + attempt.saturating_sub(1), + last_detail + ) + .into()); + }; + + log_info!( + TAG, + "Media download attempt {}/{} for hash {} from peer {}", + attempt, + MEDIA_DOWNLOAD_MAX_ATTEMPTS, + hash, + peer_id + ); + + let timeout_budget = std::cmp::min(MEDIA_DOWNLOAD_PER_PEER_TIMEOUT, remaining); + let download_result = tokio::time::timeout(timeout_budget, async { + let route_id_blob = peer_repo.get_route_id_blob().await?; + group + .iroh_blobs + .download_file_from(route_id_blob, hash) + .await + }) + .await; + + match download_result { + Ok(Ok(())) => return Ok(()), + Ok(Err(e)) => { + let detail = format!("Unable to download hash {hash} from peer {peer_id}: {e}"); + log_info!(TAG, "{}", detail); + last_error = Some(detail); + } + Err(_) => { + let detail = format!( + "Timed out downloading hash {hash} from peer {peer_id} after {}ms", + timeout_budget.as_millis() + ); + log_info!(TAG, "{}", detail); + last_error = Some(detail); + } + } + } + + if attempt < MEDIA_DOWNLOAD_MAX_ATTEMPTS { + let backoff = MEDIA_DOWNLOAD_INITIAL_BACKOFF * attempt; + if let Some(remaining) = MEDIA_DOWNLOAD_OVERALL_TIMEOUT.checked_sub(started.elapsed()) { + tokio::time::sleep(std::cmp::min(backoff, remaining)).await; + } + } + } + + let detail = last_error.unwrap_or_else(|| "no peer attempts completed".to_string()); + Err(anyhow::anyhow!( + "Unable to download hash {} from any peer after {} media attempts; last error: {}", + hash, + MEDIA_DOWNLOAD_MAX_ATTEMPTS, + detail + ) + .into()) +} pub fn scope() -> Scope { web::scope("/media") @@ -59,7 +161,7 @@ async fn list_files(path: web::Path) -> AppResult let hash = repo.get_hash_from_dht().await?; if !group.has_hash(&hash).await? { - group.download_hash_from_peers(&hash).await?; + download_hash_for_media(group.as_ref(), &hash).await?; } // List files and check if they are downloaded @@ -71,7 +173,7 @@ async fn list_files(path: web::Path) -> AppResult Ok(hash) => hash, Err(_) => continue, // Handle the error or skip the file if there's an issue }; - let is_downloaded = repo.has_hash(&file_hash).await.unwrap_or(false); // Check if the file is downloaded + let is_downloaded = group.has_hash(&file_hash).await.unwrap_or(false); // Check if the file is local files_with_status.push(json!({ "name": file_name, "hash": file_hash, @@ -100,15 +202,15 @@ async fn download_file(path: web::Path) -> AppResult