Skip to content

Return streams in Lineage search and subkeys methods. #6067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class CmdLineageTest extends Specification {
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, ['foo', 'bar'])
def jsonSer = encoder.encode(entry)
def expectedOutput = '[\n "lid://12345"\n]'
def expectedOutput = 'lid://12345'
lidFile.text = jsonSer
when:
def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput", "label=foo"])
Expand Down
78 changes: 19 additions & 59 deletions modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package nextflow.lineage

import java.nio.file.FileVisitResult
import java.nio.file.FileVisitor
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.attribute.BasicFileAttributes
import java.util.stream.Stream

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -93,70 +91,32 @@ class DefaultLinStore implements LinStore {
void close() throws IOException {}

@Override
Map<String, LinSerializable> search(Map<String, List<String>> params) {
final results = new HashMap<String, LinSerializable>()

Files.walkFileTree(location, new FileVisitor<Path>() {

@Override
FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
FileVisitResult.CONTINUE
}

@Override
FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if( file.name.startsWith('.data.json') ) {
final lidObject = encoder.decode(file.text)
if( LinUtils.checkParams(lidObject, params) ) {
results.put(location.relativize(file.getParent()).toString(), lidObject as LinSerializable)
}
}
FileVisitResult.CONTINUE
Stream<String> search(Map<String, List<String>> params) {
return Files.walk(location)
.filter { Path path ->
Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json')
}

@Override
FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
FileVisitResult.CONTINUE
.map { Path path ->
final obj = encoder.decode(path.text)
final key = location.relativize(path.parent).toString()
return new AbstractMap.SimpleEntry<String, LinSerializable>(key, obj)
}

@Override
FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
FileVisitResult.CONTINUE
.filter { entry ->
LinUtils.checkParams(entry.value, params)
}
})

return results
.map {it-> it.key }
}

@Override
List<String> getSubKeys(String parentKey) {
final results = new LinkedList<String>()
Stream<String> getSubKeys(String parentKey) {
final startPath = location.resolve(parentKey)
Files.walkFileTree(startPath, new FileVisitor<Path>() {

@Override
FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
FileVisitResult.CONTINUE
}

@Override
FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if( file.name.startsWith('.data.json') && file.parent != startPath ) {
results << location.relativize(file.parent).toString()
}
FileVisitResult.CONTINUE
return Files.walk(startPath)
.filter { Path path ->
Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') && path.parent != startPath
}

@Override
FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
FileVisitResult.CONTINUE
}

@Override
FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
FileVisitResult.CONTINUE
.map { Path path ->
location.relativize(path.parent).toString()
}
})
return results
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import nextflow.Session
import nextflow.extension.LinExtension
import nextflow.lineage.fs.LinPathFactory
import nextflow.lineage.model.v1beta1.FileOutput
import nextflow.lineage.serde.LinSerializable

import static nextflow.lineage.fs.LinPath.*

Expand All @@ -43,7 +42,9 @@ class LinExtensionImpl implements LinExtension {
log.trace("Querying lineage with params: $queryParams")
new LinPropertyValidator().validateQueryParams(queryParams.keySet())
final store = getStore(session)
emitSearchResults(channel, store.search(queryParams))
try( def stream = store.search(queryParams) ){
stream.forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) }
}
channel.bind(Channel.STOP)
}

Expand All @@ -62,19 +63,11 @@ class LinExtensionImpl implements LinExtension {
return queryParams
}


protected LinStore getStore(Session session) {
final store = LinStoreFactory.getOrCreate(session)
if( !store ) {
throw new Exception("Lineage store not found - Check Nextflow configuration")
}
return store
}

private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
if( !results ) {
return
}
results.keySet().forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) }
}
}
11 changes: 6 additions & 5 deletions modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.lineage

import com.google.common.annotations.Beta
import groovy.transform.CompileStatic
import java.util.stream.Stream
import nextflow.lineage.config.LineageConfig
import nextflow.lineage.serde.LinSerializable
/**
Expand Down Expand Up @@ -58,18 +59,18 @@ interface LinStore extends Closeable {
/**
* Search for lineage entries.
* @param params Map of query params
* @return Key-lineage entry pairs fulfilling the query params
* @return Stream with keys fulfilling the query params
*/
Map<String,LinSerializable> search(Map<String, List<String>> params)
Stream<String> search(Map<String, List<String>> params)

