Skip to content

[feat] Support AUTO_PUBLISH schema. #142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ enum Result
ResultMemoryBufferIsFull, /// Client-wide memory limit has been reached

ResultInterrupted, /// Interrupted while waiting to dequeue

ResultNotFound /// The generic was not found
};

// Return string representation of result code
Expand Down
3 changes: 2 additions & 1 deletion include/pulsar/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ enum SchemaType
// Return string representation of result code
PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType);

PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr);

class SchemaInfoImpl;

typedef std::map<std::string, std::string> StringMap;
Expand Down Expand Up @@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo {
private:
typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
SchemaInfoImplPtr impl_;
static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
};

} // namespace pulsar
Expand Down
37 changes: 37 additions & 0 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,43 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}

Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema(
const TopicNamePtr& topicName) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>();

if (!topicName) {
promise->setFailed(ResultInvalidTopicName);
return promise->getFuture();
}
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
std::placeholders::_1, std::placeholders::_2, promise));

return promise->getFuture();
}

void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topiName, Result result,
const ClientConnectionWeakPtr& clientCnx,
GetSchemaPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}

ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topiName);

conn->newGetSchema(topiName, requestId)
.addListener([promise](Result result, boost::optional<SchemaInfo> schemaInfo) {
if (result != ResultOk) {
promise->setFailed(result);
return;
}
promise->setValue(schemaInfo);
});
}

void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
Expand Down
7 changes: 7 additions & 0 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_

#include <pulsar/Authentication.h>
#include <pulsar/Schema.h>

#include <mutex>

Expand All @@ -32,6 +33,7 @@ class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
Expand All @@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;

Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;

private:
std::mutex mutex_;
uint64_t requestIdGenerator_ = 0;
Expand All @@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise);

void sendGetSchemaRequest(const std::string& topiName, Result result,
const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise);

void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
NamespaceTopicsPromisePtr promise);

Expand Down
63 changes: 63 additions & 0 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
break;
}

case BaseCommand::GET_SCHEMA_RESPONSE: {
const auto& response = incomingCmd.getschemaresponse();
LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: "
<< response.request_id());
Lock lock(mutex_);
PendingGetSchemaMap::iterator it = pendingGetSchemaRequests_.find(response.request_id());
if (it != pendingGetSchemaRequests_.end()) {
Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second;
pendingGetSchemaRequests_.erase(it);
lock.unlock();

if (response.has_error_code()) {
if (response.error_code() == proto::TopicNotFound) {
getSchemaPromise.setValue(boost::none);
} else {
Result result = getResult(response.error_code(), response.error_message());
LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server "
<< result
<< (response.has_error_message()
? (" (" + response.error_message() + ")")
: "")
<< " -- req_id: " << response.request_id());
getSchemaPromise.setFailed(result);
}
return;
}

auto schema = response.schema();
auto properMap = schema.properties();
StringMap properties;
for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) {
properties[kv->key()] = kv->value();
}
SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "",
schema.schema_data(), properties);
getSchemaPromise.setValue(schemaInfo);
} else {
lock.unlock();
LOG_WARN(
"GetSchemaResponse command - Received unknown request id from "
"server: "
<< response.request_id());
}
break;
}

default: {
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
Expand Down Expand Up @@ -1704,6 +1750,23 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
return promise.getFuture();
}

Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName,
uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, boost::optional<SchemaInfo>> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << "Client is not connected to the broker");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}

pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
lock.unlock();
sendCommand(Commands::newGetSchema(topicName, requestId));
return promise.getFuture();
}

void ClientConnection::closeSocket() {
boost::system::error_code err;
if (socket_) {
Expand Down
6 changes: 6 additions & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);

Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
uint64_t requestId);

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
Expand Down Expand Up @@ -320,6 +323,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;

typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap;
PendingGetSchemaMap pendingGetSchemaRequests_;

mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;

Expand Down
25 changes: 22 additions & 3 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura
return;
}
}
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));

if (conf.getSchema().getSchemaType() == AUTO_PUBLISH) {
auto self = shared_from_this();
auto confPtr = std::make_shared<ProducerConfiguration>(conf);
lookupServicePtr_->getSchema(topicName).addListener(
[self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) {
if (res != ResultOk) {
callback(res, Producer());
}
if (topicSchema) {
confPtr->setSchema(topicSchema.get());
}

self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
std::placeholders::_2, topicName, *confPtr, callback));
});
} else {
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));
}
}

void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata,
Expand Down
16 changes: 16 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
return buffer;
}

SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::GET_SCHEMA);

auto getSchema = cmd.mutable_getschema();
getSchema->set_topic(topic);
getSchema->set_request_id(requestId);

const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_getschema();
return buffer;
}

SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
Expand Down Expand Up @@ -846,5 +861,6 @@ bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; }

bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; }

} // namespace pulsar
/* namespace pulsar */
2 changes: 2 additions & 0 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class Commands {
static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
const std::string& listenerName);

static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId);

static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload);
Expand Down
Loading