Skip to content

add an iceberg table loader #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/compile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
WHITEFOX_TEST_AWS_REGION: ${{ vars.WHITEFOX_AWS_REGION }}
WHITEFOX_TEST_AWS_ACCESS_KEY_ID: ${{ secrets.WHITEFOX_AWS_ACCESS_KEY_ID }}
WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.WHITEFOX_AWS_SECRET_ACCESS_KEY }}
WHITEFOX_TEST_GLUE_CATALOG_ID: ${{ secrets.WHITEFOX_GLUE_CATALOG_ID }}
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
export HADOOP_HOME="$(pwd)/.github/workflows/hadoop3-win-binaries"
Expand All @@ -41,6 +42,7 @@ jobs:
WHITEFOX_TEST_AWS_REGION: ${{ vars.WHITEFOX_AWS_REGION }}
WHITEFOX_TEST_AWS_ACCESS_KEY_ID: ${{ secrets.WHITEFOX_AWS_ACCESS_KEY_ID }}
WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.WHITEFOX_AWS_SECRET_ACCESS_KEY }}
WHITEFOX_TEST_GLUE_CATALOG_ID: ${{ secrets.WHITEFOX_GLUE_CATALOG_ID }}
run: |
WHITEFOX_SERVER_AUTHENTICATION_ENABLED=TRUE \
WHITEFOX_SERVER_AUTHENTICATION_BEARERTOKEN=token \
Expand Down
8 changes: 8 additions & 0 deletions server/core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ dependencies {
implementation("io.delta:delta-standalone_2.13:3.0.0")
implementation(String.format("org.apache.hadoop:hadoop-common:%s", hadoopVersion))

//ICEBERG
implementation("org.apache.iceberg:iceberg-api:1.4.3")
implementation("org.apache.iceberg:iceberg-core:1.4.3")
implementation("org.apache.iceberg:iceberg-aws:1.4.3")
implementation("software.amazon.awssdk:glue:2.22.10")
implementation("software.amazon.awssdk:sts:2.22.10")
implementation("software.amazon.awssdk:s3:2.22.10")

//AWS
compileOnly(String.format("com.amazonaws:aws-java-sdk-bom:%s", awsSdkVersion))
compileOnly(String.format("com.amazonaws:aws-java-sdk-s3:%s", awsSdkVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,41 @@ public String toString() {
+ credentials + ']';
}
}

final class HadoopMetastoreProperties implements MetastoreProperties {
  // Warehouse root location (e.g. an s3:// or file:// URI) used by the Hadoop catalog.
  private final String location;

  /**
   * Builds properties for a Hadoop-backed metastore.
   *
   * @param location warehouse root location of the Hadoop catalog
   * @param type must be {@link MetastoreType#HADOOP}; any other type is rejected
   * @throws IllegalArgumentException if {@code type} is not {@code HADOOP}
   */
  public HadoopMetastoreProperties(String location, MetastoreType type) {
    if (type != MetastoreType.HADOOP) {
      // Fix: "%o" is the octal conversion and throws IllegalFormatConversionException
      // when given an enum; "%s" renders the type name as intended. Also fixes the
      // "metatstore" typo in the message.
      throw new IllegalArgumentException(String.format(
          "Hadoop metastore properties are not compatible with metastore of type %s", type));
    }
    this.location = location;
  }

  public String location() {
    return location;
  }

  @Override
  @SkipCoverageGenerated
  public boolean equals(Object obj) {
    if (obj == this) return true;
    if (obj == null || obj.getClass() != this.getClass()) return false;
    var that = (HadoopMetastoreProperties) obj;
    return Objects.equals(this.location, that.location);
  }

  @Override
  @SkipCoverageGenerated
  public int hashCode() {
    return Objects.hash(location);
  }

  @Override
  @SkipCoverageGenerated
  public String toString() {
    return "HadoopMetastoreProperties[" + "location=" + location + ']';
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import java.util.Optional;

public enum MetastoreType {
GLUE("glue");
GLUE("glue"),
HADOOP("hadoop");

public final String value;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.whitefox.core.aws.utils;

import java.util.Map;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

public class StaticCredentialsProvider implements AwsCredentialsProvider {

  // Credentials resolved once at construction time; never refreshed.
  private final AwsCredentials credentials;

  private StaticCredentialsProvider(Map<String, String> properties) {
    this.credentials = retrieveCredentials(properties);
  }

  /**
   * Factory entry point (also invoked reflectively by clients configured with this
   * class name): builds an SDK static provider from the given properties map.
   *
   * @param properties must contain "accessKeyId" and "secretAccessKey"
   * @throws IllegalArgumentException if either key is missing
   */
  public static AwsCredentialsProvider create(Map<String, String> properties) {
    return software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
        retrieveCredentials(properties));
  }

  private static AwsCredentials retrieveCredentials(Map<String, String> properties) {
    var accessKeyId = required(properties, "accessKeyId");
    var secretAccessKey = required(properties, "secretAccessKey");
    return AwsBasicCredentials.create(accessKeyId, secretAccessKey);
  }

  // Fetches a mandatory property, failing with the same "<key> not found" message
  // the original validation produced.
  private static String required(Map<String, String> properties, String key) {
    if (!properties.containsKey(key)) {
      throw new IllegalArgumentException(key + " not found");
    }
    return properties.get(key);
  }

  @Override
  public AwsCredentials resolveCredentials() {
    return credentials;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.whitefox.core.services;

import io.whitefox.core.AwsCredentials;
import io.whitefox.core.MetastoreProperties;
import io.whitefox.core.aws.utils.StaticCredentialsProvider;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.AwsProperties;

public class AwsGlueConfigBuilder {

  /**
   * Builds the Iceberg catalog configuration for an AWS Glue metastore.
   *
   * <p>Only {@link AwsCredentials.SimpleAwsCredentials} are supported: the access key
   * and secret are exposed to the credentials provider through prefixed properties.
   *
   * @param glueMetastoreProperties Glue metastore settings (catalog id + credentials)
   * @return a mutable map of Iceberg catalog properties
   * @throws IllegalArgumentException if the credentials are not simple AWS credentials
   */
  public Map<String, String> buildConfig(
      MetastoreProperties.GlueMetastoreProperties glueMetastoreProperties) {
    // Guard clause: reject unsupported credential types up front.
    if (!(glueMetastoreProperties.credentials() instanceof AwsCredentials.SimpleAwsCredentials)) {
      throw new IllegalArgumentException(String.format(
          "Credentials type not supported with glue metastore %s", glueMetastoreProperties));
    }
    var creds = (AwsCredentials.SimpleAwsCredentials) glueMetastoreProperties.credentials();
    var providerPrefix = AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER;

    Map<String, String> properties = new HashMap<>();
    properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.aws.glue.GlueCatalog");
    properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
    properties.put(AwsProperties.GLUE_CATALOG_ID, glueMetastoreProperties.catalogId());
    properties.put(AwsClientProperties.CLIENT_REGION, creds.region());
    properties.put(providerPrefix, StaticCredentialsProvider.class.getName());
    // Keys consumed by StaticCredentialsProvider.create(Map) via the provider prefix.
    properties.put(providerPrefix + ".accessKeyId", creds.awsAccessKeyId());
    properties.put(providerPrefix + ".secretAccessKey", creds.awsSecretAccessKey());
    return properties;
  }
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.whitefox.core.services;

import io.whitefox.core.SharedTable;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DeltaShareTableLoader implements TableLoader<DeltaSharedTable> {
public class DeltaShareTableLoader implements TableLoader {

@Override
public DeltaSharedTable loadTable(SharedTable sharedTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

public class DeltaSharedTable {
public class DeltaSharedTable implements InternalSharedTable {

private final DeltaLog deltaLog;
private final TableSchemaConverter tableSchemaConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@ public class DeltaSharesServiceImpl implements DeltaSharesService {

private final StorageManager storageManager;
private final Integer defaultMaxResults;
private final DeltaShareTableLoader tableLoader;

private final TableLoaderFactory tableLoaderFactory;
private final FileSignerFactory fileSignerFactory;

@Inject
public DeltaSharesServiceImpl(
StorageManager storageManager,
@ConfigProperty(name = "io.delta.sharing.api.server.defaultMaxResults")
Integer defaultMaxResults,
DeltaShareTableLoader tableLoader,
TableLoaderFactory tableLoaderFactory,
FileSignerFactory signerFactory) {
this.storageManager = storageManager;
this.defaultMaxResults = defaultMaxResults;
this.tableLoader = tableLoader;
this.tableLoaderFactory = tableLoaderFactory;
this.fileSignerFactory = signerFactory;
}

Expand All @@ -37,7 +36,10 @@ public Optional<Long> getTableVersion(
String share, String schema, String table, String startingTimestamp) {
return storageManager
.getSharedTable(share, schema, table)
.map(t -> tableLoader.loadTable(t).getTableVersion(Optional.ofNullable(startingTimestamp)))
.map(t -> tableLoaderFactory
.newTableLoader(t.internalTable())
.loadTable(t)
.getTableVersion(Optional.ofNullable(startingTimestamp)))
.orElse(Optional.empty());
}

Expand All @@ -59,9 +61,10 @@ public ContentAndToken<List<Share>> listShares(
@Override
public Optional<Metadata> getTableMetadata(
String share, String schema, String table, String startingTimestamp) {
return storageManager
.getSharedTable(share, schema, table)
.flatMap(t -> tableLoader.loadTable(t).getMetadata(Optional.ofNullable(startingTimestamp)));
return storageManager.getSharedTable(share, schema, table).flatMap(t -> tableLoaderFactory
.newTableLoader(t.internalTable())
.loadTable(t)
.getMetadata(Optional.ofNullable(startingTimestamp)));
}

@Override
Expand Down Expand Up @@ -128,7 +131,10 @@ public ReadTableResult queryTable(

var fileSigner =
fileSignerFactory.newFileSigner(sharedTable.internalTable().provider().storage());
var readTableResultToBeSigned = tableLoader.loadTable(sharedTable).queryTable(queryRequest);
var readTableResultToBeSigned = tableLoaderFactory
.newTableLoader(sharedTable.internalTable())
.loadTable(sharedTable)
.queryTable(queryRequest);
return new ReadTableResult(
readTableResultToBeSigned.protocol(),
readTableResultToBeSigned.metadata(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.whitefox.core.services;

import io.whitefox.core.Metastore;
import io.whitefox.core.MetastoreProperties;
import io.whitefox.core.Storage;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Table;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class IcebergCatalogHandler {

  private final AwsGlueConfigBuilder awsGlueConfigBuilder;

  private final HadoopConfigBuilder hadoopConfigBuilder;

  public IcebergCatalogHandler(
      AwsGlueConfigBuilder awsGlueConfigBuilder, HadoopConfigBuilder hadoopConfigBuilder) {
    this.awsGlueConfigBuilder = awsGlueConfigBuilder;
    this.hadoopConfigBuilder = hadoopConfigBuilder;
  }

  /**
   * Loads an Iceberg table through a short-lived AWS Glue catalog.
   *
   * @param metastore metastore whose properties must be Glue properties
   * @param storage storage used to build the Hadoop configuration for file IO
   * @param tableIdentifier namespace + name of the table to load
   */
  public Table loadTableWithGlueCatalog(
      Metastore metastore, Storage storage, TableIdentifier tableIdentifier) {
    try (var catalog = new GlueCatalog()) {
      catalog.setConf(hadoopConfigBuilder.buildConfig(storage));
      catalog.initialize(
          metastore.name(),
          awsGlueConfigBuilder.buildConfig(
              (MetastoreProperties.GlueMetastoreProperties) metastore.properties()));
      return loadTable(catalog, tableIdentifier);
    } catch (IOException e) {
      throw new RuntimeException("Unexpected error when closing the Glue catalog", e);
    }
  }

  /**
   * Loads an Iceberg table through a short-lived Hadoop catalog rooted at the
   * metastore's warehouse location.
   */
  public Table loadTableWithHadoopCatalog(
      Metastore metastore, Storage storage, TableIdentifier tableIdentifier) {
    try (var catalog = new HadoopCatalog()) {
      catalog.setConf(hadoopConfigBuilder.buildConfig(storage));
      catalog.initialize(
          metastore.name(),
          Map.of(
              CatalogProperties.WAREHOUSE_LOCATION,
              ((MetastoreProperties.HadoopMetastoreProperties) metastore.properties()).location()));
      return loadTable(catalog, tableIdentifier);
    } catch (IOException e) {
      throw new RuntimeException("Unexpected error when closing the Hadoop catalog", e);
    }
  }

  // Shared load logic: translates Iceberg's NoSuchTableException into an
  // IllegalArgumentException and wraps anything else in a RuntimeException.
  private Table loadTable(BaseMetastoreCatalog catalog, TableIdentifier tableIdentifier) {
    try {
      return catalog.loadTable(tableIdentifier);
    } catch (NoSuchTableException e) {
      // Fix: propagate `e` as the cause so the original stack trace is not lost.
      throw new IllegalArgumentException(
          String.format(
              "Cannot find iceberg table [%s] under namespace [%s]",
              tableIdentifier.name(), tableIdentifier.namespace()),
          e);
    } catch (Throwable e) {
      // Fix: same here — the underlying failure was previously discarded entirely.
      throw new RuntimeException(
          String.format(
              "Unexpected exception when loading the iceberg table [%s] under namespace [%s]",
              tableIdentifier.name(), tableIdentifier.namespace()),
          e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.whitefox.core.services;

import io.whitefox.core.Metadata;
import io.whitefox.core.ReadTableRequest;
import io.whitefox.core.ReadTableResultToBeSigned;
import java.util.Optional;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.iceberg.Table;

public class IcebergSharedTable implements InternalSharedTable {

  private final Table icebergTable;
  // Not used yet: kept for the upcoming schema-conversion implementation.
  private final TableSchemaConverter tableSchemaConverter;

  private IcebergSharedTable(Table icebergTable, TableSchemaConverter tableSchemaConverter) {
    this.icebergTable = icebergTable;
    this.tableSchemaConverter = tableSchemaConverter;
  }

  public static IcebergSharedTable of(
      Table icebergTable, TableSchemaConverter tableSchemaConverter) {
    return new IcebergSharedTable(icebergTable, tableSchemaConverter);
  }

  public static IcebergSharedTable of(Table icebergTable) {
    return new IcebergSharedTable(icebergTable, new TableSchemaConverter());
  }

  // Fix: @Override was missing here while present on the other interface methods;
  // adding it lets the compiler verify the signature against InternalSharedTable.
  @Override
  public Optional<Metadata> getMetadata(Optional<String> startingTimestamp) {
    throw new NotImplementedException("getMetadata is not yet implemented for iceberg tables");
  }

  @Override
  public Optional<Long> getTableVersion(Optional<String> startingTimestamp) {
    throw new NotImplementedException("getTableVersion is not yet implemented for iceberg tables");
  }

  @Override
  public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) {
    throw new NotImplementedException("queryTable is not yet implemented for iceberg tables");
  }
}
Loading