1
1
import { Prisma , TaskSchedule } from "@trigger.dev/database" ;
2
2
import cronstrue from "cronstrue" ;
3
3
import { nanoid } from "nanoid" ;
4
- import { $transaction , PrismaClientOrTransaction } from "~/db.server" ;
4
+ import { $transaction } from "~/db.server" ;
5
5
import { generateFriendlyId } from "../friendlyIdentifiers" ;
6
6
import { UpsertSchedule } from "../schedules" ;
7
7
import { calculateNextScheduledTimestamp } from "../utils/calculateNextSchedule.server" ;
@@ -31,124 +31,45 @@ export class UpsertTaskScheduleService extends BaseService {
31
31
const checkSchedule = new CheckScheduleService ( this . _prisma ) ;
32
32
await checkSchedule . call ( projectId , schedule ) ;
33
33
34
- const result = await $transaction ( this . _prisma , async ( tx ) => {
35
- const deduplicationKey =
36
- typeof schedule . deduplicationKey === "string" && schedule . deduplicationKey !== ""
37
- ? schedule . deduplicationKey
38
- : nanoid ( 24 ) ;
34
+ const deduplicationKey =
35
+ typeof schedule . deduplicationKey === "string" && schedule . deduplicationKey !== ""
36
+ ? schedule . deduplicationKey
37
+ : nanoid ( 24 ) ;
39
38
40
- const existingSchedule = schedule . friendlyId
41
- ? await tx . taskSchedule . findUnique ( {
42
- where : {
43
- friendlyId : schedule . friendlyId ,
44
- } ,
45
- } )
46
- : await tx . taskSchedule . findUnique ( {
47
- where : {
48
- projectId_deduplicationKey : {
49
- projectId,
50
- deduplicationKey,
51
- } ,
39
+ const existingSchedule = schedule . friendlyId
40
+ ? await this . _prisma . taskSchedule . findUnique ( {
41
+ where : {
42
+ friendlyId : schedule . friendlyId ,
43
+ } ,
44
+ } )
45
+ : await this . _prisma . taskSchedule . findUnique ( {
46
+ where : {
47
+ projectId_deduplicationKey : {
48
+ projectId,
49
+ deduplicationKey,
52
50
} ,
53
- } ) ;
51
+ } ,
52
+ } ) ;
54
53
54
+ const result = await ( async ( tx ) => {
55
55
if ( existingSchedule ) {
56
56
if ( existingSchedule . type === "DECLARATIVE" ) {
57
57
throw new ServiceValidationError ( "Cannot update a declarative schedule" ) ;
58
58
}
59
59
60
- return await this . #updateExistingSchedule( tx , existingSchedule , schedule , projectId ) ;
60
+ return await this . #updateExistingSchedule( existingSchedule , schedule ) ;
61
61
} else {
62
- return await this . #createNewSchedule( tx , schedule , projectId , deduplicationKey ) ;
62
+ return await this . #createNewSchedule( schedule , projectId , deduplicationKey ) ;
63
63
}
64
- } ) ;
64
+ } ) ( ) ;
65
65
66
66
if ( ! result ) {
67
- throw new Error ( "Failed to create or update the schedule" ) ;
68
- }
69
-
70
- const { scheduleRecord, instances } = result ;
71
-
72
- return this . #createReturnObject( scheduleRecord , instances ) ;
73
- }
74
-
75
- async #createNewSchedule(
76
- tx : PrismaClientOrTransaction ,
77
- options : UpsertTaskScheduleServiceOptions ,
78
- projectId : string ,
79
- deduplicationKey : string
80
- ) {
81
- const scheduleRecord = await tx . taskSchedule . create ( {
82
- data : {
83
- projectId,
84
- friendlyId : generateFriendlyId ( "sched" ) ,
85
- taskIdentifier : options . taskIdentifier ,
86
- deduplicationKey,
87
- userProvidedDeduplicationKey :
88
- options . deduplicationKey !== undefined && options . deduplicationKey !== "" ,
89
- generatorExpression : options . cron ,
90
- generatorDescription : cronstrue . toString ( options . cron ) ,
91
- timezone : options . timezone ?? "UTC" ,
92
- externalId : options . externalId ? options . externalId : undefined ,
93
- } ,
94
- } ) ;
95
-
96
- const registerNextService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
97
-
98
- //create the instances (links to environments)
99
- let instances : InstanceWithEnvironment [ ] = [ ] ;
100
- for ( const environmentId of options . environments ) {
101
- const instance = await tx . taskScheduleInstance . create ( {
102
- data : {
103
- taskScheduleId : scheduleRecord . id ,
104
- environmentId,
105
- } ,
106
- include : {
107
- environment : {
108
- include : {
109
- orgMember : {
110
- include : {
111
- user : true ,
112
- } ,
113
- } ,
114
- } ,
115
- } ,
116
- } ,
117
- } ) ;
118
-
119
- await registerNextService . call ( instance . id ) ;
120
-
121
- instances . push ( instance ) ;
67
+ throw new ServiceValidationError ( "Failed to create or update schedule" ) ;
122
68
}
123
69
124
- return { scheduleRecord, instances } ;
125
- }
70
+ const { scheduleRecord } = result ;
126
71
127
- async #updateExistingSchedule(
128
- tx : PrismaClientOrTransaction ,
129
- existingSchedule : TaskSchedule ,
130
- options : UpsertTaskScheduleServiceOptions ,
131
- projectId : string
132
- ) {
133
- //update the schedule
134
- const scheduleRecord = await tx . taskSchedule . update ( {
135
- where : {
136
- id : existingSchedule . id ,
137
- } ,
138
- data : {
139
- generatorExpression : options . cron ,
140
- generatorDescription : cronstrue . toString ( options . cron ) ,
141
- timezone : options . timezone ?? "UTC" ,
142
- externalId : options . externalId ? options . externalId : null ,
143
- } ,
144
- } ) ;
145
-
146
- const scheduleHasChanged =
147
- scheduleRecord . generatorExpression !== existingSchedule . generatorExpression ||
148
- scheduleRecord . timezone !== existingSchedule . timezone ;
149
-
150
- // find the existing instances
151
- const existingInstances = await tx . taskScheduleInstance . findMany ( {
72
+ const instances = await this . _prisma . taskScheduleInstance . findMany ( {
152
73
where : {
153
74
taskScheduleId : scheduleRecord . id ,
154
75
} ,
@@ -165,18 +86,35 @@ export class UpsertTaskScheduleService extends BaseService {
165
86
} ,
166
87
} ) ;
167
88
168
- // create the new instances
169
- const newInstances : InstanceWithEnvironment [ ] = [ ] ;
170
- const updatingInstances : InstanceWithEnvironment [ ] = [ ] ;
89
+ return this . #createReturnObject( scheduleRecord , instances ) ;
90
+ }
91
+
92
+ async #createNewSchedule(
93
+ options : UpsertTaskScheduleServiceOptions ,
94
+ projectId : string ,
95
+ deduplicationKey : string
96
+ ) {
97
+ return await $transaction ( this . _prisma , async ( tx ) => {
98
+ const scheduleRecord = await tx . taskSchedule . create ( {
99
+ data : {
100
+ projectId,
101
+ friendlyId : generateFriendlyId ( "sched" ) ,
102
+ taskIdentifier : options . taskIdentifier ,
103
+ deduplicationKey,
104
+ userProvidedDeduplicationKey :
105
+ options . deduplicationKey !== undefined && options . deduplicationKey !== "" ,
106
+ generatorExpression : options . cron ,
107
+ generatorDescription : cronstrue . toString ( options . cron ) ,
108
+ timezone : options . timezone ?? "UTC" ,
109
+ externalId : options . externalId ? options . externalId : undefined ,
110
+ } ,
111
+ } ) ;
112
+
113
+ const registerNextService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
171
114
172
- for ( const environmentId of options . environments ) {
173
- const existingInstance = existingInstances . find ( ( i ) => i . environmentId === environmentId ) ;
115
+ //create the instances (links to environments)
174
116
175
- if ( existingInstance ) {
176
- // Update the existing instance
177
- updatingInstances . push ( existingInstance ) ;
178
- } else {
179
- // Create a new instance
117
+ for ( const environmentId of options . environments ) {
180
118
const instance = await tx . taskScheduleInstance . create ( {
181
119
data : {
182
120
taskScheduleId : scheduleRecord . id ,
@@ -195,39 +133,21 @@ export class UpsertTaskScheduleService extends BaseService {
195
133
} ,
196
134
} ) ;
197
135
198
- newInstances . push ( instance ) ;
136
+ await registerNextService . call ( instance . id ) ;
199
137
}
200
- }
201
-
202
- // find the instances that need to be removed
203
- const instancesToDeleted = existingInstances . filter (
204
- ( i ) => ! options . environments . includes ( i . environmentId )
205
- ) ;
206
138
207
- // delete the instances no longer selected
208
- for ( const instance of instancesToDeleted ) {
209
- await tx . taskScheduleInstance . delete ( {
210
- where : {
211
- id : instance . id ,
212
- } ,
213
- } ) ;
214
- }
215
-
216
- const registerService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
217
-
218
- for ( const instance of newInstances ) {
219
- await registerService . call ( instance . id ) ;
220
- }
221
-
222
- if ( scheduleHasChanged ) {
223
- for ( const instance of updatingInstances ) {
224
- await registerService . call ( instance . id ) ;
225
- }
226
- }
139
+ return { scheduleRecord } ;
140
+ } ) ;
141
+ }
227
142
228
- const instances = await tx . taskScheduleInstance . findMany ( {
143
+ async #updateExistingSchedule(
144
+ existingSchedule : TaskSchedule ,
145
+ options : UpsertTaskScheduleServiceOptions
146
+ ) {
147
+ // find the existing instances
148
+ const existingInstances = await this . _prisma . taskScheduleInstance . findMany ( {
229
149
where : {
230
- taskScheduleId : scheduleRecord . id ,
150
+ taskScheduleId : existingSchedule . id ,
231
151
} ,
232
152
include : {
233
153
environment : {
@@ -242,7 +162,89 @@ export class UpsertTaskScheduleService extends BaseService {
242
162
} ,
243
163
} ) ;
244
164
245
- return { scheduleRecord, instances } ;
165
+ return await $transaction (
166
+ this . _prisma ,
167
+ async ( tx ) => {
168
+ const scheduleRecord = await tx . taskSchedule . update ( {
169
+ where : {
170
+ id : existingSchedule . id ,
171
+ } ,
172
+ data : {
173
+ generatorExpression : options . cron ,
174
+ generatorDescription : cronstrue . toString ( options . cron ) ,
175
+ timezone : options . timezone ?? "UTC" ,
176
+ externalId : options . externalId ? options . externalId : null ,
177
+ } ,
178
+ } ) ;
179
+
180
+ const scheduleHasChanged =
181
+ scheduleRecord . generatorExpression !== existingSchedule . generatorExpression ||
182
+ scheduleRecord . timezone !== existingSchedule . timezone ;
183
+
184
+ // create the new instances
185
+ const newInstances : InstanceWithEnvironment [ ] = [ ] ;
186
+ const updatingInstances : InstanceWithEnvironment [ ] = [ ] ;
187
+
188
+ for ( const environmentId of options . environments ) {
189
+ const existingInstance = existingInstances . find ( ( i ) => i . environmentId === environmentId ) ;
190
+
191
+ if ( existingInstance ) {
192
+ // Update the existing instance
193
+ updatingInstances . push ( existingInstance ) ;
194
+ } else {
195
+ // Create a new instance
196
+ const instance = await tx . taskScheduleInstance . create ( {
197
+ data : {
198
+ taskScheduleId : scheduleRecord . id ,
199
+ environmentId,
200
+ } ,
201
+ include : {
202
+ environment : {
203
+ include : {
204
+ orgMember : {
205
+ include : {
206
+ user : true ,
207
+ } ,
208
+ } ,
209
+ } ,
210
+ } ,
211
+ } ,
212
+ } ) ;
213
+
214
+ newInstances . push ( instance ) ;
215
+ }
216
+ }
217
+
218
+ // find the instances that need to be removed
219
+ const instancesToDeleted = existingInstances . filter (
220
+ ( i ) => ! options . environments . includes ( i . environmentId )
221
+ ) ;
222
+
223
+ // delete the instances no longer selected
224
+ for ( const instance of instancesToDeleted ) {
225
+ await tx . taskScheduleInstance . delete ( {
226
+ where : {
227
+ id : instance . id ,
228
+ } ,
229
+ } ) ;
230
+ }
231
+
232
+ const registerService = new RegisterNextTaskScheduleInstanceService ( tx ) ;
233
+
234
+ for ( const instance of newInstances ) {
235
+ await registerService . call ( instance . id ) ;
236
+ }
237
+
238
+ if ( scheduleHasChanged ) {
239
+ for ( const instance of updatingInstances ) {
240
+ await registerService . call ( instance . id ) ;
241
+ }
242
+ }
243
+
244
+ return { scheduleRecord } ;
245
+ } ,
246
+ { timeout : 10_000 }
247
+ ) ;
246
248
}
247
249
248
250
#createReturnObject( taskSchedule : TaskSchedule , instances : InstanceWithEnvironment [ ] ) {
0 commit comments