Skip to content
This repository was archived by the owner on Feb 9, 2022. It is now read-only.

Extend KubeClient to retrieve and verify the status of a resource #5

Merged
merged 6 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
== 0.2.0 - unreleased

:4: https://github.com/stackabletech/integration-test-commons/pull/4[#4]
:5: https://github.com/stackabletech/integration-test-commons/pull/5[#5]

=== Added
* TestCluster merged from spark-operator-integration-tests and zookeeper-operator-integration-tests ({4})
* TestCluster merged from spark-operator-integration-tests and zookeeper-operator-integration-tests ({4}).
* `test::kube::KubeClient::verify_status` and `test::kube::KubeClient::get_status` added ({5}).
* All modules in `k8s_openapi::api::core::v1` re-exported in `test::prelude` ({5}).

=== Fixed
* Race conditions in `test::kube::KubeClient` fixed ({5}).

=== Changed
* Dependency `kube` set to version `0.56` ({4}).
* `test::kube::Timeouts::verify_pod_condition` renamed to `verify_status` ({5}).


== 0.1.0 - 2021-06-10

Expand Down
2 changes: 1 addition & 1 deletion src/operator/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ where

for pod in &created_pods {
// TODO: switch to pod condition type enum from operator-rs?
self.client.verify_pod_condition(pod, "Ready")
self.client.verify_pod_condition(pod, "Ready");
}

println!("Installation finished");
Expand Down
110 changes: 81 additions & 29 deletions src/test/kube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ impl TestKubeClient {
})
}

