Skip to content

Commit 0bc1ca4

Browse files
committed
auto merge of #8631 : anasazi/rust/homing-io, r=brson
libuv handles are tied to the event loop that created them. In order to perform IO, the handle must be on the thread with its home event loop. Thus, when as task wants to do IO it must first go to the IO handle's home event loop and pin itself to the corresponding scheduler while the IO action is in flight. Once the IO action completes, the task is unpinned and either returns to its home scheduler if it is a pinned task, or otherwise stays on the current scheduler. Making new blocking IO implementations (i.e. files) thread safe is rather simple. Add a home field to the IO handle's struct in uvio and implement the HomingIO trait. Wrap every IO call in the HomingIO.home_for_io method, which will take care of the scheduling. I'm not sure if this remains thread safe in the presence of asynchronous IO at the libuv level. If we decide to do that, then this set up should be revisited.
2 parents 3cd978f + 35e844f commit 0bc1ca4

File tree

8 files changed

+688
-348
lines changed

8 files changed

+688
-348
lines changed

src/libstd/rt/io/net/ip.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use option::{Option, None, Some};
1717

1818
type Port = u16;
1919

20-
#[deriving(Eq, TotalEq)]
20+
#[deriving(Eq, TotalEq, Clone)]
2121
pub enum IpAddr {
2222
Ipv4Addr(u8, u8, u8, u8),
2323
Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16)
@@ -62,7 +62,7 @@ impl ToStr for IpAddr {
6262
}
6363
}
6464

65-
#[deriving(Eq, TotalEq)]
65+
#[deriving(Eq, TotalEq, Clone)]
6666
pub struct SocketAddr {
6767
ip: IpAddr,
6868
port: Port,

src/libstd/rt/io/net/tcp.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ impl Writer for TcpStream {
8888
fn write(&mut self, buf: &[u8]) {
8989
match (**self).write(buf) {
9090
Ok(_) => (),
91-
Err(ioerr) => {
92-
io_error::cond.raise(ioerr);
93-
}
91+
Err(ioerr) => io_error::cond.raise(ioerr),
9492
}
9593
}
9694

@@ -129,9 +127,7 @@ impl TcpListener {
129127
impl Listener<TcpStream> for TcpListener {
130128
fn accept(&mut self) -> Option<TcpStream> {
131129
match (**self).accept() {
132-
Ok(s) => {
133-
Some(TcpStream::new(s))
134-
}
130+
Ok(s) => Some(TcpStream::new(s)),
135131
Err(ioerr) => {
136132
io_error::cond.raise(ioerr);
137133
return None;

src/libstd/rt/io/timer.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl Timer {
4141
}
4242

4343
impl RtioTimer for Timer {
44-
fn sleep(&self, msecs: u64) {
44+
fn sleep(&mut self, msecs: u64) {
4545
(**self).sleep(msecs);
4646
}
4747
}
@@ -50,15 +50,11 @@ impl RtioTimer for Timer {
5050
mod test {
5151
use super::*;
5252
use rt::test::*;
53-
use option::{Some, None};
5453
#[test]
5554
fn test_io_timer_sleep_simple() {
5655
do run_in_newsched_task {
5756
let timer = Timer::new();
58-
match timer {
59-
Some(t) => t.sleep(1),
60-
None => assert!(false)
61-
}
57+
do timer.map_move |mut t| { t.sleep(1) };
6258
}
6359
}
64-
}
60+
}

src/libstd/rt/rtio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,5 @@ pub trait RtioUdpSocket : RtioSocket {
9191
}
9292

9393
pub trait RtioTimer {
94-
fn sleep(&self, msecs: u64);
94+
fn sleep(&mut self, msecs: u64);
9595
}

src/libstd/rt/uv/net.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,10 @@ impl StreamWatcher {
190190

191191
extern fn close_cb(handle: *uvll::uv_stream_t) {
192192
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
193-
stream_watcher.get_watcher_data().close_cb.take_unwrap()();
193+
let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
194194
stream_watcher.drop_watcher_data();
195195
unsafe { free_handle(handle as *c_void) }
196+
cb();
196197
}
197198
}
198199
}
@@ -411,9 +412,10 @@ impl UdpWatcher {
411412

412413
extern fn close_cb(handle: *uvll::uv_udp_t) {
413414
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
414-
udp_watcher.get_watcher_data().close_cb.take_unwrap()();
415+
let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
415416
udp_watcher.drop_watcher_data();
416417
unsafe { free_handle(handle as *c_void) }
418+
cb();
417419
}
418420
}
419421
}

0 commit comments

Comments
 (0)