Skip to content

Commit e5c4c1e

Browse files
authored
[Spark Runner] Prepare Spark 3 structured-streaming to shared base, adopt Flink-style version overrides (#38233)
1 parent 1cb2a71 commit e5c4c1e

76 files changed

Lines changed: 91 additions & 25 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

gradle.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,7 @@ docker_image_default_repo_prefix=beam_
4040

4141
# supported flink versions
4242
flink_versions=1.17,1.18,1.19,1.20,2.0
43+
# supported spark versions
44+
spark_versions=3
4345
# supported python versions
4446
python_versions=3.10,3.11,3.12,3.13,3.14

runners/spark/3/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
def basePath = '..'
2020
/* All properties required for loading the Spark build script */
2121
project.ext {
22+
spark_major = '3'
2223
// Spark 3 version as defined in BeamModulePlugin
2324
spark_version = spark3_version
2425
spark_scala_version = '2.12'
25-
copySourceBase = false // disabled to use Spark 3 as primary dev version
2626
archives_base_name = 'beam-runners-spark-3'
2727
}
2828

runners/spark/spark_runner.gradle

Lines changed: 88 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -89,38 +89,102 @@ def hadoopVersions = [
8989

9090
hadoopVersions.each { kv -> configurations.create("hadoopVersion$kv.key") }
9191

92-
def sourceBase = "${project.projectDir}/../src"
93-
def sourceBaseCopy = "${project.buildDir}/sourcebase/src"
94-
95-
def useCopiedSourceSet = { scope, type, trigger ->
96-
def taskName = "copy${scope.capitalize()}${type.capitalize()}"
97-
trigger.dependsOn tasks.register(taskName, Copy) {
98-
from "$sourceBase/$scope/$type"
99-
into "$sourceBaseCopy/$scope/$type"
100-
duplicatesStrategy DuplicatesStrategy.INCLUDE
92+
/*
93+
* Per-version source overrides (mirrors runners/flink/flink_runner.gradle).
94+
*
95+
* Layout:
96+
* runners/spark/src/ -- shared base (lowest supported version uses these directly)
97+
* runners/spark/<major>/src/ -- version-specific overrides (later overrides take precedence)
98+
*
99+
* The lowest supported `spark_major` builds straight from the shared base.
100+
* Higher versions copy <shared> + <previous majors> + <current> into a single
101+
* source-overrides directory using DuplicatesStrategy.INCLUDE so the current
102+
* version's files override earlier ones.
103+
*/
104+
def base_path = ".."
105+
106+
def overrides = { versions, type, group = 'java' ->
107+
// order matters: later entries override earlier ones during the Copy
108+
["${base_path}/src/${type}/${group}"] +
109+
versions.collect { "${base_path}/${it}/src/${type}/${group}" } +
110+
["./src/${type}/${group}"]
111+
}
112+
113+
def all_versions = spark_versions.split(",").collect { it.trim() }
114+
// Determine version order by list position rather than string comparison so two-digit
115+
// majors (e.g. "10") still sort after single-digit ones.
116+
def spark_major_index = all_versions.indexOf(spark_major)
117+
if (spark_major_index < 0) {
118+
throw new GradleException(
119+
"spark_major='${spark_major}' is not listed in spark_versions='${spark_versions}' " +
120+
"(see root gradle.properties).")
121+
}
122+
def previous_versions = spark_major_index > 0 ? all_versions.subList(0, spark_major_index) : []
123+
124+
def main_source_overrides = overrides(previous_versions, "main")
125+
def test_source_overrides = overrides(previous_versions, "test")
126+
def main_resources_overrides = overrides(previous_versions, "main", "resources")
127+
def test_resources_overrides = overrides(previous_versions, "test", "resources")
128+
129+
def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
130+
131+
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
132+
copyTask.from main_source_overrides
133+
copyTask.into "${sourceOverridesBase}/main/java"
134+
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
135+
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
136+
project.ext.excluded_files.main.each { f -> copyTask.exclude "**/${f}" }
101137
}
102-
// append copied sources to srcDirs
103-
sourceSets."$scope"."$type".srcDirs "$sourceBaseCopy/$scope/$type"
104138
}
105139

106-
if (copySourceBase) {
107-
// Copy source base into build directory.
108-
// While this is not necessary, having multiple source sets referencing the same shared base will typically confuse an IDE and harm developer experience.
109-
// The copySourceBase flag can be swapped without any implications and allows to pick a main version that is actively worked on.
110-
useCopiedSourceSet("main", "java", compileJava)
111-
useCopiedSourceSet("main", "resources", processResources)
112-
useCopiedSourceSet("test", "java", compileTestJava)
113-
useCopiedSourceSet("test", "resources", processTestResources)
140+
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
141+
it.from main_resources_overrides
142+
it.into "${sourceOverridesBase}/main/resources"
143+
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
144+
}
145+
146+
def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask ->
147+
copyTask.from test_source_overrides
148+
copyTask.into "${sourceOverridesBase}/test/java"
149+
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
150+
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) {
151+
project.ext.excluded_files.test.each { f -> copyTask.exclude "**/${f}" }
152+
}
153+
}
154+
155+
def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
156+
it.from test_resources_overrides
157+
it.into "${sourceOverridesBase}/test/resources"
158+
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
159+
}
160+
161+
def use_override = (spark_major_index > 0)
162+
def sourceBase = "${project.projectDir}/../src"
163+
164+
if (use_override) {
165+
// Pin srcDirs to the Copy task providers so each higher version sees only its merged
166+
// overrides tree. Passing the TaskProviders here lets Gradle auto-wire task dependencies
167+
// for every consumer (compile, javadoc, sources jar, etc.) without manual dependsOn.
168+
sourceSets {
169+
main {
170+
java { srcDirs = [copySourceOverrides] }
171+
resources { srcDirs = [copyResourcesOverrides] }
172+
}
173+
test {
174+
java { srcDirs = [copyTestSourceOverrides] }
175+
resources { srcDirs = [copyTestResourcesOverrides] }
176+
}
177+
}
114178
} else {
115-
// append shared base sources to srcDirs
179+
// Lowest supported Spark version: build straight from the shared base, no copy step.
116180
sourceSets {
117181
main {
118-
java.srcDirs "${sourceBase}/main/java"
119-
resources.srcDirs "${sourceBase}/main/resources"
182+
java { srcDirs = ["${sourceBase}/main/java"] }
183+
resources { srcDirs = ["${sourceBase}/main/resources"] }
120184
}
121185
test {
122-
java.srcDirs "${sourceBase}/test/java"
123-
resources.srcDirs "${sourceBase}/test/resources"
186+
java { srcDirs = ["${sourceBase}/test/java"] }
187+
resources { srcDirs = ["${sourceBase}/test/resources"] }
124188
}
125189
}
126190
}

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/examples/WordCount.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/examples/WordCount.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java

File renamed without changes.

runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/package-info.java renamed to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/package-info.java

File renamed without changes.

0 commit comments

Comments
 (0)