Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/ozontech/seq-db/asyncsearcher"
"github.com/ozontech/seq-db/buildinfo"
"github.com/ozontech/seq-db/compaction"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
Expand Down Expand Up @@ -323,6 +324,18 @@ func startStore(
Workers: cfg.SkipMaskManager.Workers,
CacheSizeLimit: uint64(cfg.SkipMaskManager.CacheSize),
},
Compaction: compaction.Config{
MergeTrigger: cfg.Compaction.STCS.MergeTrigger,
MergeFanIn: cfg.Compaction.STCS.MergeFanIn,
MergeFanOutSize: uint64(cfg.Compaction.STCS.MergeFanOutSize),

BucketLowerbound: cfg.Compaction.STCS.BucketLowerbound,
BucketUpperbound: cfg.Compaction.STCS.BucketUpperbound,

Workers: cfg.Compaction.Workers,
TimeWindow: cfg.Compaction.TimeWindow,
TickInterval: cfg.Compaction.TickInterval,
},
}

s3cli := initS3Client(cfg)
Expand Down
27 changes: 19 additions & 8 deletions compaction/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compaction

import (
"sync"
"time"

"go.uber.org/zap"

Expand All @@ -12,28 +13,37 @@ import (
)

type Executor struct {
params common.SealParams

workers int
wg sync.WaitGroup
p *planner

p *planner
}

// FIXME(dkharms): I need to pass here [common.SealParams].
func NewExecutor(workers int, p *planner) *Executor {
e := Executor{workers: workers, p: p}
func NewExecutor(workers int, params common.SealParams, p *planner) *Executor {
e := Executor{workers: workers, p: p, params: params}
e.init()
return &e
}

func (e *Executor) Close() {
e.p.close()
func (e *Executor) Stop() {
e.p.stop()
e.wg.Wait()
}

func (e *Executor) init() {
for range e.workers {
e.wg.Go(func() {
for t := range e.p.tasks {
t.onComplete(e.compact(t))
compactionInflight.Inc()

start := time.Now()
result, err := e.compact(t)
compactionDurationSeconds.Observe(time.Since(start).Seconds())

t.onComplete(result, err)
compactionInflight.Dec()
}
})
}
Expand All @@ -48,6 +58,7 @@ func (e *Executor) compact(t task) (*sealed.PreloadedData, error) {
for _, f := range t.snapshot.Fractions() {
names = append(names, f.Info().Name())
srcs = append(srcs, frac.NewSealedSource(f))
compactionBytesTotal.Add(float64(f.Info().IndexOnDisk))
}

logger.Info(
Expand All @@ -56,6 +67,6 @@ func (e *Executor) compact(t task) (*sealed.PreloadedData, error) {
zap.Strings("names", names),
)

preloaded, err := Merge(t.filename, common.SealParams{}, srcs...)
preloaded, err := Merge(t.filename, e.params, srcs...)
return preloaded, err
}
52 changes: 52 additions & 0 deletions compaction/metrics.go
Original file line number Diff line number Diff line change
@@ -1 +1,53 @@
package compaction

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/ozontech/seq-db/metric"
)

// Prometheus collectors of the compaction subsystem. All of them are created
// through promauto and share the "seq_db_store" namespace and the
// "compaction" subsystem.

// compactionInflight is raised by an executor worker right before it starts a
// compaction and lowered once the task's completion callback has returned.
var compactionInflight = promauto.NewGauge(prometheus.GaugeOpts{
	Namespace: "seq_db_store",
	Subsystem: "compaction",
	Name:      "inflight",
	Help:      "Number of running compactions",
})

// compactionSkipped counts planner ticks that produced no executed task:
// either nothing was picked, or the picked task could not be handed to a
// worker in time and was dropped.
var compactionSkipped = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: "seq_db_store",
	Subsystem: "compaction",
	Name:      "skipped_total",
	Help:      "Tick-triggered tasks dropped because all workers were busy or no candidates were found",
})

// compactionBinsTotal is set to the number of bins produced by the planner on
// every pick.
//
// NOTE(review): this is a gauge whose name ends in "_total"; Prometheus naming
// conventions reserve that suffix for counters. Consider renaming it before
// dashboards start depending on the current name.
var compactionBinsTotal = promauto.NewGauge(prometheus.GaugeOpts{
	Namespace: "seq_db_store",
	Subsystem: "compaction",
	Name:      "bins_total",
	Help:      "Number of active time-bins considered for compaction",
})

// compactionDurationSeconds observes the wall-clock time of one compaction
// call, using the shared seconds bucket layout from the metric package.
var compactionDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
	Namespace: "seq_db_store",
	Subsystem: "compaction",
	Name:      "duration_seconds",
	Help:      "Time spent executing a single compaction",
	Buckets:   metric.SecondsBuckets,
})

// compactionBytesTotal accumulates the on-disk index size of every fraction
// that is fed into a compaction.
var compactionBytesTotal = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: "seq_db_store",
	Subsystem: "compaction",
	Name:      "result_bytes_placeholder",
	Help:      "Total index bytes merged across all compactions",
})

