@@ -4,8 +4,10 @@ import (
4
4
"encoding/json"
5
5
"errors"
6
6
"fmt"
7
+ "io/ioutil"
7
8
"math"
8
9
"os"
10
+ "path/filepath"
9
11
"strconv"
10
12
"strings"
11
13
"sync"
@@ -36,6 +38,8 @@ const (
36
38
KeyUnMarshalError = "Data unmarshal failed"
37
39
// NumUnMarshalError
38
40
NumUnMarshalError = 10
41
+ // lag file
42
+ LagFilename = "meta.lag"
39
43
)
40
44
41
45
var _ SkipDeepCopySender = & FtSender {}
@@ -206,6 +210,9 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
206
210
isBlock : opt .isBlock ,
207
211
backoff : utils .NewBackoff (2 , 1 , 1 * time .Second , 5 * time .Minute ),
208
212
}
213
+ ftSender .statsMutex .Lock ()
214
+ ftSender .stats .FtSendLag = ftSender .readLag ()
215
+ ftSender .statsMutex .Unlock ()
209
216
210
217
if opt .innerSenderType == TypePandora {
211
218
ftSender .pandoraKeyCache = make (map [string ]KeyInfo )
@@ -273,9 +280,17 @@ func (ft *FtSender) RawSend(datas []string) error {
273
280
} else {
274
281
// se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
275
282
se .AddSuccessNum (len (datas ))
283
+ ft .statsMutex .Lock ()
284
+ ft .stats .FtSendLag = ft .stats .FtSendLag + int64 (len (datas ))
285
+ ft .statsMutex .Unlock ()
276
286
ft .backoff .Reset ()
277
287
}
278
288
se .FtQueueLag = ft .BackupQueue .Depth () + ft .logQueue .Depth ()
289
+ if se .FtQueueLag == 0 {
290
+ ft .statsMutex .Lock ()
291
+ ft .stats .FtSendLag = 0
292
+ ft .statsMutex .Unlock ()
293
+ }
279
294
}
280
295
return se
281
296
}
@@ -314,7 +329,7 @@ func (ft *FtSender) Send(datas []Data) error {
314
329
}
315
330
316
331
if ft .isBlock {
317
- log .Error ("Runner[%v] Sender[%v] try Send Datas err: %v" , ft .runnerName , ft .innerSender .Name (), err )
332
+ log .Errorf ("Runner[%v] Sender[%v] try Send Datas err: %v" , ft .runnerName , ft .innerSender .Name (), err )
318
333
return se
319
334
}
320
335
@@ -354,9 +369,17 @@ func (ft *FtSender) Send(datas []Data) error {
354
369
} else {
355
370
// se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
356
371
se .AddSuccessNum (len (datas ))
372
+ ft .statsMutex .Lock ()
373
+ ft .stats .FtSendLag = ft .stats .FtSendLag + int64 (len (datas ))
374
+ ft .statsMutex .Unlock ()
357
375
ft .backoff .Reset ()
358
376
}
359
377
se .FtQueueLag = ft .BackupQueue .Depth () + ft .logQueue .Depth ()
378
+ if se .FtQueueLag == 0 {
379
+ ft .statsMutex .Lock ()
380
+ ft .stats .FtSendLag = 0
381
+ ft .statsMutex .Unlock ()
382
+ }
360
383
return se
361
384
}
362
385
@@ -395,6 +418,9 @@ func (ft *FtSender) Close() error {
395
418
// persist queue's meta data
396
419
ft .logQueue .Close ()
397
420
ft .BackupQueue .Close ()
421
+ ft .statsMutex .Lock ()
422
+ ft .writeLag (ft .stats .FtSendLag )
423
+ ft .statsMutex .Unlock ()
398
424
399
425
return ft .innerSender .Close ()
400
426
}
@@ -481,6 +507,9 @@ func (ft *FtSender) saveToFile(datas []Data) error {
481
507
}
482
508
483
509
func (ft * FtSender ) asyncSendLogFromQueue () {
510
+ // if not sleep, queue lag may be cleared
511
+ time .Sleep (time .Second * 10 )
512
+
484
513
for i := 0 ; i < ft .procs ; i ++ {
485
514
if ft .opt .sendRaw {
486
515
readLinesChan := make (<- chan []string )
@@ -506,18 +535,32 @@ func (ft *FtSender) asyncSendLogFromQueue() {
506
535
}
507
536
508
537
// trySend 从bytes反序列化数据后尝试发送数据
509
- func (ft * FtSender ) trySendBytes (dat []byte , failSleep int , isRetry bool ) (backDataContext []* datasContext , err error ) {
538
+ func (ft * FtSender ) trySendBytes (dat []byte , failSleep int , isRetry bool , isFromQueue bool ) (backDataContext []* datasContext , err error ) {
510
539
if ft .opt .sendRaw {
511
540
datas , err := ft .unmarshalRaws (dat )
512
541
if err != nil {
513
542
return nil , errors .New (KeyUnMarshalError + ":" + err .Error ())
514
543
}
544
+ ft .statsMutex .Lock ()
545
+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
546
+ if ft .stats .FtSendLag < 0 {
547
+ ft .stats .FtSendLag = 0
548
+ }
549
+ ft .statsMutex .Unlock ()
550
+
515
551
return ft .backOffSendRawFromQueue (datas , failSleep , isRetry )
516
552
}
517
553
datas , err := ft .unmarshalData (dat )
518
554
if err != nil {
519
555
return nil , errors .New (KeyUnMarshalError + ":" + err .Error ())
556
+
557
+ }
558
+ ft .statsMutex .Lock ()
559
+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
560
+ if ft .stats .FtSendLag < 0 {
561
+ ft .stats .FtSendLag = 0
520
562
}
563
+ ft .statsMutex .Unlock ()
521
564
522
565
return ft .backOffSendFromQueue (datas , failSleep , isRetry )
523
566
}
@@ -566,6 +609,9 @@ func (ft *FtSender) trySendRaws(datas []string, failSleep int, isRetry bool) (ba
566
609
log .Errorf ("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d" , ft .runnerName , ft .innerSender .Name (), ft .BackupQueue .Name (), err , len (datas ))
567
610
return nil , nil
568
611
}
612
+ ft .statsMutex .Lock ()
613
+ ft .stats .FtSendLag += int64 (len (v .Lines ))
614
+ ft .statsMutex .Unlock ()
569
615
}
570
616
571
617
time .Sleep (time .Second * time .Duration (math .Pow (2 , float64 (failSleep ))))
@@ -620,6 +666,9 @@ func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (bac
620
666
log .Errorf ("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d" , ft .runnerName , ft .innerSender .Name (), ft .BackupQueue .Name (), err , len (datas ))
621
667
return nil , nil
622
668
}
669
+ ft .statsMutex .Lock ()
670
+ ft .stats .FtSendLag += int64 (len (v .Datas ))
671
+ ft .statsMutex .Unlock ()
623
672
}
624
673
625
674
time .Sleep (time .Second * time .Duration (math .Pow (2 , float64 (failSleep ))))
@@ -896,8 +945,14 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
896
945
} else {
897
946
select {
898
947
case bytes := <- readChan :
899
- backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry )
948
+ backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry , true )
900
949
case datas := <- readDatasChan :
950
+ ft .statsMutex .Lock ()
951
+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
952
+ if ft .stats .FtSendLag < 0 {
953
+ ft .stats .FtSendLag = 0
954
+ }
955
+ ft .statsMutex .Unlock ()
901
956
backDataContext , err = ft .backOffSendRawFromQueue (datas , numWaits , isRetry )
902
957
case <- timer .C :
903
958
continue
@@ -917,7 +972,7 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
917
972
unmarshalDataError ++
918
973
if unmarshalDataError > NumUnMarshalError {
919
974
time .Sleep (time .Second )
920
- log .Errorf ("Runner[%s] Sender[%s] sleep 1s due to unmarshal err" , ft .runnerName , ft .innerSender .Name (), queueName , err )
975
+ log .Errorf ("Runner[%s] Sender[%s] queue[%s] sleep 1s due to unmarshal err %v " , ft .runnerName , ft .innerSender .Name (), queueName , err )
921
976
}
922
977
} else {
923
978
unmarshalDataError = 0
@@ -939,7 +994,6 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
939
994
defer timer .Stop ()
940
995
numWaits := 1
941
996
unmarshalDataError := 0
942
-
943
997
var curDataContext , otherDataContext []* datasContext
944
998
var curIdx int
945
999
var backDataContext []* datasContext
@@ -955,8 +1009,14 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
955
1009
} else {
956
1010
select {
957
1011
case bytes := <- readChan :
958
- backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry )
1012
+ backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry , true )
959
1013
case datas := <- readDatasChan :
1014
+ ft .statsMutex .Lock ()
1015
+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
1016
+ if ft .stats .FtSendLag < 0 {
1017
+ ft .stats .FtSendLag = 0
1018
+ }
1019
+ ft .statsMutex .Unlock ()
960
1020
backDataContext , err = ft .backOffSendFromQueue (datas , numWaits , isRetry )
961
1021
case <- timer .C :
962
1022
continue
@@ -976,7 +1036,7 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
976
1036
unmarshalDataError ++
977
1037
if unmarshalDataError > NumUnMarshalError {
978
1038
time .Sleep (time .Second )
979
- log .Errorf ("Runner[%s] Sender[%s] sleep 1s due to unmarshal err" , ft .runnerName , ft .innerSender .Name (), queueName , err )
1039
+ log .Errorf ("Runner[%s] Sender[%s] queue[%s] sleep 1s due to unmarshal err %v " , ft .runnerName , ft .innerSender .Name (), queueName , err )
980
1040
}
981
1041
} else {
982
1042
unmarshalDataError = 0
@@ -1225,3 +1285,34 @@ func (ft *FtSender) backOffReTrySendRaw(lines []string, isRetry bool) (res []*da
1225
1285
time .Sleep (backoff .Duration ())
1226
1286
}
1227
1287
}
1288
+
1289
+ // readLag read lag from file
1290
+ func (ft * FtSender ) readLag () int64 {
1291
+ path := filepath .Join (ft .opt .saveLogPath , LagFilename )
1292
+ f , err := ioutil .ReadFile (path )
1293
+ if err != nil {
1294
+ log .Errorf ("Runner[%v] Sender[%v] read file error : %v" , ft .runnerName , ft .innerSender .Name (), err )
1295
+ return 0
1296
+ }
1297
+ lag , err := strconv .ParseInt (string (f ), 10 , 64 )
1298
+ if err != nil {
1299
+ log .Errorf ("Runner[%v] Sender[%v] parse lag error : %v" , ft .runnerName , ft .innerSender .Name (), err )
1300
+ }
1301
+ return lag
1302
+ }
1303
+
1304
+ // writeLag write lag into file
1305
+ func (ft * FtSender ) writeLag (lag int64 ) error {
1306
+ path := filepath .Join (ft .opt .saveLogPath , LagFilename )
1307
+ file , err := os .OpenFile (path , os .O_WRONLY | os .O_TRUNC | os .O_CREATE , 0666 )
1308
+ defer func () {
1309
+ file .Sync ()
1310
+ file .Close ()
1311
+ }()
1312
+ if err != nil {
1313
+ return err
1314
+ }
1315
+ lagStr := strconv .FormatInt (lag , 10 )
1316
+ _ , err = file .WriteString (lagStr )
1317
+ return err
1318
+ }
0 commit comments