Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ A 2xx response counts as success; anything else surfaces as a `NonSuccessStatus`
error so the caller can decide whether to retry. `headers` and `payload` are
optional (payload defaults to `{}`).

**Response body cap (64 KiB):** The executor reads at most **64 KiB** from the
response body. Webhook targets are expected to return only a short
acknowledgement — reading an unbounded body would be a denial-of-service
surface. Behaviour when the cap is hit:

- **2xx response:** body is truncated to 64 KiB and `WebhookResult::body_truncated`
is set to `true`.
- **Non-2xx response:** returns `WebhookError::ResponseTooLarge { status, captured_bytes }`
instead of `NonSuccessStatus`, so a caller is never handed a giant allocation
in the error payload.

### Worked example

```bash
Expand Down
4 changes: 2 additions & 2 deletions examples/cron_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};
use axum::{extract::State, routing::post, Json, Router};
use chrono::Utc;
use pypes::executors::cron::{CronAction, Scheduler};
use pypes::executors::webhook::{execute_webhook, WebhookAction};
use pypes::executors::webhook::{execute_webhook, WebhookAction, RESPONSE_BODY_LIMIT_BYTES};
use pypes::executors::Action;
use serde_json::{json, Value};

Expand Down Expand Up @@ -65,7 +65,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for idx in due {
let entry = &scheduler.entries()[idx];
match entry.action.action.as_ref() {
Action::Webhook(spec) => match execute_webhook(&client, spec).await {
Action::Webhook(spec) => match execute_webhook(&client, spec, RESPONSE_BODY_LIMIT_BYTES).await {
Ok(res) => println!(
"← cron[{idx}] fired webhook → status={} body={}",
res.status, res.body
Expand Down
7 changes: 6 additions & 1 deletion src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ pub async fn process_actions(agent: &Agent) -> Vec<ExecutionOutcome> {
match parse_action(raw) {
Ok(Action::Webhook(spec)) => {
outcomes.push(ExecutionOutcome::Webhook(
webhook::execute_webhook(&client, &spec).await,
webhook::execute_webhook(
&client,
&spec,
webhook::RESPONSE_BODY_LIMIT_BYTES,
)
.await,
));
}
Ok(Action::Cron(spec)) => {
Expand Down
160 changes: 152 additions & 8 deletions src/executors/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

/// Default cap on the bytes buffered from a webhook response body: 64 KiB
/// (65,536 bytes).
///
/// Webhook targets are expected to return only a short acknowledgement;
/// reading an unbounded body is a denial-of-service surface. This is the
/// value to pass as the `body_size_limit` argument of [`execute_webhook`];
/// a call site may pass a different value there to override the cap.
pub const RESPONSE_BODY_LIMIT_BYTES: usize = 64 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookAction {
pub url: String,
Expand All @@ -32,6 +39,9 @@ pub struct WebhookAction {
pub struct WebhookResult {
/// HTTP status code of the response. [`execute_webhook`] only builds a
/// `WebhookResult` for statuses in the 2xx range.
pub status: u16,
/// Response body decoded as UTF-8, with invalid sequences replaced
/// (lossy decoding). Holds only the captured prefix when
/// `body_truncated` is `true`.
pub body: String,
/// `true` when the response body exceeded the `body_size_limit` passed to
/// [`execute_webhook`] and was truncated to fit. The captured prefix is
/// in `body`.
pub body_truncated: bool,
}

#[derive(Debug)]
Expand All @@ -40,6 +50,9 @@ pub enum WebhookError {
InvalidHeaderValue(String),
Transport(reqwest::Error),
NonSuccessStatus { status: u16, body: String },
/// The non-2xx response body exceeded `body_size_limit`. `captured_bytes`
/// is how many bytes were buffered before the stream was dropped.
ResponseTooLarge { status: u16, captured_bytes: usize },
}

impl std::fmt::Display for WebhookError {
Expand All @@ -53,20 +66,66 @@ impl std::fmt::Display for WebhookError {
WebhookError::NonSuccessStatus { status, body } => {
write!(f, "webhook returned non-success status {status}: {body}")
}
WebhookError::ResponseTooLarge {
status,
captured_bytes,
} => {
write!(
f,
"webhook response body exceeded limit \
({captured_bytes} bytes captured, status {status})"
)
}
}
}
}

impl std::error::Error for WebhookError {}

/// Collect the body of `response` chunk by chunk, keeping at most `limit`
/// bytes.
///
/// Returns `(bytes, truncated)`. `truncated` is `true` when a chunk arrived
/// that did not fit in the remaining space; in that case only the part of
/// the chunk that fits is kept and no further chunks are read. A body of
/// exactly `limit` bytes is returned whole with `truncated == false`.
///
/// # Errors
///
/// Propagates any `reqwest::Error` raised while fetching a chunk.
async fn read_body_capped(
    mut response: reqwest::Response,
    limit: usize,
) -> Result<(Vec<u8>, bool), reqwest::Error> {
    let mut captured: Vec<u8> = Vec::new();
    while let Some(piece) = response.chunk().await? {
        // `saturating_sub` keeps this at zero once the buffer is full.
        let remaining = limit.saturating_sub(captured.len());
        if piece.len() <= remaining {
            captured.extend_from_slice(&piece);
            continue;
        }
        // This chunk overflows the cap: keep the prefix that fits and stop.
        captured.extend_from_slice(&piece[..remaining]);
        return Ok((captured, true));
    }
    // Stream ended without ever overflowing the cap.
    Ok((captured, false))
}

/// POST `action.payload` as JSON to `action.url`, returning the response status
/// and body on success or a structured error otherwise.
///
/// 2xx is success; anything else surfaces as `NonSuccessStatus` so a caller can
/// decide whether to retry without re-parsing the response.
/// `body_size_limit` caps how many bytes are buffered from the response body;
/// use [`RESPONSE_BODY_LIMIT_BYTES`] for the default 64 KiB cap.
///
/// - On **2xx** the body is capped and returned in [`WebhookResult`]; if
/// truncated, `body_truncated` is `true`.
/// - On **non-2xx** with a body within the cap, returns
/// [`WebhookError::NonSuccessStatus`].
/// - On **non-2xx** with a body that exceeds the cap, returns
/// [`WebhookError::ResponseTooLarge`] so a caller is never handed a giant
/// allocation in the error payload.
pub async fn execute_webhook(
client: &reqwest::Client,
action: &WebhookAction,
body_size_limit: usize,
) -> Result<WebhookResult, WebhookError> {
let mut request = client
.post(&action.url)
Expand All @@ -82,11 +141,25 @@ pub async fn execute_webhook(

let response = request.send().await.map_err(WebhookError::Transport)?;
let status = response.status().as_u16();
let body = response.text().await.map_err(WebhookError::Transport)?;

let (buf, truncated) = read_body_capped(response, body_size_limit)
.await
.map_err(WebhookError::Transport)?;

if (200..300).contains(&status) {
Ok(WebhookResult { status, body })
let body = String::from_utf8_lossy(&buf).into_owned();
Ok(WebhookResult {
status,
body,
body_truncated: truncated,
})
} else if truncated {
Err(WebhookError::ResponseTooLarge {
status,
captured_bytes: buf.len(),
})
} else {
let body = String::from_utf8_lossy(&buf).into_owned();
Err(WebhookError::NonSuccessStatus { status, body })
}
}
Expand Down Expand Up @@ -120,9 +193,12 @@ mod tests {
};

let client = reqwest::Client::new();
let result = execute_webhook(&client, &action).await.unwrap();
let result = execute_webhook(&client, &action, RESPONSE_BODY_LIMIT_BYTES)
.await
.unwrap();
assert_eq!(result.status, 200);
assert_eq!(result.body, r#"{"ok":true}"#);
assert!(!result.body_truncated);
}

#[tokio::test]
Expand All @@ -144,8 +220,11 @@ mod tests {
};

let client = reqwest::Client::new();
let result = execute_webhook(&client, &action).await.unwrap();
let result = execute_webhook(&client, &action, RESPONSE_BODY_LIMIT_BYTES)
.await
.unwrap();
assert_eq!(result.status, 204);
assert!(!result.body_truncated);
}

#[tokio::test]
Expand All @@ -165,7 +244,9 @@ mod tests {
};

let client = reqwest::Client::new();
let err = execute_webhook(&client, &action).await.unwrap_err();
let err = execute_webhook(&client, &action, RESPONSE_BODY_LIMIT_BYTES)
.await
.unwrap_err();
match err {
WebhookError::NonSuccessStatus { status, body } => {
assert_eq!(status, 500);
Expand All @@ -187,7 +268,70 @@ mod tests {
};

let client = reqwest::Client::new();
let err = execute_webhook(&client, &action).await.unwrap_err();
let err = execute_webhook(&client, &action, RESPONSE_BODY_LIMIT_BYTES)
.await
.unwrap_err();
assert!(matches!(err, WebhookError::InvalidHeaderName(_)));
}

#[tokio::test]
async fn truncates_oversized_success_body() {
    // A 2xx response whose body is one byte over a deliberately tiny cap
    // must come back truncated to exactly `cap` bytes, flagged as such.
    let cap: usize = 16;
    let oversized = "x".repeat(cap + 1);

    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/big-ok"))
        .respond_with(ResponseTemplate::new(200).set_body_string(oversized))
        .expect(1)
        .mount(&server)
        .await;

    let client = reqwest::Client::new();
    let action = WebhookAction {
        url: format!("{}/big-ok", server.uri()),
        headers: HashMap::new(),
        payload: Some(json!({})),
    };

    let outcome = execute_webhook(&client, &action, cap).await.unwrap();
    let expected_prefix = "x".repeat(cap);

    assert_eq!(outcome.status, 200);
    assert!(outcome.body_truncated);
    assert_eq!(outcome.body.len(), cap);
    assert_eq!(outcome.body, expected_prefix);
}

#[tokio::test]
async fn returns_response_too_large_on_oversized_non_2xx_body() {
    // A non-2xx response whose body is one byte over the cap must surface
    // as `ResponseTooLarge` carrying the status and the capped byte count.
    let cap: usize = 16;
    let oversized = "e".repeat(cap + 1);

    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/big-err"))
        .respond_with(ResponseTemplate::new(500).set_body_string(oversized))
        .expect(1)
        .mount(&server)
        .await;

    let client = reqwest::Client::new();
    let action = WebhookAction {
        url: format!("{}/big-err", server.uri()),
        headers: HashMap::new(),
        payload: Some(json!({})),
    };

    let (status, captured_bytes) = match execute_webhook(&client, &action, cap)
        .await
        .unwrap_err()
    {
        WebhookError::ResponseTooLarge {
            status,
            captured_bytes,
        } => (status, captured_bytes),
        other => panic!("expected ResponseTooLarge, got {other:?}"),
    };

    assert_eq!(status, 500);
    assert_eq!(captured_bytes, cap);
}
}
Loading