Commit 5f7536c

Merge pull request #136 from JetBrains/reorganizing
Reorganizing API and adding conversions
2 parents 7cce12f + 04ae82a commit 5f7536c

20 files changed: +3068 −2361 lines

core/3.2/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala

Lines changed: 104 additions & 92 deletions
@@ -26,189 +26,201 @@ import org.apache.spark.sql.types.{DataType, Metadata, StructField, StructType}
 
 
 trait DataTypeWithClass {
-    val dt: DataType
-    val cls: Class[_]
-    val nullable: Boolean
+    val dt: DataType
+    val cls: Class[ _ ]
+    val nullable: Boolean
 }
 
 trait ComplexWrapper extends DataTypeWithClass
 
-class KDataTypeWrapper(val dt: StructType
-                       , val cls: Class[_]
-                       , val nullable: Boolean = true) extends StructType with ComplexWrapper {
-    override def fieldNames: Array[String] = dt.fieldNames
+class KDataTypeWrapper(
+    val dt: StructType,
+    val cls: Class[ _ ],
+    val nullable: Boolean = true,
+) extends StructType with ComplexWrapper {
 
-    override def names: Array[String] = dt.names
+    override def fieldNames: Array[ String ] = dt.fieldNames
 
-    override def equals(that: Any): Boolean = dt.equals(that)
+    override def names: Array[ String ] = dt.names
 
-    override def hashCode(): Int = dt.hashCode()
+    override def equals(that: Any): Boolean = dt.equals(that)
 
-    override def add(field: StructField): StructType = dt.add(field)
+    override def hashCode(): Int = dt.hashCode()
 
-    override def add(name: String, dataType: DataType): StructType = dt.add(name, dataType)
+    override def add(field: StructField): StructType = dt.add(field)
 
-    override def add(name: String, dataType: DataType, nullable: Boolean): StructType = dt.add(name, dataType, nullable)
+    override def add(name: String, dataType: DataType): StructType = dt.add(name, dataType)
 
-    override def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType = dt.add(name, dataType, nullable, metadata)
+    override def add(name: String, dataType: DataType, nullable: Boolean): StructType = dt.add(name, dataType, nullable)
 
-    override def add(name: String, dataType: DataType, nullable: Boolean, comment: String): StructType = dt.add(name, dataType, nullable, comment)
+    override def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType = dt
+        .add(name, dataType, nullable, metadata)
 
-    override def add(name: String, dataType: String): StructType = dt.add(name, dataType)
+    override def add(name: String, dataType: DataType, nullable: Boolean, comment: String): StructType = dt
+        .add(name, dataType, nullable, comment)
 
-    override def add(name: String, dataType: String, nullable: Boolean): StructType = dt.add(name, dataType, nullable)
+    override def add(name: String, dataType: String): StructType = dt.add(name, dataType)
 
-    override def add(name: String, dataType: String, nullable: Boolean, metadata: Metadata): StructType = dt.add(name, dataType, nullable, metadata)
+    override def add(name: String, dataType: String, nullable: Boolean): StructType = dt.add(name, dataType, nullable)
 
-    override def add(name: String, dataType: String, nullable: Boolean, comment: String): StructType = dt.add(name, dataType, nullable, comment)
+    override def add(name: String, dataType: String, nullable: Boolean, metadata: Metadata): StructType = dt
+        .add(name, dataType, nullable, metadata)
 
-    override def apply(name: String): StructField = dt.apply(name)
+    override def add(name: String, dataType: String, nullable: Boolean, comment: String): StructType = dt
+        .add(name, dataType, nullable, comment)
 
-    override def apply(names: Set[String]): StructType = dt.apply(names)
+    override def apply(name: String): StructField = dt.apply(name)
 
-    override def fieldIndex(name: String): Int = dt.fieldIndex(name)
+    override def apply(names: Set[ String ]): StructType = dt.apply(names)
 
-    override private[sql] def getFieldIndex(name: String) = dt.getFieldIndex(name)
+    override def fieldIndex(name: String): Int = dt.fieldIndex(name)
 
-    private[sql] def findNestedField(fieldNames: Seq[String], includeCollections: Boolean, resolver: Resolver) = dt.findNestedField(fieldNames, includeCollections, resolver)
+    override private[ sql ] def getFieldIndex(name: String) = dt.getFieldIndex(name)
 
-    override private[sql] def buildFormattedString(prefix: String, stringConcat: StringUtils.StringConcat, maxDepth: Int): Unit = dt.buildFormattedString(prefix, stringConcat, maxDepth)
+    private[ sql ] def findNestedField(fieldNames: Seq[ String ], includeCollections: Boolean, resolver: Resolver) =
+        dt.findNestedField(fieldNames, includeCollections, resolver)
 
-    override protected[sql] def toAttributes: Seq[AttributeReference] = dt.toAttributes
+    override private[ sql ] def buildFormattedString(prefix: String, stringConcat: StringUtils.StringConcat, maxDepth: Int): Unit =
+        dt.buildFormattedString(prefix, stringConcat, maxDepth)
 
-    override def treeString: String = dt.treeString
+    override protected[ sql ] def toAttributes: Seq[ AttributeReference ] = dt.toAttributes
 
-    override def treeString(maxDepth: Int): String = dt.treeString(maxDepth)
+    override def treeString: String = dt.treeString
 
-    override def printTreeString(): Unit = dt.printTreeString()
+    override def treeString(maxDepth: Int): String = dt.treeString(maxDepth)
 
-    private[sql] override def jsonValue = dt.jsonValue
+    override def printTreeString(): Unit = dt.printTreeString()
 
-    override def apply(fieldIndex: Int): StructField = dt.apply(fieldIndex)
+    private[ sql ] override def jsonValue = dt.jsonValue
 
-    override def length: Int = dt.length
+    override def apply(fieldIndex: Int): StructField = dt.apply(fieldIndex)
 
-    override def iterator: Iterator[StructField] = dt.iterator
+    override def length: Int = dt.length
 
-    override def defaultSize: Int = dt.defaultSize
+    override def iterator: Iterator[ StructField ] = dt.iterator
 
-    override def simpleString: String = dt.simpleString
+    override def defaultSize: Int = dt.defaultSize
 
-    override def catalogString: String = dt.catalogString
+    override def simpleString: String = dt.simpleString
 
-    override def sql: String = dt.sql
+    override def catalogString: String = dt.catalogString
 
-    override def toDDL: String = dt.toDDL
+    override def sql: String = dt.sql
 
-    private[sql] override def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
+    override def toDDL: String = dt.toDDL
 
-    override private[sql] def merge(that: StructType) = dt.merge(that)
+    private[ sql ] override def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
 
-    private[spark] override def asNullable = dt.asNullable
+    override private[ sql ] def merge(that: StructType) = dt.merge(that)
 
-    private[spark] override def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
+    private[ spark ] override def asNullable = dt.asNullable
 
-    override private[sql] lazy val interpretedOrdering = dt.interpretedOrdering
+    private[ spark ] override def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
 
-    override def toString = s"KDataTypeWrapper(dt=$dt, cls=$cls, nullable=$nullable)"
+    override private[ sql ] lazy val interpretedOrdering = dt.interpretedOrdering
+
+    override def toString = s"KDataTypeWrapper(dt=$dt, cls=$cls, nullable=$nullable)"
 }
 
-case class KComplexTypeWrapper(dt: DataType, cls: Class[_], nullable: Boolean) extends DataType with ComplexWrapper {
-    override private[sql] def unapply(e: Expression) = dt.unapply(e)
+case class KComplexTypeWrapper(dt: DataType, cls: Class[ _ ], nullable: Boolean) extends DataType with ComplexWrapper {
+
+    override private[ sql ] def unapply(e: Expression) = dt.unapply(e)
 
-    override def typeName: String = dt.typeName
+    override def typeName: String = dt.typeName
 
-    override private[sql] def jsonValue = dt.jsonValue
+    override private[ sql ] def jsonValue = dt.jsonValue
 
-    override def json: String = dt.json
+    override def json: String = dt.json
 
-    override def prettyJson: String = dt.prettyJson
+    override def prettyJson: String = dt.prettyJson
 
-    override def simpleString: String = dt.simpleString
+    override def simpleString: String = dt.simpleString
 
-    override def catalogString: String = dt.catalogString
+    override def catalogString: String = dt.catalogString
 
-    override private[sql] def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
+    override private[ sql ] def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
 
-    override def sql: String = dt.sql
+    override def sql: String = dt.sql
 
-    override private[spark] def sameType(other: DataType) = dt.sameType(other)
+    override private[ spark ] def sameType(other: DataType) = dt.sameType(other)
 
-    override private[spark] def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
+    override private[ spark ] def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
 
-    private[sql] override def defaultConcreteType = dt.defaultConcreteType
+    private[ sql ] override def defaultConcreteType = dt.defaultConcreteType
 
-    private[sql] override def acceptsType(other: DataType) = dt.acceptsType(other)
+    private[ sql ] override def acceptsType(other: DataType) = dt.acceptsType(other)
 
-    override def defaultSize: Int = dt.defaultSize
+    override def defaultSize: Int = dt.defaultSize
 
-    override private[spark] def asNullable = dt.asNullable
+    override private[ spark ] def asNullable = dt.asNullable
 
 }
 
-case class KSimpleTypeWrapper(dt: DataType, cls: Class[_], nullable: Boolean) extends DataType with DataTypeWithClass {
-    override private[sql] def unapply(e: Expression) = dt.unapply(e)
+case class KSimpleTypeWrapper(dt: DataType, cls: Class[ _ ], nullable: Boolean) extends DataType with DataTypeWithClass {
+    override private[ sql ] def unapply(e: Expression) = dt.unapply(e)
 
-    override def typeName: String = dt.typeName
+    override def typeName: String = dt.typeName
 
-    override private[sql] def jsonValue = dt.jsonValue
+    override private[ sql ] def jsonValue = dt.jsonValue
 
-    override def json: String = dt.json
+    override def json: String = dt.json
 
-    override def prettyJson: String = dt.prettyJson
+    override def prettyJson: String = dt.prettyJson
 
-    override def simpleString: String = dt.simpleString
+    override def simpleString: String = dt.simpleString
 
-    override def catalogString: String = dt.catalogString
+    override def catalogString: String = dt.catalogString
 
-    override private[sql] def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
+    override private[ sql ] def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
 
-    override def sql: String = dt.sql
+    override def sql: String = dt.sql
 
-    override private[spark] def sameType(other: DataType) = dt.sameType(other)
+    override private[ spark ] def sameType(other: DataType) = dt.sameType(other)
 
-    override private[spark] def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
+    override private[ spark ] def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
 
-    private[sql] override def defaultConcreteType = dt.defaultConcreteType
+    private[ sql ] override def defaultConcreteType = dt.defaultConcreteType
 
-    private[sql] override def acceptsType(other: DataType) = dt.acceptsType(other)
+    private[ sql ] override def acceptsType(other: DataType) = dt.acceptsType(other)
 
-    override def defaultSize: Int = dt.defaultSize
+    override def defaultSize: Int = dt.defaultSize
 
-    override private[spark] def asNullable = dt.asNullable
+    override private[ spark ] def asNullable = dt.asNullable
 }
 
 class KStructField(val getterName: String, val delegate: StructField) extends StructField {
-    override private[sql] def buildFormattedString(prefix: String, stringConcat: StringUtils.StringConcat, maxDepth: Int): Unit = delegate.buildFormattedString(prefix, stringConcat, maxDepth)
 
-    override def toString(): String = delegate.toString()
+    override private[ sql ] def buildFormattedString(prefix: String, stringConcat: StringUtils.StringConcat, maxDepth: Int): Unit =
+        delegate.buildFormattedString(prefix, stringConcat, maxDepth)
+
+    override def toString(): String = delegate.toString()
 
-    override private[sql] def jsonValue = delegate.jsonValue
+    override private[ sql ] def jsonValue = delegate.jsonValue
 
-    override def withComment(comment: String): StructField = delegate.withComment(comment)
+    override def withComment(comment: String): StructField = delegate.withComment(comment)
 
-    override def getComment(): Option[String] = delegate.getComment()
+    override def getComment(): Option[ String ] = delegate.getComment()
 
-    override def toDDL: String = delegate.toDDL
+    override def toDDL: String = delegate.toDDL
 
-    override def productElement(n: Int): Any = delegate.productElement(n)
+    override def productElement(n: Int): Any = delegate.productElement(n)
 
-    override def productArity: Int = delegate.productArity
+    override def productArity: Int = delegate.productArity
 
-    override def productIterator: Iterator[Any] = delegate.productIterator
+    override def productIterator: Iterator[ Any ] = delegate.productIterator
 
-    override def productPrefix: String = delegate.productPrefix
+    override def productPrefix: String = delegate.productPrefix
 
-    override val dataType: DataType = delegate.dataType
+    override val dataType: DataType = delegate.dataType
 
-    override def canEqual(that: Any): Boolean = delegate.canEqual(that)
+    override def canEqual(that: Any): Boolean = delegate.canEqual(that)
 
-    override val metadata: Metadata = delegate.metadata
-    override val name: String = delegate.name
-    override val nullable: Boolean = delegate.nullable
+    override val metadata: Metadata = delegate.metadata
+    override val name: String = delegate.name
+    override val nullable: Boolean = delegate.nullable
 }
 
 object helpme {
 
-    def listToSeq(i: java.util.List[_]): Seq[_] = Seq(i.toArray: _*)
+    def listToSeq(i: java.util.List[ _ ]): Seq[ _ ] = Seq(i.toArray: _*)
 }
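
Every wrapper in this file follows one pattern: carry the underlying Spark dt, the JVM cls it was generated from, and a nullable flag, and forward every other member to dt. Below is a minimal Kotlin sketch of the same idea; the names TypeWithClass and SimpleWrapper are hypothetical, not part of the library's API.

import org.apache.spark.sql.types.DataType

// Hypothetical counterpart of the DataTypeWithClass trait above: a Spark
// type tagged with the JVM class it encodes and whether it is nullable.
interface TypeWithClass {
    val dt: DataType
    val cls: Class<*>
    val nullable: Boolean
}

// Hypothetical simple-type wrapper: all behaviour stays on dt, the wrapper
// only adds metadata — which is what KSimpleTypeWrapper does in Scala by
// forwarding every DataType member to dt by hand.
data class SimpleWrapper(
    override val dt: DataType,
    override val cls: Class<*>,
    override val nullable: Boolean,
) : TypeWithClass

The Scala wrappers cannot lean on automatic delegation because DataType and StructType are classes rather than interfaces, so each member is forwarded explicitly.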

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Broadcasting.kt

Lines changed: 6 additions & 6 deletions
@@ -31,12 +31,12 @@ data class SomeClass(val a: IntArray, val b: Int) : Serializable
 fun main() = withSpark {
     val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
     val result = listOf(1, 2, 3, 4, 5)
-            .toDS()
-            .map {
-                val receivedBroadcast = broadcastVariable.value
-                it + receivedBroadcast.a.first()
-            }
-            .collectAsList()
+        .toDS()
+        .map {
+            val receivedBroadcast = broadcastVariable.value
+            it + receivedBroadcast.a.first()
+        }
+        .collectAsList()
 
     println(result)
 }
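
The example above never releases the broadcast once the job is done. Spark's Broadcast handle exposes unpersist() and destroy() for that; here is a hedged sketch of the same example with explicit cleanup, using only the calls already shown plus Broadcast.destroy():

import java.io.Serializable
import org.jetbrains.kotlinx.spark.api.*

data class SomeClass(val a: IntArray, val b: Int) : Serializable

fun main() = withSpark {
    val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
    val result = listOf(1, 2, 3, 4, 5)
        .toDS()
        .map { it + broadcastVariable.value.a.first() }
        .collectAsList()
    println(result)
    broadcastVariable.destroy() // release the broadcast on the driver and all executors
}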

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/CachedOperations.kt

Lines changed: 7 additions & 7 deletions
@@ -24,13 +24,13 @@ import org.jetbrains.kotlinx.spark.api.*
 fun main() {
     withSpark {
         dsOf(1, 2, 3, 4, 5)
-                .map { it to (it + 2) }
-                .withCached {
-                    showDS()
+            .map { it to (it + 2) }
+            .withCached {
+                showDS()
 
-                    filter { it.first % 2 == 0 }.showDS()
-                }
-                .map { c(it.first, it.second, (it.first + it.second) * 2) }
-                .show()
+                filter { it.first % 2 == 0 }.showDS()
+            }
+            .map { c(it.first, it.second, (it.first + it.second) * 2) }
+            .show()
     }
 }
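
withCached caches the dataset for the duration of the block and unpersists it afterwards, so the two showDS() calls above reuse a single computation of the map. A hedged sketch of what such a helper could look like; the library's actual implementation may differ (for instance, it may also take a blocking-unpersist flag):

import org.apache.spark.sql.Dataset

// Sketch only; withCachedSketch is a hypothetical stand-in, not the real API.
fun <T, R> Dataset<T>.withCachedSketch(block: Dataset<T>.() -> R): R {
    val cached = cache()   // persist once, before the block runs
    return try {
        cached.block()     // actions inside the block reuse the cached data
    } finally {
        cached.unpersist() // release storage when the block finishes
    }
}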

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MapAndListOperations.kt

Lines changed: 12 additions & 9 deletions
@@ -23,15 +23,18 @@ import org.jetbrains.kotlinx.spark.api.*
 
 fun main() {
     withSpark(props = mapOf("spark.sql.codegen.wholeStage" to true)) {
-        dsOf(mapOf(1 to c(1, 2, 3), 2 to c(1, 2, 3)), mapOf(3 to c(1, 2, 3), 4 to c(1, 2, 3)))
-                .flatMap { it.toList().map { p -> listOf(p.first, p.second._1, p.second._2, p.second._3) }.iterator() }
-                .flatten()
-                .map { c(it) }
-                .also { it.printSchema() }
-                .distinct()
-                .sort("_1")
-                .debugCodegen()
-                .show()
+        dsOf(
+            mapOf(1 to c(1, 2, 3), 2 to c(1, 2, 3)),
+            mapOf(3 to c(1, 2, 3), 4 to c(1, 2, 3)),
+        )
+            .flatMap { it.toList().map { p -> listOf(p.first, p.second._1, p.second._2, p.second._3) }.iterator() }
+            .flatten()
+            .map { c(it) }
+            .also { it.printSchema() }
+            .distinct()
+            .sort("_1")
+            .debugCodegen()
+            .show()
     }
 }
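For readers new to the API: c(...) in the examples above is the library's tuple helper, producing small fixed-arity holders whose components are read back as _1, _2, _3 (hence p.second._1 in MapAndListOperations). A hedged usage sketch:

import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    // c(1, "one") builds a 2-arity value; its fields are _1 and _2.
    dsOf(c(1, "one"), c(2, "two"), c(3, "three"))
        .filter { it._1 % 2 == 1 }
        .map { it._2 }
        .show()
}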