
Support symlinks #4881


Merged · 3 commits · Jan 24, 2024
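All of the changes below apply the same two substitutions so that file discovery follows symlinks: `Path.rglob()` / `Path.glob("**/...")` calls become `glob.glob(..., recursive=True)`, and `os.walk()` calls gain `followlinks=True`. A minimal sketch of the difference, assuming a hypothetical `sql/linked` symlink that points at a directory of queries (Python < 3.13):

```python
import os
from glob import glob
from pathlib import Path

# Path.rglob() does not descend into symlinked directories before
# Python 3.13, so anything under sql/linked/ is silently skipped:
missed = list(Path("sql").rglob("query.sql"))

# glob() with recursive=True matches symlinks to directories, so the
# same search also finds sql/linked/**/query.sql:
found = list(map(Path, glob("sql/**/query.sql", recursive=True)))

# os.walk() likewise skips symlinked directories unless followlinks=True:
for root, dirs, files in os.walk("sql", followlinks=True):
    pass  # visits sql/linked/... as well
```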
5 changes: 4 additions & 1 deletion bigquery_etl/backfill/utils.py
@@ -2,6 +2,7 @@

 import re
 import sys
+from glob import glob
 from pathlib import Path
 from typing import Dict, List, Tuple

@@ -60,7 +61,9 @@ def get_qualified_table_name_to_entries_map_by_project(
     backfills_dict: dict = {}

     search_path = Path(sql_dir) / project_id
-    backfill_files = Path(search_path).rglob(BACKFILL_FILE)
+    backfill_files = map(
+        Path, glob(f"{search_path}/**/{BACKFILL_FILE}", recursive=True)
+    )
Comment from the PR author on lines +64 to +66:


Path.glob() and Path.rglob() will be getting a new follow_symlinks argument in Python 3.13, so once we eventually upgrade to that we should be able to revert workarounds like this and specify follow_symlinks=True instead.


     for backfill_file in backfill_files:
         project, dataset, table = extract_from_query_path(backfill_file)
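For reference, a sketch of what that eventual revert could look like; note that by the time Python 3.13 was released the argument discussed above had been renamed to `recurse_symlinks`:

```python
# Python 3.13+ only; equivalent to the glob() workaround above.
backfill_files = search_path.rglob(BACKFILL_FILE, recurse_symlinks=True)
```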
5 changes: 4 additions & 1 deletion bigquery_etl/cli/query.py
@@ -12,6 +12,7 @@
 import tempfile
 from datetime import date, timedelta
 from functools import partial
+from glob import glob
 from multiprocessing.pool import Pool, ThreadPool
 from pathlib import Path
 from tempfile import NamedTemporaryFile
@@ -1388,7 +1389,9 @@ def _initialize(query_file):
     table = None

     sql_content = query_file.read_text()
-    init_files = list(Path(query_file.parent).rglob("init.sql"))
+    init_files = list(
+        map(Path, glob(f"{query_file.parent}/**/init.sql", recursive=True))
+    )

     # check if the provided file can be initialized and whether existing ones should be skipped
     if "is_init()" in sql_content or len(init_files) > 0:
15 changes: 10 additions & 5 deletions bigquery_etl/cli/routine.py
@@ -7,6 +7,7 @@
 import string
 import sys
 from fnmatch import fnmatchcase
+from glob import glob
 from pathlib import Path

 import pytest
@@ -41,7 +42,7 @@ def _routines_matching_name_pattern(pattern, sql_path, project_id):
     if project_id is not None:
         sql_path = sql_path / project_id

-    all_sql_files = Path(sql_path).rglob("*.sql")
+    all_sql_files = map(Path, glob(f"{sql_path}/**/*.sql", recursive=True))
     routine_files = []

     for sql_file in all_sql_files:
@@ -294,7 +295,9 @@ def info(ctx, name, sql_dir, project_id, usages):
     # find routine usages in SQL files
     click.echo("usages: ")
     sql_files = [
-        p for project in project_dirs() for p in Path(project).rglob("*.sql")
+        p
+        for project in project_dirs()
+        for p in map(Path, glob(f"{project}/**/*.sql", recursive=True))
     ]
     for sql_file in sql_files:
         if f"{routine_dataset}.{routine_name}" in sql_file.read_text():
@@ -518,9 +521,11 @@ def rename(ctx, name, new_name, sql_dir, project_id):
         shutil.move(source, destination)

     # replace usages
-    all_sql_files = list(
-        [p for project in project_dirs() for p in Path(project).rglob("*.sql")]
-    )
+    all_sql_files = [
+        p
+        for project in project_dirs()
+        for p in map(Path, glob(f"{project}/**/*.sql", recursive=True))
+    ]

     for sql_file in all_sql_files:
         sql = sql_file.read_text()
7 changes: 5 additions & 2 deletions bigquery_etl/cli/stage.py
@@ -4,6 +4,7 @@
 import shutil
 import tempfile
 from datetime import datetime
+from glob import glob
 from multiprocessing.pool import ThreadPool
 from pathlib import Path

@@ -160,7 +161,9 @@ def deploy(
             shutil.rmtree(test_path)

         # rename test files
-        for test_file_path in test_destination.glob("**/*"):
+        for test_file_path in map(
+            Path, glob(f"{test_destination}/**/*", recursive=True)
+        ):
             for test_dep_file in artifact_files:
                 test_project = test_dep_file.parent.parent.parent.name
                 test_dataset = test_dep_file.parent.parent.name
@@ -330,7 +333,7 @@ def _update_references(artifact_files, project_id, dataset_suffix, sql_dir):
         ),
     ]

-    for path in Path(sql_dir).rglob("*.sql"):
+    for path in map(Path, glob(f"{sql_dir}/**/*.sql", recursive=True)):
         # apply substitutions
         if path.is_file():
             sql = render(path.name, template_folder=path.parent, format=False)
11 changes: 8 additions & 3 deletions bigquery_etl/cli/utils.py
@@ -4,6 +4,7 @@
 import os
 import re
 from fnmatch import fnmatchcase
+from glob import glob
 from pathlib import Path
 from typing import Iterator, List, Optional, Tuple

@@ -109,9 +110,11 @@ def paths_matching_name_pattern(
         pattern = "*.*"

     if os.path.isdir(pattern):
-        for root, _, _ in os.walk(pattern):
+        for root, _, _ in os.walk(pattern, followlinks=True):
             for file in files:
-                matching_files.extend(Path(root).rglob(file))
+                matching_files.extend(
+                    map(Path, glob(f"{root}/**/{file}", recursive=True))
+                )
     elif os.path.isfile(pattern):
         matching_files.append(Path(pattern))
     else:
@@ -122,7 +125,9 @@
         all_matching_files: List[Path] = []

         for file in files:
-            all_matching_files.extend(Path(sql_path).rglob(file))
+            all_matching_files.extend(
+                map(Path, glob(f"{sql_path}/**/{file}", recursive=True))
+            )

         for query_file in all_matching_files:
             match = file_regex.match(str(query_file))
7 changes: 6 additions & 1 deletion bigquery_etl/dependency.py
@@ -2,6 +2,7 @@

 import re
 import sys
+from glob import glob
 from itertools import groupby
 from pathlib import Path
 from subprocess import CalledProcessError
@@ -145,7 +146,11 @@ def _get_references(
     file_paths = {
         path
         for parent in map(Path, paths or ["sql"])
-        for path in (parent.glob("**/*.sql") if parent.is_dir() else [parent])
+        for path in (
+            map(Path, glob(f"{parent}/**/*.sql", recursive=True))
+            if parent.is_dir()
+            else [parent]
+        )
         if not path.name.endswith(".template.sql")  # skip templates
     }
     fail = False
2 changes: 1 addition & 1 deletion bigquery_etl/docs/__init__.py
@@ -101,7 +101,7 @@ def validate(project_dirs, log_level):
         if os.path.isdir(project_dir):
             parsed_routines = read_routine_dir(project_dir)

-            for root, dirs, files in os.walk(project_dir):
+            for root, dirs, files in os.walk(project_dir, followlinks=True):
                 if os.path.basename(root) == EXAMPLE_DIR:
                     sql_files = (f for f in files if os.path.splitext(f)[1] == ".sql")
                     for file in sql_files:
2 changes: 1 addition & 1 deletion bigquery_etl/format_sql/format.py
@@ -34,7 +34,7 @@ def format(paths, check=False):
         if os.path.isdir(path):
             sql_files.extend(
                 filepath
-                for dirpath, _, filenames in os.walk(path)
+                for dirpath, _, filenames in os.walk(path, followlinks=True)
                 for filename in filenames
                 if filename.endswith(".sql")
                 # skip tests/**/input.sql
4 changes: 2 additions & 2 deletions bigquery_etl/metadata/validate_metadata.py
@@ -103,7 +103,7 @@ def validate(target):
     failed = False

     if os.path.isdir(target):
-        for root, dirs, files in os.walk(target):
+        for root, dirs, files in os.walk(target, followlinks=True):
             for file in files:
                 if Metadata.is_metadata_file(file):
                     path = os.path.join(root, file)
@@ -134,7 +134,7 @@ def validate_datasets(target):
     failed = False

     if os.path.isdir(target):
-        for root, dirs, files in os.walk(target):
+        for root, dirs, files in os.walk(target, followlinks=True):
             for file in files:
                 if DatasetMetadata.is_dataset_metadata_file(file):
                     path = os.path.join(root, file)
2 changes: 1 addition & 1 deletion bigquery_etl/query_scheduling/generate_airflow_dags.py
@@ -56,7 +56,7 @@ def get_dags(project_id, dags_config, sql_dir=None):
     for project_dir in project_dirs(project_id, sql_dir=sql_dir):
         # parse metadata.yaml to retrieve scheduling information
         if os.path.isdir(project_dir):
-            for root, dirs, files in os.walk(project_dir):
+            for root, dirs, files in os.walk(project_dir, followlinks=True):
                 try:
                     if QUERY_FILE in files:
                         query_file = os.path.join(root, QUERY_FILE)
4 changes: 2 additions & 2 deletions bigquery_etl/routine/parse_routine.py
@@ -45,7 +45,7 @@ def get_routines_from_dir(project_dir):
"project": root.split("/")[-3],
"is_udf": filename == UDF_FILE,
}
for root, dirs, files in os.walk(project_dir)
for root, dirs, files in os.walk(project_dir, followlinks=True)
for filename in files
if filename in ROUTINE_FILES
]
@@ -234,7 +234,7 @@ def read_routine_dir(*project_dirs):
     raw_routines[project_dirs] = {
         raw_routine.name: raw_routine
         for project_dir in project_dirs
-        for root, dirs, files in os.walk(project_dir)
+        for root, dirs, files in os.walk(project_dir, followlinks=True)
         if os.path.basename(root) != ConfigLoader.get("routine", "example_dir")
         for filename in files
         if filename in ROUTINE_FILES
2 changes: 1 addition & 1 deletion bigquery_etl/routine/publish_routines.py
@@ -221,7 +221,7 @@ def push_dependencies_to_gcs(bucket, path, dependency_dir, project_id):
     client = storage.Client(project_id)
     bucket = client.get_bucket(bucket)

-    for root, dirs, files in os.walk(dependency_dir):
+    for root, dirs, files in os.walk(dependency_dir, followlinks=True):
         for filename in files:
             blob = bucket.blob(path + filename)
             blob.upload_from_filename(os.path.join(root, filename))
@@ -4,6 +4,7 @@

 from argparse import ArgumentParser
 from fnmatch import fnmatchcase
+from glob import glob
 from pathlib import Path

 from google.cloud import bigquery
@@ -43,9 +44,13 @@ def main():
     args = parser.parse_args()
     client = bigquery.Client(args.project)

-    sql_queries = list(Path(args.sql_dir).rglob("query.sql"))
-    python_queries = list(Path(args.sql_dir).rglob("query.py"))
-    multipart_queries = list(Path(args.sql_dir).rglob("part1.sql"))
+    sql_queries = list(map(Path, glob(f"{args.sql_dir}/**/query.sql", recursive=True)))
+    python_queries = list(
+        map(Path, glob(f"{args.sql_dir}/**/query.py", recursive=True))
+    )
+    multipart_queries = list(
+        map(Path, glob(f"{args.sql_dir}/**/part1.sql", recursive=True))
+    )
     query_paths = sql_queries + python_queries + multipart_queries

     query = create_query(query_paths, args.date)
@@ -3,6 +3,7 @@
"""Determine cost of previously scheduled bigquery-etl queries."""

from argparse import ArgumentParser
from glob import glob
from pathlib import Path

from google.cloud import bigquery
@@ -57,9 +58,13 @@ def create_query(query_paths, date, project):
 def main():
     args = parser.parse_args()

-    sql_queries = list(Path(args.sql_dir).rglob("query.sql"))
-    python_queries = list(Path(args.sql_dir).rglob("query.py"))
-    multipart_queries = list(Path(args.sql_dir).rglob("part1.sql"))
+    sql_queries = list(map(Path, glob(f"{args.sql_dir}/**/query.sql", recursive=True)))
+    python_queries = list(
+        map(Path, glob(f"{args.sql_dir}/**/query.py", recursive=True))
+    )
+    multipart_queries = list(
+        map(Path, glob(f"{args.sql_dir}/**/part1.sql", recursive=True))
+    )
     query_paths = sql_queries + python_queries + multipart_queries
     partition = args.date.replace("-", "")
     destination_table = f"{args.project}.{args.destination_dataset}.{args.destination_table}${partition}"
2 changes: 1 addition & 1 deletion sql_generators/events_daily/__init__.py
@@ -113,7 +113,7 @@ def get_args(self) -> dict:

 def get_query_dirs(path):
     """Walk a path to get all templated query dirs."""
-    for directory, sub_dirs, files in os.walk(path):
+    for directory, sub_dirs, files in os.walk(path, followlinks=True):
         non_hidden = {f for f in files if not f.startswith(".")}
         if non_hidden and non_hidden.issubset(ALLOWED_FILES):
             dir_path = Path(directory)
7 changes: 6 additions & 1 deletion tests/sql/glam-fenix-dev/glam_etl/bootstrap.py
@@ -8,6 +8,7 @@
 import shutil
 import warnings
 from collections import namedtuple
+from glob import glob
 from multiprocessing import Pool
 from pathlib import Path

@@ -88,7 +89,11 @@ def deps(output):
"""Create a dependency file with all links between queries and tables."""
path = Path(output)
deps = calculate_dependencies(
[p for p in SQL_ROOT.glob("**/*.sql") if "__clients_daily" not in p.name]
[
p
for p in map(Path, glob(f"{SQL_ROOT}/**/*.sql", recursive=True))
if "__clients_daily" not in p.name
]
)
path.write_text(
json.dumps([dict(zip(["from", "to"], dep)) for dep in deps], indent=2)
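A self-contained way to verify the behavior this PR relies on (a sketch with hypothetical throwaway paths, written against Python < 3.13 on a POSIX system):

```python
import os
import tempfile
from glob import glob
from pathlib import Path

tmp = Path(tempfile.mkdtemp())
real = tmp / "real" / "dataset"
real.mkdir(parents=True)
(real / "query.sql").write_text("SELECT 1")
(tmp / "sql").mkdir()
os.symlink(tmp / "real", tmp / "sql" / "linked", target_is_directory=True)

# The symlinked directory is not entered by rglob() on Python < 3.13:
print(list((tmp / "sql").rglob("query.sql")))  # []

# ...but glob(recursive=True) follows it:
print(glob(f"{tmp}/sql/**/query.sql", recursive=True))
# ['/tmp/.../sql/linked/dataset/query.sql']
```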