Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apisix/utils/batch-processor-manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ function _M:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entri
retry_delay = conf.retry_delay,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
max_buffer_bytes = conf.max_buffer_bytes,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}
Expand Down
107 changes: 103 additions & 4 deletions apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ local batch_processor_mt = {
local execute_func
local create_buffer_timer
local batch_metrics
local batch_dropped_metrics
local prometheus
if ngx.config.subsystem == "http" then
prometheus = require("apisix.plugins.prometheus.exporter")
Expand All @@ -44,6 +45,15 @@ local schema = {
buffer_duration = {type = "integer", minimum = 1, default= 60},
inactive_timeout = {type = "integer", minimum = 1, default= 5},
batch_max_size = {type = "integer", minimum = 1, default= 1000},
-- Upper bound, in bytes, on the data buffered by this processor
-- (entries waiting in the buffer plus those still in-flight to the
-- sink). When exceeded, new entries are dropped with a warning instead
-- of being buffered. 0 (default) disables the check, preserving the
-- previous count-only behaviour. This caps memory when entries are
-- large (e.g. logging big response bodies) and the sink cannot keep up,
-- which otherwise lets the in-flight backlog grow without bound. See
-- apache/apisix#11244.
max_buffer_bytes = {type = "integer", minimum = 0, default = 0},
}
}
batch_processor.schema = schema
Expand Down Expand Up @@ -76,6 +86,24 @@ local function set_metrics(self, count)
end


-- count an entry dropped because max_buffer_bytes was exceeded, so operators
-- can alert on log loss. Registered lazily (same pattern as batch_metrics) and
-- recorded even without route_id/server_addr (e.g. global rules) via fallbacks.
local function incr_dropped_metric(self)
if not (prometheus and prometheus.get_prometheus()) then
return
end
if not batch_dropped_metrics then
batch_dropped_metrics = prometheus.get_prometheus():counter(
"batch_process_dropped_entries",
"dropped entries because max_buffer_bytes was exceeded",
{"name", "route_id", "server_addr"})
end
batch_dropped_metrics:inc(1,
{self.name or "", self.route_id or "", self.server_addr or ""})
end


local function slice_batch(batch, n)
local slice = {}
local idx = 1
Expand All @@ -87,6 +115,41 @@ local function slice_batch(batch, n)
end


-- Approximate the in-memory footprint of an entry by summing the byte length
-- of its string leaves (the dominant cost for log entries is the request /
-- response body strings). Numbers/booleans are counted as a small fixed size
-- and the walk is depth-bounded to stay cheap and cycle-safe.
local function estimate_entry_bytes(v, depth)
local t = type(v)
if t == "string" then
return #v
elseif t == "number" then
return 8
elseif t == "boolean" then
return 1
elseif t == "table" and (depth or 0) < 8 then
local n = 0
for k, val in pairs(v) do
n = n + estimate_entry_bytes(k, (depth or 0) + 1)
+ estimate_entry_bytes(val, (depth or 0) + 1)
end
return n
end
return 0
end


-- Release the bytes accounted for a batch once it leaves the buffer/in-flight
-- accounting (processed successfully or dropped). Clamped at 0 so accounting
-- drift can never make the budget reject forever.
local function release_bytes(self, bytes)
self.buffer_bytes = self.buffer_bytes - (bytes or 0)
if self.buffer_bytes < 0 then
self.buffer_bytes = 0
end
end


function execute_func(premature, self, batch)
-- In case of "err" and a valid "first_fail" batch processor considers, all first_fail-1
-- entries have been successfully consumed and hence reschedule the job for entries with
Expand All @@ -98,24 +161,35 @@ function execute_func(premature, self, batch)
#batch.entries + 1 - first_fail, "/", #batch.entries ,"]: ", err)
batch.entries = slice_batch(batch.entries, first_fail)
self.processed_entries = self.processed_entries + first_fail - 1
-- the successfully consumed prefix is gone; re-account the bytes
-- still held by the remaining (to-be-retried) entries.
local remaining = 0
for _, e in ipairs(batch.entries) do
remaining = remaining + estimate_entry_bytes(e)
end
release_bytes(self, (batch.bytes or 0) - remaining)
batch.bytes = remaining
else
core.log.error("Batch Processor[", self.name,
"] failed to process entries: ", err)
end