/**
* Search for keys starting with a parent key.
* For example, if a LinStore contains the following keys: '123abc', '123abc/samples/file1.txt' and '123abc/summary',
* The execution of the function with parentKey='123abc' will return a list with '123abc/samples/file1.txt' and '123abc/summary'.
* The execution of the function with parentKey='123abc' will return a stream with '123abc/samples/file1.txt' and '123abc/summary'.
* Similarly, the execution of the function with parentKey='123abc/samples' will just return '123abc/samples/file1.txt"
*
* @param parentKey
* @return list of keys
* @return Stream of keys starting with parentKey
*/
List<String> getSubKeys(String parentKey)
Stream<String> getSubKeys(String parentKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ class LinCommandImpl implements CmdLineage.LinCommand {
try {
final params = parseFindArgs(args)
new LinPropertyValidator().validateQueryParams(params.keySet())
println LinUtils.encodeSearchOutputs( store.search(params).keySet().collect { asUriString(it) }, true )
try (def stream = store.search(params) ) {
stream.forEach { println asUriString(it) }
}
} catch (Throwable e){
println "Error searching for ${args[0]}. ${e.message}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import java.nio.file.spi.FileSystemProvider
import groovy.transform.CompileStatic
import nextflow.lineage.config.LineageConfig
import nextflow.util.TestOnly

/**
* File System Provider for LID Paths
*
Expand Down Expand Up @@ -216,16 +215,19 @@ class LinFileSystemProvider extends FileSystemProvider {
return getDirectoryStreamFromSubPath(lid)
return getDirectoryStreamFromRealPath(real, lid)
}

private static DirectoryStream<Path> getDirectoryStreamFromSubPath(LinPath lid){
List<Path> paths = lid.getSubPaths()
final paths = lid.getSubPaths()
if( !paths )
throw new FileNotFoundException("Sub paths for '$lid' do not exist")
return new DirectoryStream<Path>() {
Iterator<Path> iterator() {
return paths.iterator()
}

void close() {}
void close() {
paths.close()
}
}
}
private DirectoryStream<Path> getDirectoryStreamFromRealPath(Path real, LinPath lid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.nio.file.WatchEvent
import java.nio.file.WatchKey
import java.nio.file.WatchService
import java.time.OffsetDateTime
import java.util.stream.Stream

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
Expand Down Expand Up @@ -156,15 +157,15 @@ class LinPath implements Path, LogicalDataPath {
@TestOnly
protected String getFilePath() { this.filePath }

protected List<Path> getSubPaths(){
protected Stream<Path> getSubPaths(){
if( !fileSystem )
throw new IllegalArgumentException("Cannot get sub-paths for a relative lineage path")
if( filePath.isEmpty() || filePath == SEPARATOR )
throw new IllegalArgumentException("Cannot get sub-paths for an empty lineage path (lid:///)")
final store = fileSystem.getStore()
if( !store )
throw new Exception("Lineage store not found - Check Nextflow configuration")
return store.getSubKeys(filePath).collect {new LinPath(fileSystem as LinFileSystem, it)} as List<Path>
return store.getSubKeys(filePath).map {new LinPath(fileSystem as LinFileSystem, it) as Path }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,10 @@ class DefaultLinStoreTest extends Specification {
lidStore.save(key4, value4)

when:
def results = lidStore.search( [type:['FileOutput'], labels:['value2']])
def results = lidStore.search( [type:['FileOutput'], labels:['value2']]).toList()
then:
results.size() == 2
results.keySet().containsAll([key2,key3])
results[key2] == value2
results[key3] == value3
results.containsAll([key2,key3])
}

def 'should search subkeys' () {
Expand All @@ -154,7 +152,7 @@ class DefaultLinStoreTest extends Specification {
lidStore.save(key4, value4)

when:
def results = lidStore.getSubKeys("testKey")
def results = lidStore.getSubKeys("testKey").toList()
then:
results.size() == 2
results.containsAll([key2, key3])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ class LinCommandImplTest extends Specification{
"lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, ["experiment=test"])
def entry3 = new FileOutput("path/to/file3",new Checksum("42472qet","nextflow","standard"),
"lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, null)
def expectedOutput1 = '[\n "lid://123987/file.bam",\n "lid://123987/file2.bam"\n]'
def expectedOutput2 = '[\n "lid://123987/file2.bam",\n "lid://123987/file.bam"\n]'
def expectedOutput1 = 'lid://123987/file.bam\nlid://123987/file2.bam'
def expectedOutput2 = 'lid://123987/file2.bam\nlid://123987/file.bam'
lidFile.text = encoder.encode(entry)
lidFile2.text = encoder.encode(entry2)
lidFile3.text = encoder.encode(entry3)
Expand Down