// compactionResultTotal counts finished compactions, partitioned by the
// "result" label (the help text lists success, empty and error).
var compactionResultTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "seq_db_store",
		Subsystem: "compaction",
		Name:      "result_total",
		Help:      "Compaction outcomes by result (success, empty, error)",
	},
	[]string{"result"},
)
50 changes: 33 additions & 17 deletions compaction/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,29 @@ import (

"go.uber.org/zap"

"github.com/alecthomas/units"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
)

// Config bundles the tunables of the compaction planner.
//
// The merge and bucket fields are copied one-to-one into the strategySTCS
// value the planner builds when it picks a task; the remaining fields control
// scheduling.
type Config struct {
	// MergeTrigger and MergeFanIn become the strategy's mergeTrigger and
	// mergeFanIn settings.
	// NOTE(review): presumably the number of similar-sized fractions that
	// triggers a merge and the cap on fractions merged at once — confirm
	// against strategySTCS.
	MergeTrigger, MergeFanIn int
	// MergeFanOutSize becomes the strategy's mergeFanOutSize; it is a byte
	// count (the store fills it from a Bytes-typed config value).
	MergeFanOutSize uint64

	// BucketLowerbound and BucketUpperbound become the strategy's
	// bucketLowerbound and bucketUpperbound settings.
	BucketLowerbound, BucketUpperbound float64

	// Workers is not read by the planner code visible here.
	// NOTE(review): presumably the executor worker count — confirm at the
	// call site.
	Workers int
	// TimeWindow is the window passed to the planner's distribute step when
	// fractions are grouped into bins; TickInterval is the period of the
	// planner's ticker.
	TimeWindow, TickInterval time.Duration
}

// fraction is the package-private view of a fraction: the only capability
// required of it is reporting its metadata.
type fraction interface {
// Info returns the fraction's *common.Info descriptor.
Info() *common.Info
}

const (
// TODO(dkharms): Move this options to config.
compactionTick = time.Second
compactionWindow = 24 * time.Hour
)

