Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/pebble/internal/sstableinternal"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
Expand Down Expand Up @@ -251,6 +252,8 @@ type compaction struct {
metrics map[int]*LevelMetrics

pickerMetrics compactionPickerMetrics

smoother *rate.Smoother
}

// inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
Expand Down Expand Up @@ -315,7 +318,7 @@ func (c *compaction) userKeyBounds() base.UserKeyBounds {
}

func newCompaction(
pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider, smoother *rate.Smoother,
) *compaction {
c := &compaction{
kind: compactionKindDefault,
Expand All @@ -332,6 +335,7 @@ func newCompaction(
maxOutputFileSize: pc.maxOutputFileSize,
maxOverlapBytes: pc.maxOverlapBytes,
pickerMetrics: pc.pickerMetrics,
smoother: smoother,
}
c.startLevel = &c.inputs[0]
if pc.startLevel.l0SublevelInfo != nil {
Expand Down Expand Up @@ -438,8 +442,7 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64)
// maxOverlapBytes will cause splits at f10, f20,..., f990, which
// means an upper bound file count of 100 files. Say the input bytes
// in the flush are such that acceptableFileCount=10. We will fatten
// up maxOverlapBytes by 10x to ensure that the upper bound file count
// drops to 10. However, it is possible that in practice, even without
// iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter) 10. However, it is possible that in practice, even without
// this change, we would have produced no more than 10 files, and that
// this change makes the files unnecessarily wide. Say the input bytes
// are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
Expand Down Expand Up @@ -747,6 +750,7 @@ func (c *compaction) newInputIters(
iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
compaction: true,
bufferPool: &c.bufferPool,
smoother: c.smoother,
}))
// TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
// deletions into memory upfront. (See #2015, which reverted this.) There
Expand Down Expand Up @@ -904,6 +908,7 @@ func (c *compaction) newRangeDelIter(
internalIterOpts{
compaction: true,
bufferPool: &c.bufferPool,
smoother: c.smoother,
}, iterRangeDeletions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1784,7 +1789,7 @@ func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompac
return false
}

c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), d.smoother)
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
go d.compact(c, manual.done)
Expand All @@ -1810,7 +1815,7 @@ func (d *DB) tryScheduleAutoCompaction(
if pc == nil {
return false
}
c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), d.smoother)
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
go d.compact(c, nil)
Expand Down Expand Up @@ -2510,7 +2515,7 @@ func (d *DB) compactAndWrite(
IneffectualSingleDeleteCallback: d.opts.Experimental.IneffectualSingleDeleteCallback,
SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback,
}
iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter, d.smoother)

runnerCfg := compact.RunnerConfig{
CompactionBounds: base.UserKeyBoundsFromInternal(c.smallest, c.largest),
Expand All @@ -2519,7 +2524,7 @@ func (d *DB) compactAndWrite(
MaxGrandparentOverlapBytes: c.maxOverlapBytes,
TargetOutputFileSize: c.maxOutputFileSize,
}
runner := compact.NewRunner(runnerCfg, iter)
runner := compact.NewRunner(runnerCfg, iter, d.smoother)
for runner.MoreDataToWrite() {
if c.cancel.Load() {
return runner.Finish().WithError(ErrCancelledCompaction)
Expand Down
4 changes: 2 additions & 2 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func TestCompactionPickerL0(t *testing.T) {
var result strings.Builder
if pc != nil {
checkClone(t, pc)
c := newCompaction(pc, opts, time.Now(), nil /* provider */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* smoother */)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand Down Expand Up @@ -760,7 +760,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
})
var result strings.Builder
if pc != nil {
c := newCompaction(pc, opts, time.Now(), nil /* provider */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* smoother */)
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand Down
4 changes: 2 additions & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func TestPickCompaction(t *testing.T) {
vs.picker = &tc.picker
pc, got := vs.picker.pickAuto(compactionEnv{diskAvailBytes: math.MaxUint64}), ""
if pc != nil {
c := newCompaction(pc, opts, time.Now(), nil /* provider */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* smoother */)

gotStart := fileNums(c.startLevel.files)
gotML := ""
Expand Down Expand Up @@ -1297,7 +1297,7 @@ func TestCompactionOutputLevel(t *testing.T) {
d.ScanArgs(t, "start", &start)
d.ScanArgs(t, "base", &base)
pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base)
c := newCompaction(pc, opts, time.Now(), nil /* provider */)
c := newCompaction(pc, opts, time.Now(), nil /* provider */, nil /* smoother */)
return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
c.outputLevel.level, c.maxOutputFileSize)

Expand Down
5 changes: 5 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
Expand Down Expand Up @@ -307,6 +308,8 @@ type DB struct {

commit *commitPipeline

smoother *rate.Smoother

// readState provides access to the state needed for reading without needing
// to acquire DB.mu.
readState struct {
Expand Down Expand Up @@ -1621,6 +1624,8 @@ func (d *DB) Close() error {
panic(err)
}

d.smoother.Stop()

// Clear the finalizer that is used to check that an unreferenced DB has been
// closed. We're closing the DB here, so the check performed by that
// finalizer isn't necessary.
Expand Down
2 changes: 1 addition & 1 deletion download.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (d *DB) tryLaunchDownloadForFile(

download.numLaunchedDownloads++
doneCh = make(chan error, 1)
c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider)
c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, d.smoother)
c.isDownload = true
d.mu.compact.downloadingCount++
d.addInProgressCompaction(c)
Expand Down
7 changes: 6 additions & 1 deletion internal/compact/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -248,6 +249,8 @@ type Iter struct {
span keyspan.Span

stats IterStats

smoother *rate.Smoother
}

// IterConfig contains the parameters necessary to create a compaction iterator.
Expand Down Expand Up @@ -305,14 +308,16 @@ func NewIter(
cfg IterConfig,
pointIter base.InternalIterator,
rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
s *rate.Smoother,
) *Iter {
cfg.ensureDefaults()
i := &Iter{
cmp: cfg.Comparer.Compare,
cfg: cfg,
// We don't want a nil keyBuf because if the first key we encounter is
// empty, it would become nil.
keyBuf: make([]byte, 8),
keyBuf: make([]byte, 8),
smoother: s,
}

iter := pointIter
Expand Down
14 changes: 10 additions & 4 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/sstable"
)
Expand Down Expand Up @@ -107,14 +108,16 @@ type Runner struct {
// Last range key span (or portion of it) that was not yet written to a table.
lastRangeKeySpan keyspan.Span
stats Stats
smoother *rate.Smoother
}

// NewRunner creates a new Runner.
func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
func NewRunner(cfg RunnerConfig, iter *Iter, s *rate.Smoother) *Runner {
r := &Runner{
cmp: iter.cmp,
cfg: cfg,
iter: iter,
cmp: iter.cmp,
cfg: cfg,
iter: iter,
smoother: s,
}
r.key, r.value = r.iter.First()
return r
Expand Down Expand Up @@ -176,10 +179,13 @@ func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error)
}
var pinnedKeySize, pinnedValueSize, pinnedCount uint64
key, value := r.key, r.value
si := r.smoother.Track(tw.EstimatedSize)
defer si.Close()
for ; key != nil; key, value = r.iter.Next() {
if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
break
}
si.Tick()

switch key.Kind() {
case base.InternalKeyKindRangeDelete:
Expand Down
Loading