Skip to content

Commit 970fa75

Browse files
committed
[WIP] Allow buffers to be attached to requests
1 parent 55a6b91 commit 970fa75

File tree

3 files changed

+258
-51
lines changed

3 files changed

+258
-51
lines changed

src/datatype.rs

Lines changed: 198 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
use std::mem;
4545
use std::borrow::Borrow;
4646
use std::os::raw::c_void;
47+
use std::rc::Rc;
48+
use std::sync::Arc;
4749

4850
use conv::ConvUtil;
4951

@@ -57,7 +59,8 @@ use raw::traits::*;
5759
/// Datatype traits
5860
pub mod traits {
5961
pub use super::{Equivalence, Datatype, AsDatatype, Collection, Pointer, PointerMut, Buffer,
60-
BufferMut, Partitioned, PartitionedBuffer, PartitionedBufferMut};
62+
BufferMut, Partitioned, PartitionedBuffer, PartitionedBufferMut,
63+
ReadBuffer, WriteBuffer};
6164
}
6265

6366
/// A system datatype, e.g. `MPI_FLOAT`
@@ -729,3 +732,197 @@ pub fn address_of<T>(x: &T) -> Address {
729732
}
730733
address
731734
}
735+
736+
/// Implements a buffer with a known length.
737+
/// TODO: rename this into something else
738+
pub trait Buffre {
739+
/// The type each element.
740+
type Item: Equivalence;
741+
742+
/// The type of of its anchor.
743+
type Anchor;
744+
745+
/// Convert the buffer into its anchor. Holding the anchor should be
746+
/// sufficient to keep the buffer alive.
747+
fn into_anchor(self) -> Self::Anchor;
748+
749+
/// Length of the buffer.
750+
fn len(&self) -> usize;
751+
752+
/// Length of the buffer in `Count`.
753+
/// TODO: turn this into a private helper function.
754+
fn count(&self) -> Count {
755+
self.len().value_as().expect("Length of buffer cannot be expressed as an MPI Count.")
756+
}
757+
758+
/// Get the MPI data type.
759+
/// TODO: turn this into a private helper function.
760+
fn as_datatype(&self) -> <Self::Item as Equivalence>::Out {
761+
Self::Item::equivalent_datatype()
762+
}
763+
}
764+
765+
impl<'a, T: Equivalence> Buffre for &'a T {
766+
type Item = T;
767+
type Anchor = ();
768+
fn into_anchor(self) -> Self::Anchor {}
769+
fn len(&self) -> usize { 1 }
770+
}
771+
772+
impl<'a, T: Equivalence> Buffre for &'a mut T {
773+
type Item = T;
774+
type Anchor = ();
775+
fn into_anchor(self) -> Self::Anchor {}
776+
fn len(&self) -> usize { 1 }
777+
}
778+
779+
impl<'a, T: Equivalence> Buffre for &'a [T] {
780+
type Item = T;
781+
type Anchor = ();
782+
fn into_anchor(self) -> Self::Anchor {}
783+
fn len(&self) -> usize { (*self).len() }
784+
}
785+
786+
impl<'a, T: Equivalence> Buffre for &'a mut [T] {
787+
type Item = T;
788+
type Anchor = ();
789+
fn into_anchor(self) -> Self::Anchor {}
790+
fn len(&self) -> usize { (**self).len() }
791+
}
792+
793+
impl<'a, T: Equivalence> Buffre for Box<T> {
794+
type Item = T;
795+
type Anchor = Self;
796+
fn into_anchor(self) -> Self::Anchor { self }
797+
fn len(&self) -> usize { 1 }
798+
}
799+
800+
impl<'a, T: Equivalence> Buffre for Box<[T]> {
801+
type Item = T;
802+
type Anchor = Self;
803+
fn into_anchor(self) -> Self::Anchor { self }
804+
fn len(&self) -> usize { (**self).len() }
805+
}
806+
807+
impl<'a, T: Equivalence> Buffre for Vec<T> {
808+
type Item = T;
809+
type Anchor = Self;
810+
fn into_anchor(self) -> Self::Anchor { self }
811+
fn len(&self) -> usize { self.len() }
812+
}
813+
814+
impl<'a, T: Equivalence> Buffre for Rc<T> {
815+
type Item = T;
816+
type Anchor = Self;
817+
fn into_anchor(self) -> Self::Anchor { self }
818+
fn len(&self) -> usize { 1 }
819+
}
820+
821+
impl<'a, T: Equivalence> Buffre for Rc<Vec<T>> {
822+
type Item = T;
823+
type Anchor = Self;
824+
fn into_anchor(self) -> Self::Anchor { self }
825+
fn len(&self) -> usize { (**self).len() }
826+
}
827+
828+
impl<'a, T: Equivalence> Buffre for Arc<T> {
829+
type Item = T;
830+
type Anchor = Self;
831+
fn into_anchor(self) -> Self::Anchor { self }
832+
fn len(&self) -> usize { 1 }
833+
}
834+
835+
impl<'a, T: Equivalence> Buffre for Arc<Vec<T>> {
836+
type Item = T;
837+
type Anchor = Self;
838+
fn into_anchor(self) -> Self::Anchor { self }
839+
fn len(&self) -> usize { (**self).len() }
840+
}
841+
842+
/// Implements buffers that can be read from.
843+
pub unsafe trait ReadBuffer: Buffre {
844+
/// Return a pointer to the beginning of the buffer.
845+
fn as_ptr(&self) -> *const Self::Item;
846+
847+
/// Return the pointer as a `*mut c_void`;
848+
/// TODO: turn this into a private helper function.
849+
fn pointer(&self) -> *const c_void {
850+
self.as_ptr() as *const _
851+
}
852+
}
853+
854+
unsafe impl<'a, T: Equivalence> ReadBuffer for &'a T {
855+
fn as_ptr(&self) -> *const Self::Item { *self }
856+
}
857+
858+
unsafe impl<'a, T: Equivalence> ReadBuffer for &'a mut T {
859+
fn as_ptr(&self) -> *const Self::Item { &**self }
860+
}
861+
862+
unsafe impl<'a, T: Equivalence> ReadBuffer for &'a [T] {
863+
fn as_ptr(&self) -> *const Self::Item { (*self).as_ptr() }
864+
}
865+
866+
unsafe impl<'a, T: Equivalence> ReadBuffer for &'a mut [T] {
867+
fn as_ptr(&self) -> *const Self::Item { (&**self).as_ptr() }
868+
}
869+
870+
unsafe impl<'a, T: Equivalence> ReadBuffer for Box<T> {
871+
fn as_ptr(&self) -> *const Self::Item { &**self }
872+
}
873+
874+
unsafe impl<'a, T: Equivalence> ReadBuffer for Box<[T]> {
875+
fn as_ptr(&self) -> *const Self::Item { (**self).as_ptr() }
876+
}
877+
878+
unsafe impl<'a, T: Equivalence> ReadBuffer for Vec<T> {
879+
fn as_ptr(&self) -> *const Self::Item { (**self).as_ptr() }
880+
}
881+
882+
unsafe impl<'a, T: Equivalence> ReadBuffer for Rc<T> {
883+
fn as_ptr(&self) -> *const Self::Item { &**self }
884+
}
885+
886+
unsafe impl<'a, T: Equivalence> ReadBuffer for Rc<Vec<T>> {
887+
fn as_ptr(&self) -> *const Self::Item { (**self).as_ptr() }
888+
}
889+
890+
unsafe impl<'a, T: Equivalence> ReadBuffer for Arc<T> {
891+
fn as_ptr(&self) -> *const Self::Item { &**self }
892+
}
893+
894+
unsafe impl<'a, T: Equivalence> ReadBuffer for Arc<Vec<T>> {
895+
fn as_ptr(&self) -> *const Self::Item { (**self).as_ptr() }
896+
}
897+
898+
/// Implements buffers that can be written to.
899+
pub unsafe trait WriteBuffer: Buffre {
900+
/// Return a pointer to the beginning of the buffer.
901+
fn as_mut_ptr(&mut self) -> *mut Self::Item;
902+
903+
/// Return the pointer as a `*mut c_void`;
904+
/// TODO: turn this into a private helper function.
905+
fn pointer_mut(&mut self) -> *mut c_void {
906+
self.as_mut_ptr() as *mut _
907+
}
908+
}
909+
910+
unsafe impl<'a, T: Equivalence> WriteBuffer for &'a mut T {
911+
fn as_mut_ptr(&mut self) -> *mut Self::Item { *self }
912+
}
913+
914+
unsafe impl<'a, T: Equivalence> WriteBuffer for &'a mut [T] {
915+
fn as_mut_ptr(&mut self) -> *mut Self::Item { (*self).as_mut_ptr() }
916+
}
917+
918+
unsafe impl<'a, T: Equivalence> WriteBuffer for Box<T> {
919+
fn as_mut_ptr(&mut self) -> *mut Self::Item { &mut **self }
920+
}
921+
922+
unsafe impl<'a, T: Equivalence> WriteBuffer for Box<[T]> {
923+
fn as_mut_ptr(&mut self) -> *mut Self::Item { (**self).as_mut_ptr() }
924+
}
925+
926+
unsafe impl<'a, T: Equivalence> WriteBuffer for Vec<T> {
927+
fn as_mut_ptr(&mut self) -> *mut Self::Item { (**self).as_mut_ptr() }
928+
}

