diff --git a/apisix/utils/batch-processor-manager.lua b/apisix/utils/batch-processor-manager.lua index 5ff594c0568b..3eaac0510f3e 100644 --- a/apisix/utils/batch-processor-manager.lua +++ b/apisix/utils/batch-processor-manager.lua @@ -142,6 +142,7 @@ function _M:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entri retry_delay = conf.retry_delay, buffer_duration = conf.buffer_duration, inactive_timeout = conf.inactive_timeout, + max_buffer_bytes = conf.max_buffer_bytes, route_id = ctx.var.route_id, server_addr = ctx.var.server_addr, } diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua index 4ede38674c46..e459b799b548 100644 --- a/apisix/utils/batch-processor.lua +++ b/apisix/utils/batch-processor.lua @@ -29,6 +29,7 @@ local batch_processor_mt = { local execute_func local create_buffer_timer local batch_metrics +local batch_dropped_metrics local prometheus if ngx.config.subsystem == "http" then prometheus = require("apisix.plugins.prometheus.exporter") @@ -44,6 +45,15 @@ local schema = { buffer_duration = {type = "integer", minimum = 1, default= 60}, inactive_timeout = {type = "integer", minimum = 1, default= 5}, batch_max_size = {type = "integer", minimum = 1, default= 1000}, + -- Upper bound, in bytes, on the data buffered by this processor + -- (entries waiting in the buffer plus those still in-flight to the + -- sink). When exceeded, new entries are dropped with a warning instead + -- of being buffered. 0 (default) disables the check, preserving the + -- previous count-only behaviour. This caps memory when entries are + -- large (e.g. logging big response bodies) and the sink cannot keep up, + -- which otherwise lets the in-flight backlog grow without bound. See + -- apache/apisix#11244. 
-- Bump the "batch_process_dropped_entries" counter by one for this processor.
-- Does nothing when the prometheus exporter is not loaded or has no registry.
-- The counter is created on first use and cached in batch_dropped_metrics.
-- Missing name / route_id / server_addr are reported as empty-string labels.
local function incr_dropped_metric(self)
    if not prometheus then
        return
    end
    if not prometheus.get_prometheus() then
        return
    end

    if not batch_dropped_metrics then
        local label_names = {"name", "route_id", "server_addr"}
        batch_dropped_metrics = prometheus.get_prometheus():counter(
            "batch_process_dropped_entries",
            "dropped entries because max_buffer_bytes was exceeded",
            label_names)
    end

    local label_values = {
        self.name or "",
        self.route_id or "",
        self.server_addr or "",
    }
    batch_dropped_metrics:inc(1, label_values)
end


-- Rough byte-size estimate of a value.
--   string  -> its byte length
--   number  -> 8
--   boolean -> 1
--   table   -> sum of the estimates of every key and every value, visited
--              with pairs(); tables reached at depth 8 or deeper count as 0
--   other   -> 0 (nil, function, userdata, ...)
-- `depth` is the current nesting level; callers normally omit it (nil == 0).
-- The depth cap is what makes the walk terminate on self-referencing tables;
-- such a table is simply counted once per level until the cap is hit.
local function estimate_entry_bytes(v, depth)
    local kind = type(v)

    if kind == "string" then
        return #v
    end
    if kind == "number" then
        return 8
    end
    if kind == "boolean" then
        return 1
    end

    local level = depth or 0
    if kind ~= "table" or level >= 8 then
        return 0
    end

    local child_level = level + 1
    local total = 0
    for key, item in pairs(v) do
        total = total + estimate_entry_bytes(key, child_level)
        total = total + estimate_entry_bytes(item, child_level)
    end
    return total
end
-- Give back `bytes` (nil is treated as 0) from the processor's running
-- buffer_bytes total. The result is floored at 0 so the total never goes
-- negative, whatever amount is passed in.
local function release_bytes(self, bytes)
    local remaining = self.buffer_bytes - (bytes or 0)
    if remaining < 0 then
        remaining = 0
    end
    self.buffer_bytes = remaining
end
name = config.name, + max_buffer_bytes = config.max_buffer_bytes, batch_to_process = {}, - entry_buffer = {entries = {}, retry_count = 0}, + entry_buffer = {entries = {}, retry_count = 0, bytes = 0}, is_timer_running = false, first_entry_t = 0, last_entry_t = 0, route_id = config.route_id, server_addr = config.server_addr, - processed_entries = 0 + processed_entries = 0, + -- bytes currently held by this processor: buffered entries plus those + -- still in-flight to the sink. Used to enforce max_buffer_bytes. + buffer_bytes = 0, + dropped_entries = 0, } return setmetatable(processor, batch_processor_mt) end function batch_processor:push(entry) + -- enforce the byte budget before buffering: when the data already held + -- (buffered + in-flight) plus this entry would exceed max_buffer_bytes, + -- drop the entry instead of letting the backlog grow without bound. + local entry_bytes + if self.max_buffer_bytes and self.max_buffer_bytes > 0 then + entry_bytes = estimate_entry_bytes(entry) + if self.buffer_bytes + entry_bytes > self.max_buffer_bytes then + self.dropped_entries = self.dropped_entries + 1 + incr_dropped_metric(self) + core.log.error("Batch Processor[", self.name, "] max_buffer_bytes[", + self.max_buffer_bytes, "] exceeded (held ", self.buffer_bytes, + " + entry ", entry_bytes, "), dropping entry; total dropped: ", + self.dropped_entries) + return + end + end + -- if the batch size is one then immediately send for processing if self.batch_max_size == 1 then - local batch = {entries = {entry}, retry_count = 0} + self.buffer_bytes = self.buffer_bytes + (entry_bytes or 0) + local batch = {entries = {entry}, retry_count = 0, bytes = entry_bytes or 0} schedule_func_exec(self, 0, batch) return end @@ -205,6 +302,8 @@ function batch_processor:push(entry) local entries = self.entry_buffer.entries table.insert(entries, entry) + self.buffer_bytes = self.buffer_bytes + (entry_bytes or 0) + self.entry_buffer.bytes = self.entry_buffer.bytes + (entry_bytes or 0) 
set_metrics(self, #entries) if #entries == 1 then @@ -230,7 +329,7 @@ function batch_processor:process_buffer() core.log.debug("transferring buffer entries to processing pipe line, ", "buffercount[", #self.entry_buffer.entries ,"]") self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer - self.entry_buffer = {entries = {}, retry_count = 0} + self.entry_buffer = {entries = {}, retry_count = 0, bytes = 0} set_metrics(self, 0) end diff --git a/t/utils/batch-processor.t b/t/utils/batch-processor.t index 5792329cfeb5..fab8dff87b34 100644 --- a/t/utils/batch-processor.t +++ b/t/utils/batch-processor.t @@ -531,3 +531,77 @@ GET /t --- response_body done --- wait: 2 + + + +=== TEST 14: max_buffer_bytes caps buffered data and drops excess entries +# Regression for apache/apisix#11244: with large entries the buffer must be +# bounded by bytes, not only by entry count. A large batch_max_size keeps the +# entries in the buffer (no flush), so the byte budget alone decides admission. +--- config + location /t { + content_by_lua_block { + local Batch = require("apisix.utils.batch-processor") + local func_to_send = function() return true end + local config = { + max_retry_count = 0, + batch_max_size = 1000, -- large, so entries stay buffered + buffer_duration = 60, + inactive_timeout = 60, + retry_delay = 0, + max_buffer_bytes = 1000, -- ~1 entry of 604 bytes fits + } + local log_buffer, err = Batch:new(func_to_send, config) + if not log_buffer then ngx.say(err); return end + + -- each entry ~= #"data"(4) + 600 = 604 bytes + for i = 1, 5 do + log_buffer:push({ data = string.rep("x", 600) }) + end + + ngx.say("buffered=", #log_buffer.entry_buffer.entries) + ngx.say("dropped=", log_buffer.dropped_entries) + ngx.say("within_budget=", tostring(log_buffer.buffer_bytes <= 1000)) + } + } +--- request +GET /t +--- response_body +buffered=1 +dropped=4 +within_budget=true +--- wait: 0.5 + + + +=== TEST 15: max_buffer_bytes unset (default 0) preserves prior unbounded behavior 
+--- config + location /t { + content_by_lua_block { + local Batch = require("apisix.utils.batch-processor") + local func_to_send = function() return true end + local config = { + max_retry_count = 0, + batch_max_size = 1000, + buffer_duration = 60, + inactive_timeout = 60, + retry_delay = 0, + -- max_buffer_bytes omitted -> default 0 -> disabled + } + local log_buffer, err = Batch:new(func_to_send, config) + if not log_buffer then ngx.say(err); return end + + for i = 1, 5 do + log_buffer:push({ data = string.rep("x", 600) }) + end + + ngx.say("buffered=", #log_buffer.entry_buffer.entries) + ngx.say("dropped=", log_buffer.dropped_entries) + } + } +--- request +GET /t +--- response_body +buffered=5 +dropped=0 +--- wait: 0.5