diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index f539aa24..074733ff 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -1431,6 +1431,7 @@ async fn tunnel_loop( let inflight_cap = INFLIGHT_ACTIVE; let mut max_inflight = INFLIGHT_OPTIMIST.min(inflight_cap); let mut consecutive_empty = 0u32; + let mut idle_tier = 0u32; let mut consecutive_data: u32 = 0; let mut is_elevated = false; let mut total_download_bytes: u64 = 0; @@ -1615,14 +1616,16 @@ async fn tunnel_loop( if inflight.is_empty() && !eof_seen { let all_legacy = mux.all_servers_legacy(); - // If all servers are legacy and we've had many consecutive - // empties, wait for client data before sending. - if all_legacy && consecutive_empty > 3 && !client_closed { + // Legacy-only fleets: after sustained idle, stop polling and + // wait for client data. Mixed fleets keep polling so + // round-robin can land on a long-poll-capable peer. + if all_legacy && (idle_tier > 4 || consecutive_empty > 3) && !client_closed { read_buf.reserve(65536); match reader.read_buf(&mut read_buf).await { Ok(0) => break, Ok(n) => { consecutive_empty = 0; + idle_tier = 0; let data = extract_bytes(&mut read_buf, n); let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); inflight.push(wrap_reply(meta, reply_rx)); @@ -1632,17 +1635,13 @@ async fn tunnel_loop( } } - // Escalating backoff: avoid flooding empty polls on idle - // sessions. Mirrors the pre-pipelining cadence. - let keepalive_delay = match consecutive_empty { + let keepalive_delay = match idle_tier { 0 => Duration::from_millis(20), 1 => Duration::from_millis(80), - 2 => Duration::from_millis(200), - 3 => Duration::from_millis(500), - _ => Duration::from_secs(2), + 2 => Duration::from_secs(4), + _ => Duration::from_secs(10), }; - if consecutive_empty > 0 { - // Wait for either the backoff timer or client data. + if idle_tier > 0 { if !client_closed { read_buf.reserve(65536); tokio::select! { @@ -1652,6 +1651,7 @@ async fn tunnel_loop( Ok(0) => break, Ok(n) => { consecutive_empty = 0; + idle_tier = 0; let data = extract_bytes(&mut read_buf, n); let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); inflight.push(wrap_reply(meta, reply_rx)); @@ -1745,6 +1745,7 @@ async fn tunnel_loop( next_write_seq += 1; if got_data { consecutive_empty = 0; + idle_tier = 0; consecutive_data = consecutive_data.saturating_add(1); let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0); total_download_bytes += bytes; @@ -1755,6 +1756,7 @@ async fn tunnel_loop( // empty result is expected. } else { consecutive_empty = consecutive_empty.saturating_add(1); + idle_tier = idle_tier.saturating_add(1); consecutive_data = 0; } if is_eof { @@ -1769,6 +1771,7 @@ async fn tunnel_loop( match write_tunnel_response(&mut writer, &buffered_resp).await? { WriteOutcome::Wrote => { consecutive_empty = 0; + idle_tier = 0; consecutive_data = consecutive_data.saturating_add(1); let bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0); total_download_bytes += bytes; @@ -1778,6 +1781,7 @@ async fn tunnel_loop( // Stale empty poll — don't break data streak. } else { consecutive_empty = consecutive_empty.saturating_add(1); + idle_tier = idle_tier.saturating_add(1); consecutive_data = 0; } } @@ -1881,6 +1885,7 @@ async fn tunnel_loop( meta.seq, ); consecutive_empty = consecutive_empty.saturating_add(1); + idle_tier = idle_tier.saturating_add(1); } ReplyOutcome::Dropped => { break;