20
20
21
21
//! Muxing is the process of splitting a connection into multiple substreams.
22
22
//!
23
- //! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer`
24
- //! has ownership of a connection, lets you open and close substreams.
23
+ //! The main item of this module is the `StreamMuxer` trait. An implementation
24
+ //! of `StreamMuxer` has ownership of a connection, lets you open and close
25
+ //! substreams.
25
26
//!
26
- //! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this
27
- //! > is managed by the library's internals.
27
+ //! > **Note**: You normally don't need to use the methods of the `StreamMuxer`
28
+ //! > directly, as this
29
+ //! > is managed by the library's internals.
28
30
//!
29
- //! Each substream of a connection is an isolated stream of data. All the substreams are muxed
30
- //! together so that the data read from or written to each substream doesn't influence the other
31
- //! substreams.
31
+ //! Each substream of a connection is an isolated stream of data. All the
32
+ //! substreams are muxed together so that the data read from or written to each
33
+ //! substream doesn't influence the other substreams.
32
34
//!
33
- //! In the context of libp2p, each substream can use a different protocol. Contrary to opening a
34
- //! connection, opening a substream is almost free in terms of resources. This means that you
35
- //! shouldn't hesitate to rapidly open and close substreams, and to design protocols that don't
36
- //! require maintaining long-lived channels of communication.
35
+ //! In the context of libp2p, each substream can use a different protocol.
36
+ //! Contrary to opening a connection, opening a substream is almost free in
37
+ //! terms of resources. This means that you shouldn't hesitate to rapidly open
38
+ //! and close substreams, and to design protocols that don't require maintaining
39
+ //! long-lived channels of communication.
37
40
//!
38
- //! > **Example**: The Kademlia protocol opens a new substream for each request it wants to
39
- //! > perform. Multiple requests can be performed simultaneously by opening multiple
40
- //! > substreams, without having to worry about associating responses with the
41
- //! > right request.
41
+ //! > **Example**: The Kademlia protocol opens a new substream for each request
42
+ //! > it wants to
43
+ //! > perform. Multiple requests can be performed simultaneously by opening
44
+ //! > multiple
45
+ //! > substreams, without having to worry about associating responses with the
46
+ //! > right request.
42
47
//!
43
48
//! # Implementing a muxing protocol
44
49
//!
45
- //! In order to implement a muxing protocol, create an object that implements the `UpgradeInfo`,
46
- //! `InboundUpgrade` and `OutboundUpgrade` traits. See the `upgrade` module for more information.
47
- //! The `Output` associated type of the `InboundUpgrade` and `OutboundUpgrade` traits should be
48
- //! identical, and should be an object that implements the `StreamMuxer` trait.
50
+ //! In order to implement a muxing protocol, create an object that implements
51
+ //! the `UpgradeInfo`, `InboundUpgrade` and `OutboundUpgrade` traits. See the
52
+ //! `upgrade` module for more information. The `Output` associated type of the
53
+ //! `InboundUpgrade` and `OutboundUpgrade` traits should be identical, and
54
+ //! should be an object that implements the `StreamMuxer` trait.
49
55
//!
50
- //! The upgrade process will take ownership of the connection, which makes it possible for the
51
- //! implementation of `StreamMuxer` to control everything that happens on the wire.
56
+ //! The upgrade process will take ownership of the connection, which makes it
57
+ //! possible for the implementation of `StreamMuxer` to control everything that
58
+ //! happens on the wire.
59
+
60
+ use std:: { future:: Future , pin:: Pin } ;
52
61
53
- use futures:: { task:: Context , task:: Poll , AsyncRead , AsyncWrite } ;
62
+ use futures:: {
63
+ task:: { Context , Poll } ,
64
+ AsyncRead , AsyncWrite ,
65
+ } ;
54
66
use multiaddr:: Multiaddr ;
55
- use std:: future:: Future ;
56
- use std:: pin:: Pin ;
57
67
58
- pub use self :: boxed:: StreamMuxerBox ;
59
- pub use self :: boxed:: SubstreamBox ;
68
+ pub use self :: boxed:: { StreamMuxerBox , SubstreamBox } ;
60
69
61
70
mod boxed;
62
71
63
72
/// Provides multiplexing for a connection by allowing users to open substreams.
64
73
///
65
- /// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
66
- /// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
67
- /// functions that allow the implementation to make progress on various tasks.
74
+ /// A substream created by a [`StreamMuxer`] is a type that implements
75
+ /// [`AsyncRead`] and [`AsyncWrite`]. The [`StreamMuxer`] itself is modelled
76
+ /// closely after [`AsyncWrite`]. It features `poll`-style functions that allow
77
+ /// the implementation to make progress on various tasks.
68
78
pub trait StreamMuxer {
69
- /// Type of the object that represents the raw substream where data can be read and written.
79
+ /// Type of the object that represents the raw substream where data can be
80
+ /// read and written.
70
81
type Substream : AsyncRead + AsyncWrite ;
71
82
72
83
/// Error type of the muxer
73
84
type Error : std:: error:: Error ;
74
85
75
86
/// Poll for new inbound substreams.
76
87
///
77
- /// This function should be called whenever callers are ready to accept more inbound streams. In
78
- /// other words, callers may exercise back-pressure on incoming streams by not calling this
79
- /// function if a certain limit is hit.
88
+ /// This function should be called whenever callers are ready to accept more
89
+ /// inbound streams. In other words, callers may exercise back-pressure
90
+ /// on incoming streams by not calling this function
91
+ /// if a certain limit is hit.
80
92
fn poll_inbound (
81
93
self : Pin < & mut Self > ,
82
94
cx : & mut Context < ' _ > ,
@@ -90,20 +102,23 @@ pub trait StreamMuxer {
90
102
91
103
/// Poll to close this [`StreamMuxer`].
92
104
///
93
- /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely
94
- /// dropped.
105
+ /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become
106
+ /// useless and may be safely dropped.
95
107
///
96
- /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
97
- /// > that the remote is properly informed of the shutdown. However, apart from
98
- /// > properly informing the remote, there is no difference between this and
99
- /// > immediately dropping the muxer.
108
+ /// > **Note**: You are encouraged to call this method and wait for it to
109
+ /// > return `Ready`, so
110
+ /// > that the remote is properly informed of the shutdown. However, apart
111
+ /// > from
112
+ /// > properly informing the remote, there is no difference between this and
113
+ /// > immediately dropping the muxer.
100
114
fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > ;
101
115
102
116
/// Poll to allow the underlying connection to make progress.
103
117
///
104
- /// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
105
- /// unconditionally. Because it will be called regardless, this function can be used by
106
- /// implementations to return events about the underlying connection that the caller MUST deal
118
+ /// In contrast to all other `poll`-functions on [`StreamMuxer`], this
119
+ /// function MUST be called unconditionally. Because it will be called
120
+ /// regardless, this function can be used by implementations to return
121
+ /// events about the underlying connection that the caller MUST deal
107
122
/// with.
108
123
fn poll (
109
124
self : Pin < & mut Self > ,
@@ -120,7 +135,8 @@ pub enum StreamMuxerEvent {
120
135
121
136
/// Extension trait for [`StreamMuxer`].
122
137
pub trait StreamMuxerExt : StreamMuxer + Sized {
123
- /// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`.
138
+ /// Convenience function for calling [`StreamMuxer::poll_inbound`] for
139
+ /// [`StreamMuxer`]s that are `Unpin`.
124
140
fn poll_inbound_unpin (
125
141
& mut self ,
126
142
cx : & mut Context < ' _ > ,
@@ -131,7 +147,8 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
131
147
Pin :: new ( self ) . poll_inbound ( cx)
132
148
}
133
149
134
- /// Convenience function for calling [`StreamMuxer::poll_outbound`] for [`StreamMuxer`]s that are `Unpin`.
150
+ /// Convenience function for calling [`StreamMuxer::poll_outbound`] for
151
+ /// [`StreamMuxer`]s that are `Unpin`.
135
152
fn poll_outbound_unpin (
136
153
& mut self ,
137
154
cx : & mut Context < ' _ > ,
@@ -142,15 +159,17 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
142
159
Pin :: new ( self ) . poll_outbound ( cx)
143
160
}
144
161
145
- /// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`.
162
+ /// Convenience function for calling [`StreamMuxer::poll`] for
163
+ /// [`StreamMuxer`]s that are `Unpin`.
146
164
fn poll_unpin ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < StreamMuxerEvent , Self :: Error > >
147
165
where
148
166
Self : Unpin ,
149
167
{
150
168
Pin :: new ( self ) . poll ( cx)
151
169
}
152
170
153
- /// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
171
+ /// Convenience function for calling [`StreamMuxer::poll_close`] for
172
+ /// [`StreamMuxer`]s that are `Unpin`.
154
173
fn poll_close_unpin ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > >
155
174
where
156
175
Self : Unpin ,
0 commit comments