Skip to content

Implement cass_retry_policy_logging_new and enable some exec profile integration tests #287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\
:ExecutionProfileTest.Integration_Cassandra_RoundRobin\
:ExecutionProfileTest.Integration_Cassandra_TokenAwareRouting\
:ExecutionProfileTest.Integration_Cassandra_RetryPolicy\
:ExecutionProfileTest.Integration_Cassandra_SpeculativeExecutionPolicy\
:DCExecutionProfileTest.Integration_Cassandra_DCAware\
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
Expand Down Expand Up @@ -107,7 +106,6 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\
:ExecutionProfileTest.Integration_Cassandra_RoundRobin\
:ExecutionProfileTest.Integration_Cassandra_TokenAwareRouting\
:ExecutionProfileTest.Integration_Cassandra_RetryPolicy\
:ExecutionProfileTest.Integration_Cassandra_SpeculativeExecutionPolicy\
:DCExecutionProfileTest.Integration_Cassandra_DCAware\
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
Expand Down
9 changes: 6 additions & 3 deletions scylla-rust-wrapper/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ pub unsafe extern "C" fn cass_batch_set_retry_policy(

let maybe_arced_retry_policy: Option<Arc<dyn scylla::policies::retry::RetryPolicy>> =
ArcFFI::as_ref(retry_policy).map(|policy| match policy {
CassRetryPolicy::DefaultRetryPolicy(default) => {
CassRetryPolicy::Default(default) => {
default.clone() as Arc<dyn scylla::policies::retry::RetryPolicy>
}
CassRetryPolicy::FallthroughRetryPolicy(fallthrough) => fallthrough.clone(),
CassRetryPolicy::DowngradingConsistencyRetryPolicy(downgrading) => downgrading.clone(),
CassRetryPolicy::Fallthrough(fallthrough) => fallthrough.clone(),
CassRetryPolicy::DowngradingConsistency(downgrading) => downgrading.clone(),
CassRetryPolicy::Logging(logging) => Arc::clone(logging) as _,
#[cfg(cpp_integration_testing)]
CassRetryPolicy::Ignoring(ignoring) => Arc::clone(ignoring) as _,
});

Arc::make_mut(&mut batch.state)
Expand Down
10 changes: 6 additions & 4 deletions scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::exec_profile::{CassExecProfile, ExecProfileName, exec_profile_builder
use crate::future::CassFuture;
use crate::load_balancing::{CassHostFilter, LoadBalancingConfig, LoadBalancingKind};
use crate::retry_policy::CassRetryPolicy;
use crate::retry_policy::RetryPolicy::*;
use crate::ssl::CassSsl;
use crate::timestamp_generator::CassTimestampGen;
use crate::types::*;
Expand Down Expand Up @@ -1144,9 +1143,12 @@ pub unsafe extern "C" fn cass_cluster_set_retry_policy(
};

let retry_policy: Arc<dyn RetryPolicy> = match ArcFFI::as_ref(retry_policy) {
Some(DefaultRetryPolicy(default)) => Arc::clone(default) as _,
Some(FallthroughRetryPolicy(fallthrough)) => Arc::clone(fallthrough) as _,
Some(DowngradingConsistencyRetryPolicy(downgrading)) => Arc::clone(downgrading) as _,
Some(CassRetryPolicy::Default(default)) => Arc::clone(default) as _,
Some(CassRetryPolicy::Fallthrough(fallthrough)) => Arc::clone(fallthrough) as _,
Some(CassRetryPolicy::DowngradingConsistency(downgrading)) => Arc::clone(downgrading) as _,
Some(CassRetryPolicy::Logging(logging)) => Arc::clone(logging) as _,
#[cfg(cpp_integration_testing)]
Some(CassRetryPolicy::Ignoring(ignoring)) => Arc::clone(ignoring) as _,
None => {
tracing::error!("Provided null retry policy pointer to cass_cluster_set_retry_policy!");
return;
Expand Down
12 changes: 6 additions & 6 deletions scylla-rust-wrapper/src/exec_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ use crate::cluster::{
};
use crate::load_balancing::{LoadBalancingConfig, LoadBalancingKind};
use crate::retry_policy::CassRetryPolicy;
use crate::retry_policy::RetryPolicy::{
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
};
use crate::session::CassSessionInner;
use crate::statement::CassStatement;
use crate::types::{
Expand Down Expand Up @@ -670,9 +667,12 @@ pub unsafe extern "C" fn cass_execution_profile_set_retry_policy(
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
};
let retry_policy: Arc<dyn RetryPolicy> = match ArcFFI::as_ref(retry_policy) {
Some(DefaultRetryPolicy(default)) => Arc::clone(default) as _,
Some(FallthroughRetryPolicy(fallthrough)) => Arc::clone(fallthrough) as _,
Some(DowngradingConsistencyRetryPolicy(downgrading)) => Arc::clone(downgrading) as _,
Some(CassRetryPolicy::Default(default)) => Arc::clone(default) as _,
Some(CassRetryPolicy::Fallthrough(fallthrough)) => Arc::clone(fallthrough) as _,
Some(CassRetryPolicy::DowngradingConsistency(downgrading)) => Arc::clone(downgrading) as _,
Some(CassRetryPolicy::Logging(logging)) => Arc::clone(logging) as _,
#[cfg(cpp_integration_testing)]
Some(CassRetryPolicy::Ignoring(ignoring)) => Arc::clone(ignoring) as _,
None => {
tracing::error!(
"Provided null retry policy pointer to cass_execution_profile_set_retry_policy!"
Expand Down
36 changes: 36 additions & 0 deletions scylla-rust-wrapper/src/integration_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use scylla::policies::retry::RetryDecision;

use crate::argconv::{
ArcFFI, BoxFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr,
CassOwnedSharedPtr,
};
use crate::batch::CassBatch;
use crate::cluster::CassCluster;
use crate::future::{CassFuture, CassResultValue};
use crate::retry_policy::CassRetryPolicy;
use crate::statement::{BoundStatement, CassStatement};
use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t};

Expand Down Expand Up @@ -174,3 +176,37 @@ pub unsafe extern "C" fn testing_batch_set_sleeping_history_listener(
.batch
.set_history_listener(history_listener)
}

/// A retry policy that always ignores all errors.
///
/// Useful for testing purposes.
#[derive(Debug)]
pub struct IgnoringRetryPolicy;

#[derive(Debug)]
struct IgnoringRetrySession;

impl scylla::policies::retry::RetryPolicy for IgnoringRetryPolicy {
fn new_session(&self) -> Box<dyn scylla::policies::retry::RetrySession> {
Box::new(IgnoringRetrySession)
}
}

impl scylla::policies::retry::RetrySession for IgnoringRetrySession {
fn decide_should_retry(
&mut self,
_request_info: scylla::policies::retry::RequestInfo,
) -> RetryDecision {
RetryDecision::IgnoreWriteError
}

fn reset(&mut self) {}
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn testing_retry_policy_ignoring_new()
-> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Ignoring(Arc::new(
IgnoringRetryPolicy,
))))
}
105 changes: 93 additions & 12 deletions scylla-rust-wrapper/src/retry_policy.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,125 @@
use scylla::policies::retry::{
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy, RequestInfo,
RetryDecision, RetryPolicy, RetrySession,
};
use std::sync::Arc;

use crate::argconv::{ArcFFI, CMut, CassOwnedSharedPtr, FFI, FromArc};
use crate::argconv::{ArcFFI, CMut, CassBorrowedSharedPtr, CassOwnedSharedPtr, FFI, FromArc};

pub enum RetryPolicy {
DefaultRetryPolicy(Arc<DefaultRetryPolicy>),
FallthroughRetryPolicy(Arc<FallthroughRetryPolicy>),
DowngradingConsistencyRetryPolicy(Arc<DowngradingConsistencyRetryPolicy>),
#[derive(Debug)]
pub struct CassLoggingRetryPolicy {
child_policy: Arc<CassRetryPolicy>,
}

pub type CassRetryPolicy = RetryPolicy;
struct CassLoggingRetrySession {
child_session: Box<dyn RetrySession>,
}

impl RetryPolicy for CassLoggingRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(CassLoggingRetrySession {
child_session: self.child_policy.new_session(),
})
}
}

impl RetrySession for CassLoggingRetrySession {
fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
let error = request_info.error;
let initial_consistency = request_info.consistency;
let decision = self.child_session.decide_should_retry(request_info);

match &decision {
RetryDecision::RetrySameTarget(consistency) => tracing::info!(
"Retrying on the same target; Error: {}; Initial Consistency: {:?}; New Consistency: {:?}",
error,
initial_consistency,
consistency
),
RetryDecision::RetryNextTarget(consistency) => tracing::info!(
"Retrying on the next target; Error: {}; Initial Consistency: {:?}; New Consistency: {:?}",
error,
initial_consistency,
consistency
),
RetryDecision::IgnoreWriteError => {
tracing::info!("Ignoring write error; Error: {}", error)
}
// cpp-driver does not log in case of DontRetry decision.
_ => {}
}

decision
}

fn reset(&mut self) {
self.child_session.reset();
}
}

#[derive(Debug)]
pub enum CassRetryPolicy {
Default(Arc<DefaultRetryPolicy>),
Fallthrough(Arc<FallthroughRetryPolicy>),
DowngradingConsistency(Arc<DowngradingConsistencyRetryPolicy>),
Logging(Arc<CassLoggingRetryPolicy>),
#[cfg(cpp_integration_testing)]
Ignoring(Arc<crate::integration_testing::IgnoringRetryPolicy>),
}

impl RetryPolicy for CassRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
match self {
Self::Default(policy) => policy.new_session(),
Self::Fallthrough(policy) => policy.new_session(),
Self::DowngradingConsistency(policy) => policy.new_session(),
Self::Logging(policy) => policy.new_session(),
#[cfg(cpp_integration_testing)]
Self::Ignoring(policy) => policy.new_session(),
}
}
}

impl FFI for CassRetryPolicy {
type Origin = FromArc;
}

#[unsafe(no_mangle)]
pub extern "C" fn cass_retry_policy_default_new() -> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
ArcFFI::into_ptr(Arc::new(RetryPolicy::DefaultRetryPolicy(Arc::new(
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Default(Arc::new(
DefaultRetryPolicy,
))))
}

#[unsafe(no_mangle)]
pub extern "C" fn cass_retry_policy_downgrading_consistency_new()
-> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
ArcFFI::into_ptr(Arc::new(RetryPolicy::DowngradingConsistencyRetryPolicy(
Arc::new(DowngradingConsistencyRetryPolicy),
)))
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::DowngradingConsistency(Arc::new(
DowngradingConsistencyRetryPolicy,
))))
}

