Skip to content

Commit 08a77e0

Browse files
committed
Rewrite task-comm-NN to use pipes
1 parent 60d682b commit 08a77e0

15 files changed

+151
-210
lines changed

src/libcore/pipes.rs

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -30,59 +30,6 @@ macro_rules! move {
3030
// places. Once there is unary move, it can be removed.
3131
fn move<T>(-x: T) -> T { x }
3232

33-
/**
34-
35-
Some thoughts about fixed buffers.
36-
37-
The idea is if a protocol is bounded, we will synthesize a record that
38-
has a field for each state. Each of these states contains a packet for
39-
the messages that are legal to be sent in that state. Then, instead of
40-
allocating, the send code just finds a pointer to the right field and
41-
uses that instead.
42-
43-
Unforunately, this makes things kind of tricky. We need to be able to
44-
find the buffer, which means we need to pass it around. This could
45-
either be associated with the (send|recv)_packet classes, or with the
46-
packet itself. We will also need some form of reference counting so we
47-
can track who has the responsibility of freeing the buffer.
48-
49-
We want to preserve the ability to do things like optimistic buffer
50-
re-use, and skipping over to a new buffer when necessary. What I mean
51-
is, suppose we had the typical stream protocol. It'd make sense to
52-
amortize allocation costs by allocating a buffer with say 16
53-
messages. When the sender gets to the end of the buffer, it could
54-
check if the receiver is done with the packet in slot 0. If so, it can
55-
just reuse that one, checking if the receiver is done with the next
56-
one in each case. If it is ever not done, it just allocates a new
57-
buffer and skips over to that.
58-
59-
Also, since protocols are in libcore, we have to do this in a way that
60-
maintains backwards compatibility.
61-
62-
buffer header and buffer. Cast as c_void when necessary.
63-
64-
===
65-
66-
Okay, here are some new ideas.
67-
68-
It'd be nice to keep the bounded/unbounded case as uniform as
69-
possible. It leads to less code duplication, and less things that can
70-
go sublty wrong. For the bounded case, we could either have a struct
71-
with a bunch of unique pointers to pre-allocated packets, or we could
72-
lay them out inline. Inline layout is better, if for no other reason
73-
than that we don't have to allocate each packet
74-
individually. Currently we pass unique packets around as unsafe
75-
pointers, but they are actually unique pointers. We should instead use
76-
real unsafe pointers. This makes freeing data and running destructors
77-
trickier though. Thus, we should allocate all packets in parter of a
78-
higher level buffer structure. Packets can maintain a pointer to their
79-
buffer, and this is the part that gets freed.
80-
81-
It might be helpful to have some idea of a semi-unique pointer (like
82-
being partially pregnant, also like an ARC).
83-
84-
*/
85-
8633
enum state {
8734
empty,
8835
full,
@@ -805,6 +752,12 @@ class port_set<T: send> : recv<T> {
805752
vec::push(self.ports, port)
806753
}
807754

755+
fn chan() -> chan<T> {
756+
let (ch, po) = stream();
757+
self.add(po);
758+
ch
759+
}
760+
808761
fn try_recv() -> option<T> {
809762
let mut result = none;
810763
while result == none && self.ports.len() > 0 {

src/test/run-pass/task-comm-0.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,29 @@
11
use std;
22

3-
import comm;
4-
import comm::chan;
5-
import comm::send;
3+
import pipes;
4+
import pipes::chan;
5+
import pipes::port;
66
import task;
77

88
fn main() { test05(); }
99

1010
fn test05_start(ch : chan<int>) {
11-
log(error, ch);
12-
send(ch, 10);
11+
ch.send(10);
1312
#error("sent 10");
14-
send(ch, 20);
13+
ch.send(20);
1514
#error("sent 20");
16-
send(ch, 30);
15+
ch.send(30);
1716
#error("sent 30");
1817
}
1918

2019
fn test05() {
21-
let po = comm::port();
22-
let ch = comm::chan(po);
20+
let (ch, po) = pipes::stream();
2321
task::spawn(|| test05_start(ch) );
24-
let mut value = comm::recv(po);
22+
let mut value = po.recv();
2523
log(error, value);
26-
value = comm::recv(po);
24+
value = po.recv();
2725
log(error, value);
28-
value = comm::recv(po);
26+
value = po.recv();
2927
log(error, value);
3028
assert (value == 30);
3129
}

src/test/run-pass/task-comm-10.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
11
use std;
22
import task;
3-
import comm;
3+
import pipes;
44

5-
fn start(c: comm::chan<comm::chan<~str>>) {
6-
let p = comm::port();
7-
comm::send(c, comm::chan(p));
5+
fn start(c: pipes::chan<pipes::chan<~str>>) {
6+
let (ch, p) = pipes::stream();
7+
c.send(ch);
88

99
let mut a;
1010
let mut b;
11-
a = comm::recv(p);
11+
a = p.recv();
1212
assert a == ~"A";
1313
log(error, a);
14-
b = comm::recv(p);
14+
b = p.recv();
1515
assert b == ~"B";
1616
log(error, b);
1717
}
1818

1919
fn main() {
20-
let p = comm::port();
21-
let ch = comm::chan(p);
20+
let (ch, p) = pipes::stream();
2221
let child = task::spawn(|| start(ch) );
2322

24-
let c = comm::recv(p);
25-
comm::send(c, ~"A");
26-
comm::send(c, ~"B");
23+
let c = p.recv();
24+
c.send(~"A");
25+
c.send(~"B");
2726
task::yield();
2827
}

src/test/run-pass/task-comm-11.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
use std;
2-
import comm;
2+
import pipes;
33
import task;
44

5-
fn start(c: comm::chan<comm::chan<int>>) {
6-
let p: comm::port<int> = comm::port();
7-
comm::send(c, comm::chan(p));
5+
fn start(c: pipes::chan<pipes::chan<int>>) {
6+
let (ch, p) = pipes::stream();
7+
c.send(ch);
88
}
99

1010
fn main() {
11-
let p = comm::port();
12-
let ch = comm::chan(p);
11+
let (ch, p) = pipes::stream();
1312
let child = task::spawn(|| start(ch) );
14-
let c = comm::recv(p);
13+
let c = p.recv();
1514
}

src/test/run-pass/task-comm-13.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
use std;
22
import task;
3-
import comm;
4-
import comm::send;
3+
import pipes;
4+
import pipes::send;
55

6-
fn start(c: comm::chan<int>, start: int, number_of_messages: int) {
6+
fn start(c: pipes::chan<int>, start: int, number_of_messages: int) {
77
let mut i: int = 0;
8-
while i < number_of_messages { send(c, start + i); i += 1; }
8+
while i < number_of_messages { c.send(start + i); i += 1; }
99
}
1010

1111
fn main() {
1212
#debug("Check that we don't deadlock.");
13-
let p = comm::port::<int>();
14-
let ch = comm::chan(p);
13+
let (ch, p) = pipes::stream();
1514
task::try(|| start(ch, 0, 10) );
1615
#debug("Joined task");
1716
}

src/test/run-pass/task-comm-14.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
use std;
2-
import comm;
31
import task;
42

53
fn main() {
6-
let po = comm::port::<int>();
7-
let ch = comm::chan(po);
4+
let po = pipes::port_set();
85

96
// Spawn 10 tasks each sending us back one int.
107
let mut i = 10;
118
while (i > 0) {
129
log(debug, i);
10+
let (ch, p) = pipes::stream();
11+
po.add(p);
1312
task::spawn(|copy i| child(i, ch) );
1413
i = i - 1;
1514
}
@@ -18,17 +17,16 @@ fn main() {
1817
// anything back, so we deadlock here.
1918

2019
i = 10;
21-
let mut value = 0;
2220
while (i > 0) {
2321
log(debug, i);
24-
value = comm::recv(po);
22+
po.recv();
2523
i = i - 1;
2624
}
2725

2826
#debug("main thread exiting");
2927
}
3028

31-
fn child(x: int, ch: comm::chan<int>) {
29+
fn child(x: int, ch: pipes::chan<int>) {
3230
log(debug, x);
33-
comm::send(ch, copy x);
31+
ch.send(x);
3432
}

src/test/run-pass/task-comm-15.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
// xfail-win32
22
use std;
3-
import comm;
43
import task;
54

6-
fn start(c: comm::chan<int>, i0: int) {
5+
fn start(c: pipes::chan<int>, i0: int) {
76
let mut i = i0;
87
while i > 0 {
9-
comm::send(c, 0);
8+
c.send(0);
109
i = i - 1;
1110
}
1211
}
1312

1413
fn main() {
15-
let p = comm::port();
1614
// Spawn a task that sends us back messages. The parent task
1715
// is likely to terminate before the child completes, so from
1816
// the child's point of view the receiver may die. We should
1917
// drop messages on the floor in this case, and not crash!
20-
let ch = comm::chan(p);
21-
let child = task::spawn(|| start(ch, 10) );
22-
let c = comm::recv(p);
18+
let (ch, p) = pipes::stream();
19+
task::spawn(|| start(ch, 10));
20+
p.recv();
2321
}

src/test/run-pass/task-comm-16.rs

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,41 @@
11
// -*- rust -*-
22

33
use std;
4-
import comm;
5-
import comm::send;
6-
import comm::port;
7-
import comm::recv;
8-
import comm::chan;
4+
import pipes;
5+
import pipes::send;
6+
import pipes::port;
7+
import pipes::recv;
8+
import pipes::chan;
99

1010
// Tests of ports and channels on various types
1111
fn test_rec() {
1212
type r = {val0: int, val1: u8, val2: char};
1313

14-
let po = comm::port();
15-
let ch = chan(po);
14+
let (ch, po) = pipes::stream();
1615
let r0: r = {val0: 0, val1: 1u8, val2: '2'};
17-
send(ch, r0);
16+
ch.send(r0);
1817
let mut r1: r;
19-
r1 = recv(po);
18+
r1 = po.recv();
2019
assert (r1.val0 == 0);
2120
assert (r1.val1 == 1u8);
2221
assert (r1.val2 == '2');
2322
}
2423

2524
fn test_vec() {
26-
let po = port();
27-
let ch = chan(po);
25+
let (ch, po) = pipes::stream();
2826
let v0: ~[int] = ~[0, 1, 2];
29-
send(ch, v0);
30-
let v1 = recv(po);
27+
ch.send(v0);
28+
let v1 = po.recv();
3129
assert (v1[0] == 0);
3230
assert (v1[1] == 1);
3331
assert (v1[2] == 2);
3432
}
3533

3634
fn test_str() {
37-
let po = port();
38-
let ch = chan(po);
39-
let s0 = ~"test";
40-
send(ch, s0);
41-
let s1 = recv(po);
35+
let (ch, po) = pipes::stream();
36+
let s0 = "test";
37+
ch.send(s0);
38+
let s1 = po.recv();
4239
assert (s1[0] == 't' as u8);
4340
assert (s1[1] == 'e' as u8);
4441
assert (s1[2] == 's' as u8);
@@ -47,33 +44,36 @@ fn test_str() {
4744

4845
fn test_tag() {
4946
enum t { tag1, tag2(int), tag3(int, u8, char), }
50-
let po = port();
51-
let ch = chan(po);
52-
send(ch, tag1);
53-
send(ch, tag2(10));
54-
send(ch, tag3(10, 11u8, 'A'));
47+
let (ch, po) = pipes::stream();
48+
ch.send(tag1);
49+
ch.send(tag2(10));
50+
ch.send(tag3(10, 11u8, 'A'));
5551
let mut t1: t;
56-
t1 = recv(po);
52+
t1 = po.recv();
5753
assert (t1 == tag1);
58-
t1 = recv(po);
54+
t1 = po.recv();
5955
assert (t1 == tag2(10));
60-
t1 = recv(po);
56+
t1 = po.recv();
6157
assert (t1 == tag3(10, 11u8, 'A'));
6258
}
6359

6460
fn test_chan() {
65-
let po = port();
66-
let ch = chan(po);
67-
let po0 = port();
68-
let ch0 = chan(po0);
69-
send(ch, ch0);
70-
let ch1 = recv(po);
61+
let (ch, po) = pipes::stream();
62+
let (ch0, po0) = pipes::stream();
63+
ch.send(ch0);
64+
let ch1 = po.recv();
7165
// Does the transmitted channel still work?
7266

73-
send(ch1, 10);
67+
ch1.send(10);
7468
let mut i: int;
75-
i = recv(po0);
69+
i = po0.recv();
7670
assert (i == 10);
7771
}
7872

79-
fn main() { test_rec(); test_vec(); test_str(); test_tag(); test_chan(); }
73+
fn main() {
74+
test_rec();
75+
test_vec();
76+
test_str();
77+
test_tag();
78+
test_chan();
79+
}

0 commit comments

Comments
 (0)