Skip to content

Migrate Firecracker to use rust-vmm/event-manager #2604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/devices/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["The Chromium OS Authors"]
edition = "2018"

[dependencies]
event-manager = ">=0.2.1"
libc = ">=0.2.39"
timerfd = ">=1.0"
versionize = ">=0.1.6"
Expand All @@ -15,7 +16,6 @@ dumbo = { path = "../dumbo" }
logger = { path = "../logger" }
mmds = { path = "../mmds" }
net_gen = { path = "../net_gen" }
polly = { path = "../polly" }
rate_limiter = { path = "../rate_limiter" }
serde = { version = ">=1.0.27", features = ["derive"] }
snapshot = { path = "../snapshot" }
Expand Down
207 changes: 48 additions & 159 deletions src/devices/src/legacy/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use std::collections::VecDeque;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};

use event_manager::{EventOps, Events, MutEventSubscriber};

use logger::{error, warn, IncMetric, METRICS};
use polly::event_manager::{EventManager, Pollable, Subscriber};
use utils::epoll::{EpollEvent, EventSet};
use utils::epoll::EventSet;
use utils::eventfd::EventFd;

use crate::bus::BusDevice;
Expand Down Expand Up @@ -327,42 +328,31 @@ impl Serial {
.map_or(Ok(()), |buf_ready| buf_ready.write(1))
}

fn handle_ewouldblock(&self, ev_mgr: &mut EventManager) {
fn handle_ewouldblock(&self, ops: &mut EventOps) {
let buffer_ready_fd = self.buffer_ready_evt_fd();
let input_fd = self.serial_input_fd();
if input_fd < 0 || buffer_ready_fd < 0 {
error!("Serial does not have a configured input source.");
return;
}

if ev_mgr.subscriber(input_fd).is_err() {
match ev_mgr.subscriber(buffer_ready_fd) {
Ok(serial) => {
match ev_mgr.register(
input_fd,
EpollEvent::new(EventSet::IN, input_fd as u64),
serial.clone(),
) {
// Bytes might had come on the unregistered stdin. Try to consume any.
Ok(_) => self.signal_buffer_ready().unwrap_or_else(|err| {
error!(
"Could not signal that serial device buffer is ready: {:?}",
err
)
}),
Err(e) => {
error!(
"Could not register the serial input to the event manager: {:?}",
e
);
}
}
}
Err(e) => {
error!("Could not get the serial device subscriber: {:?}", e);
}
match ops.add(Events::new(&input_fd, EventSet::IN)) {
Err(event_manager::Error::FdAlreadyRegistered) => (),
Err(e) => {
error!(
"Could not register the serial input to the event manager: {:?}",
e
);
}
}
Ok(()) => {
// Bytes might had come on the unregistered stdin. Try to consume any.
self.signal_buffer_ready().unwrap_or_else(|err| {
error!(
"Could not signal that serial device buffer is ready: {:?}",
err
)
})
}
};
}
}

Expand Down Expand Up @@ -400,14 +390,14 @@ impl BusDevice for Serial {
}
}