batch.retry_count = batch.retry_count + 1
if batch.retry_count <= self.max_retry_count and #batch.entries > 0 then
-- still in-flight: its bytes stay accounted until it terminates
schedule_func_exec(self, self.retry_delay,
batch)
else
self.processed_entries = self.processed_entries + #batch.entries
release_bytes(self, batch.bytes)
core.log.error("Batch Processor[", self.name,"] exceeded ",
"the max_retry_count[", batch.retry_count,
"] dropping the entries")
end
return
end
self.processed_entries = self.processed_entries + #batch.entries
release_bytes(self, batch.bytes)
core.log.debug("Batch Processor[", self.name,
"] successfully processed the entries")
end
Expand Down Expand Up @@ -175,23 +249,46 @@ function batch_processor:new(func, config)
batch_max_size = config.batch_max_size,
retry_delay = config.retry_delay,
name = config.name,
max_buffer_bytes = config.max_buffer_bytes,
batch_to_process = {},
entry_buffer = {entries = {}, retry_count = 0},
entry_buffer = {entries = {}, retry_count = 0, bytes = 0},
is_timer_running = false,
first_entry_t = 0,
last_entry_t = 0,
route_id = config.route_id,
server_addr = config.server_addr,
processed_entries = 0
processed_entries = 0,
-- bytes currently held by this processor: buffered entries plus those
-- still in-flight to the sink. Used to enforce max_buffer_bytes.
buffer_bytes = 0,
dropped_entries = 0,
}

return setmetatable(processor, batch_processor_mt)
end

function batch_processor:push(entry)
-- enforce the byte budget before buffering: when the data already held
-- (buffered + in-flight) plus this entry would exceed max_buffer_bytes,
-- drop the entry instead of letting the backlog grow without bound.
local entry_bytes
if self.max_buffer_bytes and self.max_buffer_bytes > 0 then
entry_bytes = estimate_entry_bytes(entry)
if self.buffer_bytes + entry_bytes > self.max_buffer_bytes then
self.dropped_entries = self.dropped_entries + 1
incr_dropped_metric(self)
core.log.error("Batch Processor[", self.name, "] max_buffer_bytes[",
self.max_buffer_bytes, "] exceeded (held ", self.buffer_bytes,
" + entry ", entry_bytes, "), dropping entry; total dropped: ",
self.dropped_entries)
return
end
end

-- if the batch size is one then immediately send for processing
if self.batch_max_size == 1 then
local batch = {entries = {entry}, retry_count = 0}
self.buffer_bytes = self.buffer_bytes + (entry_bytes or 0)
local batch = {entries = {entry}, retry_count = 0, bytes = entry_bytes or 0}
schedule_func_exec(self, 0, batch)
return
end
Expand All @@ -205,6 +302,8 @@ function batch_processor:push(entry)

local entries = self.entry_buffer.entries
table.insert(entries, entry)
self.buffer_bytes = self.buffer_bytes + (entry_bytes or 0)
self.entry_buffer.bytes = self.entry_buffer.bytes + (entry_bytes or 0)
set_metrics(self, #entries)

if #entries == 1 then
Expand All @@ -230,7 +329,7 @@ function batch_processor:process_buffer()
core.log.debug("transferring buffer entries to processing pipe line, ",
"buffercount[", #self.entry_buffer.entries ,"]")
self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
self.entry_buffer = {entries = {}, retry_count = 0}
self.entry_buffer = {entries = {}, retry_count = 0, bytes = 0}
set_metrics(self, 0)
end

Expand Down
74 changes: 74 additions & 0 deletions t/utils/batch-processor.t
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,77 @@ GET /t
--- response_body
done
--- wait: 2



=== TEST 14: max_buffer_bytes caps buffered data and drops excess entries
# Regression for apache/apisix#11244: with large entries the buffer must be
# bounded by bytes, not only by entry count. A large batch_max_size keeps the
# entries in the buffer (no flush), so the byte budget alone decides admission.
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local func_to_send = function() return true end
local config = {
max_retry_count = 0,
batch_max_size = 1000, -- large, so entries stay buffered
buffer_duration = 60,
inactive_timeout = 60,
retry_delay = 0,
max_buffer_bytes = 1000, -- ~1 entry of 604 bytes fits
}
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then ngx.say(err); return end

-- each entry ~= #"data"(4) + 600 = 604 bytes
for i = 1, 5 do
log_buffer:push({ data = string.rep("x", 600) })
end

ngx.say("buffered=", #log_buffer.entry_buffer.entries)
ngx.say("dropped=", log_buffer.dropped_entries)
ngx.say("within_budget=", tostring(log_buffer.buffer_bytes <= 1000))
}
}
--- request
GET /t
--- response_body
buffered=1
dropped=4
within_budget=true
--- wait: 0.5



=== TEST 15: max_buffer_bytes unset (default 0) preserves prior unbounded behavior
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local func_to_send = function() return true end
local config = {
max_retry_count = 0,
batch_max_size = 1000,
buffer_duration = 60,
inactive_timeout = 60,
retry_delay = 0,
-- max_buffer_bytes omitted -> default 0 -> disabled
}
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then ngx.say(err); return end

for i = 1, 5 do
log_buffer:push({ data = string.rep("x", 600) })
end

ngx.say("buffered=", #log_buffer.entry_buffer.entries)
ngx.say("dropped=", log_buffer.dropped_entries)
}
}
--- request
GET /t
--- response_body
buffered=5
dropped=0
--- wait: 0.5
Loading