Skip to content

Commit 2891ee7

Browse files
committed
optimized extractLogs and downstream helper methods
1 parent 7202fe8 commit 2891ee7

2 files changed

Lines changed: 99 additions & 140 deletions

File tree

exporter/exporterhelper/internal/queuebatch/logs_batch.go

Lines changed: 98 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"context"
88
"errors"
99
"fmt"
10-
"math"
1110

1211
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1312
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
@@ -68,198 +67,158 @@ func (req *logsRequest) split(limits map[request.SizerType]int64, sizers map[req
6867
break
6968
}
7069

71-
intLimits := make(map[request.SizerType]int)
70+
// Find the first exceeded limit to start with.
71+
var firstSzt request.SizerType
72+
var firstLimit int64
7273
for szt, limit := range limits {
73-
intLimits[szt] = int(limit)
74+
sz := sizers[szt]
75+
if limit > 0 && int64(req.size(szt, sz)) > limit {
76+
firstSzt = szt
77+
firstLimit = limit
78+
break
79+
}
7480
}
7581

76-
ld, _ := extractLogs(req.ld, intLimits, sizers)
82+
// Extract initial ld based on this first exceeded limit.
83+
ld, _ := extractLogs(req.ld, int(firstLimit), sizers[firstSzt])
7784
if ld.LogRecordCount() == 0 {
7885
return res, fmt.Errorf("one log record size is greater than max size, dropping items")
7986
}
80-
req.cachedSizes = nil
81-
res = append(res, newLogsRequest(ld))
87+
88+
ldReq := newLogsRequest(ld).(*logsRequest)
89+
90+
// Now check ld against other sizers and trim if necessary.
91+
for szt, limit := range limits {
92+
if szt == firstSzt {
93+
continue
94+
}
95+
sz := sizers[szt]
96+
if limit > 0 && int64(ldReq.size(szt, sz)) > limit {
97+
// ld exceeds this limit too!
98+
// Trim it and put remainder back to req!
99+
ldNew, _ := extractLogs(ldReq.ld, int(limit), sz)
100+
if ldNew.LogRecordCount() == 0 {
101+
return res, fmt.Errorf("one log record size is greater than max size, dropping items")
102+
}
103+
104+
// ldReq.ld now contains the REMAINDER of ld!
105+
// We should merge it back to req!
106+
ldReq.mergeTo(req, sizers)
107+
108+
// And ldReq becomes the new extracted part!
109+
ldReq = newLogsRequest(ldNew).(*logsRequest)
110+
}
111+
}
112+
113+
req.cachedSizes = make(map[request.SizerType]int)
114+
res = append(res, ldReq)
82115
}
83116
res = append(res, req)
84117
return res, nil
85118
}
86119

