57 changes: 29 additions & 28 deletions quickwit/quickwit-indexing/src/source/doc_file_reader.rs
@@ -22,7 +22,7 @@ use quickwit_common::Progress;
 use quickwit_common::uri::Uri;
 use quickwit_metastore::checkpoint::PartitionId;
 use quickwit_proto::metastore::SourceType;
-use quickwit_proto::types::Position;
+use quickwit_proto::types::{Offset, Position};
 use quickwit_storage::StorageResolver;
 use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
 
@@ -146,8 +146,13 @@ impl DocFileReader {
 pub struct ObjectUriBatchReader {
     partition_id: PartitionId,
     reader: DocFileReader,
-    current_offset: usize,
-    is_eof: bool,
+    current_position: Position,
 }
 
+fn parse_offset(offset: &Offset) -> anyhow::Result<usize> {
+    offset
+        .as_usize()
+        .context("file offset should be stored as usize")
+}
+
 impl ObjectUriBatchReader {
@@ -157,26 +162,22 @@ impl ObjectUriBatchReader {
         uri: &Uri,
         position: Position,
     ) -> anyhow::Result<Self> {
-        let current_offset = match position {
-            Position::Beginning => 0,
-            Position::Offset(offset) => offset
-                .as_usize()
-                .context("file offset should be stored as usize")?,
+        let current_offset = match &position {
             Position::Eof(_) => {
                 return Ok(ObjectUriBatchReader {
                     partition_id,
                     reader: DocFileReader::empty(),
-                    current_offset: 0,
-                    is_eof: true,
+                    current_position: position,
                 });
             }
+            Position::Beginning => 0,
+            Position::Offset(offset) => parse_offset(offset)?,
         };
         let reader = DocFileReader::from_uri(storage_resolver, uri, current_offset).await?;
         Ok(ObjectUriBatchReader {
             partition_id,
             reader,
-            current_offset,
-            is_eof: false,
+            current_position: position,
         })
     }
 
@@ -186,11 +187,15 @@
         source_type: SourceType,
     ) -> anyhow::Result<BatchBuilder> {
         let mut batch_builder = BatchBuilder::new(source_type);
-        if self.is_eof {
-            return Ok(batch_builder);
-        }
-        let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize;
-        let mut new_offset = self.current_offset;
+        let current_offset = match &self.current_position {
+            Position::Eof(_) => return Ok(batch_builder),
+            Position::Beginning => 0,
+            Position::Offset(offset) => parse_offset(offset)?,
+        };
+
+        let limit_num_bytes = current_offset + BATCH_NUM_BYTES_LIMIT as usize;
+        let mut new_offset = current_offset;
+        let mut eof_position: Option<Position> = None;
         while new_offset < limit_num_bytes {
             if let Some(record) = source_progress
                 .protect_future(self.reader.next_record())
@@ -199,30 +204,26 @@
                 new_offset = record.next_offset as usize;
                 batch_builder.add_doc(record.doc);
                 if record.is_last {
-                    self.is_eof = true;
+                    eof_position = Some(Position::eof(new_offset));
                     break;
                 }
             } else {
-                self.is_eof = true;
+                eof_position = Some(Position::eof(new_offset));
                 break;
             }
         }
-        let to_position = if self.is_eof {
-            Position::eof(new_offset)
-        } else {
-            Position::offset(new_offset)
-        };
+        let to_position = eof_position.unwrap_or(Position::offset(new_offset));
         batch_builder.checkpoint_delta.record_partition_delta(
             self.partition_id.clone(),
-            Position::offset(self.current_offset),
-            to_position,
+            self.current_position.clone(),
+            to_position.clone(),
         )?;
-        self.current_offset = new_offset;
+        self.current_position = to_position;
         Ok(batch_builder)
     }
 
     pub fn is_eof(&self) -> bool {
-        self.is_eof
+        self.current_position.is_eof()
     }
 }
 
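Note on the refactor above: `ObjectUriBatchReader` now tracks a single `current_position: Position` instead of the old `current_offset: usize` / `is_eof: bool` pair, so a checkpoint delta's `from` and `to` are expressed in the same type. A minimal caller sketch, assuming hypothetical bindings for `storage_resolver`, `uri`, `partition_id`, and `progress`; the `try_new` argument order is partly inferred (the diff abridges its signature) and `SourceType::File` is used for illustration only:

    let mut reader = ObjectUriBatchReader::try_new(
        &storage_resolver,
        partition_id.clone(),
        &uri,
        Position::Beginning, // or Position::offset(n) to resume mid-file
    )
    .await?;
    while !reader.is_eof() {
        // Each batch's checkpoint delta runs from the reader's previous
        // position to Position::offset(n), or Position::eof(n) once the
        // last record has been read.
        let batch = reader.read_batch(&progress, SourceType::File).await?;
    }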
46 changes: 46 additions & 0 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
@@ -564,6 +564,52 @@ mod tests {
         assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2));
     }
 
+    #[tokio::test]
+    async fn test_checkpoint_delta_of_existing_messages() {
+        let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
+        let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap();
+        let partition_id_1 = PreProcessedPayload::ObjectUri(test_uri_1.clone()).partition_id();
+
+        let (dummy_doc_file_2, _) = generate_dummy_doc_file(false, 10).await;
+        let test_uri_2 = Uri::from_str(dummy_doc_file_2.path().to_str().unwrap()).unwrap();
+        let partition_id_2 = PreProcessedPayload::ObjectUri(test_uri_2.clone()).partition_id();
+
+        let queue = Arc::new(MemoryQueueForTests::new());
+        let shared_state = init_state(
+            "test-index",
+            &[
+                (
+                    partition_id_1.clone(),
+                    ("existing_token_1".to_string(), Position::Beginning, true),
+                ),
+                (
+                    partition_id_2.clone(),
+                    (
+                        "existing_token_2".to_string(),
+                        Position::offset((DUMMY_DOC.len() + 1) * 2),
+                        true,
+                    ),
+                ),
+            ],
+        );
+        let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
+        let batches = process_messages(
+            &mut coordinator,
+            queue,
+            &[(&test_uri_1, "ack-id-1"), (&test_uri_2, "ack-id-2")],
+        )
+        .await;
+        assert_eq!(batches.len(), 2);
+        let deltas = batches[0].checkpoint_delta.iter().collect::<Vec<_>>();
+        assert_eq!(deltas.len(), 1);
+        assert_eq!(deltas[0].1.from, Position::Beginning);
+        assert_eq!(deltas[0].1.to, Position::eof(350u64));
+        let deltas = batches[1].checkpoint_delta.iter().collect::<Vec<_>>();
+        assert_eq!(deltas.len(), 1);
+        assert_eq!(deltas[0].1.from, Position::Offset(70u64.into()));
+        assert_eq!(deltas[0].1.to, Position::eof(350u64));
+    }
+
     #[tokio::test]
     async fn test_process_multiple_coordinator() {
         let queue = Arc::new(MemoryQueueForTests::new());
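Note on the new test: each dummy file holds 10 records and both partitions end at `Position::eof(350u64)`, which implies each record is 35 bytes, i.e. a 34-byte `DUMMY_DOC` plus a trailing newline (assuming `generate_dummy_doc_file` writes one record per line). Partition 2's stored checkpoint, `Position::offset((DUMMY_DOC.len() + 1) * 2)`, is therefore byte 70, just past its second record, which is why its delta starts at `Offset(70)` while partition 1's starts at `Beginning`:

    // Arithmetic implied by the assertions (record = DUMMY_DOC + '\n'):
    assert_eq!((34 + 1) * 2, 70); // partition 2 resumes after record 2
    assert_eq!((34 + 1) * 10, 350); // both readers reach EOF at byte 350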
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/src/types/mod.rs
@@ -34,7 +34,7 @@ pub use doc_mapping_uid::DocMappingUid;
 pub use doc_uid::{DocUid, DocUidGenerator};
 pub use index_uid::IndexUid;
 pub use pipeline_uid::PipelineUid;
-pub use position::Position;
+pub use position::{Offset, Position};
 pub use shard_id::ShardId;
 
 /// The size of an ULID in bytes. Use `ULID_LEN` for the length of Base32 encoded ULID strings.
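Re-exporting `Offset` lets downstream crates such as quickwit-indexing name the offset type directly, as the new `parse_offset` helper does, instead of destructuring `Position` internals ad hoc. A small sketch of the assumed API surface (`Position::offset` and `Offset::as_usize` are inferred from their use in this diff; `as_usize` appears to return `Option<usize>`, given the `.context(...)` call in `parse_offset`):

    let position = Position::offset(42usize);
    if let Position::Offset(offset) = &position {
        // Hypothetical check built on the inferred Option-returning API.
        assert_eq!(offset.as_usize(), Some(42));
    }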