@@ -22,6 +22,7 @@ import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
22
22
import { getMaxDuration } from "../utils/maxDuration" ;
23
23
import { DevSubscriber , devPubSub } from "./devPubSub.server" ;
24
24
import { findQueueInEnvironment , sanitizeQueueName } from "~/models/taskQueue.server" ;
25
+ import { createRedisClient , RedisClient } from "~/redis.server" ;
25
26
26
27
const MessageBody = z . discriminatedUnion ( "type" , [
27
28
z . object ( {
@@ -53,14 +54,21 @@ export class DevQueueConsumer {
53
54
private _currentSpan : Span | undefined ;
54
55
private _endSpanInNextIteration = false ;
55
56
private _inProgressRuns : Map < string , string > = new Map ( ) ; // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
57
+ private _connectionLostAt ?: Date ;
58
+ private _redisClient : RedisClient ;
56
59
57
60
constructor (
61
+ public id : string ,
58
62
public env : AuthenticatedEnvironment ,
59
63
private _sender : ZodMessageSender < typeof serverWebsocketMessages > ,
60
64
private _options : DevQueueConsumerOptions = { }
61
65
) {
62
66
this . _traceTimeoutSeconds = _options . traceTimeoutSeconds ?? 60 ;
63
67
this . _maximumItemsPerTrace = _options . maximumItemsPerTrace ?? 1_000 ;
68
+ this . _redisClient = createRedisClient ( "tr:devQueueConsumer" , {
69
+ keyPrefix : "tr:devQueueConsumer:" ,
70
+ ...devPubSub . redisOptions ,
71
+ } ) ;
64
72
}
65
73
66
74
// This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it
@@ -235,6 +243,8 @@ export class DevQueueConsumer {
235
243
return ;
236
244
}
237
245
246
+ await this . _redisClient . set ( `connection:${ this . env . id } ` , this . id ) ;
247
+
238
248
this . _enabled = true ;
239
249
// Create the session
240
250
await createNewSession ( this . env , this . _options . ipAddress ?? "unknown" ) ;
@@ -252,6 +262,37 @@ export class DevQueueConsumer {
252
262
return ;
253
263
}
254
264
265
+ const canSendMessage = await this . _sender . validateCanSendMessage ( ) ;
266
+
267
+ if ( ! canSendMessage ) {
268
+ this . _connectionLostAt ??= new Date ( ) ;
269
+
270
+ if ( Date . now ( ) - this . _connectionLostAt . getTime ( ) > 60 * 1000 ) {
271
+ logger . debug ( "Connection lost for more than 60 seconds, stopping the consumer" , {
272
+ env : this . env ,
273
+ } ) ;
274
+
275
+ await this . stop ( "Connection lost for more than 60 seconds" ) ;
276
+ return ;
277
+ }
278
+
279
+ setTimeout ( ( ) => this . #doWork( ) , 1000 ) ;
280
+ }
281
+
282
+ this . _connectionLostAt = undefined ;
283
+
284
+ const currentConnection = await this . _redisClient . get ( `connection:${ this . env . id } ` ) ;
285
+
286
+ if ( currentConnection !== this . id ) {
287
+ logger . debug ( "Another connection is active, stopping the consumer" , {
288
+ currentConnection,
289
+ env : this . env ,
290
+ } ) ;
291
+
292
+ await this . stop ( "Another connection is active" ) ;
293
+ return ;
294
+ }
295
+
255
296
// Check if the trace has expired
256
297
if (
257
298
this . _perTraceCountdown === 0 ||
0 commit comments