added encoders 3.2 #134

Merged · 27 commits · Feb 28, 2022

Commits
c57eaaf
added streaming and exploring how it works with kotlin spark api
Jolanrensen Feb 18, 2022
bb39fc7
Adds helpful rdd to dataset conversion, as well as a new withSpark fu…
Jolanrensen Feb 21, 2022
4bd3fe1
makes javaRDD toDS function more generic.
Jolanrensen Feb 21, 2022
c0ead09
added encoders: Duration, Period, ByteArray (Binary, now actually wor…
Jolanrensen Feb 21, 2022
09e9bb5
Arity is now Serializable, removed sc.stop(), sc is now lazy, updates…
Jolanrensen Feb 22, 2022
597b6f1
copying over some other missing parts from ScalaReflection.scala. Did…
Jolanrensen Feb 22, 2022
0f585bd
serializing binary works!
Jolanrensen Feb 23, 2022
92ed60e
serializing binary works!
Jolanrensen Feb 23, 2022
1fb680b
fixed serializing CalendarInterval, added tests and fixes for Decimal…
Jolanrensen Feb 23, 2022
4f8ae68
updating all tests to shouldBe instead of just show
Jolanrensen Feb 23, 2022
38486bb
removed .show() from rdd test
Jolanrensen Feb 23, 2022
0acd3e2
Update README.md
Jolanrensen Feb 23, 2022
bcf99b8
split up rdd tests, added list test.
Jolanrensen Feb 24, 2022
165c1b0
Merge pull request #132 from JetBrains/rdd-related-helpers
Jolanrensen Feb 24, 2022
2fdba6a
Update docs generation
Jolanrensen Feb 24, 2022
5d785e0
added jira issue
Jolanrensen Feb 24, 2022
518d1a1
Update ApiTest.kt
Jolanrensen Feb 25, 2022
e620896
added encoders: Duration, Period, ByteArray (Binary, now actually wor…
Jolanrensen Feb 21, 2022
e466df6
copying over some other missing parts from ScalaReflection.scala. Did…
Jolanrensen Feb 22, 2022
1557fa4
serializing binary works!
Jolanrensen Feb 23, 2022
680e5b1
serializing binary works!
Jolanrensen Feb 23, 2022
ba0c452
fixed serializing CalendarInterval, added tests and fixes for Decimal…
Jolanrensen Feb 23, 2022
054d626
updating all tests to shouldBe instead of just show
Jolanrensen Feb 23, 2022
66dc40e
added jira issue
Jolanrensen Feb 24, 2022
31f56d8
rebasing on spark 3.2 branch
Jolanrensen Feb 25, 2022
9b1c2c9
Merge remote-tracking branch 'origin/encoders-and-data-types' into en…
Jolanrensen Feb 28, 2022
d56b5a4
spark 3.2.1
Jolanrensen Feb 28, 2022
Files changed

1 change: 1 addition & 0 deletions .github/workflows/generate_docs.yml
@@ -25,5 +25,6 @@ jobs:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_branch: docs
publish_dir: ./kotlin-spark-api/3.2/target/dokka
+ force_orphan: true


4 changes: 2 additions & 2 deletions README.md
@@ -27,14 +27,14 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
- [Code of Conduct](#code-of-conduct)
- [License](#license)

- ## Supported versions of Apache Spark #TODO
+ ## Supported versions of Apache Spark

| Apache Spark | Scala | Kotlin for Apache Spark |
|:------------:|:-----:|:-------------------------------:|
| 3.0.0+ | 2.12 | kotlin-spark-api-3.0:1.0.2 |
| 2.4.1+ | 2.12 | kotlin-spark-api-2.4_2.12:1.0.2 |
| 2.4.1+ | 2.11 | kotlin-spark-api-2.4_2.11:1.0.2 |
- | 3.2.0+ | 2.12 | kotlin-spark-api-2.4_2.12:1.0.3 |
+ | 3.2.0+ | 2.12 | kotlin-spark-api-3.2:1.0.3 |

## Releases

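For reference, a dependency sketch for the renamed artifact (Gradle Kotlin DSL; the org.jetbrains.kotlinx.spark group id is an assumption here, so check the Releases section for the published coordinates):

```kotlin
// Hypothetical coordinates: group id assumed; artifact and version taken from the table above
dependencies {
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.2:1.0.3")
}
```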
2,245 changes: 1,268 additions & 977 deletions core/3.2/src/main/scala/org/apache/spark/sql/KotlinReflection.scala

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions examples/pom-3.2_2.12.xml
@@ -24,6 +24,11 @@
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>${spark3.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+ <version>${spark3.version}</version>
+ </dependency>
</dependencies>

<build>
6 changes: 6 additions & 0 deletions kotlin-spark-api/3.2/pom_2.12.xml
@@ -36,6 +36,12 @@
<version>${spark3.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+ <version>${spark3.version}</version>
+ <scope>provided</scope>
+ </dependency>

<!-- Test dependencies -->
<dependency>
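The spark-streaming dependency added in both poms backs the streaming exploration from the first commit. A minimal sketch of what it enables from Kotlin, using the plain Spark streaming API (app name, host, and port are placeholders, not part of this PR):

```kotlin
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaStreamingContext
import scala.Tuple2

fun main() {
    // local[2]: one core for the socket receiver, one for processing
    val conf = SparkConf().setMaster("local[2]").setAppName("StreamingSketch")
    val ssc = JavaStreamingContext(conf, Durations.seconds(1))

    // Word count over 1-second batches; host/port are placeholders
    ssc.socketTextStream("localhost", 9999)
        .flatMap { line: String -> line.split(" ").iterator() }
        .mapToPair { word: String -> Tuple2(word, 1) }
        .reduceByKey { a, b -> a + b }
        .print()

    ssc.start()
    ssc.awaitTermination()
}
```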
@@ -21,29 +21,33 @@

package org.jetbrains.kotlinx.spark.api

import org.apache.hadoop.shaded.org.apache.commons.math3.exception.util.ArgUtils
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.*
import org.apache.spark.api.java.function.*
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.*
import org.apache.spark.sql.Encoders.*
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.streaming.GroupStateTimeout
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.*
import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions
import scala.Product
import scala.Tuple2
import scala.concurrent.duration.`Duration$`
import scala.reflect.ClassTag
import scala.reflect.api.TypeTags.TypeTag
import scala.reflect.api.StandardDefinitions
import java.beans.PropertyDescriptor
import java.math.BigDecimal
import java.sql.Date
import java.sql.Timestamp
import java.time.Duration
import java.time.Instant
import java.time.LocalDate
import java.time.Period
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.Any
@@ -95,10 +99,12 @@ val ENCODERS: Map<KClass<*>, Encoder<*>> = mapOf(
String::class to STRING(),
BigDecimal::class to DECIMAL(),
Date::class to DATE(),
- LocalDate::class to LOCALDATE(), // 3.0 only
+ LocalDate::class to LOCALDATE(), // 3.0+
Timestamp::class to TIMESTAMP(),
- Instant::class to INSTANT(), // 3.0 only
- ByteArray::class to BINARY()
+ Instant::class to INSTANT(), // 3.0+
+ ByteArray::class to BINARY(),
+ Duration::class to DURATION(), // 3.2+
+ Period::class to PERIOD(), // 3.2+
)
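A quick sketch of the new entries in action (`withSpark` and `dsOf` are from this API; each value round-trips through the encoders added above):

```kotlin
import java.time.Duration
import java.time.Period

fun main() = withSpark {
    dsOf(Duration.ofMinutes(1), Duration.ofSeconds(30)).show() // DURATION(), 3.2+
    dsOf(Period.ofDays(2)).show()                              // PERIOD(), 3.2+
    dsOf(byteArrayOf(1, 2, 3)).show()                          // BINARY()
}
```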


@@ -154,6 +160,18 @@ inline fun <reified T> SparkSession.dsOf(vararg t: T): Dataset<T> =
inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
spark.createDataset(this, encoder<T>())

+ /**
+ * Utility method to create dataset from RDD
+ */
+ inline fun <reified T> RDD<T>.toDS(spark: SparkSession): Dataset<T> =
+ spark.createDataset(this, encoder<T>())
+
+ /**
+ * Utility method to create dataset from JavaRDD
+ */
+ inline fun <reified T> JavaRDDLike<T, *>.toDS(spark: SparkSession): Dataset<T> =
+ spark.createDataset(this.rdd(), encoder<T>())
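A sketch of the new conversions in use (assumes an existing SparkSession named `spark`; the data is arbitrary):

```kotlin
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession

fun demo(spark: SparkSession) {
    val sc = JavaSparkContext(spark.sparkContext)
    val javaRdd = sc.parallelize(listOf(1, 2, 3))

    javaRdd.toDS(spark).show()       // JavaRDDLike<T, *>.toDS
    javaRdd.rdd().toDS(spark).show() // RDD<T>.toDS
}
```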

/**
* Main method of API, which gives you seamless integration with Spark:
* It creates encoder for any given supported type T
@@ -177,12 +195,16 @@ fun <T> generateEncoder(type: KType, cls: KClass<*>): Encoder<T> {
} as Encoder<T>
}

- private fun isSupportedClass(cls: KClass<*>): Boolean =
- cls.isData
- || cls.isSubclassOf(Map::class)
- || cls.isSubclassOf(Iterable::class)
- || cls.isSubclassOf(Product::class)
- || cls.java.isArray
+ private fun isSupportedClass(cls: KClass<*>): Boolean = when {
+ cls == ByteArray::class -> false // uses binary encoder
+ cls.isData -> true
+ cls.isSubclassOf(Map::class) -> true
+ cls.isSubclassOf(Iterable::class) -> true
+ cls.isSubclassOf(Product::class) -> true
+ cls.java.isArray -> true
+ else -> false
+ }


private fun <T> kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder<T> {
return ExpressionEncoder(
@@ -1192,7 +1214,7 @@ fun schema(type: KType, map: Map<String, KType> = mapOf()): DataType {
DoubleArray::class -> typeOf<Double>()
BooleanArray::class -> typeOf<Boolean>()
ShortArray::class -> typeOf<Short>()
- ByteArray::class -> typeOf<Byte>()
+ // ByteArray::class -> typeOf<Byte>() handled by BinaryType
else -> types.getValue(klass.typeParameters[0].name)
}
} else types.getValue(klass.typeParameters[0].name)
@@ -1290,10 +1312,14 @@ private val knownDataTypes: Map<KClass<out Any>, DataType> = mapOf(
Float::class to DataTypes.FloatType,
Double::class to DataTypes.DoubleType,
String::class to DataTypes.StringType,
- LocalDate::class to `DateType$`.`MODULE$`,
- Date::class to `DateType$`.`MODULE$`,
- Timestamp::class to `TimestampType$`.`MODULE$`,
- Instant::class to `TimestampType$`.`MODULE$`,
+ LocalDate::class to DataTypes.DateType,
+ Date::class to DataTypes.DateType,
+ Timestamp::class to DataTypes.TimestampType,
+ Instant::class to DataTypes.TimestampType,
+ ByteArray::class to DataTypes.BinaryType,
+ Decimal::class to DecimalType.SYSTEM_DEFAULT(),
+ BigDecimal::class to DecimalType.SYSTEM_DEFAULT(),
+ CalendarInterval::class to DataTypes.CalendarIntervalType,
)

private fun transitiveMerge(a: Map<String, KType>, b: Map<String, KType>): Map<String, KType> {
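A sketch of what the new knownDataTypes entries mean for data classes (`Payment` and its fields are hypothetical; expected column types follow the map above):

```kotlin
import java.math.BigDecimal

data class Payment(val id: Int, val payload: ByteArray, val amount: BigDecimal)

fun main() = withSpark {
    dsOf(Payment(1, byteArrayOf(0x01), BigDecimal("9.99"))).printSchema()
    // Expected, per the map above:
    //  payload -> binary (BinaryType)
    //  amount  -> decimal(38,18) (DecimalType.SYSTEM_DEFAULT())
}
```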
@@ -20,6 +20,11 @@
package org.jetbrains.kotlinx.spark.api

import org.apache.spark.SparkConf
+ import org.apache.spark.api.java.JavaRDD
+ import org.apache.spark.api.java.JavaRDDLike
+ import org.apache.spark.api.java.JavaSparkContext
+ import org.apache.spark.rdd.RDD
+ import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql.UDFRegistration
import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR
@@ -78,18 +83,38 @@ inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KS
KSparkSession(this).apply {
sparkContext.setLogLevel(logLevel)
func()
- spark.stop()
}
}
+ .also { it.stop() }
}

+ /**
+ * Wrapper for spark creation which copies params from [sparkConf].
+ *
+ * @param sparkConf Sets a list of config options based on this.
+ * @param logLevel Control our logLevel. This overrides any user-defined log settings.
+ * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession])
+ */
+ @JvmOverloads
+ inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) {
+ withSpark(
+ builder = SparkSession.builder().config(sparkConf),
+ logLevel = logLevel,
+ func = func,
+ )
+ }
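The new overload in use, sketched (app name and master are placeholders):

```kotlin
import org.apache.spark.SparkConf

fun main() {
    val conf = SparkConf()
        .setAppName("conf-demo") // placeholder
        .setMaster("local[*]")

    // Session is built from the given SparkConf and stopped after the block
    withSpark(sparkConf = conf) {
        dsOf(1, 2, 3).show()
    }
}
```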

/**
* This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]
*/
@Suppress("EXPERIMENTAL_FEATURE_WARNING", "unused")
- inline class KSparkSession(val spark: SparkSession)
+ class KSparkSession(val spark: SparkSession) {

+ val sc: JavaSparkContext by lazy { JavaSparkContext(spark.sparkContext) }

inline fun <reified T> List<T>.toDS() = toDS(spark)
inline fun <reified T> Array<T>.toDS() = spark.dsOf(*this)
inline fun <reified T> dsOf(vararg arg: T) = spark.dsOf(*arg)
+ inline fun <reified T> RDD<T>.toDS() = toDS(spark)
+ inline fun <reified T> JavaRDDLike<T, *>.toDS() = toDS(spark)
val udf: UDFRegistration get() = spark.udf()
}
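Putting the session changes together, a sketch: `sc` is now created lazily on first use (no JavaSparkContext is spun up for jobs that never touch it), and RDDs convert to Datasets in scope:

```kotlin
fun main() = withSpark {
    // Touching sc here lazily instantiates the JavaSparkContext
    val words = sc.parallelize(listOf("spark", "kotlin", "spark"))
    words.toDS().distinct().show() // JavaRDDLike<T, *>.toDS() from this PR
}
```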