3
3
const core = require ( 'datastore-core' )
4
4
const ShardingStore = core . ShardingDatastore
5
5
const Block = require ( 'ipld-block' )
6
- const { cidToKey, keyToCid } = require ( './blockstore-utils' )
6
+ const { cidToKey } = require ( './blockstore-utils' )
7
7
const map = require ( 'it-map' )
8
- const pipe = require ( 'it-pipe' )
8
+ const drain = require ( 'it-drain' )
9
+ const pushable = require ( 'it-pushable' )
9
10
10
11
module . exports = async ( filestore , options ) => {
11
12
const store = await maybeWithSharding ( filestore , options )
@@ -23,7 +24,7 @@ function maybeWithSharding (filestore, options) {
23
24
function createBaseStore ( store ) {
24
25
return {
25
26
/**
26
- * Query the store.
27
+ * Query the store
27
28
*
28
29
* @param {Object } query
29
30
* @param {Object } options
@@ -32,38 +33,23 @@ function createBaseStore (store) {
32
33
async * query ( query , options ) { // eslint-disable-line require-await
33
34
yield * store . query ( query , options )
34
35
} ,
36
+
35
37
/**
36
- * Get a single block by CID.
38
+ * Get a single block by CID
37
39
*
38
40
* @param {CID } cid
39
41
* @param {Object } options
40
42
* @returns {Promise<Block> }
41
43
*/
42
44
async get ( cid , options ) {
43
45
const key = cidToKey ( cid )
44
- let blockData
45
- try {
46
- blockData = await store . get ( key , options )
47
- return new Block ( blockData , cid )
48
- } catch ( err ) {
49
- if ( err . code === 'ERR_NOT_FOUND' ) {
50
- const otherCid = cidToOtherVersion ( cid )
51
-
52
- if ( ! otherCid ) {
53
- throw err
54
- }
55
-
56
- const otherKey = cidToKey ( otherCid )
57
- const blockData = await store . get ( otherKey , options )
58
- await store . put ( key , blockData )
59
- return new Block ( blockData , cid )
60
- }
46
+ const blockData = await store . get ( key , options )
61
47
62
- throw err
63
- }
48
+ return new Block ( blockData , cid )
64
49
} ,
50
+
65
51
/**
66
- * Like get, but for more.
52
+ * Like get, but for more
67
53
*
68
54
* @param {AsyncIterator<CID> } cids
69
55
* @param {Object } options
@@ -74,8 +60,9 @@ function createBaseStore (store) {
74
60
yield this . get ( cid , options )
75
61
}
76
62
} ,
63
+
77
64
/**
78
- * Write a single block to the store.
65
+ * Write a single block to the store
79
66
*
80
67
* @param {Block } block
81
68
* @param {Object } options
@@ -86,59 +73,75 @@ function createBaseStore (store) {
86
73
throw new Error ( 'invalid block' )
87
74
}
88
75
89
- const exists = await this . has ( block . cid )
76
+ const key = cidToKey ( block . cid )
77
+ const exists = await store . has ( key , options )
90
78
91
- if ( exists ) {
92
- return this . get ( block . cid , options )
79
+ if ( ! exists ) {
80
+ await store . put ( key , block . data , options )
93
81
}
94
82
95
- await store . put ( cidToKey ( block . cid ) , block . data , options )
96
-
97
83
return block
98
84
} ,
99
85
100
86
/**
101
- * Like put, but for more.
87
+ * Like put, but for more
102
88
*
103
89
* @param {AsyncIterable<Block>|Iterable<Block> } blocks
104
90
* @param {Object } options
105
91
* @returns {AsyncIterable<Block> }
106
92
*/
107
93
async * putMany ( blocks , options ) { // eslint-disable-line require-await
108
- yield * pipe (
109
- blocks ,
110
- ( source ) => {
111
- // turn them into a key/value pair
112
- return map ( source , ( block ) => {
113
- return { key : cidToKey ( block . cid ) , value : block . data }
114
- } )
115
- } ,
116
- ( source ) => {
117
- // put them into the datastore
118
- return store . putMany ( source , options )
119
- } ,
120
- ( source ) => {
121
- // map the returned key/value back into a block
122
- return map ( source , ( { key, value } ) => {
123
- return new Block ( value , keyToCid ( key ) )
124
- } )
94
+ // we cannot simply chain to `store.putMany` because we convert a CID into
95
+ // a key based on the multihash only, so we lose the version & codec and
96
+ // cannot give the user back the CID they used to create the block, so yield
97
+ // to `store.putMany` but return the actual block the user passed in.
98
+ //
99
+ // nb. we want to use `store.putMany` here so bitswap can control batching
100
+ // up block HAVEs to send to the network - if we use multiple `store.put`s
101
+ // it will not be able to guess we are about to `store.put` more blocks
102
+ const output = pushable ( )
103
+
104
+ // process.nextTick runs on the microtask queue, setImmediate runs on the next
105
+ // event loop iteration so is slower. Use process.nextTick if it is available.
106
+ const runner = process && process . nextTick ? process . nextTick : setImmediate
107
+
108
+ runner ( async ( ) => {
109
+ try {
110
+ await drain ( store . putMany ( async function * ( ) {
111
+ for await ( const block of blocks ) {
112
+ const key = cidToKey ( block . cid )
113
+ const exists = await store . has ( key , options )
114
+
115
+ if ( ! exists ) {
116
+ yield { key, value : block . data }
117
+ }
118
+
119
+ // there is an assumption here that after the yield has completed
120
+ // the underlying datastore has finished writing the block
121
+ output . push ( block )
122
+ }
123
+ } ( ) ) )
124
+
125
+ output . end ( )
126
+ } catch ( err ) {
127
+ output . end ( err )
125
128
}
126
- )
129
+ } )
130
+
131
+ yield * output
127
132
} ,
133
+
128
134
/**
129
- * Does the store contain block with this cid ?
135
+ * Does the store contain block with this CID ?
130
136
*
131
137
* @param {CID } cid
132
138
* @param {Object } options
133
139
* @returns {Promise<bool> }
134
140
*/
135
- async has ( cid , options ) {
136
- const exists = await store . has ( cidToKey ( cid ) , options )
137
- if ( exists ) return exists
138
- const otherCid = cidToOtherVersion ( cid )
139
- if ( ! otherCid ) return false
140
- return store . has ( cidToKey ( otherCid ) , options )
141
+ async has ( cid , options ) { // eslint-disable-line require-await
142
+ return store . has ( cidToKey ( cid ) , options )
141
143
} ,
144
+
142
145
/**
143
146
* Delete a block from the store
144
147
*
@@ -149,6 +152,7 @@ function createBaseStore (store) {
149
152
async delete ( cid , options ) { // eslint-disable-line require-await
150
153
return store . delete ( cidToKey ( cid ) , options )
151
154
} ,
155
+
152
156
/**
153
157
* Delete a block from the store
154
158
*
@@ -157,12 +161,9 @@ function createBaseStore (store) {
157
161
* @returns {Promise<void> }
158
162
*/
159
163
async * deleteMany ( cids , options ) { // eslint-disable-line require-await
160
- yield * store . deleteMany ( ( async function * ( ) {
161
- for await ( const cid of cids ) {
162
- yield cidToKey ( cid )
163
- }
164
- } ( ) ) , options )
164
+ yield * store . deleteMany ( map ( cids , cid => cidToKey ( cid ) ) , options )
165
165
} ,
166
+
166
167
/**
167
168
* Close the store
168
169
*
@@ -173,11 +174,3 @@ function createBaseStore (store) {
173
174
}
174
175
}
175
176
}
176
-
177
- function cidToOtherVersion ( cid ) {
178
- try {
179
- return cid . version === 0 ? cid . toV1 ( ) : cid . toV0 ( )
180
- } catch ( err ) {
181
- return null
182
- }
183
- }
0 commit comments