Skip to content

Connection pool size configuration #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:UseKeyspaceCaseSensitiveTests.*\
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
:MetricsTests.Integration_Cassandra_Requests\
:MetricsTests.Integration_Cassandra_StatsShardConnections\
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
Expand Down Expand Up @@ -68,6 +69,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:UseKeyspaceCaseSensitiveTests.*\
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
:MetricsTests.Integration_Cassandra_Requests\
:MetricsTests.Integration_Cassandra_StatsShardConnections\
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
:PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
Expand Down
31 changes: 28 additions & 3 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1780,21 +1780,46 @@ cass_cluster_set_queue_size_event(CassCluster* cluster,
unsigned queue_size));

/**
* Sets the number of connections made to each server in each
* IO thread.
* Sets the number of connections opened by the driver to each host.
*
* <b>Default:</b> 1
* Notice that this overrides the number of connections per shard
* set by `cass_cluster_set_core_connections_per_shard()`.
*
* <b>Default:</b> 1 per shard (i.e. `cass_cluster_set_core_connections_per_shard(cluster, 1)`)
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] num_connections
* @return CASS_OK if successful, otherwise an error occurred.
*
* @see cass_cluster_set_core_connections_per_shard()
*/
CASS_EXPORT CassError
cass_cluster_set_core_connections_per_host(CassCluster* cluster,
unsigned num_connections);

/**
* Sets the number of connections opened by the driver to each shard.
*
* Cassandra nodes are treated as if they have one shard.
*
* This will override the `cass_cluster_set_core_connections_per_host`, if set.
*
* <b>Default:</b> 1
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] num_connections
* @return CASS_OK if successful, otherwise an error occurred.
*
* @see cass_cluster_set_core_connections_per_host()
*/
CASS_EXPORT CassError
cass_cluster_set_core_connections_per_shard(CassCluster* cluster,
unsigned num_connections);