#[unsafe(no_mangle)]
pub extern "C" fn cass_retry_policy_fallthrough_new() -> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
ArcFFI::into_ptr(Arc::new(RetryPolicy::FallthroughRetryPolicy(Arc::new(
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Fallthrough(Arc::new(
FallthroughRetryPolicy,
))))
}

#[unsafe(no_mangle)]
pub extern "C" fn cass_retry_policy_logging_new(
child_policy_raw: CassBorrowedSharedPtr<CassRetryPolicy, CMut>,
) -> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
let Some(child_policy) = ArcFFI::cloned_from_ptr(child_policy_raw) else {
tracing::error!("Provided null pointer to child policy in cass_retry_policy_logging_new!");
return ArcFFI::null();
};

ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Logging(Arc::new(
CassLoggingRetryPolicy { child_policy },
))))
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_retry_policy_free(
retry_policy: CassOwnedSharedPtr<CassRetryPolicy, CMut>,
Expand Down
9 changes: 6 additions & 3 deletions scylla-rust-wrapper/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,14 @@ pub unsafe extern "C" fn cass_statement_set_retry_policy(

let maybe_arced_retry_policy: Option<Arc<dyn scylla::policies::retry::RetryPolicy>> =
ArcFFI::as_ref(retry_policy).map(|policy| match policy {
CassRetryPolicy::DefaultRetryPolicy(default) => {
CassRetryPolicy::Default(default) => {
default.clone() as Arc<dyn scylla::policies::retry::RetryPolicy>
}
CassRetryPolicy::FallthroughRetryPolicy(fallthrough) => fallthrough.clone(),
CassRetryPolicy::DowngradingConsistencyRetryPolicy(downgrading) => downgrading.clone(),
CassRetryPolicy::Fallthrough(fallthrough) => fallthrough.clone(),
CassRetryPolicy::DowngradingConsistency(downgrading) => downgrading.clone(),
CassRetryPolicy::Logging(logging) => Arc::clone(logging) as _,
#[cfg(cpp_integration_testing)]
CassRetryPolicy::Ignoring(ignoring) => Arc::clone(ignoring) as _,
});

match &mut statement.statement {
Expand Down
4 changes: 4 additions & 0 deletions src/testing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,8 @@ void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_tim
testing_batch_set_sleeping_history_listener(batch, sleep_time_ms);
}

CassRetryPolicy* retry_policy_ignoring_new() {
return testing_retry_policy_ignoring_new();
}

}}} // namespace datastax::internal::testing
2 changes: 2 additions & 0 deletions src/testing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* state

