Skip to content

Commit d48cc86

Browse files
matar993Marco Scalzo
and
Marco Scalzo
authored
Implement Iceberg table loader (#178)
Co-authored-by: Marco Scalzo <[email protected]>
1 parent 13c4ecd commit d48cc86

File tree

46 files changed

+936
-84
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+936
-84
lines changed

.github/workflows/compile.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
WHITEFOX_TEST_AWS_REGION: ${{ vars.WHITEFOX_AWS_REGION }}
2929
WHITEFOX_TEST_AWS_ACCESS_KEY_ID: ${{ secrets.WHITEFOX_AWS_ACCESS_KEY_ID }}
3030
WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.WHITEFOX_AWS_SECRET_ACCESS_KEY }}
31+
WHITEFOX_TEST_GLUE_CATALOG_ID: ${{ secrets.WHITEFOX_GLUE_CATALOG_ID }}
3132
run: |
3233
if [ "$RUNNER_OS" == "Windows" ]; then
3334
export HADOOP_HOME="$(pwd)/.github/workflows/hadoop3-win-binaries"
@@ -41,6 +42,7 @@ jobs:
4142
WHITEFOX_TEST_AWS_REGION: ${{ vars.WHITEFOX_AWS_REGION }}
4243
WHITEFOX_TEST_AWS_ACCESS_KEY_ID: ${{ secrets.WHITEFOX_AWS_ACCESS_KEY_ID }}
4344
WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.WHITEFOX_AWS_SECRET_ACCESS_KEY }}
45+
WHITEFOX_TEST_GLUE_CATALOG_ID: ${{ secrets.WHITEFOX_GLUE_CATALOG_ID }}
4446
run: |
4547
WHITEFOX_SERVER_AUTHENTICATION_ENABLED=TRUE \
4648
WHITEFOX_SERVER_AUTHENTICATION_BEARERTOKEN=token \

server/core/build.gradle.kts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ dependencies {
3030
implementation("io.delta:delta-standalone_2.13:3.0.0")
3131
implementation(String.format("org.apache.hadoop:hadoop-common:%s", hadoopVersion))
3232

33+
//ICEBERG
34+
implementation("org.apache.iceberg:iceberg-api:1.4.3")
35+
implementation("org.apache.iceberg:iceberg-core:1.4.3")
36+
implementation("org.apache.iceberg:iceberg-aws:1.4.3")
37+
implementation("software.amazon.awssdk:glue:2.22.10")
38+
implementation("software.amazon.awssdk:sts:2.22.10")
39+
implementation("software.amazon.awssdk:s3:2.22.10")
40+
3341
//AWS
3442
compileOnly(String.format("com.amazonaws:aws-java-sdk-bom:%s", awsSdkVersion))
3543
compileOnly(String.format("com.amazonaws:aws-java-sdk-s3:%s", awsSdkVersion))

server/core/src/main/java/io/whitefox/core/MetastoreProperties.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,41 @@ public String toString() {
5151
+ credentials + ']';
5252
}
5353
}
54+
55+
final class HadoopMetastoreProperties implements MetastoreProperties {
56+
private final String location;
57+
58+
public HadoopMetastoreProperties(String location, MetastoreType type) {
59+
if (type != MetastoreType.HADOOP) {
60+
throw new IllegalArgumentException(String.format(
61+
"Hadoop metatstore properties are not compatible with metastore of type %o", type));
62+
}
63+
this.location = location;
64+
}
65+
66+
public String location() {
67+
return location;
68+
}
69+
70+
@Override
71+
@SkipCoverageGenerated
72+
public boolean equals(Object obj) {
73+
if (obj == this) return true;
74+
if (obj == null || obj.getClass() != this.getClass()) return false;
75+
var that = (HadoopMetastoreProperties) obj;
76+
return Objects.equals(this.location, that.location);
77+
}
78+
79+
@Override
80+
@SkipCoverageGenerated
81+
public int hashCode() {
82+
return Objects.hash(location);
83+
}
84+
85+
@Override
86+
@SkipCoverageGenerated
87+
public String toString() {
88+
return "HadoopMetastoreProperties[" + "location=" + location + ']';
89+
}
90+
}
5491
}

