Skip to content

Commit e4af230

Browse files
jorgeepditommaso
andauthored
Return streams in Lineage search and subkeys methods (#6067) [ci fast]
Signed-off-by: jorgee <[email protected]> Signed-off-by: Paolo Di Tommaso <[email protected]> Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent 2c9dd9b commit e4af230

File tree

9 files changed

+45
-88
lines changed

9 files changed

+45
-88
lines changed

modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ class CmdLineageTest extends Specification {
279279
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
280280
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, ['foo', 'bar'])
281281
def jsonSer = encoder.encode(entry)
282-
def expectedOutput = '[\n "lid://12345"\n]'
282+
def expectedOutput = 'lid://12345'
283283
lidFile.text = jsonSer
284284
when:
285285
def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput", "label=foo"])

modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy

+19-59
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616

1717
package nextflow.lineage
1818

19-
import java.nio.file.FileVisitResult
20-
import java.nio.file.FileVisitor
2119
import java.nio.file.Files
2220
import java.nio.file.Path
23-
import java.nio.file.attribute.BasicFileAttributes
21+
import java.util.stream.Stream
2422

2523
import groovy.transform.CompileStatic
2624
import groovy.util.logging.Slf4j
@@ -93,70 +91,32 @@ class DefaultLinStore implements LinStore {
9391
void close() throws IOException {}
9492

9593
@Override
96-
Map<String, LinSerializable> search(Map<String, List<String>> params) {
97-
final results = new HashMap<String, LinSerializable>()
98-
99-
Files.walkFileTree(location, new FileVisitor<Path>() {
100-
101-
@Override
102-
FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
103-
FileVisitResult.CONTINUE
104-
}
105-
106-
@Override
107-
FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
108-
if( file.name.startsWith('.data.json') ) {
109-
final lidObject = encoder.decode(file.text)
110-
if( LinUtils.checkParams(lidObject, params) ) {
111-
results.put(location.relativize(file.getParent()).toString(), lidObject as LinSerializable)
112-
}
113-
}
114-
FileVisitResult.CONTINUE
94+
Stream<String> search(Map<String, List<String>> params) {
95+
return Files.walk(location)
96+
.filter { Path path ->
97+
Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json')
11598
}
116-
117-
@Override
118-
FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
119-
FileVisitResult.CONTINUE
99+
.map { Path path ->
100+
final obj = encoder.decode(path.text)
101+
final key = location.relativize(path.parent).toString()
102+
return new AbstractMap.SimpleEntry<String, LinSerializable>(key, obj)
120103
}
121-
122-
@Override
123-
FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
124-
FileVisitResult.CONTINUE
104+
.filter { entry ->
105+
LinUtils.checkParams(entry.value, params)
125106
}
126-
})
127-
128-
return results
107+
.map {it-> it.key }
129108
}
130109

131110
@Override
132-
List<String> getSubKeys(String parentKey) {
133-
final results = new LinkedList<String>()
111+
Stream<String> getSubKeys(String parentKey) {
134112
final startPath = location.resolve(parentKey)
135-
Files.walkFileTree(startPath, new FileVisitor<Path>() {
136-
137-
@Override
138-
FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
139-
FileVisitResult.CONTINUE
140-
}
141113

142-
@Override
143-
FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
144-
if( file.name.startsWith('.data.json') && file.parent != startPath ) {
145-
results << location.relativize(file.parent).toString()
146-
}
147-
FileVisitResult.CONTINUE
114+
return Files.walk(startPath)
115+
.filter { Path path ->
116+
Files.isRegularFile(path) && path.fileName.toString().startsWith('.data.json') && path.parent != startPath
148117
}
149-
150-
@Override
151-
FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
152-
FileVisitResult.CONTINUE
153-
}
154-
155-
@Override
156-
FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
157-
FileVisitResult.CONTINUE
118+
.map { Path path ->
119+
location.relativize(path.parent).toString()
158120
}
159-
})
160-
return results
161121
}
162-
}
122+
}

modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy

+3-10
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import nextflow.Session
2424
import nextflow.extension.LinExtension
2525
import nextflow.lineage.fs.LinPathFactory
2626
import nextflow.lineage.model.v1beta1.FileOutput
27-
import nextflow.lineage.serde.LinSerializable
2827

2928
import static nextflow.lineage.fs.LinPath.*
3029

@@ -43,7 +42,9 @@ class LinExtensionImpl implements LinExtension {
4342
log.trace("Querying lineage with params: $queryParams")
4443
new LinPropertyValidator().validateQueryParams(queryParams.keySet())
4544
final store = getStore(session)
46-
emitSearchResults(channel, store.search(queryParams))
45+
try( def stream = store.search(queryParams) ){
46+
stream.forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) }
47+
}
4748
channel.bind(Channel.STOP)
4849
}
4950

@@ -62,19 +63,11 @@ class LinExtensionImpl implements LinExtension {
6263
return queryParams
6364
}
6465

65-
6666
protected LinStore getStore(Session session) {
6767
final store = LinStoreFactory.getOrCreate(session)
6868
if( !store ) {
6969
throw new Exception("Lineage store not found - Check Nextflow configuration")
7070
}
7171
return store
7272
}
73-
74-
private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
75-
if( !results ) {
76-
return
77-
}
78-
results.keySet().forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) }
79-
}
8073
}

modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy

+6-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package nextflow.lineage
1818

1919
import com.google.common.annotations.Beta
2020
import groovy.transform.CompileStatic
21+
import java.util.stream.Stream
2122
import nextflow.lineage.config.LineageConfig
2223
import nextflow.lineage.serde.LinSerializable
2324
/**
@@ -58,18 +59,18 @@ interface LinStore extends Closeable {
5859
/**
5960
* Search for lineage entries.
6061
* @param params Map of query params
61-
* @return Key-lineage entry pairs fulfilling the query params
62+
* @return Stream with keys fulfilling the query params
6263
*/
63-
Map<String,LinSerializable> search(Map<String, List<String>> params)
64+
Stream<String> search(Map<String, List<String>> params)
6465

6566
/**
6667
* Search for keys starting with a parent key.
6768
* For example, if a LinStore contains the following keys: '123abc', '123abc/samples/file1.txt' and '123abc/summary',
68-
* The execution of the function with parentKey='123abc' will return a list with '123abc/samples/file1.txt' and '123abc/summary'.
69+
* The execution of the function with parentKey='123abc' will return a stream with '123abc/samples/file1.txt' and '123abc/summary'.
6970
* Similarly, the execution of the function with parentKey='123abc/samples' will just return '123abc/samples/file1.txt"
7071
*
7172
* @param parentKey
72-
* @return list of keys
73+
* @return Stream of keys starting with parentKey
7374
*/
74-
List<String> getSubKeys(String parentKey)
75+
Stream<String> getSubKeys(String parentKey)
7576
}

modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy

