package compute
import (
"sync"
"unsafe"
"github.com/daviszhen/plan/pkg/chunk"
"github.com/daviszhen/plan/pkg/common"
"github.com/daviszhen/plan/pkg/storage"
"github.com/daviszhen/plan/pkg/util"
)
type TupleDataBlock struct {
_handle *storage.BlockHandle
_cap uint64
_size uint64
}
func NewTupleDataBlock(bufferMgr *storage.BufferManager, cap uint64) *TupleDataBlock {
ret := &TupleDataBlock{
_cap: cap,
_size: 0,
}
bufferMgr.Allocate(cap, false, &ret._handle)
return ret
}
func (block *TupleDataBlock) RemainingCapacity() uint64 {
return block._cap - block._size
}
func (block *TupleDataBlock) RemainingRows(rowWidth uint64) uint64 {
return block.RemainingCapacity() / rowWidth
}
func (block *TupleDataBlock) Close() {
}
type TupleDataAllocator struct {
_bufferMgr *storage.BufferManager
_layout *TupleDataLayout
_rowBlocks []*TupleDataBlock
_heapBlocks []*TupleDataBlock
}
func NewTupleDataAllocator(bufferMgr *storage.BufferManager, layout *TupleDataLayout) *TupleDataAllocator {
ret := &TupleDataAllocator{
_bufferMgr: bufferMgr,
_layout: layout.copy(),
}
return ret
}
func (alloc *TupleDataAllocator) Build(
segment *TupleDataSegment,
pinState *TupleDataPinState,
chunkState *TupleDataChunkState,
appendOffset uint64,
appendCount uint64,
) {
if !util.Empty(segment._chunks) {
alloc.ReleaseOrStoreHandles(
pinState,
segment,
util.Back(segment._chunks),
true,
)
}
offset := uint64(0)
chunkPartIndices := make([]util.Pair[uint64, uint64], 0)
for offset != appendCount {
if util.Empty(segment._chunks) ||
util.Back(segment._chunks)._count == util.DefaultVectorSize {
segment._chunks = append(segment._chunks, NewTupleDataChunk())
}
chunk := util.Back(segment._chunks)
next := min(
appendCount-offset,
util.DefaultVectorSize-chunk._count,
)
part := alloc.BuildChunkPart(
pinState,
chunkState,
appendOffset+offset,
next,
)
chunk.AddPart(part, alloc._layout)
chunkPartIndices = append(chunkPartIndices,
util.Pair[uint64, uint64]{
First: uint64(len(segment._chunks) - 1),
Second: uint64(len(chunk._parts) - 1),
})
chunkPart := util.Back(chunk._parts)
next = uint64(chunkPart._count)
segment._count += next
offset += next
}
parts := make([]*TupleDataChunkPart, 0)
for _, pair := range chunkPartIndices {
parts = append(parts,
segment._chunks[pair.First]._parts[pair.Second],
)
}
alloc.InitChunkStateInternal(
pinState,
chunkState,
appendOffset,
false,
true,
false,
parts,
)
}
func (alloc *TupleDataAllocator) InitChunkStateInternal(
pinState *TupleDataPinState,
chunkState *TupleDataChunkState,
offset uint64,
recompute bool,
initHeapPointers bool,
initHeapSizes bool,
parts []*TupleDataChunkPart,
) {
rowLocSlice := chunk.GetSliceInPhyFormatFlat[unsafe.Pointer](chunkState._rowLocations)
heapSizesSlice := chunk.GetSliceInPhyFormatFlat[uint64](chunkState._heapSizes)
heapLocSlice := chunk.GetSliceInPhyFormatFlat[unsafe.Pointer](chunkState._heapLocations)
rowWidth := alloc._layout.rowWidth()
for _, part := range parts {
next := part._count
baseRowPtr := alloc.GetRowPointer(pinState, part)
for i := uint32(0); i < next; i++ {
rowLocSlice[offset+uint64(i)] =
util.PointerAdd(baseRowPtr, int(i)*rowWidth)
}
if alloc._layout.allConst() {
offset += uint64(next)
continue
}
if part._totalHeapSize == 0 {
if initHeapSizes {
panic("usp")
}
offset += uint64(next)
continue
}
if recompute {
panic("usp")
}
if initHeapSizes {
panic("usp")
}
if initHeapPointers {
heapLocSlice[offset] = util.PointerAdd(
part._baseHeapPtr,
int(part._heapBlockOffset),
)
for i := uint32(1); i < next; i++ {
idx := offset + uint64(i)
heapLocSlice[idx] = util.PointerAdd(
heapLocSlice[idx-1],
int(heapSizesSlice[idx-1]),
)
}
}
offset += uint64(next)
}
util.AssertFunc(offset <= util.DefaultVectorSize)
}
func (alloc *TupleDataAllocator) BuildChunkPart(
pinState *TupleDataPinState,
chunkState *TupleDataChunkState,
appendOffset uint64,
appendCount uint64,
) *TupleDataChunkPart {
result := new(TupleDataChunkPart)
if util.Empty(alloc._rowBlocks) ||
util.Back(alloc._rowBlocks).RemainingCapacity() <
uint64(alloc._layout.rowWidth()) {
alloc._rowBlocks = append(alloc._rowBlocks,
NewTupleDataBlock(alloc._bufferMgr, storage.BLOCK_SIZE))
}
result._rowBlockIdx = uint32(len(alloc._rowBlocks) - 1)
rowBlock := util.Back(alloc._rowBlocks)
result._rowBlockOffset = uint32(rowBlock._size)
result._count = uint32(min(
rowBlock.RemainingRows(uint64(alloc._layout.rowWidth())),
appendCount))
if !alloc._layout.allConst() {
heapSizesSlice := chunk.GetSliceInPhyFormatFlat[uint64](chunkState._heapSizes)
tHeapSize := uint64(0)
for i := uint32(0); i < result._count; i++ {
tHeapSize += heapSizesSlice[appendOffset+uint64(i)]
}
if tHeapSize == 0 {
result._heapBlockIdx = INVALID_INDEX
result._heapBlockOffset = INVALID_INDEX
result._totalHeapSize = 0
result._baseHeapPtr = nil
} else {
if util.Empty(alloc._heapBlocks) ||
util.Back(alloc._heapBlocks).RemainingCapacity() < heapSizesSlice[appendOffset] {
sz := max(storage.BLOCK_SIZE, heapSizesSlice[appendOffset])
alloc._heapBlocks = append(alloc._heapBlocks,
NewTupleDataBlock(alloc._bufferMgr, sz),
)
}
result._heapBlockIdx = uint32(len(alloc._heapBlocks) - 1)
heapBlock := util.Back(alloc._heapBlocks)
result._heapBlockOffset = uint32(heapBlock._size)
heapRest := heapBlock.RemainingCapacity()
if tHeapSize <= heapRest {
result._totalHeapSize = uint32(tHeapSize)
} else {
result._totalHeapSize = 0
for i := uint32(0); i < result._count; i++ {
sz := heapSizesSlice[appendOffset+uint64(i)]
if uint64(result._totalHeapSize)+sz > heapRest {
result._count = i
break
}
result._totalHeapSize += uint32(sz)
}
}
heapBlock._size += uint64(result._totalHeapSize)
result._baseHeapPtr = alloc.GetBaseHeapPointer(pinState, result)
}
}
util.AssertFunc(result._count != 0 && result._count <= util.DefaultVectorSize)
rowBlock._size += uint64(result._count * uint32(alloc._layout.rowWidth()))
return result
}
func (alloc *TupleDataAllocator) GetRowPointer(
pinState *TupleDataPinState,
part *TupleDataChunkPart) unsafe.Pointer {
ptr := alloc.PinRowBlock(pinState, part).Ptr()
return util.PointerAdd(ptr, int(part._rowBlockOffset))
}
func (alloc *TupleDataAllocator) PinRowBlock(
pinState *TupleDataPinState,
part *TupleDataChunkPart) *storage.BufferHandle {
idx := part._rowBlockIdx
if handle, ok := pinState._rowHandles[idx]; !ok {
util.AssertFunc(idx < uint32(len(alloc._rowBlocks)))
rowBlock := alloc._rowBlocks[idx]
util.AssertFunc(rowBlock._handle != nil)
util.AssertFunc(uint64(part._rowBlockOffset) < rowBlock._size)
util.AssertFunc(uint64(part._rowBlockOffset+part._count*uint32(alloc._layout.rowWidth())) <= rowBlock._size)
handle = alloc._bufferMgr.Pin(rowBlock._handle)
pinState._rowHandles[idx] = handle
return handle
} else {
return handle
}
}
func (alloc *TupleDataAllocator) GetBaseHeapPointer(
pinState *TupleDataPinState,
part *TupleDataChunkPart) unsafe.Pointer {
return alloc.PinHeapBlock(pinState, part).Ptr()
}
func (alloc *TupleDataAllocator) PinHeapBlock(
pinState *TupleDataPinState,
part *TupleDataChunkPart) *storage.BufferHandle {
idx := part._heapBlockIdx
if handle, ok := pinState._heapHandles[idx]; !ok {
util.AssertFunc(idx < uint32(len(alloc._heapBlocks)))
heapBlock := alloc._heapBlocks[idx]
util.AssertFunc(heapBlock._handle != nil)
util.AssertFunc(uint64(part._heapBlockOffset) < heapBlock._size)
util.AssertFunc(uint64(part._heapBlockOffset+part._totalHeapSize) <= heapBlock._size)
handle = alloc._bufferMgr.Pin(heapBlock._handle)
pinState._heapHandles[idx] = handle
return handle
} else {
return handle
}
}
func (alloc *TupleDataAllocator) ReleaseOrStoreHandles(
pinState *TupleDataPinState,
segment *TupleDataSegment,
chunk *TupleDataChunk,
releaseHeap bool,
) {
alloc.ReleaseOrStoreHandles2(
segment,
&segment.mu._pinnedRowHandles,
pinState._rowHandles,
chunk._rowBlockIds,
alloc._rowBlocks,
pinState._properties,
)
if !alloc._layout.allConst() && releaseHeap {
alloc.ReleaseOrStoreHandles2(
segment,
&segment.mu._pinnedHeapHandles,
pinState._heapHandles,
chunk._heapBlockIds,
alloc._heapBlocks,
pinState._properties,
)
}
}
func (alloc *TupleDataAllocator) ReleaseOrStoreHandles2(
segment *TupleDataSegment,
pinnedHandles *[]*storage.BufferHandle,
handles map[uint32]*storage.BufferHandle,
blockIds UnorderedSet,
blocks []*TupleDataBlock,
prop TupleDataPinProperties,
) {
for {
found := false
removed := make([]uint32, 0)
for id, handle := range handles {
if blockIds.find(uint64(id)) {
continue
}
switch prop {
case PIN_PRRP_KEEP_PINNED:
fun := func() {
segment.mu.Lock()
defer segment.mu.Unlock()
blockCount := id + 1
if blockCount > uint32(len(*pinnedHandles)) {
need := blockCount - uint32(len(*pinnedHandles))
*pinnedHandles = append(*pinnedHandles,
make([]*storage.BufferHandle, need)...)
}
(*pinnedHandles)[id] = handle
}
fun()
default:
panic("usp")
}
removed = append(removed, id)
found = true
break
}
for _, rem := range removed {
delete(handles, rem)
}
if !found {
break
}
}
}
func (alloc *TupleDataAllocator) InitChunkState(
seg *TupleDataSegment,
pinState *TupleDataPinState,
chunkState *TupleDataChunkState,
chunkIdx int,
initHeap bool,
) {
util.AssertFunc(chunkIdx < util.Size(seg._chunks))
chunk := seg._chunks[chunkIdx]
alloc.ReleaseOrStoreHandles(
pinState,
seg,
chunk,
len(chunk._heapBlockIds) != 0,
)
alloc.InitChunkStateInternal(
pinState,
chunkState,
0,
false,
initHeap,
initHeap,
chunk._parts,
)
}
type TupleDataSegment struct {
_allocator *TupleDataAllocator
_chunks []*TupleDataChunk
_count uint64
mu struct {
sync.Mutex
_pinnedRowHandles []*storage.BufferHandle
_pinnedHeapHandles []*storage.BufferHandle
}
}
func (segment *TupleDataSegment) Unpin() {
segment.mu.Lock()
defer segment.mu.Unlock()
for _, handle := range segment.mu._pinnedRowHandles {
handle.Close()
}
for _, handle := range segment.mu._pinnedHeapHandles {
handle.Close()
}
segment.mu._pinnedRowHandles = nil
segment.mu._pinnedHeapHandles = nil
}
func (segment *TupleDataSegment) ChunkCount() int {
return util.Size(segment._chunks)
}
func NewTupleDataSegment(alloc *TupleDataAllocator) *TupleDataSegment {
ret := &TupleDataSegment{
_allocator: alloc,
}
return ret
}
type TupleDataChunk struct {
_parts []*TupleDataChunkPart
_rowBlockIds UnorderedSet
_heapBlockIds UnorderedSet
_count uint64
}
func NewTupleDataChunk() *TupleDataChunk {
ret := &TupleDataChunk{
_rowBlockIds: make(UnorderedSet),
_heapBlockIds: make(UnorderedSet),
}
return ret
}
func (chunk *TupleDataChunk) AddPart(
part *TupleDataChunkPart,
layout *TupleDataLayout) {
chunk._count += uint64(part._count)
chunk._rowBlockIds.insert(uint64(part._rowBlockIdx))
if !layout.allConst() && part._totalHeapSize > 0 {
chunk._heapBlockIds.insert(uint64(part._heapBlockIdx))
}
chunk._parts = append(chunk._parts, part)
}
type TupleDataChunkPart struct {
sync.Mutex
_rowBlockIdx uint32
_rowBlockOffset uint32
_heapBlockIdx uint32
_heapBlockOffset uint32
_baseHeapPtr unsafe.Pointer
_totalHeapSize uint32
_count uint32
}
type TupleDataChunkState struct {
_data []chunk.UnifiedFormat
_rowLocations *chunk.Vector
_heapLocations *chunk.Vector
_heapSizes *chunk.Vector
_columnIds []int
_childrenOutputIds []int
}
func NewTupleDataChunkState(cnt, childrenOutputCnt int) *TupleDataChunkState {
ret := &TupleDataChunkState{
_data: make([]chunk.UnifiedFormat, cnt+childrenOutputCnt),
_rowLocations: chunk.NewVector2(common.PointerType(), util.DefaultVectorSize),
_heapLocations: chunk.NewVector2(common.PointerType(), util.DefaultVectorSize),
_heapSizes: chunk.NewVector2(common.UbigintType(), util.DefaultVectorSize),
}
for i := 0; i < cnt; i++ {
ret._columnIds = append(ret._columnIds, i)
}
for i := 0; i < childrenOutputCnt; i++ {
ret._childrenOutputIds = append(ret._childrenOutputIds, i+cnt)
}
return ret
}
type TupleDataPinProperties int
const (
PIN_PRRP_INVALID TupleDataPinProperties = 0
PIN_PRRP_KEEP_PINNED TupleDataPinProperties = 1
)
type TupleDataPinState struct {
_rowHandles map[uint32]*storage.BufferHandle
_heapHandles map[uint32]*storage.BufferHandle
_properties TupleDataPinProperties
}
func NewTupleDataPinState() *TupleDataPinState {
ret := &TupleDataPinState{
_rowHandles: make(map[uint32]*storage.BufferHandle),
_heapHandles: make(map[uint32]*storage.BufferHandle),
_properties: PIN_PRRP_INVALID,
}
return ret
}
type TupleDataChunkIterator struct {
_collection *TupleDataCollection
_initHeap bool
_startSegIdx int
_startChunkIdx int
_endSegIdx int
_endChunkIdx int
_state TupleDataScanState
_curSegIdx int
_curChunkIdx int
}
func NewTupleDataChunkIterator(
collection *TupleDataCollection,
prop TupleDataPinProperties,
from, to int,
initHeap bool,
) *TupleDataChunkIterator {
ret := &TupleDataChunkIterator{
_collection: collection,
_initHeap: initHeap,
}
ret._state._pinState = *NewTupleDataPinState()
ret._state._pinState._properties = prop
layout := collection._layout
ret._state._chunkState = *NewTupleDataChunkState(layout.columnCount(), layout.childrenOutputCount())
util.AssertFunc(from < to)
util.AssertFunc(from <= collection.ChunkCount())
idx := 0
for segIdx := 0; segIdx < util.Size(collection._segments); segIdx++ {
seg := collection._segments[segIdx]
if from >= idx && from <= idx+seg.ChunkCount() {
ret._startSegIdx = segIdx
ret._startChunkIdx = from - idx
}
if to >= idx && to <= idx+seg.ChunkCount() {
ret._endSegIdx = segIdx
ret._endChunkIdx = to - idx
}
idx += seg.ChunkCount()
}
ret.Reset()
return ret
}
func NewTupleDataChunkIterator2(
collection *TupleDataCollection,
prop TupleDataPinProperties,
initHeap bool,
) *TupleDataChunkIterator {
return NewTupleDataChunkIterator(
collection,
prop,
0,
collection.ChunkCount(),
initHeap,
)
}
func (iter *TupleDataChunkIterator) Reset() {
iter._state._segmentIdx = iter._startSegIdx
iter._state._chunkIdx = iter._startChunkIdx
iter._collection.NextScanIndex(
&iter._state,
&iter._curSegIdx,
&iter._curChunkIdx)
iter.InitCurrentChunk()
}
func (iter *TupleDataChunkIterator) InitCurrentChunk() {
seg := iter._collection._segments[iter._curSegIdx]
seg._allocator.InitChunkState(
seg,
&iter._state._pinState,
&iter._state._chunkState,
iter._curChunkIdx,
iter._initHeap,
)
}
func (iter *TupleDataChunkIterator) Done() bool {
return iter._curSegIdx == iter._endSegIdx &&
iter._curChunkIdx == iter._endChunkIdx
}
func (iter *TupleDataChunkIterator) Next() bool {
util.AssertFunc(!iter.Done())
oldSegIdx := iter._curSegIdx
if !iter._collection.NextScanIndex(
&iter._state,
&iter._curSegIdx,
&iter._curChunkIdx) || iter.Done() {
iter._collection.FinalizePinState2(
&iter._state._pinState,
iter._collection._segments[oldSegIdx],
)
iter._curSegIdx = iter._endSegIdx
iter._curChunkIdx = iter._endChunkIdx
return false
}
if iter._curSegIdx != oldSegIdx {
iter._collection.FinalizePinState2(
&iter._state._pinState,
iter._collection._segments[oldSegIdx],
)
}
iter.InitCurrentChunk()
return true
}
func (iter *TupleDataChunkIterator) GetCurrentChunkCount() int {
return int(iter._collection._segments[iter._curSegIdx]._chunks[iter._curChunkIdx]._count)
}
func (iter *TupleDataChunkIterator) GetChunkState() *TupleDataChunkState {
return &iter._state._chunkState
}
func (iter *TupleDataChunkIterator) GetRowLocations() []unsafe.Pointer {
return chunk.GetSliceInPhyFormatFlat[unsafe.Pointer](iter._state._chunkState._rowLocations)
}