Skip to content

Commit 991c219

Browse files
authored
chore: add error handle for pdf extract and pdf chunk (#1490)
* add error handling for pdf chunk * change return type for pdf chunk * add error handling for pdf chunk * introduce a control variable called verbose for pdf chunk, when it is true, display both error message and content, otherwise, display content only * add pdf extract testcase * add testcase for pdf chunk * code change for pdf extract and pdf chunk, need to fix test * all testcases are clean * update testcase for better readability
1 parent 014bd33 commit 991c219

File tree

4 files changed

+232
-72
lines changed

4 files changed

+232
-72
lines changed

bigframes/blob/_functions.py

Lines changed: 75 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -398,81 +398,94 @@ def image_normalize_to_bytes_func(
398398

399399
# Extracts all text from a PDF url
400400
def pdf_extract_func(src_obj_ref_rt: str) -> str:
401-
import io
402-
import json
401+
try:
402+
import io
403+
import json
403404

404-
from pypdf import PdfReader # type: ignore
405-
import requests
406-
from requests import adapters
405+
from pypdf import PdfReader # type: ignore
406+
import requests
407+
from requests import adapters
407408

408-
session = requests.Session()
409-
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
409+
session = requests.Session()
410+
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
410411

411-
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
412-
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
412+
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
413+
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
413414

414-
response = session.get(src_url, timeout=30, stream=True)
415-
response.raise_for_status()
416-
pdf_bytes = response.content
415+
response = session.get(src_url, timeout=30, stream=True)
416+
response.raise_for_status()
417+
pdf_bytes = response.content
417418

418-
pdf_file = io.BytesIO(pdf_bytes)
419-
reader = PdfReader(pdf_file, strict=False)
419+
pdf_file = io.BytesIO(pdf_bytes)
420+
reader = PdfReader(pdf_file, strict=False)
420421

421-
all_text = ""
422-
for page in reader.pages:
423-
page_extract_text = page.extract_text()
424-
if page_extract_text:
425-
all_text += page_extract_text
426-
return all_text
422+
all_text = ""
423+
for page in reader.pages:
424+
page_extract_text = page.extract_text()
425+
if page_extract_text:
426+
all_text += page_extract_text
427427

428+
result_dict = {"status": "", "content": all_text}
428429

429-
pdf_extract_def = FunctionDef(pdf_extract_func, ["pypdf", "requests", "pypdf[crypto]"])
430+
except Exception as e:
431+
result_dict = {"status": str(e), "content": ""}
430432

433+
result_json = json.dumps(result_dict)
434+
return result_json
431435

432-
# Extracts text from a PDF url and chunks it simultaneously
433-
def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str:
434-
import io
435-
import json
436436

437-
from pypdf import PdfReader # type: ignore
438-
import requests
439-
from requests import adapters
440-
441-
session = requests.Session()
442-
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
437+
pdf_extract_def = FunctionDef(pdf_extract_func, ["pypdf", "requests", "pypdf[crypto]"])
443438

444-
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
445-
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
446439

447-
response = session.get(src_url, timeout=30, stream=True)
448-
response.raise_for_status()
449-
pdf_bytes = response.content
450-
451-
pdf_file = io.BytesIO(pdf_bytes)
452-
reader = PdfReader(pdf_file, strict=False)
453-
454-
# extract and chunk text simultaneously
455-
all_text_chunks = []
456-
curr_chunk = ""
457-
for page in reader.pages:
458-
page_text = page.extract_text()
459-
if page_text:
460-
curr_chunk += page_text
461-
# split the accumulated text into chunks of a specific size with overlap
462-
# this loop implements a sliding window approach to create chunks
463-
while len(curr_chunk) >= chunk_size:
464-
split_idx = curr_chunk.rfind(" ", 0, chunk_size)
465-
if split_idx == -1:
466-
split_idx = chunk_size
467-
actual_chunk = curr_chunk[:split_idx]
468-
all_text_chunks.append(actual_chunk)
469-
overlap = curr_chunk[split_idx + 1 : split_idx + 1 + overlap_size]
470-
curr_chunk = overlap + curr_chunk[split_idx + 1 + overlap_size :]
471-
if curr_chunk:
472-
all_text_chunks.append(curr_chunk)
473-
474-
all_text_json_string = json.dumps(all_text_chunks)
475-
return all_text_json_string
440+
# Extracts text from a PDF url and chunks it simultaneously
441+
def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str:
442+
try:
443+
import io
444+
import json
445+
446+
from pypdf import PdfReader # type: ignore
447+
import requests
448+
from requests import adapters
449+
450+
session = requests.Session()
451+
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
452+
453+
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
454+
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
455+
456+
response = session.get(src_url, timeout=30, stream=True)
457+
response.raise_for_status()
458+
pdf_bytes = response.content
459+
460+
pdf_file = io.BytesIO(pdf_bytes)
461+
reader = PdfReader(pdf_file, strict=False)
462+
# extract and chunk text simultaneously
463+
all_text_chunks = []
464+
curr_chunk = ""
465+
for page in reader.pages:
466+
page_text = page.extract_text()
467+
if page_text:
468+
curr_chunk += page_text
469+
# split the accumulated text into chunks of a specific size with overlap
470+
# this loop implements a sliding window approach to create chunks
471+
while len(curr_chunk) >= chunk_size:
472+
split_idx = curr_chunk.rfind(" ", 0, chunk_size)
473+
if split_idx == -1:
474+
split_idx = chunk_size
475+
actual_chunk = curr_chunk[:split_idx]
476+
all_text_chunks.append(actual_chunk)
477+
overlap = curr_chunk[split_idx + 1 : split_idx + 1 + overlap_size]
478+
curr_chunk = overlap + curr_chunk[split_idx + 1 + overlap_size :]
479+
if curr_chunk:
480+
all_text_chunks.append(curr_chunk)
481+
482+
result_dict = {"status": "", "content": all_text_chunks}
483+
484+
except Exception as e:
485+
result_dict = {"status": str(e), "content": []}
486+
487+
result_json = json.dumps(result_dict)
488+
return result_json
476489

477490

478491
pdf_chunk_def = FunctionDef(pdf_chunk_func, ["pypdf", "requests", "pypdf[crypto]"])

bigframes/operations/blob.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,7 @@ def pdf_extract(
615615
max_batching_rows: int = 1,
616616
container_cpu: Union[float, int] = 2,
617617
container_memory: str = "1Gi",
618+
verbose: bool = False,
618619
) -> bigframes.series.Series:
619620
"""Extracts text from PDF URLs and saves the text as string.
620621
@@ -630,12 +631,20 @@ def pdf_extract(
630631
send to cloud run to execute the function.
631632
container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to integers.
632633
container_memory (str, default "1Gi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
634+
verbose (bool, default "False"): controls the verbosity of the output.
635+
When set to True, both error messages and the extracted content
636+
are displayed. Conversely, when set to False, only the extracted
637+
content is presented, suppressing error messages.
633638
634639
Returns:
635-
bigframes.series.Series: contains all text from a pdf file
640+
bigframes.series.Series: str or struct[str, str],
641+
depends on the "verbose" parameter.
642+
Contains the extracted text from the PDF file.
643+
Includes error messages if verbosity is enabled.
636644
"""
637-
645+
import bigframes.bigquery as bbq
638646
import bigframes.blob._functions as blob_func
647+
import bigframes.pandas as bpd
639648

640649
connection = self._resolve_connection(connection)
641650

@@ -649,11 +658,19 @@ def pdf_extract(
649658
).udf()
650659

651660
src_rt = self._get_runtime_json_str(mode="R")
661+
652662
res = src_rt.apply(pdf_extract_udf)
653663

654-
self._add_to_cleanup_set(pdf_extract_udf)
664+
content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content"))
655665

656-
return res
666+
self._add_to_cleanup_set(pdf_extract_udf)
667+
if verbose:
668+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
669+
res_df = bpd.DataFrame({"status": status_series, "content": content_series})
670+
struct_series = bbq.struct(res_df)
671+
return struct_series
672+
else:
673+
return content_series
657674

658675
def pdf_chunk(
659676
self,
@@ -664,6 +681,7 @@ def pdf_chunk(
664681
max_batching_rows: int = 1,
665682
container_cpu: Union[float, int] = 2,
666683
container_memory: str = "1Gi",
684+
verbose: bool = False,
667685
) -> bigframes.series.Series:
668686
"""Extracts and chunks text from PDF URLs and saves the text as
669687
arrays of strings.
@@ -684,14 +702,21 @@ def pdf_chunk(
684702
send to cloud run to execute the function.
685703
container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to integers.
686704
container_memory (str, default "1Gi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
705+
verbose (bool, default "False"): controls the verbosity of the output.
706+
When set to True, both error messages and the extracted content
707+
are displayed. Conversely, when set to False, only the extracted
708+
content is presented, suppressing error messages.
687709
688710
Returns:
689-
bigframe.series.Series: Series of array[str], where each string is a
690-
chunk of text extracted from PDF.
711+
bigframes.series.Series: array[str] or struct[str, array[str]],
712+
depends on the "verbose" parameter.
713+
where each string is a chunk of text extracted from PDF.
714+
Includes error messages if verbosity is enabled.
691715
"""
692716

693717
import bigframes.bigquery as bbq
694718
import bigframes.blob._functions as blob_func
719+
import bigframes.pandas as bpd
695720

696721
connection = self._resolve_connection(connection)
697722

@@ -718,8 +743,12 @@ def pdf_chunk(
718743

719744
res = self._df_apply_udf(df, pdf_chunk_udf)
720745

721-
res_array = bbq.json_extract_string_array(res)
722-
746+
content_series = bbq.json_extract_string_array(res, "$.content")
723747
self._add_to_cleanup_set(pdf_chunk_udf)
724-
725-
return res_array
748+
if verbose:
749+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
750+
res_df = bpd.DataFrame({"status": status_series, "content": content_series})
751+
struct_series = bbq.struct(res_df)
752+
return struct_series
753+
else:
754+
return content_series

tests/system/conftest.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1480,3 +1480,17 @@ def reset_default_session_and_location():
14801480
yield
14811481
bpd.close_session()
14821482
bpd.options.bigquery.location = None
1483+
1484+
1485+
@pytest.fixture(scope="session")
1486+
def pdf_gcs_path() -> str:
1487+
return "gs://bigframes_blob_test/pdfs/*"
1488+
1489+
1490+
@pytest.fixture(scope="session")
1491+
def pdf_mm_df(
1492+
pdf_gcs_path, session: bigframes.Session, bq_connection: str
1493+
) -> bpd.DataFrame:
1494+
bigframes.options.experiments.blob = True
1495+
1496+
return session.from_glob_path(pdf_gcs_path, name="pdf", connection=bq_connection)

tests/system/large/blob/test_function.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,3 +285,107 @@ def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection:
285285
assert isinstance(actual, bpd.Series)
286286
assert len(actual) == 2
287287
assert actual.dtype == dtypes.BYTES_DTYPE
288+
289+
290+
@pytest.mark.parametrize(
291+
"verbose, expected",
292+
[
293+
(
294+
True,
295+
pd.Series(
296+
[
297+
{"status": "File has not been decrypted", "content": ""},
298+
{
299+
"status": "",
300+
"content": "Sample PDF This is a testing file. Some dummy messages are used for testing purposes. ",
301+
},
302+
]
303+
),
304+
),
305+
(
306+
False,
307+
pd.Series(
308+
[
309+
"",
310+
"Sample PDF This is a testing file. Some dummy messages are used for testing purposes. ",
311+
],
312+
name="pdf",
313+
),
314+
),
315+
],
316+
)
317+
def test_blob_pdf_extract(
318+
pdf_mm_df: bpd.DataFrame,
319+
verbose: bool,
320+
bq_connection: str,
321+
expected: pd.Series,
322+
):
323+
bigframes.options.experiments.blob = True
324+
325+
actual = (
326+
pdf_mm_df["pdf"]
327+
.blob.pdf_extract(connection=bq_connection, verbose=verbose)
328+
.explode()
329+
.to_pandas()
330+
)
331+
332+
pd.testing.assert_series_equal(
333+
actual,
334+
expected,
335+
check_dtype=False,
336+
check_index=False,
337+
)
338+
339+
340+
@pytest.mark.parametrize(
341+
"verbose, expected",
342+
[
343+
(
344+
True,
345+
pd.Series(
346+
[
347+
{"status": "File has not been decrypted", "content": []},
348+
{
349+
"status": "",
350+
"content": [
351+
"Sample PDF This is a testing file. Some ",
352+
"dummy messages are used for testing ",
353+
"purposes. ",
354+
],
355+
},
356+
]
357+
),
358+
),
359+
(
360+
False,
361+
pd.Series(
362+
[
363+
pd.NA,
364+
"Sample PDF This is a testing file. Some ",
365+
"dummy messages are used for testing ",
366+
"purposes. ",
367+
],
368+
),
369+
),
370+
],
371+
)
372+
def test_blob_pdf_chunk(
373+
pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: str, expected: pd.Series
374+
):
375+
bigframes.options.experiments.blob = True
376+
377+
actual = (
378+
pdf_mm_df["pdf"]
379+
.blob.pdf_chunk(
380+
connection=bq_connection, chunk_size=50, overlap_size=10, verbose=verbose
381+
)
382+
.explode()
383+
.to_pandas()
384+
)
385+
386+
pd.testing.assert_series_equal(
387+
actual,
388+
expected,
389+
check_dtype=False,
390+
check_index=False,
391+
)

0 commit comments

Comments
 (0)