Skip to content
This repository was archived by the owner on Feb 9, 2022. It is now read-only.

Commit 9ba6d76

Browse files
Extend KubeClient to retrieve and verify the status of a resource (#5)
* All modules in k8s_openapi::api::core::v1 re-exported in test::prelude * verify_status and get_status added * Race conditions in KubeClient fixed * Missing semicolon added * Changelog updated
1 parent 32e81d5 commit 9ba6d76

File tree

5 files changed

+95
-32
lines changed

5 files changed

+95
-32
lines changed

CHANGELOG.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,20 @@
33
== 0.2.0 - unreleased
44

55
:4: https://github.com/stackabletech/integration-test-commons/pull/4[#4]
6+
:5: https://github.com/stackabletech/integration-test-commons/pull/5[#5]
67

78
=== Added
8-
* TestCluster merged from spark-operator-integration-tests and zookeeper-operator-integration-tests ({4})
9+
* TestCluster merged from spark-operator-integration-tests and zookeeper-operator-integration-tests ({4}).
10+
* `test::kube::KubeClient::verify_status` and `test::kube::KubeClient::get_status` added ({5}).
11+
* All modules in `k8s_openapi::api::core::v1` re-exported in `test::prelude` ({5}).
912

1013
=== Fixed
14+
* Race conditions in `test::kube::KubeClient` fixed ({5}).
1115

1216
=== Changed
1317
* Dependency `kube` set to version `0.56` ({4}).
18+
* `test::kube::Timeouts::verify_pod_condition` renamed to `verify_status` ({5}).
19+
1420

1521
== 0.1.0 - 2021-06-10
1622

src/operator/setup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ where
147147

148148
for pod in &created_pods {
149149
// TODO: switch to pod condition type enum from operator-rs?
150-
self.client.verify_pod_condition(pod, "Ready")
150+
self.client.verify_pod_condition(pod, "Ready");
151151
}
152152

153153
println!("Installation finished");

src/test/kube.rs

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,9 @@ impl TestKubeClient {
153153
})
154154
}
155155

