diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index f539aa24..be671b4e 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -1431,6 +1431,7 @@ async fn tunnel_loop( let inflight_cap = INFLIGHT_ACTIVE; let mut max_inflight = INFLIGHT_OPTIMIST.min(inflight_cap); let mut consecutive_empty = 0u32; + let mut idle_tier = 0u32; let mut consecutive_data: u32 = 0; let mut is_elevated = false; let mut total_download_bytes: u64 = 0; @@ -1615,14 +1616,19 @@ async fn tunnel_loop( if inflight.is_empty() && !eof_seen { let all_legacy = mux.all_servers_legacy(); - // If all servers are legacy and we've had many consecutive - // empties, wait for client data before sending. - if all_legacy && consecutive_empty > 3 && !client_closed { + // If every deployment is legacy and the session has gone + // idle, stop polling and just wait for client data. Apps + // maintain their own heartbeats (MQTT PINGREQ, FCM keepalive, + // etc.) which trigger client writes that send data ops — those + // act as natural polls. Mixed fleets must keep polling so + // round-robin can still land on a long-poll-capable peer. + if all_legacy && (idle_tier > 1 || consecutive_empty > 3) && !client_closed { read_buf.reserve(65536); match reader.read_buf(&mut read_buf).await { Ok(0) => break, Ok(n) => { consecutive_empty = 0; + idle_tier = 0; let data = extract_bytes(&mut read_buf, n); let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); inflight.push(wrap_reply(meta, reply_rx)); @@ -1632,17 +1638,14 @@ async fn tunnel_loop( } } - // Escalating backoff: avoid flooding empty polls on idle - // sessions. Mirrors the pre-pipelining cadence. - let keepalive_delay = match consecutive_empty { + // Early backoff: first few empties still poll with delay. + let keepalive_delay = match idle_tier { 0 => Duration::from_millis(20), 1 => Duration::from_millis(80), - 2 => Duration::from_millis(200), - 3 => Duration::from_millis(500), - _ => Duration::from_secs(2), + 2 => Duration::from_secs(4), + _ => Duration::from_secs(10), }; - if consecutive_empty > 0 { - // Wait for either the backoff timer or client data. + if idle_tier > 0 { if !client_closed { read_buf.reserve(65536); tokio::select! { @@ -1652,6 +1655,7 @@ async fn tunnel_loop( Ok(0) => break, Ok(n) => { consecutive_empty = 0; + idle_tier = 0; let data = extract_bytes(&mut read_buf, n); let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); inflight.push(wrap_reply(meta, reply_rx)); @@ -1744,9 +1748,15 @@ async fn tunnel_loop( }; next_write_seq += 1; if got_data { - consecutive_empty = 0; - consecutive_data = consecutive_data.saturating_add(1); let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0); + if bytes >= 1024 { + consecutive_empty = 0; + idle_tier = idle_tier / 2; + } else { + // Small response (heartbeat ACK) — don't reset idle state. + idle_tier = idle_tier.saturating_sub(1); + } + consecutive_data = consecutive_data.saturating_add(1); total_download_bytes += bytes; } else if meta.was_empty_poll && consecutive_data > 0 { // Stale empty-poll reply during an active data @@ -1755,6 +1765,7 @@ async fn tunnel_loop( // empty result is expected. } else { consecutive_empty = consecutive_empty.saturating_add(1); + idle_tier = idle_tier.saturating_add(1); consecutive_data = 0; } if is_eof { @@ -1768,7 +1779,13 @@ async fn tunnel_loop( let buf_eof = buffered_resp.eof.unwrap_or(false); match write_tunnel_response(&mut writer, &buffered_resp).await? { WriteOutcome::Wrote => { - consecutive_empty = 0; + let buf_bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0); + if buf_bytes >= 1024 { + consecutive_empty = 0; + idle_tier = idle_tier / 2; + } else { + idle_tier = idle_tier.saturating_sub(1); + } consecutive_data = consecutive_data.saturating_add(1); let bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0); total_download_bytes += bytes; @@ -1778,6 +1795,7 @@ async fn tunnel_loop( // Stale empty poll — don't break data streak. } else { consecutive_empty = consecutive_empty.saturating_add(1); + idle_tier = idle_tier.saturating_add(1); consecutive_data = 0; } } @@ -1881,6 +1899,7 @@ async fn tunnel_loop( meta.seq, ); consecutive_empty = consecutive_empty.saturating_add(1); + idle_tier = idle_tier.saturating_add(1); } ReplyOutcome::Dropped => { break;