From 436cd37046672fe156f4c100be2ce8668d5c203a Mon Sep 17 00:00:00 2001 From: jorgee Date: Mon, 12 May 2025 17:23:39 +0200 Subject: [PATCH 1/2] change LinStore interface query functions from collections to streams Signed-off-by: jorgee --- .../groovy/nextflow/cli/CmdLineageTest.groovy | 2 +- .../nextflow/lineage/DefaultLinStore.groovy | 78 +++++-------------- .../nextflow/lineage/LinExtensionImpl.groovy | 13 +--- .../src/main/nextflow/lineage/LinStore.groovy | 11 +-- .../lineage/cli/LinCommandImpl.groovy | 4 +- .../lineage/fs/LinFileSystemProvider.groovy | 8 +- .../main/nextflow/lineage/fs/LinPath.groovy | 5 +- .../lineage/DefaultLinStoreTest.groovy | 8 +- .../lineage/cli/LinCommandImplTest.groovy | 4 +- 9 files changed, 46 insertions(+), 87 deletions(-) diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy index b044fd832b..7eaba85ab8 100644 --- a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy @@ -279,7 +279,7 @@ class CmdLineageTest extends Specification { def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"), "lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, ['foo', 'bar']) def jsonSer = encoder.encode(entry) - def expectedOutput = '[\n "lid://12345"\n]' + def expectedOutput = 'lid://12345' lidFile.text = jsonSer when: def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput", "label=foo"]) diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy index 95b6094832..e5e0ef7613 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy @@ -16,11 +16,9 @@ package nextflow.lineage -import 
java.nio.file.FileVisitResult -import java.nio.file.FileVisitor import java.nio.file.Files import java.nio.file.Path -import java.nio.file.attribute.BasicFileAttributes +import java.util.stream.Stream import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -93,70 +91,32 @@ class DefaultLinStore implements LinStore { void close() throws IOException {} @Override - Map search(Map> params) { - final results = new HashMap() - - Files.walkFileTree(location, new FileVisitor() { - - @Override - FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - FileVisitResult.CONTINUE - } - - @Override - FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if( file.name.startsWith('.data.json') ) { - final lidObject = encoder.decode(file.text) - if( LinUtils.checkParams(lidObject, params) ) { - results.put(location.relativize(file.getParent()).toString(), lidObject as LinSerializable) - } - } - FileVisitResult.CONTINUE + Stream search(Map> params) { + return Files.walk(location) + .filter { path -> + Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') } - - @Override - FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { - FileVisitResult.CONTINUE + .map { path -> + def lidObject = encoder.decode(path.text) + def key = location.relativize(path.parent).toString() + return new AbstractMap.SimpleEntry(key, lidObject) } - - @Override - FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - FileVisitResult.CONTINUE + .filter { entry -> + LinUtils.checkParams(entry.value, params) } - }) - - return results + .map { it.key } } @Override - List getSubKeys(String parentKey) { - final results = new LinkedList() - final startPath = location.resolve(parentKey) - Files.walkFileTree(startPath, new FileVisitor() { - - @Override - FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - 
FileVisitResult.CONTINUE - } + Stream getSubKeys(String parentKey) { + def startPath = location.resolve(parentKey) - @Override - FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if( file.name.startsWith('.data.json') && file.parent != startPath ) { - results << location.relativize(file.parent).toString() - } - FileVisitResult.CONTINUE + return Files.walk(startPath) + .filter { path -> + Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') && path.parent != startPath } - - @Override - FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { - FileVisitResult.CONTINUE - } - - @Override - FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - FileVisitResult.CONTINUE + .map { path -> + location.relativize(path.parent).toString() } - }) - return results } } \ No newline at end of file diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy index 38ab162983..90c8bc441a 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy @@ -24,7 +24,6 @@ import nextflow.Session import nextflow.extension.LinExtension import nextflow.lineage.fs.LinPathFactory import nextflow.lineage.model.v1beta1.FileOutput -import nextflow.lineage.serde.LinSerializable import static nextflow.lineage.fs.LinPath.* @@ -43,7 +42,9 @@ class LinExtensionImpl implements LinExtension { log.trace("Querying lineage with params: $queryParams") new LinPropertyValidator().validateQueryParams(queryParams.keySet()) final store = getStore(session) - emitSearchResults(channel, store.search(queryParams)) + try( def stream = store.search(queryParams) ){ + stream.forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) } + } channel.bind(Channel.STOP) } @@ -62,7 +63,6 @@ class LinExtensionImpl 
implements LinExtension { return queryParams } - protected LinStore getStore(Session session) { final store = LinStoreFactory.getOrCreate(session) if( !store ) { @@ -70,11 +70,4 @@ class LinExtensionImpl implements LinExtension { } return store } - - private void emitSearchResults(DataflowWriteChannel channel, Map results) { - if( !results ) { - return - } - results.keySet().forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) } - } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy index 5e8f95d432..11e76340e3 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy @@ -18,6 +18,7 @@ package nextflow.lineage import com.google.common.annotations.Beta import groovy.transform.CompileStatic +import java.util.stream.Stream import nextflow.lineage.config.LineageConfig import nextflow.lineage.serde.LinSerializable /** @@ -58,18 +59,18 @@ interface LinStore extends Closeable { /** * Search for lineage entries. * @param params Map of query params - * @return Key-lineage entry pairs fulfilling the query params + * @return Stream with keys fulfilling the query params */ - Map search(Map> params) + Stream search(Map> params) /** * Search for keys starting with a parent key. * For example, if a LinStore contains the following keys: '123abc', '123abc/samples/file1.txt' and '123abc/summary', - * The execution of the function with parentKey='123abc' will return a list with '123abc/samples/file1.txt' and '123abc/summary'. + * The execution of the function with parentKey='123abc' will return a stream with '123abc/samples/file1.txt' and '123abc/summary'. 
* Similarly, the execution of the function with parentKey='123abc/samples' will just return '123abc/samples/file1.txt" * * @param parentKey - * @return list of keys + * @return Stream of keys starting with parentKey */ - List getSubKeys(String parentKey) + Stream getSubKeys(String parentKey) } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy index 6fb5321307..704a251dbe 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy @@ -182,7 +182,9 @@ class LinCommandImpl implements CmdLineage.LinCommand { try { final params = parseFindArgs(args) new LinPropertyValidator().validateQueryParams(params.keySet()) - println LinUtils.encodeSearchOutputs( store.search(params).keySet().collect { asUriString(it) }, true ) + try (def stream = store.search(params) ) { + stream.forEach { println asUriString(it) } + } } catch (Throwable e){ println "Error searching for ${args[0]}. 
${e.message}" } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy index c71221f707..b2d7ecc23c 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy @@ -35,6 +35,7 @@ import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileAttributeView import java.nio.file.spi.FileSystemProvider +import java.util.stream.Stream import groovy.transform.CompileStatic import nextflow.lineage.config.LineageConfig @@ -216,8 +217,9 @@ class LinFileSystemProvider extends FileSystemProvider { return getDirectoryStreamFromSubPath(lid) return getDirectoryStreamFromRealPath(real, lid) } + private static DirectoryStream getDirectoryStreamFromSubPath(LinPath lid){ - List paths = lid.getSubPaths() + Stream paths = lid.getSubPaths() if( !paths ) throw new FileNotFoundException("Sub paths for '$lid' do not exist") return new DirectoryStream() { @@ -225,7 +227,9 @@ class LinFileSystemProvider extends FileSystemProvider { return paths.iterator() } - void close() {} + void close() { + paths.close() + } } } private DirectoryStream getDirectoryStreamFromRealPath(Path real, LinPath lid) { diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy index e8bd260330..5605af02bb 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy @@ -27,6 +27,7 @@ import java.nio.file.WatchEvent import java.nio.file.WatchKey import java.nio.file.WatchService import java.time.OffsetDateTime +import java.util.stream.Stream import groovy.transform.CompileStatic import groovy.transform.Memoized @@ -155,7 +156,7 @@ class LinPath implements Path, 
LogicalDataPath { @TestOnly protected String getFilePath() { this.filePath } - protected List getSubPaths(){ + protected Stream getSubPaths(){ if( !fileSystem ) throw new IllegalArgumentException("Cannot get sub-paths for a relative lineage path") if( filePath.isEmpty() || filePath == SEPARATOR ) @@ -163,7 +164,7 @@ class LinPath implements Path, LogicalDataPath { final store = fileSystem.getStore() if( !store ) throw new Exception("Lineage store not found - Check Nextflow configuration") - return store.getSubKeys(filePath).collect {new LinPath(fileSystem as LinFileSystem, it)} as List + return store.getSubKeys(filePath).map {new LinPath(fileSystem as LinFileSystem, it) as Path } } /** diff --git a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy index ce7fd3d2dd..ccabb2e7bc 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy @@ -123,12 +123,10 @@ class DefaultLinStoreTest extends Specification { lidStore.save(key4, value4) when: - def results = lidStore.search( [type:['FileOutput'], labels:['value2']]) + def results = lidStore.search( [type:['FileOutput'], labels:['value2']]).toList() then: results.size() == 2 - results.keySet().containsAll([key2,key3]) - results[key2] == value2 - results[key3] == value3 + results.containsAll([key2,key3]) } def 'should search subkeys' () { @@ -154,7 +152,7 @@ class DefaultLinStoreTest extends Specification { lidStore.save(key4, value4) when: - def results = lidStore.getSubKeys("testKey") + def results = lidStore.getSubKeys("testKey").toList() then: results.size() == 2 results.containsAll([key2, key3]) diff --git a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy index 74129883ac..05e03412bb 100644 --- 
a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy @@ -470,8 +470,8 @@ class LinCommandImplTest extends Specification{ "lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, ["experiment=test"]) def entry3 = new FileOutput("path/to/file3",new Checksum("42472qet","nextflow","standard"), "lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, null) - def expectedOutput1 = '[\n "lid://123987/file.bam",\n "lid://123987/file2.bam"\n]' - def expectedOutput2 = '[\n "lid://123987/file2.bam",\n "lid://123987/file.bam"\n]' + def expectedOutput1 = 'lid://123987/file.bam\nlid://123987/file2.bam' + def expectedOutput2 = 'lid://123987/file2.bam\nlid://123987/file.bam' lidFile.text = encoder.encode(entry) lidFile2.text = encoder.encode(entry2) lidFile3.text = encoder.encode(entry3) From 35acafb496b704929eb6c9ab59c0bebcbd9a0ce2 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 14 May 2025 14:50:29 -0400 Subject: [PATCH 2/2] Minor refactor [ci fast] Signed-off-by: Paolo Di Tommaso --- .../nextflow/lineage/DefaultLinStore.groovy | 20 +++++++++---------- .../lineage/fs/LinFileSystemProvider.groovy | 4 +--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy index e5e0ef7613..564eec86d0 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy @@ -93,30 +93,30 @@ class DefaultLinStore implements LinStore { @Override Stream search(Map> params) { return Files.walk(location) - .filter { path -> + .filter { Path path -> Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') } - .map { path -> - def lidObject = encoder.decode(path.text) - def key = 
location.relativize(path.parent).toString() - return new AbstractMap.SimpleEntry(key, lidObject) + .map { Path path -> + final obj = encoder.decode(path.text) + final key = location.relativize(path.parent).toString() + return new AbstractMap.SimpleEntry(key, obj) } .filter { entry -> LinUtils.checkParams(entry.value, params) } - .map { it.key } + .map {it-> it.key } } @Override Stream getSubKeys(String parentKey) { - def startPath = location.resolve(parentKey) + final startPath = location.resolve(parentKey) return Files.walk(startPath) - .filter { path -> + .filter { Path path -> Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') && path.parent != startPath } - .map { path -> + .map { Path path -> location.relativize(path.parent).toString() } } -} \ No newline at end of file +} diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy index ff72356afa..f402f0a607 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy @@ -35,12 +35,10 @@ import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileAttributeView import java.nio.file.spi.FileSystemProvider -import java.util.stream.Stream import groovy.transform.CompileStatic import nextflow.lineage.config.LineageConfig import nextflow.util.TestOnly - /** * File System Provider for LID Paths * @@ -219,7 +217,7 @@ class LinFileSystemProvider extends FileSystemProvider { } private static DirectoryStream getDirectoryStreamFromSubPath(LinPath lid){ - Stream paths = lid.getSubPaths() + final paths = lid.getSubPaths() if( !paths ) throw new FileNotFoundException("Sub paths for '$lid' do not exist") return new DirectoryStream() {