Skip to content
This repository was archived by the owner on Dec 29, 2022. It is now read-only.

Commit 577ba2c

Browse files
committed
Sequentialise all changes to the VFS
Fixes #321
1 parent 9ca162f commit 577ba2c

File tree

3 files changed

+254
-6
lines changed

3 files changed

+254
-6
lines changed

src/actions/change_queue.rs

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
use vfs::{self, Vfs, Change};
12+
13+
use std::collections::HashMap;
14+
use std::mem;
15+
use std::path::{Path, PathBuf};
16+
use std::sync::{Arc, Mutex};
17+
use std::thread::{self, Thread};
18+
use std::time::Duration;
19+
20+
/// A queue for ensuring that changes happen in version order.
21+
///
22+
/// Assumptions:
23+
/// * Each change comes on its own thread
24+
/// * Version numbers are sequential
25+
/// * Version numbers are per-file and independent
26+
///
27+
/// If a version number is missed, then we will wait for a few seconds and then
28+
/// panic. The theory is that it is better to burn down the whole RLS than continue
29+
/// with inconsistent state.
30+
///
31+
/// This is necessary because the RLS spawns a new thread for every message it
32+
/// is sent. It is possible that a client sends multiple changes in order, but
33+
/// basically at the same time (this is especially common when 'undo'ing). The
34+
/// threads would then race to commit the changes to the VFS. This queue serialises
35+
/// those changes.
36+
37+
const CHANGE_QUEUE_TIMEOUT: u64 = 5;
38+
39+
// We need a newtype because of public in private warnings :-(
40+
pub struct ChangeQueue(ChangeQueue_);
41+
42+
impl ChangeQueue {
43+
pub fn new(vfs: Arc<Vfs>) -> ChangeQueue {
44+
ChangeQueue(ChangeQueue_::new(VfsSink(vfs)))
45+
}
46+
47+
pub fn on_changes(&self, file_name: &Path, version: u64, changes: &[Change]) -> Result<(), vfs::Error> {
48+
self.0.on_changes(file_name, version, changes)
49+
}
50+
}
51+
52+
struct ChangeQueue_<S = VfsSink> {
53+
sink: S,
54+
queues: Mutex<HashMap<PathBuf, Queue>>,
55+
}
56+
57+
impl<S: ChangeSink> ChangeQueue_<S> {
58+
fn new(sink: S) -> ChangeQueue_<S> {
59+
ChangeQueue_ {
60+
sink,
61+
queues: Mutex::new(HashMap::new()),
62+
}
63+
}
64+
65+
pub fn on_changes(&self, file_name: &Path, version: u64, changes: &[Change]) -> Result<(), vfs::Error> {
66+
trace!("on_changes: {} {:?}", version, changes);
67+
68+
// It is important to hold the lock on self.queues for the whole time
69+
// from checking the current version until we are done making the change.
70+
// However, we must drop the lock if our thread suspends so that other
71+
// threads can make the changes we're blocked waiting for.
72+
let mut queues = self.queues.lock().unwrap();
73+
let cur_version = {
74+
let queue = queues.entry(file_name.to_owned()).or_insert(Queue::new());
75+
queue.cur_version
76+
};
77+
if cur_version.is_some() && Some(version) != cur_version {
78+
trace!("Blocking change {}, current: {:?}", version, cur_version);
79+
{
80+
let mut queue = queues.get_mut(file_name).unwrap();
81+
queue.queued.insert(version, thread::current());
82+
}
83+
mem::drop(queues);
84+
thread::park_timeout(Duration::from_secs(CHANGE_QUEUE_TIMEOUT));
85+
86+
// We've been woken up - either because our change is next, or the timeout expired.
87+
queues = self.queues.lock().unwrap();
88+
}
89+
90+
let mut queue = queues.get_mut(file_name).unwrap();
91+
// Fail if we timed-out rather than our thread was unparked.
92+
if cur_version.is_some() && Some(version) != queue.cur_version {
93+
eprintln!("Missing change, aborting. Found {}, expected {:?}", version, queue.cur_version);
94+
S::on_error();
95+
}
96+
97+
queue.commit_change(version, changes, &self.sink)
98+
}
99+
}
100+
101+
struct Queue {
102+
cur_version: Option<u64>,
103+
queued: HashMap<u64, Thread>,
104+
}
105+
106+
impl Queue {
107+
fn new() -> Queue {
108+
Queue {
109+
cur_version: None,
110+
queued: HashMap::new(),
111+
}
112+
}
113+
114+
fn commit_change<S: ChangeSink>(&mut self, version: u64, changes: &[Change], sink: &S) -> Result<(), vfs::Error> {
115+
trace!("commit_change {}, current: {:?}", version, self.cur_version);
116+
117+
let result = sink.change(changes)?;
118+
let cur_version = version + 1;
119+
self.cur_version = Some(cur_version);
120+
121+
if let Some(t) = self.queued.remove(&cur_version) {
122+
trace!("waking up change {}", cur_version);
123+
t.unpark();
124+
}
125+
126+
Ok(result)
127+
}
128+
}
129+
130+
// A wrapper around the VFS so we can test easily.
131+
trait ChangeSink {
132+
// Make a change to the VFS (or mock the change).
133+
fn change(&self, changes: &[Change]) -> Result<(), vfs::Error>;
134+
// How to handle a sequencing error.
135+
fn on_error() -> !;
136+
}
137+
138+
struct VfsSink(Arc<Vfs>);
139+
140+
impl ChangeSink for VfsSink {
141+
fn change(&self, changes: &[Change]) -> Result<(), vfs::Error> {
142+
self.0.on_changes(changes)
143+
}
144+
145+
// Burn down the whole RLS.
146+
fn on_error() -> ! {
147+
::std::process::abort();
148+
}
149+
}
150+
151+
#[cfg(test)]
152+
mod test {
153+
use super::*;
154+
155+
use std::sync::{Mutex, Arc};
156+
use std::path::PathBuf;
157+
158+
struct TestSink {
159+
expected: Mutex<HashMap<PathBuf, u64>>,
160+
}
161+
162+
impl TestSink {
163+
fn new() -> TestSink {
164+
TestSink {
165+
expected: Mutex::new(HashMap::new()),
166+
}
167+
}
168+
}
169+
170+
impl ChangeSink for TestSink {
171+
fn change(&self, changes: &[Change]) -> Result<(), vfs::Error> {
172+
if let Change::AddFile { ref text, ref file } = changes[0] {
173+
let index: u64 = text.parse().unwrap();
174+
let mut expected = self.expected.lock().unwrap();
175+
let expected = expected.entry(file.to_owned()).or_insert(0);
176+
assert_eq!(*expected, index);
177+
*expected = index + 1;
178+
Ok(())
179+
} else {
180+
panic!();
181+
}
182+
}
183+
184+
fn on_error() -> ! {
185+
panic!();
186+
}
187+
}
188+
189+
#[test]
190+
fn test_queue_seq() {
191+
// Sanity test that checks we get the expected behaviour with no threading.
192+
193+
let queue = ChangeQueue_::new(TestSink::new());
194+
queue.on_changes(Path::new("foo"), 0, &[Change::AddFile { file: PathBuf::new(), text: "0".to_owned() }]).unwrap();
195+
queue.on_changes(Path::new("foo"), 1, &[Change::AddFile { file: PathBuf::new(), text: "1".to_owned() }]).unwrap();
196+
queue.on_changes(Path::new("foo"), 2, &[Change::AddFile { file: PathBuf::new(), text: "2".to_owned() }]).unwrap();
197+
queue.on_changes(Path::new("foo"), 3, &[Change::AddFile { file: PathBuf::new(), text: "3".to_owned() }]).unwrap();
198+
}
199+
200+
#[test]
201+
fn test_queue_concurrent() {
202+
let queue = Arc::new(ChangeQueue_::new(TestSink::new()));
203+
let mut threads = vec![];
204+
let foo = Path::new("foo");
205+
let bar = Path::new("bar");
206+
for i in 3..100 {
207+
let queue_ = queue.clone();
208+
threads.push(thread::spawn(move || {
209+
queue_.on_changes(foo, i, &[Change::AddFile { file: foo.to_owned(), text: (i-3).to_string() }]).unwrap();
210+
}));
211+
212+
let queue_ = queue.clone();
213+
threads.push(thread::spawn(move || {
214+
queue_.on_changes(bar, i, &[Change::AddFile { file: bar.to_owned(), text: (i-3).to_string() }]).unwrap();
215+
}));
216+
}
217+
218+
for h in threads {
219+
h.join().unwrap();
220+
}
221+
}
222+
223+
#[test]
224+
#[should_panic]
225+
fn test_queue_skip() {
226+
// Skip a change - the queue should panic rather than loop forever.
227+
let queue = Arc::new(ChangeQueue_::new(TestSink::new()));
228+
let mut threads = vec![];
229+
for i in 0..100 {
230+
if i == 45 {
231+
continue;
232+
}
233+
let queue = queue.clone();
234+
threads.push(thread::spawn(move || {
235+
queue.on_changes(Path::new("foo"), i, &[Change::AddFile { file: PathBuf::new(), text: i.to_string() }]).unwrap();
236+
}));
237+
}
238+
239+
for h in threads {
240+
h.join().unwrap();
241+
}
242+
}
243+
}