156-
/// Verifies that the given pod condition becomes true within the specified timeout.
157-
pub fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) {
156+
/// Verifies that the given pod condition becomes true within the
157+
/// specified timeout.
158+
pub fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Pod {
158159
self.runtime.block_on(async {
159160
self.kube_client
160161
.verify_pod_condition(pod, condition_type)
@@ -163,6 +164,36 @@ impl TestKubeClient {
163164
})
164165
}
165166

167+
/// Verifies that the status of a resource fulfills the given
168+
/// predicate within the specified timeout.
169+
pub fn verify_status<K, P>(&self, resource: &K, predicate: P) -> K
170+
where
171+
P: Fn(&K) -> bool,
172+
K: Clone + Debug + DeserializeOwned + Resource,
173+
<K as Resource>::DynamicType: Default,
174+
{
175+
self.runtime.block_on(async {
176+
self.kube_client
177+
.verify_status(resource, predicate)
178+
.await
179+
.expect("Resource did not reach the expected status")
180+
})
181+
}
182+
183+
/// Returns the given resource with an updated status.
184+
pub fn get_status<K>(&self, resource: &K) -> K
185+
where
186+
K: DeserializeOwned + Resource,
187+
<K as Resource>::DynamicType: Default,
188+
{
189+
self.runtime.block_on(async {
190+
self.kube_client
191+
.get_status(resource)
192+
.await
193+
.expect("Status could not be retrieved")
194+
})
195+
}
196+
166197
/// Returns the logs for the given pod.
167198
pub fn get_logs(&self, pod: &Pod, params: &LogParams) -> Vec<String> {
168199
self.runtime.block_on(async {
@@ -197,7 +228,7 @@ pub struct Timeouts {
197228
pub create: Duration,
198229
pub delete: Duration,
199230
pub get_annotation: Duration,
200-
pub verify_pod_condition: Duration,
231+
pub verify_status: Duration,
201232
}
202233

203234
impl Default for Timeouts {
@@ -207,7 +238,7 @@ impl Default for Timeouts {
207238
create: Duration::from_secs(10),
208239
delete: Duration::from_secs(10),
209240
get_annotation: Duration::from_secs(10),
210-
verify_pod_condition: Duration::from_secs(30),
241+
verify_status: Duration::from_secs(30),
211242
}
212243
}
213244
}
@@ -248,6 +279,11 @@ impl KubeClient {
248279
let timeout_secs = self.timeouts.apply_crd.as_secs() as u32;
249280
let crds: Api<CustomResourceDefinition> = Api::all(self.client.clone());
250281

282+
let lp = ListParams::default()
283+
.fields(&format!("metadata.name={}", crd.name()))
284+
.timeout(timeout_secs);
285+
let mut stream = crds.watch(&lp, "0").await?.boxed();
286+
251287
let apply_params = PatchParams::apply("agent_integration_test").force();
252288
crds.patch(&crd.name(), &apply_params, &Patch::Apply(crd))
253289
.await?;
@@ -256,11 +292,6 @@ impl KubeClient {
256292
return Ok(());
257293
}
258294

259-
let lp = ListParams::default()
260-
.fields(&format!("metadata.name={}", crd.name()))
261-
.timeout(timeout_secs);
262-
let mut stream = crds.watch(&lp, "0").await?.boxed();
263-
264295
while let Some(status) = stream.try_next().await? {
265296
if let WatchEvent::Modified(crd) = status {
266297
if is_ready(&crd) {
@@ -320,14 +351,15 @@ impl KubeClient {
320351
let timeout_secs = self.timeouts.create.as_secs() as u32;
321352
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);
322353

323-
let resource = from_yaml(spec);
324-
api.create(&PostParams::default(), &resource).await?;
354+
let resource: K = from_yaml(spec);
325355

326356
let list_params = ListParams::default()
327357
.fields(&format!("metadata.name={}", resource.name()))
328358
.timeout(timeout_secs);
329359
let mut stream = api.watch(&list_params, "0").await?.boxed();
330360

361+
api.create(&PostParams::default(), &resource).await?;
362+
331363
while let Some(status) = stream.try_next().await? {
332364
if let WatchEvent::Added(resource) = status {
333365
return Ok(resource);
@@ -350,6 +382,11 @@ impl KubeClient {
350382
let timeout_secs = self.timeouts.delete.as_secs() as u32;
351383
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);
352384

385+
let list_params = ListParams::default()
386+
.fields(&format!("metadata.name={}", resource.name()))
387+
.timeout(timeout_secs);
388+
let mut stream = api.watch(&list_params, "0").await?.boxed();
389+
353390
let result = api
354391
.delete(&resource.name(), &DeleteParams::default())
355392
.await?;
@@ -358,11 +395,6 @@ impl KubeClient {
358395
return Ok(());
359396
}
360397

361-
let list_params = ListParams::default()
362-
.fields(&format!("metadata.name={}", resource.name()))
363-
.timeout(timeout_secs);
364-
let mut stream = api.watch(&list_params, "0").await?.boxed();
365-
366398
while let Some(status) = stream.try_next().await? {
367399
if let WatchEvent::Deleted(_) = status {
368400
return Ok(());
@@ -419,42 +451,62 @@ impl KubeClient {
419451
}
420452

421453
/// Verifies that the given pod condition becomes true within the specified timeout.
422-
pub async fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Result<()> {
454+
pub async fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Result<Pod> {
423455
let is_condition_true = |pod: &Pod| {
424456
get_pod_conditions(pod)
425457
.iter()
426458
.any(|condition| condition.type_ == condition_type && condition.status == "True")
427459
};
460+
self.verify_status(pod, is_condition_true).await
461+
}
428462

429-
let timeout_secs = self.timeouts.verify_pod_condition.as_secs() as u32;
430-
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
463+
/// Verifies that the status of a resource fulfills the given
464+
/// predicate within the specified timeout.
465+
pub async fn verify_status<K, P>(&self, resource: &K, predicate: P) -> Result<K>
466+
where
467+
P: Fn(&K) -> bool,
468+
K: Clone + Debug + DeserializeOwned + Resource,
469+
<K as Resource>::DynamicType: Default,
470+
{
471+
let timeout_secs = self.timeouts.verify_status.as_secs() as u32;
472+
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);
431473

432474
let lp = ListParams::default()
433-
.fields(&format!("metadata.name={}", pod.name()))
475+
.fields(&format!("metadata.name={}", resource.name()))
434476
.timeout(timeout_secs);
435-
let mut stream = pods.watch(&lp, "0").await?.boxed();
477+
let mut stream = api.watch(&lp, "0").await?.boxed();
436478

437-
let pod = pods.get_status(&pod.name()).await?;
479+
let resource = api.get_status(&resource.name()).await?;
438480

439-
if is_condition_true(&pod) {
440-
return Ok(());
481+
if predicate(&resource) {
482+
return Ok(resource);
441483
}
442484

443485
while let Some(status) = stream.try_next().await? {
444-
if let WatchEvent::Modified(pod) = status {
445-
if is_condition_true(&pod) {
446-
return Ok(());
486+
if let WatchEvent::Modified(resource) = status {
487+
if predicate(&resource) {
488+
return Ok(resource);
447489
}
448490
}
449491
}
450492

451493
Err(anyhow!(
452-
"Pod condition [{}] was not satisfied within {} seconds",
453-
condition_type,
494+
"Resource [{}] did not reach the expected status within {} seconds.",
495+
resource.name(),
454496
timeout_secs
455497
))
456498
}
457499

500+
/// Returns the given resource with an updated status.
501+
pub async fn get_status<K>(&self, resource: &K) -> Result<K>
502+
where
503+
K: DeserializeOwned + Resource,
504+
<K as Resource>::DynamicType: Default,
505+
{
506+
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);
507+
Ok(api.get_status(&resource.name()).await?)
508+
}
509+
458510
/// Returns the logs for the given pod.
459511
pub async fn get_logs(&self, pod: &Pod, params: &LogParams) -> Result<Vec<String>> {
460512
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);

src/test/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ pub use super::repository::*;
66
pub use super::temporary_resource::TemporaryResource;
77

88
pub use indoc::{formatdoc, indoc};
9-
pub use k8s_openapi::api::core::v1::{Node, Pod};
9+
pub use k8s_openapi::api::core::v1::*;
1010
pub use serde_json::json;
1111
pub use spectral::prelude::*;

src/test/temporary_resource.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ impl<'a, T: DeletableResource> TemporaryResource<'a, T> {
3131
let resource = client.create(spec);
3232
TemporaryResource { client, resource }
3333
}
34+
35+
/// Updates the resource so that it contains the current status.
36+
pub fn update(&mut self) {
37+
self.resource = self.client.get_status(&self.resource);
38+
}
3439
}
3540

3641
impl<'a, T: DeletableResource> Drop for TemporaryResource<'a, T> {

0 commit comments

Comments
 (0)