diff --git a/clients/spark/src/main/scala/io/treeverse/gc/DataLister.scala b/clients/spark/src/main/scala/io/treeverse/gc/DataLister.scala index 53ce6305daf..61298d7f321 100644 --- a/clients/spark/src/main/scala/io/treeverse/gc/DataLister.scala +++ b/clients/spark/src/main/scala/io/treeverse/gc/DataLister.scala @@ -31,32 +31,62 @@ class NaiveDataLister extends DataLister { class FileDescriptor(val path: String, val lastModified: Long) extends Serializable class ParallelDataLister extends DataLister with Serializable { - private def listPath(configMapper: ConfigMapper, p: Path): Iterator[FileDescriptor] = { + // listPath lists entries under p and returns FileDescriptors with paths relative to p. + // Non-recursive mode uses listStatusIterator (includes directories, needed for data/ scanning). + // Recursive mode uses listFiles (leaf files only, needed for shard directories, whose files are nested several levels deep). + private def listPath( + configMapper: ConfigMapper, + p: Path, + recursive: Boolean + ): Iterator[FileDescriptor] = { val fs = p.getFileSystem(configMapper.configuration) - if (!fs.exists(p)) { - return Iterator.empty - } - val it = fs.listStatusIterator(p) - new Iterator[FileDescriptor] with Serializable { - override def hasNext: Boolean = it.hasNext - - override def next(): FileDescriptor = { - val item = it.next() - new FileDescriptor(item.getPath.getName, item.getModificationTime) + if (!fs.exists(p)) return Iterator.empty + if (recursive) { + // Use makeQualified so the base URI scheme matches what listFiles returns + // (e.g. s3:// vs s3a://) and stripPrefix reliably removes the prefix. 
+ val base = fs.makeQualified(p).toString.stripSuffix("/") + "/" + val it = fs.listFiles(p, true) + new Iterator[FileDescriptor] with Serializable { + override def hasNext: Boolean = it.hasNext + override def next(): FileDescriptor = { + val s = it.next() + val rel = s.getPath.toString.stripPrefix(base) + require(rel != s.getPath.toString, s"path ${s.getPath} is not under $base") + new FileDescriptor(rel, s.getModificationTime) + } + } + } else { + // Non-recursive: list only the direct children of p by name (e.g. the partition directories under the base path) + val it = fs.listStatusIterator(p) + new Iterator[FileDescriptor] with Serializable { + override def hasNext: Boolean = it.hasNext + override def next(): FileDescriptor = { + val s = it.next() + new FileDescriptor(s.getPath.getName, s.getModificationTime) + } } } } + // ShardDirPrefix must match shardDirPrefix in pkg/upload/path_provider.go. + private val ShardDirPrefix = "!" + + // isShardSlice returns true for shard directory names (ShardDirPrefix followed by 2 lowercase hex chars, e.g. !ab). + // Legacy partition names are 20-char xid strings, so detection is unambiguous. 
+ private def isShardSlice(sliceId: String): Boolean = + sliceId.matches(s"${java.util.regex.Pattern.quote(ShardDirPrefix)}[0-9a-f]{2}") + override def listData(configMapper: ConfigMapper, path: Path, parallelism: Int): DataFrame = { import spark.implicits._ - val slices = listPath(configMapper, path) + val slices = listPath(configMapper, path, recursive = false) val objectsPath = if (path.toString.endsWith("/")) path.toString else path.toString + "/" // udf require serializable string and not Path val objectsUDF = udf((sliceId: String) => { // WA for https://issues.apache.org/jira/browse/HDFS-14762 val slicePath = new Path(objectsPath + sliceId) - listPath(configMapper, slicePath).toSeq - .map(s => (s.path, s.lastModified)) + listPath(configMapper, slicePath, recursive = isShardSlice(sliceId)).toSeq.map(s => + (s.path, s.lastModified) + ) }) val objectsDF = slices diff --git a/clients/spark/src/test/scala/io/treeverse/gc/DataListerSpec.scala b/clients/spark/src/test/scala/io/treeverse/gc/DataListerSpec.scala index c72139b83be..a0a83e66a8e 100644 --- a/clients/spark/src/test/scala/io/treeverse/gc/DataListerSpec.scala +++ b/clients/spark/src/test/scala/io/treeverse/gc/DataListerSpec.scala @@ -64,6 +64,69 @@ class ParallelDataListerSpec df.sort(desc("base_address")).head.getString(0) should be("slice10/object10") }) } + + it("should list sharded-format paths recursively") { + val dataDir = new File(dir.toFile, "data") + dataDir.mkdir() + withSparkSession(spark => { + // Create data/{shard}/{sub-shard}/{partition}/{xid} structure + val shardDir = new File(dataDir, "!ab") + shardDir.mkdir() + val subShardDir = new File(shardDir, "c") + subShardDir.mkdir() + val partitionDir = new File(subShardDir, "bpel26ce4m36oefv1600") + partitionDir.mkdir() + for (j <- 1 to 5) { + new File(partitionDir, f"xid$j%020d").createNewFile() + } + + val path = new Path(dataDir.toURI) + val configMapper = new ConfigMapper( + spark.sparkContext.broadcast( 
+ HadoopUtils.getHadoopConfigurationValues(spark.sparkContext.hadoopConfiguration) + ) + ) + val df = new ParallelDataLister().listData(configMapper, path, 3).sort("base_address") + df.count should be(5) + df.head.getString(0) should startWith("!ab/") + df.head.getString(0).split("/").length should be(4) + df.head.getString(0) should be("!ab/c/bpel26ce4m36oefv1600/xid00000000000000000001") + }) + } + + it("should handle mixed sharded and legacy paths") { + val dataDir = new File(dir.toFile, "data") + dataDir.mkdir() + withSparkSession(spark => { + // Sharded: data/!ab/c/partition/xid + val shardDir = new File(dataDir, "!ab") + shardDir.mkdir() + val subShardDir = new File(shardDir, "c") + subShardDir.mkdir() + val shardPartition = new File(subShardDir, "bpel26ce4m36oefv1600") + shardPartition.mkdir() + new File(shardPartition, "xid_sharded_file").createNewFile() + + // Legacy: data/{partition}/{xid} + val legacyPartition = new File(dataDir, "legacy_partition_xid_str00") + legacyPartition.mkdir() + new File(legacyPartition, "xid_legacy_file").createNewFile() + + val path = new Path(dataDir.toURI) + val configMapper = new ConfigMapper( + spark.sparkContext.broadcast( + HadoopUtils.getHadoopConfigurationValues(spark.sparkContext.hadoopConfiguration) + ) + ) + val df = new ParallelDataLister().listData(configMapper, path, 3).sort("base_address") + df.count should be(2) + val addresses = df.collect().map(_.getString(0)) + // sharded entries should precede legacy partitions (the result is sorted by base_address and '!' sorts before letters) + addresses(0) should be("!ab/c/bpel26ce4m36oefv1600/xid_sharded_file") + addresses(1) should be("legacy_partition_xid_str00/xid_legacy_file") + }) + } + it("should be able to list path with ':' in it") { val dataDir = new File(dir.toFile, "data") dataDir.mkdir()