Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ async fn tunnel_loop(
let inflight_cap = INFLIGHT_ACTIVE;
let mut max_inflight = INFLIGHT_OPTIMIST.min(inflight_cap);
let mut consecutive_empty = 0u32;
let mut idle_tier = 0u32;
let mut consecutive_data: u32 = 0;
let mut is_elevated = false;
let mut total_download_bytes: u64 = 0;
Expand Down Expand Up @@ -1615,14 +1616,19 @@ async fn tunnel_loop(
if inflight.is_empty() && !eof_seen {
let all_legacy = mux.all_servers_legacy();

// If all servers are legacy and we've had many consecutive
// empties, wait for client data before sending.
if all_legacy && consecutive_empty > 3 && !client_closed {
// If every deployment is legacy and the session has gone
// idle, stop polling and just wait for client data. Apps
// maintain their own heartbeats (MQTT PINGREQ, FCM keepalive,
// etc.) which trigger client writes that send data ops — those
// act as natural polls. Mixed fleets must keep polling so
// round-robin can still land on a long-poll-capable peer.
if all_legacy && (idle_tier > 1 || consecutive_empty > 3) && !client_closed {
read_buf.reserve(65536);
match reader.read_buf(&mut read_buf).await {
Ok(0) => break,
Ok(n) => {
consecutive_empty = 0;
idle_tier = 0;
let data = extract_bytes(&mut read_buf, n);
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
inflight.push(wrap_reply(meta, reply_rx));
Expand All @@ -1632,17 +1638,14 @@ async fn tunnel_loop(
}
}

// Escalating backoff: avoid flooding empty polls on idle
// sessions. Mirrors the pre-pipelining cadence.
let keepalive_delay = match consecutive_empty {
// Early backoff: first few empties still poll with delay.
let keepalive_delay = match idle_tier {
0 => Duration::from_millis(20),
1 => Duration::from_millis(80),
2 => Duration::from_millis(200),
3 => Duration::from_millis(500),
_ => Duration::from_secs(2),
2 => Duration::from_secs(4),
_ => Duration::from_secs(10),
};
if consecutive_empty > 0 {
// Wait for either the backoff timer or client data.
if idle_tier > 0 {
if !client_closed {
read_buf.reserve(65536);
tokio::select! {
Expand All @@ -1652,6 +1655,7 @@ async fn tunnel_loop(
Ok(0) => break,
Ok(n) => {
consecutive_empty = 0;
idle_tier = 0;
let data = extract_bytes(&mut read_buf, n);
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
inflight.push(wrap_reply(meta, reply_rx));
Expand Down Expand Up @@ -1744,9 +1748,15 @@ async fn tunnel_loop(
};
next_write_seq += 1;
if got_data {
consecutive_empty = 0;
consecutive_data = consecutive_data.saturating_add(1);
let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
if bytes >= 1024 {
consecutive_empty = 0;
idle_tier = idle_tier / 2;
} else {
// Small response (heartbeat ACK) — don't reset idle state.
idle_tier = idle_tier.saturating_sub(1);
}
consecutive_data = consecutive_data.saturating_add(1);
total_download_bytes += bytes;
} else if meta.was_empty_poll && consecutive_data > 0 {
// Stale empty-poll reply during an active data
Expand All @@ -1755,6 +1765,7 @@ async fn tunnel_loop(
// empty result is expected.
} else {
consecutive_empty = consecutive_empty.saturating_add(1);
idle_tier = idle_tier.saturating_add(1);
consecutive_data = 0;
}
if is_eof {
Expand All @@ -1768,7 +1779,13 @@ async fn tunnel_loop(
let buf_eof = buffered_resp.eof.unwrap_or(false);
match write_tunnel_response(&mut writer, &buffered_resp).await? {
WriteOutcome::Wrote => {
consecutive_empty = 0;
let buf_bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
if buf_bytes >= 1024 {
consecutive_empty = 0;
idle_tier = idle_tier / 2;
} else {
idle_tier = idle_tier.saturating_sub(1);
}
consecutive_data = consecutive_data.saturating_add(1);
let bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
total_download_bytes += bytes;
Expand All @@ -1778,6 +1795,7 @@ async fn tunnel_loop(
// Stale empty poll — don't break data streak.
} else {
consecutive_empty = consecutive_empty.saturating_add(1);
idle_tier = idle_tier.saturating_add(1);
consecutive_data = 0;
}
}
Expand Down Expand Up @@ -1881,6 +1899,7 @@ async fn tunnel_loop(
meta.seq,
);
consecutive_empty = consecutive_empty.saturating_add(1);
idle_tier = idle_tier.saturating_add(1);
}
ReplyOutcome::Dropped => {
break;
Expand Down
Loading