package compute
import (
"encoding/csv"
"fmt"
"os"
"time"
"github.com/daviszhen/plan/pkg/chunk"
"github.com/daviszhen/plan/pkg/common"
"github.com/daviszhen/plan/pkg/storage"
"github.com/daviszhen/plan/pkg/util"
pqReader "github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source"
)
// OprBaseState groups the output-related fields that OperatorState embeds:
// a list of output types, a list of output indices, and an expression
// executor.
// NOTE(review): how these fields are filled in and consumed is not visible
// in this part of the file.
type OprBaseState struct {
// outputTypes holds logical types — presumably one per output column; confirm against callers.
outputTypes []common.LType
// outputIndice holds integer indices — presumably positions of the output columns; confirm against callers.
outputIndice []int
// outputExec is the expression executor associated with the output.
outputExec *ExprExec
}
// OprProjectState groups the fields used for a projection: a list of
// logical types and an expression executor.
type OprProjectState struct {
// projTypes holds logical types — presumably the result types of the projected expressions; verify.
projTypes []common.LType
// projExec is the expression executor associated with the projection.
projExec *ExprExec
}
// OprScanState groups the fields used while scanning a data source. It has
// handles for a parquet file and reader, an OS file with a CSV reader, and
// storage-layer scan state, so several source kinds appear to share this
// struct.
// NOTE(review): which fields are populated for which source kind is decided
// elsewhere — confirm against the scan operator.
type OprScanState struct {
// pqFile is the parquet file handle.
pqFile source.ParquetFile
// pqReader is the parquet reader (the field shares its name with the imported package alias).
pqReader *pqReader.ParquetReader
// dataFile is an open OS file — presumably the backing file for reader; verify.
dataFile *os.File
// reader is a CSV reader.
reader *csv.Reader
// tablePath is a path string — presumably the location of the table data; verify.
tablePath string
// colIndice holds integer indices — presumably the columns to read; verify.
colIndice []int
// readedColTyps holds logical types — presumably of the columns that are read; verify.
readedColTyps []common.LType
// tabEnt is a catalog entry from the storage package.
tabEnt *storage.CatalogEntry
// tableScanState is the storage-layer table scan state.
tableScanState *storage.TableScanState
// colScanState is a column-data scan state.
colScanState *ColumnDataScanState
// maxRows is an integer limit — presumably a cap on rows returned; verify its exact meaning.
maxRows int
// showRaw is a boolean flag; its effect is not visible here.
showRaw bool
}
// OprSortState groups the fields used for sorting: a LocalSort instance, an
// expression executor for the order keys, and two lists of logical types.
type OprSortState struct {
// localSort is the LocalSort instance used by the operator.
localSort *LocalSort
// orderKeyExec is the expression executor associated with the order keys.
orderKeyExec *ExprExec
// keyTypes holds logical types — presumably of the sort keys; verify.
keyTypes []common.LType
// payloadTypes holds logical types — presumably of the non-key payload columns; verify.
payloadTypes []common.LType
}
// OprAggrState groups the fields used for aggregation: a HashAggr instance
// with its scan state, several boolean mode flags, and two expression
// executors for group-by evaluation.
// NOTE(review): the exact meaning of each flag is defined by the aggregate
// operator, which is not visible here.
type OprAggrState struct {
// hAggr is the HashAggr instance.
hAggr *HashAggr
// haScanState is the scan state over the hash aggregate.
haScanState *HashAggrScanState
// referChildren is a boolean flag — presumably whether expressions refer to child output; verify.
referChildren bool
// constGroupby is a boolean flag — presumably set when the group-by keys are constant; verify.
constGroupby bool
// ungroupAggr is a boolean flag — presumably set for aggregation without group-by; verify.
ungroupAggr bool
// ungroupAggrDone is a boolean flag — presumably records that the ungrouped result was already produced; verify.
ungroupAggrDone bool
// groupbyWithParamsExec is an expression executor — presumably for group-by keys plus aggregate parameters; verify.
groupbyWithParamsExec *ExprExec
// groupbyExec is an expression executor associated with the group-by.
groupbyExec *ExprExec
}
// OprJoinState groups the fields used for joins: a CrossProduct and a
// HashJoin.
// NOTE(review): presumably only one of the two is set for a given join —
// confirm against the join operator.
type OprJoinState struct {
// cross is the CrossProduct instance.
cross *CrossProduct
// hjoin is the HashJoin instance.
hjoin *HashJoin
}
// OprLimitState holds the single field used for limiting: a Limit instance.
type OprLimitState struct {
// limit is the Limit instance.
limit *Limit
}
// OprFilterState groups the fields used for filtering: an expression
// executor and a selection vector.
type OprFilterState struct {
// filterExec is the expression executor associated with the filter.
filterExec *ExprExec
// filterSel is a selection vector — presumably marks the rows that pass the filter; verify.
filterSel *chunk.SelectVector
}
// OprInsertState holds the single field used for inserts: a chunk.
type OprInsertState struct {
// insertChunk is a chunk — presumably the buffer of rows to insert; verify.
insertChunk *chunk.Chunk
}
// OprStubState groups the fields used by a stub operator: a deserializer
// and two row counters.
type OprStubState struct {
// deserial is the deserializer — presumably the source the stub reads rows from; verify.
deserial util.Deserialize
// maxRowCnt is an integer — presumably an upper bound on rows to read; verify.
maxRowCnt int
// rowReadCnt is an integer — presumably the number of rows read so far; verify.
rowReadCnt int
}
// OperatorState is the state value passed to OperatorExec.Execute. It embeds
// every per-operator state struct, so all of their fields are promoted and
// reachable directly on an OperatorState.
// NOTE(review): presumably each operator kind touches only its own embedded
// part — confirm against the operator implementations.
type OperatorState struct {
OprBaseState
OprScanState
OprProjectState
OprJoinState
OprAggrState
OprFilterState
OprSortState
OprLimitState
OprInsertState
OprStubState
}
// OperatorResult is the status code returned by OperatorExec.Execute.
type OperatorResult int

