Skip to content

Commit 94cf3fc

Browse files
Fix retriable errors not handled well when creating producer or consumer (#293)
* Fix retriable errors not handled well when subscribing Fixes #292 ### Motivation When a consumer failed to subscribe due to a retriable error, the time point comparation is wrong: https://github.com/apache/pulsar-client-cpp/blob/633f4bbe8c182128da09803172676b9d6af05057/lib/ConsumerImpl.cc#L321 `creationTimestamp_ + operationTimeut_` is the deadline, `TimeUtils::now()` is the current time, we should use `>` instead of `<` here to compare them. Otherwise, if the consumer encountered a retriable error and the deadline is not exceeded, the consumer won't reconnect and fail with `ResultRetryable`. ### Modifications Reverse the comparation between the deadline and the current time. When it times out, completing the future with `ResultTimeout` instead of the `result` itself, which is always `ResultRetryable`. Add `ConsumerTest.testRetrySubscribe` to verify this change. ### TODO Support configuring the operation timeout in milliseconds. * Fix the same error for producer as well * Add result conversion * Reduce duplicated code
1 parent 633f4bb commit 94cf3fc

File tree

5 files changed

+29
-10
lines changed

5 files changed

+29
-10
lines changed

lib/ConsumerImpl.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,9 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
318318
scheduleReconnection(get_shared_this_ptr());
319319
} else {
320320
// Consumer was not yet created, retry to connect to broker if it's possible
321-
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
322-
LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result));
321+
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
322+
if (result == ResultRetryable) {
323+
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
323324
scheduleReconnection(get_shared_this_ptr());
324325
} else {
325326
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));

lib/HandlerBase.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
138138
}
139139
}
140140

141-
bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; }
142-
143141
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
144142
const auto state = handler->state_.load();
145143
if (state == Pending || state == Ready) {
@@ -164,4 +162,12 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBase
164162
}
165163
}
166164

165+
Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const {
166+
if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
167+
return ResultTimeout;
168+
} else {
169+
return result;
170+
}
171+
}
172+
167173
} // namespace pulsar

lib/HandlerBase.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,6 @@ class HandlerBase {
6969
*/
7070
static void scheduleReconnection(HandlerBasePtr handler);
7171

72-
/*
73-
* Should we retry in error that are transient
74-
*/
75-
bool isRetriableError(Result result);
76-
7772
/**
7873
* Do some cleanup work before changing `connection_` to `cnx`.
7974
*
@@ -127,6 +122,8 @@ class HandlerBase {
127122
Backoff backoff_;
128123
uint64_t epoch_;
129124

125+
Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const;
126+
130127
private:
131128
DeadlineTimerPtr timer_;
132129

lib/ProducerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
281281
scheduleReconnection(shared_from_this());
282282
} else {
283283
// Producer was not yet created, retry to connect to broker if it's possible
284-
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
284+
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
285+
if (result == ResultRetryable) {
285286
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
286287
scheduleReconnection(shared_from_this());
287288
} else {

tests/ConsumerTest.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,4 +1321,18 @@ TEST(ConsumerTest, testNotSetSubscriptionName) {
13211321
client.close();
13221322
}
13231323

1324+
TEST(ConsumerTest, testRetrySubscribe) {
1325+
Client client{lookupUrl};
1326+
for (int i = 0; i < 10; i++) {
1327+
// "Subscription is fenced" error might happen here because the previous seek operation might not be
1328+
// done in broker, the consumer should retry until timeout
1329+
Consumer consumer;
1330+
ASSERT_EQ(client.subscribe("test-close-before-seek-done", "sub", consumer), ResultOk);
1331+
consumer.seekAsync(MessageId::earliest(), [](Result) {});
1332+
consumer.close();
1333+
}
1334+
// TODO: Currently it's hard to test the timeout error without configuring the operation timeout in
1335+
// milliseconds
1336+
}
1337+
13241338
} // namespace pulsar

0 commit comments

Comments
 (0)