diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml
new file mode 100644
index 00000000..9ca5d938
--- /dev/null
+++ b/.github/workflows/generate_docs.yml
@@ -0,0 +1,25 @@
+name: Generate and publish docs
+
+on:
+  push:
+    branches:
+      - "spark-3.2"
+  pull_request:
+    branches:
+      - "spark-3.2"
+
+jobs:
+  generate-and-publish-docs:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 11
+        uses: actions/setup-java@v2
+        with:
+          distribution: adopt
+          java-version: 11
+          check-latest: true
+      - name: Generate docs
+        run: ./mvnw clean package site -Dmaven.test.skip=true
+      # TODO create branch and copy the docs over from kotlin-spark-api/3.2/target/dokka
\ No newline at end of file
diff --git a/README.md b/README.md
index f090f0cf..79c669e4 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.2) [![official JetBrains project](http://jb.gg/badges/incubator.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
+# Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.2) [![official JetBrains project](http://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
 
 Your next API to work with [Apache Spark](https://spark.apache.org/).
 
diff --git a/kotlin-spark-api/3.2/pom_2.12.xml b/kotlin-spark-api/3.2/pom_2.12.xml
index 7195f912..756d9c2b 100644
--- a/kotlin-spark-api/3.2/pom_2.12.xml
+++ b/kotlin-spark-api/3.2/pom_2.12.xml
@@ -27,10 +27,7 @@
         <dependency>
             <groupId>org.jetbrains.kotlinx.spark</groupId>
             <artifactId>core-3.2_${scala.compat.version}</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlinx.spark</groupId>
-            <artifactId>kotlin-spark-api-common</artifactId>
-        </dependency>
+
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 32935f40..e679f561 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -21,6 +21,7 @@
 
 package org.jetbrains.kotlinx.spark.api
 
+import org.apache.hadoop.shaded.org.apache.commons.math3.exception.util.ArgUtils
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.api.java.function.*
@@ -36,6 +37,7 @@ import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions
 import scala.Product
 import scala.Tuple2
 import scala.reflect.ClassTag
+import scala.reflect.api.TypeTags.TypeTag
 import java.beans.PropertyDescriptor
 import java.math.BigDecimal
 import java.sql.Date
@@ -82,7 +84,7 @@ import kotlin.reflect.full.primaryConstructor
 import kotlin.to
 
 @JvmField
-val ENCODERS = mapOf<KClass<*>, Encoder<*>>(
+val ENCODERS: Map<KClass<*>, Encoder<*>> = mapOf(
     Boolean::class to BOOLEAN(),
     Byte::class to BYTE(),
     Short::class to SHORT(),
@@ -164,6 +166,9 @@ inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
 @OptIn(ExperimentalStdlibApi::class)
 inline fun <reified T> encoder(): Encoder<T> = generateEncoder(typeOf<T>(), T::class)
 
+/**
+ * @see encoder
+ */
 fun <T> generateEncoder(type: KType, cls: KClass<*>): Encoder<T> {
     @Suppress("UNCHECKED_CAST")
     return when {
@@ -172,7 +177,8 @@ fun <T> generateEncoder(type:
KType, cls: KClass<*>): Encoder { } as Encoder } -private fun isSupportedClass(cls: KClass<*>): Boolean = cls.isData +private fun isSupportedClass(cls: KClass<*>): Boolean = + cls.isData || cls.isSubclassOf(Map::class) || cls.isSubclassOf(Iterable::class) || cls.isSubclassOf(Product::class) @@ -192,30 +198,89 @@ private fun kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder ) } +/** + * (Kotlin-specific) + * Returns a new Dataset that contains the result of applying [func] to each element. + */ inline fun Dataset.map(noinline func: (T) -> R): Dataset = map(MapFunction(func), encoder()) +/** + * (Kotlin-specific) + * Returns a new Dataset by first applying a function to all elements of this Dataset, + * and then flattening the results. + */ inline fun Dataset.flatMap(noinline func: (T) -> Iterator): Dataset = flatMap(func, encoder()) +/** + * (Kotlin-specific) + * Returns a new Dataset by flattening. This means that a Dataset of an iterable such as + * `listOf(listOf(1, 2, 3), listOf(4, 5, 6))` will be flattened to a Dataset of `listOf(1, 2, 3, 4, 5, 6)`. + */ inline fun > Dataset.flatten(): Dataset = flatMap(FlatMapFunction { it.iterator() }, encoder()) +/** + * (Kotlin-specific) + * Returns a [KeyValueGroupedDataset] where the data is grouped by the given key [func]. + */ inline fun Dataset.groupByKey(noinline func: (T) -> R): KeyValueGroupedDataset = groupByKey(MapFunction(func), encoder()) +/** + * (Kotlin-specific) + * Returns a new Dataset that contains the result of applying [func] to each partition. + */ inline fun Dataset.mapPartitions(noinline func: (Iterator) -> Iterator): Dataset = mapPartitions(func, encoder()) +/** + * (Kotlin-specific) + * Filters rows to eliminate [null] values. + */ @Suppress("UNCHECKED_CAST") fun Dataset.filterNotNull(): Dataset = filter { it != null } as Dataset +/** + * Returns a new [KeyValueGroupedDataset] where the given function [func] has been applied + * to the data. The grouping key is unchanged by this. + * + * ```kotlin + * // Create values grouped by key from a Dataset> + * ds.groupByKey { it._1 }.mapValues { it._2 } + * ``` + */ inline fun KeyValueGroupedDataset.mapValues(noinline func: (VALUE) -> R): KeyValueGroupedDataset = mapValues(MapFunction(func), encoder()) +/** + * (Kotlin-specific) + * Applies the given function to each group of data. For each unique group, the function will + * be passed the group key and an iterator that contains all the elements in the group. The + * function can return an element of arbitrary type which will be returned as a new [Dataset]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [Dataset]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [org.apache.spark.sql.expressions.Aggregator]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling [toList]) unless they are sure that this is possible given the memory + * constraints of their cluster. + */ inline fun KeyValueGroupedDataset.mapGroups(noinline func: (KEY, Iterator) -> R): Dataset = mapGroups(MapGroupsFunction(func), encoder()) +/** + * (Kotlin-specific) + * Reduces the elements of each group of data using the specified binary function. 
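// A minimal usage sketch of the typed Dataset extensions documented above (map, groupByKey,
// mapValues, mapGroups). The data class Person and the sample rows are hypothetical and not part
// of this change; only the extension functions themselves come from the API in this diff.

import org.jetbrains.kotlinx.spark.api.*

data class Person(val name: String, val age: Int)

fun main() = withSpark {
    val people = dsOf(Person("Alice", 30), Person("Bob", 25), Person("Cecil", 30))

    // Typed transformation: the result encoder is picked up via the reified type parameter.
    val greetings = people.map { "${it.name} (${it.age})" } // Dataset<String>

    // Grouping stays typed as well: KeyValueGroupedDataset<Int, Person> -> Dataset<Pair<Int, String>>.
    val namesByAge = people
        .groupByKey { it.age }
        .mapValues { it.name }
        .mapGroups { age, names -> age to names.asSequence().joinToString() }

    greetings.show()
    namesByAge.show()
}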
+ * The given function must be commutative and associative or the result may be non-deterministic. + * + * Note that you need to use [reduceGroupsK] always instead of the Java- or Scala-specific + * [KeyValueGroupedDataset.reduceGroups] to make the compiler work. + */ inline fun KeyValueGroupedDataset.reduceGroupsK(noinline func: (VALUE, VALUE) -> VALUE): Dataset> = reduceGroups(ReduceFunction(func)) .map { t -> t._1 to t._2 } @@ -228,23 +293,63 @@ inline fun KeyValueGroupedDataset.reduc inline fun Dataset.reduceK(noinline func: (T, T) -> T): T = reduce(ReduceFunction(func)) +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "keys" or [Tuple2._1] values. + */ @JvmName("takeKeysTuple2") inline fun Dataset>.takeKeys(): Dataset = map { it._1() } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "keys" or [Pair.first] values. + */ inline fun Dataset>.takeKeys(): Dataset = map { it.first } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "keys" or [Arity2._1] values. + */ @JvmName("takeKeysArity2") inline fun Dataset>.takeKeys(): Dataset = map { it._1 } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "values" or [Tuple2._2] values. + */ @JvmName("takeValuesTuple2") inline fun Dataset>.takeValues(): Dataset = map { it._2() } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "values" or [Pair.second] values. + */ inline fun Dataset>.takeValues(): Dataset = map { it.second } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "values" or [Arity2._2] values. + */ @JvmName("takeValuesArity2") inline fun Dataset>.takeValues(): Dataset = map { it._2 } - +/** + * (Kotlin-specific) + * Applies the given function to each group of data. For each unique group, the function will + * be passed the group key and an iterator that contains all the elements in the group. The + * function can return an iterator containing elements of an arbitrary type which will be returned + * as a new [Dataset]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [Dataset]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [org.apache.spark.sql.expressions.Aggregator]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling [toList]) unless they are sure that this is possible given the memory + * constraints of their cluster. + */ inline fun KeyValueGroupedDataset.flatMapGroups( noinline func: (key: K, values: Iterator) -> Iterator, ): Dataset = flatMapGroups( @@ -252,12 +357,57 @@ inline fun KeyValueGroupedDataset.flatMapGroups( encoder() ) +/** + * (Kotlin-specific) + * Returns the group state value if it exists, else [null]. + * This is comparable to [GroupState.getOption], but instead utilises Kotlin's nullability features + * to get the same result. + */ fun GroupState.getOrNull(): S? = if (exists()) get() else null +/** + * (Kotlin-specific) + * Allows the group state object to be used as a delegate. Will be [null] if it does not exist. + * + * For example: + * ```kotlin + * groupedDataset.mapGroupsWithState(GroupStateTimeout.NoTimeout()) { key, values, state: GroupState -> + * var s by state + * ... + * } + * ``` + */ operator fun GroupState.getValue(thisRef: Any?, property: KProperty<*>): S? 
= getOrNull() -operator fun GroupState.setValue(thisRef: Any?, property: KProperty<*>, value: S?): Unit = update(value) +/** + * (Kotlin-specific) + * Allows the group state object to be used as a delegate. Will be [null] if it does not exist. + * + * For example: + * ```kotlin + * groupedDataset.mapGroupsWithState(GroupStateTimeout.NoTimeout()) { key, values, state: GroupState -> + * var s by state + * ... + * } + * ``` + */ +operator fun GroupState.setValue(thisRef: Any?, property: KProperty<*>, value: S?): Unit = update(value) +/** + * (Kotlin-specific) + * Applies the given function to each group of data, while maintaining a user-defined per-group + * state. The result Dataset will represent the objects returned by the function. + * For a static batch Dataset, the function will be invoked once per group. For a streaming + * Dataset, the function will be invoked for each group repeatedly in every trigger, and + * updates to each group's state will be saved across invocations. + * See [org.apache.spark.sql.streaming.GroupState] for more details. + * + * @param S The type of the user-defined state. Must be encodable to Spark SQL types. + * @param U The type of the output objects. Must be encodable to Spark SQL types. + * @param func Function to be called on every group. + * + * See [Encoder] for more details on what types are encodable to Spark SQL. + */ inline fun KeyValueGroupedDataset.mapGroupsWithState( noinline func: (key: K, values: Iterator, state: GroupState) -> U, ): Dataset = mapGroupsWithState( @@ -266,6 +416,22 @@ inline fun KeyValueGroupedDataset.mapGroupsWi encoder() ) +/** + * (Kotlin-specific) + * Applies the given function to each group of data, while maintaining a user-defined per-group + * state. The result Dataset will represent the objects returned by the function. + * For a static batch Dataset, the function will be invoked once per group. For a streaming + * Dataset, the function will be invoked for each group repeatedly in every trigger, and + * updates to each group's state will be saved across invocations. + * See [org.apache.spark.sql.streaming.GroupState] for more details. + * + * @param S The type of the user-defined state. Must be encodable to Spark SQL types. + * @param U The type of the output objects. Must be encodable to Spark SQL types. + * @param func Function to be called on every group. + * @param timeoutConf Timeout configuration for groups that do not receive data for a while. + * + * See [Encoder] for more details on what types are encodable to Spark SQL. + */ inline fun KeyValueGroupedDataset.mapGroupsWithState( timeoutConf: GroupStateTimeout, noinline func: (key: K, values: Iterator, state: GroupState) -> U, @@ -276,6 +442,23 @@ inline fun KeyValueGroupedDataset.mapGroupsWi timeoutConf ) +/** + * (Kotlin-specific) + * Applies the given function to each group of data, while maintaining a user-defined per-group + * state. The result Dataset will represent the objects returned by the function. + * For a static batch Dataset, the function will be invoked once per group. For a streaming + * Dataset, the function will be invoked for each group repeatedly in every trigger, and + * updates to each group's state will be saved across invocations. + * See [GroupState] for more details. + * + * @param S The type of the user-defined state. Must be encodable to Spark SQL types. + * @param U The type of the output objects. Must be encodable to Spark SQL types. + * @param func Function to be called on every group. 
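// A minimal sketch of the GroupState delegate and mapGroupsWithState documented above.
// The String grouping key, the Int state type and the running-count logic are hypothetical;
// only getOrNull, the getValue/setValue delegates and mapGroupsWithState come from this diff.

import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.streaming.GroupStateTimeout
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    val grouped = dsOf("apple", "avocado", "banana").groupByKey { it.first().toString() }

    // Keep a running count per key in GroupState<Int>, read and written through the delegate.
    val counts = grouped.mapGroupsWithState(GroupStateTimeout.NoTimeout()) { key, values, state: GroupState<Int> ->
        var count by state                          // null while no state has been stored yet
        val newCount = (count ?: 0) + values.asSequence().count()
        count = newCount                            // delegates to state.update(newCount)
        key to newCount
    }

    counts.show()
}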
+ * @param outputMode The output mode of the function. + * @param timeoutConf Timeout configuration for groups that do not receive data for a while. + * + * See [Encoder] for more details on what types are encodable to Spark SQL. + */ inline fun KeyValueGroupedDataset.flatMapGroupsWithState( outputMode: OutputMode, timeoutConf: GroupStateTimeout, @@ -288,6 +471,13 @@ inline fun KeyValueGroupedDataset.flatMapGrou timeoutConf ) +/** + * (Kotlin-specific) + * Applies the given function to each cogrouped data. For each unique group, the function will + * be passed the grouping key and 2 iterators containing all elements in the group from + * [Dataset] [this] and [other]. The function can return an iterator containing elements of an + * arbitrary type which will be returned as a new [Dataset]. + */ inline fun KeyValueGroupedDataset.cogroup( other: KeyValueGroupedDataset, noinline func: (key: K, left: Iterator, right: Iterator) -> Iterator, @@ -297,30 +487,93 @@ inline fun KeyValueGroupedDataset.cogroup( encoder() ) +/** DEPRECATED: Use [as] or [to] for this. */ +@Deprecated( + message = "Deprecated, since we already have `as`() and to().", + replaceWith = ReplaceWith("this.to()"), + level = DeprecationLevel.ERROR, +) inline fun Dataset.downcast(): Dataset = `as`(encoder()) + +/** + * (Kotlin-specific) + * Returns a new Dataset where each record has been mapped on to the specified type. The + * method used to map columns depend on the type of [R]: + * - When [R] is a class, fields for the class will be mapped to columns of the same name + * (case sensitivity is determined by [spark.sql.caseSensitive]). + * - When [R] is a tuple, the columns will be mapped by ordinal (i.e. the first column will + * be assigned to `_1`). + * - When [R] is a primitive type (i.e. [String], [Int], etc.), then the first column of the + * `DataFrame` will be used. + * + * If the schema of the Dataset does not match the desired [R] type, you can use [Dataset.select]/[selectTyped] + * along with [Dataset.alias] or [as]/[to] to rearrange or rename as required. + * + * Note that [as]/[to] only changes the view of the data that is passed into typed operations, + * such as [map], and does not eagerly project away any columns that are not present in + * the specified class. + * + * @see to as alias for [as] + */ inline fun Dataset<*>.`as`(): Dataset = `as`(encoder()) + +/** + * (Kotlin-specific) + * Returns a new Dataset where each record has been mapped on to the specified type. The + * method used to map columns depend on the type of [R]: + * - When [R] is a class, fields for the class will be mapped to columns of the same name + * (case sensitivity is determined by [spark.sql.caseSensitive]). + * - When [R] is a tuple, the columns will be mapped by ordinal (i.e. the first column will + * be assigned to `_1`). + * - When [R] is a primitive type (i.e. [String], [Int], etc.), then the first column of the + * `DataFrame` will be used. + * + * If the schema of the Dataset does not match the desired [R] type, you can use [Dataset.select]/[selectTyped] + * along with [Dataset.alias] or [as]/[to] to rearrange or rename as required. + * + * Note that [as]/[to] only changes the view of the data that is passed into typed operations, + * such as [map], and does not eagerly project away any columns that are not present in + * the specified class. 
+ * + * @see as as alias for [to] + */ inline fun Dataset<*>.to(): Dataset = `as`(encoder()) -inline fun Dataset.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func)) +/** + * (Kotlin-specific) + * Applies a function [func] to all rows. + */ +inline fun Dataset.forEach(noinline func: (T) -> Unit): Unit = foreach(ForeachFunction(func)) -inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit) = +/** + * (Kotlin-specific) + * Runs [func] on each partition of this Dataset. + */ +inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit): Unit = foreachPartition(ForeachPartitionFunction(func)) /** * It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that */ -fun Dataset.debugCodegen() = also { KSparkExtensions.debugCodegen(it) } +fun Dataset.debugCodegen(): Dataset = also { KSparkExtensions.debugCodegen(it) } -val SparkSession.sparkContext +/** + * Returns the Spark context associated with this Spark session. + */ +val SparkSession.sparkContext: SparkContext get() = KSparkExtensions.sparkContext(this) /** * It's hard to call `Dataset.debug` from kotlin, so here is utility for that */ -fun Dataset.debug() = also { KSparkExtensions.debug(it) } +fun Dataset.debug(): Dataset = also { KSparkExtensions.debug(it) } @Suppress("FunctionName") -@Deprecated("Changed to \"`===`\" to better reflect Scala API.", ReplaceWith("this `===` c")) +@Deprecated( + message = "Changed to \"`===`\" to better reflect Scala API.", + replaceWith = ReplaceWith("this `===` c"), + level = DeprecationLevel.ERROR, +) infix fun Column.`==`(c: Column) = `$eq$eq$eq`(c) /** @@ -648,7 +901,17 @@ operator fun Column.rem(other: Any): Column = `$percent`(other) */ operator fun Column.get(key: Any): Column = getItem(key) -fun lit(a: Any) = functions.lit(a) +/** + * Creates a [Column] of literal value. + * + * The passed in object is returned directly if it is already a [Column]. + * If the object is a Scala Symbol, it is converted into a [Column] also. + * Otherwise, a new [Column] is created to represent the literal value. + * + * This is just a shortcut to the function from [org.apache.spark.sql.functions]. + * For all the functions, simply add `import org.apache.spark.sql.functions.*` to your file. + */ +fun lit(a: Any): Column = functions.lit(a) /** * Provides a type hint about the expected return value of this column. This information can @@ -755,8 +1018,15 @@ inline fun Dataset.withCached( return cached.executeOnCached().also { cached.unpersist(blockingUnpersist) } } -inline fun Dataset.toList() = KSparkExtensions.collectAsList(to()) -inline fun Dataset<*>.toArray(): Array = to().collect() as Array +/** + * Collects the dataset as list where each item has been mapped to type [T]. + */ +inline fun Dataset<*>.toList(): List = to().collectAsList() as List + +/** + * Collects the dataset as Array where each item has been mapped to type [T]. + */ +inline fun Dataset<*>.toArray(): Array = to().collect() as Array /** * Selects column based on the column name and returns it as a [Column]. 
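// A minimal sketch of the `as`/`to` view change and the reified toList collector documented above.
// The data classes Sale and SaleAmount are hypothetical; only `to`, `toList` and `dsOf` come from
// the API in this diff.

import org.jetbrains.kotlinx.spark.api.*

data class Sale(val id: Int, val amount: Double, val comment: String)
data class SaleAmount(val id: Int, val amount: Double)

fun main() = withSpark {
    val sales = dsOf(Sale(1, 10.0, "a"), Sale(2, 20.5, "b"))

    // `to<SaleAmount>()` only changes the typed view; it does not eagerly drop the `comment` column.
    val amounts = sales.to<SaleAmount>()

    // Collect to the driver as a plain Kotlin List<SaleAmount>.
    val collected = amounts.toList<SaleAmount>()
    println(collected)
}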
@@ -773,7 +1043,6 @@ operator fun Dataset.invoke(colName: String): Column = col(colName) * ``` * @see invoke */ - @Suppress("UNCHECKED_CAST") inline fun Dataset.col(column: KProperty1): TypedColumn = col(column.name).`as`() as TypedColumn @@ -888,6 +1157,14 @@ inline fun = mapOf()): DataType { val primitiveSchema = knownDataTypes[type.classifier] @@ -987,15 +1264,24 @@ fun schema(type: KType, map: Map = mapOf()): DataType { } } +/** + * The entry point to programming Spark with the Dataset and DataFrame API. + * + * @see org.apache.spark.sql.SparkSession + */ typealias SparkSession = org.apache.spark.sql.SparkSession -fun SparkContext.setLogLevel(level: SparkLogLevel) = setLogLevel(level.name) +/** + * Control our logLevel. This overrides any user-defined log settings. + * @param level The desired log level as [SparkLogLevel]. + */ +fun SparkContext.setLogLevel(level: SparkLogLevel): Unit = setLogLevel(level.name) enum class SparkLogLevel { ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN } -private val knownDataTypes = mapOf( +private val knownDataTypes: Map, DataType> = mapOf( Byte::class to DataTypes.ByteType, Short::class to DataTypes.ShortType, Int::class to DataTypes.IntegerType, @@ -1007,7 +1293,7 @@ private val knownDataTypes = mapOf( LocalDate::class to `DateType$`.`MODULE$`, Date::class to `DateType$`.`MODULE$`, Timestamp::class to `TimestampType$`.`MODULE$`, - Instant::class to `TimestampType$`.`MODULE$` + Instant::class to `TimestampType$`.`MODULE$`, ) private fun transitiveMerge(a: Map, b: Map): Map { @@ -1017,11 +1303,12 @@ private fun transitiveMerge(a: Map, b: Map): Map(val f: (T) -> R) : (T) -> R { + private val values = ConcurrentHashMap() - override fun invoke(x: T) = - values.getOrPut(x, { f(x) }) + + override fun invoke(x: T): R = values.getOrPut(x) { f(x) } } private fun ((T) -> R).memoize(): (T) -> R = Memoize1(this) -private val memoizedSchema = { x: KType -> schema(x) }.memoize() +private val memoizedSchema: (KType) -> DataType = { x: KType -> schema(x) }.memoize() diff --git a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt similarity index 91% rename from kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt rename to kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt index b93ce377..9f7de351 100644 --- a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt @@ -57,9 +57,13 @@ class FilteringIterator( done() } } + +/** Maps the values of the iterator lazily using [func]. */ fun Iterator.map(func: (T) -> R): Iterator = MappingIterator(this, func) +/** Filters the values of the iterator lazily using [func]. */ fun Iterator.filter(func: (T) -> Boolean): Iterator = FilteringIterator(this, func) +/** Partitions the values of the iterator lazily in groups of [size]. 
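// A minimal sketch of the lazy Iterator helpers from Iterators.kt above (map, filter, partition);
// plain Kotlin, no Spark session involved. The sample numbers are arbitrary.

import org.jetbrains.kotlinx.spark.api.*

fun main() {
    val chunks = (1..6).iterator()
        .map { it * 2 }           // 2, 4, 6, 8, 10, 12 — evaluated lazily
        .filter { it % 3 == 0 }   // 6, 12
        .partition(2)             // groups of at most 2 elements

    chunks.forEach { println(it) } // prints [6, 12]
}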
*/ fun Iterator.partition(size: Int): Iterator> = PartitioningIterator(this, size) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt index 3ef0b177..6188daae 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt @@ -19,22 +19,27 @@ */ package org.jetbrains.kotlinx.spark.api +import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.UDFRegistration import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR /** - * Wrapper for spark creation which allows to set different spark params + * Wrapper for spark creation which allows setting different spark params. * * @param props spark options, value types are runtime-checked for type-correctness - * @param master [SparkSession.Builder.master] - * @param appName [SparkSession.Builder.appName] + * @param master Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to + * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster. By default, it + * tries to get the system value "spark.master", otherwise it uses "local[*]" + * @param appName Sets a name for the application, which will be shown in the Spark web UI. + * If no application name is set, a randomly generated name will be used. + * @param logLevel Control our logLevel. This overrides any user-defined log settings. * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession]) */ @JvmOverloads inline fun withSpark( props: Map = emptyMap(), - master: String = "local[*]", + master: String = SparkConf().get("spark.master", "local[*]"), appName: String = "Kotlin Spark Sample", logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit, @@ -58,10 +63,17 @@ inline fun withSpark( } +/** + * Wrapper for spark creation which allows setting different spark params. + * + * @param builder A [SparkSession.Builder] object, configured how you want. + * @param logLevel Control our logLevel. This overrides any user-defined log settings. + * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession]) + */ @JvmOverloads inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) { builder - .orCreate + .getOrCreate() .apply { KSparkSession(this).apply { sparkContext.setLogLevel(logLevel) diff --git a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt similarity index 100% rename from kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt rename to kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt diff --git a/kotlin-spark-api/common/pom.xml b/kotlin-spark-api/common/pom.xml deleted file mode 100644 index 19959fdb..00000000 --- a/kotlin-spark-api/common/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - 4.0.0 - - Kotlin Spark API: Common - kotlin-spark-api-common - Kotlin API for Apache Spark: common parts - - org.jetbrains.kotlinx.spark - kotlin-spark-api-parent - 1.0.4-SNAPSHOT - ../.. 
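// A minimal sketch of the two withSpark overloads documented above. The appName/master values and
// the tiny job bodies are illustrative; per this diff, the default master now falls back to
// SparkConf's "spark.master" before "local[*]", and the Builder overload uses getOrCreate().

import org.jetbrains.kotlinx.spark.api.*

fun main() {
    // Parameter-based overload: props, master, appName and logLevel all have defaults.
    withSpark(appName = "docs-example", master = "local[2]", logLevel = SparkLogLevel.WARN) {
        dsOf(1, 2, 3).map { it * it }.show()
    }

    // Builder-based overload: hand over a pre-configured SparkSession.Builder instead.
    withSpark(builder = SparkSession.builder().appName("docs-example").master("local[2]")) {
        dsOf("a", "b", "c").forEach { println(it) }
    }
}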
- - - - - org.jetbrains.kotlin - kotlin-stdlib-jdk8 - - - - - src/main/kotlin - src/test/kotlin - - - org.jetbrains.kotlin - kotlin-maven-plugin - - - org.jetbrains.dokka - dokka-maven-plugin - ${dokka.version} - - 8 - - - - dokka - - dokka - - pre-site - - - javadocjar - - javadocJar - - pre-integration-test - - - - - - diff --git a/pom.xml b/pom.xml index 4f0974c5..47043737 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ 1.5.30 - 1.4.32 + 1.6.10 0.16.0 4.6.0 1.0.1 @@ -35,7 +35,6 @@ - kotlin-spark-api/common dummy @@ -51,11 +50,6 @@ kotlin-reflect ${kotlin.version} - - org.jetbrains.kotlinx.spark - kotlin-spark-api-common - ${project.version} -