Skip to content

Commit 348c54a

Browse files
committed
Track metrics when executing via the WebSocket
1 parent 20b383b commit 348c54a

File tree

2 files changed

+54
-15
lines changed

2 files changed

+54
-15
lines changed

ui/src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub(crate) enum Outcome {
7676
ErrorTimeoutSoft,
7777
ErrorTimeoutHard,
7878
ErrorUserCode,
79+
Abandoned,
7980
}
8081

8182
pub(crate) struct LabelsCore {

ui/src/server_axum/websocket.rs

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use crate::{
2-
metrics, server_axum::api_orchestrator_integration_impls::*, Error, Result,
3-
StreamingCoordinatorIdleSnafu, StreamingCoordinatorSpawnSnafu, StreamingExecuteSnafu,
4-
WebSocketTaskPanicSnafu,
2+
metrics::{self, record_metric, Endpoint, HasLabelsCore, Outcome},
3+
server_axum::api_orchestrator_integration_impls::*,
4+
Error, Result, StreamingCoordinatorIdleSnafu, StreamingCoordinatorSpawnSnafu,
5+
StreamingExecuteSnafu, WebSocketTaskPanicSnafu,
56
};
67

78
use axum::extract::ws::{Message, WebSocket};
89
use futures::{Future, FutureExt};
910
use orchestrator::coordinator::{self, Coordinator, DockerBackend};
1011
use snafu::prelude::*;
1112
use std::{
12-
convert::{TryFrom, TryInto},
13+
convert::TryFrom,
1314
sync::{
1415
atomic::{AtomicU64, Ordering},
1516
Arc,
@@ -451,10 +452,16 @@ async fn handle_msg(txt: String, tx: &ResponseTx, manager: &mut CoordinatorManag
451452
}
452453
}
453454

454-
macro_rules! return_if_closed {
455+
#[derive(Debug)]
456+
enum CompletedOrAbandoned<T> {
457+
Abandoned,
458+
Completed(T),
459+
}
460+
461+
macro_rules! abandon_if_closed {
455462
($sent:expr) => {
456463
if $sent.is_err() {
457-
return Ok(());
464+
return Ok(CompletedOrAbandoned::Abandoned);
458465
}
459466
};
460467
}
@@ -466,8 +473,36 @@ async fn handle_execute(
466473
meta: Meta,
467474
) -> ExecuteResult<()> {
468475
use execute_error::*;
476+
use CompletedOrAbandoned::*;
477+
478+
let req = coordinator::ExecuteRequest::try_from(req).context(BadRequestSnafu)?;
479+
480+
let labels_core = req.labels_core();
481+
482+
let start = Instant::now();
483+
let v = handle_execute_inner(tx, coordinator, req, meta).await;
484+
let elapsed = start.elapsed();
469485

470-
let req = req.try_into().context(BadRequestSnafu)?;
486+
let outcome = match &v {
487+
Ok(Abandoned) => Outcome::Abandoned,
488+
Ok(Completed(v)) => *v,
489+
Err(_) => Outcome::ErrorServer,
490+
};
491+
492+
record_metric(Endpoint::Execute, labels_core, outcome, elapsed);
493+
494+
v?;
495+
Ok(())
496+
}
497+
498+
async fn handle_execute_inner(
499+
tx: ResponseTx,
500+
coordinator: SharedCoordinator,
501+
req: coordinator::ExecuteRequest,
502+
meta: Meta,
503+
) -> ExecuteResult<CompletedOrAbandoned<Outcome>> {
504+
use execute_error::*;
505+
use CompletedOrAbandoned::*;
471506

472507
let coordinator::ActiveExecution {
473508
mut task,
@@ -478,7 +513,7 @@ async fn handle_execute(
478513
let sent = tx
479514
.send(Ok(MessageResponse::ExecuteBegin { meta: meta.clone() }))
480515
.await;
481-
return_if_closed!(sent);
516+
abandon_if_closed!(sent);
482517

483518
let send_stdout = |payload| async {
484519
let meta = meta.clone();
@@ -498,38 +533,41 @@ async fn handle_execute(
498533

499534
Some(stdout) = stdout_rx.recv() => {
500535
let sent = send_stdout(stdout).await;
501-
return_if_closed!(sent);
536+
abandon_if_closed!(sent);
502537
},
503538

504539
Some(stderr) = stderr_rx.recv() => {
505540
let sent = send_stderr(stderr).await;
506-
return_if_closed!(sent);
541+
abandon_if_closed!(sent);
507542
},
508543
}
509544
};
510545

511546
// Drain any remaining output
512547
while let Some(Some(stdout)) = stdout_rx.recv().now_or_never() {
513548
let sent = send_stdout(stdout).await;
514-
return_if_closed!(sent);
549+
abandon_if_closed!(sent);
515550
}
516551

517552
while let Some(Some(stderr)) = stderr_rx.recv().now_or_never() {
518553
let sent = send_stderr(stderr).await;
519-
return_if_closed!(sent);
554+
abandon_if_closed!(sent);
520555
}
521556

522-
let coordinator::ExecuteResponse { success } = status.context(EndSnafu)?;
557+
let status = status.context(EndSnafu)?;
558+
let outcome = Outcome::from_success(&status);
559+
560+
let coordinator::ExecuteResponse { success } = status;
523561

524562
let sent = tx
525563
.send(Ok(MessageResponse::ExecuteEnd {
526564
payload: ExecuteResponse { success },
527565
meta,
528566
}))
529567
.await;
530-
return_if_closed!(sent);
568+
abandon_if_closed!(sent);
531569

532-
Ok(())
570+
Ok(Completed(outcome))
533571
}
534572

535573
#[derive(Debug, Snafu)]

0 commit comments

Comments
 (0)