Skip to content

Commit 5c7a951

Browse files
Merge remote-tracking branch 'github/main' into local_series_index
2 parents 8b614ac + 7d6fd7a commit 5c7a951

39 files changed

+1606
-1708
lines changed

bigframes/_config/experiment_options.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ def __init__(self):
2626
self._semantic_operators: bool = False
2727
self._ai_operators: bool = False
2828
self._blob: bool = False
29-
self._udf: bool = False
3029

3130
@property
3231
def semantic_operators(self) -> bool:
@@ -68,17 +67,3 @@ def blob(self, value: bool):
6867
)
6968
warnings.warn(msg, category=bfe.PreviewWarning)
7069
self._blob = value
71-
72-
@property
73-
def udf(self) -> bool:
74-
return self._udf
75-
76-
@udf.setter
77-
def udf(self, value: bool):
78-
if value is True:
79-
msg = bfe.format_message(
80-
"BigFrames managed function (udf) is still under experiments. "
81-
"It may not work and subject to change in the future."
82-
)
83-
warnings.warn(msg, category=bfe.PreviewWarning)
84-
self._udf = value

bigframes/bigquery/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
unix_millis,
2828
unix_seconds,
2929
)
30-
from bigframes.bigquery._operations.geo import st_area, st_difference
30+
from bigframes.bigquery._operations.geo import st_area, st_difference, st_intersection
3131
from bigframes.bigquery._operations.json import (
3232
json_extract,
3333
json_extract_array,
@@ -49,6 +49,7 @@
4949
# geo ops
5050
"st_area",
5151
"st_difference",
52+
"st_intersection",
5253
# json ops
5354
"json_set",
5455
"json_extract",

bigframes/bigquery/_operations/geo.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,95 @@ def st_difference(
211211
in other.
212212
"""
213213
return series._apply_binary_op(other, ops.geo_st_difference_op)
214+
215+
216+
def st_intersection(
217+
series: bigframes.series.Series, other: bigframes.series.Series
218+
) -> bigframes.series.Series:
219+
"""
220+
Returns a `GEOGRAPHY` that represents the point set intersection of the two
221+
input `GEOGRAPHYs`. Thus, every point in the intersection appears in both
222+
`series` and `other`.
223+
224+
.. note::
225+
BigQuery's Geography functions, like `st_intersection`, interpret the geometry
226+
data type as a point set on the Earth's surface. A point set is a set
227+
of points, lines, and polygons on the WGS84 reference spheroid, with
228+
geodesic edges. See: https://cloud.google.com/bigquery/docs/geospatial-data
229+
230+
**Examples:**
231+
232+
>>> import bigframes as bpd
233+
>>> import bigframes.bigquery as bbq
234+
>>> import bigframes.geopandas
235+
>>> from shapely.geometry import Polygon, LineString, Point
236+
>>> bpd.options.display.progress_bar = None
237+
238+
We can check two GeoSeries against each other, row by row.
239+
240+
>>> s1 = bigframes.geopandas.GeoSeries(
241+
... [
242+
... Polygon([(0, 0), (2, 2), (0, 2)]),
243+
... Polygon([(0, 0), (2, 2), (0, 2)]),
244+
... LineString([(0, 0), (2, 2)]),
245+
... LineString([(2, 0), (0, 2)]),
246+
... Point(0, 1),
247+
... ],
248+
... )
249+
>>> s2 = bigframes.geopandas.GeoSeries(
250+
... [
251+
... Polygon([(0, 0), (1, 1), (0, 1)]),
252+
... LineString([(1, 0), (1, 3)]),
253+
... LineString([(2, 0), (0, 2)]),
254+
... Point(1, 1),
255+
... Point(0, 1),
256+
... ],
257+
... index=range(1, 6),
258+
... )
259+
260+
>>> s1
261+
0 POLYGON ((0 0, 2 2, 0 2, 0 0))
262+
1 POLYGON ((0 0, 2 2, 0 2, 0 0))
263+
2 LINESTRING (0 0, 2 2)
264+
3 LINESTRING (2 0, 0 2)
265+
4 POINT (0 1)
266+
dtype: geometry
267+
268+
>>> s2
269+
1 POLYGON ((0 0, 1 1, 0 1, 0 0))
270+
2 LINESTRING (1 0, 1 3)
271+
3 LINESTRING (2 0, 0 2)
272+
4 POINT (1 1)
273+
5 POINT (0 1)
274+
dtype: geometry
275+
276+
>>> bbq.st_intersection(s1, s2)
277+
0 None
278+
1 POLYGON ((0 0, 0.99954 1, 0 1, 0 0))
279+
2 POINT (1 1.00046)
280+
3 LINESTRING (2 0, 0 2)
281+
4 GEOMETRYCOLLECTION EMPTY
282+
5 None
283+
dtype: geometry
284+
285+
We can also intersect each geometry with a GeoSeries holding a single geometry. Rows are aligned by index, so only index 0 has a match:
286+
287+
>>> bbq.st_intersection(s1, bigframes.geopandas.GeoSeries([Polygon([(0, 0), (1, 1), (0, 1)])]))
288+
0 POLYGON ((0 0, 0.99954 1, 0 1, 0 0))
289+
1 None
290+
2 None
291+
3 None
292+
4 None
293+
dtype: geometry
294+
295+
Args:
296+
other (GeoSeries or geometric object):
297+
The GeoSeries (elementwise) or geometric object to find the
298+
intersection with.
299+
300+
Returns:
301+
bigframes.geopandas.GeoSeries:
302+
The GeoSeries (elementwise) of the intersection of points in
303+
each aligned geometry with other.
304+
"""
305+
return series._apply_binary_op(other, ops.geo_st_intersection_op)

bigframes/core/compile/compiled.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -665,9 +665,11 @@ def _join_condition(
665665

666666

667667
def _as_groupable(value: ibis_types.Value):
668-
# Some types need to be converted to string to enable groupby
669-
if value.type().is_float64() or value.type().is_geospatial():
668+
# Some types need to be converted to another type to enable groupby
669+
if value.type().is_float64():
670670
return value.cast(ibis_dtypes.str)
671+
elif value.type().is_geospatial():
672+
return typing.cast(ibis_types.GeoSpatialColumn, value).as_binary()
671673
elif value.type().is_json():
672674
return scalar_op_compiler.to_json_string(value)
673675
else:

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,13 @@ def geo_st_geogpoint_op_impl(x: ibis_types.Value, y: ibis_types.Value):
10371037
)
10381038

10391039

1040+
@scalar_op_compiler.register_binary_op(ops.geo_st_intersection_op, pass_op=False)
1041+
def geo_st_intersection_op_impl(x: ibis_types.Value, y: ibis_types.Value):
1042+
return typing.cast(ibis_types.GeoSpatialValue, x).intersection(
1043+
typing.cast(ibis_types.GeoSpatialValue, y)
1044+
)
1045+
1046+
10401047
@scalar_op_compiler.register_unary_op(ops.geo_x_op)
10411048
def geo_x_op_impl(x: ibis_types.Value):
10421049
return typing.cast(ibis_types.GeoSpatialValue, x).x()

bigframes/dataframe.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4170,8 +4170,10 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
41704170
# to the applied function should be a Series, not a scalar.
41714171

41724172
if utils.get_axis_number(axis) == 1:
4173-
msg = bfe.format_message("axis=1 scenario is in preview.")
4174-
warnings.warn(msg, category=bfe.PreviewWarning)
4173+
msg = bfe.format_message(
4174+
"DataFrame.apply with parameter axis=1 scenario is in preview."
4175+
)
4176+
warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)
41754177

41764178
if not hasattr(func, "bigframes_bigquery_function"):
41774179
raise ValueError(

bigframes/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class ObsoleteVersionWarning(Warning):
9595
"""The BigFrames version is too old."""
9696

9797

98+
class FunctionAxisOnePreviewWarning(PreviewWarning):
99+
"""Remote Function and Managed UDF with axis=1 preview."""
100+
101+
98102
def format_message(message: str, fill: bool = True):
99103
"""Formats a warning message with ANSI color codes for the warning color.
100104

bigframes/functions/_function_client.py

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
}
5454
)
5555

