Skip to content

Commit d09412a

Browse files
committed
Homed UDP sockets
1 parent d7b6fcb commit d09412a

File tree

2 files changed

+213
-24
lines changed

2 files changed

+213
-24
lines changed

src/libstd/rt/rtio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
2222
pub type IoFactoryObject = uvio::UvIoFactory;
2323
pub type RtioTcpStreamObject = uvio::UvTcpStream;
2424
pub type RtioTcpListenerObject = uvio::UvTcpListener;
25-
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
25+
pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket;
2626
pub type RtioTimerObject = uvio::UvTimer;
2727

2828
pub trait EventLoop {

src/libstd/rt/uv/uvio.rs

Lines changed: 212 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -239,27 +239,6 @@ impl UvIoFactory {
239239
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
240240
match self { &UvIoFactory(ref mut ptr) => ptr }
241241
}
242-
243-
pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
244-
let mut watcher = UdpWatcher::new(self.uv_loop());
245-
match watcher.bind(addr) {
246-
Ok(_) => {
247-
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
248-
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
249-
}
250-
Err(uverr) => {
251-
let scheduler = Local::take::<Scheduler>();
252-
do scheduler.deschedule_running_task_and_then |_, task| {
253-
let task_cell = Cell::new(task);
254-
do watcher.close {
255-
let scheduler = Local::take::<Scheduler>();
256-
scheduler.resume_blocked_task_immediately(task_cell.take());
257-
}
258-
}
259-
Err(uv_error_to_io_error(uverr))
260-
}
261-
}
262-
}
263242
}
264243

265244
impl IoFactory for UvIoFactory {
@@ -331,6 +310,7 @@ impl IoFactory for UvIoFactory {
331310
}
332311
}
333312

313+
/*
334314
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
335315
let mut watcher = UdpWatcher::new(self.uv_loop());
336316
match watcher.bind(addr) {
@@ -348,6 +328,28 @@ impl IoFactory for UvIoFactory {
348328
}
349329
}
350330
}
331+
*/
332+
333+
pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> {
334+
let mut watcher = UdpWatcher::new(self.uv_loop());
335+
match watcher.bind(addr) {
336+
Ok(_) => {
337+
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
338+
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
339+
}
340+
Err(uverr) => {
341+
let scheduler = Local::take::<Scheduler>();
342+
do scheduler.deschedule_running_task_and_then |_, task| {
343+
let task_cell = Cell::new(task);
344+
do watcher.close {
345+
let scheduler = Local::take::<Scheduler>();
346+
scheduler.resume_blocked_task_immediately(task_cell.take());
347+
}
348+
}
349+
Err(uv_error_to_io_error(uverr))
350+
}
351+
}
352+
}
351353

352354
fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
353355
Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
@@ -640,18 +642,205 @@ impl Drop for HomedUvUdpSocket {
640642

641643
impl RtioSocket for HomedUvUdpSocket {
642644
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
645+
// first go home
643646
self.go_home();
644647
socket_name(Udp, self.watcher)
645648
}
646649
}
647650

