@@ -5,28 +5,40 @@ use std::sync::Arc;
5
5
use clap:: Parser ;
6
6
use eyre:: WrapErr as _;
7
7
use tokio:: sync:: Mutex ;
8
+
8
9
use twitch_api:: {
9
10
client:: ClientDefault ,
10
11
eventsub:: { self , Event , Message , Payload } ,
11
12
HelixClient ,
12
13
} ;
13
14
use twitch_oauth2:: { Scope , TwitchToken as _, UserToken } ;
14
15
16
+ /// The scopes we need for the bot.
17
+ const SCOPES : & [ Scope ] = & [ Scope :: UserReadChat , Scope :: UserWriteChat ] ;
18
+
19
+ /// The threshold at which we should refresh the token before expiration.
20
+ const TOKEN_EXPIRATION_THRESHOLD : std:: time:: Duration = std:: time:: Duration :: from_secs ( 60 ) ;
21
+
15
22
#[ derive( Parser , Debug , Clone ) ]
16
23
#[ clap( about, version) ]
17
24
pub struct Cli {
18
25
/// Client ID of twitch application
19
26
#[ clap( long, env, hide_env = true ) ]
20
27
pub client_id : twitch_oauth2:: ClientId ,
21
- #[ clap( long, env, hide_env = true ) ]
22
- pub broadcaster_login : twitch_api:: types:: UserName ,
28
+ /// Chat to connect to, can take multiple values separated by commas
29
+ #[ clap( long, env, value_delimiter = ',' , hide_env = true ) ]
30
+ pub broadcaster_login : Vec < twitch_api:: types:: UserName > ,
23
31
/// Path to config file
24
32
#[ clap( long, default_value = concat!( env!( "CARGO_MANIFEST_DIR" ) , "/config.toml" ) ) ]
25
33
pub config : std:: path:: PathBuf ,
34
+ /// Path to token file
35
+ #[ clap( long, default_value = concat!( env!( "CARGO_MANIFEST_DIR" ) , "/auth.toml" ) ) ]
36
+ pub auth : std:: path:: PathBuf ,
26
37
}
27
38
28
39
#[ derive( serde_derive:: Serialize , serde_derive:: Deserialize , Debug ) ]
29
40
pub struct Config {
41
+ #[ serde( default ) ]
30
42
command : Vec < Command > ,
31
43
}
32
44
@@ -57,37 +69,44 @@ async fn main() -> Result<(), eyre::Report> {
57
69
ClientDefault :: default_client_with_name ( Some ( "my_chatbot" . parse ( ) ?) ) ?,
58
70
) ;
59
71
60
- // First we need to get a token, preferably you'd also store this information somewhere safe to reuse when restarting the application.
61
- // For now we'll just get a new token every time the application starts.
62
- // One way to store the token is to store the access_token and refresh_token in a file and load it when the application starts with
63
- // `twitch_oauth2::UserToken::from_existing`
64
- let mut builder = twitch_oauth2:: tokens:: DeviceUserTokenBuilder :: new (
65
- opts. client_id . clone ( ) ,
66
- vec ! [ Scope :: UserReadChat , Scope :: UserWriteChat ] ,
67
- ) ;
68
- let code = builder. start ( & client) . await ?;
69
- println ! ( "Please go to: {}" , code. verification_uri) ;
70
- let token = builder. wait_for_code ( & client, tokio:: time:: sleep) . await ?;
71
-
72
- let Some ( twitch_api:: helix:: users:: User {
73
- id : broadcaster, ..
74
- } ) = client
75
- . get_user_from_login ( & opts. broadcaster_login , & token)
76
- . await ?
77
- else {
78
- eyre:: bail!(
79
- "No broadcaster found with login: {}" ,
80
- opts. broadcaster_login
72
+ // Get an user access token.
73
+ // For this example we store the token in a file, but you should probably store it in a database or similar.
74
+ // If there is no token saved, we use Device Code Flow to get a token.
75
+ // This flow works best with public client type applications.
76
+ // If you have a confidential client type application you should use `UserTokenBuilder` for OAuth authorization code flow.
77
+ let token = if let Some ( token) = load_token ( & opts. auth , & client) . await ? {
78
+ token
79
+ } else {
80
+ let mut builder = twitch_oauth2:: tokens:: DeviceUserTokenBuilder :: new (
81
+ opts. client_id . clone ( ) ,
82
+ SCOPES . to_vec ( ) ,
81
83
) ;
84
+ let code = builder. start ( & client) . await ?;
85
+ println ! ( "Please go to: {}" , code. verification_uri) ;
86
+ builder. wait_for_code ( & client, tokio:: time:: sleep) . await ?
82
87
} ;
88
+ save_token ( & token, & opts. auth ) ?;
83
89
let token = Arc :: new ( Mutex :: new ( token) ) ;
84
90
91
+ // Get the broadcaster ids from the logins.
92
+ let mut broadcasters = vec ! [ ] ;
93
+ for login in opts. broadcaster_login . iter ( ) {
94
+ if let Some ( twitch_api:: helix:: users:: User { id, .. } ) =
95
+ client. get_user_from_login ( login, & token) . await ?
96
+ {
97
+ broadcasters. push ( id) ;
98
+ } else {
99
+ eyre:: bail!( "No broadcaster found with login: {}" , login) ;
100
+ }
101
+ }
102
+
103
+ // Create the bot and start it.
85
104
let bot = Bot {
86
105
opts,
87
106
client,
88
107
token,
89
108
config,
90
- broadcaster ,
109
+ broadcasters ,
91
110
} ;
92
111
bot. start ( ) . await ?;
93
112
Ok ( ( ) )
@@ -98,10 +117,12 @@ pub struct Bot {
98
117
pub client : HelixClient < ' static , reqwest:: Client > ,
99
118
pub token : Arc < Mutex < twitch_oauth2:: UserToken > > ,
100
119
pub config : Config ,
101
- pub broadcaster : twitch_api:: types:: UserId ,
120
+ pub broadcasters : Vec < twitch_api:: types:: UserId > ,
102
121
}
103
122
104
123
impl Bot {
124
+ /// Start the bot. This will connect to the chat and start handling for events with [Bot::handle_event].
125
+ /// This will also start a task that will refresh the token if it's about to expire and check if it's still valid.
105
126
pub async fn start ( & self ) -> Result < ( ) , eyre:: Report > {
106
127
// To make a connection to the chat we need to use a websocket connection.
107
128
// This is a wrapper for the websocket connection that handles the reconnects and handles all messages from eventsub.
@@ -110,7 +131,7 @@ impl Bot {
110
131
token : self . token . clone ( ) ,
111
132
client : self . client . clone ( ) ,
112
133
connect_url : twitch_api:: TWITCH_EVENTSUB_WEBSOCKET_URL . clone ( ) ,
113
- chats : vec ! [ self . broadcaster . clone( ) ] ,
134
+ chats : self . broadcasters . clone ( ) ,
114
135
} ;
115
136
let refresh_token = async move {
116
137
let token = self . token . clone ( ) ;
@@ -121,16 +142,7 @@ impl Bot {
121
142
loop {
122
143
interval. tick ( ) . await ;
123
144
let mut token = token. lock ( ) . await ;
124
- if token. expires_in ( ) < std:: time:: Duration :: from_secs ( 60 ) {
125
- token
126
- . refresh_token ( & self . client )
127
- . await
128
- . wrap_err ( "couldn't refresh token" ) ?;
129
- }
130
- token
131
- . validate_token ( & client)
132
- . await
133
- . wrap_err ( "couldn't validate token" ) ?;
145
+ refresh_and_validate_token ( & mut token, & client, & self . opts ) . await ?;
134
146
}
135
147
#[ allow( unreachable_code) ]
136
148
Ok ( ( ) )
@@ -140,38 +152,60 @@ impl Bot {
140
152
Ok ( ( ) )
141
153
}
142
154
155
+ /// Handle chat messages, if they start with `!` send it to [Bot::command].
156
+ async fn handle_chat_message (
157
+ & self ,
158
+ token : tokio:: sync:: MutexGuard < ' _ , UserToken > ,
159
+ payload : eventsub:: channel:: ChannelChatMessageV1Payload ,
160
+ subscription : eventsub:: EventSubscriptionInformation <
161
+ eventsub:: channel:: ChannelChatMessageV1 ,
162
+ > ,
163
+ ) -> Result < ( ) , eyre:: Error > {
164
+ if let Some ( command) = payload. message . text . strip_prefix ( "!" ) {
165
+ let mut split_whitespace = command. split_whitespace ( ) ;
166
+ let command = split_whitespace. next ( ) . unwrap ( ) ;
167
+ let rest = split_whitespace. next ( ) ;
168
+
169
+ self . command ( & payload, & subscription, command, rest, & token)
170
+ . await ?;
171
+ }
172
+ Ok ( ( ) )
173
+ }
174
+
175
+ /// Handle all eventsub events.
176
+ /// We print the message to the console and if it's a chat message we send it to [Bot::handle_chat_message].
177
+ /// If there's an event you want to listen to you should first add it to [websocket::ChatWebsocketClient::process_welcome_message] and then handle it here.
143
178
async fn handle_event (
144
179
& self ,
145
180
event : Event ,
146
181
timestamp : twitch_api:: types:: Timestamp ,
147
182
) -> Result < ( ) , eyre:: Report > {
148
183
let token = self . token . lock ( ) . await ;
184
+ let time_format = time:: format_description:: parse ( "[hour]:[minute]:[second]" ) ?;
149
185
match event {
150
186
Event :: ChannelChatMessageV1 ( Payload {
151
187
message : Message :: Notification ( payload) ,
152
188
subscription,
153
189
..
154
190
} ) => {
155
191
println ! (
156
- "[{}] {}: {}" ,
157
- timestamp, payload. chatter_user_name, payload. message. text
192
+ "[{}] #{} {}: {}" ,
193
+ timestamp. to_utc( ) . format( & time_format) . unwrap( ) ,
194
+ payload. broadcaster_user_login,
195
+ payload. chatter_user_name,
196
+ payload. message. text
158
197
) ;
159
- if let Some ( command) = payload. message . text . strip_prefix ( "!" ) {
160
- let mut split_whitespace = command. split_whitespace ( ) ;
161
- let command = split_whitespace. next ( ) . unwrap ( ) ;
162
- let rest = split_whitespace. next ( ) ;
163
-
164
- self . command ( & payload, & subscription, command, rest, & token)
165
- . await ?;
166
- }
198
+
199
+ self . handle_chat_message ( token, payload, subscription)
200
+ . await ?;
167
201
}
168
202
Event :: ChannelChatNotificationV1 ( Payload {
169
203
message : Message :: Notification ( payload) ,
170
204
..
171
205
} ) => {
172
206
println ! (
173
- "[{}] {}: {}" ,
174
- timestamp,
207
+ "[{}] [Event] {}: {}" ,
208
+ timestamp. to_utc ( ) . format ( & time_format ) . unwrap ( ) ,
175
209
match & payload. chatter {
176
210
eventsub:: channel:: chat:: notification:: Chatter :: Chatter {
177
211
chatter_user_name: user,
@@ -206,7 +240,7 @@ impl Bot {
206
240
& payload. message_id ,
207
241
response
208
242
. response
209
- . replace ( "{user}" , & payload. chatter_user_name . as_str ( ) )
243
+ . replace ( "{user}" , payload. chatter_user_name . as_str ( ) )
210
244
. as_str ( ) ,
211
245
token,
212
246
)
@@ -215,3 +249,65 @@ impl Bot {
215
249
Ok ( ( ) )
216
250
}
217
251
}
252
+
253
+ async fn refresh_and_validate_token (
254
+ token : & mut UserToken ,
255
+ client : & HelixClient < ' _ , reqwest:: Client > ,
256
+ opts : & Cli ,
257
+ ) -> Result < ( ) , eyre:: Report > {
258
+ if token. expires_in ( ) < TOKEN_EXPIRATION_THRESHOLD {
259
+ tracing:: info!( "refreshed token" ) ;
260
+ token
261
+ . refresh_token ( client)
262
+ . await
263
+ . wrap_err ( "couldn't refresh token" ) ?;
264
+ save_token ( token, & opts. auth ) ?;
265
+ }
266
+ token
267
+ . validate_token ( client)
268
+ . await
269
+ . wrap_err ( "couldn't validate token" ) ?;
270
+ Ok ( ( ) )
271
+ }
272
+
273
+ /// Used to save the token to a file
274
+ #[ derive( serde_derive:: Serialize , serde_derive:: Deserialize ) ]
275
+ struct SavedToken {
276
+ access_token : twitch_oauth2:: AccessToken ,
277
+ refresh_token : twitch_oauth2:: RefreshToken ,
278
+ }
279
+
280
+ // you should probably replace this with something more robust
281
+ #[ cfg( debug_assertions) ]
282
+ fn save_token (
283
+ token : & twitch_oauth2:: UserToken ,
284
+ save_path : & std:: path:: Path ,
285
+ ) -> Result < ( ) , eyre:: Report > {
286
+ let token = SavedToken {
287
+ access_token : token. access_token . clone ( ) ,
288
+ refresh_token : token. refresh_token . clone ( ) . unwrap ( ) ,
289
+ } ;
290
+ let text = toml:: to_string ( & token) ?;
291
+ std:: fs:: write ( save_path, text) ?;
292
+ Ok ( ( ) )
293
+ }
294
+
295
+ #[ cfg( debug_assertions) ]
296
+ async fn load_token (
297
+ path : & std:: path:: Path ,
298
+ client : & HelixClient < ' _ , reqwest:: Client > ,
299
+ ) -> Result < Option < twitch_oauth2:: UserToken > , eyre:: Report > {
300
+ let Some ( text) = std:: fs:: read_to_string ( path) . ok ( ) else {
301
+ return Ok ( None ) ;
302
+ } ;
303
+ let token: SavedToken = toml:: from_str ( & text) ?;
304
+ Ok ( Some (
305
+ twitch_oauth2:: UserToken :: from_existing (
306
+ client,
307
+ token. access_token ,
308
+ token. refresh_token ,
309
+ None ,
310
+ )
311
+ . await ?,
312
+ ) )
313
+ }
0 commit comments