Skip to content

Commit 5d3fa0c

Browse files
committed
future: expose recorded attempted hosts
This commit exposes the `attempted_hosts` method on `CassFuture`, allowing users to retrieve the hosts that were attempted during the execution of a future. This can be used in proxy tests in the wrapper itself, but the main use case is in the C++ integration tests. The necessary bridging code is implemented in the next commit.
1 parent 6bd0a05 commit 5d3fa0c

File tree

3 files changed

+136
-64
lines changed

3 files changed

+136
-64
lines changed

scylla-rust-wrapper/src/future.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub struct CassFuture {
6060
state: Mutex<CassFutureState>,
6161
result: OnceLock<CassFutureResult>,
6262
wait_for_value: Condvar,
63+
#[cfg(cpp_integration_testing)]
64+
recording_listener: Option<Arc<crate::integration_testing::RecordingHistoryListener>>,
6365
}
6466

6567
impl FFI for CassFuture {
@@ -79,17 +81,30 @@ struct JoinHandleTimeout(JoinHandle<()>);
7981
impl CassFuture {
8082
pub(crate) fn make_raw(
8183
fut: impl Future<Output = CassFutureResult> + Send + 'static,
84+
#[cfg(cpp_integration_testing)] recording_listener: Option<
85+
Arc<crate::integration_testing::RecordingHistoryListener>,
86+
>,
8287
) -> CassOwnedSharedPtr<CassFuture, CMut> {
83-
Self::new_from_future(fut).into_raw()
88+
Self::new_from_future(
89+
fut,
90+
#[cfg(cpp_integration_testing)]
91+
recording_listener,
92+
)
93+
.into_raw()
8494
}
8595

8696
pub(crate) fn new_from_future(
8797
fut: impl Future<Output = CassFutureResult> + Send + 'static,
98+
#[cfg(cpp_integration_testing)] recording_listener: Option<
99+
Arc<crate::integration_testing::RecordingHistoryListener>,
100+
>,
88101
) -> Arc<CassFuture> {
89102
let cass_fut = Arc::new(CassFuture {
90103
state: Mutex::new(Default::default()),
91104
result: OnceLock::new(),
92105
wait_for_value: Condvar::new(),
106+
#[cfg(cpp_integration_testing)]
107+
recording_listener,
93108
});
94109
let cass_fut_clone = Arc::clone(&cass_fut);
95110
let join_handle = RUNTIME.spawn(async move {
@@ -125,6 +140,8 @@ impl CassFuture {
125140
state: Mutex::new(CassFutureState::default()),
126141
result: OnceLock::from(r),
127142
wait_for_value: Condvar::new(),
143+
#[cfg(cpp_integration_testing)]
144+
recording_listener: None,
128145
})
129146
}
130147

@@ -300,6 +317,15 @@ impl CassFuture {
300317
fn into_raw(self: Arc<Self>) -> CassOwnedSharedPtr<Self, CMut> {
301318
ArcFFI::into_ptr(self)
302319
}
320+
321+
#[cfg(cpp_integration_testing)]
322+
pub(crate) fn attempted_hosts(&self) -> Vec<std::net::SocketAddr> {
323+
if let Some(listener) = &self.recording_listener {
324+
listener.get_attempted_hosts()
325+
} else {
326+
vec![]
327+
}
328+
}
303329
}
304330

305331
// Do not remove; this asserts that `CassFuture` implements Send + Sync,
@@ -527,7 +553,11 @@ mod tests {
527553
tokio::time::sleep(Duration::from_millis(10)).await;
528554
Err((CassError::CASS_OK, ERROR_MSG.into()))
529555
};
530-
let cass_fut = CassFuture::make_raw(fut);
556+
let cass_fut = CassFuture::make_raw(
557+
fut,
558+
#[cfg(cpp_integration_testing)]
559+
None,
560+
);
531561

532562
struct PtrWrapper(CassBorrowedSharedPtr<'static, CassFuture, CMut>);
533563
unsafe impl Send for PtrWrapper {}
@@ -562,7 +592,11 @@ mod tests {
562592
tokio::time::sleep(Duration::from_micros(HUNDRED_MILLIS_IN_MICROS)).await;
563593
Err((CassError::CASS_OK, ERROR_MSG.into()))
564594
};
565-
let cass_fut = CassFuture::make_raw(fut);
595+
let cass_fut = CassFuture::make_raw(
596+
fut,
597+
#[cfg(cpp_integration_testing)]
598+
None,
599+
);
566600

567601
unsafe {
568602
// This should timeout on tokio::time::timeout.
@@ -609,7 +643,11 @@ mod tests {
609643
tokio::time::sleep(Duration::from_micros(HUNDRED_MILLIS_IN_MICROS)).await;
610644
Err((CassError::CASS_OK, ERROR_MSG.into()))
611645
};
612-
let cass_fut = CassFuture::make_raw(fut);
646+
let cass_fut = CassFuture::make_raw(
647+
fut,
648+
#[cfg(cpp_integration_testing)]
649+
None,
650+
);
613651
let flag = Box::new(false);
614652
let flag_ptr = Box::into_raw(flag);
615653

scylla-rust-wrapper/src/integration_testing.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ impl RecordingHistoryListener {
193193
}
194194
}
195195

196-
#[expect(unused)] // <- next commit removes this
197196
pub(crate) fn get_attempted_hosts(&self) -> Vec<SocketAddr> {
198197
self.attempted_hosts.lock().unwrap().clone()
199198
}

scylla-rust-wrapper/src/session.rs

Lines changed: 94 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,21 @@ impl CassSessionInner {
8181
let exec_profile_map = cluster.execution_profile_map().clone();
8282
let host_filter = cluster.build_host_filter();
8383

84-
CassFuture::make_raw(Self::connect_fut(
85-
session_opt,
86-
session_builder,
87-
exec_profile_map,
88-
host_filter,
89-
cluster
90-
.get_client_id()
91-
// If user did not set a client id, generate a random uuid v4.
92-
.unwrap_or_else(uuid::Uuid::new_v4),
93-
keyspace,
94-
))
84+
CassFuture::make_raw(
85+
Self::connect_fut(
86+
session_opt,
87+
session_builder,
88+
exec_profile_map,
89+
host_filter,
90+
cluster
91+
.get_client_id()
92+
// If user did not set a client id, generate a random uuid v4.
93+
.unwrap_or_else(uuid::Uuid::new_v4),
94+
keyspace,
95+
),
96+
#[cfg(cpp_integration_testing)]
97+
None,
98+
)
9599
}
96100

97101
async fn connect_fut(
@@ -255,10 +259,16 @@ pub unsafe extern "C" fn cass_session_execute_batch(
255259
};
256260

257261
match request_timeout_ms {
258-
Some(timeout_ms) => {
259-
CassFuture::make_raw(async move { request_with_timeout(timeout_ms, future).await })
260-
}
261-
None => CassFuture::make_raw(future),
262+
Some(timeout_ms) => CassFuture::make_raw(
263+
async move { request_with_timeout(timeout_ms, future).await },
264+
#[cfg(cpp_integration_testing)]
265+
None,
266+
),
267+
None => CassFuture::make_raw(
268+
future,
269+
#[cfg(cpp_integration_testing)]
270+
None,
271+
),
262272
}
263273
}
264274

@@ -297,19 +307,24 @@ pub unsafe extern "C" fn cass_session_execute(
297307
let mut statement = statement_opt.statement.clone();
298308

299309
#[cfg(cpp_integration_testing)]
300-
statement_opt.record_hosts.then(|| {
310+
let recording_listener = statement_opt.record_hosts.then(|| {
301311
let recording_listener =
302312
Arc::new(crate::integration_testing::RecordingHistoryListener::new());
303313
match statement {
304314
BoundStatement::Simple(ref mut unprepared) => {
305-
unprepared.query.set_history_listener(recording_listener);
315+
unprepared
316+
.query
317+
.set_history_listener(Arc::clone(&recording_listener) as _);
306318
}
307319
BoundStatement::Prepared(ref mut prepared) => {
320+
// It's extremely interesting to me that this `as _` cast is required
321+
// for the type checker to accept this code.
308322
Arc::make_mut(&mut prepared.statement)
309323
.statement
310-
.set_history_listener(recording_listener);
324+
.set_history_listener(Arc::clone(&recording_listener) as _);
311325
}
312326
};
327+
recording_listener
313328
});
314329

315330
let statement_exec_profile = statement_opt.exec_profile.clone();
@@ -426,10 +441,16 @@ pub unsafe extern "C" fn cass_session_execute(
426441
};
427442

428443
match request_timeout_ms {
429-
Some(timeout_ms) => {
430-
CassFuture::make_raw(async move { request_with_timeout(timeout_ms, future).await })
431-
}
432-
None => CassFuture::make_raw(future),
444+
Some(timeout_ms) => CassFuture::make_raw(
445+
async move { request_with_timeout(timeout_ms, future).await },
446+
#[cfg(cpp_integration_testing)]
447+
recording_listener,
448+
),
449+
None => CassFuture::make_raw(
450+
future,
451+
#[cfg(cpp_integration_testing)]
452+
recording_listener,
453+
),
433454
}
434455
}
435456

@@ -449,31 +470,35 @@ pub unsafe extern "C" fn cass_session_prepare_from_existing(
449470

450471
let statement = cass_statement.statement.clone();
451472

452-
CassFuture::make_raw(async move {
453-
let query = match &statement {
454-
BoundStatement::Simple(q) => q,
455-
BoundStatement::Prepared(ps) => {
456-
return Ok(CassResultValue::Prepared(ps.statement.clone()));
457-
}
458-
};
459-
460-
let session_guard = session.read().await;
461-
if session_guard.is_none() {
462-
return Err((
463-
CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE,
464-
"Session is not connected".msg(),
465-
));
466-
}
467-
let session = &session_guard.as_ref().unwrap().session;
468-
let prepared = session
469-
.prepare(query.query.clone())
470-
.await
471-
.map_err(|err| (err.to_cass_error(), err.msg()))?;
473+
CassFuture::make_raw(
474+
async move {
475+
let query = match &statement {
476+
BoundStatement::Simple(q) => q,
477+
BoundStatement::Prepared(ps) => {
478+
return Ok(CassResultValue::Prepared(Arc::clone(&ps.statement)));
479+
}
480+
};
472481

473-
Ok(CassResultValue::Prepared(Arc::new(
474-
CassPrepared::new_from_prepared_statement(prepared),
475-
)))
476-
})
482+
let session_guard = session.read().await;
483+
if session_guard.is_none() {
484+
return Err((
485+
CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE,
486+
"Session is not connected".msg(),
487+
));
488+
}
489+
let session = &session_guard.as_ref().unwrap().session;
490+
let prepared = session
491+
.prepare(query.query.clone())
492+
.await
493+
.map_err(|err| (err.to_cass_error(), err.msg()))?;
494+
495+
Ok(CassResultValue::Prepared(Arc::new(
496+
CassPrepared::new_from_prepared_statement(prepared),
497+
)))
498+
},
499+
#[cfg(cpp_integration_testing)]
500+
None,
501+
)
477502
}
478503

479504
#[unsafe(no_mangle)]
@@ -503,7 +528,7 @@ pub unsafe extern "C" fn cass_session_prepare_n(
503528
.unwrap_or_default();
504529
let query = Statement::new(query_str.to_string());
505530

506-
CassFuture::make_raw(async move {
531+
let fut = async move {
507532
let session_guard = cass_session.read().await;
508533
if session_guard.is_none() {
509534
return Err((
@@ -524,7 +549,13 @@ pub unsafe extern "C" fn cass_session_prepare_n(
524549
Ok(CassResultValue::Prepared(Arc::new(
525550
CassPrepared::new_from_prepared_statement(prepared),
526551
)))
527-
})
552+
};
553+
554+
CassFuture::make_raw(
555+
fut,
556+
#[cfg(cpp_integration_testing)]
557+
None,
558+
)
528559
}
529560

530561
#[unsafe(no_mangle)]
@@ -541,19 +572,23 @@ pub unsafe extern "C" fn cass_session_close(
541572
return ArcFFI::null();
542573
};
543574

544-
CassFuture::make_raw(async move {
545-
let mut session_guard = session_opt.write().await;
546-
if session_guard.is_none() {
547-
return Err((
548-
CassError::CASS_ERROR_LIB_UNABLE_TO_CLOSE,
549-
"Already closing or closed".msg(),
550-
));
551-
}
575+
CassFuture::make_raw(
576+
async move {
577+
let mut session_guard = session_opt.write().await;
578+
if session_guard.is_none() {
579+
return Err((
580+
CassError::CASS_ERROR_LIB_UNABLE_TO_CLOSE,
581+
"Already closing or closed".msg(),
582+
));
583+
}
552584

553-
*session_guard = None;
585+
*session_guard = None;
554586

555-
Ok(CassResultValue::Empty)
556-
})
587+
Ok(CassResultValue::Empty)
588+
},
589+
#[cfg(cpp_integration_testing)]
590+
None,
591+
)
557592
}
558593

559594
#[unsafe(no_mangle)]

0 commit comments

Comments
 (0)