server/core/src/main/java/io/whitefox/core/MetastoreType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import java.util.Optional;
55

66
public enum MetastoreType {
7-
GLUE("glue");
7+
GLUE("glue"),
8+
HADOOP("hadoop");
89

910
public final String value;
1011

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.whitefox.core.aws.utils;
2+
3+
import java.util.Map;
4+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
5+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
6+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
7+
8+
public class StaticCredentialsProvider implements AwsCredentialsProvider {
9+
10+
private final AwsCredentials credentials;
11+
12+
public static AwsCredentialsProvider create(Map<String, String> properties) {
13+
return software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
14+
retrieveCredentials(properties));
15+
}
16+
17+
private static AwsCredentials retrieveCredentials(Map<String, String> properties) {
18+
if (!properties.containsKey("accessKeyId")) {
19+
throw new IllegalArgumentException("accessKeyId not found");
20+
} else if (!properties.containsKey("secretAccessKey")) {
21+
throw new IllegalArgumentException("secretAccessKey not found");
22+
}
23+
return AwsBasicCredentials.create(
24+
properties.get("accessKeyId"), properties.get("secretAccessKey"));
25+
}
26+
27+
private StaticCredentialsProvider(Map<String, String> properties) {
28+
this.credentials = retrieveCredentials(properties);
29+
}
30+
31+
@Override
32+
public AwsCredentials resolveCredentials() {
33+
return credentials;
34+
}
35+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.whitefox.core.services;
2+
3+
import io.whitefox.core.AwsCredentials;
4+
import io.whitefox.core.MetastoreProperties;
5+
import io.whitefox.core.aws.utils.StaticCredentialsProvider;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import org.apache.iceberg.CatalogProperties;
9+
import org.apache.iceberg.aws.AwsClientProperties;
10+
import org.apache.iceberg.aws.AwsProperties;
11+
12+
public class AwsGlueConfigBuilder {
13+
14+
public Map<String, String> buildConfig(
15+
MetastoreProperties.GlueMetastoreProperties glueMetastoreProperties) {
16+
if (glueMetastoreProperties.credentials() instanceof AwsCredentials.SimpleAwsCredentials) {
17+
AwsCredentials.SimpleAwsCredentials credentials =
18+
(AwsCredentials.SimpleAwsCredentials) glueMetastoreProperties.credentials();
19+
Map<String, String> config = new HashMap<>();
20+
config.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.aws.glue.GlueCatalog");
21+
config.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
22+
config.put(AwsProperties.GLUE_CATALOG_ID, glueMetastoreProperties.catalogId());
23+
config.put(AwsClientProperties.CLIENT_REGION, credentials.region());
24+
config.put(
25+
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER,
26+
StaticCredentialsProvider.class.getName());
27+
config.put(
28+
String.format("%s.%s", AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER, "accessKeyId"),
29+
credentials.awsAccessKeyId());
30+
config.put(
31+
String.format(
32+
"%s.%s", AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER, "secretAccessKey"),
33+
credentials.awsSecretAccessKey());
34+
return config;
35+
} else {
36+
throw new IllegalArgumentException(String.format(
37+
"Credentials type not supported with glue metastore %s", glueMetastoreProperties));
38+
}
39+
}
40+
}

server/core/src/main/java/io/whitefox/core/services/DeltaShareTableLoader.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package io.whitefox.core.services;
22

33
import io.whitefox.core.SharedTable;
4-
import jakarta.enterprise.context.ApplicationScoped;
54

6-
@ApplicationScoped
7-
public class DeltaShareTableLoader implements TableLoader<DeltaSharedTable> {
5+
public class DeltaShareTableLoader implements TableLoader {
86

97
@Override
108
public DeltaSharedTable loadTable(SharedTable sharedTable) {

server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import java.util.Optional;
1010
import java.util.stream.Collectors;
1111

12-
public class DeltaSharedTable {
12+
public class DeltaSharedTable implements InternalSharedTable {
1313

1414
private final DeltaLog deltaLog;
1515
private final TableSchemaConverter tableSchemaConverter;

server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@ public class DeltaSharesServiceImpl implements DeltaSharesService {
1515

1616
private final StorageManager storageManager;
1717
private final Integer defaultMaxResults;
18-
private final DeltaShareTableLoader tableLoader;
19-
18+
private final TableLoaderFactory tableLoaderFactory;
2019
private final FileSignerFactory fileSignerFactory;
2120

2221
@Inject
2322
public DeltaSharesServiceImpl(
2423
StorageManager storageManager,
2524
@ConfigProperty(name = "io.delta.sharing.api.server.defaultMaxResults")
2625
Integer defaultMaxResults,
27-
DeltaShareTableLoader tableLoader,
26+
TableLoaderFactory tableLoaderFactory,
2827
FileSignerFactory signerFactory) {
2928
this.storageManager = storageManager;
3029
this.defaultMaxResults = defaultMaxResults;
31-
this.tableLoader = tableLoader;
30+
this.tableLoaderFactory = tableLoaderFactory;
3231
this.fileSignerFactory = signerFactory;
3332
}
3433

@@ -37,7 +36,10 @@ public Optional<Long> getTableVersion(
3736
String share, String schema, String table, String startingTimestamp) {
3837
return storageManager
3938
.getSharedTable(share, schema, table)
40-
.map(t -> tableLoader.loadTable(t).getTableVersion(Optional.ofNullable(startingTimestamp)))
39+
.map(t -> tableLoaderFactory
40+
.newTableLoader(t.internalTable())
41+
.loadTable(t)
42+
.getTableVersion(Optional.ofNullable(startingTimestamp)))
4143
.orElse(Optional.empty());
4244
}
4345

@@ -59,9 +61,10 @@ public ContentAndToken<List<Share>> listShares(
5961
@Override
6062
public Optional<Metadata> getTableMetadata(
6163
String share, String schema, String table, String startingTimestamp) {
62-
return storageManager
63-
.getSharedTable(share, schema, table)
64-
.flatMap(t -> tableLoader.loadTable(t).getMetadata(Optional.ofNullable(startingTimestamp)));
64+
return storageManager.getSharedTable(share, schema, table).flatMap(t -> tableLoaderFactory
65+
.newTableLoader(t.internalTable())
66+
.loadTable(t)
67+
.getMetadata(Optional.ofNullable(startingTimestamp)));
6568
}
6669

6770
@Override
@@ -128,7 +131,10 @@ public ReadTableResult queryTable(
128131

129132
var fileSigner =
130133
fileSignerFactory.newFileSigner(sharedTable.internalTable().provider().storage());
131-
var readTableResultToBeSigned = tableLoader.loadTable(sharedTable).queryTable(queryRequest);
134+
var readTableResultToBeSigned = tableLoaderFactory
135+
.newTableLoader(sharedTable.internalTable())
136+
.loadTable(sharedTable)
137+
.queryTable(queryRequest);
132138
return new ReadTableResult(
133139
readTableResultToBeSigned.protocol(),
134140
readTableResultToBeSigned.metadata(),
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.whitefox.core.services;
2+
3+
import io.whitefox.core.Metastore;
4+
import io.whitefox.core.MetastoreProperties;
5+
import io.whitefox.core.Storage;
6+
import java.io.IOException;
7+
import java.util.Map;
8+
import org.apache.iceberg.BaseMetastoreCatalog;
9+
import org.apache.iceberg.CatalogProperties;
10+
import org.apache.iceberg.Table;
11+
import org.apache.iceberg.aws.glue.GlueCatalog;
12+
import org.apache.iceberg.catalog.TableIdentifier;
13+
import org.apache.iceberg.exceptions.NoSuchTableException;
14+
import org.apache.iceberg.hadoop.HadoopCatalog;
15+
16+
public class IcebergCatalogHandler {
17+
18+
private final AwsGlueConfigBuilder awsGlueConfigBuilder;
19+
20+
private final HadoopConfigBuilder hadoopConfigBuilder;
21+
22+
public IcebergCatalogHandler(
23+
AwsGlueConfigBuilder awsGlueConfigBuilder, HadoopConfigBuilder hadoopConfigBuilder) {
24+
this.awsGlueConfigBuilder = awsGlueConfigBuilder;
25+
this.hadoopConfigBuilder = hadoopConfigBuilder;
26+
}
27+
28+
public Table loadTableWithGlueCatalog(
29+
Metastore metastore, Storage storage, TableIdentifier tableIdentifier) {
30+
try (var catalog = new GlueCatalog()) {
31+
catalog.setConf(hadoopConfigBuilder.buildConfig(storage));
32+
catalog.initialize(
33+
metastore.name(),
34+
awsGlueConfigBuilder.buildConfig(
35+
(MetastoreProperties.GlueMetastoreProperties) metastore.properties()));
36+
return loadTable(catalog, tableIdentifier);
37+
} catch (IOException e) {
38+
throw new RuntimeException("Unexpected error when closing the Glue catalog", e);
39+
}
40+
}
41+
42+
public Table loadTableWithHadoopCatalog(
43+
Metastore metastore, Storage storage, TableIdentifier tableIdentifier) {
44+
try (var catalog = new HadoopCatalog()) {
45+
catalog.setConf(hadoopConfigBuilder.buildConfig(storage));
46+
catalog.initialize(
47+
metastore.name(),
48+
Map.of(
49+
CatalogProperties.WAREHOUSE_LOCATION,
50+
((MetastoreProperties.HadoopMetastoreProperties) metastore.properties()).location()));
51+
return loadTable(catalog, tableIdentifier);
52+
} catch (IOException e) {
53+
throw new RuntimeException("Unexpected error when closing the Hadoop catalog", e);
54+
}
55+
}
56+
57+
private Table loadTable(BaseMetastoreCatalog catalog, TableIdentifier tableIdentifier) {
58+
try {
59+
return catalog.loadTable(tableIdentifier);
60+
} catch (NoSuchTableException e) {
61+
throw new IllegalArgumentException(String.format(
62+
"Cannot find iceberg table [%s] under namespace [%s]",
63+
tableIdentifier.name(), tableIdentifier.namespace()));
64+
} catch (Throwable e) {
65+
throw new RuntimeException(String.format(
66+
"Unexpected exception when loading the iceberg table [%s] under namespace [%s]",
67+
tableIdentifier.name(), tableIdentifier.namespace()));
68+
}
69+
}
70+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.whitefox.core.services;
2+
3+
import io.whitefox.core.Metadata;
4+
import io.whitefox.core.ReadTableRequest;
5+
import io.whitefox.core.ReadTableResultToBeSigned;
6+
import java.util.Optional;
7+
import org.apache.commons.lang3.NotImplementedException;
8+
import org.apache.iceberg.Table;
9+
10+
public class IcebergSharedTable implements InternalSharedTable {
11+
12+
private final Table icebergTable;
13+
private final TableSchemaConverter tableSchemaConverter;
14+
15+
private IcebergSharedTable(Table icebergTable, TableSchemaConverter tableSchemaConverter) {
16+
this.icebergTable = icebergTable;
17+
this.tableSchemaConverter = tableSchemaConverter;
18+
}
19+
20+
public static IcebergSharedTable of(
21+
Table icebergTable, TableSchemaConverter tableSchemaConverter) {
22+
return new IcebergSharedTable(icebergTable, tableSchemaConverter);
23+
}
24+
25+
public static IcebergSharedTable of(Table icebergTable) {
26+
return new IcebergSharedTable(icebergTable, new TableSchemaConverter());
27+
}
28+
29+
public Optional<Metadata> getMetadata(Optional<String> startingTimestamp) {
30+
throw new NotImplementedException();
31+
}
32+
33+
@Override
34+
public Optional<Long> getTableVersion(Optional<String> startingTimestamp) {
35+
throw new NotImplementedException();
36+
}
37+
38+
@Override
39+
public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) {
40+
throw new NotImplementedException();
41+
}
42+
}

0 commit comments

Comments
 (0)