From e3e7c46faedc80c0895548ba5634ed7446c15285 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 3 Mar 2023 11:09:13 +0100 Subject: [PATCH 01/11] introduce new archive index format based on SQLite --- Cargo.lock | 51 +++++++ Cargo.toml | 2 + src/storage/archive_index.rs | 285 +++++++++++++++++++++++++++++++---- src/storage/mod.rs | 16 +- 4 files changed, 315 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 085e11116..d4c29ea2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,6 +1495,7 @@ dependencies = [ "kuchiki", "log", "lol_html", + "lru", "memmap2", "mime", "mime_guess", @@ -1513,6 +1514,7 @@ dependencies = [ "rayon", "regex", "reqwest", + "rusqlite", "rustwide", "schemamama", "schemamama_postgres", @@ -1663,6 +1665,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "1.9.0" @@ -2552,6 +2560,15 @@ dependencies = [ "ahash 0.8.3", ] +[[package]] +name = "hashlink" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "headers" version = "0.3.8" @@ -2996,6 +3013,17 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libsqlite3-sys" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libssh2-sys" version = "0.2.23" @@ -3089,6 +3117,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "lru" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "mac" version = "0.1.1" @@ -4212,6 +4249,20 @@ dependencies = [ "xmlparser", ] +[[package]] +name = "rusqlite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-demangle" version = "0.1.22" diff --git a/Cargo.toml b/Cargo.toml index ad2315b5b..6cb96dece 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,8 @@ bzip2 = "0.4.4" serde_cbor = "0.11.1" getrandom = "0.2.1" itertools = { version = "0.10.5", optional = true} +rusqlite = { version = "0.28.0", features = ["bundled"] } +lru = "0.9.0" # Async tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] } diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs index f01488d46..cf106d132 100644 --- a/src/storage/archive_index.rs +++ b/src/storage/archive_index.rs @@ -1,14 +1,41 @@ use crate::error::Result; use crate::storage::{compression::CompressionAlgorithm, FileRange}; use anyhow::{bail, Context as _}; +use lru::LruCache; use memmap2::MmapOptions; +use rusqlite::{Connection, OpenFlags, OptionalExtension}; use serde::de::DeserializeSeed; use 
serde::de::{IgnoredAny, MapAccess, Visitor};
 use serde::{Deserialize, Deserializer, Serialize};
-use std::collections::HashMap;
-use std::fmt;
-use std::path::Path;
-use std::{fs, io};
+use std::num::NonZeroUsize;
+use std::{
+    cell::RefCell,
+    collections::HashMap,
+    fmt, fs,
+    fs::File,
+    io,
+    io::Read,
+    path::{Path, PathBuf},
+};
+
+static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";
+
+thread_local! {
+    // local SQLite connection cache.
+    // `rusqlite::Connection` is not `Sync`, so we need to keep this per thread.
+    // Parallel connections to the same SQLite file are handled by SQLite itself.
+    //
+    // An alternative would be to make this cache global, but then we would have
+    // to prevent using the same connection from multiple threads at once.
+    //
+    // The better solution probably depends on the request pattern: are we
+    // typically seeing many requests to a small group of crates?
+    // Or are the requests spread over so many crates that there wouldn't be
+    // many conflicts on a single connection?
+    static SQLITE_CONNECTIONS: RefCell<LruCache<PathBuf, Connection>> = RefCell::new(
+        LruCache::new(NonZeroUsize::new(32).unwrap())
+    );
+}
 
 #[derive(Deserialize, Serialize)]
 pub(crate) struct FileInfo {
@@ -30,33 +57,60 @@ struct Index {
     files: HashMap<String, FileInfo>,
 }
 
-pub(crate) fn create<R: io::Read + io::Seek, W: io::Write>(
+/// create an archive index based on a zipfile.
+///
+/// Will delete the destination file if it already exists.
+pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
     zipfile: &mut R,
-    writer: &mut W,
+    destination: P,
 ) -> Result<()> {
+    if destination.as_ref().exists() {
+        fs::remove_file(&destination)?;
+    }
+
     let mut archive = zip::ZipArchive::new(zipfile)?;
 
-    // get file locations
-    let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
+    let conn = rusqlite::Connection::open(&destination)?;
+    conn.execute("BEGIN", ())?;
+    conn.execute(
+        "
+        CREATE TABLE files (
+            id INTEGER PRIMARY KEY,
+            path TEXT UNIQUE,
+            start INTEGER,
+            end INTEGER,
+            compression INTEGER
+        );
+        ",
+        (),
+    )?;
+
     for i in 0..archive.len() {
         let zf = archive.by_index(i)?;
 
-        files.insert(
-            zf.name().to_string(),
-            FileInfo {
-                range: FileRange::new(zf.data_start(), zf.data_start() + zf.compressed_size() - 1),
-                compression: match zf.compression() {
-                    zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
+        let compression_bzip: i32 = CompressionAlgorithm::Bzip2.into();
+
+        conn.execute(
+            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
+            (
+                zf.name().to_string(),
+                zf.data_start(),
+                zf.data_start() + zf.compressed_size() - 1,
+                match zf.compression() {
+                    zip::CompressionMethod::Bzip2 => compression_bzip,
                     c => bail!("unsupported compression algorithm {} in zip-file", c),
                 },
-            },
-        );
+            ),
+        )?;
     }
 
-    serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
+    conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
+    conn.execute("END", ())?;
+
+    Ok(())
 }
 
-pub(crate) fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
+fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
     let mut deserializer = serde_cbor::Deserializer::from_slice(bytes);
 
     /// This visitor will just find the `files` element in the top-level map.
@@ -155,18 +209,98 @@ pub(crate) fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
         .deserialize(&mut deserializer)?)
 }
 
+/// try to open an index file as SQLite
+/// Uses a thread-local cache of open connections to the index files.
+/// Will test the connection before returning it, and attempt to
+/// reconnect if the test fails.
+fn with_sqlite_connection<R, P: AsRef<Path>, F: Fn(&Connection) -> Result<R>>(
+    path: P,
+    f: F,
+) -> Result<R> {
+    let path = path.as_ref().to_owned();
+    SQLITE_CONNECTIONS.with(|connections| {
+        let mut connections = connections.borrow_mut();
+
+        if let Some(conn) = connections.get(&path) {
+            if conn.execute("SELECT 1", []).is_ok() {
+                return f(conn);
+            }
+        }
+
+        let conn = Connection::open_with_flags(
+            &path,
+            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
+        )?;
+
+        // we're using `get_or_insert` to save the second lookup receiving the
+        // reference into the cache, after having pushed the entry.
+        f(connections.get_or_insert(path, || conn))
+    })
+}
+
+fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
+    let mut stmt = conn.prepare(
+        "
+        SELECT start, end, compression
+        FROM files
+        WHERE path = ?
+        ",
+    )?;
+
+    stmt.query_row((search_for,), |row| {
+        let compression: i32 = row.get(2)?;
+        Ok(FileInfo {
+            range: row.get(0)?..=row.get(1)?,
+            compression: compression.try_into().expect("invalid compression value"),
+        })
+    })
+    .optional()
+    .context("error fetching SQLite data")
+}
+
+/// quick check if a file is a SQLite file.
+///
+/// Helpful for the transition phase where an archive-index might be
+/// old (CBOR) or new (SQLite) format.
+///
+/// See
+/// https://raw.githubusercontent.com/rusqlite/rusqlite/master/libsqlite3-sys/sqlite3/sqlite3.c
+/// and
+/// https://en.wikipedia.org/wiki/SQLite (-> _Magic number_)
+/// ```
+/// > FORMAT DETAILS
+/// > OFFSET   SIZE    DESCRIPTION
+/// > 0        16      Header string: "SQLite format 3\000"
+/// > [...]
+fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
+    let mut f = File::open(archive_index_path)?;
+
+    let mut buffer = [0; 16];
+    match f.read_exact(&mut buffer) {
+        Ok(()) => Ok(buffer == SQLITE_FILE_HEADER),
+        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
+        Err(err) => Err(err.into()),
+    }
+}
+
 pub(crate) fn find_in_file<P: AsRef<Path>>(
     archive_index_path: P,
     search_for: &str,
 ) -> Result<Option<FileInfo>> {
-    let file = fs::File::open(archive_index_path).context("could not open file")?;
-    let mmap = unsafe {
-        MmapOptions::new()
-            .map(&file)
-            .context("could not create memory map")?
-    };
-
-    find_in_slice(&mmap, search_for)
+    if is_sqlite_file(&archive_index_path)? {
+        with_sqlite_connection(archive_index_path, |connection| {
+            find_in_sqlite_index(connection, search_for)
+        })
+    } else {
+        let file = fs::File::open(archive_index_path).context("could not open file")?;
+        let mmap = unsafe {
+            MmapOptions::new()
+                .map(&file)
+                .context("could not create memory map")?
+        };
+
+        find_in_slice(&mmap, search_for)
+    }
 }
 
 #[cfg(test)]
@@ -175,8 +309,37 @@ mod tests {
     use std::io::Write;
     use zip::write::FileOptions;
 
-    #[test]
-    fn index_create_save_load() {
+    /// legacy archive index creation, only for testing that reading them still works
+    fn create_cbor_index<R: io::Read + io::Seek, W: io::Write>(
+        zipfile: &mut R,
+        writer: &mut W,
+    ) -> Result<()> {
+        let mut archive = zip::ZipArchive::new(zipfile)?;
+
+        // get file locations
+        let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
+        for i in 0..archive.len() {
+            let zf = archive.by_index(i)?;
+
+            files.insert(
+                zf.name().to_string(),
+                FileInfo {
+                    range: FileRange::new(
+                        zf.data_start(),
+                        zf.data_start() + zf.compressed_size() - 1,
+                    ),
+                    compression: match zf.compression() {
+                        zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
+                        c => bail!("unsupported compression algorithm {} in zip-file", c),
+                    },
+                },
+            );
+        }
+
+        serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
+    }
+
+    fn create_test_archive() -> fs::File {
         let mut tf = tempfile::tempfile().unwrap();
 
         let objectcontent: Vec<u8> = (0..255).collect();
@@ -190,9 +353,14 @@ mod tests {
             .unwrap();
         archive.write_all(&objectcontent).unwrap();
         tf = archive.finish().unwrap();
+        tf
+    }
 
+    #[test]
+    fn index_create_save_load_cbor_direct() {
+        let mut tf = create_test_archive();
         let mut buf = Vec::new();
-        create(&mut tf, &mut buf).unwrap();
+        create_cbor_index(&mut tf, &mut buf).unwrap();
 
         let fi = find_in_slice(&buf, "testfile1").unwrap().unwrap();
         assert_eq!(fi.range, FileRange::new(39, 459));
@@ -200,4 +368,63 @@ mod tests {
 
         assert!(find_in_slice(&buf, "some_other_file").unwrap().is_none());
     }
+
+    #[test]
+    fn index_create_save_load_cbor_as_fallback() {
+        let mut tf = create_test_archive();
+        let mut cbor_buf = Vec::new();
+        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
+        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
+        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
+
+        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
+
+        let fi = find_in_file(cbor_index_file.path(), "testfile1")
+            .unwrap()
+            .unwrap();
+        assert_eq!(fi.range, FileRange::new(39, 459));
+        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
+
+        assert!(find_in_file(cbor_index_file.path(), "some_other_file")
+            .unwrap()
+            .is_none());
+    }
+
+    #[test]
+    fn index_create_save_load_sqlite() {
+        let mut tf = create_test_archive();
+
+        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
+        create(&mut tf, &tempfile).unwrap();
+        assert!(is_sqlite_file(&tempfile).unwrap());
+
+        let fi = find_in_file(&tempfile, "testfile1").unwrap().unwrap();
+
+        assert_eq!(fi.range, FileRange::new(39, 459));
+        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
+
+        assert!(find_in_file(&tempfile, "some_other_file")
+            .unwrap()
+            .is_none());
+    }
+
+    #[test]
+    fn is_sqlite_file_empty() {
+        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
+        assert!(!is_sqlite_file(tempfile).unwrap());
+    }
+
+    #[test]
+    fn is_sqlite_file_other_content() {
+        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
+        tempfile.write_all(b"some_bytes").unwrap();
+        assert!(!is_sqlite_file(tempfile.path()).unwrap());
+    }
+
+    #[test]
+    fn is_sqlite_file_specific_headers() {
+        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
+        tempfile.write_all(SQLITE_FILE_HEADER).unwrap();
+        assert!(is_sqlite_file(tempfile.path()).unwrap());
+    }
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index d838d6a80..d06a84590 100644
---
a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -12,6 +12,7 @@ use crate::{db::Pool, Config, Metrics};
 use anyhow::{anyhow, ensure};
 use chrono::{DateTime, Utc};
 use path_slash::PathExt;
+use std::io::BufReader;
 use std::{
     collections::{HashMap, HashSet},
     ffi::OsStr,
@@ -361,24 +362,19 @@ impl Storage {
         }
 
         let mut zip_content = zip.finish()?.into_inner();
 
-        let mut index_content = vec![];
-        archive_index::create(&mut io::Cursor::new(&mut zip_content), &mut index_content)?;
-        let alg = CompressionAlgorithm::default();
-        let compressed_index_content = compress(&index_content[..], alg)?;
 
         let remote_index_path = format!("{}.index", &archive_path);
-        // additionally store the index in the local cache, so it's directly available
         let local_index_path = self
             .config
             .local_archive_cache_path
             .join(&remote_index_path);
-        if local_index_path.exists() {
-            fs::remove_file(&local_index_path)?;
-        }
         fs::create_dir_all(local_index_path.parent().unwrap())?;
-        let mut local_index_file = fs::File::create(&local_index_path)?;
-        local_index_file.write_all(&index_content)?;
+        archive_index::create(&mut io::Cursor::new(&mut zip_content), &local_index_path)?;
+
+        let alg = CompressionAlgorithm::default();
+        let compressed_index_content =
+            compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
 
         self.store_inner(
             vec![

From 708c77f8765e2f2950b1677af4e06dcb12b7a6ac Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Fri, 3 Mar 2023 14:43:57 +0100
Subject: [PATCH 02/11] set SQLite synchronous mode when writing

---
 src/storage/archive_index.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index cf106d132..039ef8dc6 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -71,6 +71,7 @@ pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
     let mut archive = zip::ZipArchive::new(zipfile)?;
 
     let conn = rusqlite::Connection::open(&destination)?;
+    conn.execute("PRAGMA synchronous = FULL", ())?;
     conn.execute("BEGIN", ())?;
     conn.execute(
         "

From 71af45fcf7f4c0edd49b1f2e68e5ec3a8fa8c64a Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Tue, 7 Mar 2023 17:32:33 +0100
Subject: [PATCH 03/11] run `VACUUM` after creating archive index SQLite db

---
 src/storage/archive_index.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index 039ef8dc6..a9b364af0 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -107,6 +107,7 @@ pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
 
     conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
     conn.execute("END", ())?;
+    conn.execute("VACUUM", ())?;
 
     Ok(())
 }

From cec41ce467dd54b5b2f729c30be92ab5fa43db23 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Sat, 11 Mar 2023 16:36:14 +0100
Subject: [PATCH 04/11] migrate to a real connection pool for the sqlite
 connections

---
 Cargo.lock                   | 134 ++++++++++++++++++++++++++++++---
 Cargo.toml                   |   3 +-
 src/config.rs                |   4 ++
 src/storage/archive_index.rs |  99 ++++++------------------
 src/storage/mod.rs           |  20 +++++-
 src/storage/sqlite_pool.rs   |  76 ++++++++++++++++++++
 6 files changed, 253 insertions(+), 83 deletions(-)
 create mode 100644 src/storage/sqlite_pool.rs

diff --git a/Cargo.lock b/Cargo.lock
index d4c29ea2e..4aa1dbff8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -787,6 +787,12 @@ version = "3.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
 
+[[package]]
+name = "bytecount"
+version = "0.6.3"
+source =
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -836,6 +842,37 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "camino" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -1495,11 +1532,11 @@ dependencies = [ "kuchiki", "log", "lol_html", - "lru", "memmap2", "mime", "mime_guess", "mockito", + "moka", "num_cpus", "once_cell", "path-slash", @@ -1510,6 +1547,7 @@ dependencies = [ "prometheus", "r2d2", "r2d2_postgres", + "r2d2_sqlite", "rand 0.8.5", "rayon", "regex", @@ -1625,6 +1663,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "extend" version = "0.1.2" @@ -2465,6 +2512,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "globset" version = "0.4.10" @@ -3117,15 +3170,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "lru" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" -dependencies = [ - "hashbrown 0.13.2", -] - [[package]] name = "mac" version = "0.1.1" @@ -3287,6 +3331,28 @@ dependencies = [ "similar", ] +[[package]] +name = "moka" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b6446f16d504e3d575df79cabb11bfbe9f24b17e9562d964a815db7b28ae3ec" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "num_cpus", + "once_cell", + "parking_lot 0.12.1", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3981,6 +4047,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + [[package]] name = "quick-error" version = "2.0.1" @@ -4017,6 +4094,16 @@ dependencies = [ "r2d2", ] +[[package]] +name = "r2d2_sqlite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4f5d0337e99cd5cacd91ffc326c6cc9d8078def459df560c4f9bf9ba4a51034" +dependencies = [ + "r2d2", + "rusqlite", +] + [[package]] 
name = "rand" version = "0.7.3" @@ -4806,6 +4893,21 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -5012,6 +5114,12 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.38" @@ -5466,6 +5574,12 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 6cb96dece..c47e123d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ semver = { version = "1.0.4", features = ["serde"] } slug = "0.1.1" r2d2 = "0.8" r2d2_postgres = "0.18" +r2d2_sqlite = "0.21.0" url = { version = "2.1.1", features = ["serde"] } docsrs-metadata = { path = "crates/metadata" } anyhow = { version = "1.0.42", features = ["backtrace"]} @@ -70,7 +71,7 @@ serde_cbor = "0.11.1" getrandom = "0.2.1" itertools = { version = "0.10.5", optional = true} rusqlite = { version = "0.28.0", features = ["bundled"] } -lru = "0.9.0" +moka = { version ="0.10.0", default-features = false, features = ["sync"]} # Async tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] } diff --git a/src/config.rs b/src/config.rs index c46b2ed41..0cf907626 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,6 +14,9 @@ pub struct Config { pub(crate) max_pool_size: u32, pub(crate) min_pool_idle: u32, + // local pool for sqlite connections + pub(crate) max_sqlite_pool_size: u64, + // Storage params pub(crate) storage_backend: StorageKind, @@ -136,6 +139,7 @@ impl Config { database_url: require_env("DOCSRS_DATABASE_URL")?, max_pool_size: env("DOCSRS_MAX_POOL_SIZE", 90)?, + max_sqlite_pool_size: env("DOCSRS_MAX_SQLITE_POOL_SIZE", 500)?, min_pool_idle: env("DOCSRS_MIN_POOL_IDLE", 10)?, storage_backend: env("DOCSRS_STORAGE_BACKEND", StorageKind::Database)?, diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs index a9b364af0..fb1d5bf41 100644 --- a/src/storage/archive_index.rs +++ b/src/storage/archive_index.rs @@ -1,41 +1,16 @@ use crate::error::Result; use crate::storage::{compression::CompressionAlgorithm, FileRange}; use anyhow::{bail, Context as _}; -use lru::LruCache; use memmap2::MmapOptions; -use rusqlite::{Connection, OpenFlags, OptionalExtension}; +use rusqlite::{Connection, OptionalExtension}; use serde::de::DeserializeSeed; use serde::de::{IgnoredAny, MapAccess, Visitor}; use serde::{Deserialize, Deserializer, Serialize}; -use std::num::NonZeroUsize; -use std::{ - cell::RefCell, - collections::HashMap, - fmt, fs, - fs::File, - io, - io::Read, - path::{Path, PathBuf}, -}; +use std::{collections::HashMap, fmt, fs, fs::File, io, io::Read, path::Path}; -static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0"; +use 
super::sqlite_pool::SqliteConnectionPool;
 
 static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";
 
-thread_local! {
-    // local SQLite connection cache.
-    // `rusqlite::Connection` is not `Sync`, so we need to keep this per thread.
-    // Parallel connections to the same SQLite file are handled by SQLite itself.
-    //
-    // An alternative would be to make this cache global, but then we would have
-    // to prevent using the same connection from multiple threads at once.
-    //
-    // The better solution probably depends on the request pattern: are we
-    // typically seeing many requests to a small group of crates?
-    // Or are the requests spread over so many crates that there wouldn't be
-    // many conflicts on a single connection?
-    static SQLITE_CONNECTIONS: RefCell<LruCache<PathBuf, Connection>> = RefCell::new(
-        LruCache::new(NonZeroUsize::new(32).unwrap())
-    );
-}
-
 #[derive(Deserialize, Serialize)]
 pub(crate) struct FileInfo {
@@ -211,35 +186,6 @@ fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
         .deserialize(&mut deserializer)?)
 }
 
-/// try to open an index file as SQLite
-/// Uses a thread-local cache of open connections to the index files.
-/// Will test the connection before returning it, and attempt to
-/// reconnect if the test fails.
-fn with_sqlite_connection<R, P: AsRef<Path>, F: Fn(&Connection) -> Result<R>>(
-    path: P,
-    f: F,
-) -> Result<R> {
-    let path = path.as_ref().to_owned();
-    SQLITE_CONNECTIONS.with(|connections| {
-        let mut connections = connections.borrow_mut();
-
-        if let Some(conn) = connections.get(&path) {
-            if conn.execute("SELECT 1", []).is_ok() {
-                return f(conn);
-            }
-        }
-
-        let conn = Connection::open_with_flags(
-            &path,
-            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
-        )?;
-
-        // we're using `get_or_insert` to save the second lookup receiving the
-        // reference into the cache, after having pushed the entry.
-        f(connections.get_or_insert(path, || conn))
-    })
-}
-
 fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
     let mut stmt = conn.prepare(
         "
@@ -288,9 +234,10 @@ fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
 pub(crate) fn find_in_file<P: AsRef<Path>>(
     archive_index_path: P,
     search_for: &str,
+    pool: &SqliteConnectionPool,
 ) -> Result<Option<FileInfo>> {
     if is_sqlite_file(&archive_index_path)? {
-        with_sqlite_connection(archive_index_path, |connection| {
+        pool.with_connection(archive_index_path, |connection| {
             find_in_sqlite_index(connection, search_for)
         })
     } else {
@@ -381,15 +328,23 @@ mod tests {
 
         assert!(!is_sqlite_file(&cbor_index_file).unwrap());
 
-        let fi = find_in_file(cbor_index_file.path(), "testfile1")
-            .unwrap()
-            .unwrap();
+        let fi = find_in_file(
+            cbor_index_file.path(),
+            "testfile1",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .unwrap();
         assert_eq!(fi.range, FileRange::new(39, 459));
         assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
 
-        assert!(find_in_file(cbor_index_file.path(), "some_other_file")
-            .unwrap()
-            .is_none());
+        assert!(find_in_file(
+            cbor_index_file.path(),
+            "some_other_file",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .is_none());
     }
 
     #[test]
@@ -400,14 +355,20 @@ mod tests {
         create(&mut tf, &tempfile).unwrap();
         assert!(is_sqlite_file(&tempfile).unwrap());
 
-        let fi = find_in_file(&tempfile, "testfile1").unwrap().unwrap();
+        let fi = find_in_file(&tempfile, "testfile1", &SqliteConnectionPool::default())
+            .unwrap()
+            .unwrap();
 
         assert_eq!(fi.range, FileRange::new(39, 459));
         assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
 
-        assert!(find_in_file(&tempfile, "some_other_file")
-            .unwrap()
-            .is_none());
+        assert!(find_in_file(
+            &tempfile,
+            "some_other_file",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .is_none());
     }
 
     #[test]
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index d06a84590..a65c51d58 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -2,10 +2,12 @@ mod archive_index;
 mod compression;
 mod database;
 mod s3;
+mod sqlite_pool;
 
 pub use self::compression::{compress, decompress, CompressionAlgorithm, CompressionAlgorithms};
 use self::database::DatabaseBackend;
 use self::s3::S3Backend;
+use self::sqlite_pool::SqliteConnectionPool;
 use crate::error::Result;
 use crate::web::metrics::RenderingTimesRecorder;
 use crate::{db::Pool, Config, Metrics};
@@ -13,6 +15,7 @@ use anyhow::{anyhow, ensure};
 use chrono::{DateTime, Utc};
 use path_slash::PathExt;
 use std::io::BufReader;
+use std::num::NonZeroU64;
 use std::{
     collections::{HashMap, HashSet},
     ffi::OsStr,
@@ -113,6 +116,7 @@ enum StorageBackend {
 pub struct Storage {
     backend: StorageBackend,
     config: Arc<Config>,
+    sqlite_pool: SqliteConnectionPool,
 }
 
 impl Storage {
@@ -123,6 +127,10 @@ impl Storage {
         runtime: Arc<Runtime>,
     ) -> Result<Self> {
         Ok(Storage {
+            sqlite_pool: SqliteConnectionPool::new(
+                NonZeroU64::new(config.max_sqlite_pool_size)
+                    .ok_or_else(|| anyhow!("invalid sqlite pool size"))?,
+            ),
             config: config.clone(),
             backend: match config.storage_backend {
                 StorageKind::Database => {
@@ -229,7 +237,9 @@ impl Storage {
 
     pub(crate) fn exists_in_archive(&self, archive_path: &str, path: &str) -> Result<bool> {
         match self.get_index_filename(archive_path) {
-            Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)?.is_some()),
+            Ok(index_filename) => {
+                Ok(archive_index::find_in_file(index_filename, path, &self.sqlite_pool)?.is_some())
+            }
             Err(err) => {
                 if err.downcast_ref::<PathNotFoundError>().is_some() {
                     Ok(false)
@@ -306,8 +316,12 @@ impl Storage {
         if let Some(ref mut t) = fetch_time {
             t.step("find path in index");
         }
-        let info = archive_index::find_in_file(self.get_index_filename(archive_path)?, path)?
-            .ok_or(PathNotFoundError)?;
+        let info = archive_index::find_in_file(
+            self.get_index_filename(archive_path)?,
+            path,
+            &self.sqlite_pool,
+        )?
+        .ok_or(PathNotFoundError)?;
 
         if let Some(t) = fetch_time {
             t.step("range request");
diff --git a/src/storage/sqlite_pool.rs b/src/storage/sqlite_pool.rs
new file mode 100644
index 000000000..2bfe3ad3e
--- /dev/null
+++ b/src/storage/sqlite_pool.rs
@@ -0,0 +1,76 @@
+use anyhow::Result;
+use moka::sync::Cache;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{Connection, OpenFlags};
+use std::{
+    num::NonZeroU64,
+    path::{Path, PathBuf},
+    time::Duration,
+};
+
+static MAX_IDLE_TIME: Duration = Duration::from_secs(10 * 60);
+static MAX_LIFE_TIME: Duration = Duration::from_secs(60 * 60);
+
+/// SQLite connection pool.
+///
+/// Typical connection pools handle many connections to a single database,
+/// while this one handles some connections to many databases.
+///
+/// The more connections we keep alive, the more open files we have,
+/// so you might need to tweak this limit based on the max open files
+/// on your system.
+///
+/// We open the databases in readonly mode.
+/// We are using an additional connection pool per database so that parallel
+/// requests can be answered efficiently. Because of this the actual max
+/// connection count might be higher than the given max_connections.
+///
+/// We keep a minimum of one connection per database, for one hour.
+/// Any additional connections will be dropped after 10 minutes of inactivity.
+#[derive(Clone)]
+pub(crate) struct SqliteConnectionPool {
+    pools: Cache<PathBuf, r2d2::Pool<SqliteConnectionManager>>,
+}
+
+impl Default for SqliteConnectionPool {
+    fn default() -> Self {
+        Self::new(NonZeroU64::new(10).unwrap())
+    }
+}
+
+impl SqliteConnectionPool {
+    pub(crate) fn new(max_connections: NonZeroU64) -> Self {
+        Self {
+            pools: Cache::builder()
+                .max_capacity(max_connections.get())
+                .time_to_idle(MAX_LIFE_TIME)
+                .build(),
+        }
+    }
+
+    pub(crate) fn with_connection<R, P: AsRef<Path>, F: Fn(&Connection) -> Result<R>>(
+        &self,
+        path: P,
+        f: F,
+    ) -> Result<R> {
+        let path = path.as_ref().to_owned();
+
+        let pool = self
+            .pools
+            .entry(path.clone())
+            .or_insert_with(|| {
+                let manager = SqliteConnectionManager::file(path)
+                    .with_flags(OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX);
+                r2d2::Pool::builder()
+                    .min_idle(Some(1))
+                    .max_lifetime(Some(MAX_LIFE_TIME))
+                    .idle_timeout(Some(MAX_IDLE_TIME))
+                    .max_size(10)
+                    .build_unchecked(manager)
+            })
+            .into_value();
+
+        let conn = pool.get()?;
+        f(&conn)
+    }
+}

From ff78fcbeb7adac6bb6304f3354edbc7532b9138e Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Sat, 11 Mar 2023 21:18:19 +0100
Subject: [PATCH 05/11] fix compile

---
 Cargo.lock                   | 2 +-
 src/storage/archive_index.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4aa1dbff8..808c658e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3342,7 +3342,7 @@ dependencies = [
  "crossbeam-utils",
  "num_cpus",
  "once_cell",
- "parking_lot 0.12.1",
+ "parking_lot",
  "rustc_version",
  "scheduled-thread-pool",
  "skeptic",
diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index fb1d5bf41..3f673c79a 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -64,7 +64,7 @@ pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
     for i in 0..archive.len() {
         let zf = archive.by_index(i)?;
 
-        let compression_bzip: i32 = CompressionAlgorithm::Bzip2.into();
+        let compression_bzip = CompressionAlgorithm::Bzip2 as i32;

From 49bf28aeebc3492a10d7d38b5f02a341f10264d3 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Sat, 11 Mar 2023 21:23:24 +0100
Subject: [PATCH 06/11] ignore code block as doctest

---
 src/storage/archive_index.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index 3f673c79a..c0d0c7bc1 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -215,7 +215,7 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
 /// https://raw.githubusercontent.com/rusqlite/rusqlite/master/libsqlite3-sys/sqlite3/sqlite3.c
 /// and
 /// https://en.wikipedia.org/wiki/SQLite (-> _Magic number_)
-/// ```
+/// ```ignore
 /// > FORMAT DETAILS
 /// > OFFSET   SIZE    DESCRIPTION
 /// > 0        16      Header string: "SQLite format 3\000"

From b694df6fe8aa8fca0a0c44bcaa4502b1cdcd23cb Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Sun, 16 Apr 2023 17:51:51 +0200
Subject: [PATCH 07/11] add more documentation for limits on sqlite connection
 pool

---
 Cargo.lock                 | 7 +++++--
 src/storage/sqlite_pool.rs | 7 +++++--
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 808c658e5..e07a1006a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2603,6 +2603,9 @@ name = "hashbrown"
 version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
+dependencies = [
+ "ahash 0.7.6",
+]
 
 [[package]]
 name = "hashbrown"
@@ -4053,7 +4056,7 @@ version = "0.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
  "memchr",
  "unicase",
 ]
@@ -4342,7 +4345,7 @@ version = "0.28.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
  "fallible-iterator",
  "fallible-streaming-iterator",
  "hashlink",
diff --git a/src/storage/sqlite_pool.rs b/src/storage/sqlite_pool.rs
index 2bfe3ad3e..b230bdbcb 100644
--- a/src/storage/sqlite_pool.rs
+++ b/src/storage/sqlite_pool.rs
@@ -27,6 +27,9 @@ static MAX_LIFE_TIME: Duration = Duration::from_secs(60 * 60);
 ///
 /// We keep a minimum of one connection per database, for one hour.
 /// Any additional connections will be dropped after 10 minutes of inactivity.
+///
+/// * `max_databases` is the maximum amount of databases in the pool.
+/// * for each of the databases, we manage a pool of 1-10 connections
 #[derive(Clone)]
 pub(crate) struct SqliteConnectionPool {
     pools: Cache<PathBuf, r2d2::Pool<SqliteConnectionManager>>,
 }
@@ -39,10 +42,10 @@ impl Default for SqliteConnectionPool {
 }
 
 impl SqliteConnectionPool {
-    pub(crate) fn new(max_connections: NonZeroU64) -> Self {
+    pub(crate) fn new(max_databases: NonZeroU64) -> Self {
         Self {
             pools: Cache::builder()
-                .max_capacity(max_connections.get())
+                .max_capacity(max_databases.get())
                 .time_to_idle(MAX_LIFE_TIME)
                 .build(),
         }

From ce868cc16bf4d8b72167d2177aa02f7b836f9a29 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Sun, 16 Apr 2023 18:03:30 +0200
Subject: [PATCH 08/11] first test for pool

---
 src/storage/sqlite_pool.rs | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/src/storage/sqlite_pool.rs b/src/storage/sqlite_pool.rs
index b230bdbcb..e0e4c3104 100644
--- a/src/storage/sqlite_pool.rs
+++ b/src/storage/sqlite_pool.rs
@@ -77,3 +77,26 @@ impl SqliteConnectionPool {
         f(&conn)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_simple_connection() {
+        let filename = tempfile::NamedTempFile::new().unwrap().into_temp_path();
+        rusqlite::Connection::open(&filename).unwrap();
+
+        let pool = SqliteConnectionPool::new(NonZeroU64::new(1).unwrap());
+
+        pool.with_connection(&filename, |conn| {
+            conn.query_row("SELECT 1", [], |row| {
+                assert_eq!(row.get::<_, i32>(0).unwrap(), 1);
+                Ok(())
+            })
+            .unwrap();
+            Ok(())
+        })
+        .unwrap();
+    }
+}

From 3b6d40b571c22f40afb0ac148fd4c4e8229f9098 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Fri, 12 May 2023 19:40:15 +0200
Subject: [PATCH 09/11] don't clone string

---
 src/storage/archive_index.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index c0d0c7bc1..c593ec4da 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -69,7 +69,7 @@ pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
         conn.execute(
             "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
             (
-                zf.name().to_string(),
+                zf.name(),
                 zf.data_start(),
                 zf.data_start() + zf.compressed_size() - 1,
                 match zf.compression() {

From d57889fe7ca1176e21d5347aed9450f01d344568 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Fri, 12 May 2023 20:00:51 +0200
Subject: [PATCH 10/11] replace panic with error when decoding compression
 from archive index

---
 src/storage/archive_index.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index c593ec4da..da927f13f 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -197,9 +197,16 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
 
     stmt.query_row((search_for,), |row| {
         let compression: i32 = row.get(2)?;
+
         Ok(FileInfo {
             range: row.get(0)?..=row.get(1)?,
-            compression: compression.try_into().expect("invalid compression value"),
+            compression: compression.try_into().map_err(|value| {
+                rusqlite::Error::FromSqlConversionFailure(
+                    2,
+                    rusqlite::types::Type::Integer,
+                    format!("invalid compression algorithm '{value}' in database").into(),
+                )
+            })?,
         })
     })
     .optional()

From: Denis Cornehl
Date: Fri, 12 May 2023 20:08:12 +0200
Subject: [PATCH 11/11] replace ```ignore with ```text to prevent execution as
 doc-test

---
 src/storage/archive_index.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index da927f13f..1bb401743 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -222,7 +222,7 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
 /// https://raw.githubusercontent.com/rusqlite/rusqlite/master/libsqlite3-sys/sqlite3/sqlite3.c
 /// and
 /// https://en.wikipedia.org/wiki/SQLite (-> _Magic number_)
-/// ```ignore
+/// ```text
 /// > FORMAT DETAILS
 /// > OFFSET   SIZE    DESCRIPTION
 /// > 0        16      Header string: "SQLite format 3\000"
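
Appendix (not part of the patch series): two short usage sketches.

First, the lookup that the new index format enables. The `files` table, its columns, and the open flags come from `create()` in patch 01 and the pool in patch 04; the helper function itself and the example paths are hypothetical:

```rust
use anyhow::Result;
use rusqlite::{Connection, OpenFlags, OptionalExtension};

/// Hypothetical helper: fetch the byte range and compression algorithm
/// stored for one file path in a per-archive SQLite index.
fn lookup(index_path: &str, file_path: &str) -> Result<Option<(u64, u64, i32)>> {
    // read-only + no-mutex mirrors the flags used by the connection pool
    let conn = Connection::open_with_flags(
        index_path,
        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
    )?;
    conn.query_row(
        "SELECT start, end, compression FROM files WHERE path = ?",
        (file_path,),
        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
    )
    .optional() // QueryReturnedNoRows becomes Ok(None)
    .map_err(Into::into)
}

fn main() -> Result<()> {
    // `start..=end` is the compressed member's byte range inside the zip
    // archive, so the storage backend can fetch exactly one file with a
    // single HTTP range request.
    if let Some((start, end, compression)) = lookup("crate.zip.index", "index.html")? {
        println!("bytes {start}-{end}, compression algorithm {compression}");
    }
    Ok(())
}
```

This is the design payoff over the CBOR format: a lookup becomes one indexed SELECT instead of memory-mapping and deserializing the whole index.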
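Second, what a consumer does with a hit. Bzip2 is the only compression `create()` accepts, so the bytes fetched for `start..=end` can be decoded with the `bzip2` crate already in Cargo.toml; this helper is likewise hypothetical:

```rust
use std::io::Read;

/// Hypothetical helper: decode the compressed bytes of one zip member,
/// as fetched via a range request using the stored `start`/`end` offsets.
/// For indexes written by `create()` this is always a bzip2 stream.
fn decode_member(compressed: &[u8]) -> anyhow::Result<Vec<u8>> {
    let mut decompressed = Vec::new();
    bzip2::read::BzDecoder::new(compressed).read_to_end(&mut decompressed)?;
    Ok(decompressed)
}
```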