src/point_to_point.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ pub unsafe trait Source: AsCommunicator {
244244
/// # Standard section(s)
245245
///
246246
/// 3.7.2
247-
fn immediate_receive_into_with_tag<'a, Sc, Buf: ?Sized>(&self,
248-
scope: Sc,
249-
buf: &'a mut Buf,
250-
tag: Tag)
251-
-> Request<'a, Sc>
252-
where Buf: 'a + BufferMut,
247+
fn immediate_receive_into_with_tag<'a, Sc, Buf>(&self,
248+
scope: Sc,
249+
mut buf: Buf,
250+
tag: Tag)
251+
-> Request<'a, Sc, Buf::Anchor>
252+
where Buf: 'a + WriteBuffer,
253253
Sc: Scope<'a>
254254
{
255255
let mut request: MPI_Request = unsafe { mem::uninitialized() };
@@ -261,7 +261,7 @@ pub unsafe trait Source: AsCommunicator {
261261
tag,
262262
self.as_communicator().as_raw(),
263263
&mut request);
264-
Request::from_raw(request, scope)
264+
Request::from_raw_with(request, scope, buf.into_anchor())
265265
}
266266
}
267267

@@ -275,11 +275,11 @@ pub unsafe trait Source: AsCommunicator {
275275
/// # Standard section(s)
276276
///
277277
/// 3.7.2
278-
fn immediate_receive_into<'a, Sc, Buf: ?Sized>(&self,
279-
scope: Sc,
280-
buf: &'a mut Buf)
281-
-> Request<'a, Sc>
282-
where Buf: 'a + BufferMut,
278+
fn immediate_receive_into<'a, Sc, Buf>(&self,
279+
scope: Sc,
280+
buf: Buf)
281+
-> Request<'a, Sc, Buf::Anchor>
282+
where Buf: 'a + WriteBuffer,
283283
Sc: Scope<'a>
284284
{
285285
self.immediate_receive_into_with_tag(scope, buf, unsafe_extern_static!(ffi::RSMPI_ANY_TAG))
@@ -1120,7 +1120,7 @@ pub struct ReceiveFuture<T> {
11201120
impl<T> ReceiveFuture<T> where T: Equivalence {
11211121
/// Wait for the receive operation to finish and return the received data.
11221122
pub fn get(self) -> (T, Status) {
1123-
let status = self.req.wait();
1123+
let (status, _) = self.req.wait();
11241124
if status.count(T::equivalent_datatype()) == 0 {
11251125
panic!("Received an empty message into a ReceiveFuture.");
11261126
}
@@ -1133,7 +1133,7 @@ impl<T> ReceiveFuture<T> where T: Equivalence {
11331133
/// is returned.
11341134
pub fn try(mut self) -> Result<(T, Status), Self> {
11351135
match self.req.test() {
1136-
Ok(status) => {
1136+
Ok((status, _)) => {
11371137
if status.count(T::equivalent_datatype()) == 0 {
11381138
panic!("Received an empty message into a ReceiveFuture.");
11391139
}

0 commit comments

Comments
 (0)