diff --git a/Cargo.lock b/Cargo.lock index 45cefc2..1509d34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,7 @@ dependencies = [ "serde_yaml", "spectral", "tokio", + "uuid", ] [[package]] @@ -1536,6 +1537,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index 8d377e6..7b346bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,4 @@ serde_json = "1.0" serde_yaml = "0.8" spectral = "0.6" tokio = { version = "1.5", features = ["rt-multi-thread"] } +uuid = { version = "0.8", features = ["v4"] } diff --git a/tests/logs.rs b/tests/logs.rs new file mode 100644 index 0000000..209a74a --- /dev/null +++ b/tests/logs.rs @@ -0,0 +1,111 @@ +mod test; +use test::prelude::*; + +struct EchoService<'a> { + client: &'a TestKubeClient, + pod: TemporaryResource<'a, Pod>, +} + +impl<'a> EchoService<'a> { + pub fn new(client: &'a TestKubeClient, log_output: &[&str]) -> Self { + setup_repository(&client); + + /// Newline character for LOG_OUTPUT + /// + /// Source code: \\\\\\\\n + /// Pod spec: \\\\n + /// Systemd unit file: \\n + /// echo-service: \n + /// Journal: separate entries + const NEWLINE: &str = "\\\\\\\\n"; + + let pod = TemporaryResource::new( + &client, + &formatdoc! {r#" + apiVersion: v1 + kind: Pod + metadata: + name: agent-logs-integration-test-{id} + spec: + containers: + - name: echo-service + image: echo-service:1.0.0 + command: + - echo-service-1.0.0/start.sh + env: + - name: LOG_OUTPUT + value: "{log_output}" + tolerations: + - key: kubernetes.io/arch + operator: Equal + value: stackable-linux + "#, + id = Uuid::new_v4(), + log_output = log_output.join(NEWLINE) + }, + ); + + client.verify_pod_condition(&pod, "Ready"); + + EchoService { client, pod } + } + + pub fn get_logs(&self, params: &LogParams) -> Vec { + self.client.get_logs(&self.pod, params) + } +} + +#[test] +fn all_logs_should_be_retrievable() { + let client = TestKubeClient::new(); + + let log_output = vec!["line 1", "line 2", "line 3"]; + let echo_service = EchoService::new(&client, &log_output); + + let logs = echo_service.get_logs(&LogParams::default()); + assert_equals(&["line 1", "line 2", "line 3"], &logs); +} + +#[test] +fn the_tail_of_logs_should_be_retrievable() { + let client = TestKubeClient::new(); + + let log_output = vec!["line 1", "line 2", "line 3"]; + let echo_service = EchoService::new(&client, &log_output); + + let with_tail_lines = |tail_lines| LogParams { + tail_lines: Some(tail_lines), + ..Default::default() + }; + + let logs = echo_service.get_logs(&with_tail_lines(0)); + assert_that(&logs).is_empty(); + + let logs = echo_service.get_logs(&with_tail_lines(1)); + assert_equals(&["line 3"], &logs); + + let logs = echo_service.get_logs(&with_tail_lines(2)); + assert_equals(&["line 2", "line 3"], &logs); + + let logs = echo_service.get_logs(&with_tail_lines(3)); + assert_equals(&["line 1", "line 2", "line 3"], &logs); + + let logs = echo_service.get_logs(&with_tail_lines(4)); + assert_equals(&["line 1", "line 2", "line 3"], &logs); +} + +#[test] +fn non_ascii_characters_should_be_handled_correctly() { + let client = TestKubeClient::new(); + + let log_output = vec!["Spade: ♠", "Heart: ♥", "Diamond: ♦", "Club: ♣"]; + let echo_service = EchoService::new(&client, &log_output); + + let logs = echo_service.get_logs(&LogParams::default()); + assert_equals(&["Spade: ♠", "Heart: ♥", "Diamond: ♦", "Club: ♣"], &logs); +} + +fn assert_equals(expected: &[&str], actual: &[String]) { + assert_that(&actual.iter().map(String::as_ref).collect::>()) + .equals_iterator(&expected.iter()); +} diff --git a/tests/service.rs b/tests/service.rs index 677dcc4..1d030fb 100644 --- a/tests/service.rs +++ b/tests/service.rs @@ -5,47 +5,27 @@ use test::prelude::*; fn service_should_be_started_successfully() { let client = TestKubeClient::new(); - create_repository(&client); - - // Remove pod if it still exists from a previous test run. - if let Some(pod) = client.find::("agent-integration-test") { - client.delete(pod); - }; - - let pod = client.create(indoc! {r#" - apiVersion: v1 - kind: Pod - metadata: - name: agent-integration-test - spec: - containers: - - name: test-service - image: test-service:0.1.0 - command: - - test-service-0.1.0/start.sh - tolerations: - - key: kubernetes.io/arch - operator: Equal - value: stackable-linux - "#}); + setup_repository(&client); + + let pod = TemporaryResource::new( + &client, + indoc! {" + apiVersion: v1 + kind: Pod + metadata: + name: agent-service-integration-test + spec: + containers: + - name: noop-service + image: noop-service:1.0.0 + command: + - noop-service-1.0.0/start.sh + tolerations: + - key: kubernetes.io/arch + operator: Equal + value: stackable-linux + "}, + ); client.verify_pod_condition(&pod, "Ready"); - - client.delete(pod); -} - -fn create_repository(client: &TestKubeClient) { - client.apply_crd(&Repository::crd()); - - client.apply::(indoc!(" - apiVersion: stable.stackable.de/v1 - kind: Repository - metadata: - name: integration-test-repository - namespace: default - spec: - repo_type: StackableRepo - properties: - url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/6d784f1fb433123cb3b1d5cd7364a4553246d749/ - ")); } diff --git a/tests/test/kube.rs b/tests/test/kube.rs index c46a9b8..c153d7c 100644 --- a/tests/test/kube.rs +++ b/tests/test/kube.rs @@ -2,7 +2,7 @@ //! //! These clients simplify testing. -use anyhow::Result; +use anyhow::{anyhow, Result}; use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::{Node, NodeCondition, Pod, PodCondition, Taint}; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{ @@ -17,6 +17,8 @@ use serde::Serialize; use serde_json::Value; use tokio::runtime::Runtime; +pub use kube::api::LogParams; + /// A client for interacting with the Kubernetes API /// /// [`TestKubeClient`] is a synchronous version of [`KubeClient`] which @@ -48,7 +50,7 @@ impl TestKubeClient { /// separated: `key1=value1,key2=value2`. pub fn list_labeled(&self, label_selector: &str) -> ObjectList where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta, + K: Clone + DeserializeOwned + Meta, { self.runtime.block_on(async { self.kube_client @@ -71,7 +73,7 @@ impl TestKubeClient { /// Searches for a named resource. pub fn find(&self, name: &str) -> Option where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta, + K: Clone + DeserializeOwned + Meta, { self.runtime .block_on(async { self.kube_client.find::(name).await }) @@ -80,7 +82,7 @@ impl TestKubeClient { /// Applies a resource with the given YAML specification. pub fn apply(&self, spec: &str) -> K where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta + Serialize, + K: Clone + DeserializeOwned + Meta + Serialize, { self.runtime.block_on(async { self.kube_client @@ -93,7 +95,7 @@ impl TestKubeClient { /// Creates a resource with the given YAML specification. pub fn create(&self, spec: &str) -> K where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta + Serialize, + K: Clone + DeserializeOwned + Meta + Serialize, { self.runtime.block_on(async { self.kube_client @@ -106,7 +108,7 @@ impl TestKubeClient { /// Deletes the given resource. pub fn delete(&self, resource: K) where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta, + K: Clone + DeserializeOwned + Meta, { self.runtime.block_on(async { self.kube_client @@ -125,6 +127,16 @@ impl TestKubeClient { .expect("Pod condition could not be verified") }) } + + /// Returns the logs for the given pod. + pub fn get_logs(&self, pod: &Pod, params: &LogParams) -> Vec { + self.runtime.block_on(async { + self.kube_client + .get_logs(pod, params) + .await + .expect("Logs could not be retrieved") + }) + } } /// A client for interacting with the Kubernetes API @@ -153,7 +165,7 @@ impl KubeClient { /// `key1=value1,key2=value2`. pub async fn list_labeled(&self, label_selector: &str) -> Result> where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta, + K: Clone + DeserializeOwned + Meta, { let api: Api = Api::all(self.client.clone()); let lp = ListParams::default().labels(label_selector); @@ -161,7 +173,7 @@ impl KubeClient { } /// Applies the given custom resource definition and awaits the accepted status. - pub async fn apply_crd(&self, crd: &CustomResourceDefinition) -> anyhow::Result<()> { + pub async fn apply_crd(&self, crd: &CustomResourceDefinition) -> Result<()> { let is_ready = |crd: &CustomResourceDefinition| { get_crd_conditions(crd) .iter() @@ -193,7 +205,7 @@ impl KubeClient { } } - Err(anyhow::anyhow!( + Err(anyhow!( "Custom resource definition [{}] could not be applied within {} seconds.", crd.name(), timeout_secs @@ -203,7 +215,7 @@ impl KubeClient { /// Searches for a named resource. pub async fn find(&self, name: &str) -> Option where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta, + K: Clone + DeserializeOwned + Meta, { let api: Api = Api::namespaced(self.client.clone(), &self.namespace); api.get(name).await.ok() @@ -212,7 +224,7 @@ impl KubeClient { /// Applies a resource with the given YAML specification. pub async fn apply(&self, spec: &str) -> Result where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta + Serialize, + K: Clone + DeserializeOwned + Meta + Serialize, { let resource: K = from_yaml(spec); let apply_params = PatchParams::apply("agent_integration_test").force(); @@ -226,7 +238,7 @@ impl KubeClient { /// confirmation of the creation. pub async fn create(&self, spec: &str) -> Result where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta + Serialize, + K: Clone + DeserializeOwned + Meta + Serialize, { let timeout_secs = 10; let api: Api = Api::namespaced(self.client.clone(), &self.namespace); @@ -234,10 +246,10 @@ impl KubeClient { let resource = from_yaml(spec); api.create(&PostParams::default(), &resource).await?; - let lp = ListParams::default() + let list_params = ListParams::default() .fields(&format!("metadata.name={}", resource.name())) .timeout(timeout_secs); - let mut stream = api.watch(&lp, "0").await?.boxed(); + let mut stream = api.watch(&list_params, "0").await?.boxed(); while let Some(status) = stream.try_next().await? { if let WatchEvent::Added(resource) = status { @@ -245,7 +257,7 @@ impl KubeClient { } } - Err(anyhow::anyhow!( + Err(anyhow!( "Resource [{}] could not be created within {} seconds.", resource.name(), timeout_secs @@ -255,7 +267,7 @@ impl KubeClient { /// Deletes the given resource and awaits the confirmation of the deletion. pub async fn delete(&self, resource: K) -> Result<()> where - K: k8s_openapi::Resource + Clone + DeserializeOwned + Meta, + K: Clone + DeserializeOwned + Meta, { let timeout_secs = 10; let api: Api = Api::namespaced(self.client.clone(), &self.namespace); @@ -268,10 +280,10 @@ impl KubeClient { return Ok(()); } - let lp = ListParams::default() + let list_params = ListParams::default() .fields(&format!("metadata.name={}", resource.name())) .timeout(timeout_secs); - let mut stream = api.watch(&lp, "0").await?.boxed(); + let mut stream = api.watch(&list_params, "0").await?.boxed(); while let Some(status) = stream.try_next().await? { if let WatchEvent::Deleted(_) = status { @@ -279,7 +291,7 @@ impl KubeClient { } } - Err(anyhow::anyhow!( + Err(anyhow!( "Resource [{}] could not be deleted within {} seconds.", resource.name(), timeout_secs @@ -287,11 +299,7 @@ impl KubeClient { } /// Verifies that the given pod condition becomes true within 30 seconds. - pub async fn verify_pod_condition( - &self, - pod: &Pod, - condition_type: &str, - ) -> anyhow::Result<()> { + pub async fn verify_pod_condition(&self, pod: &Pod, condition_type: &str) -> Result<()> { let is_condition_true = |pod: &Pod| { get_pod_conditions(pod) .iter() @@ -318,12 +326,31 @@ impl KubeClient { } } - Err(anyhow::anyhow!( + Err(anyhow!( "Pod condition [{}] was not satisfied within {} seconds", condition_type, timeout_secs )) } + + /// Returns the logs for the given pod. + pub async fn get_logs(&self, pod: &Pod, params: &LogParams) -> Result> { + let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); + + let bytes = pods + .log_stream(&pod.name(), params) + .await? + .try_collect::>() + .await? + .concat(); + + let lines = String::from_utf8_lossy(&bytes) + .lines() + .map(|line| line.to_owned()) + .collect(); + + Ok(lines) + } } /// Deserializes the given JSON value into the desired type. diff --git a/tests/test/mod.rs b/tests/test/mod.rs index cd704cc..c979b5a 100644 --- a/tests/test/mod.rs +++ b/tests/test/mod.rs @@ -2,4 +2,5 @@ pub mod assertions; pub mod kube; pub mod prelude; -pub mod repository_spec; +pub mod repository; +pub mod temporary_resource; diff --git a/tests/test/prelude.rs b/tests/test/prelude.rs index 048673b..8f4040c 100644 --- a/tests/test/prelude.rs +++ b/tests/test/prelude.rs @@ -1,11 +1,10 @@ pub use super::assertions::*; -pub use super::kube::{ - from_value, get_crd_conditions, get_node_conditions, get_node_taints, get_pod_conditions, - TestKubeClient, -}; -pub use super::repository_spec::Repository; +pub use super::kube::*; +pub use super::repository::setup_repository; +pub use super::temporary_resource::TemporaryResource; -pub use indoc::indoc; +pub use indoc::{formatdoc, indoc}; pub use k8s_openapi::api::core::v1::{Node, Pod}; pub use serde_json::json; pub use spectral::prelude::*; +pub use uuid::Uuid; diff --git a/tests/test/repository_spec.rs b/tests/test/repository.rs similarity index 51% rename from tests/test/repository_spec.rs rename to tests/test/repository.rs index b0d036d..009b7ef 100644 --- a/tests/test/repository_spec.rs +++ b/tests/test/repository.rs @@ -1,3 +1,5 @@ +use super::prelude::TestKubeClient; +use indoc::indoc; use kube_derive::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -20,3 +22,19 @@ pub struct RepositorySpec { pub enum RepoType { StackableRepo, } + +pub fn setup_repository(client: &TestKubeClient) { + client.apply_crd(&Repository::crd()); + + client.apply::(indoc! {" + apiVersion: stable.stackable.de/v1 + kind: Repository + metadata: + name: integration-test-repository + namespace: default + spec: + repo_type: StackableRepo + properties: + url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/ + "}); +} diff --git a/tests/test/temporary_resource.rs b/tests/test/temporary_resource.rs new file mode 100644 index 0000000..df649b8 --- /dev/null +++ b/tests/test/temporary_resource.rs @@ -0,0 +1,40 @@ +use super::prelude::TestKubeClient; +use kube::api::Meta; +use serde::{de::DeserializeOwned, Serialize}; +use std::{mem, ops::Deref}; + +/// Trait combo which must be satisfied for a resource to be deletable +pub trait DeletableResource: Clone + Default + DeserializeOwned + Meta {} +impl DeletableResource for T {} + +/// A temporary resource which is deleted when it goes out of scope +pub struct TemporaryResource<'a, T: DeletableResource> { + client: &'a TestKubeClient, + resource: T, +} + +impl<'a, T: DeletableResource> TemporaryResource<'a, T> { + /// Creates a new temporary resource according to the given specification. + pub fn new(client: &'a TestKubeClient, spec: &str) -> Self + where + T: Serialize, + { + let resource = client.create(spec); + TemporaryResource { client, resource } + } +} + +impl<'a, T: DeletableResource> Drop for TemporaryResource<'a, T> { + fn drop(&mut self) { + let resource = mem::take(&mut self.resource); + self.client.delete(resource); + } +} + +impl<'a, T: DeletableResource> Deref for TemporaryResource<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.resource + } +}