src/actions/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// except according to those terms.
1010

1111
mod compiler_message_parsing;
12+
mod change_queue;
1213
mod lsp_extensions;
1314

1415
use analysis::{AnalysisHost};
@@ -22,6 +23,7 @@ use span;
2223
use Span;
2324

2425
use build::*;
26+
use self::change_queue::ChangeQueue;
2527
use lsp_data::*;
2628
use server::{ResponseData, Output};
2729

@@ -40,6 +42,7 @@ type BuildResults = HashMap<PathBuf, Vec<RustDiagnostic>>;
4042
pub struct ActionHandler {
4143
analysis: Arc<AnalysisHost>,
4244
vfs: Arc<Vfs>,
45+
change_queue: ChangeQueue,
4346
build_queue: Arc<BuildQueue>,
4447
current_project: Mutex<Option<PathBuf>>,
4548
previous_build_results: Mutex<BuildResults>,
@@ -51,9 +54,10 @@ impl ActionHandler {
5154
vfs: Arc<Vfs>,
5255
build_queue: Arc<BuildQueue>) -> ActionHandler {
5356
ActionHandler {
54-
analysis: analysis,
55-
vfs: vfs,
56-
build_queue: build_queue,
57+
analysis,
58+
vfs: vfs.clone(),
59+
change_queue: ChangeQueue::new(vfs),
60+
build_queue,
5761
current_project: Mutex::new(None),
5862
previous_build_results: Mutex::new(HashMap::new()),
5963
fmt_config: Mutex::new(FmtConfig::default()),
@@ -183,7 +187,9 @@ impl ActionHandler {
183187
}
184188

185189
pub fn on_change(&self, change: DidChangeTextDocumentParams, out: &Output) {
190+
trace!("on_change: {:?}, thread: {}", change, unsafe { ::std::mem::transmute::<_, u64>(thread::current().id()) });
186191
let fname = parse_file_path(&change.text_document.uri).unwrap();
192+
let fname2 = fname.clone();
187193
let changes: Vec<Change> = change.content_changes.iter().map(move |i| {
188194
if let Some(range) = i.range {
189195
let range = ls_util::range_to_rls(range);
@@ -199,9 +205,7 @@ impl ActionHandler {
199205
}
200206
}
201207
}).collect();
202-
self.vfs.on_changes(&changes).unwrap();
203-
204-
trace!("on_change: {:?}", changes);
208+
self.change_queue.on_changes(&fname2, change.text_document.version, &changes).unwrap();
205209

206210
self.build_current_project(BuildPriority::Normal, out);
207211
}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#![feature(rustc_private)]
1212
#![feature(concat_idents)]
1313
#![feature(command_envs)]
14+
#![feature(thread_id)]
1415

1516
extern crate cargo;
1617
#[macro_use]

0 commit comments

Comments
 (0)