Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions compaction/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package compaction

import (
"errors"
"os"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/indexwriter"
)

func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.PreloadedData, error) {
writer := indexwriter.New(params)
src := NewMergeSource(filename, srcs)

if err := createAndWrite(
filename+consts.OffsetsTmpFileSuffix,
filename+consts.OffsetsFileSuffix,
func(f *os.File) error { return writer.WriteOffsetsFile(f, src) },
); err != nil {
return nil, err
}

if err := createAndWrite(
filename+consts.IDTmpFileSuffix,
filename+consts.IDFileSuffix,
func(f *os.File) error { return writer.WriteIDFile(f, src) },
); err != nil {
return nil, err
}

if err := createAndWriteBoth(
filename+consts.TokenTmpFileSuffix,
filename+consts.TokenFileSuffix,
filename+consts.LIDTmpFileSuffix,
filename+consts.LIDFileSuffix,
func(tf, lf *os.File) error { return writer.WriteTokenTriplet(tf, lf, src) },
); err != nil {
return nil, err
}

if err := createAndWrite(
filename+consts.InfoTmpFileSuffix,
filename+consts.InfoFileSuffix,
func(f *os.File) error { return writer.WriteInfoFile(f, src) },
); err != nil {
return nil, err
}

if err := mergeDocs(filename, srcs...); err != nil {
return nil, err
}

info := src.Info()
info.IndexOnDisk = 0

for _, suffix := range []string{
consts.InfoFileSuffix,
consts.TokenFileSuffix,
consts.OffsetsFileSuffix,
consts.IDFileSuffix,
consts.LIDFileSuffix,
} {
st, err := os.Stat(info.Path + suffix)
if err != nil {
return nil, err
}
info.IndexOnDisk += uint64(st.Size())
}

lidsTable := writer.LIDsTable()
preloaded := &sealed.PreloadedData{
Info: info,
TokenTable: writer.TokenTable(),
BlocksData: sealed.BlocksData{
LIDsTable: &lidsTable,
IDsTable: writer.IDsTable(),
BlocksOffsets: src.BlockOffsets(),
},
}

return preloaded, nil
}

func mergeDocs(filename string, srcs ...Source) error {
return createAndWrite(
filename+consts.DocsTmpFileSuffix,
filename+consts.DocsFileSuffix,
func(f *os.File) error {
var docsSize uint64
for _, src := range srcs {
for loc, err := range src.DocBlock() {
if err != nil {
return err
}

payload, offset := loc.First, loc.Second
if _, err := f.WriteAt(payload, int64(offset+docsSize)); err != nil {
return err
}
}

docsSize += src.Info().DocsOnDisk
}

return nil
},
)
}

func syncAndClose(f *os.File) error {
if err := f.Sync(); err != nil {
f.Close()
return err
}
return f.Close()
}

func createAndWrite(
tmp, final string,
write func(*os.File) error,
) error {
f, err := os.Create(tmp)
if err != nil {
return err
}

if err := errors.Join(write(f), syncAndClose(f)); err != nil {
return err
}

return os.Rename(tmp, final)
}

func createAndWriteBoth(
atmp, afinal,
btmp, bfinal string,
write func(*os.File, *os.File) error,
) error {
a, err := os.Create(atmp)
if err != nil {
return err
}

b, err := os.Create(btmp)
if err != nil {
a.Close()
return err
}

writeErr := write(a, b)
if err := errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil {
return err
}

if err := os.Rename(atmp, afinal); err != nil {
return err
}

return os.Rename(btmp, bfinal)
}
Loading