diff --git a/lib/athena-query.ts b/lib/athena-query.ts index 4fa4290..c516bb4 100644 --- a/lib/athena-query.ts +++ b/lib/athena-query.ts @@ -16,11 +16,7 @@ export class AthenaQuery { async *query( sql: string, options?: { executionParameters?: string[]; maxResults: number } - ): AsyncGenerator< - Record, - void, - undefined - > { + ): AsyncGenerator { const QueryExecutionId = await helpers.startQueryExecution({ athena: this.athena, sql, diff --git a/lib/helper.ts b/lib/helper.ts index 7242c71..c5b5573 100644 --- a/lib/helper.ts +++ b/lib/helper.ts @@ -1,9 +1,11 @@ -import { +import type { Athena, - Datum, GetQueryResultsCommandOutput, } from "@aws-sdk/client-athena"; +export type AtheneRecordData = Record; +type AtheneRecord = AtheneRecordData[]; + async function startQueryExecution(params: { athena: Athena; sql: string; @@ -55,14 +57,14 @@ async function getQueryResults(params: { MaxResults?: number; NextToken?: string; QueryExecutionId: string; -}) { +}): Promise<{ items: AtheneRecord; nextToken?: string }> { const queryResults = await params.athena.getQueryResults({ QueryExecutionId: params.QueryExecutionId, MaxResults: params.MaxResults, NextToken: params.NextToken, }); return { - items: await cleanUpPaginatedDML( + items: cleanUpPaginatedDML( queryResults, // If NextToken is not given, ignore first data. // Because the first data is header info. @@ -72,42 +74,39 @@ async function getQueryResults(params: { }; } -async function cleanUpPaginatedDML( +function cleanUpPaginatedDML( queryResults: GetQueryResultsCommandOutput, ignoreFirstData: boolean -) { - const dataTypes = await getDataTypes(queryResults); +): AtheneRecord { + const dataTypes = getDataTypes(queryResults); if (!dataTypes) return []; const columnNames = Object.keys(dataTypes); - let unformattedS3RowArray: Datum[] | null = null; - let formattedArray: Record[] = []; - - for ( - let i = ignoreFirstData ? 1 : 0; - i < (queryResults.ResultSet?.Rows?.length ?? 0); - i++ - ) { - unformattedS3RowArray = queryResults.ResultSet?.Rows?.[i].Data ?? null; - if (!unformattedS3RowArray) continue; + const items = queryResults.ResultSet?.Rows?.reduce((acc, { Data }, index) => { + if (ignoreFirstData && index === 0) return acc; + if (!Data) return acc; - const rowObject = unformattedS3RowArray?.reduce((acc, row, index) => { + const rowObject = Data?.reduce((acc, row, index) => { if (row.VarCharValue) { + // use mutable operation for performance acc[columnNames[index]] = row.VarCharValue; } return acc; }, {} as Record); - formattedArray.push(addDataType(rowObject, dataTypes)); - } - return formattedArray; + // use mutable operation for performance + acc.push(addDataType(rowObject, dataTypes)); + return acc; + }, [] as AtheneRecord); + + return items ?? []; } function addDataType( input: Record, dataTypes: Record -): Record { +): AtheneRecordData { const updatedObjectWithDataType: Record< string, null | string | number | BigInt @@ -143,9 +142,9 @@ function addDataType( return updatedObjectWithDataType; } -async function getDataTypes( +function getDataTypes( queryResults: GetQueryResultsCommandOutput -): Promise | undefined> { +): Record | undefined { const columnInfoArray = queryResults.ResultSet?.ResultSetMetadata?.ColumnInfo; const columnInfoObject = columnInfoArray?.reduce((acc, columnInfo) => {