Skip to content

Commit df05910

Browse files
forgot file
1 parent 11d657f commit df05910

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

src/adapter/src/coord/consistency.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Internal consistency checks that validate invariants of [`Coordinator`].
11+
12+
use super::Coordinator;
13+
use crate::catalog::consistency::CatalogInconsistencies;
14+
use mz_repr::GlobalId;
15+
use serde::Serialize;
16+
17+
#[derive(Debug, Default, Serialize, PartialEq)]
18+
pub struct CoordinatorInconsistencies {
19+
/// Inconsistencies found in the catalog.
20+
catalog_inconsistencies: CatalogInconsistencies,
21+
/// Inconsistencies found in read capabilities.
22+
read_capabilities: Vec<ReadCapabilitiesInconsistency>,
23+
}
24+
25+
impl CoordinatorInconsistencies {
26+
pub fn is_empty(&self) -> bool {
27+
self.catalog_inconsistencies.is_empty() && self.read_capabilities.is_empty()
28+
}
29+
}
30+
31+
impl Coordinator {
32+
/// Checks the [`Coordinator`] to make sure we're internally consistent.
33+
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
34+
let mut inconsistencies = CoordinatorInconsistencies::default();
35+
36+
if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
37+
inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
38+
}
39+
40+
if let Err(read_capabilities) = self.check_read_capabilities() {
41+
inconsistencies.read_capabilities = read_capabilities;
42+
}
43+
44+
if inconsistencies.is_empty() {
45+
Ok(())
46+
} else {
47+
Err(inconsistencies)
48+
}
49+
}
50+
51+
/// # Invariants:
52+
///
53+
/// * Read capabilities should reference known objects.
54+
///
55+
fn check_read_capabilities(&self) -> Result<(), Vec<ReadCapabilitiesInconsistency>> {
56+
let mut read_capabilities_inconsistencies = Vec::new();
57+
for (gid, _) in &self.storage_read_capabilities {
58+
if self.catalog().try_get_entry(gid).is_none() {
59+
read_capabilities_inconsistencies
60+
.push(ReadCapabilitiesInconsistency::Storage(gid.clone()));
61+
}
62+
}
63+
for (gid, _) in &self.compute_read_capabilities {
64+
if self.catalog().try_get_entry(gid).is_none() {
65+
read_capabilities_inconsistencies
66+
.push(ReadCapabilitiesInconsistency::Compute(gid.clone()));
67+
}
68+
}
69+
70+
if read_capabilities_inconsistencies.is_empty() {
71+
Ok(())
72+
} else {
73+
Err(read_capabilities_inconsistencies)
74+
}
75+
}
76+
}
77+
78+
#[derive(Debug, Serialize, PartialEq, Eq)]
79+
enum ReadCapabilitiesInconsistency {
80+
Storage(GlobalId),
81+
Compute(GlobalId),
82+
}

0 commit comments

Comments
 (0)