Skip to content

chore: any refactor #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions lib/athena-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ export class AthenaQuery {
async *query(
sql: string,
options?: { executionParameters?: string[]; maxResults: number }
): AsyncGenerator<
Record<string, string | number | BigInt | null>,
void,
undefined
> {
): AsyncGenerator<helpers.AtheneRecordData, void, undefined> {
const QueryExecutionId = await helpers.startQueryExecution({
athena: this.athena,
sql,
Expand Down
47 changes: 23 additions & 24 deletions lib/helper.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
import type {
Athena,
Datum,
GetQueryResultsCommandOutput,
} from "@aws-sdk/client-athena";

export type AtheneRecordData = Record<string, string | number | BigInt | null>;
type AtheneRecord = AtheneRecordData[];

async function startQueryExecution(params: {
athena: Athena;
sql: string;
Expand Down Expand Up @@ -55,14 +57,14 @@ async function getQueryResults(params: {
MaxResults?: number;
NextToken?: string;
QueryExecutionId: string;
}) {
}): Promise<{ items: AtheneRecord; nextToken?: string }> {
const queryResults = await params.athena.getQueryResults({
QueryExecutionId: params.QueryExecutionId,
MaxResults: params.MaxResults,
NextToken: params.NextToken,
});
return {
items: await cleanUpPaginatedDML(
items: cleanUpPaginatedDML(
queryResults,
// If NextToken is not given, ignore first data.
// Because the first data is header info.
Expand All @@ -72,42 +74,39 @@ async function getQueryResults(params: {
};
}

async function cleanUpPaginatedDML(
function cleanUpPaginatedDML(
queryResults: GetQueryResultsCommandOutput,
ignoreFirstData: boolean
) {
const dataTypes = await getDataTypes(queryResults);
): AtheneRecord {
const dataTypes = getDataTypes(queryResults);
if (!dataTypes) return [];

const columnNames = Object.keys(dataTypes);
let unformattedS3RowArray: Datum[] | null = null;
let formattedArray: Record<string, string | number | BigInt | null>[] = [];

for (
let i = ignoreFirstData ? 1 : 0;
i < (queryResults.ResultSet?.Rows?.length ?? 0);
i++
) {
unformattedS3RowArray = queryResults.ResultSet?.Rows?.[i].Data ?? null;

if (!unformattedS3RowArray) continue;
const items = queryResults.ResultSet?.Rows?.reduce((acc, { Data }, index) => {
if (ignoreFirstData && index === 0) return acc;
if (!Data) return acc;

const rowObject = unformattedS3RowArray?.reduce((acc, row, index) => {
const rowObject = Data?.reduce((acc, row, index) => {
if (row.VarCharValue) {
// use mutable operation for performance
acc[columnNames[index]] = row.VarCharValue;
}
return acc;
}, {} as Record<string, string>);

formattedArray.push(addDataType(rowObject, dataTypes));
}
return formattedArray;
// use mutable operation for performance
acc.push(addDataType(rowObject, dataTypes));
return acc;
}, [] as AtheneRecord);

return items ?? [];
}

function addDataType(
input: Record<string, string>,
dataTypes: Record<string, string>
): Record<string, null | string | number | BigInt> {
): AtheneRecordData {
const updatedObjectWithDataType: Record<
string,
null | string | number | BigInt
Expand Down Expand Up @@ -143,9 +142,9 @@ function addDataType(
return updatedObjectWithDataType;
}

async function getDataTypes(
function getDataTypes(
queryResults: GetQueryResultsCommandOutput
): Promise<Record<string, string> | undefined> {
): Record<string, string> | undefined {
const columnInfoArray = queryResults.ResultSet?.ResultSetMetadata?.ColumnInfo;

const columnInfoObject = columnInfoArray?.reduce((acc, columnInfo) => {
Expand Down