Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 11 additions & 41 deletions src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ async fn refresh_group(group_id: web::Path<String>) -> AppResult<impl Responder>
"refreshed_files": json!(Vec::<String>::new()), // Initialize empty
"all_files": json!(Vec::<String>::new()) // Initialize empty
});
let mut refreshed_files_vec = Vec::new();
let mut all_files_vec: Vec<String> = Vec::new();

if repo.can_write() {
Expand Down Expand Up @@ -292,46 +291,17 @@ async fn refresh_group(group_id: web::Path<String>) -> AppResult<impl Responder>
};
repo_info["all_files"] = json!(all_files_vec.clone());

// For each file, check if it needs to be refreshed
for file_name in &all_files_vec {
match repo.get_file_hash(file_name).await {
Ok(file_hash) => {
if !group.has_hash(&file_hash).await? {
log_debug!(
TAG,
"File {} hash {} not found locally. Downloading...",
file_name,
file_hash
);
match group.download_hash_from_peers(&file_hash).await {
Ok(_) => {
log_debug!(
TAG,
"Successfully downloaded file hash {} for {}",
file_hash,
file_name
);
refreshed_files_vec.push(file_name.clone());
}
Err(e) => {
log_debug!(
TAG,
"Error downloading file {} hash {}: {}",
file_name,
file_hash,
e
);
// Optionally add to a list of files that failed to download
}
}
}
}
Err(e) => {
log_debug!(TAG, "Error getting hash for file {}: {}", file_name, e);
}
}
}
repo_info["refreshed_files"] = json!(refreshed_files_vec);
// Keep refresh metadata-only. Downloading every missing file body here
// can block later file discovery behind one slow or failing transfer.
// `refreshed_files` is retained for API compatibility; file bodies are
// now refreshed only by the explicit media endpoints.
log_debug!(
TAG,
"Repo {} refresh discovered {} files; body downloads are deferred to media endpoint.",
repo.id(),
all_files_vec.len()
);
repo_info["refreshed_files"] = json!(Vec::<String>::new());
}
Ok(Err(e)) => {
log_debug!(TAG, "Error getting repo hash for {}: {}", repo.id(), e);
Expand Down
110 changes: 90 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1355,10 +1355,8 @@ mod tests {
.as_array()
.expect("refreshed_files should be an array");
assert!(
refreshed_files.is_empty()
|| (refreshed_files.len() == 1
&& refreshed_files[0].as_str() == Some(file_name)),
"First refresh should report either no-op or one refreshed expected file, got {refreshed_files:?}"
refreshed_files.is_empty(),
"Refresh should discover metadata without downloading media bodies, got {refreshed_files:?}"
);

let all_files = repo_data["all_files"]
Expand All @@ -1371,35 +1369,37 @@ mod tests {
"all_files should contain the uploaded file"
);

// Verify file is accessible after refresh
let get_file_req = test::TestRequest::get()
let list_files_req = test::TestRequest::get()
.uri(&format!(
"/api/groups/{}/repos/{}/media/{}",
"/api/groups/{}/repos/{}/media",
group.id(),
refreshed_repo_id,
file_name
refreshed_repo_id
))
.to_request();
let get_file_resp = test::call_service(&app, get_file_req).await;
let list_files_resp = test::call_service(&app, list_files_req).await;
assert!(
get_file_resp.status().is_success(),
"File should be accessible after refresh"
list_files_resp.status().is_success(),
"File list should be accessible after metadata-only refresh"
);
let got_content = test::read_body(get_file_resp).await;
assert_eq!(
got_content.to_vec(),
file_content.to_vec(),
"File content should match after refresh"
let list_files_data: FilesResponse = test::read_body_json(list_files_resp).await;
let listed_file = list_files_data
.files
.iter()
.find(|file| file.name == file_name)
.expect("File list should include metadata for the uploaded file");
assert!(
!listed_file.is_downloaded,
"Refresh should not mark the file body as downloaded before explicit media GET"
);

// Test second refresh - should be no-op since all files are present
// Test second refresh before downloading the file body. It should remain metadata-only.
let refresh_req2 = test::TestRequest::post()
.uri(&format!("/api/groups/{}/refresh", group.id()))
.to_request();
let refresh_resp2 = test::call_service(&app, refresh_req2).await;
assert!(
refresh_resp2.status().is_success(),
"Second refresh should succeed"
"Second refresh should succeed before explicit media GET"
);

let refresh_data2: serde_json::Value = test::read_body_json(refresh_resp2).await;
Expand Down Expand Up @@ -1427,7 +1427,77 @@ mod tests {
.expect("refreshed_files should be an array");
assert!(
refreshed_files2.is_empty(),
"No files should be refreshed on second call since all are present"
"Repeated refresh should still not download media bodies, got {refreshed_files2:?}"
);

let list_files_after_second_refresh_req = test::TestRequest::get()
.uri(&format!(
"/api/groups/{}/repos/{}/media",
group.id(),
refreshed_repo_id
))
.to_request();
let list_files_after_second_refresh_resp =
test::call_service(&app, list_files_after_second_refresh_req).await;
assert!(
list_files_after_second_refresh_resp.status().is_success(),
"File list should still be accessible after repeated metadata-only refresh"
);
let list_files_after_second_refresh_data: FilesResponse =
test::read_body_json(list_files_after_second_refresh_resp).await;
let listed_file_after_second_refresh = list_files_after_second_refresh_data
.files
.iter()
.find(|file| file.name == file_name)
.expect("File list should still include metadata for the uploaded file");
assert!(
!listed_file_after_second_refresh.is_downloaded,
"Repeated refresh should not mark the file body as downloaded before explicit media GET"
);

// Verify file is accessible after refresh
let get_file_req = test::TestRequest::get()
.uri(&format!(
"/api/groups/{}/repos/{}/media/{}",
group.id(),
refreshed_repo_id,
file_name
))
.to_request();
let get_file_resp = test::call_service(&app, get_file_req).await;
assert!(
get_file_resp.status().is_success(),
"File should be accessible after refresh"
);
let got_content = test::read_body(get_file_resp).await;
assert_eq!(
got_content.to_vec(),
file_content.to_vec(),
"File content should match after refresh"
);

let list_files_after_get_req = test::TestRequest::get()
.uri(&format!(
"/api/groups/{}/repos/{}/media",
group.id(),
refreshed_repo_id
))
.to_request();
let list_files_after_get_resp = test::call_service(&app, list_files_after_get_req).await;
assert!(
list_files_after_get_resp.status().is_success(),
"File list should still be accessible after explicit media GET"
);
let list_files_after_get_data: FilesResponse =
test::read_body_json(list_files_after_get_resp).await;
let listed_file_after_get = list_files_after_get_data
.files
.iter()
.find(|file| file.name == file_name)
.expect("File list should still include the uploaded file");
assert!(
listed_file_after_get.is_downloaded,
"Explicit media GET should mark the file body as downloaded locally"
);

// Clean up both backends - secondary first, then main
Expand Down
112 changes: 107 additions & 5 deletions src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,108 @@ use futures::Stream;
use futures::StreamExt;
use serde_json::json;
use std::io;
use std::time::{Duration, Instant};

const MEDIA_DOWNLOAD_MAX_ATTEMPTS: u32 = 3;
const MEDIA_DOWNLOAD_PER_PEER_TIMEOUT: Duration = Duration::from_secs(18);
const MEDIA_DOWNLOAD_OVERALL_TIMEOUT: Duration = Duration::from_secs(55);
const MEDIA_DOWNLOAD_INITIAL_BACKOFF: Duration = Duration::from_millis(500);

// TODO: fold this media-specific overall budget into save-dweb-backend's
// Group::download_hash_from_peers once the backend crate exposes that knob.
async fn download_hash_for_media(
group: &save_dweb_backend::group::Group,
hash: &iroh_blobs::Hash,
) -> AppResult<()> {
let mut last_error = None;
let started = Instant::now();

for attempt in 1..=MEDIA_DOWNLOAD_MAX_ATTEMPTS {
let mut peer_repos = group.list_peer_repos().await;
if peer_repos.is_empty() {
return Err(anyhow::anyhow!("Cannot download hash. No other peers found").into());
}

let peer_count = peer_repos.len();
peer_repos.rotate_left((attempt as usize - 1) % peer_count);

for peer_repo in peer_repos {
let peer_id = peer_repo.id().to_string();

let Some(remaining) = MEDIA_DOWNLOAD_OVERALL_TIMEOUT.checked_sub(started.elapsed())
else {
let detail = format!(
"Timed out downloading hash {hash} after overall {}s media budget",
MEDIA_DOWNLOAD_OVERALL_TIMEOUT.as_secs()
);
log_info!(TAG, "{}", detail);
let last_detail = match last_error {
Some(error) => format!("{detail}; last peer error: {error}"),
None => detail,
};
return Err(anyhow::anyhow!(
"Unable to download hash {} from any peer after {} media attempts; last error: {}",
hash,
attempt.saturating_sub(1),
last_detail
)
.into());
};

log_info!(
TAG,
"Media download attempt {}/{} for hash {} from peer {}",
attempt,
MEDIA_DOWNLOAD_MAX_ATTEMPTS,
hash,
peer_id
);

let timeout_budget = std::cmp::min(MEDIA_DOWNLOAD_PER_PEER_TIMEOUT, remaining);
let download_result = tokio::time::timeout(timeout_budget, async {
let route_id_blob = peer_repo.get_route_id_blob().await?;
group
.iroh_blobs
.download_file_from(route_id_blob, hash)
.await
})
.await;

match download_result {
Ok(Ok(())) => return Ok(()),
Ok(Err(e)) => {
let detail = format!("Unable to download hash {hash} from peer {peer_id}: {e}");
log_info!(TAG, "{}", detail);
last_error = Some(detail);
}
Err(_) => {
let detail = format!(
"Timed out downloading hash {hash} from peer {peer_id} after {}ms",
timeout_budget.as_millis()
);
log_info!(TAG, "{}", detail);
last_error = Some(detail);
}
}
}

if attempt < MEDIA_DOWNLOAD_MAX_ATTEMPTS {
let backoff = MEDIA_DOWNLOAD_INITIAL_BACKOFF * attempt;
if let Some(remaining) = MEDIA_DOWNLOAD_OVERALL_TIMEOUT.checked_sub(started.elapsed()) {
tokio::time::sleep(std::cmp::min(backoff, remaining)).await;
}
}
}

let detail = last_error.unwrap_or_else(|| "no peer attempts completed".to_string());
Err(anyhow::anyhow!(
"Unable to download hash {} from any peer after {} media attempts; last error: {}",
hash,
MEDIA_DOWNLOAD_MAX_ATTEMPTS,
detail
)
.into())
}

pub fn scope() -> Scope {
web::scope("/media")
Expand Down Expand Up @@ -59,7 +161,7 @@ async fn list_files(path: web::Path<GroupRepoPath>) -> AppResult<impl Responder>

let hash = repo.get_hash_from_dht().await?;
if !group.has_hash(&hash).await? {
group.download_hash_from_peers(&hash).await?;
download_hash_for_media(group.as_ref(), &hash).await?;
}

// List files and check if they are downloaded
Expand All @@ -71,7 +173,7 @@ async fn list_files(path: web::Path<GroupRepoPath>) -> AppResult<impl Responder>
Ok(hash) => hash,
Err(_) => continue, // Handle the error or skip the file if there's an issue
};
let is_downloaded = repo.has_hash(&file_hash).await.unwrap_or(false); // Check if the file is downloaded
let is_downloaded = group.has_hash(&file_hash).await.unwrap_or(false); // Check if the file is local
files_with_status.push(json!({
"name": file_name,
"hash": file_hash,
Expand Down Expand Up @@ -100,15 +202,15 @@ async fn download_file(path: web::Path<GroupRepoMediaPath>) -> AppResult<impl Re
if !repo.can_write() {
let collection_hash = repo.get_hash_from_dht().await?;
if !group.has_hash(&collection_hash).await? {
group.download_hash_from_peers(&collection_hash).await?;
download_hash_for_media(group.as_ref(), &collection_hash).await?;
}
}

// Get the file hash
let file_hash = repo.get_file_hash(file_name).await?;

if !repo.can_write() && !group.has_hash(&file_hash).await? {
group.download_hash_from_peers(&file_hash).await?;
if !group.has_hash(&file_hash).await? {
download_hash_for_media(group.as_ref(), &file_hash).await?;
}
// Trigger file download from peers using the hash
let file_data = repo.get_file_stream(file_name).await?;
Expand Down
Loading