Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ async fn tunnel_loop(
let inflight_cap = INFLIGHT_ACTIVE;
let mut max_inflight = INFLIGHT_OPTIMIST.min(inflight_cap);
let mut consecutive_empty = 0u32;
let mut idle_tier = 0u32;
let mut consecutive_data: u32 = 0;
let mut is_elevated = false;
let mut total_download_bytes: u64 = 0;
Expand Down Expand Up @@ -1615,14 +1616,16 @@ async fn tunnel_loop(
if inflight.is_empty() && !eof_seen {
let all_legacy = mux.all_servers_legacy();

// If all servers are legacy and we've had many consecutive
// empties, wait for client data before sending.
if all_legacy && consecutive_empty > 3 && !client_closed {
// Legacy-only fleets: after sustained idle, stop polling and
// wait for client data. Mixed fleets keep polling so
// round-robin can land on a long-poll-capable peer.
if all_legacy && (idle_tier > 4 || consecutive_empty > 3) && !client_closed {
read_buf.reserve(65536);
match reader.read_buf(&mut read_buf).await {
Ok(0) => break,
Ok(n) => {
consecutive_empty = 0;
idle_tier = 0;
let data = extract_bytes(&mut read_buf, n);
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
inflight.push(wrap_reply(meta, reply_rx));
Expand All @@ -1632,17 +1635,13 @@ async fn tunnel_loop(
}
}

// Escalating backoff: avoid flooding empty polls on idle
// sessions. Mirrors the pre-pipelining cadence.
let keepalive_delay = match consecutive_empty {
let keepalive_delay = match idle_tier {
0 => Duration::from_millis(20),
1 => Duration::from_millis(80),
2 => Duration::from_millis(200),
3 => Duration::from_millis(500),
_ => Duration::from_secs(2),
2 => Duration::from_secs(4),
_ => Duration::from_secs(10),
};
if consecutive_empty > 0 {
// Wait for either the backoff timer or client data.
if idle_tier > 0 {
if !client_closed {
read_buf.reserve(65536);
tokio::select! {
Expand All @@ -1652,6 +1651,7 @@ async fn tunnel_loop(
Ok(0) => break,
Ok(n) => {
consecutive_empty = 0;
idle_tier = 0;
let data = extract_bytes(&mut read_buf, n);
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
inflight.push(wrap_reply(meta, reply_rx));
Expand Down Expand Up @@ -1745,6 +1745,7 @@ async fn tunnel_loop(
next_write_seq += 1;
if got_data {
consecutive_empty = 0;
idle_tier = 0;
consecutive_data = consecutive_data.saturating_add(1);
let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
total_download_bytes += bytes;
Expand All @@ -1755,6 +1756,7 @@ async fn tunnel_loop(
// empty result is expected.
} else {
consecutive_empty = consecutive_empty.saturating_add(1);
idle_tier = idle_tier.saturating_add(1);
consecutive_data = 0;
}
if is_eof {
Expand All @@ -1769,6 +1771,7 @@ async fn tunnel_loop(
match write_tunnel_response(&mut writer, &buffered_resp).await? {
WriteOutcome::Wrote => {
consecutive_empty = 0;
idle_tier = 0;
consecutive_data = consecutive_data.saturating_add(1);
let bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
total_download_bytes += bytes;
Expand All @@ -1778,6 +1781,7 @@ async fn tunnel_loop(
// Stale empty poll — don't break data streak.
} else {
consecutive_empty = consecutive_empty.saturating_add(1);
idle_tier = idle_tier.saturating_add(1);
consecutive_data = 0;
}
}
Expand Down Expand Up @@ -1881,6 +1885,7 @@ async fn tunnel_loop(
meta.seq,
);
consecutive_empty = consecutive_empty.saturating_add(1);
idle_tier = idle_tier.saturating_add(1);
}
ReplyOutcome::Dropped => {
break;
Expand Down
Loading