type task struct {
bin time.Time
filename string
Expand All @@ -34,6 +40,8 @@ type task struct {
}

type planner struct {
cfg Config

wg sync.WaitGroup
ctx context.Context
done chan struct{}
Expand All @@ -49,8 +57,10 @@ type planner struct {
stats map[time.Time]int
}

func NewPlanner(ctx context.Context, fm *fracmanager.FracManager) *planner {
func NewPlanner(ctx context.Context, fm *fracmanager.FracManager, cfg Config) *planner {
p := planner{
cfg: cfg,

ctx: ctx,
done: make(chan struct{}),

Expand All @@ -68,7 +78,7 @@ func NewPlanner(ctx context.Context, fm *fracmanager.FracManager) *planner {

func (p *planner) init() {
p.wg.Go(func() {
t := time.NewTicker(compactionTick)
t := time.NewTicker(p.cfg.TickInterval)

for {
select {
Expand All @@ -83,6 +93,7 @@ func (p *planner) init() {
case <-t.C:
task, ok := p.pick()
if !ok {
compactionSkipped.Inc()
continue
}

Expand All @@ -91,13 +102,14 @@ func (p *planner) init() {
case <-time.NewTimer(time.Second).C:
// If all executor workers are busy for some long period of time,
// we want to drop the task because it might contain stale decision.
compactionSkipped.Inc()
}
}
}
})
}

func (p *planner) close() {
func (p *planner) stop() {
close(p.done)
}

Expand All @@ -109,7 +121,8 @@ func (p *planner) pick() (task, bool) {
snapshot[i] = fractions[i]
}

bins := p.distribute(compactionWindow, snapshot)
bins := p.distribute(p.cfg.TimeWindow, snapshot)
compactionBinsTotal.Set(float64(len(bins)))
times := p.prioritize(bins)

p.mu.Lock()
Expand All @@ -123,13 +136,12 @@ func (p *planner) pick() (task, bool) {
continue
}

// TODO(dkharms): Move this options to config.
picked := strategySTCS{
mergeTrigger: 4,
mergeFanIn: 32,
mergeFanOutSize: 128 * uint64(units.MiB),
bucketLowerbound: 0.5,
bucketUpperbound: 1.5,
mergeTrigger: p.cfg.MergeTrigger,
mergeFanIn: p.cfg.MergeFanIn,
mergeFanOutSize: p.cfg.MergeFanOutSize,
bucketLowerbound: p.cfg.BucketLowerbound,
bucketUpperbound: p.cfg.BucketUpperbound,
}.Pick(bins[t].fracs)

if len(picked) == 0 {
Expand All @@ -156,11 +168,14 @@ func (p *planner) pick() (task, bool) {
delete(p.inflight, t)

if err != nil {
compactionResultTotal.WithLabelValues("error").Inc()

logger.Error(
"failed to compact fractions",
zap.Error(err),
zap.Any("snapshot", names(csnapshot.Fractions())),
)

return
}

Expand All @@ -172,6 +187,7 @@ func (p *planner) pick() (task, bool) {
return
}

compactionResultTotal.WithLabelValues("success").Inc()
// TODO(dkharms): Is it fine to substitute and delete?
// We need somehow substitute and delete atomically.
p.fm.SubstituteWithSealed(s, csnapshot)
Expand Down
13 changes: 12 additions & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@ storage:
frac_size: 16MiB
total_size: 1GiB

compaction:
workers: 4
time_window: 24h
tick_interval: 1s
stcs:
merge_trigger: 4
merge_fan_in: 32
merge_fan_out_size: 512MiB
bucket_lowerbound: 0.5
bucket_upperbound: 1.5

# For testing or development purposes you can run MinIO S3 compatible object storage locally.
#
# docker run -p 9000:9000 -p 9001:9001 \
# quay.io/minio/minio server /data --console-address ":9001"

offloading:
enabled: true
enabled: false
retention: 5m
endpoint: http://localhost:9000/
bucket: remote-storage
Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func Parse(path string) (Config, error) {
}

/* Set computed defaults if user did not override them */
c.Compaction.Workers = cmp.Or(c.Compaction.Workers, NumCPU)

c.Resources.ReaderWorkers = cmp.Or(c.Resources.ReaderWorkers, NumCPU)
c.Resources.SearchWorkers = cmp.Or(c.Resources.SearchWorkers, NumCPU)
Expand Down Expand Up @@ -202,6 +203,19 @@ type Config struct {
DocBlockZstdCompressionLevel int `config:"doc_block_zstd_compression_level" default:"3"`
} `config:"compression"`

// Compaction holds the settings of the compaction subsystem; the store copies
// them into compaction.Config on startup.
Compaction struct {
// STCS groups the knobs handed to the STCS merge strategy.
// Store validation requires: merge_trigger > 0, merge_fan_out_size > 0,
// merge_fan_in >= merge_trigger, bucket_lowerbound > 0 and
// bucket_upperbound >= bucket_lowerbound.
STCS struct {
MergeTrigger int `config:"merge_trigger" default:"4"`
MergeFanIn int `config:"merge_fan_in" default:"32"`
// MergeFanOutSize is a byte size; it is converted to uint64 before it
// reaches the compaction package.
MergeFanOutSize Bytes `config:"merge_fan_out_size" default:"512MiB"`
BucketLowerbound float64 `config:"bucket_lowerbound" default:"0.5"`
BucketUpperbound float64 `config:"bucket_upperbound" default:"1.5"`
} `config:"stcs"`
// Workers has no static default: when it is left at zero, Parse replaces
// it with NumCPU. Store validation rejects negative values.
Workers int `config:"workers"`
// TimeWindow must be positive (store validation).
TimeWindow time.Duration `config:"time_window" default:"24h"`
// TickInterval must be positive (store validation); the compaction
// planner uses it as the period of its ticker.
TickInterval time.Duration `config:"tick_interval" default:"1s"`
} `config:"compaction"`

Indexing struct {
MaxTokenSize int `config:"max_token_size" default:"72"`
CaseSensitive bool `config:"case_sensitive"`
Expand Down
23 changes: 23 additions & 0 deletions config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ func (c *Config) storeValidations() []validateFn {
inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent),

greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck),

greaterThan("compaction.stcs.merge_trigger", 0, c.Compaction.STCS.MergeTrigger),
greaterThan("compaction.stcs.merge_fan_out_size", 0, c.Compaction.STCS.MergeFanOutSize),
greaterOrEqualThan("compaction.stcs.merge_fan_in", c.Compaction.STCS.MergeTrigger, c.Compaction.STCS.MergeFanIn),

greaterThan("compaction.stcs.bucket_lowerbound", 0, c.Compaction.STCS.BucketLowerbound),
greaterOrEqualThan("compaction.stcs.bucket_upperbound", c.Compaction.STCS.BucketLowerbound, c.Compaction.STCS.BucketUpperbound),

greaterOrEqualThan("compaction.workers", 0, c.Compaction.Workers),
greaterThan("compaction.time_window", 0, c.Compaction.TimeWindow),
greaterThan("compaction.tick_interval", 0, c.Compaction.TickInterval),
}

if c.Offloading.Enabled {
Expand Down Expand Up @@ -106,6 +117,18 @@ func greaterThan[T cmp.Ordered](field string, base, v T) validateFn {
}
}

// greaterOrEqualThan builds a validator for the config field named field.
//
// The returned function yields an error when v is strictly less than base and
// nil otherwise. The error message quotes the field name and prints the
// required lower bound.
func greaterOrEqualThan[T cmp.Ordered](field string, base, v T) validateFn {
	return func() error {
		// Deliberately phrased through "<": for unordered values (NaN) the
		// comparison is false, so they are accepted just like any value
		// that is not below base.
		if belowBase := v < base; !belowBase {
			return nil
		}
		return fmt.Errorf("field %q must be greater or equal than %v", field, base)
	}
}

func inRange[T cmp.Ordered](field string, from, to, v T) validateFn {
return func() error {
if v < from || to < v {
Expand Down
Loading