@@ -142,13 +142,19 @@ Result<std::unique_ptr<KernelState>> MinMaxInit(KernelContext* ctx,
142
142
// Any implementation
143
143
144
144
struct BooleanAnyImpl : public ScalarAggregator {
145
+ explicit BooleanAnyImpl (ScalarAggregateOptions options) : options(std::move(options)) {}
146
+
145
147
Status Consume (KernelContext*, const ExecBatch& batch) override {
146
148
// short-circuit if seen a True already
147
- if (this ->any == true ) {
149
+ if (options.skip_nulls && this ->any == true ) {
150
+ return Status::OK ();
151
+ }
152
+ // short-circuit if seen a null already
153
+ if (!options.skip_nulls && this ->has_nulls ) {
148
154
return Status::OK ();
149
155
}
150
-
151
156
const auto & data = *batch[0 ].array ();
157
+ this ->has_nulls = data.GetNullCount () > 0 ;
152
158
arrow::internal::OptionalBinaryBitBlockCounter counter (
153
159
data.buffers [0 ], data.offset , data.buffers [1 ], data.offset , data.length );
154
160
int64_t position = 0 ;
@@ -166,32 +172,48 @@ struct BooleanAnyImpl : public ScalarAggregator {
166
172
Status MergeFrom (KernelContext*, KernelState&& src) override {
167
173
const auto & other = checked_cast<const BooleanAnyImpl&>(src);
168
174
this ->any |= other.any ;
175
+ this ->has_nulls |= other.has_nulls ;
169
176
return Status::OK ();
170
177
}
171
178
172
- Status Finalize (KernelContext*, Datum* out) override {
173
- out->value = std::make_shared<BooleanScalar>(this ->any );
179
+ Status Finalize (KernelContext* ctx, Datum* out) override {
180
+ if (!options.skip_nulls && this ->has_nulls ) {
181
+ out->value = std::make_shared<BooleanScalar>();
182
+ } else {
183
+ out->value = std::make_shared<BooleanScalar>(this ->any );
184
+ }
174
185
return Status::OK ();
175
186
}
176
187
177
188
bool any = false ;
189
+ bool has_nulls = false ;
190
+ ScalarAggregateOptions options;
178
191
};
179
192
180
193
Result<std::unique_ptr<KernelState>> AnyInit (KernelContext*, const KernelInitArgs& args) {
181
- return ::arrow::internal::make_unique<BooleanAnyImpl>();
194
+ const ScalarAggregateOptions options =
195
+ static_cast <const ScalarAggregateOptions&>(*args.options );
196
+ return ::arrow::internal::make_unique<BooleanAnyImpl>(
197
+ static_cast <const ScalarAggregateOptions&>(*args.options ));
182
198
}
183
199
184
200
// ----------------------------------------------------------------------
185
201
// All implementation
186
202
187
203
struct BooleanAllImpl : public ScalarAggregator {
204
+ explicit BooleanAllImpl (ScalarAggregateOptions options) : options(std::move(options)) {}
205
+
188
206
Status Consume (KernelContext*, const ExecBatch& batch) override {
189
207
// short-circuit if seen a false already
190
- if (this ->all == false ) {
208
+ if (options.skip_nulls && this ->all == false ) {
209
+ return Status::OK ();
210
+ }
211
+ // short-circuit if seen a null already
212
+ if (!options.skip_nulls && this ->has_nulls ) {
191
213
return Status::OK ();
192
214
}
193
-
194
215
const auto & data = *batch[0 ].array ();
216
+ this ->has_nulls = data.GetNullCount () > 0 ;
195
217
arrow::internal::OptionalBinaryBitBlockCounter counter (
196
218
data.buffers [1 ], data.offset , data.buffers [0 ], data.offset , data.length );
197
219
int64_t position = 0 ;
@@ -210,19 +232,27 @@ struct BooleanAllImpl : public ScalarAggregator {
210
232
Status MergeFrom (KernelContext*, KernelState&& src) override {
211
233
const auto & other = checked_cast<const BooleanAllImpl&>(src);
212
234
this ->all &= other.all ;
235
+ this ->has_nulls |= other.has_nulls ;
213
236
return Status::OK ();
214
237
}
215
238
216
239
Status Finalize (KernelContext*, Datum* out) override {
217
- out->value = std::make_shared<BooleanScalar>(this ->all );
240
+ if (!options.skip_nulls && this ->has_nulls ) {
241
+ out->value = std::make_shared<BooleanScalar>();
242
+ } else {
243
+ out->value = std::make_shared<BooleanScalar>(this ->all );
244
+ }
218
245
return Status::OK ();
219
246
}
220
247
221
248
bool all = true ;
249
+ bool has_nulls = false ;
250
+ ScalarAggregateOptions options;
222
251
};
223
252
224
253
Result<std::unique_ptr<KernelState>> AllInit (KernelContext*, const KernelInitArgs& args) {
225
- return ::arrow::internal::make_unique<BooleanAllImpl>();
254
+ return ::arrow::internal::make_unique<BooleanAllImpl>(
255
+ static_cast <const ScalarAggregateOptions&>(*args.options ));
226
256
}
227
257
228
258
// ----------------------------------------------------------------------
@@ -408,12 +438,16 @@ const FunctionDoc min_max_doc{"Compute the minimum and maximum values of a numer
408
438
" ScalarAggregateOptions" };
409
439
410
440
const FunctionDoc any_doc{" Test whether any element in a boolean array evaluates to true" ,
411
- (" Null values are ignored." ),
412
- {" array" }};
441
+ (" Null values are ignored by default.\n "
442
+ " This can be changed through ScalarAggregateOptions." ),
443
+ {" array" },
444
+ " ScalarAggregateOptions" };
413
445
414
446
const FunctionDoc all_doc{" Test whether all elements in a boolean array evaluate to true" ,
415
- (" Null values are ignored." ),
416
- {" array" }};
447
+ (" Null values are ignored by default.\n "
448
+ " This can be changed through ScalarAggregateOptions." ),
449
+ {" array" },
450
+ " ScalarAggregateOptions" };
417
451
418
452
const FunctionDoc index_doc{" Find the index of the first occurrence of a given value" ,
419
453
(" The result is always computed as an int64_t, regardless\n "
@@ -497,12 +531,14 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
497
531
DCHECK_OK (registry->AddFunction (std::move (func)));
498
532
499
533
// any
500
- func = std::make_shared<ScalarAggregateFunction>(" any" , Arity::Unary (), &any_doc);
534
+ func = std::make_shared<ScalarAggregateFunction>(" any" , Arity::Unary (), &any_doc,
535
+ &default_scalar_aggregate_options);
501
536
aggregate::AddBasicAggKernels (aggregate::AnyInit, {boolean ()}, boolean (), func.get ());
502
537
DCHECK_OK (registry->AddFunction (std::move (func)));
503
538
504
539
// all
505
- func = std::make_shared<ScalarAggregateFunction>(" all" , Arity::Unary (), &all_doc);
540
+ func = std::make_shared<ScalarAggregateFunction>(" all" , Arity::Unary (), &all_doc,
541
+ &default_scalar_aggregate_options);
506
542
aggregate::AddBasicAggKernels (aggregate::AllInit, {boolean ()}, boolean (), func.get ());
507
543
DCHECK_OK (registry->AddFunction (std::move (func)));
508
544
0 commit comments