impl Subscriber for Serial {
impl MutEventSubscriber for Serial {
/// Handle events on the serial input fd.
fn process(&mut self, event: &EpollEvent, ev_mgr: &mut EventManager) {
fn process(&mut self, event: Events, ops: &mut EventOps) {
#[inline]
fn unregister_source(ev_mgr: &mut EventManager, source: Pollable) {
match ev_mgr.unregister(source) {
fn unregister_source<T: AsRawFd>(ops: &mut EventOps, source: &T) {
match ops.remove(Events::new(source, EventSet::IN)) {
Ok(_) => (),
Err(_) => error!("Could not unregister the source: {}", source),
Err(_) => error!("Could not unregister source fd: {}", source.as_raw_fd()),
}
}

Expand All @@ -423,8 +413,8 @@ impl Subscriber for Serial {
Ok(_) => (),
Err(err) => {
error!("Detach serial device input source due to error in consuming the buffer ready event: {:?}", err);
unregister_source(ev_mgr, input_fd);
unregister_source(ev_mgr, buffer_ready_fd);
unregister_source(ops, &input_fd);
unregister_source(ops, &buffer_ready_fd);
return;
}
}
Expand All @@ -437,28 +427,28 @@ impl Subscriber for Serial {
Ok(count) => {
// Handle EOF if the event came from the input source.
if input_fd == event.fd() && count == 0 {
unregister_source(ev_mgr, input_fd);
unregister_source(ev_mgr, buffer_ready_fd);
unregister_source(ops, &input_fd);
unregister_source(ops, &buffer_ready_fd);
warn!("Detached the serial input due to peer close/error.");
}
}
Err(e) => {
match e.raw_os_error() {
Some(errno) if errno == libc::ENOBUFS => {
unregister_source(ev_mgr, input_fd);
unregister_source(ops, &input_fd);
}
Some(errno) if errno == libc::EWOULDBLOCK => {
self.handle_ewouldblock(ev_mgr);
self.handle_ewouldblock(ops);
}
Some(errno) if errno == libc::ENOTTY => {
error!("The serial device does not have the input source attached.");
unregister_source(ev_mgr, input_fd);
unregister_source(ev_mgr, buffer_ready_fd);
unregister_source(ops, &input_fd);
unregister_source(ops, &buffer_ready_fd);
}
Some(_) | None => {
// Unknown error, detach the serial input source.
unregister_source(ev_mgr, input_fd);
unregister_source(ev_mgr, buffer_ready_fd);
unregister_source(ops, &input_fd);
unregister_source(ops, &buffer_ready_fd);
warn!("Detached the serial input due to peer close/error.");
}
}
Expand All @@ -468,16 +458,16 @@ impl Subscriber for Serial {

/// Initial registration of pollable objects.
/// If serial input is present, register the serial input FD as readable.
fn interest_list(&self) -> Vec<EpollEvent> {
match &self.input {
Some(input) => match self.buffer_ready_evt.as_ref() {
Some(buf_ready_evt) => vec![
EpollEvent::new(EventSet::IN, input.as_raw_fd() as u64),
EpollEvent::new(EventSet::IN, buf_ready_evt.as_raw_fd() as u64),
],
None => vec![],
},
None => vec![],
fn init(&mut self, ops: &mut EventOps) {
if self.input.is_some() {
if let Some(buf_ready_evt) = self.buffer_ready_evt.as_ref() {
if let Err(e) = ops.add(Events::new(&self.serial_input_fd(), EventSet::IN)) {
error!("Failed to register serial input fd: {}", e);
}
if let Err(e) = ops.add(Events::new(buf_ready_evt, EventSet::IN)) {
error!("Failed to register serial buffer ready event: {}", e);
}
}
}
}
}
Expand All @@ -490,7 +480,7 @@ mod tests {
use std::os::unix::io::RawFd;
use std::sync::{Arc, Mutex};

use polly::event_manager::EventManager;
use event_manager::{EventManager, SubscriberOps};

struct SharedBufferInternal {
read_buf: Vec<u8>,
Expand Down Expand Up @@ -578,107 +568,6 @@ mod tests {

static RAW_INPUT_BUF: [u8; 3] = [b'a', b'b', b'c'];

#[test]
fn test_event_handling_no_in() {
let mut event_manager = EventManager::new().unwrap();

let intr_evt = EventFd::new(libc::EFD_NONBLOCK).unwrap();
let serial_out = SharedBuffer::new();

let mut serial = Serial::new_out(intr_evt, Box::new(serial_out));
// A serial without in does not have any events in the list.

assert!(serial.interest_list().is_empty());
// Even though there is no in or hangup, process should not panic. Call it to validate this.
let epoll_event = EpollEvent::new(EventSet::IN, 0);
serial.process(&epoll_event, &mut event_manager);
}

#[test]
fn test_event_handling_with_in() {
let mut event_manager = EventManager::new().unwrap();

let intr_evt = EventFd::new(libc::EFD_NONBLOCK).unwrap();
let serial_in_out = SharedBuffer::new();

let mut serial = Serial::new_in_out(
intr_evt.try_clone().unwrap(),
Box::new(serial_in_out.clone()),
Box::new(serial_in_out),
Some(EventFd::new(libc::EFD_NONBLOCK).unwrap()),
);
// Check that the interest list contains one event set.
assert_eq!(serial.interest_list().len(), 2);

// Process an invalid event type does not panic.
let invalid_event = EpollEvent::new(EventSet::OUT, intr_evt.as_raw_fd() as u64);
serial.process(&invalid_event, &mut event_manager);

// Process an event with a `RawFd` that does not correspond to `intr_evt` does not panic.
let invalid_event = EpollEvent::new(EventSet::IN, 0);
serial.process(&invalid_event, &mut event_manager);
}

#[test]
fn test_event_handling_ewould_block() {
let mut event_manager = EventManager::new().unwrap();

let intr_evt = EventFd::new(libc::EFD_NONBLOCK).unwrap();
let serial_in_out = SharedBuffer::new();

let mut serial = Serial::new_in_out(
intr_evt.try_clone().unwrap(),
Box::new(serial_in_out.clone()),
Box::new(serial_in_out.clone()),
Some(EventFd::new(libc::EFD_NONBLOCK).unwrap()),
);

// Process a spurious event, which will result in EWOULDBLOCK and unregister the serial input.
let spurious_ev = EpollEvent::new(EventSet::IN, serial_in_out.as_raw_fd() as u64);
serial.process(&spurious_ev, &mut event_manager);

// Try to modify the input event. Will result in Error since the serial input was unregistered.
event_manager
.modify(serial_in_out.as_raw_fd(), spurious_ev)
.unwrap_err();
}

#[test]
fn test_event_handling_err_and_hup() {
let mut event_manager = EventManager::new().unwrap();
let serial_in_out = SharedBuffer::new();
let mut serial = Serial::new_in_out(
EventFd::new(libc::EFD_NONBLOCK).unwrap(),
Box::new(serial_in_out.clone()),
Box::new(serial_in_out.clone()),
Some(EventFd::new(libc::EFD_NONBLOCK).unwrap()),
);

// Check that the interest list contains one event set.
let expected_medium_bytes = [b'a'; FIFO_SIZE];
assert_eq!(serial.interest_list().len(), 2);
{
let mut guard = serial_in_out.internal.lock().unwrap();
guard.read_buf.write_all(&expected_medium_bytes).unwrap();
}

assert!(serial.in_buffer.is_empty());
let err_hup_ev = EpollEvent::new(
EventSet::ERROR | EventSet::HANG_UP,
serial_in_out.as_raw_fd() as u64,
);

serial.process(&err_hup_ev, &mut event_manager);
assert_eq!(serial.in_buffer.len(), expected_medium_bytes.len());
serial.in_buffer.clear();

// Process one more round of `EventSet::HANG_UP`.
// Check that the processing does not bring anything new to the serial
// `in_buffer`.
serial.process(&err_hup_ev, &mut event_manager);
assert!(serial.in_buffer.is_empty());
}

#[test]
fn test_serial_output() {
let intr_evt = EventFd::new(libc::EFD_NONBLOCK).unwrap();
Expand Down Expand Up @@ -760,7 +649,7 @@ mod tests {

let mut evmgr = EventManager::new().unwrap();
let serial_wrap = Arc::new(Mutex::new(serial));
evmgr.add_subscriber(serial_wrap.clone()).unwrap();
let _id = evmgr.add_subscriber(serial_wrap.clone());

// Run the event handler which should drive serial input.
// There should be one event reported (which should have also handled serial input).
Expand Down
Loading