+3-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ class LinCommandImpl implements CmdLineage.LinCommand {
184184
try {
185185
final params = parseFindArgs(args)
186186
new LinPropertyValidator().validateQueryParams(params.keySet())
187-
println LinUtils.encodeSearchOutputs( store.search(params).keySet().collect { asUriString(it) }, true )
187+
try (def stream = store.search(params) ) {
188+
stream.forEach { println asUriString(it) }
189+
}
188190
} catch (Throwable e){
189191
println "Error searching for ${args[0]}. ${e.message}"
190192
}

modules/nf-lineage/src/main/nextflow/lineage/fs/LinFileSystemProvider.groovy

+5-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import java.nio.file.spi.FileSystemProvider
3939
import groovy.transform.CompileStatic
4040
import nextflow.lineage.config.LineageConfig
4141
import nextflow.util.TestOnly
42-
4342
/**
4443
* File System Provider for LID Paths
4544
*
@@ -216,16 +215,19 @@ class LinFileSystemProvider extends FileSystemProvider {
216215
return getDirectoryStreamFromSubPath(lid)
217216
return getDirectoryStreamFromRealPath(real, lid)
218217
}
218+
219219
private static DirectoryStream<Path> getDirectoryStreamFromSubPath(LinPath lid){
220-
List<Path> paths = lid.getSubPaths()
220+
final paths = lid.getSubPaths()
221221
if( !paths )
222222
throw new FileNotFoundException("Sub paths for '$lid' do not exist")
223223
return new DirectoryStream<Path>() {
224224
Iterator<Path> iterator() {
225225
return paths.iterator()
226226
}
227227

228-
void close() {}
228+
void close() {
229+
paths.close()
230+
}
229231
}
230232
}
231233
private DirectoryStream<Path> getDirectoryStreamFromRealPath(Path real, LinPath lid) {

modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import java.nio.file.WatchEvent
2727
import java.nio.file.WatchKey
2828
import java.nio.file.WatchService
2929
import java.time.OffsetDateTime
30+
import java.util.stream.Stream
3031

3132
import groovy.transform.CompileStatic
3233
import groovy.transform.EqualsAndHashCode
@@ -156,15 +157,15 @@ class LinPath implements Path, LogicalDataPath {
156157
@TestOnly
157158
protected String getFilePath() { this.filePath }
158159

159-
protected List<Path> getSubPaths(){
160+
protected Stream<Path> getSubPaths(){
160161
if( !fileSystem )
161162
throw new IllegalArgumentException("Cannot get sub-paths for a relative lineage path")
162163
if( filePath.isEmpty() || filePath == SEPARATOR )
163164
throw new IllegalArgumentException("Cannot get sub-paths for an empty lineage path (lid:///)")
164165
final store = fileSystem.getStore()
165166
if( !store )
166167
throw new Exception("Lineage store not found - Check Nextflow configuration")
167-
return store.getSubKeys(filePath).collect {new LinPath(fileSystem as LinFileSystem, it)} as List<Path>
168+
return store.getSubKeys(filePath).map {new LinPath(fileSystem as LinFileSystem, it) as Path }
168169
}
169170

170171
/**

modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy

+3-5
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,10 @@ class DefaultLinStoreTest extends Specification {
123123
lidStore.save(key4, value4)
124124

125125
when:
126-
def results = lidStore.search( [type:['FileOutput'], labels:['value2']])
126+
def results = lidStore.search( [type:['FileOutput'], labels:['value2']]).toList()
127127
then:
128128
results.size() == 2
129-
results.keySet().containsAll([key2,key3])
130-
results[key2] == value2
131-
results[key3] == value3
129+
results.containsAll([key2,key3])
132130
}
133131

134132
def 'should search subkeys' () {
@@ -154,7 +152,7 @@ class DefaultLinStoreTest extends Specification {
154152
lidStore.save(key4, value4)
155153

156154
when:
157-
def results = lidStore.getSubKeys("testKey")
155+
def results = lidStore.getSubKeys("testKey").toList()
158156
then:
159157
results.size() == 2
160158
results.containsAll([key2, key3])

modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy

+2-2
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,8 @@ class LinCommandImplTest extends Specification{
488488
"lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, ["experiment=test"])
489489
def entry3 = new FileOutput("path/to/file3",new Checksum("42472qet","nextflow","standard"),
490490
"lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, null)
491-
def expectedOutput1 = '[\n "lid://123987/file.bam",\n "lid://123987/file2.bam"\n]'
492-
def expectedOutput2 = '[\n "lid://123987/file2.bam",\n "lid://123987/file.bam"\n]'
491+
def expectedOutput1 = 'lid://123987/file.bam\nlid://123987/file2.bam'
492+
def expectedOutput2 = 'lid://123987/file2.bam\nlid://123987/file.bam'
493493
lidFile.text = encoder.encode(entry)
494494
lidFile2.text = encoder.encode(entry2)
495495
lidFile3.text = encoder.encode(entry3)

0 commit comments

Comments
 (0)