
Commit 77c3efd

WIP - REBASE AWAY
1 parent fe25b3d commit 77c3efd

File tree

1 file changed: +43 -38 lines changed

go/store/nbs/table_reader.go

Lines changed: 43 additions & 38 deletions
@@ -769,74 +769,82 @@ func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks
 		return nil
 	}
 
-	// Build cumulative offsets array for efficient chunk location
-	cumulativeOffsets := make([]uint64, count+1)
-	cumulativeOffsets[0] = 0
+	// Build offset records similar to the extract method.
+	// The index is sorted by prefix, but we need to process chunks in storage order (by offset).
+	type chunkRecord struct {
+		offset uint64
+		length uint32
+		hash   hash.Hash
+	}
+
+	chunkRecs := make([]chunkRecord, 0, count)
+
+	// First pass: collect all chunk info from the sorted index.
 	for i := uint32(0); i < count; i++ {
-		ie, err := tr.idx.indexEntry(i, nil)
+		var h hash.Hash
+		ie, err := tr.idx.indexEntry(i, &h)
 		if err != nil {
 			return err
 		}
-		cumulativeOffsets[i+1] = cumulativeOffsets[i] + uint64(ie.Length())
+
+		chunkRecs = append(chunkRecs, chunkRecord{
+			offset: ie.Offset(),
+			length: ie.Length(),
+			hash:   h,
+		})
 	}
+	sort.Slice(chunkRecs, func(i, j int) bool {
+		return chunkRecs[i].offset < chunkRecs[j].offset
+	})
 
-	// Calculate total data size to determine where chunk records end
-	totalDataSize := cumulativeOffsets[count]
+	lastChunk := chunkRecs[len(chunkRecs)-1]
+	totalDataSize := lastChunk.offset + uint64(lastChunk.length)
 
-	// Read data in 1MB chunks
+	// Read data in 1MB blocks.
 	const bufferSize = 1024 * 1024 // 1MB
 	currentOffset := uint64(0)
-	chunkCounter := uint32(0)
+	chunkIndex := 0
 
 	// Reuse buffer across reads
 	dataBlock := make([]byte, bufferSize)
 
-	for currentOffset < totalDataSize {
-		// Calculate how much data to read (up to 1MB or remaining data)
+	for currentOffset < totalDataSize && chunkIndex < len(chunkRecs) {
+		// Calculate how much data to read (up to 1MB or the remaining data).
 		remainingData := totalDataSize - currentOffset
 		readSize := bufferSize
 		if remainingData < bufferSize {
 			readSize = int(remainingData)
-			dataBlock = dataBlock[:readSize] // Resize to remaining data
+			dataBlock = dataBlock[:readSize]
 		}
 
 		_, err := tr.r.ReadAtWithStats(ctx, dataBlock, int64(currentOffset), stats)
 		if err != nil {
 			return err
 		}
 
-		// Process all chunks that are fully contained within this data block
 		blockStart := currentOffset
 		blockEnd := currentOffset + uint64(readSize)
 
-		for chunkCounter < count {
+		// Process all chunks that are fully contained within this block.
+		for chunkIndex < len(chunkRecs) {
 			if ctx.Err() != nil {
 				return ctx.Err()
 			}
 
-			chunkStart := cumulativeOffsets[chunkCounter]
-			chunkEnd := cumulativeOffsets[chunkCounter+1]
+			chunk := chunkRecs[chunkIndex]
+			chunkEnd := chunk.offset + uint64(chunk.length)
 
-			// Check if this chunk extends beyond the current block
+			// Check if the chunk extends beyond the current block.
 			if chunkEnd > blockEnd {
-				// This chunk extends beyond current block, read next block
-				currentOffset = chunkStart
+				// Read the next block starting at this chunk's offset.
+				currentOffset = chunk.offset
 				break
 			}
 
-			// Extract chunk data from buffer
-			bufferOffset := chunkStart - blockStart
-			chunkData := dataBlock[bufferOffset : bufferOffset+(chunkEnd-chunkStart)]
-
-			// Get the hash for this chunk
-			var h hash.Hash
-			_, err := tr.idx.indexEntry(chunkCounter, &h)
-			if err != nil {
-				return err
-			}
-
-			// Create compressed chunk and decompress
-			cchk, err := NewCompressedChunk(h, chunkData)
+			bufferOffset := chunk.offset - blockStart
+			chunkData := make([]byte, chunk.length)
+			copy(chunkData, dataBlock[bufferOffset:bufferOffset+uint64(chunk.length)])
+			cchk, err := NewCompressedChunk(chunk.hash, chunkData)
 			if err != nil {
 				return err
 			}
@@ -848,15 +856,12 @@ func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks
 			// Process the chunk
 			cb(chk)
 
-			chunkCounter++
+			chunkIndex++
 		}
 
-		// If we processed all chunks in this block, move to next block
-		if chunkCounter < count && cumulativeOffsets[chunkCounter] >= blockEnd {
+		// If we didn't rewind to a straddling chunk above, advance to the next block.
+		if chunkIndex < len(chunkRecs) && chunkRecs[chunkIndex].offset >= blockEnd {
 			currentOffset = blockEnd
-		} else if chunkCounter >= count {
-			// All chunks processed
-			break
 		}
 	}
 
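Note on the pattern above: the NBS index is ordered by hash prefix, so walking it directly would issue reads in effectively random storage order. Collecting (offset, length, hash) records first and sorting them by offset turns the pass into mostly sequential 1MB reads. Below is a minimal, standalone Go sketch of that block-buffered iteration, including the boundary-straddle handling; the names here (record, iterateRecords, blockSize) are illustrative only and not part of the Dolt codebase, which reads through tr.r.ReadAtWithStats and wraps each chunk with NewCompressedChunk.

package main

import (
	"bytes"
	"fmt"
	"io"
	"sort"
)

// record is a hypothetical stand-in for the diff's chunkRecord.
type record struct {
	offset uint64
	length uint32
}

// iterateRecords reads a contiguous region through a fixed-size buffer and
// invokes cb once per record, in storage order. The slice passed to cb is
// only valid for the duration of the call.
func iterateRecords(r io.ReaderAt, recs []record, blockSize int, cb func(rec record, data []byte)) error {
	if len(recs) == 0 {
		return nil
	}

	// Process in storage order, whatever order the caller supplied.
	sort.Slice(recs, func(i, j int) bool { return recs[i].offset < recs[j].offset })

	last := recs[len(recs)-1]
	total := last.offset + uint64(last.length)

	buf := make([]byte, blockSize)
	cur := uint64(0)
	idx := 0

	for cur < total && idx < len(recs) {
		readSize := blockSize
		if remaining := total - cur; remaining < uint64(blockSize) {
			readSize = int(remaining)
		}
		if _, err := r.ReadAt(buf[:readSize], int64(cur)); err != nil {
			return err
		}
		blockStart, blockEnd := cur, cur+uint64(readSize)

		// Hand off every record fully contained in this block.
		for idx < len(recs) {
			rec := recs[idx]
			if rec.offset+uint64(rec.length) > blockEnd {
				// Straddles the boundary: the next read restarts at its offset.
				cur = rec.offset
				break
			}
			start := rec.offset - blockStart
			cb(rec, buf[start:start+uint64(rec.length)])
			idx++
		}

		// Advance to the next block only if we didn't rewind to a straddler.
		if idx < len(recs) && recs[idx].offset >= blockEnd {
			cur = blockEnd
		}
	}
	return nil
}

func main() {
	data := []byte("AAAABBBBBBCCCCCCCCDD")
	// Deliberately unsorted, mimicking a prefix-ordered index.
	recs := []record{{offset: 10, length: 8}, {offset: 0, length: 4}, {offset: 18, length: 2}, {offset: 4, length: 6}}

	_ = iterateRecords(bytes.NewReader(data), recs, 8, func(rec record, b []byte) {
		fmt.Printf("offset=%2d len=%d data=%s\n", rec.offset, rec.length, b)
	})
}

Unlike this sketch, which hands the callback a sub-slice of the shared buffer, the committed code copies each chunk's bytes out of dataBlock before building the CompressedChunk, since the buffer is overwritten by the next read.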