Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ var (
_ AggFunc = (*groupPartialConcatDistinct)(nil)
_ AggFunc = (*groupOriginalConcatDistinct)(nil)

// All the AggFunc implementations for "STDDEV_POP/STDDEV_SAMP/VAR_POP/VAR_SAMP" are listed here.
_ AggFunc = (*stdDevPop4Float64)(nil)
_ AggFunc = (*stddevSamp4Float64)(nil)
_ AggFunc = (*varPop4Float64)(nil)
_ AggFunc = (*varSamp4Float64)(nil)

_ AggFunc = (*stdDevPopOriginal4DistinctFloat64)(nil)
_ AggFunc = (*stdDevPopPartial4DistinctFloat64)(nil)
_ AggFunc = (*stddevSampOriginal4DistinctFloat64)(nil)
_ AggFunc = (*stddevSampPartial4DistinctFloat64)(nil)
_ AggFunc = (*varPopOriginal4DistinctFloat64)(nil)
_ AggFunc = (*varPopPartial4DistinctFloat64)(nil)
_ AggFunc = (*varSampOriginal4DistinctFloat64)(nil)
_ AggFunc = (*varSampPartial4DistinctFloat64)(nil)

// All the AggFunc implementations for "BIT_OR" are listed here.
_ AggFunc = (*bitOrUint64)(nil)

Expand Down
40 changes: 40 additions & 0 deletions pkg/executor/aggfuncs/func_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,26 @@ func (e *baseAvgDistinct4Decimal) AppendFinalResult2Chunk(ctx AggFuncUpdateConte
return nil
}

func (e *baseAvgDistinct4Decimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4AvgDistinctDecimal)(partialResult)
resBuf := spillHelper.serializePartialResult4AvgDistinctDecimal(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseAvgDistinct4Decimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseAvgDistinct4Decimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4AvgDistinctDecimal)(pr)
success, dataMemDelta := helper.deserializePartialResult4AvgDistinctDecimal(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type avgPartial4DistinctDecimal struct {
baseAvgDistinct4Decimal
}
Expand Down Expand Up @@ -534,6 +554,26 @@ func (e *baseAvgDistinct4Float64) AppendFinalResult2Chunk(_ AggFuncUpdateContext
return nil
}

func (e *baseAvgDistinct4Float64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4AvgDistinctFloat64)(partialResult)
resBuf := spillHelper.serializePartialResult4AvgDistinctFloat64(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseAvgDistinct4Float64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseAvgDistinct4Float64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4AvgDistinctFloat64)(pr)
success, dataMemDelta := helper.deserializePartialResult4AvgDistinctFloat64(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type avgPartial4DistinctFloat64 struct {
baseAvgDistinct4Float64
}
Expand Down
140 changes: 140 additions & 0 deletions pkg/executor/aggfuncs/func_count_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ func (e *baseCountDistinct4Int) AppendFinalResult2Chunk(_ AggFuncUpdateContext,
return nil
}

func (e *baseCountDistinct4Int) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4CountDistinctInt)(partialResult)
resBuf := spillHelper.serializePartialResult4CountDistinctInt(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseCountDistinct4Int) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseCountDistinct4Int) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4CountDistinctInt)(pr)
success, dataMemDelta := helper.deserializePartialResult4CountDistinctInt(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type countPartialWithDistinct4Int struct {
baseCountDistinct4Int
}
Expand Down Expand Up @@ -181,6 +201,26 @@ func (e *baseCountDistinct4Real) AppendFinalResult2Chunk(_ AggFuncUpdateContext,
return nil
}

func (e *baseCountDistinct4Real) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4CountDistinctReal)(partialResult)
resBuf := spillHelper.serializePartialResult4CountDistinctReal(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseCountDistinct4Real) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseCountDistinct4Real) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4CountDistinctReal)(pr)
success, dataMemDelta := helper.deserializePartialResult4CountDistinctReal(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type countPartialWithDistinct4Real struct {
baseCountDistinct4Real
}
Expand Down Expand Up @@ -253,6 +293,26 @@ func (e *baseCountDistinct4Decimal) AppendFinalResult2Chunk(_ AggFuncUpdateConte
return nil
}

func (e *baseCountDistinct4Decimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4CountDistinctDecimal)(partialResult)
resBuf := spillHelper.serializePartialResult4CountDistinctDecimal(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseCountDistinct4Decimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseCountDistinct4Decimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4CountDistinctDecimal)(pr)
success, dataMemDelta := helper.deserializePartialResult4CountDistinctDecimal(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type countPartialWithDistinct4Decimal struct {
baseCountDistinct4Decimal
}
Expand Down Expand Up @@ -321,6 +381,26 @@ func (e *baseCountDistinct4Duration) AppendFinalResult2Chunk(_ AggFuncUpdateCont
return nil
}

func (e *baseCountDistinct4Duration) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4CountDistinctDuration)(partialResult)
resBuf := spillHelper.serializePartialResult4CountDistinctDuration(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseCountDistinct4Duration) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseCountDistinct4Duration) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4CountDistinctDuration)(pr)
success, dataMemDelta := helper.deserializePartialResult4CountDistinctDuration(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type countPartialWithDistinct4Duration struct {
baseCountDistinct4Duration
}
Expand Down Expand Up @@ -393,6 +473,26 @@ func (e *baseCountDistinct4String) AppendFinalResult2Chunk(_ AggFuncUpdateContex
return nil
}

func (e *baseCountDistinct4String) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4CountDistinctString)(partialResult)
resBuf := spillHelper.serializePartialResult4CountDistinctString(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseCountDistinct4String) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseCountDistinct4String) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4CountDistinctString)(pr)
success, dataMemDelta := helper.deserializePartialResult4CountDistinctString(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type countPartialWithDistinct4String struct {
baseCountDistinct4String
}
Expand Down Expand Up @@ -473,6 +573,26 @@ func (e *baseCountDistinct4MultiArgs) AppendFinalResult2Chunk(_ AggFuncUpdateCon
return nil
}

func (e *baseCountDistinct4MultiArgs) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4CountWithDistinct)(partialResult)
resBuf := spillHelper.serializePartialResult4CountWithDistinct(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseCountDistinct4MultiArgs) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseCountDistinct4MultiArgs) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4CountWithDistinct)(pr)
success, dataMemDelta := helper.deserializePartialResult4CountWithDistinct(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type countPartialWithDistinct struct {
baseCountDistinct4MultiArgs
}
Expand Down Expand Up @@ -904,6 +1024,26 @@ func (e *baseApproxCountDistinct) AppendFinalResult2Chunk(_ AggFuncUpdateContext
return nil
}

func (e *baseApproxCountDistinct) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4ApproxCountDistinct)(partialResult)
resBuf := spillHelper.serializePartialResult4ApproxCountDistinct(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseApproxCountDistinct) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseApproxCountDistinct) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4ApproxCountDistinct)(pr)
success, dataMemDelta := helper.deserializePartialResult4ApproxCountDistinct(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

func (*baseApproxCountDistinct) AllocPartialResult() (pr PartialResult, memDelta int64) {
return (PartialResult)(NewPartialResult4ApproxCountDistinct()), DefPartialResult4ApproxCountDistinctSize
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/executor/aggfuncs/func_group_concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,26 @@ func (e *baseGroupConcatDistinct4String) AppendFinalResult2Chunk(sctx AggFuncUpd
return nil
}

func (e *baseGroupConcatDistinct4String) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4GroupConcatDistinct)(partialResult)
resBuf := spillHelper.serializePartialResult4GroupConcatDistinct(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseGroupConcatDistinct4String) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseGroupConcatDistinct4String) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4GroupConcatDistinct)(pr)
success, dataMemDelta := helper.deserializePartialResult4GroupConcatDistinct(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

// SetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type.
func (e *baseGroupConcatDistinct4String) SetTruncated(t *int32) {
e.truncated = t
Expand Down
40 changes: 40 additions & 0 deletions pkg/executor/aggfuncs/func_sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,26 @@ func (e *baseSumDistinct4Float64) AppendFinalResult2Chunk(_ AggFuncUpdateContext
return nil
}

func (e *baseSumDistinct4Float64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4SumDistinctFloat64)(partialResult)
resBuf := spillHelper.serializePartialResult4SumDistinctFloat64(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseSumDistinct4Float64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseSumDistinct4Float64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4SumDistinctFloat64)(pr)
success, dataMemDelta := helper.deserializePartialResult4SumDistinctFloat64(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type sum4PartialDistinctFloat64 struct {
baseSumDistinct4Float64
}
Expand Down Expand Up @@ -445,6 +465,26 @@ func (e *baseSumDistinct4Decimal) AppendFinalResult2Chunk(_ AggFuncUpdateContext
return nil
}

func (e *baseSumDistinct4Decimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4SumDistinctDecimal)(partialResult)
resBuf := spillHelper.serializePartialResult4SumDistinctDecimal(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *baseSumDistinct4Decimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *baseSumDistinct4Decimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4SumDistinctDecimal)(pr)
success, dataMemDelta := helper.deserializePartialResult4SumDistinctDecimal(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

type sum4PartialDistinct4Decimal struct {
baseSumDistinct4Decimal
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/executor/aggfuncs/func_varpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ func (e *varPopOriginal4DistinctFloat64) AppendFinalResult2Chunk(_ AggFuncUpdate
return nil
}

func (e *varPopOriginal4DistinctFloat64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4VarPopDistinctFloat64)(partialResult)
resBuf := spillHelper.serializePartialResult4VarPopDistinctFloat64(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *varPopOriginal4DistinctFloat64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *varPopOriginal4DistinctFloat64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4VarPopDistinctFloat64)(pr)
success, dataMemDelta := helper.deserializePartialResult4VarPopDistinctFloat64(result)
if !success {
return nil, 0
}
return pr, memDelta + dataMemDelta
}

func (e *varPopOriginal4DistinctFloat64) UpdatePartialResult(sctx AggFuncUpdateContext, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
p := (*partialResult4VarPopDistinctFloat64)(pr)
for _, row := range rowsInGroup {
Expand Down
Loading
Loading