@@ -461,78 +461,78 @@ private Iterator<Page> toPages() {
461
461
// TODO: optimize case where all the queues are empty
462
462
try {
463
463
for (var entry : inputQueues .entrySet ()) {
464
- Queue inputQueue = entry .getValue ();
464
+ Queue inputQueue = entry .getValue ();
465
465
466
- list = new ArrayList <>(inputQueue .size ());
467
- builders = null ;
468
- while (inputQueue .size () > 0 ) {
469
- list .add (inputQueue .pop ());
470
- }
471
- Collections .reverse (list );
472
-
473
- int p = 0 ;
474
- int size = 0 ;
475
- for (int i = 0 ; i < list .size (); i ++) {
476
- if (builders == null ) {
477
- size = Math .min (maxPageSize , list .size () - i );
478
- builders = new ResultBuilder [elementTypes .size ()];
479
- for (int b = 0 ; b < builders .length ; b ++) {
480
- builders [b ] = ResultBuilder .resultBuilderFor (
481
- blockFactory ,
482
- elementTypes .get (b ),
483
- encoders .get (b ).toUnsortable (),
484
- channelInKey (sortOrders , b ),
485
- size
486
- );
487
- }
488
- p = 0 ;
466
+ list = new ArrayList <>(inputQueue .size ());
467
+ builders = null ;
468
+ while (inputQueue .size () > 0 ) {
469
+ list .add (inputQueue .pop ());
489
470
}
471
+ Collections .reverse (list );
472
+
473
+ int p = 0 ;
474
+ int size = 0 ;
475
+ for (int i = 0 ; i < list .size (); i ++) {
476
+ if (builders == null ) {
477
+ size = Math .min (maxPageSize , list .size () - i );
478
+ builders = new ResultBuilder [elementTypes .size ()];
479
+ for (int b = 0 ; b < builders .length ; b ++) {
480
+ builders [b ] = ResultBuilder .resultBuilderFor (
481
+ blockFactory ,
482
+ elementTypes .get (b ),
483
+ encoders .get (b ).toUnsortable (),
484
+ channelInKey (sortOrders , b ),
485
+ size
486
+ );
487
+ }
488
+ p = 0 ;
489
+ }
490
490
491
- Row row = list .get (i );
492
- BytesRef keys = row .keys .bytesRefView ();
493
- for (SortOrder so : sortOrders ) {
494
- if (keys .bytes [keys .offset ] == so .nul ()) {
491
+ Row row = list .get (i );
492
+ BytesRef keys = row .keys .bytesRefView ();
493
+ for (SortOrder so : sortOrders ) {
494
+ if (keys .bytes [keys .offset ] == so .nul ()) {
495
+ keys .offset ++;
496
+ keys .length --;
497
+ continue ;
498
+ }
495
499
keys .offset ++;
496
500
keys .length --;
497
- continue ;
501
+ builders [so .channel ].decodeKey (keys );
502
+ }
503
+ if (keys .length != 0 ) {
504
+ throw new IllegalArgumentException ("didn't read all keys" );
498
505
}
499
- keys .offset ++;
500
- keys .length --;
501
- builders [so .channel ].decodeKey (keys );
502
- }
503
- if (keys .length != 0 ) {
504
- throw new IllegalArgumentException ("didn't read all keys" );
505
- }
506
-
507
- BytesRef values = row .values .bytesRefView ();
508
- for (ResultBuilder builder : builders ) {
509
- builder .decodeValue (values );
510
- }
511
- if (values .length != 0 ) {
512
- throw new IllegalArgumentException ("didn't read all values" );
513
- }
514
506
515
- list .set (i , null );
516
- row .close ();
507
+ BytesRef values = row .values .bytesRefView ();
508
+ for (ResultBuilder builder : builders ) {
509
+ builder .decodeValue (values );
510
+ }
511
+ if (values .length != 0 ) {
512
+ throw new IllegalArgumentException ("didn't read all values" );
513
+ }
517
514
518
- p ++;
519
- if (p == size ) {
520
- Block [] blocks = new Block [builders .length ];
521
- try {
522
- for (int b = 0 ; b < blocks .length ; b ++) {
523
- blocks [b ] = builders [b ].build ();
524
- }
525
- } finally {
526
- if (blocks [blocks .length - 1 ] == null ) {
527
- Releasables .closeExpectNoException (blocks );
515
+ list .set (i , null );
516
+ row .close ();
517
+
518
+ p ++;
519
+ if (p == size ) {
520
+ Block [] blocks = new Block [builders .length ];
521
+ try {
522
+ for (int b = 0 ; b < blocks .length ; b ++) {
523
+ blocks [b ] = builders [b ].build ();
524
+ }
525
+ } finally {
526
+ if (blocks [blocks .length - 1 ] == null ) {
527
+ Releasables .closeExpectNoException (blocks );
528
+ }
528
529
}
530
+ result .add (new Page (blocks ));
531
+ Releasables .closeExpectNoException (builders );
532
+ builders = null ;
529
533
}
530
- result .add (new Page (blocks ));
531
- Releasables .closeExpectNoException (builders );
532
- builders = null ;
533
534
}
534
- }
535
- assert builders == null ;
535
+ assert builders == null ;
536
536
}
537
537
success = true ;
538
538
return result .iterator ();
@@ -586,9 +586,8 @@ public void close() {
586
586
Releasables .closeExpectNoException (spare , Releasables .wrap (releasables ));
587
587
}
588
588
589
- private static long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (TopNOperator .class )
590
- + RamUsageEstimator .shallowSizeOfInstance (List .class ) * 4
591
- + RamUsageEstimator .shallowSizeOfInstance (Map .class );
589
+ private static long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (TopNOperator .class ) + RamUsageEstimator
590
+ .shallowSizeOfInstance (List .class ) * 4 + RamUsageEstimator .shallowSizeOfInstance (Map .class );
592
591
593
592
@ Override
594
593
public long ramBytesUsed () {
@@ -603,7 +602,8 @@ public long ramBytesUsed() {
603
602
size += partitions .size () * Partition .SHALLOW_SIZE ;
604
603
size += RamUsageEstimator .alignObjectSize (arrHeader + ref * sortOrders .size ());
605
604
size += sortOrders .size () * SortOrder .SHALLOW_SIZE ;
606
- long ramBytesUsedSum = inputQueues .entrySet ().stream ()
605
+ long ramBytesUsedSum = inputQueues .entrySet ()
606
+ .stream ()
607
607
.mapToLong (e -> e .getKey ().getBytes (Charset .defaultCharset ()).length + e .getValue ().ramBytesUsed ())
608
608
.sum ();
609
609
size += ramBytesUsedSum ;
@@ -620,7 +620,9 @@ public Status status() {
620
620
public String toString () {
621
621
int queueSizeSum = inputQueues .values ().stream ().mapToInt (Queue ::size ).sum ();
622
622
return "TopNOperator[count="
623
- + queueSizeSum + "/" + topCount
623
+ + queueSizeSum
624
+ + "/"
625
+ + topCount
624
626
+ ", elementTypes="
625
627
+ elementTypes
626
628
+ ", encoders="
0 commit comments