Skip to content

Commit 1b568f2

Browse files
authored
Merge pull request rust-lang#977 from rust-lang/streaming-metrics
Track metrics when executing via the WebSocket
2 parents 2ce5e91 + 348c54a commit 1b568f2

File tree

3 files changed

+85
-24
lines changed

3 files changed

+85
-24
lines changed

ui/src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub(crate) enum Outcome {
7676
ErrorTimeoutSoft,
7777
ErrorTimeoutHard,
7878
ErrorUserCode,
79+
Abandoned,
7980
}
8081

8182
pub(crate) struct LabelsCore {

ui/src/server_axum.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -308,18 +308,46 @@ trait IsSuccess {
308308
fn is_success(&self) -> bool;
309309
}
310310

311-
impl IsSuccess for coordinator::WithOutput<coordinator::CompileResponse> {
311+
impl<T> IsSuccess for &T
312+
where
313+
T: IsSuccess,
314+
{
315+
fn is_success(&self) -> bool {
316+
T::is_success(self)
317+
}
318+
}
319+
320+
impl<T> IsSuccess for coordinator::WithOutput<T>
321+
where
322+
T: IsSuccess,
323+
{
324+
fn is_success(&self) -> bool {
325+
self.response.is_success()
326+
}
327+
}
328+
329+
impl IsSuccess for coordinator::CompileResponse {
312330
fn is_success(&self) -> bool {
313331
self.success
314332
}
315333
}
316334

317-
impl IsSuccess for coordinator::WithOutput<coordinator::ExecuteResponse> {
335+
impl IsSuccess for coordinator::ExecuteResponse {
318336
fn is_success(&self) -> bool {
319337
self.success
320338
}
321339
}
322340

341+
impl Outcome {
342+
fn from_success(other: impl IsSuccess) -> Self {
343+
if other.is_success() {
344+
Outcome::Success
345+
} else {
346+
Outcome::ErrorUserCode
347+
}
348+
}
349+
}
350+
323351
async fn with_coordinator<WebReq, WebResp, Req, Resp, F>(req: WebReq, f: F) -> Result<WebResp>
324352
where
325353
WebReq: TryInto<Req>,
@@ -346,13 +374,7 @@ where
346374
let elapsed = start.elapsed();
347375

348376
let outcome = match &resp {
349-
Ok(Ok(v)) => {
350-
if v.is_success() {
351-
Outcome::Success
352-
} else {
353-
Outcome::ErrorUserCode
354-
}
355-
}
377+
Ok(Ok(v)) => Outcome::from_success(v),
356378
Ok(Err(_)) => Outcome::ErrorServer,
357379
Err(_) => Outcome::ErrorTimeoutSoft,
358380
};

ui/src/server_axum/websocket.rs

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use crate::{
2-
metrics, server_axum::api_orchestrator_integration_impls::*, Error, Result,
3-
StreamingCoordinatorIdleSnafu, StreamingCoordinatorSpawnSnafu, StreamingExecuteSnafu,
4-
WebSocketTaskPanicSnafu,
2+
metrics::{self, record_metric, Endpoint, HasLabelsCore, Outcome},
3+
server_axum::api_orchestrator_integration_impls::*,
4+
Error, Result, StreamingCoordinatorIdleSnafu, StreamingCoordinatorSpawnSnafu,
5+
StreamingExecuteSnafu, WebSocketTaskPanicSnafu,
56
};
67

78
use axum::extract::ws::{Message, WebSocket};
89
use futures::{Future, FutureExt};
910
use orchestrator::coordinator::{self, Coordinator, DockerBackend};
1011
use snafu::prelude::*;
1112
use std::{
12-
convert::{TryFrom, TryInto},
13+
convert::TryFrom,
1314
sync::{
1415
atomic::{AtomicU64, Ordering},
1516
Arc,
@@ -451,10 +452,16 @@ async fn handle_msg(txt: String, tx: &ResponseTx, manager: &mut CoordinatorManag
451452
}
452453
}
453454

454-
macro_rules! return_if_closed {
455+
#[derive(Debug)]
456+
enum CompletedOrAbandoned<T> {
457+
Abandoned,
458+
Completed(T),
459+
}
460+
461+
macro_rules! abandon_if_closed {
455462
($sent:expr) => {
456463
if $sent.is_err() {
457-
return Ok(());
464+
return Ok(CompletedOrAbandoned::Abandoned);
458465
}
459466
};
460467
}
@@ -466,8 +473,36 @@ async fn handle_execute(
466473
meta: Meta,
467474
) -> ExecuteResult<()> {
468475
use execute_error::*;
476+
use CompletedOrAbandoned::*;
477+
478+
let req = coordinator::ExecuteRequest::try_from(req).context(BadRequestSnafu)?;
479+
480+
let labels_core = req.labels_core();
481+
482+
let start = Instant::now();
483+
let v = handle_execute_inner(tx, coordinator, req, meta).await;
484+
let elapsed = start.elapsed();
469485

470-
let req = req.try_into().context(BadRequestSnafu)?;
486+
let outcome = match &v {
487+
Ok(Abandoned) => Outcome::Abandoned,
488+
Ok(Completed(v)) => *v,
489+
Err(_) => Outcome::ErrorServer,
490+
};
491+
492+
record_metric(Endpoint::Execute, labels_core, outcome, elapsed);
493+
494+
v?;
495+
Ok(())
496+
}
497+
498+
async fn handle_execute_inner(
499+
tx: ResponseTx,
500+
coordinator: SharedCoordinator,
501+
req: coordinator::ExecuteRequest,
502+
meta: Meta,
503+
) -> ExecuteResult<CompletedOrAbandoned<Outcome>> {
504+
use execute_error::*;
505+
use CompletedOrAbandoned::*;
471506

472507
let coordinator::ActiveExecution {
473508
mut task,
@@ -478,7 +513,7 @@ async fn handle_execute(
478513
let sent = tx
479514
.send(Ok(MessageResponse::ExecuteBegin { meta: meta.clone() }))
480515
.await;
481-
return_if_closed!(sent);
516+
abandon_if_closed!(sent);
482517

483518
let send_stdout = |payload| async {
484519
let meta = meta.clone();
@@ -498,38 +533,41 @@ async fn handle_execute(
498533

499534
Some(stdout) = stdout_rx.recv() => {
500535
let sent = send_stdout(stdout).await;
501-
return_if_closed!(sent);
536+
abandon_if_closed!(sent);
502537
},
503538

504539
Some(stderr) = stderr_rx.recv() => {
505540
let sent = send_stderr(stderr).await;
506-
return_if_closed!(sent);
541+
abandon_if_closed!(sent);
507542
},
508543
}
509544
};
510545

511546
// Drain any remaining output
512547
while let Some(Some(stdout)) = stdout_rx.recv().now_or_never() {
513548
let sent = send_stdout(stdout).await;
514-
return_if_closed!(sent);
549+
abandon_if_closed!(sent);
515550
}
516551

517552
while let Some(Some(stderr)) = stderr_rx.recv().now_or_never() {
518553
let sent = send_stderr(stderr).await;
519-
return_if_closed!(sent);
554+
abandon_if_closed!(sent);
520555
}
521556

522-
let coordinator::ExecuteResponse { success } = status.context(EndSnafu)?;
557+
let status = status.context(EndSnafu)?;
558+
let outcome = Outcome::from_success(&status);
559+
560+
let coordinator::ExecuteResponse { success } = status;
523561

524562
let sent = tx
525563
.send(Ok(MessageResponse::ExecuteEnd {
526564
payload: ExecuteResponse { success },
527565
meta,
528566
}))
529567
.await;
530-
return_if_closed!(sent);
568+
abandon_if_closed!(sent);
531569

532-
Ok(())
570+
Ok(Completed(outcome))
533571
}
534572

535573
#[derive(Debug, Snafu)]

0 commit comments

Comments
 (0)