Commit e9c5f05

anishshri-db authored and attilapiros committed
[SPARK-48770][SS] Change to read operator metadata once on driver to check if we can find info for numColsPrefixKey used for session window agg queries
### What changes were proposed in this pull request?

Change to read operator metadata once on driver to check if we can find info for numColsPrefixKey used for session window agg queries.

### Why are the changes needed?

Avoid reading the operator metadata file multiple times on the executors.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests

```
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.datasources.v2.state.RocksDBStateDataSourceReadSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), Idle Worker Monitor for python3 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=tru...
[info] Run completed in 1 minute, 39 seconds.
[info] Total number of tests run: 14
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47167 from anishshri-db/task/SPARK-48770.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
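For readers unfamiliar with the data source being changed, here is a minimal, hedged usage sketch of how state from a session window aggregation query might be inspected through the `statestore` format that `StateDataSource` registers. The checkpoint path and option values are placeholders, the option names follow `StateSourceOptions` as referenced in the diffs below, and an active `SparkSession` named `spark` is assumed.

```scala
// Usage sketch (not part of this PR): read the state of a stateful operator
// from a streaming checkpoint. Path and option values are placeholders.
val sessionState = spark.read
  .format("statestore")
  .option("path", "/tmp/checkpoints/session-window-query") // streaming checkpoint root
  .option("operatorId", "0")                               // stateful operator to inspect
  .option("batchId", "10")                                 // omit to read the latest committed batch
  .load()

// For session window aggregation, the key side of the state is read with a
// prefix scan encoder, which is why the reader needs numColsPrefixKey from
// the operator metadata this PR resolves on the driver.
sessionState.printSchema()
sessionState.show(truncate = false)
```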
1 parent 165996f commit e9c5f05

File tree

4 files changed: +37 -19 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 15 additions & 1 deletion

@@ -30,13 +30,15 @@ import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
 import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
 import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * An implementation of [[TableProvider]] with [[DataSourceRegister]] for State Store data source.
@@ -46,6 +48,8 @@ class StateDataSource extends TableProvider with DataSourceRegister {
 
   private lazy val hadoopConf: Configuration = session.sessionState.newHadoopConf()
 
+  private lazy val serializedHadoopConf = new SerializableConfiguration(hadoopConf)
+
   override def shortName(): String = "statestore"
 
   override def getTable(
@@ -54,7 +58,17 @@ class StateDataSource extends TableProvider with DataSourceRegister {
       properties: util.Map[String, String]): Table = {
     val sourceOptions = StateSourceOptions.apply(session, hadoopConf, properties)
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, sourceOptions.batchId)
-    new StateTable(session, schema, sourceOptions, stateConf)
+    // Read the operator metadata once to see if we can find the information for prefix scan
+    // encoder used in session window aggregation queries.
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      sourceOptions.stateCheckpointLocation.getParent.toString, serializedHadoopConf)
+      .stateMetadata.toArray
+    val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
+      entry.operatorId == sourceOptions.operatorId &&
+        entry.stateStoreName == sourceOptions.storeName
+    }
+
+    new StateTable(session, schema, sourceOptions, stateConf, stateStoreMetadata)
   }
 
   override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
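As a rough, hypothetical sketch of the pattern this file now follows (none of the names below are Spark classes, they only mirror the shape of the change): the metadata is read and filtered once at planning time on the driver, and only the filtered result is handed to the partition readers.

```scala
// Hypothetical sketch of the "resolve once on the driver" pattern.
case class MetadataEntry(operatorId: Long, stateStoreName: String, numColsPrefixKey: Int)

// Done once while building the table, instead of inside every partition reader.
def filterForScan(
    allEntries: Seq[MetadataEntry],
    operatorId: Long,
    storeName: String): Array[MetadataEntry] = {
  allEntries.filter { entry =>
    entry.operatorId == operatorId && entry.stateStoreName == storeName
  }.toArray
}

// Each reader then receives the pre-filtered array as a constructor argument,
// so no executor re-reads the metadata file.
val metadataForReaders =
  filterForScan(Seq(MetadataEntry(0L, "default", 1)), operatorId = 0L, storeName = "default")
```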

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 7 additions & 11 deletions

@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
-import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
+import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataTableEntry
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.types.StructType
@@ -33,11 +33,12 @@ import org.apache.spark.util.SerializableConfiguration
 class StatePartitionReaderFactory(
     storeConf: StateStoreConf,
     hadoopConf: SerializableConfiguration,
-    schema: StructType) extends PartitionReaderFactory {
+    schema: StructType,
+    stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new StatePartitionReader(storeConf, hadoopConf,
-      partition.asInstanceOf[StateStoreInputPartition], schema)
+      partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
   }
 }
 
@@ -49,7 +50,9 @@ class StatePartitionReader(
     storeConf: StateStoreConf,
     hadoopConf: SerializableConfiguration,
     partition: StateStoreInputPartition,
-    schema: StructType) extends PartitionReader[InternalRow] with Logging {
+    schema: StructType,
+    stateStoreMetadata: Array[StateMetadataTableEntry])
+  extends PartitionReader[InternalRow] with Logging {
 
   private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
   private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]
@@ -58,13 +61,6 @@ class StatePartitionReader(
     val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
-    val allStateStoreMetadata = new StateMetadataPartitionReader(
-      partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
-      .stateMetadata.toArray
-    val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
-      entry.operatorId == partition.sourceOptions.operatorId &&
-        entry.stateStoreName == partition.sourceOptions.storeName
-    }
     val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
       logWarning("Metadata for state store not found, possible cause is this checkpoint " +
         "is created by older version of spark. If the query has session window aggregation, " +

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala

Lines changed: 11 additions & 5 deletions

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataTableEntry
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
 import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, StateStoreErrors}
 import org.apache.spark.sql.types.StructType
@@ -35,8 +36,10 @@ class StateScanBuilder(
     session: SparkSession,
     schema: StructType,
     sourceOptions: StateSourceOptions,
-    stateStoreConf: StateStoreConf) extends ScanBuilder {
-  override def build(): Scan = new StateScan(session, schema, sourceOptions, stateStoreConf)
+    stateStoreConf: StateStoreConf,
+    stateStoreMetadata: Array[StateMetadataTableEntry]) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, sourceOptions, stateStoreConf,
+    stateStoreMetadata)
 }
 
 /** An implementation of [[InputPartition]] for State Store data source. */
@@ -50,7 +53,8 @@ class StateScan(
     session: SparkSession,
     schema: StructType,
     sourceOptions: StateSourceOptions,
-    stateStoreConf: StateStoreConf) extends Scan with Batch {
+    stateStoreConf: StateStoreConf,
+    stateStoreMetadata: Array[StateMetadataTableEntry]) extends Scan with Batch {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val hadoopConfBroadcast = session.sparkContext.broadcast(
@@ -62,7 +66,8 @@ class StateScan(
     val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
     val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() {
       override def accept(path: Path): Boolean = {
-        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
+        fs.getFileStatus(path).isDirectory &&
+          Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
       }
     })
 
@@ -116,7 +121,8 @@ class StateScan(
         hadoopConfBroadcast.value, userFacingSchema, stateSchema)
 
     case JoinSideValues.none =>
-      new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema)
+      new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema,
+        stateStoreMetadata)
   }
 
   override def toBatch: Batch = this
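A side change in this file replaces the `FileSystem.isDirectory(Path)` call (deprecated in recent Hadoop releases) with the equivalent `getFileStatus(path).isDirectory`. Below is a hypothetical standalone version of the same partition-directory filter, with a placeholder path and entry point, for anyone who wants to list the numeric partition directories under a state checkpoint:

```scala
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

// Hypothetical standalone version of the filter used above: keep only immediate
// children that are directories named with a non-negative integer partition id.
object ListStatePartitions {
  def main(args: Array[String]): Unit = {
    val root = new Path(args(0)) // e.g. <checkpoint>/state/0 (placeholder path)
    val fs = root.getFileSystem(new Configuration())
    val partitionDirs = fs.listStatus(root, new PathFilter {
      override def accept(path: Path): Boolean =
        fs.getFileStatus(path).isDirectory &&
          Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
    })
    partitionDirs.map(_.getPath.getName).sortBy(_.toInt).foreach(println)
  }
}
```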

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala

Lines changed: 4 additions & 2 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataTableEntry
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.state.StateStoreConf
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -35,7 +36,8 @@ class StateTable(
     session: SparkSession,
     override val schema: StructType,
     sourceOptions: StateSourceOptions,
-    stateConf: StateStoreConf)
+    stateConf: StateStoreConf,
+    stateStoreMetadata: Array[StateMetadataTableEntry])
   extends Table with SupportsRead with SupportsMetadataColumns {
 
   import StateTable._
@@ -69,7 +71,7 @@ class StateTable(
   override def capabilities(): util.Set[TableCapability] = CAPABILITY
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-    new StateScanBuilder(session, schema, sourceOptions, stateConf)
+    new StateScanBuilder(session, schema, sourceOptions, stateConf, stateStoreMetadata)
 
   override def properties(): util.Map[String, String] = Map.empty[String, String].asJava
 