/**
* Sets the maximum number of connections made to each server in each
* IO thread.
Expand Down
60 changes: 58 additions & 2 deletions scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use openssl::ssl::SslContextBuilder;
use openssl_sys::SSL_CTX_up_ref;
use scylla::client::execution_profile::ExecutionProfileBuilder;
use scylla::client::session_builder::SessionBuilder;
use scylla::client::{SelfIdentity, WriteCoalescingDelay};
use scylla::client::{PoolSize, SelfIdentity, WriteCoalescingDelay};
use scylla::frame::Compression;
use scylla::policies::load_balancing::{
DefaultPolicyBuilder, LatencyAwarenessBuilder, LoadBalancingPolicy,
Expand All @@ -25,7 +25,7 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
use std::net::IpAddr;
use std::num::NonZero;
use std::num::{NonZero, NonZeroUsize};
use std::os::raw::{c_char, c_int, c_uint};
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -47,6 +47,8 @@ const DEFAULT_MAX_SCHEMA_WAIT_TIME: Duration = Duration::from_millis(10000);
const DEFAULT_SCHEMA_AGREEMENT_INTERVAL: Duration = Duration::from_millis(200);
// - setting TCP_NODELAY is true
const DEFAULT_SET_TCP_NO_DELAY: bool = true;
// - connection pool size is 1 per shard
const DEFAULT_CONNECTION_POOL_SIZE: PoolSize = PoolSize::PerShard(NonZeroUsize::new(1).unwrap());
// - enabling write coalescing
const DEFAULT_ENABLE_WRITE_COALESCING: bool = true;
// - write coalescing delay
Expand Down Expand Up @@ -234,6 +236,7 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
.schema_agreement_interval(DEFAULT_SCHEMA_AGREEMENT_INTERVAL)
.tcp_nodelay(DEFAULT_SET_TCP_NO_DELAY)
.connection_timeout(DEFAULT_CONNECT_TIMEOUT)
.pool_size(DEFAULT_CONNECTION_POOL_SIZE)
.write_coalescing(DEFAULT_ENABLE_WRITE_COALESCING)
.write_coalescing_delay(DEFAULT_WRITE_COALESCING_DELAY)
.keepalive_interval(DEFAULT_KEEPALIVE_INTERVAL)
Expand Down Expand Up @@ -454,6 +457,59 @@ pub unsafe extern "C" fn cass_cluster_set_connect_timeout(
cluster.session_builder.config.connect_timeout = Duration::from_millis(timeout_ms.into());
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_core_connections_per_host(
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
num_connections: c_uint,
) -> CassError {
let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
tracing::error!(
"Provided null cluster pointer to cass_cluster_set_core_connections_per_host!"
);
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
};

match NonZeroUsize::new(num_connections as usize) {
Some(non_zero_conns) => {
cluster.session_builder.config.connection_pool_size = PoolSize::PerHost(non_zero_conns);
CassError::CASS_OK
}
None => {
tracing::error!(
"Provided zero connections to cass_cluster_set_core_connections_per_host!"
);
CassError::CASS_ERROR_LIB_BAD_PARAMS
}
}
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_core_connections_per_shard(
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
num_connections: c_uint,
) -> CassError {
let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
tracing::error!(
"Provided null cluster pointer to cass_cluster_set_core_connections_per_shard!"
);
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
};

match NonZeroUsize::new(num_connections as usize) {
Some(non_zero_conns) => {
cluster.session_builder.config.connection_pool_size =
PoolSize::PerShard(non_zero_conns);
CassError::CASS_OK
}
None => {
tracing::error!(
"Provided zero connections to cass_cluster_set_core_connections_per_shard!"
);
CassError::CASS_ERROR_LIB_BAD_PARAMS
}
}
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_coalesce_delay(
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
Expand Down
4 changes: 0 additions & 4 deletions src/testing_unimplemented.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ CASS_EXPORT CassError cass_cluster_set_cloud_secure_connection_bundle_no_ssl_lib
CASS_EXPORT void cass_cluster_set_constant_reconnect(CassCluster* cluster, cass_uint64_t delay_ms) {
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_constant_reconnect\n");
}
CASS_EXPORT CassError cass_cluster_set_core_connections_per_host(CassCluster* cluster,
unsigned num_connections) {
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_core_connections_per_host\n");
}
CASS_EXPORT CassError cass_cluster_set_host_listener_callback(CassCluster* cluster,
CassHostListenerCallback callback,
void* data) {
Expand Down
13 changes: 13 additions & 0 deletions tests/src/integration/objects/cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
return *this;
}

/**
* Assign the number of connections made to each shard
*
* NOTE: One extra connection is established (the control connection)
*
* @param connections Number of connection per shard (default: 1)
* @return Cluster object
*/
Cluster& with_core_connections_per_shard(unsigned int connections = 1u) {
EXPECT_EQ(CASS_OK, cass_cluster_set_core_connections_per_shard(get(), connections));
return *this;
}

/**
* Sets credentials for plain text authentication
*
Expand Down
25 changes: 25 additions & 0 deletions tests/src/integration/tests/test_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ CASSANDRA_INTEGRATION_TEST_F(MetricsTests, ErrorsConnectionTimeouts) {
EXPECT_GE(2u, metrics.errors.connection_timeouts);
}

/**
* This test ensures that the driver is reporting the number of connections
* when connection pool size is configured per shard.
*/
CASSANDRA_INTEGRATION_TEST_F(MetricsTests, StatsShardConnections) {
CHECK_FAILURE;

const unsigned int CONNS_PER_SHARD = 2;

Session session =
default_cluster().with_core_connections_per_shard(CONNS_PER_SHARD).connect();

size_t nr_hosts = explode(contact_points_, ',').size();
size_t nr_shards = Options::is_scylla() ? Options::smp() : 1;
size_t expected_connection_count = nr_hosts * nr_shards * CONNS_PER_SHARD;

CassMetrics metrics = session.metrics();
for (int i = 0; i < 100 && metrics.stats.total_connections < expected_connection_count; ++i) {
metrics = session.metrics();
msleep(100);
}

EXPECT_GE(metrics.stats.total_connections, expected_connection_count);
}

/**
* This test ensures that the driver is reporting the proper timeouts for requests
*
Expand Down