// Possible OperatorResult values, numbered consecutively from zero.
// Note that haveMoreOutput is unexported, unlike its siblings.
const (
	// InvalidOpResult is the zero value of OperatorResult.
	InvalidOpResult OperatorResult = iota // 0
	// NeedMoreInput presumably asks the caller to supply more input; verify.
	NeedMoreInput // 1
	// haveMoreOutput presumably signals that more output is pending; verify.
	haveMoreOutput // 2
	// Done presumably signals that the operator has finished; verify.
	Done // 3
)
// SourceResult is a status code for source-side operations.
type SourceResult int

// Possible SourceResult values.
const (
	// SrcResHaveMoreOutput is the zero value of SourceResult.
	SrcResHaveMoreOutput SourceResult = 0
	// SrcResDone follows SrcResHaveMoreOutput.
	SrcResDone SourceResult = 1
)
// SinkResult is a status code for sink-side operations.
type SinkResult int

// Possible SinkResult values.
const (
	// SinkResNeedMoreInput is the zero value of SinkResult.
	SinkResNeedMoreInput SinkResult = 0
	// SinkResDone follows SinkResNeedMoreInput.
	SinkResDone SinkResult = 1
)
// ExecStats holds two timing figures: a total duration and the part of it
// attributed to children.
type ExecStats struct {
	// _totalTime is the total duration; String treats it as including
	// _totalChildTime.
	_totalTime time.Duration
	// _totalChildTime is the duration attributed to children.
	_totalChildTime time.Duration
}

// String renders the statistics for display.
//
// When _totalTime is zero it returns the fixed text "total time is 0",
// which also avoids dividing by zero below. Otherwise it reports the total,
// the own time (total minus child), the own time as a fraction of the total
// with two decimals, and the child time.
func (stats ExecStats) String() string {
	if stats._totalTime == 0 {
		// Constant text: no formatting verbs, so no Sprintf is needed.
		return "total time is 0"
	}
	// Time spent outside of children.
	self := stats._totalTime - stats._totalChildTime
	return fmt.Sprintf("time : total %v, this %v (%.2f) , child %v",
		stats._totalTime,
		self,
		float64(self)/float64(stats._totalTime),
		stats._totalChildTime,
	)
}
// OperatorExec is the interface an executable operator exposes: an Init
// step, an Execute step that works on chunks, and a Close step.
type OperatorExec interface {
	// Init returns an error on failure.
	// NOTE(review): presumably called once before Execute — confirm.
	Init() error
	// Execute takes an input chunk, an output chunk and the operator
	// state, and returns an OperatorResult status together with an error.
	Execute(input, output *chunk.Chunk, state *OperatorState) (OperatorResult, error)
	// Close returns an error on failure.
	// NOTE(review): presumably releases what Init acquired — confirm.
	Close() error
}

// Compile-time check that *Runner satisfies OperatorExec.
var _ OperatorExec = (*Runner)(nil)