56+
# BQ managed functions (@udf) currently only support Python 3.11.
57+
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
58+
5659

5760
class FunctionClient:
5861
# Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -193,11 +196,22 @@ def provision_bq_managed_function(
193196
name,
194197
packages,
195198
is_row_processor,
199+
*,
200+
capture_references=False,
196201
):
197202
"""Create a BigQuery managed function."""
198-
import cloudpickle
199203

200-
pickled = cloudpickle.dumps(func)
204+
# TODO(b/406283812): Expose the capability to pass down
205+
# capture_references=True in the public udf API.
206+
if (
207+
capture_references
208+
and (python_version := _utils.get_python_version())
209+
!= _MANAGED_FUNC_PYTHON_VERSION
210+
):
211+
raise bf_formatting.create_exception_with_feedback_link(
212+
NotImplementedError,
213+
f"Capturing references for udf is currently supported only in Python version {_MANAGED_FUNC_PYTHON_VERSION}, you are running {python_version}.",
214+
)
201215

202216
# Create BQ managed function.
203217
bq_function_args = []
@@ -209,13 +223,15 @@ def provision_bq_managed_function(
209223
bq_function_args.append(f"{name_} {type_}")
210224

211225
managed_function_options = {
212-
"runtime_version": _utils.get_python_version(),
226+
"runtime_version": _MANAGED_FUNC_PYTHON_VERSION,
213227
"entry_point": "bigframes_handler",
214228
}
215229

216230
# Augment user package requirements with any internal package
217231
# requirements.
218-
packages = _utils._get_updated_package_requirements(packages, is_row_processor)
232+
packages = _utils._get_updated_package_requirements(
233+
packages, is_row_processor, capture_references
234+
)
219235
if packages:
220236
managed_function_options["packages"] = packages
221237
managed_function_options_str = self._format_function_options(
@@ -235,20 +251,45 @@ def provision_bq_managed_function(
235251
persistent_func_id = (
236252
f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
237253
)
238-
create_function_ddl = textwrap.dedent(
239-
f"""
240-
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
241-
RETURNS {bq_function_return_type}
242-
LANGUAGE python
243-
OPTIONS ({managed_function_options_str})
244-
AS r'''
254+
255+
udf_name = func.__name__
256+
if capture_references:
257+
# This code path ensures that if the udf body contains any
258+
# references to variables and/or imports outside the body, they are
259+
# captured as well.
245260
import cloudpickle
246-
udf = cloudpickle.loads({pickled})
247-
def bigframes_handler(*args):
248-
return udf(*args)
249-
'''
250-
"""
251-
).strip()
261+
262+
pickled = cloudpickle.dumps(func)
263+
udf_code = textwrap.dedent(
264+
f"""
265+
import cloudpickle
266+
{udf_name} = cloudpickle.loads({pickled})
267+
"""
268+
)
269+
else:
270+
# This code path handles the case where the udf body is self contained,
271+
# i.e. there are no references to variables or imports outside the
272+
# body.
273+
udf_code = textwrap.dedent(inspect.getsource(func))
274+
udf_code = udf_code[udf_code.index("def") :]
275+
276+
create_function_ddl = (
277+
textwrap.dedent(
278+
f"""
279+
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
280+
RETURNS {bq_function_return_type}
281+
LANGUAGE python
282+
OPTIONS ({managed_function_options_str})
283+
AS r'''
284+
__UDF_PLACE_HOLDER__
285+
def bigframes_handler(*args):
286+
return {udf_name}(*args)
287+
'''
288+
"""
289+
)
290+
.strip()
291+
.replace("__UDF_PLACE_HOLDER__", udf_code)
292+
)
252293

253294
self._ensure_dataset_exists()
254295
self._create_bq_function(create_function_ddl)

bigframes/functions/_function_session.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@
5858

5959
from . import _function_client, _utils
6060

61-
# BQ managed functions (@udf) currently only support Python 3.11.
62-
_MANAGED_FUNC_PYTHON_VERSIONS = ("python-3.11",)
63-
6461

6562
class FunctionSession:
6663
"""Session to manage bigframes functions."""
@@ -758,7 +755,13 @@ def udf(
758755
name: Optional[str] = None,
759756
packages: Optional[Sequence[str]] = None,
760757
):
761-
"""Decorator to turn a Python udf into a BigQuery managed function.
758+
"""Decorator to turn a Python user defined function (udf) into a
759+
BigQuery managed function.
760+
761+
.. note::
762+
The udf must be self-contained, i.e. it must not contain any
763+
references to an import or variable defined outside the function
764+
body.
762765
763766
.. note::
764767
Please have following IAM roles enabled for you:
@@ -809,17 +812,8 @@ def udf(
809812
of the form supported in
810813
https://pip.pypa.io/en/stable/reference/requirements-file-format/.
811814
"""
812-
if not bigframes.options.experiments.udf:
813-
raise bf_formatting.create_exception_with_feedback_link(NotImplementedError)
814815

815-
# Check the Python version.
816-
python_version = _utils.get_python_version()
817-
if python_version not in _MANAGED_FUNC_PYTHON_VERSIONS:
818-
raise bf_formatting.create_exception_with_feedback_link(
819-
RuntimeError,
820-
f"Python version {python_version} is not supported yet for "
821-
"BigFrames managed function.",
822-
)
816+
warnings.warn("udf is in preview.", category=bfe.PreviewWarning)
823817

824818
# Some defaults may be used from the session if not provided otherwise.
825819
session = self._resolve_session(session)
@@ -862,7 +856,7 @@ def wrapper(func):
862856
ValueError,
863857
"'input_types' was not set and parameter "
864858
f"'{parameter.name}' is missing a type annotation. "
865-
"Types are required to use managed function.",
859+
"Types are required to use udf.",
866860
)
867861
input_types.append(param_type)
868862
elif not isinstance(input_types, collections.abc.Sequence):
@@ -875,8 +869,7 @@ def wrapper(func):
875869
raise bf_formatting.create_exception_with_feedback_link(
876870
ValueError,
877871
"'output_type' was not set and function is missing a "
878-
"return type annotation. Types are required to use "
879-
"managed function.",
872+
"return type annotation. Types are required to use udf",
880873
)
881874

882875
# The function will actually be receiving a pandas Series, but allow

bigframes/functions/_utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,12 @@ def get_remote_function_locations(bq_location):
6464

6565

6666
def _get_updated_package_requirements(
67-
package_requirements=None, is_row_processor=False
67+
package_requirements=None, is_row_processor=False, capture_references=True
6868
):
69-
requirements = [f"cloudpickle=={cloudpickle.__version__}"]
69+
requirements = []
70+
if capture_references:
71+
requirements.append(f"cloudpickle=={cloudpickle.__version__}")
72+
7073
if is_row_processor:
7174
# bigframes function will send an entire row of data as json, which
7275
# would be converted to a pandas series and processed Ensure numpy

bigframes/geopandas/geoseries.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,6 @@ def to_wkt(self: GeoSeries) -> bigframes.series.Series:
9696

9797
def difference(self: GeoSeries, other: GeoSeries) -> bigframes.series.Series: # type: ignore
9898
return self._apply_binary_op(other, ops.geo_st_difference_op)
99+
100+
def intersection(self: GeoSeries, other: GeoSeries) -> bigframes.series.Series: # type: ignore
101+
return self._apply_binary_op(other, ops.geo_st_intersection_op)

0 commit comments

Comments
 (0)