diff --git a/Cargo.lock b/Cargo.lock index d9749fe0955..1cd1c233922 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -188,7 +188,7 @@ dependencies = [ "hyper 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "lettre 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "lettre_email 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "license-exprs 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -277,7 +277,7 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "entities 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "pest 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "pest_derive 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -420,7 +420,7 @@ dependencies = [ "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1036,7 +1036,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] 
name = "lazy_static" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1360,7 +1360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)", "openssl-sys 0.9.31 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2434,7 +2434,7 @@ dependencies = [ "checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazy_static 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c9e5e58fa1a4c3b915a561a78a22ee0cac6ab97dca2504428bc1cb074375f8d5" -"checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d" +"checksum lazy_static 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fb497c35d362b6a331cfd94956a07fc2c78a4604cdbee844a81170386b996dd3" "checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" "checksum lettre 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d440625fef1b29dd39cbe1c582476ca4295117adc940bad78a7b37514a18eef2" "checksum lettre_email 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6035d98658fcac4e2aab55bd2229685e6071fedafe185085149a88b51ed39821" diff --git 
a/migrations/2018-05-03-150523_create_jobs/down.sql b/migrations/2018-05-03-150523_create_jobs/down.sql new file mode 100644 index 00000000000..d7ff875a678 --- /dev/null +++ b/migrations/2018-05-03-150523_create_jobs/down.sql @@ -0,0 +1 @@ +DROP TABLE background_jobs; diff --git a/migrations/2018-05-03-150523_create_jobs/up.sql b/migrations/2018-05-03-150523_create_jobs/up.sql new file mode 100644 index 00000000000..b6ac3047c6f --- /dev/null +++ b/migrations/2018-05-03-150523_create_jobs/up.sql @@ -0,0 +1,8 @@ +CREATE TABLE background_jobs ( + id BIGSERIAL PRIMARY KEY, + job_type TEXT NOT NULL, + data JSONB NOT NULL, + retries INTEGER NOT NULL DEFAULT 0, + last_retry TIMESTAMP NOT NULL DEFAULT '1970-01-01', + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); diff --git a/src/background/job.rs b/src/background/job.rs new file mode 100644 index 00000000000..5d3f23144b6 --- /dev/null +++ b/src/background/job.rs @@ -0,0 +1,26 @@ +use diesel::PgConnection; +use serde::{Serialize, de::DeserializeOwned}; + +use super::storage; +use util::CargoResult; + +/// A background job, meant to be run asynchronously. +pub trait Job: Serialize + DeserializeOwned { + /// The environment this job is run with. This is a struct you define, + /// which should encapsulate things like database connection pools, any + /// configuration, and any other static data or shared resources. + type Environment; + + /// The key to use for storing this job, and looking it up later. + /// + /// Typically this is the name of your struct in `snake_case` + const JOB_TYPE: &'static str; + + /// Enqueue this job to be run at some point in the future. + fn enqueue(self, conn: &PgConnection) -> CargoResult<()> { + storage::enqueue_job(conn, self) + } + + /// The logic involved in actually performing this job. 
+ fn perform(self, env: &Self::Environment) -> CargoResult<()>; +} diff --git a/src/background/mod.rs b/src/background/mod.rs new file mode 100644 index 00000000000..70b8d6dce79 --- /dev/null +++ b/src/background/mod.rs @@ -0,0 +1,8 @@ +mod job; +mod registry; +mod runner; +mod storage; + +pub use self::job::*; +pub use self::registry::Registry; +pub use self::runner::*; diff --git a/src/background/registry.rs b/src/background/registry.rs new file mode 100644 index 00000000000..9105298d06a --- /dev/null +++ b/src/background/registry.rs @@ -0,0 +1,39 @@ +use serde_json; +use std::collections::HashMap; + +use super::Job; +use util::CargoResult; + +#[doc(hidden)] +pub type PerformFn<Env> = Box<Fn(serde_json::Value, &Env) -> CargoResult<()>>; + +#[derive(Default)] +#[allow(missing_debug_implementations)] // Can't derive debug +/// A registry of background jobs, used to map job types to concrete perform +/// functions at runtime. +pub struct Registry<Env> { + job_types: HashMap<&'static str, PerformFn<Env>>, +} + +impl<Env> Registry<Env> { + /// Create a new, empty registry + pub fn new() -> Self { + Registry { + job_types: Default::default(), + } + } + + /// Get the perform function for a given job type + pub fn get(&self, job_type: &str) -> Option<&PerformFn<Env>> { + self.job_types.get(job_type) + } + + /// Register a new background job. This will override any existing + /// jobs with the same `JOB_TYPE`, if one exists. 
+ pub fn register<T: Job<Environment = Env>>(&mut self) { + self.job_types.insert(T::JOB_TYPE, Box::new(|data, env| { + let data = serde_json::from_value(data)?; + T::perform(data, env) + })); + } +} diff --git a/src/background/runner.rs b/src/background/runner.rs new file mode 100644 index 00000000000..726cfc26a79 --- /dev/null +++ b/src/background/runner.rs @@ -0,0 +1,200 @@ +#![allow(dead_code)] +use diesel::prelude::*; +use std::panic::{catch_unwind, UnwindSafe}; + +use super::storage; +use util::errors::*; + +fn get_single_job<F>(conn: &PgConnection, f: F) -> CargoResult<()> +where + F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + UnwindSafe, +{ + conn.transaction::<_, Box<CargoError>, _>(|| { + let job = storage::find_next_unlocked_job(conn)?; + let job_id = job.id; + + let result = catch_unwind(|| f(job)) + .map_err(|_| internal("job panicked")) + .and_then(|r| r); + + if result.is_ok() { + storage::delete_successful_job(conn, job_id)?; + } else { + storage::update_failed_job(conn, job_id); + } + Ok(()) + }) +} + +#[cfg(test)] +mod tests { + use diesel::prelude::*; + + use schema::background_jobs::dsl::*; + use std::sync::{Mutex, MutexGuard, Barrier, Arc}; + use std::panic::AssertUnwindSafe; + use std::thread; + use super::*; + + #[test] + fn jobs_are_locked_when_fetched() { + let _guard = TestGuard::lock(); + + let conn = connection(); + let first_job_id = create_dummy_job(&conn).id; + let second_job_id = create_dummy_job(&conn).id; + let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); + let fetch_barrier2 = fetch_barrier.clone(); + let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); + let return_barrier2 = return_barrier.clone(); + + let t1 = thread::spawn(move || { + let _ = get_single_job(&connection(), |job| { + fetch_barrier.0.wait(); // Tell thread 2 it can lock its job + assert_eq!(first_job_id, job.id); + return_barrier.0.wait(); // Wait for thread 2 to lock its job + Ok(()) + }); + }); + + let t2 = thread::spawn(move || { + fetch_barrier2.0.wait(); // 
Wait until thread 1 locks its job + get_single_job(&connection(), |job| { + assert_eq!(second_job_id, job.id); + return_barrier2.0.wait(); // Tell thread 1 it can unlock its job + Ok(()) + }) + .unwrap(); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn jobs_are_deleted_when_successfully_run() { + let _guard = TestGuard::lock(); + + let conn = connection(); + create_dummy_job(&conn); + + get_single_job(&conn, |_| { + Ok(()) + }).unwrap(); + + let remaining_jobs = background_jobs.count() + .get_result(&conn); + assert_eq!(Ok(0), remaining_jobs); + } + + #[test] + fn failed_jobs_do_not_release_lock_before_updating_retry_time() { + let _guard = TestGuard::lock(); + create_dummy_job(&connection()); + let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); + let barrier2 = barrier.clone(); + + let t1 = thread::spawn(move || { + let _ = get_single_job(&connection(), |_| { + barrier.0.wait(); + // error so the job goes back into the queue + Err(human("nope")) + }); + }); + + let t2 = thread::spawn(move || { + let conn = connection(); + // Wait for the first thread to acquire the lock + barrier2.0.wait(); + // We are intentionally not using `get_single_job` here. + // `SKIP LOCKED` is intentionally omitted here, so we block until + // the lock on the first job is released. + // If there is any point where the row is unlocked, but the retry + // count is not updated, we will get a row here. 
+ let available_jobs = background_jobs + .select(id) + .filter(retries.eq(0)) + .for_update() + .load::<i64>(&conn) + .unwrap(); + assert_eq!(0, available_jobs.len()); + + // Sanity check to make sure the job actually is there + let total_jobs_including_failed = background_jobs + .select(id) + .for_update() + .load::<i64>(&conn) + .unwrap(); + assert_eq!(1, total_jobs_including_failed.len()); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn panicking_in_jobs_updates_retry_counter() { + let _guard = TestGuard::lock(); + let conn = connection(); + let job_id = create_dummy_job(&conn).id; + + let t1 = thread::spawn(move || { + let _ = get_single_job(&connection(), |_| { + panic!() + }); + }); + + let _ = t1.join(); + + let tries = background_jobs + .find(job_id) + .select(retries) + .for_update() + .first::<i32>(&conn) + .unwrap(); + assert_eq!(1, tries); + } + + + lazy_static! { + // Since these tests deal with behavior concerning multiple connections + // running concurrently, they have to run outside of a transaction. + // Therefore we can't run more than one at a time. + // + // Rather than forcing the whole suite to be run with `--test-threads 1`, + // we just lock these tests instead. 
+ static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); + } + + struct TestGuard<'a>(MutexGuard<'a, ()>); + + impl<'a> TestGuard<'a> { + fn lock() -> Self { + TestGuard(TEST_MUTEX.lock().unwrap()) + } + } + + impl<'a> Drop for TestGuard<'a> { + fn drop(&mut self) { + ::diesel::sql_query("TRUNCATE TABLE background_jobs") + .execute(&connection()) + .unwrap(); + } + } + + fn connection() -> PgConnection { + use dotenv; + + let database_url = + dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests"); + PgConnection::establish(&database_url).unwrap() + } + + fn create_dummy_job(conn: &PgConnection) -> storage::BackgroundJob { + ::diesel::insert_into(background_jobs) + .values((job_type.eq("Foo"), data.eq(json!(null)))) + .returning((id, job_type, data)) + .get_result(conn) + .unwrap() + } +} diff --git a/src/background/storage.rs b/src/background/storage.rs new file mode 100644 index 00000000000..1dbd9e47bb2 --- /dev/null +++ b/src/background/storage.rs @@ -0,0 +1,71 @@ +use diesel::dsl::now; +use diesel::prelude::*; +use diesel::{delete, insert_into, update}; +use diesel::sql_types::Integer; +use serde_json; + +use schema::background_jobs; +use super::Job; +use util::CargoResult; + +#[derive(Queryable, Identifiable, Debug, Clone)] +pub struct BackgroundJob { + pub id: i64, + pub job_type: String, + pub data: serde_json::Value, +} + +/// Enqueues a job to be run as soon as possible. +pub fn enqueue_job<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> { + use schema::background_jobs::dsl::*; + + let job_data = serde_json::to_value(job)?; + insert_into(background_jobs) + .values(( + job_type.eq(T::JOB_TYPE), + data.eq(job_data), + )) + .execute(conn)?; + Ok(()) +} + +/// Finds the next job that is unlocked, and ready to be retried. If a row is +/// found, it will be locked. 
+pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult<BackgroundJob> { + use schema::background_jobs::dsl::*; + use diesel::dsl::*; + use diesel::sql_types::Interval; + + sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer); + + background_jobs + .select((id, job_type, data)) + .filter(last_retry.lt(now - 1.minute().into_sql::<Interval>() * power(2, retries))) + .order(id) + .for_update() + .skip_locked() + .first::<BackgroundJob>(conn) +} + +/// Deletes a job that has successfully completed running +pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> { + use schema::background_jobs::dsl::*; + + delete(background_jobs.find(job_id)).execute(conn)?; + Ok(()) +} + +/// Marks that we just tried and failed to run a job. +/// +/// Ignores any database errors that may have occurred. If the DB has gone away, +/// we assume that just trying again with a new connection will succeed. +pub fn update_failed_job(conn: &PgConnection, job_id: i64) { + use schema::background_jobs::dsl::*; + + let _ = update(background_jobs.find(job_id)) + .set(( + retries.eq(retries + 1), + last_retry.eq(now), + )) + .execute(conn); +} diff --git a/src/lib.rs b/src/lib.rs index df2532c3205..6aa52076aaf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,10 @@ extern crate conduit_router; extern crate conduit_static; extern crate cookie; +#[cfg(test)] +#[macro_use] +extern crate lazy_static; + pub use self::uploaders::{Bomb, Uploader}; pub use app::App; pub use config::Config; @@ -62,6 +66,7 @@ use std::sync::Arc; use conduit_middleware::MiddlewareBuilder; pub mod app; +pub mod background; pub mod boot; pub mod config; pub mod db; diff --git a/src/schema.rs b/src/schema.rs index d5436fcb094..336cefaad16 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -53,6 +53,53 @@ table! { use diesel_full_text_search::{TsVector as Tsvector}; use diesel_ltree::Ltree; + /// Representation of the `background_jobs` table. + /// + /// (Automatically generated by Diesel.) 
+ background_jobs (id) { + /// The `id` column of the `background_jobs` table. + /// + /// Its SQL type is `Int8`. + /// + /// (Automatically generated by Diesel.) + id -> Int8, + /// The `job_type` column of the `background_jobs` table. + /// + /// Its SQL type is `Text`. + /// + /// (Automatically generated by Diesel.) + job_type -> Text, + /// The `data` column of the `background_jobs` table. + /// + /// Its SQL type is `Jsonb`. + /// + /// (Automatically generated by Diesel.) + data -> Jsonb, + /// The `retries` column of the `background_jobs` table. + /// + /// Its SQL type is `Int4`. + /// + /// (Automatically generated by Diesel.) + retries -> Int4, + /// The `last_retry` column of the `background_jobs` table. + /// + /// Its SQL type is `Timestamp`. + /// + /// (Automatically generated by Diesel.) + last_retry -> Timestamp, + /// The `created_at` column of the `background_jobs` table. + /// + /// Its SQL type is `Timestamp`. + /// + /// (Automatically generated by Diesel.) + created_at -> Timestamp, + } +} + +table! { + use diesel::sql_types::*; + use diesel_full_text_search::{TsVector as Tsvector}; + /// Representation of the `badges` table. /// /// (Automatically generated by Diesel.) @@ -924,6 +971,7 @@ joinable!(versions -> crates (crate_id)); allow_tables_to_appear_in_same_query!( api_tokens, + background_jobs, badges, categories, crate_downloads,