Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,16 @@ class PULSAR_PUBLIC Client {
*/
uint64_t getNumberOfConsumers();

/**
* Asynchronously get the SchemaInfo of a topic and a specific version.
*
* @topic the topic name
* @version the schema version, see Message::getLongSchemaVersion
* @callback the callback that is triggered when the SchemaInfo is retrieved successfully or not
*/
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback);

private:
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections);
Expand Down
9 changes: 8 additions & 1 deletion include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ class PULSAR_PUBLIC Message {
bool hasSchemaVersion() const;

/**
* Get the schema version
* Get the schema version.
*
* @return the the schema version on success or -1 if the message does not have the schema version
*/
int64_t getLongSchemaVersion() const;

/**
* Get the schema version of the raw bytes.
*/
const std::string& getSchemaVersion() const;

Expand Down
19 changes: 10 additions & 9 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,23 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}

Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema(
const TopicNamePtr& topicName) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>();
Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
const std::string& version) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, SchemaInfo>>();

if (!topicName) {
promise->setFailed(ResultInvalidTopicName);
return promise->getFuture();
}
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
std::placeholders::_1, std::placeholders::_2, promise));
version, std::placeholders::_1, std::placeholders::_2, promise));

return promise->getFuture();
}

void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result,
const ClientConnectionWeakPtr& clientCnx,
void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, const std::string& version,
Result result, const ClientConnectionWeakPtr& clientCnx,
GetSchemaPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
Expand All @@ -189,10 +189,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName

ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName);
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName
<< " version: " << version);

conn->newGetSchema(topicName, requestId)
.addListener([promise](Result result, boost::optional<SchemaInfo> schemaInfo) {
conn->newGetSchema(topicName, version, requestId)
.addListener([promise](Result result, SchemaInfo schemaInfo) {
if (result != ResultOk) {
promise->setFailed(result);
return;
Expand Down
6 changes: 3 additions & 3 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
Expand All @@ -52,7 +52,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;

Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

protected:
// Mark findBroker as protected to make it accessible from test.
Expand Down Expand Up @@ -80,7 +80,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
Result result, const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise);

void sendGetSchemaRequest(const std::string& topiName, Result result,
void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result,
const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise);

void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
Expand Down
10 changes: 10 additions & 0 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
#include <utility>

#include "ClientImpl.h"
#include "Int64SerDes.h"
#include "LogUtils.h"
#include "LookupService.h"
#include "TopicName.h"
#include "Utils.h"

DECLARE_LOG_OBJECT()
Expand Down Expand Up @@ -191,4 +194,11 @@ void Client::shutdown() { impl_->shutdown(); }

uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); }
uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); }

void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback) {
impl_->getLookup()
->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
.addListener(callback);
}
} // namespace pulsar
18 changes: 8 additions & 10 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1311,10 +1311,10 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
return promise.getFuture();
}

Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName,
uint64_t requestId) {
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName,
const std::string& version, uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, boost::optional<SchemaInfo>> promise;
Promise<Result, SchemaInfo> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << "Client is not connected to the broker");
Expand All @@ -1324,7 +1324,7 @@ Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const

pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
lock.unlock();
sendCommand(Commands::newGetSchema(topicName, requestId));
sendCommand(Commands::newGetSchema(topicName, version, requestId));
return promise.getFuture();
}

Expand Down Expand Up @@ -1758,21 +1758,19 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp
Lock lock(mutex_);
auto it = pendingGetSchemaRequests_.find(response.request_id());
if (it != pendingGetSchemaRequests_.end()) {
Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second;
Promise<Result, SchemaInfo> getSchemaPromise = it->second;
pendingGetSchemaRequests_.erase(it);
lock.unlock();

if (response.has_error_code()) {
if (response.error_code() == proto::TopicNotFound) {
getSchemaPromise.setValue(boost::none);
} else {
Result result = getResult(response.error_code(), response.error_message());
Result result = getResult(response.error_code(), response.error_message());
if (response.error_code() != proto::TopicNotFound) {
LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server " << result
<< (response.has_error_message() ? (" (" + response.error_message() + ")")
: "")
<< " -- req_id: " << response.request_id());
getSchemaPromise.setFailed(result);
}
getSchemaPromise.setFailed(result);
return;
}

Expand Down
6 changes: 3 additions & 3 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
CommandGetTopicsOfNamespace_Mode mode,
uint64_t requestId);

Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
uint64_t requestId);
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
uint64_t requestId);

private:
struct PendingRequestData {
Expand Down Expand Up @@ -346,7 +346,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;

typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap;
typedef std::map<long, Promise<Result, SchemaInfo>> PendingGetSchemaMap;
PendingGetSchemaMap pendingGetSchemaRequests_;

mutable std::mutex mutex_;
Expand Down
12 changes: 5 additions & 7 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,17 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura

if (autoDownloadSchema) {
auto self = shared_from_this();
auto confPtr = std::make_shared<ProducerConfiguration>(conf);
lookupServicePtr_->getSchema(topicName).addListener(
[self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) {
[self, topicName, callback](Result res, SchemaInfo topicSchema) {
if (res != ResultOk) {
callback(res, Producer());
return;
}
if (topicSchema) {
confPtr->setSchema(topicSchema.get());
}

ProducerConfiguration conf;
conf.setSchema(topicSchema);
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
std::placeholders::_2, topicName, *confPtr, callback));
std::placeholders::_2, topicName, conf, callback));
});
} else {
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
Expand Down
6 changes: 5 additions & 1 deletion lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
return buffer;
}

SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) {
SharedBuffer Commands::newGetSchema(const std::string& topic, const std::string& version,
uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -170,6 +171,9 @@ SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId
auto getSchema = cmd.mutable_getschema();
getSchema->set_topic(topic);
getSchema->set_request_id(requestId);
if (!version.empty()) {
getSchema->set_schema_version(version);
}

const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_getschema();
Expand Down
3 changes: 2 additions & 1 deletion lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class Commands {
static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
const std::string& listenerName);

static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId);
static SharedBuffer newGetSchema(const std::string& topic, const std::string& version,
uint64_t requestId);

static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType checksumType,
Expand Down
12 changes: 9 additions & 3 deletions lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <boost/property_tree/ptree.hpp>

#include "ExecutorService.h"
#include "Int64SerDes.h"
#include "LogUtils.h"
#include "NamespaceName.h"
#include "SchemaUtils.h"
Expand Down Expand Up @@ -157,8 +158,9 @@ Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
return promise.getFuture();
}

Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const TopicNamePtr &topicName) {
Promise<Result, boost::optional<SchemaInfo>> promise;
Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr &topicName,
const std::string &version) {
Promise<Result, SchemaInfo> promise;
std::stringstream completeUrlStream;

const auto &url = serviceNameResolver_.resolveHost();
Expand All @@ -171,6 +173,10 @@ Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const T
<< topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName() << "/schema";
}
if (!version.empty()) {
completeUrlStream << "/" << fromBigEndianBytes(version);
}

executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
shared_from_this(), promise, completeUrlStream.str()));
return promise.getFuture();
Expand Down Expand Up @@ -450,7 +456,7 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, con
Result result = sendHTTPRequest(completeUrl, responseData, responseCode);

if (responseCode == 404) {
promise.setValue(boost::none);
promise.setFailed(ResultTopicNotFound);
} else if (result != ResultOk) {
promise.setFailed(result);
} else {
Expand Down
4 changes: 2 additions & 2 deletions lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace pulsar {
class ServiceNameResolver;
using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
using GetSchemaPromise = Promise<Result, boost::optional<SchemaInfo>>;
using GetSchemaPromise = Promise<Result, SchemaInfo>;

class HTTPLookupService : public LookupService, public std::enable_shared_from_this<HTTPLookupService> {
class CurlInitializer {
Expand Down Expand Up @@ -77,7 +77,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&) override;

Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
Expand Down
42 changes: 42 additions & 0 deletions lib/Int64SerDes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <stdint.h>

#include <boost/asio.hpp> // for ntohl

namespace pulsar {

inline int64_t fromBigEndianBytes(const std::string& bytes) {
const auto int32Array = reinterpret_cast<const uint32_t*>(bytes.c_str());
return (static_cast<int64_t>(ntohl(int32Array[0])) << 32) + static_cast<int64_t>(ntohl(int32Array[1]));
}

inline std::string toBigEndianBytes(int64_t value) {
union {
char bytes[8];
uint32_t int32Array[2];
} u;
u.int32Array[0] = htonl(static_cast<int32_t>(value >> 32));
u.int32Array[1] = htonl(static_cast<int32_t>(value & 0xFFFFFFFF));
return {u.bytes, 8};
}

} // namespace pulsar
7 changes: 4 additions & 3 deletions lib/LookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <pulsar/Result.h>
#include <pulsar/Schema.h>

#include <boost/optional.hpp>
#include <memory>
#include <ostream>
#include <vector>
Expand Down Expand Up @@ -77,12 +76,14 @@ class LookupService {
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0;

/**
* returns current SchemaInfo {@link SchemaInfo} for a given topic.
* Get the SchemaInfo for a given topic and a specific schema version.
*
* @param topicName topic-name
* @param version the schema version byte array, if it's empty, use the latest version
* @return SchemaInfo
*/
virtual Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) = 0;
virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName,
const std::string& version = "") = 0;

virtual ~LookupService() {}
};
Expand Down
5 changes: 5 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <iostream>

#include "Int64SerDes.h"
#include "KeyValueImpl.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
Expand Down Expand Up @@ -184,6 +185,10 @@ bool Message::hasSchemaVersion() const {
return false;
}

int64_t Message::getLongSchemaVersion() const {
return (impl_ && impl_->hasSchemaVersion()) ? fromBigEndianBytes(impl_->getSchemaVersion()) : -1L;
}

const std::string& Message::getSchemaVersion() const {
if (!impl_) {
return emptyString;
Expand Down
Loading