Commit 25aba8f (parent: 6beae3b)

WIP - REBASE AWAY
1 file changed: go/store/nbs/table_reader.go (85 additions, 21 deletions)
@@ -505,9 +505,9 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc(
     offsetRecords offsetRecSlice,
     stats *Stats,
     readAtOffsets func(
-    ctx context.Context,
-    rb readBatch,
-    stats *Stats) error,
+        ctx context.Context,
+        rb readBatch,
+        stats *Stats) error,
 ) error {
     batches := toReadBatches(offsetRecords, tr.blockSize)
     for i := range batches {
@@ -765,36 +765,100 @@ func (tr tableReader) clone() (tableReader, error) {
 
 func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk), stats *Stats) error {
     count := tr.idx.chunkCount()
-    for i := uint32(0); i < count; i++ {
-        if ctx.Err() != nil {
-            return ctx.Err()
-        }
+    if count == 0 {
+        return nil
+    }
 
-        var h hash.Hash
-        ie, err := tr.idx.indexEntry(i, &h)
+    // Build cumulative offsets array for efficient chunk location
+    cumulativeOffsets := make([]uint64, count+1)
+    cumulativeOffsets[0] = 0
+    for i := uint32(0); i < count; i++ {
+        ie, err := tr.idx.indexEntry(i, nil)
         if err != nil {
             return err
         }
+        cumulativeOffsets[i+1] = cumulativeOffsets[i] + uint64(ie.Length())
+    }
 
-        res := make([]byte, ie.Length())
-        n, err := tr.r.ReadAtWithStats(ctx, res, int64(ie.Offset()), stats)
-        if err != nil {
-            return err
-        }
-        if uint32(n) != ie.Length() {
-            return errors.New("failed to read all data")
+    // Calculate total data size to determine where chunk records end
+    totalDataSize := cumulativeOffsets[count]
+
+    // Read data in 1MB chunks
+    const bufferSize = 1024 * 1024 // 1MB
+    currentOffset := uint64(0)
+    chunkCounter := uint32(0)
+
+    // Reuse buffer across reads
+    dataBlock := make([]byte, bufferSize)
+
+    for currentOffset < totalDataSize {
+        // Calculate how much data to read (up to 1MB or remaining data)
+        remainingData := totalDataSize - currentOffset
+        readSize := bufferSize
+        if remainingData < bufferSize {
+            readSize = int(remainingData)
+            dataBlock = dataBlock[:readSize] // Resize to remaining data
         }
 
-        cchk, err := NewCompressedChunk(h, res)
+        _, err := tr.r.ReadAtWithStats(ctx, dataBlock, int64(currentOffset), stats)
         if err != nil {
             return err
         }
-        chk, err := cchk.ToChunk()
-        if err != nil {
-            return err
+
+        // Process all chunks that are fully contained within this data block
+        blockStart := currentOffset
+        blockEnd := currentOffset + uint64(readSize)
+
+        for chunkCounter < count {
+            if ctx.Err() != nil {
+                return ctx.Err()
+            }
+
+            chunkStart := cumulativeOffsets[chunkCounter]
+            chunkEnd := cumulativeOffsets[chunkCounter+1]
+
+            // Check if this chunk extends beyond the current block
+            if chunkEnd > blockEnd {
+                // This chunk extends beyond current block, read next block
+                currentOffset = chunkStart
+                break
+            }
+
+            // Extract chunk data from buffer
+            bufferOffset := chunkStart - blockStart
+            chunkData := dataBlock[bufferOffset : bufferOffset+(chunkEnd-chunkStart)]
+
+            // Get the hash for this chunk
+            var h hash.Hash
+            _, err := tr.idx.indexEntry(chunkCounter, &h)
+            if err != nil {
+                return err
+            }
+
+            // Create compressed chunk and decompress
+            cchk, err := NewCompressedChunk(h, chunkData)
+            if err != nil {
+                return err
+            }
+            chk, err := cchk.ToChunk()
+            if err != nil {
+                return err
+            }
+
+            // Process the chunk
+            cb(chk)
+
+            chunkCounter++
         }
 
-        cb(chk)
+        // If we processed all chunks in this block, move to next block
+        if chunkCounter < count && cumulativeOffsets[chunkCounter] >= blockEnd {
+            currentOffset = blockEnd
+        } else if chunkCounter >= count {
+            // All chunks processed
+            break
+        }
     }
+
     return nil
 }
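
Note on the change: the new iterateAllChunks replaces one ReadAtWithStats call per chunk with a two-pass scheme. A first pass over the index builds a cumulative-offsets array (entry i is where chunk i starts; the final entry is the total data size), then a second pass reads the table data in 1MB blocks and slices each fully contained chunk out of the buffer, restarting the read at a chunk's start when it straddles a block boundary.

Below is a minimal, self-contained sketch of that buffered-iteration pattern in isolation, assuming only io.ReaderAt. iterateRecords, the record lengths, and the tiny 8-byte block size in main are hypothetical stand-ins for the table reader, chunk index, and 1MB buffer; they are not Dolt APIs.

package main

import (
    "bytes"
    "fmt"
    "io"
)

// iterateRecords visits records of the given lengths, packed back to back in
// r, reading at most blockSize bytes per ReadAt call.
func iterateRecords(r io.ReaderAt, lengths []uint64, blockSize uint64, cb func(i int, data []byte)) error {
    // cum[i] is the offset where record i starts; cum[len(lengths)] is the total size.
    cum := make([]uint64, len(lengths)+1)
    for i, l := range lengths {
        cum[i+1] = cum[i] + l
    }
    total := cum[len(lengths)]

    buf := make([]byte, blockSize)
    offset := uint64(0)
    next := 0 // index of the next record to emit

    for offset < total {
        read := blockSize
        if total-offset < blockSize {
            read = total - offset
        }
        if _, err := r.ReadAt(buf[:read], int64(offset)); err != nil {
            return err
        }
        blockEnd := offset + read

        // Emit every record that lies entirely within [offset, blockEnd).
        for next < len(lengths) && cum[next+1] <= blockEnd {
            start := cum[next] - offset
            cb(next, buf[start:start+lengths[next]])
            next++
        }
        if next == len(lengths) {
            break
        }
        // The next record straddles the block boundary, so restart the read
        // at its start. If it already started at this block's start, it is
        // larger than blockSize and can never fit: bail out rather than loop.
        if cum[next] == offset {
            return fmt.Errorf("record %d larger than block size", next)
        }
        offset = cum[next]
    }
    return nil
}

func main() {
    data := []byte("aaaabbbbbbcccddddddddee")
    lengths := []uint64{4, 6, 3, 8, 2} // must sum to len(data)
    if err := iterateRecords(bytes.NewReader(data), lengths, 8, func(i int, rec []byte) {
        fmt.Printf("record %d: %q\n", i, rec)
    }); err != nil {
        fmt.Println("error:", err)
    }
}

One edge case any version of this loop has to guard against: a record larger than the block size can never fit in a single read, so re-reading from its start would fetch the same block forever. The sketch returns an error at that point; a production version might instead fall back to a one-off allocation sized to the oversized record.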
