58 changes: 44 additions & 14 deletions clients/spark/src/main/scala/io/treeverse/gc/DataLister.scala
@@ -31,32 +31,62 @@ class NaiveDataLister extends DataLister {
class FileDescriptor(val path: String, val lastModified: Long) extends Serializable

class ParallelDataLister extends DataLister with Serializable {
-  private def listPath(configMapper: ConfigMapper, p: Path): Iterator[FileDescriptor] = {
// listPath lists entries under p and returns FileDescriptors with paths relative to p.
// Non-recursive mode uses listStatusIterator (includes directories, needed for data/ scanning).
// Recursive mode uses listFiles (leaf files only, needed for walking shard directories).
private def listPath(
configMapper: ConfigMapper,
p: Path,
recursive: Boolean
): Iterator[FileDescriptor] = {
val fs = p.getFileSystem(configMapper.configuration)
-    if (!fs.exists(p)) {
-      return Iterator.empty
-    }
-    val it = fs.listStatusIterator(p)
-    new Iterator[FileDescriptor] with Serializable {
-      override def hasNext: Boolean = it.hasNext
-
-      override def next(): FileDescriptor = {
-        val item = it.next()
-        new FileDescriptor(item.getPath.getName, item.getModificationTime)
if (!fs.exists(p)) return Iterator.empty
if (recursive) {
// Use makeQualified so the base URI scheme matches what listFiles returns
// (e.g. s3:// vs s3a://) and stripPrefix reliably removes the prefix.
val base = fs.makeQualified(p).toString.stripSuffix("/") + "/"
val it = fs.listFiles(p, true)
new Iterator[FileDescriptor] with Serializable {
override def hasNext: Boolean = it.hasNext
override def next(): FileDescriptor = {
val s = it.next()
val rel = s.getPath.toString.stripPrefix(base)
require(rel != s.getPath.toString, s"path ${s.getPath} is not under $base")
new FileDescriptor(rel, s.getModificationTime)
}
}
} else {
// Non-recursive: list the immediate entries of the base path (partition and shard directories)
val it = fs.listStatusIterator(p)
new Iterator[FileDescriptor] with Serializable {
override def hasNext: Boolean = it.hasNext
override def next(): FileDescriptor = {
val s = it.next()
new FileDescriptor(s.getPath.getName, s.getModificationTime)
}
}
}
}
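
For readers less familiar with the Hadoop FileSystem API, here is a minimal, self-contained sketch of what the recursive branch above does, using a hypothetical local directory (`/tmp/gc-demo/data` is made up for illustration): `listFiles(p, true)` returns leaf files only, and stripping the qualified base URI leaves addresses relative to the listed path.

```scala
// Standalone illustration of the recursive listing branch (a sketch, not the PR's code).
// The directory /tmp/gc-demo/data is a hypothetical example path.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RecursiveListSketch extends App {
  val p = new Path("/tmp/gc-demo/data")
  val fs: FileSystem = p.getFileSystem(new Configuration())
  // makeQualified normalizes scheme and authority so stripPrefix removes the whole base URI.
  val base = fs.makeQualified(p).toString.stripSuffix("/") + "/"
  val files = fs.listFiles(p, true) // recursive = true: yields leaf files, never directories
  while (files.hasNext) {
    val status = files.next()
    val relative = status.getPath.toString.stripPrefix(base)
    println(s"$relative -> ${status.getModificationTime}")
  }
}
```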

// ShardDirPrefix must match shardDirPrefix in pkg/upload/path_provider.go.
private val ShardDirPrefix = "!"

// isShardSlice returns true for shard directory names (<ShardDirPrefix><2-hex-chars>).
// Legacy partition names are 20-char xid strings, so detection is unambiguous.
Comment on lines +71 to +75
Member: You might want to revise the comments here so they make sense in this context
private def isShardSlice(sliceId: String): Boolean =
sliceId.matches(s"${java.util.regex.Pattern.quote(ShardDirPrefix)}[0-9a-f]{2}")
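
As a quick illustration of the detection rule (the example slice names below are invented, not taken from a real repository): the regex accepts exactly the prefix followed by two lowercase hex characters, so legacy 20-character xid partition names can never match.

```scala
// Illustration only: behaviour of the shard-slice check, extracted as a free function.
import java.util.regex.Pattern

val shardDirPrefix = "!" // same value as ShardDirPrefix above
def looksLikeShardSlice(sliceId: String): Boolean =
  sliceId.matches(s"${Pattern.quote(shardDirPrefix)}[0-9a-f]{2}")

assert(looksLikeShardSlice("!ab"))                   // sharded: "!" + two hex chars
assert(!looksLikeShardSlice("!abc"))                 // wrong length
assert(!looksLikeShardSlice("bpel26ce4m36oefv1600")) // legacy 20-char xid partition name
```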

override def listData(configMapper: ConfigMapper, path: Path, parallelism: Int): DataFrame = {
import spark.implicits._
-    val slices = listPath(configMapper, path)
val slices = listPath(configMapper, path, recursive = false)
val objectsPath = if (path.toString.endsWith("/")) path.toString else path.toString + "/"
// udf requires a serializable String argument, not a Path
val objectsUDF = udf((sliceId: String) => {
// Workaround for https://issues.apache.org/jira/browse/HDFS-14762
val slicePath = new Path(objectsPath + sliceId)
-      listPath(configMapper, slicePath).toSeq
-        .map(s => (s.path, s.lastModified))
listPath(configMapper, slicePath, recursive = isShardSlice(sliceId)).toSeq.map(s =>
(s.path, s.lastModified)
)
})

val objectsDF = slices
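Putting the pieces together, the flow is: list the top level of `data/` non-recursively (its entries are legacy partition directories and `!xx` shard directories), then list each entry, recursing only for shard directories. The sketch below is a simplified, Spark-free rendering of that flow, written under the assumption that each object's address is prefixed with its slice id, as the test expectations suggest; it is not the actual `listData` implementation, whose DataFrame assembly is truncated above.

```scala
// Simplified sketch of the per-slice listing flow (illustration, not the PR's listData).
// Assumes listPath and isShardSlice behave as defined in the diff above.
def listAllAddresses(configMapper: ConfigMapper, dataPath: Path): Seq[(String, Long)] = {
  val objectsPath =
    if (dataPath.toString.endsWith("/")) dataPath.toString else dataPath.toString + "/"
  listPath(configMapper, dataPath, recursive = false).flatMap { slice =>
    val slicePath = new Path(objectsPath + slice.path)
    // Shard directories ("!xx") are walked recursively; legacy partitions are listed flat.
    listPath(configMapper, slicePath, recursive = isShardSlice(slice.path))
      .map(obj => (slice.path + "/" + obj.path, obj.lastModified))
  }.toSeq
}
```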
63 changes: 63 additions & 0 deletions clients/spark/src/test/scala/io/treeverse/gc/DataListerSpec.scala
@@ -64,6 +64,69 @@ class ParallelDataListerSpec
df.sort(desc("base_address")).head.getString(0) should be("slice10/object10")
})
}

it("should list sharded-format paths recursively") {
val dataDir = new File(dir.toFile, "data")
dataDir.mkdir()
withSparkSession(spark => {
// Create data/<shard>/<sub-shard>/<partition>/<xid> structure
val shardDir = new File(dataDir, "!ab")
shardDir.mkdir()
val subShardDir = new File(shardDir, "c")
subShardDir.mkdir()
val partitionDir = new File(subShardDir, "bpel26ce4m36oefv1600")
partitionDir.mkdir()
for (j <- 1 to 5) {
new File(partitionDir, f"xid$j%020d").createNewFile()
}

val path = new Path(dataDir.toURI)
val configMapper = new ConfigMapper(
spark.sparkContext.broadcast(
HadoopUtils.getHadoopConfigurationValues(spark.sparkContext.hadoopConfiguration)
)
)
val df = new ParallelDataLister().listData(configMapper, path, 3).sort("base_address")
df.count should be(5)
df.head.getString(0) should startWith("!ab/")
df.head.getString(0).split("/").length should be(4)
df.head.getString(0) should be("!ab/c/bpel26ce4m36oefv1600/xid00000000000000000001")
})
}

it("should handle mixed sharded and legacy paths") {
val dataDir = new File(dir.toFile, "data")
dataDir.mkdir()
withSparkSession(spark => {
// Sharded: data/!ab/c/partition/xid
val shardDir = new File(dataDir, "!ab")
shardDir.mkdir()
val subShardDir = new File(shardDir, "c")
subShardDir.mkdir()
val shardPartition = new File(subShardDir, "bpel26ce4m36oefv1600")
shardPartition.mkdir()
new File(shardPartition, "xid_sharded_file").createNewFile()

// Legacy: data/<partition>/<xid>
val legacyPartition = new File(dataDir, "legacy_partition_xid_str00")
legacyPartition.mkdir()
new File(legacyPartition, "xid_legacy_file").createNewFile()

val path = new Path(dataDir.toURI)
val configMapper = new ConfigMapper(
spark.sparkContext.broadcast(
HadoopUtils.getHadoopConfigurationValues(spark.sparkContext.hadoopConfiguration)
)
)
val df = new ParallelDataLister().listData(configMapper, path, 3).sort("base_address")
df.count should be(2)
val addresses = df.collect().map(_.getString(0))
// sharded entries should precede legacy partitions
addresses(0) should be("!ab/c/bpel26ce4m36oefv1600/xid_sharded_file")
addresses(1) should be("legacy_partition_xid_str00/xid_legacy_file")
})
}

it("should be able to list path with ':' in it") {
val dataDir = new File(dir.toFile, "data")
dataDir.mkdir()