diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy index b044fd832b..7eaba85ab8 100644 --- a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy @@ -279,7 +279,7 @@ class CmdLineageTest extends Specification { def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"), "lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, ['foo', 'bar']) def jsonSer = encoder.encode(entry) - def expectedOutput = '[\n "lid://12345"\n]' + def expectedOutput = 'lid://12345' lidFile.text = jsonSer when: def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput", "label=foo"]) diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy index 95b6094832..564eec86d0 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy @@ -16,11 +16,9 @@ package nextflow.lineage -import java.nio.file.FileVisitResult -import java.nio.file.FileVisitor import java.nio.file.Files import java.nio.file.Path -import java.nio.file.attribute.BasicFileAttributes +import java.util.stream.Stream import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -93,70 +91,32 @@ class DefaultLinStore implements LinStore { void close() throws IOException {} @Override - Map search(Map> params) { - final results = new HashMap() - - Files.walkFileTree(location, new FileVisitor() { - - @Override - FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - FileVisitResult.CONTINUE - } - - @Override - FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if( file.name.startsWith('.data.json') ) { - final lidObject = encoder.decode(file.text) - if( LinUtils.checkParams(lidObject, params) ) { - results.put(location.relativize(file.getParent()).toString(), lidObject as LinSerializable) - } - } - FileVisitResult.CONTINUE + Stream search(Map> params) { + return Files.walk(location) + .filter { Path path -> + Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') } - - @Override - FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { - FileVisitResult.CONTINUE + .map { Path path -> + final obj = encoder.decode(path.text) + final key = location.relativize(path.parent).toString() + return new AbstractMap.SimpleEntry(key, obj) } - - @Override - FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - FileVisitResult.CONTINUE + .filter { entry -> + LinUtils.checkParams(entry.value, params) } - }) - - return results + .map {it-> it.key } } @Override - List getSubKeys(String parentKey) { - final results = new LinkedList() + Stream getSubKeys(String parentKey) { final startPath = location.resolve(parentKey) - Files.walkFileTree(startPath, new FileVisitor() { - - @Override - FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - FileVisitResult.CONTINUE - } - @Override - FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if( file.name.startsWith('.data.json') && file.parent != startPath ) { - results << location.relativize(file.parent).toString() - } - FileVisitResult.CONTINUE + return Files.walk(startPath) + .filter { Path path -> + Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') && path.parent != startPath } - - @Override - FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { - FileVisitResult.CONTINUE - } - - @Override - FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - FileVisitResult.CONTINUE + .map { Path path -> + location.relativize(path.parent).toString() } - }) - return results } -} \ No newline at end of file +} diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy index 38ab162983..90c8bc441a 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy @@ -24,7 +24,6 @@ import nextflow.Session import nextflow.extension.LinExtension import nextflow.lineage.fs.LinPathFactory import nextflow.lineage.model.v1beta1.FileOutput -import nextflow.lineage.serde.LinSerializable import static nextflow.lineage.fs.LinPath.* @@ -43,7 +42,9 @@ class LinExtensionImpl implements LinExtension { log.trace("Querying lineage with params: $queryParams") new LinPropertyValidator().validateQueryParams(queryParams.keySet()) final store = getStore(session) - emitSearchResults(channel, store.search(queryParams)) + try( def stream = store.search(queryParams) ){ + stream.forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) } + } channel.bind(Channel.STOP) } @@ -62,7 +63,6 @@ class LinExtensionImpl implements LinExtension { return queryParams } - protected LinStore getStore(Session session) { final store = LinStoreFactory.getOrCreate(session) if( !store ) { @@ -70,11 +70,4 @@ class LinExtensionImpl implements LinExtension { } return store } - - private void emitSearchResults(DataflowWriteChannel channel, Map results) { - if( !results ) { - return - } - results.keySet().forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) } - } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy index 5e8f95d432..11e76340e3 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy @@ -18,6 +18,7 @@ package nextflow.lineage import com.google.common.annotations.Beta import groovy.transform.CompileStatic +import java.util.stream.Stream import nextflow.lineage.config.LineageConfig import nextflow.lineage.serde.LinSerializable /** @@ -58,18 +59,18 @@ interface LinStore extends Closeable { /** * Search for lineage entries. * @param params Map of query params - * @return Key-lineage entry pairs fulfilling the query params + * @return Stream with keys fulfilling the query params */ - Map search(Map> params) + Stream search(Map> params) /** * Search for keys starting with a parent key. * For example, if a LinStore contains the following keys: '123abc', '123abc/samples/file1.txt' and '123abc/summary', - * The execution of the function with parentKey='123abc' will return a list with '123abc/samples/file1.txt' and '123abc/summary'. + * The execution of the function with parentKey='123abc' will return a stream with '123abc/samples/file1.txt' and '123abc/summary'. * Similarly, the execution of the function with parentKey='123abc/samples' will just return '123abc/samples/file1.txt" * * @param parentKey - * @return list of keys + * @return Stream of keys starting with parentKey */ - List getSubKeys(String parentKey) + Stream getSubKeys(String parentKey) } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy index daaf12d324..f5f3098cb0 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy @@ -184,7 +184,9 @@ class LinCommandImpl implements CmdLineage.LinCommand { try { final params = parseFindArgs(args) new LinPropertyValidator().validateQueryParams(params.keySet()) - println LinUtils.encodeSearchOutputs( store.search(params).keySet().collect { asUriString(it) }, true ) + try (def stream = store.search(params) ) { + stream.forEach { println asUriString(it) } + } } catch (Throwable e){ println "Error searching for ${args[0]}. ${e.message}" } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy index d90f650914..f402f0a607 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy @@ -39,7 +39,6 @@ import java.nio.file.spi.FileSystemProvider import groovy.transform.CompileStatic import nextflow.lineage.config.LineageConfig import nextflow.util.TestOnly - /** * File System Provider for LID Paths * @@ -216,8 +215,9 @@ class LinFileSystemProvider extends FileSystemProvider { return getDirectoryStreamFromSubPath(lid) return getDirectoryStreamFromRealPath(real, lid) } + private static DirectoryStream getDirectoryStreamFromSubPath(LinPath lid){ - List paths = lid.getSubPaths() + final paths = lid.getSubPaths() if( !paths ) throw new FileNotFoundException("Sub paths for '$lid' do not exist") return new DirectoryStream() { @@ -225,7 +225,9 @@ class LinFileSystemProvider extends FileSystemProvider { return paths.iterator() } - void close() {} + void close() { + paths.close() + } } } private DirectoryStream getDirectoryStreamFromRealPath(Path real, LinPath lid) { diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy index 23c6adedaf..89189e6464 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy @@ -27,6 +27,7 @@ import java.nio.file.WatchEvent import java.nio.file.WatchKey import java.nio.file.WatchService import java.time.OffsetDateTime +import java.util.stream.Stream import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode @@ -156,7 +157,7 @@ class LinPath implements Path, LogicalDataPath { @TestOnly protected String getFilePath() { this.filePath } - protected List getSubPaths(){ + protected Stream getSubPaths(){ if( !fileSystem ) throw new IllegalArgumentException("Cannot get sub-paths for a relative lineage path") if( filePath.isEmpty() || filePath == SEPARATOR ) @@ -164,7 +165,7 @@ class LinPath implements Path, LogicalDataPath { final store = fileSystem.getStore() if( !store ) throw new Exception("Lineage store not found - Check Nextflow configuration") - return store.getSubKeys(filePath).collect {new LinPath(fileSystem as LinFileSystem, it)} as List + return store.getSubKeys(filePath).map {new LinPath(fileSystem as LinFileSystem, it) as Path } } /** diff --git a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy index ce7fd3d2dd..ccabb2e7bc 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy @@ -123,12 +123,10 @@ class DefaultLinStoreTest extends Specification { lidStore.save(key4, value4) when: - def results = lidStore.search( [type:['FileOutput'], labels:['value2']]) + def results = lidStore.search( [type:['FileOutput'], labels:['value2']]).toList() then: results.size() == 2 - results.keySet().containsAll([key2,key3]) - results[key2] == value2 - results[key3] == value3 + results.containsAll([key2,key3]) } def 'should search subkeys' () { @@ -154,7 +152,7 @@ class DefaultLinStoreTest extends Specification { lidStore.save(key4, value4) when: - def results = lidStore.getSubKeys("testKey") + def results = lidStore.getSubKeys("testKey").toList() then: results.size() == 2 results.containsAll([key2, key3]) diff --git a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy index 1a966ad2be..396e62bfec 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy @@ -488,8 +488,8 @@ class LinCommandImplTest extends Specification{ "lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, ["experiment=test"]) def entry3 = new FileOutput("path/to/file3",new Checksum("42472qet","nextflow","standard"), "lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, null) - def expectedOutput1 = '[\n "lid://123987/file.bam",\n "lid://123987/file2.bam"\n]' - def expectedOutput2 = '[\n "lid://123987/file2.bam",\n "lid://123987/file.bam"\n]' + def expectedOutput1 = 'lid://123987/file.bam\nlid://123987/file2.bam' + def expectedOutput2 = 'lid://123987/file2.bam\nlid://123987/file.bam' lidFile.text = encoder.encode(entry) lidFile2.text = encoder.encode(entry2) lidFile3.text = encoder.encode(entry3)