Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ A 2xx response counts as success; anything else surfaces as a `NonSuccessStatus`
error so the caller can decide whether to retry. `headers` and `payload` are
optional (payload defaults to `{}`).

The shared HTTP client used by `process_actions` carries a **30-second
timeout** (connection + read combined). If the target does not respond within
that window the executor returns `WebhookError::Timeout`, distinct from generic
transport errors so callers can apply different retry semantics.

### Worked example

```bash
Expand Down
7 changes: 6 additions & 1 deletion src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
pub mod cron;
pub mod webhook;

use std::time::Duration;

use chrono::Utc;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -63,7 +65,10 @@ pub fn parse_action(raw: &str) -> Result<Action, serde_json::Error> {
/// Cron actions are not fired here; instead we compute and report the next
/// fire time. Real firing happens in [`cron::Scheduler`].
pub async fn process_actions(agent: &Agent) -> Vec<ExecutionOutcome> {
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("failed to build HTTP client");
let now = Utc::now();
let mut outcomes = Vec::with_capacity(agent.actions.len());
for raw in &agent.actions {
Expand Down
48 changes: 46 additions & 2 deletions src/executors/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct WebhookResult {
pub enum WebhookError {
InvalidHeaderName(String),
InvalidHeaderValue(String),
/// The remote did not respond within the configured timeout window.
Timeout,
Transport(reqwest::Error),
NonSuccessStatus { status: u16, body: String },
}
Expand All @@ -49,6 +51,7 @@ impl std::fmt::Display for WebhookError {
WebhookError::InvalidHeaderValue(name) => {
write!(f, "invalid header value for {name}")
}
WebhookError::Timeout => write!(f, "webhook request timed out"),
WebhookError::Transport(e) => write!(f, "webhook transport error: {e}"),
WebhookError::NonSuccessStatus { status, body } => {
write!(f, "webhook returned non-success status {status}: {body}")
Expand All @@ -59,6 +62,14 @@ impl std::fmt::Display for WebhookError {

impl std::error::Error for WebhookError {}

fn map_transport(e: reqwest::Error) -> WebhookError {
if e.is_timeout() {
WebhookError::Timeout
} else {
WebhookError::Transport(e)
}
}

/// POST `action.payload` as JSON to `action.url`, returning the response status
/// and body on success or a structured error otherwise.
///
Expand All @@ -80,9 +91,9 @@ pub async fn execute_webhook(
request = request.header(header_name, header_value);
}

let response = request.send().await.map_err(WebhookError::Transport)?;
let response = request.send().await.map_err(map_transport)?;
let status = response.status().as_u16();
let body = response.text().await.map_err(WebhookError::Transport)?;
let body = response.text().await.map_err(map_transport)?;

if (200..300).contains(&status) {
Ok(WebhookResult { status, body })
Expand All @@ -93,6 +104,8 @@ pub async fn execute_webhook(

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use wiremock::matchers::{body_json, header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
Expand Down Expand Up @@ -190,4 +203,35 @@ mod tests {
let err = execute_webhook(&client, &action).await.unwrap_err();
assert!(matches!(err, WebhookError::InvalidHeaderName(_)));
}

#[tokio::test]
async fn times_out_on_slow_target() {
let server = MockServer::start().await;

// Respond after 2 s — longer than the 1 s client timeout.
Mock::given(method("POST"))
.and(path("/slow"))
.respond_with(
ResponseTemplate::new(200).set_delay(Duration::from_secs(2)),
)
.mount(&server)
.await;

let action = WebhookAction {
url: format!("{}/slow", server.uri()),
headers: HashMap::new(),
payload: None,
};

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()
.unwrap();

let err = execute_webhook(&client, &action).await.unwrap_err();
assert!(
matches!(err, WebhookError::Timeout),
"expected Timeout, got {err:?}"
);
}
}
Loading