Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rand = "0.8"
h2 = "0.4"
http = "1"
flate2 = "1"
zstd = "0.13"
directories = "5"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
# 64-bit atomics on 32-bit MIPS/ARMv5 targets. Rust's std AtomicU64 is
Expand Down
40 changes: 34 additions & 6 deletions assets/apps_script/CodeFull.gs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,17 @@ function _doTunnel(req) {
// On a 5-DNS-query batch, this collapses 5 serial cache.get round trips
// into one cache.getAll round trip.
function _doTunnelBatch(req) {
// Compressed batch: forward opaquely, skip edge-DNS inspection.
if (req.zops) {
return _doTunnelBatchForwardCompressed(req.zops);
}

var ops = (req && req.ops) || [];
var zc = req && req.zc;

// Feature off: byte-identical to the pre-feature behavior.
if (!ENABLE_EDGE_DNS_CACHE) {
return _doTunnelBatchForward(ops);
return _doTunnelBatchForward(ops, zc);
}

var results = new Array(ops.length); // sparse: filled by edge-DNS hits
Expand Down Expand Up @@ -272,10 +278,11 @@ function _doTunnelBatch(req) {

// Nothing was served locally — forward verbatim, no splice needed.
if (forwardOps.length === ops.length) {
return _doTunnelBatchForward(ops);
return _doTunnelBatchForward(ops, zc);
}

// Partial: forward the un-served ops and splice results back in place.
// Don't pass zc here — Apps Script needs to parse r[] for splicing.
var resp = _doTunnelBatchFetch(forwardOps);
if (resp.error) return _json({ e: resp.error });
if (resp.r.length !== forwardOps.length) {
Expand All @@ -287,11 +294,30 @@ function _doTunnelBatch(req) {
}

// Verbatim forward: no splice, response passed through unchanged.
function _doTunnelBatchForward(ops) {
function _doTunnelBatchForward(ops, zc) {
var body = { k: TUNNEL_AUTH_KEY, ops: ops };
if (zc) body.zc = zc;
var resp = UrlFetchApp.fetch(TUNNEL_SERVER_URL + "/tunnel/batch", {
method: "post",
contentType: "application/json",
payload: JSON.stringify(body),
muteHttpExceptions: true,
followRedirects: true,
});
if (resp.getResponseCode() !== 200) {
return _json({ e: "tunnel batch HTTP " + resp.getResponseCode() });
}
return ContentService.createTextOutput(resp.getContentText())
.setMimeType(ContentService.MimeType.JSON);
}

// Compressed forward: zops is an opaque blob, passed to tunnel-node as-is.
// Response is also opaque (may contain zr instead of r).
function _doTunnelBatchForwardCompressed(zops) {
var resp = UrlFetchApp.fetch(TUNNEL_SERVER_URL + "/tunnel/batch", {
method: "post",
contentType: "application/json",
payload: JSON.stringify({ k: TUNNEL_AUTH_KEY, ops: ops }),
payload: JSON.stringify({ k: TUNNEL_AUTH_KEY, zops: zops }),
muteHttpExceptions: true,
followRedirects: true,
});
Expand All @@ -304,11 +330,13 @@ function _doTunnelBatchForward(ops) {

// Forward + parse for the splice path. Returns { r:[...] } on success or
// { error: "..." } on any failure.
function _doTunnelBatchFetch(ops) {
function _doTunnelBatchFetch(ops, zc) {
var body = { k: TUNNEL_AUTH_KEY, ops: ops };
if (zc) body.zc = zc;
var resp = UrlFetchApp.fetch(TUNNEL_SERVER_URL + "/tunnel/batch", {
method: "post",
contentType: "application/json",
payload: JSON.stringify({ k: TUNNEL_AUTH_KEY, ops: ops }),
payload: JSON.stringify(body),
muteHttpExceptions: true,
followRedirects: true,
});
Expand Down
43 changes: 40 additions & 3 deletions src/domain_fronter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ pub struct DomainFronter {
/// payloads. Mirrors `Config::disable_padding` (#391). Default false
/// (padding active = stronger DPI defense at +25% bandwidth cost).
disable_padding: bool,
zstd_enabled: Arc<AtomicBool>,
/// Per-instance auto-blacklist tuning. Mirrors `Config::auto_blacklist_*`
/// (#391, #444). Cached here so the hot path in `record_timeout_strike`
/// doesn't have to reach back through the Config (which we don't keep
Expand Down Expand Up @@ -543,6 +544,10 @@ pub struct BatchTunnelResponse {
pub r: Vec<TunnelResponse>,
#[serde(default)]
pub e: Option<String>,
#[serde(default)]
pub zr: Option<String>,
#[serde(default)]
pub zc: Option<u8>,
}

impl DomainFronter {
Expand Down Expand Up @@ -626,6 +631,7 @@ impl DomainFronter {
today_bytes: AtomicU64::new(0),
today_key: std::sync::Mutex::new(current_pt_day_key()),
disable_padding: config.disable_padding,
zstd_enabled: Arc::new(AtomicBool::new(false)),
auto_blacklist_strikes: config.auto_blacklist_strikes.max(1),
auto_blacklist_window: Duration::from_secs(
config.auto_blacklist_window_secs.clamp(1, 3600),
Expand Down Expand Up @@ -3105,7 +3111,20 @@ impl DomainFronter {
let mut map = serde_json::Map::new();
map.insert("k".into(), Value::String(self.auth_key.clone()));
map.insert("t".into(), Value::String("batch".into()));
map.insert("ops".into(), serde_json::to_value(ops)?);
if self.zstd_enabled.load(Ordering::Relaxed) {
let ops_json = serde_json::to_vec(ops)?;
match zstd::encode_all(ops_json.as_slice(), 3) {
Ok(compressed) => {
map.insert("zops".into(), Value::String(B64.encode(&compressed)));
}
Err(_) => {
map.insert("ops".into(), serde_json::to_value(ops)?);
}
}
} else {
map.insert("ops".into(), serde_json::to_value(ops)?);
}
map.insert("zc".into(), Value::Number(1.into()));
if !self.disable_padding {
add_random_pad(&mut map);
}
Expand Down Expand Up @@ -3238,8 +3257,26 @@ impl DomainFronter {
"batch response body (trace only): {}",
&json_str[..json_str.len().min(500)]
);
match serde_json::from_str(json_str) {
Ok(v) => Ok(v),
match serde_json::from_str::<BatchTunnelResponse>(json_str) {
Ok(mut resp) => {
if let Some(zr_b64) = resp.zr.take() {
match B64.decode(&zr_b64) {
Ok(compressed) => match zstd::decode_all(compressed.as_slice()) {
Ok(decompressed) => match serde_json::from_slice(&decompressed) {
Ok(r) => { resp.r = r; }
Err(e) => tracing::error!("zr json parse failed: {}", e),
},
Err(e) => tracing::error!("zr zstd decompress failed: {}", e),
},
Err(e) => tracing::error!("zr base64 decode failed: {}", e),
}
}
if resp.zc.is_some() && !self.zstd_enabled.load(Ordering::Relaxed) {
tracing::info!("tunnel-node supports zstd, enabling compressed batches");
self.zstd_enabled.store(true, Ordering::Relaxed);
}
Ok(resp)
}
Err(e) => {
// Same redaction policy on the error path. Length and
// the serde error message are enough to locate the
Expand Down
2 changes: 1 addition & 1 deletion src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const INFLIGHT_OPTIMIST: usize = 2;

/// Maximum pipeline depth when data is actively flowing. Ramps up on
/// data-bearing replies, drops back to IDLE after consecutive empties.
const INFLIGHT_ACTIVE: usize = 4;
const INFLIGHT_ACTIVE: usize = 6;

/// How many consecutive empty replies before dropping from active to idle depth.
const INFLIGHT_COOLDOWN: u32 = 3;
Expand Down
91 changes: 89 additions & 2 deletions tunnel-node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading