package compute
import (
"fmt"
"math"
"github.com/daviszhen/plan/pkg/chunk"
"github.com/daviszhen/plan/pkg/common"
"github.com/daviszhen/plan/pkg/util"
)
// aggrInit prepares the hash aggregation operator before execution.
//
// When the operator has no GROUP BY expressions, a single constant integer
// key (1) is appended to run.op.GroupBys and run.state.constGroupby is set.
// It then builds the HashAggr, the expression executors used while sinking
// and scanning, and derives two flags:
//   - referChildren: the output expressions reference a column whose binding
//     table index is negative as int64 (math.MaxUint64), i.e. the same kind of
//     binding built below for the child output columns.
//   - ungroupAggr: a constant group key was synthesized and the outputs do not
//     reference child columns.
//
// It always returns nil.
func (run *Runner) aggrInit() error {
	if len(run.op.GroupBys) == 0 {
		run.op.GroupBys = append(run.op.GroupBys, &Expr{
			Typ:        ET_Const,
			DataTyp:    common.IntegerType(),
			ConstValue: NewIntegerConst(1),
		})
		run.state.constGroupby = true
	}

	// One column reference per child output column, bound to table
	// math.MaxUint64 and positioned by the column's ordinal.
	refChildrenOutput := make([]*Expr, 0)
	for idx, childOut := range run.op.Children[0].Outputs {
		refChildrenOutput = append(refChildrenOutput, &Expr{
			Typ:     ET_Column,
			DataTyp: childOut.DataTyp,
			BaseInfo: BaseInfo{
				ColRef: ColumnBind{math.MaxUint64, uint64(idx)},
			},
		})
	}

	run.state.hAggr = NewHashAggr(
		run.state.outputTypes,
		run.op.Aggs,
		run.op.GroupBys,
		nil,
		nil,
		refChildrenOutput,
	)
	if run.op.Children[0].Typ == POT_Filter {
		run.state.hAggr._printHash = true
	}

	// The sink-side executor evaluates, in order: group keys, aggregate
	// parameters, then the child-output references.
	groupExprs := make([]*Expr, 0)
	for _, exprs := range [][]*Expr{
		run.state.hAggr._groupedAggrData._groups,
		run.state.hAggr._groupedAggrData._paramExprs,
		run.state.hAggr._groupedAggrData._refChildrenOutput,
	} {
		groupExprs = append(groupExprs, exprs...)
	}
	run.state.groupbyWithParamsExec = NewExprExec(groupExprs...)
	run.state.groupbyExec = NewExprExec(run.state.hAggr._groupedAggrData._groups...)
	run.state.filterExec = NewExprExec(run.op.Filters...)
	run.state.filterSel = chunk.NewSelectVector(util.DefaultVectorSize)
	run.state.outputExec = NewExprExec(run.op.Outputs...)

	outputRefs := make(ColumnBindSet)
	collectColRefs2(outputRefs, run.op.Outputs...)
	for ref := range outputRefs {
		if int64(ref.table()) >= 0 {
			continue
		}
		run.state.referChildren = true
		break
	}
	run.state.ungroupAggr = run.state.constGroupby && !run.state.referChildren
	return nil
}
// aggrExec drives the hash aggregation operator in two phases, tracked by
// run.state.hAggr._has.
//
//   - HAS_INIT: every chunk is pulled from the first child. The group-by,
//     aggregate-parameter and child-output expressions are evaluated into one
//     chunk, which is sunk into the hash aggregate. The aggregate is then
//     finalized and the phase switches to HAS_SCAN.
//   - HAS_SCAN: rows of (group columns, aggregate results) plus the matching
//     child columns are read back with GetData. The operator's filters
//     (run.state.filterExec, built from run.op.Filters) are applied to them,
//     and the surviving rows are projected into output via run.state.outputExec.
//
// It returns haveMoreOutput when output holds at least one row and Done once
// the hash aggregate is exhausted. It returns InvalidOpResult when a child or
// GetData reports InvalidOpResult (with a nil error) or when expression
// evaluation fails (with that error).
func (run *Runner) aggrExec(output *chunk.Chunk, state *OperatorState) (OperatorResult, error) {
	var err error
	var res OperatorResult
	if run.state.hAggr._has == HAS_INIT {
		// Layout of every sunk chunk: group keys, aggregate parameters, child
		// output columns. It is the same for every child chunk.
		sinkTypes := make([]common.LType, 0)
		sinkTypes = append(sinkTypes, run.state.hAggr._groupedAggrData._groupTypes...)
		sinkTypes = append(sinkTypes, run.state.hAggr._groupedAggrData._payloadTypes...)
		sinkTypes = append(sinkTypes, run.state.hAggr._groupedAggrData._childrenOutputTypes...)
		cnt := 0
		for {
			childChunk := &chunk.Chunk{}
			res, err = run.execChild(run.children[0], childChunk, state)
			if err != nil {
				return InvalidOpResult, err
			}
			if res == InvalidOpResult {
				return InvalidOpResult, nil
			}
			if res == Done {
				break
			}
			if childChunk.Card() == 0 {
				continue
			}
			cnt += childChunk.Card()
			groupChunk := &chunk.Chunk{}
			groupChunk.Init(sinkTypes, util.DefaultVectorSize)
			err = run.state.groupbyWithParamsExec.executeExprs([]*chunk.Chunk{childChunk, nil, nil}, groupChunk)
			if err != nil {
				return InvalidOpResult, err
			}
			run.state.hAggr.Sink(groupChunk)
		}
		run.state.hAggr.Finalize()
		run.state.hAggr._has = HAS_SCAN
		fmt.Println("get build child cnt", cnt)
		fmt.Println("tuple collection size", run.state.hAggr._groupings[0]._tableData._finalizedHT._dataCollection._count)
	}
	if run.state.hAggr._has == HAS_SCAN {
		if run.state.haScanState == nil {
			run.state.haScanState = NewHashAggrScanState()
			err = run.initChildren()
			if err != nil {
				return InvalidOpResult, err
			}
		}
		util.AssertFunc(len(run.state.hAggr._groupedAggrData._groupingFuncs) == 0)

		// Loop-invariant layouts for the scan phase.
		groupAndAggrTypes := make([]common.LType, 0)
		groupAndAggrTypes = append(groupAndAggrTypes, run.state.hAggr._groupedAggrData._groupTypes...)
		groupAndAggrTypes = append(groupAndAggrTypes, run.state.hAggr._groupedAggrData._aggrReturnTypes...)
		aggrTypes := make([]common.LType, 0)
		aggrTypes = append(aggrTypes, run.state.hAggr._groupedAggrData._aggrReturnTypes...)
		groupCount := run.state.hAggr._groupedAggrData.GroupCount()
		aggrCount := len(run.state.hAggr._groupedAggrData._aggregates)

		for {
			// An ungrouped aggregate yields exactly one scan pass.
			if run.state.ungroupAggr {
				if run.state.ungroupAggrDone {
					return Done, nil
				}
				run.state.ungroupAggrDone = true
			}
			groupAndAggrChunk := &chunk.Chunk{}
			groupAndAggrChunk.Init(groupAndAggrTypes, util.DefaultVectorSize)
			childChunk := &chunk.Chunk{}
			childChunk.Init(run.state.hAggr._groupedAggrData._childrenOutputTypes, util.DefaultVectorSize)
			res = run.state.hAggr.GetData(run.state.haScanState, groupAndAggrChunk, childChunk)
			if res == InvalidOpResult {
				return InvalidOpResult, nil
			}
			if res == Done {
				break
			}
			childCard := childChunk.Card()

			// The filters see child columns in slot 0 and only the aggregate
			// results (group columns stripped) in slot 2.
			filterInputChunk := &chunk.Chunk{}
			filterInputChunk.Init(aggrTypes, util.DefaultVectorSize)
			for i := 0; i < aggrCount; i++ {
				filterInputChunk.Data[i].Reference(groupAndAggrChunk.Data[groupCount+i])
			}
			filterInputChunk.SetCard(groupAndAggrChunk.Card())

			// Use run.state, which is where aggrInit installed the filter
			// executor and selection vector.
			var count int
			count, err = run.state.filterExec.executeSelect([]*chunk.Chunk{childChunk, nil, filterInputChunk}, run.state.filterSel)
			if err != nil {
				return InvalidOpResult, err
			}
			if count == 0 {
				run.state.haScanState._filteredCnt1 += childCard
				continue
			}

			var childChunk2, aggrStatesChunk2 *chunk.Chunk
			filtered := 0
			if count == childCard {
				// Every row survived: project straight from the scanned chunks.
				childChunk2 = childChunk
				aggrStatesChunk2 = groupAndAggrChunk
			} else {
				filtered = childCard - count
				run.state.haScanState._filteredCnt2 += filtered
				childChunkIndice := make([]int, 0, childChunk.ColumnCount())
				for i := 0; i < childChunk.ColumnCount(); i++ {
					childChunkIndice = append(childChunkIndice, i)
				}
				aggrStatesChunkIndice := make([]int, 0, groupAndAggrChunk.ColumnCount())
				for i := 0; i < groupAndAggrChunk.ColumnCount(); i++ {
					aggrStatesChunkIndice = append(aggrStatesChunkIndice, i)
				}
				childChunk2 = &chunk.Chunk{}
				childChunk2.Init(run.children[0].state.outputTypes, util.DefaultVectorSize)
				aggrStatesChunk2 = &chunk.Chunk{}
				aggrStatesChunk2.Init(groupAndAggrTypes, util.DefaultVectorSize)
				childChunk2.SliceIndice(childChunk, run.state.filterSel, count, 0, childChunkIndice)
				aggrStatesChunk2.SliceIndice(groupAndAggrChunk, run.state.filterSel, count, 0, aggrStatesChunkIndice)
				util.AssertFunc(count == childChunk2.Card())
				util.AssertFunc(count == aggrStatesChunk2.Card())
			}

			// Ungrouped aggregates hand only the aggregate columns to the output
			// expressions; grouped ones pass group columns as well.
			aggrStatesChunk3 := aggrStatesChunk2
			if run.state.ungroupAggr {
				aggrStatesChunk3 = &chunk.Chunk{}
				aggrStatesChunk3.Init(aggrTypes, util.DefaultVectorSize)
				for i := 0; i < aggrCount; i++ {
					aggrStatesChunk3.Data[i].Reference(aggrStatesChunk2.Data[groupCount+i])
				}
				aggrStatesChunk3.SetCard(aggrStatesChunk2.Card())
			}
			err = run.state.outputExec.executeExprs([]*chunk.Chunk{childChunk2, nil, aggrStatesChunk3}, output)
			if err != nil {
				return InvalidOpResult, err
			}
			util.AssertFunc(output.Card() == childChunk2.Card())
			util.AssertFunc(output.Card()+filtered == childChunk.Card())
			util.AssertFunc(childCard == childChunk.Card())
			run.state.haScanState._outputCnt += output.Card()
			run.state.haScanState._childCnt2 += childChunk.Card()
			run.state.haScanState._childCnt3 += childCard
			if output.Card() > 0 {
				return haveMoreOutput, nil
			}
		}
	}
	if run.state.haScanState != nil {
		fmt.Println("scan cnt",
			"childCnt",
			run.state.haScanState._childCnt,
			"childCnt2",
			run.state.haScanState._childCnt2,
			"childCnt3",
			run.state.haScanState._childCnt3,
			"outputCnt",
			run.state.haScanState._outputCnt,
			"filteredCnt1",
			run.state.haScanState._filteredCnt1,
			"filteredCnt2",
			run.state.haScanState._filteredCnt2,
		)
	}
	return Done, nil
}
// aggrClose tears down the hash aggregation operator by dropping the
// reference to its HashAggr (presumably so the aggregate's memory can be
// reclaimed by the GC). It always returns nil.
func (run *Runner) aggrClose() error {
	run.state.hAggr = nil
	return nil
}