diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index a8349eb5..35149345 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -404,6 +404,16 @@ class PULSAR_PUBLIC Client { */ uint64_t getNumberOfConsumers(); + /** + * Asynchronously get the SchemaInfo of a topic and a specific version. + * + * @topic the topic name + * @version the schema version, see Message::getLongSchemaVersion + * @callback the callback that is triggered when the SchemaInfo is retrieved successfully or not + */ + void getSchemaInfoAsync(const std::string& topic, int64_t version, + std::function callback); + private: Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections); diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index 47bc974a..fe99131f 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -177,7 +177,14 @@ class PULSAR_PUBLIC Message { bool hasSchemaVersion() const; /** - * Get the schema version + * Get the schema version. + * + * @return the the schema version on success or -1 if the message does not have the schema version + */ + int64_t getLongSchemaVersion() const; + + /** + * Get the schema version of the raw bytes. */ const std::string& getSchemaVersion() const; diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index 0b23493b..f563f632 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -164,9 +164,9 @@ Future BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } -Future> BinaryProtoLookupService::getSchema( - const TopicNamePtr& topicName) { - GetSchemaPromisePtr promise = std::make_shared>>(); +Future BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName, + const std::string& version) { + GetSchemaPromisePtr promise = std::make_shared>(); if (!topicName) { promise->setFailed(ResultInvalidTopicName); @@ -174,13 +174,13 @@ Future> BinaryProtoLookupService::getSchema( } cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) .addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(), - std::placeholders::_1, std::placeholders::_2, promise)); + version, std::placeholders::_1, std::placeholders::_2, promise)); return promise->getFuture(); } -void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result, - const ClientConnectionWeakPtr& clientCnx, +void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, const std::string& version, + Result result, const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise) { if (result != ResultOk) { promise->setFailed(result); @@ -189,10 +189,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName ClientConnectionPtr conn = clientCnx.lock(); uint64_t requestId = newRequestId(); - LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName); + LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName + << " version: " << version); - conn->newGetSchema(topicName, requestId) - .addListener([promise](Result result, boost::optional schemaInfo) { + conn->newGetSchema(topicName, version, requestId) + .addListener([promise](Result result, SchemaInfo schemaInfo) { if (result != ResultOk) { promise->setFailed(result); return; diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index f8c91e6f..a3c059e4 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -34,7 +34,7 @@ class ConnectionPool; class LookupDataResult; class ServiceNameResolver; using NamespaceTopicsPromisePtr = std::shared_ptr>; -using GetSchemaPromisePtr = std::shared_ptr>>; +using GetSchemaPromisePtr = std::shared_ptr>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: @@ -52,7 +52,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; - Future> getSchema(const TopicNamePtr& topicName) override; + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; protected: // Mark findBroker as protected to make it accessible from test. @@ -80,7 +80,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Result result, const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise); - void sendGetSchemaRequest(const std::string& topiName, Result result, + void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result, const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise); void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, diff --git a/lib/Client.cc b/lib/Client.cc index e132189d..48c4a67b 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -23,7 +23,10 @@ #include #include "ClientImpl.h" +#include "Int64SerDes.h" #include "LogUtils.h" +#include "LookupService.h" +#include "TopicName.h" #include "Utils.h" DECLARE_LOG_OBJECT() @@ -191,4 +194,11 @@ void Client::shutdown() { impl_->shutdown(); } uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); } uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); } + +void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, + std::function callback) { + impl_->getLookup() + ->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") + .addListener(callback); +} } // namespace pulsar diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 332a695c..72b9c8e2 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1311,10 +1311,10 @@ Future ClientConnection::newGetTopicsOfNamespace( return promise.getFuture(); } -Future> ClientConnection::newGetSchema(const std::string& topicName, - uint64_t requestId) { +Future ClientConnection::newGetSchema(const std::string& topicName, + const std::string& version, uint64_t requestId) { Lock lock(mutex_); - Promise> promise; + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << "Client is not connected to the broker"); @@ -1324,7 +1324,7 @@ Future> ClientConnection::newGetSchema(const pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise)); lock.unlock(); - sendCommand(Commands::newGetSchema(topicName, requestId)); + sendCommand(Commands::newGetSchema(topicName, version, requestId)); return promise.getFuture(); } @@ -1758,21 +1758,19 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp Lock lock(mutex_); auto it = pendingGetSchemaRequests_.find(response.request_id()); if (it != pendingGetSchemaRequests_.end()) { - Promise> getSchemaPromise = it->second; + Promise getSchemaPromise = it->second; pendingGetSchemaRequests_.erase(it); lock.unlock(); if (response.has_error_code()) { - if (response.error_code() == proto::TopicNotFound) { - getSchemaPromise.setValue(boost::none); - } else { - Result result = getResult(response.error_code(), response.error_message()); + Result result = getResult(response.error_code(), response.error_message()); + if (response.error_code() != proto::TopicNotFound) { LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server " << result << (response.has_error_message() ? (" (" + response.error_message() + ")") : "") << " -- req_id: " << response.request_id()); - getSchemaPromise.setFailed(result); } + getSchemaPromise.setFailed(result); return; } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 451b0cea..24a43d5c 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -185,8 +185,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> newGetSchema(const std::string& topicName, - uint64_t requestId); + Future newGetSchema(const std::string& topicName, const std::string& version, + uint64_t requestId); private: struct PendingRequestData { @@ -346,7 +346,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; - typedef std::map>> PendingGetSchemaMap; + typedef std::map> PendingGetSchemaMap; PendingGetSchemaMap pendingGetSchemaRequests_; mutable std::mutex mutex_; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index d04289f9..a8fc24c3 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -166,19 +166,17 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura if (autoDownloadSchema) { auto self = shared_from_this(); - auto confPtr = std::make_shared(conf); lookupServicePtr_->getSchema(topicName).addListener( - [self, topicName, confPtr, callback](Result res, boost::optional topicSchema) { + [self, topicName, callback](Result res, SchemaInfo topicSchema) { if (res != ResultOk) { callback(res, Producer()); + return; } - if (topicSchema) { - confPtr->setSchema(topicSchema.get()); - } - + ProducerConfiguration conf; + conf.setSchema(topicSchema); self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, - std::placeholders::_2, topicName, *confPtr, callback)); + std::placeholders::_2, topicName, conf, callback)); }); } else { lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( diff --git a/lib/Commands.cc b/lib/Commands.cc index 8af61e74..cec9d3bf 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -161,7 +161,8 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat return buffer; } -SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) { +SharedBuffer Commands::newGetSchema(const std::string& topic, const std::string& version, + uint64_t requestId) { static BaseCommand cmd; static std::mutex mutex; std::lock_guard lock(mutex); @@ -170,6 +171,9 @@ SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId auto getSchema = cmd.mutable_getschema(); getSchema->set_topic(topic); getSchema->set_request_id(requestId); + if (!version.empty()) { + getSchema->set_schema_version(version); + } const SharedBuffer buffer = writeMessageWithSize(cmd); cmd.clear_getschema(); diff --git a/lib/Commands.h b/lib/Commands.h index 5d247648..65a64064 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -92,7 +92,8 @@ class Commands { static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId, const std::string& listenerName); - static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId); + static SharedBuffer newGetSchema(const std::string& topic, const std::string& version, + uint64_t requestId); static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId, uint64_t sequenceId, ChecksumType checksumType, diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 920592d0..1956e01d 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -25,6 +25,7 @@ #include #include "ExecutorService.h" +#include "Int64SerDes.h" #include "LogUtils.h" #include "NamespaceName.h" #include "SchemaUtils.h" @@ -157,8 +158,9 @@ Future HTTPLookupService::getTopicsOfNamespaceAsync( return promise.getFuture(); } -Future> HTTPLookupService::getSchema(const TopicNamePtr &topicName) { - Promise> promise; +Future HTTPLookupService::getSchema(const TopicNamePtr &topicName, + const std::string &version) { + Promise promise; std::stringstream completeUrlStream; const auto &url = serviceNameResolver_.resolveHost(); @@ -171,6 +173,10 @@ Future> HTTPLookupService::getSchema(const T << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName() << "/schema"; } + if (!version.empty()) { + completeUrlStream << "/" << fromBigEndianBytes(version); + } + executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest, shared_from_this(), promise, completeUrlStream.str())); return promise.getFuture(); @@ -450,7 +456,7 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, con Result result = sendHTTPRequest(completeUrl, responseData, responseCode); if (responseCode == 404) { - promise.setValue(boost::none); + promise.setFailed(ResultTopicNotFound); } else if (result != ResultOk) { promise.setFailed(result); } else { diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 8dc195c1..a1e12fc7 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -28,7 +28,7 @@ namespace pulsar { class ServiceNameResolver; using NamespaceTopicsPromise = Promise; using NamespaceTopicsPromisePtr = std::shared_ptr; -using GetSchemaPromise = Promise>; +using GetSchemaPromise = Promise; class HTTPLookupService : public LookupService, public std::enable_shared_from_this { class CurlInitializer { @@ -77,7 +77,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future getPartitionMetadataAsync(const TopicNamePtr&) override; - Future> getSchema(const TopicNamePtr& topicName) override; + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h new file mode 100644 index 00000000..dbc5d8a7 --- /dev/null +++ b/lib/Int64SerDes.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include // for ntohl + +namespace pulsar { + +inline int64_t fromBigEndianBytes(const std::string& bytes) { + const auto int32Array = reinterpret_cast(bytes.c_str()); + return (static_cast(ntohl(int32Array[0])) << 32) + static_cast(ntohl(int32Array[1])); +} + +inline std::string toBigEndianBytes(int64_t value) { + union { + char bytes[8]; + uint32_t int32Array[2]; + } u; + u.int32Array[0] = htonl(static_cast(value >> 32)); + u.int32Array[1] = htonl(static_cast(value & 0xFFFFFFFF)); + return {u.bytes, 8}; +} + +} // namespace pulsar diff --git a/lib/LookupService.h b/lib/LookupService.h index 84dc37ca..74011210 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -77,12 +76,14 @@ class LookupService { const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0; /** - * returns current SchemaInfo {@link SchemaInfo} for a given topic. + * Get the SchemaInfo for a given topic and a specific schema version. * * @param topicName topic-name + * @param version the schema version byte array, if it's empty, use the latest version * @return SchemaInfo */ - virtual Future> getSchema(const TopicNamePtr& topicName) = 0; + virtual Future getSchema(const TopicNamePtr& topicName, + const std::string& version = "") = 0; virtual ~LookupService() {} }; diff --git a/lib/Message.cc b/lib/Message.cc index 98364f12..bfd65f4b 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -23,6 +23,7 @@ #include +#include "Int64SerDes.h" #include "KeyValueImpl.h" #include "MessageImpl.h" #include "PulsarApi.pb.h" @@ -184,6 +185,10 @@ bool Message::hasSchemaVersion() const { return false; } +int64_t Message::getLongSchemaVersion() const { + return (impl_ && impl_->hasSchemaVersion()) ? fromBigEndianBytes(impl_->getSchemaVersion()) : -1L; +} + const std::string& Message::getSchemaVersion() const { if (!impl_) { return emptyString; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 01dd82bb..c8fdaabe 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -67,10 +67,10 @@ class RetryableLookupService : public LookupService, [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } - Future> getSchema(const TopicNamePtr& topicName) override { - return executeAsync>( - "get-schema" + topicName->toString(), - [this, topicName] { return lookupService_->getSchema(topicName); }); + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { + return executeAsync("get-schema" + topicName->toString(), [this, topicName, version] { + return lookupService_->getSchema(topicName, version); + }); } template diff --git a/tests/Int64SerDesTest.cc b/tests/Int64SerDesTest.cc new file mode 100644 index 00000000..7dc662a5 --- /dev/null +++ b/tests/Int64SerDesTest.cc @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include "lib/Int64SerDes.h" + +using namespace pulsar; + +TEST(Int64SerDes, testNormal) { + int64_t x = 0x0102030405060708L; + const auto bytes = toBigEndianBytes(x); + ASSERT_EQ(bytes.size(), 8); + for (int i = 0; i < 8; i++) { + ASSERT_EQ(bytes[i], i + 1); + } + int64_t y = fromBigEndianBytes(bytes.data()); + ASSERT_EQ(x, y); +} + +TEST(Int64SerDes, testOverflow) { + int64_t x = 0x8000000000000000L; + const auto bytes = toBigEndianBytes(x); + ASSERT_EQ(bytes.size(), 8); + ASSERT_EQ(static_cast(bytes[0]), 0x80); + for (int i = 1; i < 8; i++) { + ASSERT_EQ(bytes[i], 0x00); + } + int64_t y = fromBigEndianBytes(bytes); + ASSERT_EQ(x, y); +} diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index f7ffc698..7712c4e7 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -26,6 +26,8 @@ #include #include +#include +#include #include "HttpHelper.h" #include "PulsarFriend.h" @@ -42,6 +44,8 @@ DECLARE_LOG_OBJECT() static std::string binaryLookupUrl = "pulsar://localhost:6650"; static std::string httpLookupUrl = "http://localhost:8080"; +extern std::string unique_str(); + TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared(1); AuthenticationPtr authData = AuthFactory::Disabled(); @@ -222,10 +226,11 @@ TEST(LookupServiceTest, testTimeout) { class LookupServiceTest : public ::testing::TestWithParam { public: + void SetUp() override { client_ = Client{GetParam()}; } void TearDown() override { client_.close(); } protected: - Client client_{GetParam()}; + Client client_{httpLookupUrl}; }; TEST_P(LookupServiceTest, basicGetNamespaceTopics) { @@ -289,15 +294,15 @@ TEST_P(LookupServiceTest, testGetSchema) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); auto lookup = clientImplPtr->getLookup(); - boost::optional schemaInfo; + SchemaInfo schemaInfo; auto future = lookup->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); - ASSERT_EQ(jsonSchema, schemaInfo->getSchema()); - ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType()); - ASSERT_EQ(properties, schemaInfo->getProperties()); + ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); + ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); + ASSERT_EQ(properties, schemaInfo.getProperties()); } -TEST_P(LookupServiceTest, testGetSchemaNotFund) { +TEST_P(LookupServiceTest, testGetSchemaNotFound) { const std::string topic = "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); @@ -307,10 +312,9 @@ TEST_P(LookupServiceTest, testGetSchemaNotFund) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); auto lookup = clientImplPtr->getLookup(); - boost::optional schemaInfo; + SchemaInfo schemaInfo; auto future = lookup->getSchema(TopicName::get(topic)); - ASSERT_EQ(ResultOk, future.get(schemaInfo)); - ASSERT_FALSE(schemaInfo); + ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); } TEST_P(LookupServiceTest, testGetKeyValueSchema) { @@ -333,15 +337,104 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); auto lookup = clientImplPtr->getLookup(); - boost::optional schemaInfo; + SchemaInfo schemaInfo; auto future = lookup->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); - ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema()); - ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType()); - ASSERT_FALSE(schemaInfo->getProperties().empty()); + ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); + ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); + ASSERT_FALSE(schemaInfo.getProperties().empty()); +} + +TEST_P(LookupServiceTest, testGetSchemaByVersion) { + const auto topic = "testGetSchemaByVersion" + unique_str() + GetParam().substr(0, 4); + const std::string schema1 = R"({ + "type": "record", + "name": "User", + "namespace": "test", + "fields": [ + {"name": "name", "type": ["null", "string"]}, + {"name": "age", "type": "int"} + ] +})"; + const std::string schema2 = R"({ + "type": "record", + "name": "User", + "namespace": "test", + "fields": [ + {"name": "age", "type": "int"}, + {"name": "name", "type": ["null", "string"]} + ] +})"; + ProducerConfiguration producerConf1; + producerConf1.setSchema(SchemaInfo{AVRO, "Avro", schema1}); + Producer producer1; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf1, producer1)); + ProducerConfiguration producerConf2; + producerConf2.setSchema(SchemaInfo{AVRO, "Avro", schema2}); + Producer producer2; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf2, producer2)); + + // Though these messages are invalid, the C++ client can send them successfully + producer1.send(MessageBuilder().setContent("msg0").build()); + producer2.send(MessageBuilder().setContent("msg1").build()); + + ConsumerConfiguration consumerConf; + consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest); + Consumer consumer; + ASSERT_EQ(ResultOk, client_.subscribe(topic, "sub", consumerConf, consumer)); + + Message msg1; + ASSERT_EQ(ResultOk, consumer.receive(msg1, 3000)); + Message msg2; + ASSERT_EQ(ResultOk, consumer.receive(msg2, 3000)); + + auto getSchemaInfo = [this](const std::string& topic, int64_t version) { + std::promise p; + client_.getSchemaInfoAsync(topic, version, [&p](Result result, const SchemaInfo& info) { + if (result == ResultOk) { + p.set_value(info); + } else { + p.set_exception(std::make_exception_ptr(std::runtime_error(strResult(result)))); + } + }); + return p.get_future().get(); + }; + { + ASSERT_EQ(msg1.getLongSchemaVersion(), 0); + const auto info = getSchemaInfo(topic, 0); + ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); + ASSERT_EQ(info.getSchema(), schema1); + } + { + ASSERT_EQ(msg2.getLongSchemaVersion(), 1); + const auto info = getSchemaInfo(topic, 1); + ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); + ASSERT_EQ(info.getSchema(), schema2); + } + { + const auto info = getSchemaInfo(topic, -1); + ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); + ASSERT_EQ(info.getSchema(), schema2); + } + try { + getSchemaInfo(topic, 2); + FAIL(); + } catch (const std::runtime_error& e) { + ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound)); + } + try { + getSchemaInfo(topic + "-not-exist", 0); + FAIL(); + } catch (const std::runtime_error& e) { + ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound)); + } + + consumer.close(); + producer1.close(); + producer2.close(); } -INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); +INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService { public: