diff --git a/Makefile b/Makefile index dd09fe04..260ad353 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,6 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\ :ExecutionProfileTest.Integration_Cassandra_RoundRobin\ :ExecutionProfileTest.Integration_Cassandra_TokenAwareRouting\ -:ExecutionProfileTest.Integration_Cassandra_RetryPolicy\ :ExecutionProfileTest.Integration_Cassandra_SpeculativeExecutionPolicy\ :DCExecutionProfileTest.Integration_Cassandra_DCAware\ :ControlConnectionTests.Integration_Cassandra_TopologyChange\ @@ -107,7 +106,6 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\ :ExecutionProfileTest.Integration_Cassandra_RoundRobin\ :ExecutionProfileTest.Integration_Cassandra_TokenAwareRouting\ -:ExecutionProfileTest.Integration_Cassandra_RetryPolicy\ :ExecutionProfileTest.Integration_Cassandra_SpeculativeExecutionPolicy\ :DCExecutionProfileTest.Integration_Cassandra_DCAware\ :ControlConnectionTests.Integration_Cassandra_TopologyChange\ diff --git a/scylla-rust-wrapper/src/batch.rs b/scylla-rust-wrapper/src/batch.rs index b777deae..48628b54 100644 --- a/scylla-rust-wrapper/src/batch.rs +++ b/scylla-rust-wrapper/src/batch.rs @@ -109,11 +109,14 @@ pub unsafe extern "C" fn cass_batch_set_retry_policy( let maybe_arced_retry_policy: Option> = ArcFFI::as_ref(retry_policy).map(|policy| match policy { - CassRetryPolicy::DefaultRetryPolicy(default) => { + CassRetryPolicy::Default(default) => { default.clone() as Arc } - CassRetryPolicy::FallthroughRetryPolicy(fallthrough) => fallthrough.clone(), - CassRetryPolicy::DowngradingConsistencyRetryPolicy(downgrading) => downgrading.clone(), + CassRetryPolicy::Fallthrough(fallthrough) => fallthrough.clone(), + CassRetryPolicy::DowngradingConsistency(downgrading) => downgrading.clone(), + CassRetryPolicy::Logging(logging) => Arc::clone(logging) as _, + #[cfg(cpp_integration_testing)] + CassRetryPolicy::Ignoring(ignoring) => Arc::clone(ignoring) as _, }); Arc::make_mut(&mut batch.state) diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index f41fc4b2..7d7d825f 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -5,7 +5,6 @@ use crate::exec_profile::{CassExecProfile, ExecProfileName, exec_profile_builder use crate::future::CassFuture; use crate::load_balancing::{CassHostFilter, LoadBalancingConfig, LoadBalancingKind}; use crate::retry_policy::CassRetryPolicy; -use crate::retry_policy::RetryPolicy::*; use crate::ssl::CassSsl; use crate::timestamp_generator::CassTimestampGen; use crate::types::*; @@ -1144,9 +1143,12 @@ pub unsafe extern "C" fn cass_cluster_set_retry_policy( }; let retry_policy: Arc = match ArcFFI::as_ref(retry_policy) { - Some(DefaultRetryPolicy(default)) => Arc::clone(default) as _, - Some(FallthroughRetryPolicy(fallthrough)) => Arc::clone(fallthrough) as _, - Some(DowngradingConsistencyRetryPolicy(downgrading)) => Arc::clone(downgrading) as _, + Some(CassRetryPolicy::Default(default)) => Arc::clone(default) as _, + Some(CassRetryPolicy::Fallthrough(fallthrough)) => Arc::clone(fallthrough) as _, + Some(CassRetryPolicy::DowngradingConsistency(downgrading)) => Arc::clone(downgrading) as _, + Some(CassRetryPolicy::Logging(logging)) => Arc::clone(logging) as _, + #[cfg(cpp_integration_testing)] + Some(CassRetryPolicy::Ignoring(ignoring)) => Arc::clone(ignoring) as _, None => { tracing::error!("Provided null retry policy pointer to cass_cluster_set_retry_policy!"); return; diff --git a/scylla-rust-wrapper/src/exec_profile.rs b/scylla-rust-wrapper/src/exec_profile.rs index 2ca6621c..5d06f1d1 100644 --- a/scylla-rust-wrapper/src/exec_profile.rs +++ b/scylla-rust-wrapper/src/exec_profile.rs @@ -27,9 +27,6 @@ use crate::cluster::{ }; use crate::load_balancing::{LoadBalancingConfig, LoadBalancingKind}; use crate::retry_policy::CassRetryPolicy; -use crate::retry_policy::RetryPolicy::{ - DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy, -}; use crate::session::CassSessionInner; use crate::statement::CassStatement; use crate::types::{ @@ -670,9 +667,12 @@ pub unsafe extern "C" fn cass_execution_profile_set_retry_policy( return CassError::CASS_ERROR_LIB_BAD_PARAMS; }; let retry_policy: Arc = match ArcFFI::as_ref(retry_policy) { - Some(DefaultRetryPolicy(default)) => Arc::clone(default) as _, - Some(FallthroughRetryPolicy(fallthrough)) => Arc::clone(fallthrough) as _, - Some(DowngradingConsistencyRetryPolicy(downgrading)) => Arc::clone(downgrading) as _, + Some(CassRetryPolicy::Default(default)) => Arc::clone(default) as _, + Some(CassRetryPolicy::Fallthrough(fallthrough)) => Arc::clone(fallthrough) as _, + Some(CassRetryPolicy::DowngradingConsistency(downgrading)) => Arc::clone(downgrading) as _, + Some(CassRetryPolicy::Logging(logging)) => Arc::clone(logging) as _, + #[cfg(cpp_integration_testing)] + Some(CassRetryPolicy::Ignoring(ignoring)) => Arc::clone(ignoring) as _, None => { tracing::error!( "Provided null retry policy pointer to cass_execution_profile_set_retry_policy!" diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index ce97ac48..e49841a7 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -9,10 +9,12 @@ use scylla::policies::retry::RetryDecision; use crate::argconv::{ ArcFFI, BoxFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, + CassOwnedSharedPtr, }; use crate::batch::CassBatch; use crate::cluster::CassCluster; use crate::future::{CassFuture, CassResultValue}; +use crate::retry_policy::CassRetryPolicy; use crate::statement::{BoundStatement, CassStatement}; use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t}; @@ -174,3 +176,37 @@ pub unsafe extern "C" fn testing_batch_set_sleeping_history_listener( .batch .set_history_listener(history_listener) } + +/// A retry policy that always ignores all errors. +/// +/// Useful for testing purposes. +#[derive(Debug)] +pub struct IgnoringRetryPolicy; + +#[derive(Debug)] +struct IgnoringRetrySession; + +impl scylla::policies::retry::RetryPolicy for IgnoringRetryPolicy { + fn new_session(&self) -> Box { + Box::new(IgnoringRetrySession) + } +} + +impl scylla::policies::retry::RetrySession for IgnoringRetrySession { + fn decide_should_retry( + &mut self, + _request_info: scylla::policies::retry::RequestInfo, + ) -> RetryDecision { + RetryDecision::IgnoreWriteError + } + + fn reset(&mut self) {} +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn testing_retry_policy_ignoring_new() +-> CassOwnedSharedPtr { + ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Ignoring(Arc::new( + IgnoringRetryPolicy, + )))) +} diff --git a/scylla-rust-wrapper/src/retry_policy.rs b/scylla-rust-wrapper/src/retry_policy.rs index 4058277a..40a34b98 100644 --- a/scylla-rust-wrapper/src/retry_policy.rs +++ b/scylla-rust-wrapper/src/retry_policy.rs @@ -1,17 +1,84 @@ use scylla::policies::retry::{ - DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy, + DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy, RequestInfo, + RetryDecision, RetryPolicy, RetrySession, }; use std::sync::Arc; -use crate::argconv::{ArcFFI, CMut, CassOwnedSharedPtr, FFI, FromArc}; +use crate::argconv::{ArcFFI, CMut, CassBorrowedSharedPtr, CassOwnedSharedPtr, FFI, FromArc}; -pub enum RetryPolicy { - DefaultRetryPolicy(Arc), - FallthroughRetryPolicy(Arc), - DowngradingConsistencyRetryPolicy(Arc), +#[derive(Debug)] +pub struct CassLoggingRetryPolicy { + child_policy: Arc, } -pub type CassRetryPolicy = RetryPolicy; +struct CassLoggingRetrySession { + child_session: Box, +} + +impl RetryPolicy for CassLoggingRetryPolicy { + fn new_session(&self) -> Box { + Box::new(CassLoggingRetrySession { + child_session: self.child_policy.new_session(), + }) + } +} + +impl RetrySession for CassLoggingRetrySession { + fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision { + let error = request_info.error; + let initial_consistency = request_info.consistency; + let decision = self.child_session.decide_should_retry(request_info); + + match &decision { + RetryDecision::RetrySameTarget(consistency) => tracing::info!( + "Retrying on the same target; Error: {}; Initial Consistency: {:?}; New Consistency: {:?}", + error, + initial_consistency, + consistency + ), + RetryDecision::RetryNextTarget(consistency) => tracing::info!( + "Retrying on the next target; Error: {}; Initial Consistency: {:?}; New Consistency: {:?}", + error, + initial_consistency, + consistency + ), + RetryDecision::IgnoreWriteError => { + tracing::info!("Ignoring write error; Error: {}", error) + } + // cpp-driver does not log in case of DontRetry decision. + _ => {} + } + + decision + } + + fn reset(&mut self) { + self.child_session.reset(); + } +} + +#[derive(Debug)] +pub enum CassRetryPolicy { + Default(Arc), + Fallthrough(Arc), + DowngradingConsistency(Arc), + Logging(Arc), + #[cfg(cpp_integration_testing)] + Ignoring(Arc), +} + +impl RetryPolicy for CassRetryPolicy { + fn new_session(&self) -> Box { + match self { + Self::Default(policy) => policy.new_session(), + Self::Fallthrough(policy) => policy.new_session(), + Self::DowngradingConsistency(policy) => policy.new_session(), + Self::Logging(policy) => policy.new_session(), + #[cfg(cpp_integration_testing)] + Self::Ignoring(policy) => policy.new_session(), + } + } +} impl FFI for CassRetryPolicy { type Origin = FromArc; @@ -19,7 +86,7 @@ impl FFI for CassRetryPolicy { #[unsafe(no_mangle)] pub extern "C" fn cass_retry_policy_default_new() -> CassOwnedSharedPtr { - ArcFFI::into_ptr(Arc::new(RetryPolicy::DefaultRetryPolicy(Arc::new( + ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Default(Arc::new( DefaultRetryPolicy, )))) } @@ -27,18 +94,32 @@ pub extern "C" fn cass_retry_policy_default_new() -> CassOwnedSharedPtr CassOwnedSharedPtr { - ArcFFI::into_ptr(Arc::new(RetryPolicy::DowngradingConsistencyRetryPolicy( - Arc::new(DowngradingConsistencyRetryPolicy), - ))) + ArcFFI::into_ptr(Arc::new(CassRetryPolicy::DowngradingConsistency(Arc::new( + DowngradingConsistencyRetryPolicy, + )))) } #[unsafe(no_mangle)] pub extern "C" fn cass_retry_policy_fallthrough_new() -> CassOwnedSharedPtr { - ArcFFI::into_ptr(Arc::new(RetryPolicy::FallthroughRetryPolicy(Arc::new( + ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Fallthrough(Arc::new( FallthroughRetryPolicy, )))) } +#[unsafe(no_mangle)] +pub extern "C" fn cass_retry_policy_logging_new( + child_policy_raw: CassBorrowedSharedPtr, +) -> CassOwnedSharedPtr { + let Some(child_policy) = ArcFFI::cloned_from_ptr(child_policy_raw) else { + tracing::error!("Provided null pointer to child policy in cass_retry_policy_logging_new!"); + return ArcFFI::null(); + }; + + ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Logging(Arc::new( + CassLoggingRetryPolicy { child_policy }, + )))) +} + #[unsafe(no_mangle)] pub unsafe extern "C" fn cass_retry_policy_free( retry_policy: CassOwnedSharedPtr, diff --git a/scylla-rust-wrapper/src/statement.rs b/scylla-rust-wrapper/src/statement.rs index 33eabf2d..e9f87976 100644 --- a/scylla-rust-wrapper/src/statement.rs +++ b/scylla-rust-wrapper/src/statement.rs @@ -582,11 +582,14 @@ pub unsafe extern "C" fn cass_statement_set_retry_policy( let maybe_arced_retry_policy: Option> = ArcFFI::as_ref(retry_policy).map(|policy| match policy { - CassRetryPolicy::DefaultRetryPolicy(default) => { + CassRetryPolicy::Default(default) => { default.clone() as Arc } - CassRetryPolicy::FallthroughRetryPolicy(fallthrough) => fallthrough.clone(), - CassRetryPolicy::DowngradingConsistencyRetryPolicy(downgrading) => downgrading.clone(), + CassRetryPolicy::Fallthrough(fallthrough) => fallthrough.clone(), + CassRetryPolicy::DowngradingConsistency(downgrading) => downgrading.clone(), + CassRetryPolicy::Logging(logging) => Arc::clone(logging) as _, + #[cfg(cpp_integration_testing)] + CassRetryPolicy::Ignoring(ignoring) => Arc::clone(ignoring) as _, }); match &mut statement.statement { diff --git a/src/testing.cpp b/src/testing.cpp index 2f7d9c12..c5fe2ed6 100644 --- a/src/testing.cpp +++ b/src/testing.cpp @@ -121,4 +121,8 @@ void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_tim testing_batch_set_sleeping_history_listener(batch, sleep_time_ms); } +CassRetryPolicy* retry_policy_ignoring_new() { + return testing_retry_policy_ignoring_new(); +} + }}} // namespace datastax::internal::testing diff --git a/src/testing.hpp b/src/testing.hpp index 75bb9629..7ca0e473 100644 --- a/src/testing.hpp +++ b/src/testing.hpp @@ -54,6 +54,8 @@ CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* state CASS_EXPORT void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_time_ms); +CASS_EXPORT CassRetryPolicy* retry_policy_ignoring_new(); + }}} // namespace datastax::internal::testing #endif diff --git a/src/testing_rust_impls.h b/src/testing_rust_impls.h index a5866ff7..4954e074 100644 --- a/src/testing_rust_impls.h +++ b/src/testing_rust_impls.h @@ -40,4 +40,18 @@ CASS_EXPORT void testing_batch_set_sleeping_history_listener(CassBatch *batch, cass_uint64_t sleep_time_ms); } +/** + * Creates a new ignoring retry policy. + * + * This policy never retries any requests, regardless of the error. + * It simply ignores the error. + * + * @public @memberof CassRetryPolicy + * + * @return Returns a retry policy that must be freed. + * + * @see cass_retry_policy_free() + */ +CASS_EXPORT CassRetryPolicy* testing_retry_policy_ignoring_new(); + #endif diff --git a/src/testing_unimplemented.cpp b/src/testing_unimplemented.cpp index a15142d7..ced67512 100644 --- a/src/testing_unimplemented.cpp +++ b/src/testing_unimplemented.cpp @@ -160,9 +160,6 @@ cass_materialized_view_meta_field_by_name(const CassMaterializedViewMeta* view_m const char* name) { throw std::runtime_error("UNIMPLEMENTED cass_materialized_view_meta_field_by_name\n"); } -CASS_EXPORT CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* child_retry_policy) { - throw std::runtime_error("UNIMPLEMENTED cass_retry_policy_logging_new\n"); -} CASS_EXPORT CassVersion cass_schema_meta_version(const CassSchemaMeta* schema_meta) { throw std::runtime_error("UNIMPLEMENTED cass_schema_meta_version\n"); } diff --git a/tests/src/integration/objects/retry_policy.hpp b/tests/src/integration/objects/retry_policy.hpp index bcc9d1da..4b308856 100644 --- a/tests/src/integration/objects/retry_policy.hpp +++ b/tests/src/integration/objects/retry_policy.hpp @@ -17,6 +17,7 @@ #ifndef __TEST_RETRY_POLICY_HPP__ #define __TEST_RETRY_POLICY_HPP__ #include "cassandra.h" +#include "testing.hpp" #include "objects/object_base.hpp" @@ -82,6 +83,19 @@ class FallthroughRetryPolicy : public RetryPolicy { : RetryPolicy(cass_retry_policy_fallthrough_new()) {} }; +/** + * Wrapped ignoring retry policy + */ +class IgnoreRetryPolicy : public RetryPolicy { +public: + /** + * Create the ignoring retry policy object from the native driver + * ignoring retry policy object + */ + IgnoreRetryPolicy() + : RetryPolicy(datastax::internal::testing::retry_policy_ignoring_new()) {} +}; + /** * Wrapped logging retry policy */ diff --git a/tests/src/integration/tests/test_exec_profile.cpp b/tests/src/integration/tests/test_exec_profile.cpp index b7a3d2af..a0c2d8fa 100644 --- a/tests/src/integration/tests/test_exec_profile.cpp +++ b/tests/src/integration/tests/test_exec_profile.cpp @@ -25,10 +25,8 @@ class ExecutionProfileTest : public Integration { public: ExecutionProfileTest() : insert_(NULL) - //, child_retry_policy_(IgnoreRetryPolicy::policy()) // Used for counting retry - , child_retry_policy_(DefaultRetryPolicy()) - // We do not implement logging retry policy in cpp-rust-driver - // , logging_retry_policy_(child_retry_policy_) + , child_retry_policy_(IgnoreRetryPolicy()) // Used for counting retry + , logging_retry_policy_(child_retry_policy_) , skip_base_execution_profile_(false) { // LWTs do not work with tablets. disable_tablets_ = true; @@ -62,8 +60,7 @@ class ExecutionProfileTest : public Integration { .with_whitelist_filtering(Options::host_prefix() + "1") .with_load_balance_round_robin(); profiles_["retry_policy"] = ExecutionProfile::build() - // We do not implement logging retry policy in cpp-rust-driver - .with_retry_policy(child_retry_policy_) + .with_retry_policy(logging_retry_policy_) .with_consistency(CASS_CONSISTENCY_THREE); profiles_["speculative_execution"] = ExecutionProfile::build().with_constant_speculative_execution_policy(100, 20); @@ -104,8 +101,8 @@ class ExecutionProfileTest : public Integration { /** * Logging retry policy for 'retry_policy' execution profile */ - // We do not implement logging retry policy in cpp-rust-driver - // LoggingRetryPolicy logging_retry_policy_; + LoggingRetryPolicy logging_retry_policy_; + /** * Flag to determine if base execution profiles should be built or not */ @@ -634,7 +631,7 @@ CASSANDRA_INTEGRATION_TEST_F(ExecutionProfileTest, RetryPolicy) { CHECK_FAILURE; // Create a logger criteria for retry policy validation - logger_.add_critera("Ignoring unavailable error"); + logger_.add_critera("Ignoring write error"); // Execute a simple query without assigned profile Statement statement(default_select_all());