@@ -142,13 +142,15 @@ Result<std::unique_ptr<KernelState>> MinMaxInit(KernelContext* ctx,
142
142
// Any implementation
143
143
144
144
struct BooleanAnyImpl : public ScalarAggregator {
145
+ explicit BooleanAnyImpl (ScalarAggregateOptions options) : options(std::move(options)) {}
146
+
145
147
Status Consume (KernelContext*, const ExecBatch& batch) override {
146
148
// short-circuit if seen a True already
147
149
if (this ->any == true ) {
148
150
return Status::OK ();
149
151
}
150
-
151
152
const auto & data = *batch[0 ].array ();
153
+ this ->has_nulls = data.GetNullCount () > 0 ;
152
154
arrow::internal::OptionalBinaryBitBlockCounter counter (
153
155
data.buffers [0 ], data.offset , data.buffers [1 ], data.offset , data.length );
154
156
int64_t position = 0 ;
@@ -166,32 +168,48 @@ struct BooleanAnyImpl : public ScalarAggregator {
166
168
Status MergeFrom (KernelContext*, KernelState&& src) override {
167
169
const auto & other = checked_cast<const BooleanAnyImpl&>(src);
168
170
this ->any |= other.any ;
171
+ this ->has_nulls |= other.has_nulls ;
169
172
return Status::OK ();
170
173
}
171
174
172
- Status Finalize (KernelContext*, Datum* out) override {
173
- out->value = std::make_shared<BooleanScalar>(this ->any );
175
+ Status Finalize (KernelContext* ctx, Datum* out) override {
176
+ if (!options.skip_nulls && !this ->any && this ->has_nulls ) {
177
+ out->value = std::make_shared<BooleanScalar>();
178
+ } else {
179
+ out->value = std::make_shared<BooleanScalar>(this ->any );
180
+ }
174
181
return Status::OK ();
175
182
}
176
183
177
184
bool any = false ;
185
+ bool has_nulls = false ;
186
+ ScalarAggregateOptions options;
178
187
};
179
188
180
189
Result<std::unique_ptr<KernelState>> AnyInit (KernelContext*, const KernelInitArgs& args) {
181
- return ::arrow::internal::make_unique<BooleanAnyImpl>();
190
+ const ScalarAggregateOptions options =
191
+ static_cast <const ScalarAggregateOptions&>(*args.options );
192
+ return ::arrow::internal::make_unique<BooleanAnyImpl>(
193
+ static_cast <const ScalarAggregateOptions&>(*args.options ));
182
194
}
183
195
184
196
// ----------------------------------------------------------------------
185
197
// All implementation
186
198
187
199
struct BooleanAllImpl : public ScalarAggregator {
200
+ explicit BooleanAllImpl (ScalarAggregateOptions options) : options(std::move(options)) {}
201
+
188
202
Status Consume (KernelContext*, const ExecBatch& batch) override {
189
203
// short-circuit if seen a false already
190
204
if (this ->all == false ) {
191
205
return Status::OK ();
192
206
}
193
-
207
+ // short-circuit if seen a null already
208
+ if (!options.skip_nulls && this ->has_nulls ) {
209
+ return Status::OK ();
210
+ }
194
211
const auto & data = *batch[0 ].array ();
212
+ this ->has_nulls = data.GetNullCount () > 0 ;
195
213
arrow::internal::OptionalBinaryBitBlockCounter counter (
196
214
data.buffers [1 ], data.offset , data.buffers [0 ], data.offset , data.length );
197
215
int64_t position = 0 ;
@@ -210,19 +228,27 @@ struct BooleanAllImpl : public ScalarAggregator {
210
228
Status MergeFrom (KernelContext*, KernelState&& src) override {
211
229
const auto & other = checked_cast<const BooleanAllImpl&>(src);
212
230
this ->all &= other.all ;
231
+ this ->has_nulls |= other.has_nulls ;
213
232
return Status::OK ();
214
233
}
215
234
216
235
Status Finalize (KernelContext*, Datum* out) override {
217
- out->value = std::make_shared<BooleanScalar>(this ->all );
236
+ if (!options.skip_nulls && this ->all && this ->has_nulls ) {
237
+ out->value = std::make_shared<BooleanScalar>();
238
+ } else {
239
+ out->value = std::make_shared<BooleanScalar>(this ->all );
240
+ }
218
241
return Status::OK ();
219
242
}
220
243
221
244
bool all = true ;
245
+ bool has_nulls = false ;
246
+ ScalarAggregateOptions options;
222
247
};
223
248
224
249
Result<std::unique_ptr<KernelState>> AllInit (KernelContext*, const KernelInitArgs& args) {
225
- return ::arrow::internal::make_unique<BooleanAllImpl>();
250
+ return ::arrow::internal::make_unique<BooleanAllImpl>(
251
+ static_cast <const ScalarAggregateOptions&>(*args.options ));
226
252
}
227
253
228
254
// ----------------------------------------------------------------------
@@ -407,12 +433,22 @@ const FunctionDoc min_max_doc{"Compute the minimum and maximum values of a numer
407
433
" ScalarAggregateOptions" };
408
434
409
435
const FunctionDoc any_doc{" Test whether any element in a boolean array evaluates to true" ,
410
- (" Null values are ignored." ),
411
- {" array" }};
436
+ (" Null values are ignored by default.\n "
437
+ " If null values are taken into account by setting "
438
+ " ScalarAggregateOptions parameter skip_nulls = false then "
439
+ " Kleene logic is used.\n "
440
+ " See KleeneOr for more details on Kleene logic." ),
441
+ {" array" },
442
+ " ScalarAggregateOptions" };
412
443
413
444
const FunctionDoc all_doc{" Test whether all elements in a boolean array evaluate to true" ,
414
- (" Null values are ignored." ),
415
- {" array" }};
445
+ (" Null values are ignored by default.\n "
446
+ " If null values are taken into account by setting "
447
+ " ScalarAggregateOptions parameter skip_nulls = false then "
448
+ " Kleene logic is used.\n "
449
+ " See KleeneAnd for more details on Kleene logic." ),
450
+ {" array" },
451
+ " ScalarAggregateOptions" };
416
452
417
453
const FunctionDoc index_doc{" Find the index of the first occurrence of a given value" ,
418
454
(" The result is always computed as an int64_t, regardless\n "
@@ -496,12 +532,14 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
496
532
DCHECK_OK (registry->AddFunction (std::move (func)));
497
533
498
534
// any
499
- func = std::make_shared<ScalarAggregateFunction>(" any" , Arity::Unary (), &any_doc);
535
+ func = std::make_shared<ScalarAggregateFunction>(" any" , Arity::Unary (), &any_doc,
536
+ &default_scalar_aggregate_options);
500
537
aggregate::AddBasicAggKernels (aggregate::AnyInit, {boolean ()}, boolean (), func.get ());
501
538
DCHECK_OK (registry->AddFunction (std::move (func)));
502
539
503
540
// all
504
- func = std::make_shared<ScalarAggregateFunction>(" all" , Arity::Unary (), &all_doc);
541
+ func = std::make_shared<ScalarAggregateFunction>(" all" , Arity::Unary (), &all_doc,
542
+ &default_scalar_aggregate_options);
505
543
aggregate::AddBasicAggKernels (aggregate::AllInit, {boolean ()}, boolean (), func.get ());
506
544
DCHECK_OK (registry->AddFunction (std::move (func)));
507
545
0 commit comments