Skip to content

Commit a549da7

Browse files
committed
fix: cleanup after streaming in sap hana
1 parent b9842e3 commit a549da7

File tree

2 files changed

+68
-19
lines changed

2 files changed

+68
-19
lines changed

src/driver/sap/SapQueryRunner.ts

+64-18
Original file line numberDiff line numberDiff line change
@@ -208,20 +208,25 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
208208
const isInsertQuery = query.substr(0, 11) === "INSERT INTO"
209209

210210
if (parameters?.some(Array.isArray)) {
211-
statement = await promisify(
212-
databaseConnection.prepare.bind(databaseConnection),
213-
)(query)
211+
statement = await promisify(databaseConnection.prepare).call(
212+
databaseConnection,
213+
query,
214+
)
214215
}
215216

216217
let raw: any
217218
try {
218219
raw = statement
219-
? await promisify(statement.exec.bind(statement))(
220+
? await promisify(statement.exec).call(
221+
statement,
220222
parameters,
221223
)
222-
: await promisify(
223-
databaseConnection.exec.bind(databaseConnection),
224-
)(query, parameters, {})
224+
: await promisify(databaseConnection.exec).call(
225+
databaseConnection,
226+
query,
227+
parameters,
228+
{},
229+
)
225230
} catch (err) {
226231
throw new QueryFailedError(query, parameters, err)
227232
}
@@ -333,20 +338,61 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
333338
): Promise<ReadStream> {
334339
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
335340

336-
const databaseConnection = await this.connect()
337-
this.driver.connection.logger.logQuery(query, parameters, this)
341+
const release = await this.lock.acquire()
342+
let statement: any
343+
let resultSet: any
338344

339-
const prepareAsync = promisify(databaseConnection.prepare).bind(
340-
databaseConnection,
341-
)
342-
const statement = await prepareAsync(query)
343-
const resultSet = statement.executeQuery(parameters)
344-
const stream = this.driver.streamClient.createObjectStream(resultSet)
345+
const cleanup = async () => {
346+
if (resultSet) {
347+
await promisify(resultSet.close).call(resultSet)
348+
}
349+
if (statement) {
350+
await promisify(statement.drop).call(statement)
351+
}
352+
release()
353+
}
345354

346-
if (onEnd) stream.on("end", onEnd)
347-
if (onError) stream.on("error", onError)
355+
try {
356+
const databaseConnection = await this.connect()
357+
this.driver.connection.logger.logQuery(query, parameters, this)
348358

349-
return stream
359+
statement = await promisify(databaseConnection.prepare).call(
360+
databaseConnection,
361+
query,
362+
)
363+
resultSet = await promisify(statement.executeQuery).call(
364+
statement,
365+
parameters,
366+
)
367+
368+
const stream =
369+
this.driver.streamClient.createObjectStream(resultSet)
370+
stream.on("end", async () => {
371+
await cleanup()
372+
onEnd?.()
373+
})
374+
stream.on("error", async (error: Error) => {
375+
this.driver.connection.logger.logQueryError(
376+
error,
377+
query,
378+
parameters,
379+
this,
380+
)
381+
await cleanup()
382+
onError?.(error)
383+
})
384+
385+
return stream
386+
} catch (error) {
387+
this.driver.connection.logger.logQueryError(
388+
error,
389+
query,
390+
parameters,
391+
this,
392+
)
393+
await cleanup()
394+
throw new QueryFailedError(query, parameters, error)
395+
}
350396
}
351397

352398
/**

test/functional/query-runner/stream.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ describe("query runner > stream", () => {
5252

5353
readStream.on("data", (row) => data.push(row))
5454

55-
await new Promise((ok) => readStream.once("end", ok))
55+
await new Promise((ok, fail) => {
56+
readStream.once("end", ok)
57+
readStream.once("error", fail)
58+
})
5659

5760
expect(data).to.have.length(4)
5861

0 commit comments

Comments
 (0)