diff --git a/Makefile b/Makefile index afc21299..9c12d64c 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :PreparedMetadataTests.*\ :UseKeyspaceCaseSensitiveTests.*\ :ServerSideFailureTests.*\ +:TimestampTests.*\ :MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ :MetricsTests.Integration_Cassandra_Requests\ :MetricsTests.Integration_Cassandra_StatsShardConnections\ @@ -38,6 +39,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :ExecutionProfileTest.Integration_Cassandra_LatencyAwareRouting\ :-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\ :HeartbeatTests.Integration_Cassandra_HeartbeatFailed\ +:TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\ :ControlConnectionTests.Integration_Cassandra_TopologyChange\ :ControlConnectionTests.Integration_Cassandra_FullOutage\ :ControlConnectionTests.Integration_Cassandra_TerminatedUsingMultipleIoThreadsWithError\ @@ -77,6 +79,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :PreparedMetadataTests.*\ :UseKeyspaceCaseSensitiveTests.*\ :ServerSideFailureTests.*\ +:TimestampTests.*\ :MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ :MetricsTests.Integration_Cassandra_Requests\ :MetricsTests.Integration_Cassandra_StatsShardConnections\ @@ -90,6 +93,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\ :PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\ :HeartbeatTests.Integration_Cassandra_HeartbeatFailed\ +:TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\ :ControlConnectionTests.Integration_Cassandra_TopologyChange\ :ControlConnectionTests.Integration_Cassandra_FullOutage\ :ControlConnectionTests.Integration_Cassandra_TerminatedUsingMultipleIoThreadsWithError\ diff --git a/README.md b/README.md index 9f21c126..94f240f4 100644 --- a/README.md +++ b/README.md @@ -173,25 +173,6 @@ The driver inherits almost all the features of C/C++ and Rust drivers, such as: cass_user_type_set_custom[by_name] Unimplemented because of the same reasons as binding for statements.
Note: The driver does not check whether the type of the value being set for a field of the UDT is compatible with the field's actual type. - - Timestamp generators - - - cass_timestamp_gen_server_side_new - Timestamp generator is not implemented in the Rust driver. - - - cass_timestamp_gen_monotonic_new - - - cass_timestamp_gen_monotonic_new_with_settings - - - cass_timestamp_gen_free - - - cass_cluster_set_timestamp_gen - Metadata diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index b8e756a0..c9e0a860 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -6,6 +6,7 @@ use crate::future::CassFuture; use crate::retry_policy::CassRetryPolicy; use crate::retry_policy::RetryPolicy::*; use crate::ssl::CassSsl; +use crate::timestamp_generator::CassTimestampGen; use crate::types::*; use crate::uuid::CassUuid; use openssl::ssl::SslContextBuilder; @@ -19,6 +20,7 @@ use scylla::policies::load_balancing::{ }; use scylla::policies::retry::RetryPolicy; use scylla::policies::speculative_execution::SimpleSpeculativeExecutionPolicy; +use scylla::policies::timestamp_generator::TimestampGenerator; use scylla::routing::ShardAwarePortRange; use scylla::statement::{Consistency, SerialConsistency}; use std::collections::HashMap; @@ -466,6 +468,33 @@ pub unsafe extern "C" fn cass_cluster_set_tcp_keepalive( cluster.session_builder.config.tcp_keepalive_interval = tcp_keepalive_interval; } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_cluster_set_timestamp_gen( + cluster_raw: CassBorrowedExclusivePtr, + timestamp_gen_raw: CassBorrowedSharedPtr, +) { + let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else { + tracing::error!("Provided null cluster pointer to cass_cluster_set_timestamp_gen!"); + return; + }; + let Some(timestamp_gen) = BoxFFI::as_ref(timestamp_gen_raw) else { + tracing::error!( + "Provided null timestamp generator pointer to cass_cluster_set_timestamp_gen!" + ); + return; + }; + + let rust_timestamp_gen: Option> = match timestamp_gen { + // In rust-driver, `None` is equivalent to using server-side timestamp generator. + CassTimestampGen::ServerSide => None, + CassTimestampGen::Monotonic(monotonic_timestamp_generator) => { + Some(Arc::clone(monotonic_timestamp_generator) as _) + } + }; + + cluster.session_builder.config.timestamp_generator = rust_timestamp_gen; +} + #[unsafe(no_mangle)] pub unsafe extern "C" fn cass_cluster_set_connection_heartbeat_interval( cluster_raw: CassBorrowedExclusivePtr, diff --git a/scylla-rust-wrapper/src/lib.rs b/scylla-rust-wrapper/src/lib.rs index daf11ccf..cb261bf5 100644 --- a/scylla-rust-wrapper/src/lib.rs +++ b/scylla-rust-wrapper/src/lib.rs @@ -37,6 +37,7 @@ pub mod ssl; pub mod statement; #[cfg(test)] pub mod testing; +pub mod timestamp_generator; pub mod tuple; pub mod user_type; pub mod uuid; diff --git a/scylla-rust-wrapper/src/timestamp_generator.rs b/scylla-rust-wrapper/src/timestamp_generator.rs new file mode 100644 index 00000000..623ac293 --- /dev/null +++ b/scylla-rust-wrapper/src/timestamp_generator.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; +use std::time::Duration; + +use scylla::policies::timestamp_generator::MonotonicTimestampGenerator; + +use crate::argconv::{BoxFFI, CMut, CassOwnedExclusivePtr, FFI, FromBox}; + +pub enum CassTimestampGen { + ServerSide, + Monotonic(Arc), +} + +impl FFI for CassTimestampGen { + type Origin = FromBox; +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_timestamp_gen_server_side_new() +-> CassOwnedExclusivePtr { + BoxFFI::into_ptr(Box::new(CassTimestampGen::ServerSide)) +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_timestamp_gen_monotonic_new() +-> CassOwnedExclusivePtr { + BoxFFI::into_ptr(Box::new(CassTimestampGen::Monotonic(Arc::new( + // Generator with default settings (warning_threshold=1s, warning_interval=1s) + MonotonicTimestampGenerator::new(), + )))) +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_timestamp_gen_monotonic_new_with_settings( + warning_threshold_us: i64, + warning_interval_ms: i64, +) -> CassOwnedExclusivePtr { + let generator = if warning_threshold_us <= 0 { + // If threshold is <= 0, we disable the warnings. + MonotonicTimestampGenerator::new().without_warnings() + } else { + let warning_threshold = Duration::from_micros(warning_threshold_us as u64); + let warning_interval = if warning_interval_ms <= 0 { + // Inverval <= 0 fallbacks to 1ms. + Duration::from_millis(1) + } else { + Duration::from_millis(warning_interval_ms as u64) + }; + + MonotonicTimestampGenerator::new().with_warning_times(warning_threshold, warning_interval) + }; + + BoxFFI::into_ptr(Box::new(CassTimestampGen::Monotonic(Arc::new(generator)))) +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_timestamp_gen_free( + timestamp_gen_raw: CassOwnedExclusivePtr, +) { + BoxFFI::free(timestamp_gen_raw) +} diff --git a/src/testing_unimplemented.cpp b/src/testing_unimplemented.cpp index 3843b8ac..960ab3e2 100644 --- a/src/testing_unimplemented.cpp +++ b/src/testing_unimplemented.cpp @@ -84,10 +84,6 @@ CASS_EXPORT CassError cass_cluster_set_prepare_on_up_or_add_host(CassCluster* cl cass_bool_t enabled) { throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_prepare_on_up_or_add_host\n"); } -CASS_EXPORT void cass_cluster_set_timestamp_gen(CassCluster* cluster, - CassTimestampGen* timestamp_gen) { - throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_timestamp_gen\n"); -} CASS_EXPORT void cass_cluster_set_whitelist_dc_filtering(CassCluster* cluster, const char* dcs) { throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_whitelist_dc_filtering\n"); } diff --git a/src/timestamp_generator.cpp b/src/timestamp_generator.cpp index d186d810..40f94e82 100644 --- a/src/timestamp_generator.cpp +++ b/src/timestamp_generator.cpp @@ -22,32 +22,6 @@ using namespace datastax::internal::core; -extern "C" { - -CassTimestampGen* cass_timestamp_gen_server_side_new() { - TimestampGenerator* timestamp_gen = new ServerSideTimestampGenerator(); - timestamp_gen->inc_ref(); - return CassTimestampGen::to(timestamp_gen); -} - -CassTimestampGen* cass_timestamp_gen_monotonic_new() { - TimestampGenerator* timestamp_gen = new MonotonicTimestampGenerator(); - timestamp_gen->inc_ref(); - return CassTimestampGen::to(timestamp_gen); -} - -CassTimestampGen* cass_timestamp_gen_monotonic_new_with_settings(int64_t warning_threshold_us, - int64_t warning_interval_ms) { - TimestampGenerator* timestamp_gen = - new MonotonicTimestampGenerator(warning_threshold_us, warning_interval_ms); - timestamp_gen->inc_ref(); - return CassTimestampGen::to(timestamp_gen); -} - -void cass_timestamp_gen_free(CassTimestampGen* timestamp_gen) { timestamp_gen->dec_ref(); } - -} // extern "C" - int64_t MonotonicTimestampGenerator::next() { while (true) { int64_t last = last_.load();