16
16
from scapy .compat import lambda_tuple_converter
17
17
from scapy .config import conf
18
18
from scapy .base_classes import BasePacket , BasePacketList , _CanvasDumpExtended
19
+ from scapy .fields import IPField , ShortEnumField , PacketField
19
20
from scapy .utils import do_graph , hexdump , make_table , make_lined_table , \
20
21
make_tex_table , issubtype
21
- from scapy .extlib import plt , MATPLOTLIB_INLINED , MATPLOTLIB_DEFAULT_PLOT_KARGS
22
+ from scapy .extlib import plt , Line2D , \
23
+ MATPLOTLIB_INLINED , MATPLOTLIB_DEFAULT_PLOT_KARGS
22
24
from functools import reduce
23
25
import scapy .modules .six as six
24
26
from scapy .modules .six .moves import range , zip
25
-
27
+ from scapy .compat import Optional , List , Union , Tuple , Dict , Any , Callable
28
+ from scapy .packet import Packet
26
29
#############
27
30
# Results #
28
31
#############
@@ -46,18 +49,23 @@ def __init__(self, res=None, name="PacketList", stats=None):
46
49
self .listname = name
47
50
48
51
def __len__ (self ):
52
+ # type: () -> int
49
53
return len (self .res )
50
54
51
55
def _elt2pkt (self , elt ):
56
+ # type: (Packet) -> Packet
52
57
return elt
53
58
54
59
def _elt2sum (self , elt ):
60
+ # type: (Packet) -> str
55
61
return elt .summary ()
56
62
57
63
def _elt2show (self , elt ):
64
+ # type: (Packet) -> str
58
65
return self ._elt2sum (elt )
59
66
60
67
def __repr__ (self ):
68
+ # type: () -> str
61
69
stats = {x : 0 for x in self .stats }
62
70
other = 0
63
71
for r in self .res :
@@ -85,6 +93,7 @@ def __repr__(self):
85
93
ct .punct (">" ))
86
94
87
95
def __getstate__ (self ):
96
+ # type: () -> Dict[str, Union[List[PacketField], List[Packet], str]]
88
97
"""
89
98
Creates a basic representation of the instance, used in
90
99
conjunction with __setstate__() e.g. by pickle
@@ -99,6 +108,7 @@ def __getstate__(self):
99
108
return state
100
109
101
110
def __setstate__ (self , state ):
111
+ # type: (Dict[str, Union[List[PacketField], List[Packet], str]]) -> None # noqa: E501
102
112
"""
103
113
Sets instance attributes to values given by state, used in
104
114
conjunction with __getstate__() e.g. by pickle
@@ -110,6 +120,7 @@ def __setstate__(self, state):
110
120
self .listname = state ['listname' ]
111
121
112
122
def __getattr__ (self , attr ):
123
+ # type: (str) -> Any
113
124
return getattr (self .res , attr )
114
125
115
126
def __getitem__ (self , item ):
@@ -122,10 +133,12 @@ def __getitem__(self, item):
122
133
return self .res .__getitem__ (item )
123
134
124
135
def __add__ (self , other ):
136
+ # type: (PacketList) -> PacketList
125
137
return self .__class__ (self .res + other .res ,
126
138
name = "%s+%s" % (self .listname , other .listname ))
127
139
128
140
def summary (self , prn = None , lfilter = None ):
141
+ # type: (Optional[Callable], Optional[Callable]) -> None
129
142
"""prints a summary of each packet
130
143
131
144
:param prn: function to apply to each packet instead of
@@ -143,6 +156,7 @@ def summary(self, prn=None, lfilter=None):
143
156
print (prn (r ))
144
157
145
158
def nsummary (self , prn = None , lfilter = None ):
159
+ # type: (Optional[Callable], Optional[Callable]) -> None
146
160
"""prints a summary of each packet with the packet's number
147
161
148
162
:param prn: function to apply to each packet instead of
@@ -165,29 +179,35 @@ def display(self): # Deprecated. Use show()
165
179
self .show ()
166
180
167
181
def show (self , * args , ** kargs ):
182
+ # type: (Any, Any) -> None
168
183
"""Best way to display the packet list. Defaults to nsummary() method""" # noqa: E501
169
184
return self .nsummary (* args , ** kargs )
170
185
171
186
def filter (self , func ):
187
+ # type: (Callable) -> PacketList
172
188
"""Returns a packet list filtered by a truth function. This truth
173
189
function has to take a packet as the only argument and return a boolean value.""" # noqa: E501
174
190
return self .__class__ ([x for x in self .res if func (x )],
175
191
name = "filtered %s" % self .listname )
176
192
177
193
def make_table (self , * args , ** kargs ):
194
+ # type: (Any, Any) -> None
178
195
"""Prints a table using a function that returns for each packet its head column value, head row value and displayed value # noqa: E501
179
196
ex: p.make_table(lambda x:(x[IP].dst, x[TCP].dport, x[TCP].sprintf("%flags%")) """ # noqa: E501
180
197
return make_table (self .res , * args , ** kargs )
181
198
182
199
def make_lined_table (self , * args , ** kargs ):
200
+ # type: (Any, Any) -> None
183
201
"""Same as make_table, but print a table with lines"""
184
202
return make_lined_table (self .res , * args , ** kargs )
185
203
186
204
def make_tex_table (self , * args , ** kargs ):
205
+ # type: (Any, Any) -> None
187
206
"""Same as make_table, but print a table with LaTeX syntax"""
188
207
return make_tex_table (self .res , * args , ** kargs )
189
208
190
209
def plot (self , f , lfilter = None , plot_xy = False , ** kargs ):
210
+ # type: (Callable, Optional[Callable], bool, Any) -> Line2D
191
211
"""Applies a function to each packet to get a value that will be plotted
192
212
with matplotlib. A list of matplotlib.lines.Line2D is returned.
193
213
@@ -219,6 +239,7 @@ def plot(self, f, lfilter=None, plot_xy=False, **kargs):
219
239
return lines
220
240
221
241
def diffplot (self , f , delay = 1 , lfilter = None , ** kargs ):
242
+ # type: (Callable, int, Optional[Callable], Any) -> Line2D
222
243
"""diffplot(f, delay=1, lfilter=None)
223
244
Applies a function to couples (l[i],l[i+delay])
224
245
@@ -246,6 +267,7 @@ def diffplot(self, f, delay=1, lfilter=None, **kargs):
246
267
return lines
247
268
248
269
def multiplot (self , f , lfilter = None , plot_xy = False , ** kargs ):
270
+ # type: (Callable, Optional[Callable], bool, Any) -> Line2D
249
271
"""Uses a function that returns a label and a value for this label, then
250
272
plots all the values label by label.
251
273
@@ -263,7 +285,7 @@ def multiplot(self, f, lfilter=None, plot_xy=False, **kargs):
263
285
lst_pkts = (f (* e ) for e in self .res if lfilter (* e ))
264
286
265
287
# Apply the function f to the packets
266
- d = {}
288
+ d = {} # type: Dict[str, List[float]]
267
289
for k , v in lst_pkts :
268
290
d .setdefault (k , []).append (v )
269
291
@@ -286,11 +308,13 @@ def multiplot(self, f, lfilter=None, plot_xy=False, **kargs):
286
308
return lines
287
309
288
310
def rawhexdump (self ):
311
+ # type: (Optional[Callable]) -> None
289
312
"""Prints an hexadecimal dump of each packet in the list"""
290
313
for p in self :
291
314
hexdump (self ._elt2pkt (p ))
292
315
293
316
def hexraw (self , lfilter = None ):
317
+ # type: (Optional[Callable]) -> None
294
318
"""Same as nsummary(), except that if a packet has a Raw layer, it will be hexdumped # noqa: E501
295
319
lfilter: a truth function that decides whether a packet must be displayed""" # noqa: E501
296
320
for i , res in enumerate (self .res ):
@@ -304,6 +328,7 @@ def hexraw(self, lfilter=None):
304
328
hexdump (p .getlayer (conf .raw_layer ).load )
305
329
306
330
def hexdump (self , lfilter = None ):
331
+ # type: (Optional[Callable]) -> None
307
332
"""Same as nsummary(), except that packets are also hexdumped
308
333
lfilter: a truth function that decides whether a packet must be displayed""" # noqa: E501
309
334
for i , res in enumerate (self .res ):
@@ -316,6 +341,7 @@ def hexdump(self, lfilter=None):
316
341
hexdump (p )
317
342
318
343
def padding (self , lfilter = None ):
344
+ # type: (Optional[Callable]) -> None
319
345
"""Same as hexraw(), for Padding layer"""
320
346
for i , res in enumerate (self .res ):
321
347
p = self ._elt2pkt (res )
@@ -327,6 +353,7 @@ def padding(self, lfilter=None):
327
353
hexdump (p .getlayer (conf .padding_layer ).load )
328
354
329
355
def nzpadding (self , lfilter = None ):
356
+ # type: (Optional[Callable]) -> None
330
357
"""Same as padding() but only non null padding"""
331
358
for i , res in enumerate (self .res ):
332
359
p = self ._elt2pkt (res )
@@ -389,6 +416,7 @@ def getsrcdst(pkt):
389
416
return do_graph (gr , ** kargs )
390
417
391
418
def afterglow (self , src = None , event = None , dst = None , ** kargs ):
419
+ # type: (Optional[Callable], Optional[Callable], Optional[Callable], Any) -> None # noqa: E501
392
420
"""Experimental clone attempt of http://sourceforge.net/projects/afterglow
393
421
each datum is reduced as src -> event -> dst and the data are graphed.
394
422
by default we have IP.src -> IP.dport -> IP.dst"""
@@ -398,9 +426,9 @@ def afterglow(self, src=None, event=None, dst=None, **kargs):
398
426
event = lambda x : x ['IP' ].dport
399
427
if dst is None :
400
428
dst = lambda x : x ['IP' ].dst
401
- sl = {}
402
- el = {}
403
- dl = {}
429
+ sl = {} # type: Dict[IPField, Tuple[int, List[ShortEnumField]]]
430
+ el = {} # type: Dict[ShortEnumField, Tuple[int, List[IPField]]]
431
+ dl = {} # type: Dict[IPField, ShortEnumField]
404
432
for i in self .res :
405
433
try :
406
434
s , e , d = src (i ), event (i ), dst (i )
@@ -468,6 +496,7 @@ def minmax(x):
468
496
return do_graph (gr , ** kargs )
469
497
470
498
def canvas_dump (self , ** kargs ):
499
+ # type: (Any) -> Any # Using Any since pyx is imported later
471
500
import pyx
472
501
d = pyx .document .document ()
473
502
len_res = len (self .res )
@@ -483,6 +512,7 @@ def canvas_dump(self, **kargs):
483
512
return d
484
513
485
514
def sr (self , multi = 0 ):
515
+ # type: (int) -> Tuple[SndRcvList, PacketList]
486
516
"""sr([multi=1]) -> (SndRcvList, PacketList)
487
517
Matches packets in the list and return ( (matched couples), (unmatched packets) )""" # noqa: E501
488
518
remain = self .res [:]
@@ -545,6 +575,7 @@ def session_extractor(p):
545
575
return dict (sessions )
546
576
547
577
def replace (self , * args , ** kargs ):
578
+ # type: (Any, Any) -> PacketList
548
579
"""
549
580
lst.replace(<field>,[<oldvalue>,]<newvalue>)
550
581
lst.replace( (fld,[ov],nv),(fld,[ov,]nv),...)
@@ -577,7 +608,13 @@ def replace(self, *args, **kargs):
577
608
x .append (p )
578
609
return x
579
610
580
- def getlayer (self , cls , nb = None , flt = None , name = None , stats = None ):
611
+ def getlayer (self , cls , # type: Packet
612
+ nb = None , # type: Optional[int]
613
+ flt = None , # type: Optional[Dict[str, Any]]
614
+ name = None , # type: Optional[str]
615
+ stats = None # type: Optional[List[Packet]]
616
+ ):
617
+ # type: (...) -> PacketList
581
618
"""Returns the packet list from a given layer.
582
619
583
620
See ``Packet.getlayer`` for more info.
@@ -604,7 +641,7 @@ def getlayer(self, cls, nb=None, flt=None, name=None, stats=None):
604
641
if stats is None :
605
642
stats = self .stats
606
643
607
- getlayer_arg = {}
644
+ getlayer_arg = {} # type: Dict[str, Any]
608
645
if flt is not None :
609
646
getlayer_arg .update (flt )
610
647
getlayer_arg ['cls' ] = cls
@@ -619,6 +656,7 @@ def getlayer(self, cls, nb=None, flt=None, name=None, stats=None):
619
656
)
620
657
621
658
def convert_to (self , other_cls , name = None , stats = None ):
659
+ # type: (Packet, Optional[str], Optional[List[Packet]]) -> PacketList
622
660
"""Converts all packets to another type.
623
661
624
662
See ``Packet.convert_to`` for more info.
@@ -648,13 +686,20 @@ def convert_to(self, other_cls, name=None, stats=None):
648
686
649
687
650
688
class SndRcvList (PacketList ):
651
- __slots__ = []
652
-
653
- def __init__ (self , res = None , name = "Results" , stats = None ):
689
+ __slots__ = [] # type: List[str]
690
+
691
+ def __init__ (self ,
692
+ res = None , # type: Optional[Union[List[Packet], PacketList]]
693
+ name = "Results" , # type: str
694
+ stats = None # type: Optional[List[Packet]]
695
+ ):
696
+ # type: (...) -> None
654
697
PacketList .__init__ (self , res , name , stats )
655
698
656
699
def _elt2pkt (self , elt ):
700
+ # type: (Tuple[Packet, Packet]) -> Packet
657
701
return elt [1 ]
658
702
659
703
def _elt2sum (self , elt ):
704
+ # type: (Tuple[Packet, Packet]) -> str
660
705
return "%s ==> %s" % (elt [0 ].summary (), elt [1 ].summary ())
0 commit comments