Skip to content

[feat] Support auto download schema when create producer. #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/pulsar/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ enum SchemaType
// Return string representation of result code
PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType);

PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr);

class SchemaInfoImpl;

typedef std::map<std::string, std::string> StringMap;
Expand Down Expand Up @@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo {
private:
typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
SchemaInfoImplPtr impl_;
static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
};

} // namespace pulsar
Expand Down
37 changes: 37 additions & 0 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,43 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}

Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema(
const TopicNamePtr& topicName) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>();

if (!topicName) {
promise->setFailed(ResultInvalidTopicName);
return promise->getFuture();
}
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
std::placeholders::_1, std::placeholders::_2, promise));

return promise->getFuture();
}

void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result,
const ClientConnectionWeakPtr& clientCnx,
GetSchemaPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}

ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName);

conn->newGetSchema(topicName, requestId)
.addListener([promise](Result result, boost::optional<SchemaInfo> schemaInfo) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}
promise->setValue(schemaInfo);
});
}

void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
Expand Down
7 changes: 7 additions & 0 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_

#include <pulsar/Authentication.h>
#include <pulsar/Schema.h>

#include <mutex>

Expand All @@ -32,6 +33,7 @@ class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
Expand All @@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;

Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;

private:
std::mutex mutex_;
uint64_t requestIdGenerator_ = 0;
Expand All @@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise);

void sendGetSchemaRequest(const std::string& topiName, Result result,
const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise);

void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
NamespaceTopicsPromisePtr promise);

Expand Down
63 changes: 63 additions & 0 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
break;
}

case BaseCommand::GET_SCHEMA_RESPONSE: {
const auto& response = incomingCmd.getschemaresponse();
LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: "
<< response.request_id());
Lock lock(mutex_);
auto it = pendingGetSchemaRequests_.find(response.request_id());
if (it != pendingGetSchemaRequests_.end()) {
Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second;
pendingGetSchemaRequests_.erase(it);
lock.unlock();

if (response.has_error_code()) {
if (response.error_code() == proto::TopicNotFound) {
getSchemaPromise.setValue(boost::none);
} else {
Result result = getResult(response.error_code(), response.error_message());
LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server "
<< result
<< (response.has_error_message()
? (" (" + response.error_message() + ")")
: "")
<< " -- req_id: " << response.request_id());
getSchemaPromise.setFailed(result);
}
return;
}

const auto& schema = response.schema();
const auto& properMap = schema.properties();
StringMap properties;
for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) {
properties[kv->key()] = kv->value();
}
SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "",
schema.schema_data(), properties);
getSchemaPromise.setValue(schemaInfo);
} else {
lock.unlock();
LOG_WARN(
"GetSchemaResponse command - Received unknown request id from "
"server: "
<< response.request_id());
}
break;
}

default: {
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
Expand Down Expand Up @@ -1708,6 +1754,23 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
return promise.getFuture();
}

Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName,
uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, boost::optional<SchemaInfo>> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << "Client is not connected to the broker");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}

pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
lock.unlock();
sendCommand(Commands::newGetSchema(topicName, requestId));
return promise.getFuture();
}

void ClientConnection::closeSocket() {
boost::system::error_code err;
if (socket_) {
Expand Down
6 changes: 6 additions & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);

Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
uint64_t requestId);

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
Expand Down Expand Up @@ -327,6 +330,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;

typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap;
PendingGetSchemaMap pendingGetSchemaRequests_;

mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;

Expand Down
27 changes: 23 additions & 4 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }

void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback) {
CreateProducerCallback callback, bool autoDownloadSchema) {
if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
throw std::invalid_argument("Batching and chunking of messages can't be enabled together");
}
Expand All @@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura
return;
}
}
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));

if (autoDownloadSchema) {
auto self = shared_from_this();
auto confPtr = std::make_shared<ProducerConfiguration>(conf);
lookupServicePtr_->getSchema(topicName).addListener(
[self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) {
if (res != ResultOk) {
callback(res, Producer());
}
if (topicSchema) {
confPtr->setSchema(topicSchema.get());
}

self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
std::placeholders::_2, topicName, *confPtr, callback));
});
} else {
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));
}
}

void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata,
Expand Down
6 changes: 5 additions & 1 deletion lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
bool poolConnections);
~ClientImpl();

/**
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
* that exists for the topic.
*/
void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback);
CreateProducerCallback callback, bool autoDownloadSchema = false);

void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);
Expand Down
16 changes: 16 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
return buffer;
}

SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::GET_SCHEMA);

auto getSchema = cmd.mutable_getschema();
getSchema->set_topic(topic);
getSchema->set_request_id(requestId);

const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_getschema();
return buffer;
}

SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
Expand Down Expand Up @@ -868,5 +883,6 @@ bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; }

bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; }

} // namespace pulsar
/* namespace pulsar */
2 changes: 2 additions & 0 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class Commands {
static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
const std::string& listenerName);

static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId);

static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload);
Expand Down
Loading