Test the server with the delta sharing client #129

Merged
10 changes: 10 additions & 0 deletions .github/workflows/compile.yaml
@@ -35,6 +35,16 @@ jobs:
./gradlew build testNative --no-daemon
./gradlew server:app:printVersion --no-daemon -q
shell: bash
- name: Run integration test
shell: bash
env:
WHITEFOX_TEST_AWS_REGION: ${{ vars.WHITEFOX_AWS_REGION }}
WHITEFOX_TEST_AWS_ACCESS_KEY_ID: ${{ secrets.WHITEFOX_AWS_ACCESS_KEY_ID }}
WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.WHITEFOX_AWS_SECRET_ACCESS_KEY }}
run: |
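# start the server in the background, run the tagged Spark client tests against it, then stop it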
java -jar server/app/build/quarkus-app/quarkus-run.jar &
./gradlew :client-spark:clientSparkTest --no-daemon
kill -9 %1
- name: Build container image
if: runner.os == 'Linux'
run: |
86 changes: 86 additions & 0 deletions client-spark/build.gradle.kts
@@ -0,0 +1,86 @@
import org.openapitools.generator.gradle.plugin.tasks.GenerateTask

plugins {
java
id("com.diffplug.spotless")
id("whitefox.java-conventions")
}

repositories {
mavenCentral()
}

dependencies {
// OPENAPI
implementation("org.eclipse.microprofile.openapi:microprofile-openapi-api:3.1.1")
implementation("org.openapitools:jackson-databind-nullable:0.2.6")
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
testImplementation("jakarta.annotation:jakarta.annotation-api:2.1.1")

// DELTA
testImplementation("org.apache.hadoop:hadoop-common:3.3.6")
testImplementation("io.delta:delta-sharing-spark_2.12:1.0.2")

//SPARK
testImplementation("org.apache.spark:spark-core_2.12:3.3.2")
testImplementation("org.apache.spark:spark-sql_2.12:3.3.2")
testImplementation("com.github.mrpowers:spark-fast-tests_2.12:1.3.0")

//JUNIT
testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
}


tasks.getByName<Test>("test") {
useJUnitPlatform {
excludeTags.add("clientSparkTest")
}
}

tasks.withType<Test> {
environment = env.allVariables
systemProperty ("java.util.logging.manager", "java.util.logging.LogManager") //TODO modularize the whitefox-conventions plugin
}

tasks.register<Test>("clientSparkTest") {
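// runs only the tests tagged "clientSparkTest", which the default test task above excludes;
// CI invokes it as ./gradlew :client-spark:clientSparkTest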
useJUnitPlatform {
includeTags.add("clientSparkTest")
}
}

val openApiCodeGenDir = "generated/openapi"
val generatedCodeDirectory = generatedCodeDirectory(layout, openApiCodeGenDir)

val whiteFoxGenerate = tasks.register<GenerateTask>("openapiGenerateClientApi") {
dependsOn(tasks.spotlessApply)
generatorName.set("java")
inputSpec.set("$rootDir/protocol/whitefox-protocol-api.yml")
library.set("native")
outputDir.set(generatedCodeDirectory)
additionalProperties.set(mapOf(
"apiPackage" to "io.whitefox.api.client",
"invokerPackage" to "io.whitefox.api.utils",
"modelPackage" to "io.whitefox.api.client.model",
"dateLibrary" to "java8",
"sourceFolder" to "src/gen/java",
"openApiNullable" to "true",
"annotationLibrary" to "none",
"serializationLibrary" to "jackson",
"useJakartaEe" to "true",
"useRuntimeException" to "true"
))
}

sourceSets {
getByName("test") {
java {
srcDir("${generatedCodeDirectory(layout, openApiCodeGenDir)}/src/gen/java")
}
}
}

tasks.withType<JavaCompile> {
options.encoding = "UTF-8"
options.compilerArgs.add("-parameters")
dependsOn(whiteFoxGenerate)
}
61 changes: 61 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java
@@ -0,0 +1,61 @@
package io.whitefox.api.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.github.mrpowers.spark.fast.tests.DatasetComparer;
import io.whitefox.api.models.MrFoxDeltaTableSchema;
import io.whitefox.api.utils.StorageManagerInitializer;
import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.collection.GenMap;

@Tag("clientSparkTest")
public class ITDeltaSharingClient implements DatasetComparer {

private final String tablePath = String.format(
"%s#%s.%s.%s",
getClass().getClassLoader().getResource("MrFoxProfile.json"),
"s3share",
"s3schema",
"s3Table1");

private final SparkSession spark = SparkSession.builder()
.appName("delta sharing client test")
.config("spark.driver.host", "localhost")
.master("local[1, 4]")
.getOrCreate();

@BeforeAll
static void initStorageManager() {
new StorageManagerInitializer().initStorageManager();
}

@Test
void showS3Table1withQueryTableApi() {
var ds = spark.read().format("deltaSharing").load(tablePath);
var expectedSchema = new StructType(new StructField[] {
new StructField("id", DataType.fromDDL("long"), true, new Metadata(GenMap.empty()))
});
var expectedData = spark
.createDataFrame(
List.of(
new MrFoxDeltaTableSchema(0),
new MrFoxDeltaTableSchema(3),
new MrFoxDeltaTableSchema(2),
new MrFoxDeltaTableSchema(1),
new MrFoxDeltaTableSchema(4)),
MrFoxDeltaTableSchema.class)
.toDF();

assertEquals(expectedSchema.json(), ds.schema().json());
assertEquals(5, ds.count());
assertSmallDatasetEquality(ds, expectedData, true, false, false, 500);
}
}
13 changes: 13 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/models/MrFoxDeltaTableSchema.java
@@ -0,0 +1,13 @@
package io.whitefox.api.models;

public class MrFoxDeltaTableSchema {
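// Minimal bean with a single "id" column; ITDeltaSharingClient passes it to
// createDataFrame so Spark derives the expected one-column schema from it.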
private final long id;

public MrFoxDeltaTableSchema(long id) {
this.id = id;
}

public long getId() {
return id;
}
}
32 changes: 32 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/utils/S3TestConfig.java
@@ -0,0 +1,32 @@
package io.whitefox.api.utils;

public class S3TestConfig {
private final String region;
private final String accessKey;
private final String secretKey;

public String getRegion() {
return region;
}

public String getAccessKey() {
return accessKey;
}

public String getSecretKey() {
return secretKey;
}

public S3TestConfig(String region, String accessKey, String secretKey) {
this.region = region;
this.accessKey = accessKey;
this.secretKey = secretKey;
}

public static S3TestConfig loadFromEnv() {
return new S3TestConfig(
System.getenv().get("WHITEFOX_TEST_AWS_REGION"),
System.getenv().get("WHITEFOX_TEST_AWS_ACCESS_KEY_ID"),
System.getenv().get("WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY"));
}
}
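Note: loadFromEnv returns nulls when the WHITEFOX_TEST_AWS_* variables are unset, and that only surfaces later inside the storage API calls. A fail-fast variant could look like the sketch below; requireEnv and the EnvUtils class are hypothetical, not part of this PR.

package io.whitefox.api.utils;

public final class EnvUtils {
  private EnvUtils() {}

  // Hypothetical helper (not in this PR): resolve a required environment
  // variable or fail immediately with a descriptive message.
  public static String requireEnv(String name) {
    String value = System.getenv(name);
    if (value == null || value.isBlank()) {
      throw new IllegalStateException("missing required environment variable: " + name);
    }
    return value;
  }
}

loadFromEnv would then call requireEnv("WHITEFOX_TEST_AWS_REGION") and friends instead of System.getenv().get(...).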
77 changes: 77 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java
@@ -0,0 +1,77 @@
package io.whitefox.api.utils;

import io.whitefox.api.client.*;
import io.whitefox.api.client.model.*;
import java.util.List;
import java.util.Map;

public class StorageManagerInitializer {
private final S3TestConfig s3TestConfig;
private final StorageV1Api storageV1Api;
private final ProviderV1Api providerV1Api;
private final TableV1Api tableV1Api;
private final ShareV1Api shareV1Api;
private final SchemaV1Api schemaV1Api;

public StorageManagerInitializer() {
var apiClient = new ApiClient();
this.s3TestConfig = S3TestConfig.loadFromEnv();
this.storageV1Api = new StorageV1Api(apiClient);
this.providerV1Api = new ProviderV1Api(apiClient);
this.tableV1Api = new TableV1Api(apiClient);
this.shareV1Api = new ShareV1Api(apiClient);
this.schemaV1Api = new SchemaV1Api(apiClient);
}

public void initStorageManager() {
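// order matters: the storage must exist before the provider that references it,
// the provider before the table, and the share/schema before the table is attached to the schema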
storageV1Api.createStorage(createStorageRequest(s3TestConfig));
providerV1Api.addProvider(addProviderRequest());
tableV1Api.createTableInProvider(addProviderRequest().getName(), createTableRequest());
shareV1Api.createShare(createShareRequest());
schemaV1Api.createSchema(createShareRequest().getName(), createSchemaRequest());
schemaV1Api.addTableToSchema(
createShareRequest().getName(), createSchemaRequest(), addTableToSchemaRequest());
}

private String createSchemaRequest() {
return "s3schema";
}

private AddTableToSchemaRequest addTableToSchemaRequest() {
return new AddTableToSchemaRequest()
.name("s3Table1")
.reference(new TableReference().providerName("MrFoxProvider").name("s3Table1"));
}

private CreateShareInput createShareRequest() {
return new CreateShareInput().name("s3share").recipients(List.of("Mr.Fox")).schemas(List.of());
}

private CreateTableInput createTableRequest() {
return new CreateTableInput()
.name("s3Table1")
.skipValidation(true)
.properties(Map.of(
"type", "delta",
"location", "s3a://whitefox-s3-test-bucket/delta/samples/delta-table"));
}

private ProviderInput addProviderRequest() {
return new ProviderInput()
.name("MrFoxProvider")
.storageName("MrFoxStorage")
.metastoreName(null);
}

private CreateStorage createStorageRequest(S3TestConfig s3TestConfig) {
return new CreateStorage()
.name("MrFoxStorage")
.type(CreateStorage.TypeEnum.S3)
.properties(new StorageProperties(new S3Properties()
.credentials(new SimpleAwsCredentials()
.region(s3TestConfig.getRegion())
.awsAccessKeyId(s3TestConfig.getAccessKey())
.awsSecretAccessKey(s3TestConfig.getSecretKey()))))
.skipValidation(true);
}
}
6 changes: 6 additions & 0 deletions client-spark/src/test/resources/MrFoxProfile.json
@@ -0,0 +1,6 @@
{
"shareCredentialsVersion": 1,
"endpoint": "http://localhost:8080/delta-api/v1/",
"bearerToken": "fakeToken",
"expirationTime": null
}
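This is a standard Delta Sharing profile file pointing the connector at the locally started server, with a placeholder bearer token. A condensed sketch of how ITDeltaSharingClient above wires it up (assumes a SparkSession named spark and the usual org.apache.spark.sql imports):

// the connector's load path is "<profile-url>#<share>.<schema>.<table>"
String profileUrl = ITDeltaSharingClient.class
    .getClassLoader()
    .getResource("MrFoxProfile.json")
    .toString();
Dataset<Row> ds = spark.read()
    .format("deltaSharing")
    .load(profileUrl + "#s3share.s3schema.s3Table1");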
35 changes: 22 additions & 13 deletions protocol/delta-sharing-protocol-api.yml
@@ -419,6 +419,12 @@ paths:
description: 'Starting Timestamp ISO8601 format, in the UTC timezone'
schema:
type: string
- in: header
name: delta-sharing-capabilities
required: false
description: 'Delta Sharing Capabilities'
schema:
type: string
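# e.g. responseformat=delta;readerfeatures=deletionvectors (capability list defined by the Delta Sharing protocol)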
requestBody:
required: true
content:
@@ -670,7 +676,7 @@ components:
items:
type: string
jsonPredicateHints:
type: object
type: string
description: |
query predicates on partition columns specified using a structured JSON format.
When it’s present, the server will try to use the predicates to filter table's
@@ -680,19 +686,20 @@
If the server encounters any errors during predicate processing (for example, invalid
syntax or non existing columns), it will skip filtering and return all the files.
When it’s absent, the server will return all the files in the table.
properties:
op:
$ref: '#/components/schemas/Ops'
children:
type: string
name:
type: string
value:
type: string
valueType:
type: string
# properties:
# op:
# $ref: '#/components/schemas/Ops'
# children:
# type: string
# name:
# type: string
# value:
# type: string
# valueType:
# type: string
limitHint:
type: integer
format: int64
example: 1000
description: |
It’s a hint from the client to tell the server how many rows the
@@ -717,13 +724,15 @@
timestamp. This is only supported on tables with history sharing enabled.
startingVersion:
type: integer
format: int64
example: 1000
description: |
an optional version number. If set, will return all data change files
since startingVersion, inclusive, including historical metadata if seen
in the delta log.
endingVersion:
type: integer
format: int64
example: 1000
description: |
an optional version number, only used if startingVersion is set. If set,
Expand Down Expand Up @@ -836,7 +845,7 @@ components:
MetadataObject:
type: object
properties:
metadata:
metaData:
type: object
properties:
id:
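A note on the jsonPredicateHints change above: the field is now a plain string carrying the structured JSON that the commented-out properties used to describe. Assuming the operator vocabulary of the upstream Delta Sharing protocol (column and literal leaves combined with comparison ops), a client-side payload might look like the sketch below; the column name and values are illustrative only.

// built as an opaque string and sent in the query request body
String jsonPredicateHints =
    "{\"op\":\"equal\",\"children\":["
        + "{\"op\":\"column\",\"name\":\"id\",\"valueType\":\"long\"},"
        + "{\"op\":\"literal\",\"value\":\"42\",\"valueType\":\"long\"}]}";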
4 changes: 2 additions & 2 deletions server/app/build.gradle.kts
@@ -11,12 +11,12 @@ val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project

// region dependencies

val hadoopVersion = "3.3.6"
dependencies {
// INTERNAL
implementation(project(":server:core"))
implementation(project(":server:persistence:memory"))

// QUARKUS
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation("io.quarkus:quarkus-arc")