CASS_EXPORT void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_time_ms);

CASS_EXPORT CassRetryPolicy* retry_policy_ignoring_new();

}}} // namespace datastax::internal::testing

#endif
14 changes: 14 additions & 0 deletions src/testing_rust_impls.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,18 @@ CASS_EXPORT void testing_batch_set_sleeping_history_listener(CassBatch *batch,
cass_uint64_t sleep_time_ms);
}

/**
* Creates a new ignoring retry policy.
*
* This policy never retries any requests, regardless of the error.
* It simply ignores the error.
*
* @public @memberof CassRetryPolicy
*
* @return Returns a retry policy that must be freed.
*
* @see cass_retry_policy_free()
*/
CASS_EXPORT CassRetryPolicy* testing_retry_policy_ignoring_new();

#endif
3 changes: 0 additions & 3 deletions src/testing_unimplemented.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ cass_materialized_view_meta_field_by_name(const CassMaterializedViewMeta* view_m
const char* name) {
throw std::runtime_error("UNIMPLEMENTED cass_materialized_view_meta_field_by_name\n");
}
CASS_EXPORT CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* child_retry_policy) {
throw std::runtime_error("UNIMPLEMENTED cass_retry_policy_logging_new\n");
}
CASS_EXPORT CassVersion cass_schema_meta_version(const CassSchemaMeta* schema_meta) {
throw std::runtime_error("UNIMPLEMENTED cass_schema_meta_version\n");
}
Expand Down
14 changes: 14 additions & 0 deletions tests/src/integration/objects/retry_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef __TEST_RETRY_POLICY_HPP__
#define __TEST_RETRY_POLICY_HPP__
#include "cassandra.h"
#include "testing.hpp"

#include "objects/object_base.hpp"

Expand Down Expand Up @@ -82,6 +83,19 @@ class FallthroughRetryPolicy : public RetryPolicy {
: RetryPolicy(cass_retry_policy_fallthrough_new()) {}
};

/**
* Wrapped ignoring retry policy
*/
class IgnoreRetryPolicy : public RetryPolicy {
public:
/**
* Create the ignoring retry policy object from the native driver
* ignoring retry policy object
*/
IgnoreRetryPolicy()
: RetryPolicy(datastax::internal::testing::retry_policy_ignoring_new()) {}
};

/**
* Wrapped logging retry policy
*/
Expand Down
Loading