87120
// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
88-
func extractLogs(srcLogs plog.Logs, limits map[request.SizerType]int, sizers map[request.SizerType]sizer.LogsSizer) (plog.Logs, map[request.SizerType]int) {
121+
func extractLogs(srcLogs plog.Logs, capacity int, sz sizer.LogsSizer) (plog.Logs, int) {
89122
destLogs := plog.NewLogs()
90-
capacityLeft := make(map[request.SizerType]int)
91-
removedSizes := make(map[request.SizerType]int)
92-
93-
for szt, limit := range limits {
94-
sz := sizers[szt]
95-
if limit == 0 {
96-
capacityLeft[szt] = math.MaxInt
97-
} else {
98-
capacityLeft[szt] = limit - sz.LogsSize(destLogs)
99-
}
100-
removedSizes[szt] = 0
101-
}
123+
capacityLeft := capacity - sz.LogsSize(destLogs)
124+
removedSize := 0
102125

103126
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
104-
for _, cap := range capacityLeft {
105-
if cap <= 0 {
106-
return false
107-
}
108-
}
109-
110-
fitsAll := true
111-
for szt, sz := range sizers {
112-
rawRlSize := sz.ResourceLogsSize(srcRL)
113-
rlSize := sz.DeltaSize(rawRlSize)
114-
if rlSize > capacityLeft[szt] {
115-
fitsAll = false
116-
break
117-
}
127+
// If there is no more capacity left, just return.
128+
if capacityLeft == 0 {
129+
return false
118130
}
119-
120-
if !fitsAll {
121-
extSrcRL, extRlSizes := extractResourceLogs(srcRL, capacityLeft, sizers)
122-
123-
for szt, sz := range sizers {
124-
extRlSize := extRlSizes[szt]
125-
capacityLeft[szt] = 0
126-
removedSizes[szt] += extRlSize
127-
128-
rawRlSize := sz.ResourceLogsSize(srcRL)
129-
rlSize := sz.DeltaSize(rawRlSize)
130-
removedSizes[szt] += rlSize - rawRlSize - (sz.DeltaSize(rawRlSize-extRlSize) - (rawRlSize - extRlSize))
131-
}
132-
131+
rawRlSize := sz.ResourceLogsSize(srcRL)
132+
rlSize := sz.DeltaSize(rawRlSize)
133+
134+
if rlSize > capacityLeft {
135+
extSrcRL, extRsSize := extractResourceLogs(srcRL, capacityLeft, sz)
136+
// This cannot make it to exactly 0 for the bytes,
137+
// force it to be 0 since that is the stopping condition.
138+
capacityLeft = 0
139+
removedSize += extRsSize
140+
// This represents the delta between the delta sizes.
141+
removedSize += rlSize - rawRlSize - (sz.DeltaSize(rawRlSize-extRsSize) - (rawRlSize - extRsSize))
142+
// It is possible that for the bytes scenario, the extracted field contains no scope logs.
143+
// Do not add it to the destination if that is the case.
133144
if extSrcRL.ScopeLogs().Len() > 0 {
134145
extSrcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
135146
}
136147
return extSrcRL.ScopeLogs().Len() != 0
137148
}
149+
capacityLeft -= rlSize
150+
removedSize += rlSize
138151

139-
for szt, sz := range sizers {
140-
rawRlSize := sz.ResourceLogsSize(srcRL)
141-
rlSize := sz.DeltaSize(rawRlSize)
142-
capacityLeft[szt] -= rlSize
143-
removedSizes[szt] += rlSize
144-
}
145152
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
146153
return true
147154
})
148-
return destLogs, removedSizes
155+
return destLogs, removedSize
149156
}
150157

151158
// extractResourceLogs extracts resource logs and returns a new resource logs with the specified number of log records.
152-
func extractResourceLogs(srcRL plog.ResourceLogs, limits map[request.SizerType]int, sizers map[request.SizerType]sizer.LogsSizer) (plog.ResourceLogs, map[request.SizerType]int) {
159+
func extractResourceLogs(srcRL plog.ResourceLogs, capacity int, sz sizer.LogsSizer) (plog.ResourceLogs, int) {
153160
destRL := plog.NewResourceLogs()
154161
destRL.SetSchemaUrl(srcRL.SchemaUrl())
155162
srcRL.Resource().CopyTo(destRL.Resource())
156163

157-
capacityLeft := make(map[request.SizerType]int)
158-
removedSizes := make(map[request.SizerType]int)
159-
160-
for szt, limit := range limits {
161-
sz := sizers[szt]
162-
capacityLeft[szt] = limit - (sz.DeltaSize(limit) - limit) - sz.ResourceLogsSize(destRL)
163-
removedSizes[szt] = 0
164-
}
165-
164+
// Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size.
165+
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ResourceLogsSize(destRL)
166+
removedSize := 0
166167
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
167-
for _, cap := range capacityLeft {
168-
if cap <= 0 {
169-
return false
170-
}
171-
}
172-
173-
fitsAll := true
174-
for szt, sz := range sizers {
175-
rawSlSize := sz.ScopeLogsSize(srcSL)
176-
slSize := sz.DeltaSize(rawSlSize)
177-
if slSize > capacityLeft[szt] {
178-
fitsAll = false
179-
break
180-
}
168+
// If there is no more capacity left, just return.
169+
if capacityLeft == 0 {
170+
return false
181171
}
182-
183-
if !fitsAll {
184-
extSrcSL, extSlSizes := extractScopeLogs(srcSL, capacityLeft, sizers)
185-
186-
for szt, sz := range sizers {
187-
extSlSize := extSlSizes[szt]
188-
capacityLeft[szt] = 0
189-
removedSizes[szt] += extSlSize
190-
191-
rawSlSize := sz.ScopeLogsSize(srcSL)
192-
slSize := sz.DeltaSize(rawSlSize)
193-
removedSizes[szt] += slSize - rawSlSize - (sz.DeltaSize(rawSlSize-extSlSize) - (rawSlSize - extSlSize))
194-
}
195-
172+
rawSlSize := sz.ScopeLogsSize(srcSL)
173+
slSize := sz.DeltaSize(rawSlSize)
174+
if slSize > capacityLeft {
175+
extSrcSL, extSlSize := extractScopeLogs(srcSL, capacityLeft, sz)
176+
// This cannot make it to exactly 0 for the bytes,
177+
// force it to be 0 since that is the stopping condition.
178+
capacityLeft = 0
179+
removedSize += extSlSize
180+
// This represents the delta between the delta sizes.
181+
removedSize += slSize - rawSlSize - (sz.DeltaSize(rawSlSize-extSlSize) - (rawSlSize - extSlSize))
182+
// It is possible that for the bytes scenario, the extracted field contains no log records.
183+
// Do not add it to the destination if that is the case.
196184
if extSrcSL.LogRecords().Len() > 0 {
197185
extSrcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
198186
}
199187
return extSrcSL.LogRecords().Len() != 0
200188
}
201-
202-
for szt, sz := range sizers {
203-
rawSlSize := sz.ScopeLogsSize(srcSL)
204-
slSize := sz.DeltaSize(rawSlSize)
205-
capacityLeft[szt] -= slSize
206-
removedSizes[szt] += slSize
207-
}
189+
capacityLeft -= slSize
190+
removedSize += slSize
208191
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
209192
return true
210193
})
211-
return destRL, removedSizes
194+
return destRL, removedSize
212195
}
213196

214197
// extractScopeLogs extracts scope logs and returns a new scope logs with the specified number of log records.
215-
func extractScopeLogs(srcSL plog.ScopeLogs, limits map[request.SizerType]int, sizers map[request.SizerType]sizer.LogsSizer) (plog.ScopeLogs, map[request.SizerType]int) {
198+
func extractScopeLogs(srcSL plog.ScopeLogs, capacity int, sz sizer.LogsSizer) (plog.ScopeLogs, int) {
216199
destSL := plog.NewScopeLogs()
217200
destSL.SetSchemaUrl(srcSL.SchemaUrl())
218201
srcSL.Scope().CopyTo(destSL.Scope())
219202

220-
capacityLeft := make(map[request.SizerType]int)
221-
removedSizes := make(map[request.SizerType]int)
222-
223-
for szt, limit := range limits {
224-
sz := sizers[szt]
225-
if limit == 0 {
226-
capacityLeft[szt] = math.MaxInt
227-
} else {
228-
capacityLeft[szt] = limit - (sz.DeltaSize(limit) - limit) - sz.ScopeLogsSize(destSL)
229-
}
230-
removedSizes[szt] = 0
231-
}
232-
203+
// Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size.
204+
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ScopeLogsSize(destSL)
205+
removedSize := 0
233206
srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
234-
for _, cap := range capacityLeft {
235-
if cap <= 0 {
236-
return false
237-
}
238-
}
239-
240-
fitsAll := true
241-
for szt, sz := range sizers {
242-
rlSize := sz.DeltaSize(sz.LogRecordSize(srcLR))
243-
if rlSize > capacityLeft[szt] {
244-
fitsAll = false
245-
break
246-
}
247-
}
248-
249-
if !fitsAll {
250-
for szt := range sizers {
251-
capacityLeft[szt] = 0
252-
}
207+
// If there is no more capacity left, just return.
208+
if capacityLeft == 0 {
253209
return false
254210
}
255-
256-
for szt, sz := range sizers {
257-
rlSize := sz.DeltaSize(sz.LogRecordSize(srcLR))
258-
capacityLeft[szt] -= rlSize
259-
removedSizes[szt] += rlSize
211+
rlSize := sz.DeltaSize(sz.LogRecordSize(srcLR))
212+
if rlSize > capacityLeft {
213+
// This cannot make it to exactly 0 for the bytes,
214+
// force it to be 0 since that is the stopping condition.
215+
capacityLeft = 0
216+
return false
260217
}
218+
capacityLeft -= rlSize
219+
removedSize += rlSize
261220
srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
262221
return true
263222
})
264-
return destSL, removedSizes
223+
return destSL, removedSize
265224
}

exporter/exporterhelper/internal/queuebatch/logs_batch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func TestMergeSplitLogsInputNotModifiedIfErrorReturned(t *testing.T) {
284284
func TestExtractLogs(t *testing.T) {
285285
for i := 1; i < 10; i++ {
286286
ld := testdata.GenerateLogs(10)
287-
extractedLogs, _ := extractLogs(ld, map[request.SizerType]int{request.SizerTypeItems: i}, map[request.SizerType]sizer.LogsSizer{request.SizerTypeItems: &sizer.LogsCountSizer{}})
287+
extractedLogs, _ := extractLogs(ld, i, &sizer.LogsCountSizer{})
288288
assert.Equal(t, i, extractedLogs.LogRecordCount())
289289
assert.Equal(t, 10-i, ld.LogRecordCount())
290290
}

0 commit comments

Comments
 (0)