/// Verifies that the given pod condition becomes true within the specified timeout.
pub fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) {
/// Verifies that the given pod condition becomes true within the
/// specified timeout.
pub fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Pod {
self.runtime.block_on(async {
self.kube_client
.verify_pod_condition(pod, condition_type)
Expand All @@ -163,6 +164,36 @@ impl TestKubeClient {
})
}

/// Verifies that the status of a resource fulfills the given
/// predicate within the specified timeout.
pub fn verify_status<K, P>(&self, resource: &K, predicate: P) -> K
where
P: Fn(&K) -> bool,
K: Clone + Debug + DeserializeOwned + Resource,
<K as Resource>::DynamicType: Default,
{
self.runtime.block_on(async {
self.kube_client
.verify_status(resource, predicate)
.await
.expect("Resource did not reach the expected status")
})
}

/// Returns the given resource with an updated status.
pub fn get_status<K>(&self, resource: &K) -> K
where
K: DeserializeOwned + Resource,
<K as Resource>::DynamicType: Default,
{
self.runtime.block_on(async {
self.kube_client
.get_status(resource)
.await
.expect("Status could not be retrieved")
})
}

/// Returns the logs for the given pod.
pub fn get_logs(&self, pod: &Pod, params: &LogParams) -> Vec<String> {
self.runtime.block_on(async {
Expand Down Expand Up @@ -197,7 +228,7 @@ pub struct Timeouts {
pub create: Duration,
pub delete: Duration,
pub get_annotation: Duration,
pub verify_pod_condition: Duration,
pub verify_status: Duration,
}

impl Default for Timeouts {
Expand All @@ -207,7 +238,7 @@ impl Default for Timeouts {
create: Duration::from_secs(10),
delete: Duration::from_secs(10),
get_annotation: Duration::from_secs(10),
verify_pod_condition: Duration::from_secs(30),
verify_status: Duration::from_secs(30),
}
}
}
Expand Down Expand Up @@ -248,6 +279,11 @@ impl KubeClient {
let timeout_secs = self.timeouts.apply_crd.as_secs() as u32;
let crds: Api<CustomResourceDefinition> = Api::all(self.client.clone());

let lp = ListParams::default()
.fields(&format!("metadata.name={}", crd.name()))
.timeout(timeout_secs);
let mut stream = crds.watch(&lp, "0").await?.boxed();

let apply_params = PatchParams::apply("agent_integration_test").force();
crds.patch(&crd.name(), &apply_params, &Patch::Apply(crd))
.await?;
Expand All @@ -256,11 +292,6 @@ impl KubeClient {
return Ok(());
}

let lp = ListParams::default()
.fields(&format!("metadata.name={}", crd.name()))
.timeout(timeout_secs);
let mut stream = crds.watch(&lp, "0").await?.boxed();

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(crd) = status {
if is_ready(&crd) {
Expand Down Expand Up @@ -320,14 +351,15 @@ impl KubeClient {
let timeout_secs = self.timeouts.create.as_secs() as u32;
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);

let resource = from_yaml(spec);
api.create(&PostParams::default(), &resource).await?;
let resource: K = from_yaml(spec);

let list_params = ListParams::default()
.fields(&format!("metadata.name={}", resource.name()))
.timeout(timeout_secs);
let mut stream = api.watch(&list_params, "0").await?.boxed();

api.create(&PostParams::default(), &resource).await?;

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Added(resource) = status {
return Ok(resource);
Expand All @@ -350,6 +382,11 @@ impl KubeClient {
let timeout_secs = self.timeouts.delete.as_secs() as u32;
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);

let list_params = ListParams::default()
.fields(&format!("metadata.name={}", resource.name()))
.timeout(timeout_secs);
let mut stream = api.watch(&list_params, "0").await?.boxed();

let result = api
.delete(&resource.name(), &DeleteParams::default())
.await?;
Expand All @@ -358,11 +395,6 @@ impl KubeClient {
return Ok(());
}

let list_params = ListParams::default()
.fields(&format!("metadata.name={}", resource.name()))
.timeout(timeout_secs);
let mut stream = api.watch(&list_params, "0").await?.boxed();

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Deleted(_) = status {
return Ok(());
Expand Down Expand Up @@ -419,42 +451,62 @@ impl KubeClient {
}

/// Verifies that the given pod condition becomes true within the specified timeout.
pub async fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Result<()> {
pub async fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Result<Pod> {
let is_condition_true = |pod: &Pod| {
get_pod_conditions(pod)
.iter()
.any(|condition| condition.type_ == condition_type && condition.status == "True")
};
self.verify_status(pod, is_condition_true).await
}

let timeout_secs = self.timeouts.verify_pod_condition.as_secs() as u32;
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
/// Verifies that the status of a resource fulfills the given
/// predicate within the specified timeout.
pub async fn verify_status<K, P>(&self, resource: &K, predicate: P) -> Result<K>
where
P: Fn(&K) -> bool,
K: Clone + Debug + DeserializeOwned + Resource,
<K as Resource>::DynamicType: Default,
{
let timeout_secs = self.timeouts.verify_status.as_secs() as u32;
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);

let lp = ListParams::default()
.fields(&format!("metadata.name={}", pod.name()))
.fields(&format!("metadata.name={}", resource.name()))
.timeout(timeout_secs);
let mut stream = pods.watch(&lp, "0").await?.boxed();
let mut stream = api.watch(&lp, "0").await?.boxed();

let pod = pods.get_status(&pod.name()).await?;
let resource = api.get_status(&resource.name()).await?;

if is_condition_true(&pod) {
return Ok(());
if predicate(&resource) {
return Ok(resource);
}

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(pod) = status {
if is_condition_true(&pod) {
return Ok(());
if let WatchEvent::Modified(resource) = status {
if predicate(&resource) {
return Ok(resource);
}
}
}

Err(anyhow!(
"Pod condition [{}] was not satisfied within {} seconds",
condition_type,
"Resource [{}] did not reach the expected status within {} seconds.",
resource.name(),
timeout_secs
))
}

/// Returns the given resource with an updated status.
pub async fn get_status<K>(&self, resource: &K) -> Result<K>
where
K: DeserializeOwned + Resource,
<K as Resource>::DynamicType: Default,
{
let api: Api<K> = Api::namespaced(self.client.clone(), &self.namespace);
Ok(api.get_status(&resource.name()).await?)
}

/// Returns the logs for the given pod.
pub async fn get_logs(&self, pod: &Pod, params: &LogParams) -> Result<Vec<String>> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
Expand Down
2 changes: 1 addition & 1 deletion src/test/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub use super::repository::*;
pub use super::temporary_resource::TemporaryResource;

pub use indoc::{formatdoc, indoc};
pub use k8s_openapi::api::core::v1::{Node, Pod};
pub use k8s_openapi::api::core::v1::*;
pub use serde_json::json;
pub use spectral::prelude::*;
5 changes: 5 additions & 0 deletions src/test/temporary_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ impl<'a, T: DeletableResource> TemporaryResource<'a, T> {
let resource = client.create(spec);
TemporaryResource { client, resource }
}

/// Updates the resource so that it contains the current status.
pub fn update(&mut self) {
self.resource = self.client.get_status(&self.resource);
}
}

impl<'a, T: DeletableResource> Drop for TemporaryResource<'a, T> {
Expand Down