package compute
import (
"fmt"
"github.com/daviszhen/plan/pkg/chunk"
"github.com/daviszhen/plan/pkg/common"
"github.com/daviszhen/plan/pkg/util"
)
// filterInit prepares the runner's filter state.
//
// It builds an expression executor from run.op.Filters via initFilterExec
// and stores it in run.state.filterExec, and stores a selection vector
// created with util.DefaultVectorSize in run.state.filterSel.
// If the executor cannot be built, the error is returned and the state is
// left untouched.
func (run *Runner) filterInit() error {
	exec, err := initFilterExec(run.op.Filters)
	if err != nil {
		return err
	}

	run.state.filterSel = chunk.NewSelectVector(util.DefaultVectorSize)
	run.state.filterExec = exec
	return nil
}
// initFilterExec folds filters, left to right, into a single FuncAnd
// expression and wraps the result with NewExprExec.
//
// With no filters, NewExprExec receives a nil *Expr. With exactly one
// filter, that filter is used as-is and no type check is performed. With
// two or more, each combination step requires both the accumulated
// expression and the next filter to have type id common.LTID_BOOLEAN;
// otherwise the error "need boolean expr" is returned.
func initFilterExec(filters []*Expr) (*ExprExec, error) {
	// Kept as a typed variable so the no-filter case passes a nil *Expr,
	// exactly like the populated case passes a non-nil one.
	var combined *Expr
	if len(filters) == 0 {
		return NewExprExec(combined), nil
	}

	combined = filters[0]
	for _, next := range filters[1:] {
		// The accumulated side is inspected first; the next filter is only
		// inspected when the accumulated side is already boolean.
		bothBool := combined.DataTyp.Id == common.LTID_BOOLEAN &&
			next.DataTyp.Id == common.LTID_BOOLEAN
		if !bothBool {
			return nil, fmt.Errorf("need boolean expr")
		}

		binder := FunctionBinder{}
		operands := []*Expr{combined, next}
		combined = binder.BindScalarFunc(FuncAnd, operands, IsOperator(FuncAnd))
	}
	return NewExprExec(combined), nil
}
// runFilterExec evaluates the runner's filter over input and fills output
// with the result.
//
// The filter executor is given a three-element chunk slice in which only
// one slot is non-nil: index 2 when filterOnLocal is true, index 0
// otherwise.
// NOTE(review): the meaning of each slot is defined by executeSelect, which
// is not visible here — confirm against its implementation.
//
// When the selected count equals input.Card(), output references input
// through run.state.outputIndice; otherwise output is sliced from input
// using run.state.filterSel and the selected count.
func (run *Runner) runFilterExec(input *chunk.Chunk, output *chunk.Chunk, filterOnLocal bool) error {
	slot := 0
	if filterOnLocal {
		slot = 2
	}
	chunks := make([]*chunk.Chunk, 3)
	chunks[slot] = input

	selected, err := run.state.filterExec.executeSelect(chunks, run.state.filterSel)
	if err != nil {
		return err
	}

	if selected != input.Card() {
		output.SliceIndice(input, run.state.filterSel, selected, 0, run.state.outputIndice)
		return nil
	}
	output.ReferenceIndice(input, run.state.outputIndice)
	return nil
}
// filterExec produces one filtered chunk into output.
//
// When the runner has children, it repeatedly executes the first child
// until the fetched chunk has a positive cardinality. An error from the
// child is returned with a zero OperatorResult; an InvalidOpResult or Done
// status from the child is returned to the caller unchanged. When the
// runner has no children, the filter is applied to a freshly constructed
// chunk.Chunk.
//
// After a successful filter pass it returns haveMoreOutput.
func (run *Runner) filterExec(output *chunk.Chunk, state *OperatorState) (OperatorResult, error) {
	fetched := &chunk.Chunk{}

	// Evaluated once up front; the loop itself only ends via return or break.
	pullFromChild := len(run.children) != 0
	for pullFromChild {
		status, err := run.execChild(run.children[0], fetched, state)
		if err != nil {
			return 0, err
		}
		if status == InvalidOpResult || status == Done {
			return status, nil
		}
		if fetched.Card() > 0 {
			break
		}
	}

	if err := run.runFilterExec(fetched, output, false); err != nil {
		return 0, err
	}
	return haveMoreOutput, nil
}
// filterClose is a no-op for the filter operator; it always returns nil.
func (run *Runner) filterClose() error {
return nil
}