651+
impl RtioUdpSocket for HomedUvUdpSocket {
652+
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
653+
// first go home
654+
self.go_home();
655+
656+
let result_cell = Cell::new_empty();
657+
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
658+
659+
let scheduler = Local::take::<Scheduler>();
660+
let buf_ptr: *&mut [u8] = &buf;
661+
do scheduler.deschedule_running_task_and_then |_, task| {
662+
rtdebug!("recvfrom: entered scheduler context");
663+
let task_cell = Cell::new(task);
664+
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
665+
do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
666+
let _ = flags; // /XXX add handling for partials?
667+
668+
watcher.recv_stop();
669+
670+
let result = match status {
671+
None => {
672+
assert!(nread >= 0);
673+
Ok((nread as uint, addr))
674+
}
675+
Some(err) => Err(uv_error_to_io_error(err)),
676+
};
677+
678+
unsafe { (*result_cell_ptr).put_back(result); }
679+
680+
let scheduler = Local::take::<Scheduler>();
681+
scheduler.resume_blocked_task_immediately(task_cell.take());
682+
}
683+
}
684+
685+
assert!(!result_cell.is_empty());
686+
return result_cell.take();
687+
}
688+
689+
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
690+
// first go home
691+
self.go_home();
692+
693+
let result_cell = Cell::new_empty();
694+
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
695+
let scheduler = Local::take::<Scheduler>();
696+
let buf_ptr: *&[u8] = &buf;
697+
do scheduler.deschedule_running_task_and_then |_, task| {
698+
let task_cell = Cell::new(task);
699+
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
700+
do self.watcher.send(buf, dst) |_watcher, status| {
701+
702+
let result = match status {
703+
None => Ok(()),
704+
Some(err) => Err(uv_error_to_io_error(err)),
705+
};
706+
707+
unsafe { (*result_cell_ptr).put_back(result); }
708+
709+
let scheduler = Local::take::<Scheduler>();
710+
scheduler.resume_blocked_task_immediately(task_cell.take());
711+
}
712+
}
713+
714+
assert!(!result_cell.is_empty());
715+
return result_cell.take();
716+
}
717+
718+
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
719+
// first go home
720+
self.go_home();
721+
722+
let r = unsafe {
723+
do multi.to_str().as_c_str |m_addr| {
724+
uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
725+
ptr::null(), uvll::UV_JOIN_GROUP)
726+
}
727+
};
728+
729+
match status_to_maybe_uv_error(self.watcher, r) {
730+
Some(err) => Err(uv_error_to_io_error(err)),
731+
None => Ok(())
732+
}
733+
}
734+
735+
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
736+
// first go home
737+
self.go_home();
738+
739+
let r = unsafe {
740+
do multi.to_str().as_c_str |m_addr| {
741+
uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
742+
ptr::null(), uvll::UV_LEAVE_GROUP)
743+
}
744+
};
745+
746+
match status_to_maybe_uv_error(self.watcher, r) {
747+
Some(err) => Err(uv_error_to_io_error(err)),
748+
None => Ok(())
749+
}
750+
}
751+
752+
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
753+
// first go home
754+
self.go_home();
755+
756+
let r = unsafe {
757+
uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int)
758+
};
759+
760+
match status_to_maybe_uv_error(self.watcher, r) {
761+
Some(err) => Err(uv_error_to_io_error(err)),
762+
None => Ok(())
763+
}
764+
}
765+
766+
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
767+
// first go home
768+
self.go_home();
769+
770+
let r = unsafe {
771+
uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int)
772+
};
773+
774+
match status_to_maybe_uv_error(self.watcher, r) {
775+
Some(err) => Err(uv_error_to_io_error(err)),
776+
None => Ok(())
777+
}
778+
}
779+
780+
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
781+
// first go home
782+
self.go_home();
783+
784+
let r = unsafe {
785+
uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int)
786+
};
787+
788+
match status_to_maybe_uv_error(self.watcher, r) {
789+
Some(err) => Err(uv_error_to_io_error(err)),
790+
None => Ok(())
791+
}
792+
}
793+
794+
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
795+
// first go home
796+
self.go_home();
797+
798+
let r = unsafe {
799+
uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
800+
};
801+
802+
match status_to_maybe_uv_error(self.watcher, r) {
803+
Some(err) => Err(uv_error_to_io_error(err)),
804+
None => Ok(())
805+
}
806+
}
807+
808+
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
809+
// first go home
810+
self.go_home();
811+
812+
let r = unsafe {
813+
uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int)
814+
};
815+
816+
match status_to_maybe_uv_error(self.watcher, r) {
817+
Some(err) => Err(uv_error_to_io_error(err)),
818+
None => Ok(())
819+
}
820+
}
821+
822+
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
823+
// first go home
824+
self.go_home();
825+
826+
let r = unsafe {
827+
uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int)
828+
};
829+
830+
match status_to_maybe_uv_error(self.watcher, r) {
831+
Some(err) => Err(uv_error_to_io_error(err)),
832+
None => Ok(())
833+
}
834+
}
835+
}
836+
648837
#[test]
649838
fn test_simple_homed_udp_io_bind_only() {
650839
do run_in_newsched_task {
651840
unsafe {
652841
let io = Local::unsafe_borrow::<IoFactoryObject>();
653842
let addr = next_test_ip4();
654-
let maybe_socket = (*io).homed_udp_bind(addr);
843+
let maybe_socket = (*io)./*homed_*/udp_bind(addr);
655844
assert!(maybe_socket.is_ok());
656845
}
657846
}
@@ -688,7 +877,7 @@ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
688877
let test_function: ~fn() = || {
689878
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
690879
let addr = next_test_ip4();
691-
let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
880+
let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) };
692881
// this socket is bound to this event loop
693882
assert!(maybe_socket.is_ok());
694883

0 commit comments

Comments
 (0)