Add workflow for reading CSV from s3, cleaning, saving to Parquet #738
The PR adds a single new file (142 lines). Imports first (`os`, `LocalCluster`, and `wait` are imported but unused in the code shown):

```python
import os
import uuid

import coiled
import dask.dataframe as dd
import pytest
from distributed import Client, LocalCluster, wait
```
A module-scoped fixture provisions a uniquely named Coiled cluster and tears it down once the module's tests finish:

```python
@pytest.fixture(scope="module")
def from_csv_to_parquet_cluster(
    dask_env_variables,
    cluster_kwargs,
    github_cluster_tags,
):
    with coiled.Cluster(
        f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}",
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **cluster_kwargs["from_csv_to_parquet_cluster"],
    ) as cluster:
        yield cluster
```
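For running the workflow without a Coiled account, a local variant could reuse the `LocalCluster` import above. A minimal sketch, assuming nothing about the repo's fixture conventions (the fixture name and worker counts here are made up for illustration):

```python
@pytest.fixture(scope="module")
def local_cluster():
    # Hypothetical local stand-in for the Coiled cluster fixture above.
    with LocalCluster(n_workers=4, threads_per_worker=2) as cluster:
        yield cluster
```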
A per-test fixture connects a client, brings the cluster to its configured size, and restarts the workers so each test starts from a clean slate:

```python
@pytest.fixture
def from_csv_to_parquet_client(
    from_csv_to_parquet_cluster,
    cluster_kwargs,
    upload_cluster_dump,
    benchmark_all,
):
    n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"]
    with Client(from_csv_to_parquet_cluster) as client:
        from_csv_to_parquet_cluster.scale(n_workers)
        client.wait_for_workers(n_workers)
        client.restart()
        with upload_cluster_dump(client), benchmark_all(client):
            yield client
```
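`Client.restart()` restarts every worker process and drops any tasks and cached data left over from earlier tests; combined with `scale()` and `wait_for_workers()`, the measured run starts from a known worker count and an empty cluster. `upload_cluster_dump` and `benchmark_all` appear to be repository-level fixtures that capture diagnostics and timings around the yielded client.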
The GDELT v1 events schema, using pandas nullable integer/float dtypes and pyarrow-backed strings:

```python
COLUMNSV1 = {
    "GlobalEventID": "Int64",
    "Day": "Int64",
    "MonthYear": "Int64",
    "Year": "Int64",
    "FractionDate": "Float64",
    "Actor1Code": "string[pyarrow]",
    "Actor1Name": "string[pyarrow]",
    "Actor1CountryCode": "string[pyarrow]",
    "Actor1KnownGroupCode": "string[pyarrow]",
    "Actor1EthnicCode": "string[pyarrow]",
    "Actor1Religion1Code": "string[pyarrow]",
    "Actor1Religion2Code": "string[pyarrow]",
    "Actor1Type1Code": "string[pyarrow]",
    "Actor1Type2Code": "string[pyarrow]",
    "Actor1Type3Code": "string[pyarrow]",
    "Actor2Code": "string[pyarrow]",
    "Actor2Name": "string[pyarrow]",
    "Actor2CountryCode": "string[pyarrow]",
    "Actor2KnownGroupCode": "string[pyarrow]",
    "Actor2EthnicCode": "string[pyarrow]",
    "Actor2Religion1Code": "string[pyarrow]",
    "Actor2Religion2Code": "string[pyarrow]",
    "Actor2Type1Code": "string[pyarrow]",
    "Actor2Type2Code": "string[pyarrow]",
    "Actor2Type3Code": "string[pyarrow]",
    "IsRootEvent": "Int64",
    "EventCode": "string[pyarrow]",
    "EventBaseCode": "string[pyarrow]",
    "EventRootCode": "string[pyarrow]",
    "QuadClass": "Int64",
    "GoldsteinScale": "Float64",
    "NumMentions": "Int64",
    "NumSources": "Int64",
    "NumArticles": "Int64",
    "AvgTone": "Float64",
    "Actor1Geo_Type": "Int64",
    "Actor1Geo_Fullname": "string[pyarrow]",
    "Actor1Geo_CountryCode": "string[pyarrow]",
    "Actor1Geo_ADM1Code": "string[pyarrow]",
    "Actor1Geo_Lat": "Float64",
    "Actor1Geo_Long": "Float64",
    "Actor1Geo_FeatureID": "string[pyarrow]",
    "Actor2Geo_Type": "Int64",
    "Actor2Geo_Fullname": "string[pyarrow]",
    "Actor2Geo_CountryCode": "string[pyarrow]",
    "Actor2Geo_ADM1Code": "string[pyarrow]",
    "Actor2Geo_Lat": "Float64",
    "Actor2Geo_Long": "Float64",
    "Actor2Geo_FeatureID": "string[pyarrow]",
    "ActionGeo_Type": "Int64",
    "ActionGeo_Fullname": "string[pyarrow]",
    "ActionGeo_CountryCode": "string[pyarrow]",
    "ActionGeo_ADM1Code": "string[pyarrow]",
    "ActionGeo_Lat": "Float64",
    "ActionGeo_Long": "Float64",
    "ActionGeo_FeatureID": "string[pyarrow]",
    "DATEADDED": "Int64",
    "SOURCEURL": "string[pyarrow]",
}
```
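Because the CSVs carry no header row, the `names`/`dtype` pair above fully defines the parse. A quick single-file spot check could look like this (a sketch, not part of the PR; the sample key is assumed, and it needs pandas with pyarrow and s3fs installed):

```python
import pandas as pd

sample = pd.read_csv(
    "s3://gdelt-open-data/events/20190101.export.csv",  # assumed sample key
    names=list(COLUMNSV1.keys()),
    sep="\t",
    dtype=COLUMNSV1,
    on_bad_lines="skip",
    storage_options={"anon": True},
)
print(sample.dtypes)
```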
The test itself starts by creating an anonymous (unsigned) S3 filesystem handle for the public bucket:

```python
def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
    s3 = s3_factory(anon=True)
```

Review thread on the `anon=True` line:

> Just checking -- is […]
>
> Not sure, I'll give it a test
>
> Looks like it is necessary
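Assuming `s3_factory` wraps s3fs, `anon=True` corresponds to unsigned requests, which is why it turns out to be necessary here: the public `gdelt-open-data` bucket is readable without AWS credentials, and in an environment with no credentials configured, signed requests would fail. A sketch of the equivalent direct s3fs usage:

```python
import s3fs

# anon=True -> unsigned requests; works for public buckets like gdelt-open-data
s3 = s3fs.S3FileSystem(anon=True)
print(s3.ls("gdelt-open-data/events/")[:3])  # listing needs no credentials
```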
Next, list the bucket, keep a subset of the listing, and drop files known to fail parsing:

```python
    bad_files = [
        "gdelt-open-data/events/20161004.export.csv",
        "gdelt-open-data/events/20170106.export.csv",
        "gdelt-open-data/events/20170422.export.csv",
        "gdelt-open-data/events/20170802.export.csv",
        "gdelt-open-data/events/20170920.export.csv",
        "gdelt-open-data/events/20171021.export.csv",
        "gdelt-open-data/events/20180415.export.csv",
        "gdelt-open-data/events/20180416.export.csv",
        "gdelt-open-data/events/20180613.export.csv",
        "gdelt-open-data/events/20180806.export.csv",
        "gdelt-open-data/events/20190217.export.csv",
        "gdelt-open-data/events/20190613.export.csv",
    ]
    # Skip the first 120 keys in the listing and the known-bad files above.
    files = s3.ls("s3://gdelt-open-data/events/")[120:]
    files = [f"s3://{f}" for f in files if f not in bad_files]
```
Read everything as one Dask DataFrame; the files are tab-separated with no header, so column names and dtypes come from `COLUMNSV1`:

```python
    df = dd.read_csv(
        files,
        names=COLUMNSV1.keys(),
        sep="\t",
        dtype=COLUMNSV1,
        storage_options=s3.storage_options,
        on_bad_lines="skip",
    )
```
Finally, deduplicate on the source URL, keep only rows from a couple of national papers, and write the result to Parquet:

```python
    # Per-partition dedup: cheap, but only removes duplicates within a partition.
    df = df.map_partitions(
        lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first")
    )
    df["national_paper"] = df.SOURCEURL.str.contains(
        "washingtonpost|nytimes", regex=True
    )
    df = df[df["national_paper"]]

    output = s3_url + "/from-csv-to-parquet/"

    df.to_parquet(output)
```
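If one wanted to extend the test with a read-back assertion, a minimal sketch (not in the PR) appended to the function body might be:

```python
    # Hypothetical follow-up: re-read the Parquet output and check the filter held.
    result = dd.read_parquet(output)
    assert result["national_paper"].all().compute()
```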