Skip to content

Commit e29ba3f

Browse files
authored
Merge branch 'main' into release-please--branches--main
2 parents a4b7629 + 8ebfa57 commit e29ba3f

File tree

14 files changed

+848
-74
lines changed

14 files changed

+848
-74
lines changed

bigframes/core/blocks.py

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ def __init__(
154154
self._stats_cache[" ".join(self.index_columns)] = {}
155155
self._transpose_cache: Optional[Block] = transpose_cache
156156
self._view_ref: Optional[bigquery.TableReference] = None
157+
self._view_ref_dry_run: Optional[bigquery.TableReference] = None
157158

158159
@classmethod
159160
def from_local(
@@ -2459,19 +2460,19 @@ def is_monotonic_decreasing(
24592460
) -> bool:
24602461
return self._is_monotonic(column_id, increasing=False)
24612462

2462-
def to_sql_query(
2463-
self, include_index: bool, enable_cache: bool = True
2464-
) -> typing.Tuple[str, list[str], list[Label]]:
2463+
def _array_value_for_output(
2464+
self, *, include_index: bool
2465+
) -> Tuple[bigframes.core.ArrayValue, list[str], list[Label]]:
24652466
"""
2466-
Compiles this DataFrame's expression tree to SQL, optionally
2467-
including index columns.
2467+
Creates the expression tree with user-visible column names, such as for
2468+
SQL output.
24682469
24692470
Args:
24702471
include_index (bool):
24712472
whether to include index columns.
24722473
24732474
Returns:
2474-
a tuple of (sql_string, index_column_id_list, index_column_label_list).
2475+
a tuple of (ArrayValue, index_column_id_list, index_column_label_list).
24752476
If include_index is set to False, index_column_id_list and index_column_label_list
24762477
return empty lists.
24772478
"""
@@ -2494,25 +2495,72 @@ def to_sql_query(
24942495
# the BigQuery unicode column name feature?
24952496
substitutions[old_id] = new_id
24962497

2498+
return (
2499+
array_value.rename_columns(substitutions),
2500+
new_ids[: len(idx_labels)],
2501+
idx_labels,
2502+
)
2503+
2504+
def to_sql_query(
2505+
self, include_index: bool, enable_cache: bool = True
2506+
) -> Tuple[str, list[str], list[Label]]:
2507+
"""
2508+
Compiles this DataFrame's expression tree to SQL, optionally
2509+
including index columns.
2510+
2511+
Args:
2512+
include_index (bool):
2513+
whether to include index columns.
2514+
2515+
Returns:
2516+
a tuple of (sql_string, index_column_id_list, index_column_label_list).
2517+
If include_index is set to False, index_column_id_list and index_column_label_list
2518+
return empty lists.
2519+
"""
2520+
array_value, idx_ids, idx_labels = self._array_value_for_output(
2521+
include_index=include_index
2522+
)
2523+
24972524
# Note: this uses the sql from the executor, so is coupled tightly to execution
24982525
# implementation. It will reference cached tables instead of original data sources.
24992526
# Maybe should just compile raw BFET? Depends on user intent.
2500-
sql = self.session._executor.to_sql(
2501-
array_value.rename_columns(substitutions), enable_cache=enable_cache
2502-
)
2527+
sql = self.session._executor.to_sql(array_value, enable_cache=enable_cache)
25032528
return (
25042529
sql,
2505-
new_ids[: len(idx_labels)],
2530+
idx_ids,
25062531
idx_labels,
25072532
)
25082533

2509-
def to_view(self, include_index: bool) -> bigquery.TableReference:
2534+
def to_placeholder_table(
2535+
self, include_index: bool, *, dry_run: bool = False
2536+
) -> bigquery.TableReference:
25102537
"""
2511-
Creates a temporary BigQuery VIEW with the SQL corresponding to this block.
2538+
Creates a temporary BigQuery VIEW (or empty table if dry_run) with the
2539+
SQL corresponding to this block.
25122540
"""
25132541
if self._view_ref is not None:
25142542
return self._view_ref
25152543

2544+
# Prefer the real view if it exists, but since dry_run might be called
2545+
# many times before the real query, we cache that empty table reference
2546+
# with the correct schema too.
2547+
if dry_run:
2548+
if self._view_ref_dry_run is not None:
2549+
return self._view_ref_dry_run
2550+
2551+
# Create empty temp table with the right schema.
2552+
array_value, _, _ = self._array_value_for_output(
2553+
include_index=include_index
2554+
)
2555+
temp_table_schema = array_value.schema.to_bigquery()
2556+
self._view_ref_dry_run = self.session._create_temp_table(
2557+
schema=temp_table_schema
2558+
)
2559+
return self._view_ref_dry_run
2560+
2561+
# We shouldn't run `to_sql_query` if we have a `dry_run`, because it
2562+
# could cause us to make unnecessary API calls to upload local node
2563+
# data.
25162564
sql, _, _ = self.to_sql_query(include_index=include_index)
25172565
self._view_ref = self.session._create_temp_view(sql)
25182566
return self._view_ref

bigframes/core/local_data.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,9 @@ def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtyp
336336
if target_type != array.type:
337337
# TODO: Maybe warn if lossy conversion?
338338
array = array.cast(target_type)
339-
bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
339+
bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
340+
target_type, allow_lossless_cast=True
341+
)
340342

341343
storage_type = _get_managed_storage_type(bf_type)
342344
if storage_type != array.type:

bigframes/core/pyformat.py

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,15 @@
2121

2222
import string
2323
import typing
24-
from typing import Any, Union
24+
from typing import Any, Optional, Union
2525

2626
import google.cloud.bigquery
27-
import google.cloud.bigquery.table
27+
import pandas
28+
29+
from bigframes.core import utils
30+
import bigframes.core.local_data
31+
from bigframes.core.tools import bigquery_schema
32+
import bigframes.session
2833

2934
_BQ_TABLE_TYPES = Union[
3035
google.cloud.bigquery.Table,
@@ -37,9 +42,51 @@ def _table_to_sql(table: _BQ_TABLE_TYPES) -> str:
3742
return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`"
3843

3944

45+
def _pandas_df_to_sql_dry_run(pd_df: pandas.DataFrame) -> str:
    """Render the schema of ``pd_df`` as a dry-run SQL table expression.

    The DataFrame's column and index labels are standardized, the relabeled
    copy is converted to a managed Arrow table, and that table's BigQuery
    schema is passed to ``bigquery_schema.to_sql_dry_run``.

    Args:
        pd_df: The pandas DataFrame whose schema should be rendered.

    Returns:
        The SQL produced by ``bigquery_schema.to_sql_dry_run`` for the schema.
    """
    # Standardize the labels so that there are no duplicate column labels.
    #
    # Keep this in sync with the logic used by to_gbq(). See
    # bigframes.dataframe.DataFrame._prepare_export().
    column_ids, index_ids = utils.get_standardized_ids(
        pd_df.columns, pd_df.index.names
    )

    # Relabel a copy rather than the DataFrame that was passed in.
    relabeled = pd_df.copy()
    relabeled.columns = pandas.Index(column_ids)
    relabeled.index.names = index_ids

    arrow_table = bigframes.core.local_data.ManagedArrowTable.from_pandas(relabeled)
    return bigquery_schema.to_sql_dry_run(arrow_table.schema.to_bigquery())
60+
61+
62+
def _pandas_df_to_sql(
    df_pd: pandas.DataFrame,
    *,
    name: str,
    session: Optional[bigframes.session.Session] = None,
    dry_run: bool = False,
) -> str:
    """Convert a pandas DataFrame to something embeddable in a SQL string.

    Args:
        df_pd: The pandas DataFrame to embed.
        name: Name of the template field; only used in the error message.
        session: Session used to read the DataFrame, if one is available.
        dry_run: Whether the resulting SQL is only needed for a dry run.

    Returns:
        With a session, the SQL reference to the placeholder table of the
        DataFrame read through that session. Without a session (dry run
        only), the result of ``_pandas_df_to_sql_dry_run``.

    Raises:
        ValueError: If no session is given and this is not a dry run.
    """
    if session is not None:
        # Use the _deferred engine to avoid loading data too often during dry run.
        bf_df = session.read_pandas(df_pd, write_engine="_deferred")
        placeholder = bf_df._to_placeholder_table(dry_run=dry_run)
        return _table_to_sql(placeholder)

    # Without a session, only the schema-only dry-run rendering is possible.
    if dry_run:
        return _pandas_df_to_sql_dry_run(df_pd)

    raise ValueError(
        f"Can't embed pandas DataFrame {name} in a SQL "
        "string without a bigframes session except if for a dry run."
    )
82+
83+
4084
def _field_to_template_value(
4185
name: str,
4286
value: Any,
87+
*,
88+
session: Optional[bigframes.session.Session] = None,
89+
dry_run: bool = False,
4390
) -> str:
4491
"""Convert value to something embeddable in a SQL string."""
4592
import bigframes.core.sql # Avoid circular imports
@@ -51,9 +98,11 @@ def _field_to_template_value(
5198
if isinstance(value, table_types):
5299
return _table_to_sql(value)
53100

54-
# TODO(tswast): convert pandas DataFrame objects to gbq tables or a literals subquery.
101+
if isinstance(value, pandas.DataFrame):
102+
return _pandas_df_to_sql(value, session=session, dry_run=dry_run, name=name)
103+
55104
if isinstance(value, bigframes.dataframe.DataFrame):
56-
return _table_to_sql(value._to_view())
105+
return _table_to_sql(value._to_placeholder_table(dry_run=dry_run))
57106

58107
return bigframes.core.sql.simple_literal(value)
59108

@@ -70,6 +119,7 @@ def _validate_type(name: str, value: Any):
70119
typing.get_args(_BQ_TABLE_TYPES)
71120
+ typing.get_args(bigframes.core.sql.SIMPLE_LITERAL_TYPES)
72121
+ (bigframes.dataframe.DataFrame,)
122+
+ (pandas.DataFrame,)
73123
)
74124

75125
if not isinstance(value, supported_types):
@@ -91,6 +141,8 @@ def pyformat(
91141
sql_template: str,
92142
*,
93143
pyformat_args: dict,
144+
session: Optional[bigframes.session.Session] = None,
145+
dry_run: bool = False,
94146
) -> str:
95147
"""Unsafe Python-style string formatting of SQL string.
96148
@@ -115,6 +167,8 @@ def pyformat(
115167
format_kwargs = {}
116168
for name in fields:
117169
value = pyformat_args[name]
118-
format_kwargs[name] = _field_to_template_value(name, value)
170+
format_kwargs[name] = _field_to_template_value(
171+
name, value, session=session, dry_run=dry_run
172+
)
119173

120174
return sql_template.format(**format_kwargs)
bigframes/core/tools/bigquery_schema.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Helpers for working with BigQuery SchemaFields."""
16+
17+
from typing import Tuple
18+
19+
import google.cloud.bigquery
20+
21+
22+
def _type_to_sql(field: google.cloud.bigquery.SchemaField):
    """Return the SQL spelling of the field's type.

    RECORD / STRUCT fields (compared case-insensitively) are expanded from
    their sub-fields; every other type is returned as ``field.field_type``.
    The field's mode is not consulted here, because _field_to_sql deals with
    it.
    """
    is_struct = field.field_type.casefold() in ("record", "struct")
    return _to_struct(field.fields) if is_struct else field.field_type
30+
31+
32+
def _field_to_sql(field: google.cloud.bigquery.SchemaField):
    """Render a field as a backtick-quoted name followed by its SQL type."""
    sql_type = _type_to_sql(field)
    if field.mode == "REPEATED":
        # Arrays are not a field type of their own: they are expressed as
        # mode="REPEATED" on the element's SchemaField, so the element type is
        # wrapped in ARRAY<...> here.
        sql_type = f"ARRAY<{sql_type}>"
    return f"`{field.name}` {sql_type}"
39+
40+
41+
def _to_struct(bqschema: Tuple[google.cloud.bigquery.SchemaField, ...]):
    """Render a sequence of fields as a ``STRUCT<...>`` SQL type."""
    members = ", ".join(_field_to_sql(member) for member in bqschema)
    return f"STRUCT<{members}>"
44+
45+
46+
def to_sql_dry_run(bqschema: Tuple[google.cloud.bigquery.SchemaField, ...]):
    """Build an empty table expression whose columns follow ``bqschema``.

    The expression unnests an empty, explicitly typed array of structs, with
    one struct member per schema field.
    """
    row_type = _to_struct(bqschema)
    return f"UNNEST(ARRAY<{row_type}>[])"

bigframes/dataframe.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,11 +404,13 @@ def _should_sql_have_index(self) -> bool:
404404
self.index.name is not None or len(self.index.names) > 1
405405
)
406406

407-
def _to_view(self) -> bigquery.TableReference:
407+
def _to_placeholder_table(self, dry_run: bool = False) -> bigquery.TableReference:
408408
"""Compiles this DataFrame's expression tree to SQL and saves it to a
409-
(temporary) view.
409+
(temporary) view or table (in the case of a dry run).
410410
"""
411-
return self._block.to_view(include_index=self._should_sql_have_index())
411+
return self._block.to_placeholder_table(
412+
include_index=self._should_sql_have_index(), dry_run=dry_run
413+
)
412414

413415
def _to_sql_query(
414416
self, include_index: bool, enable_cache: bool = True

bigframes/dtypes.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,35 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
444444
if mapping.arrow_dtype is not None
445445
}
446446

447+
# Include types that don't map 1:1 to BigQuery types but can still be loaded into BigQuery:
448+
_ARROW_TO_BIGFRAMES_LOSSLESS = {
449+
pa.int8(): INT_DTYPE,
450+
pa.int16(): INT_DTYPE,
451+
pa.int32(): INT_DTYPE,
452+
pa.uint8(): INT_DTYPE,
453+
pa.uint16(): INT_DTYPE,
454+
pa.uint32(): INT_DTYPE,
455+
# uint64 is omitted because uint64 -> BigQuery INT64 is a lossy conversion.
456+
pa.float16(): FLOAT_DTYPE,
457+
pa.float32(): FLOAT_DTYPE,
458+
# TODO(tswast): Can we support datetime/timestamp/time with units larger
459+
# than microseconds?
460+
}
461+
462+
463+
def arrow_dtype_to_bigframes_dtype(
464+
arrow_dtype: pa.DataType, allow_lossless_cast: bool = False
465+
) -> Dtype:
466+
"""
467+
Convert an arrow type into the pandas-y type used to represent it in BigFrames.
468+
469+
Args:
470+
arrow_dtype: Arrow data type.
471+
allow_lossless_cast: Allow lossless conversions, such as int32 to int64.
472+
"""
473+
if allow_lossless_cast and arrow_dtype in _ARROW_TO_BIGFRAMES_LOSSLESS:
474+
return _ARROW_TO_BIGFRAMES_LOSSLESS[arrow_dtype]
447475

448-
def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
449476
if arrow_dtype in _ARROW_TO_BIGFRAMES:
450477
return _ARROW_TO_BIGFRAMES[arrow_dtype]
451478

0 commit comments

Comments
 (0)