From 95802478fbd90ae88169646e5c8c305fdff06de1 Mon Sep 17 00:00:00 2001 From: Mohammad Nejati Date: Sat, 20 Jun 2026 17:58:52 +0000 Subject: [PATCH 1/2] proxy: send resolved IP for socks5 and fix IPv6 addressing --- example/usage.cpp | 2 +- src/detail/connection_pool.cpp | 64 ++++++++++------ src/detail/http_tunnel.cpp | 8 +- src/detail/http_tunnel.hpp | 5 +- src/detail/socks5_tunnel.cpp | 49 ++++++++++-- src/detail/socks5_tunnel.hpp | 5 +- test/unit/detail/connection_pool.cpp | 109 ++++++++++++++++++++++++++- test/unit/detail/http_tunnel.cpp | 46 +++++++---- test/unit/detail/socks5_tunnel.cpp | 80 ++++++++++++++------ 9 files changed, 282 insertions(+), 86 deletions(-) diff --git a/example/usage.cpp b/example/usage.cpp index 7de135c..86e924a 100644 --- a/example/usage.cpp +++ b/example/usage.cpp @@ -491,7 +491,7 @@ use_proxy(corosio::tls_context tls_ctx) // tag::use_proxy[] burl::client::config cfg; // SOCKS5 and HTTP proxies are supported - cfg.proxy = urls::url("socks5h://user:pass@localhost:8080"); + cfg.proxy = urls::url("socks5h://user:pass@localhost:1080"); burl::client client(co_await capy::this_coro::executor, tls_ctx, cfg); diff --git a/src/detail/connection_pool.cpp b/src/detail/connection_pool.cpp index 6b46968..1ba7638 100644 --- a/src/detail/connection_pool.cpp +++ b/src/detail/connection_pool.cpp @@ -54,11 +54,11 @@ connect_tcp( corosio::tcp_socket& socket, capy::executor_ref exec, const client::config& cfg, - std::string_view host, - std::string_view port) + urls::url_view url) { corosio::resolver resolver(exec); - auto [rec, eps] = co_await resolver.resolve(host, port); + auto [rec, eps] = co_await resolver.resolve( + url.encoded_host_address(), effective_port(url)); if(rec) co_return rec; @@ -264,6 +264,11 @@ connection_pool::release(pooled_connection pc) capy::io_task> connection_pool::connect(urls::url_view url) const { + using urls::scheme; + + if(url.scheme_id() != scheme::http && url.scheme_id() != scheme::https) + co_return { error::unsupported_url_scheme, {} }; + if(config_.connect_handler) { auto [ec, stream] = co_await config_.connect_handler(url); @@ -273,41 +278,52 @@ connection_pool::connect(urls::url_view url) const {}, std::make_unique(std::move(stream)) }; } - auto target_port = effective_port(url); - if(target_port.empty()) - co_return { error::unsupported_url_scheme, {} }; - corosio::tcp_socket socket(exec_); if(config_.proxy) { auto const& proxy = *config_.proxy; - auto proxy_port = effective_port(proxy); - if(proxy_port.empty()) + if(effective_port(proxy).empty()) co_return { error::unsupported_proxy_scheme, {} }; - auto [ec] = co_await connect_tcp( - socket, exec_, config_, proxy.encoded_host(), proxy_port); - if(ec) + if(auto [ec] = co_await connect_tcp(socket, exec_, config_, proxy); ec) co_return { ec, {} }; if(proxy.scheme() == "http") { auto [ec] = co_await open_http_tunnel( - capy::any_stream(&socket), - url.encoded_host(), - target_port, - proxy); + capy::any_stream(&socket), url, proxy); if(ec) co_return { ec, {} }; } - else if(proxy.scheme() == "socks5" || proxy.scheme() == "socks5h") + else if(proxy.scheme() == "socks5") { + urls::url resolved; + + corosio::resolver resolver(exec_); + auto [rec, eps] = co_await resolver.resolve( + url.encoded_host_address(), effective_port(url)); + if(rec) + co_return { rec, {} }; + + auto const& ep = eps.front().get_endpoint(); + resolved.set_port_number(ep.port()); + if(ep.is_v4()) + resolved.set_host_ipv4( + urls::ipv4_address(ep.v4_address().to_bytes())); + else + resolved.set_host_ipv6( + urls::ipv6_address(ep.v6_address().to_bytes())); + auto [ec] = co_await open_socks5_tunnel( - capy::any_stream(&socket), - url.encoded_host(), - target_port, - proxy); + capy::any_stream(&socket), resolved, proxy); + if(ec) + co_return { ec, {} }; + } + else if(proxy.scheme() == "socks5h") + { + auto [ec] = co_await open_socks5_tunnel( + capy::any_stream(&socket), url, proxy); if(ec) co_return { ec, {} }; } @@ -318,13 +334,11 @@ connection_pool::connect(urls::url_view url) const } else { - auto [ec] = co_await connect_tcp( - socket, exec_, config_, url.encoded_host(), target_port); - if(ec) + if(auto [ec] = co_await connect_tcp(socket, exec_, config_, url); ec) co_return { ec, {} }; } - if(url.scheme_id() == urls::scheme::https) + if(url.scheme_id() == scheme::https) { auto tls_ctx = tls_ctx_; tls_ctx.set_hostname(url.encoded_host()); diff --git a/src/detail/http_tunnel.cpp b/src/detail/http_tunnel.cpp index 79bc127..803f7bc 100644 --- a/src/detail/http_tunnel.cpp +++ b/src/detail/http_tunnel.cpp @@ -12,6 +12,7 @@ #include #include "base64.hpp" +#include "effective_port.hpp" #include #include @@ -33,13 +34,12 @@ namespace detail capy::io_task<> open_http_tunnel( capy::any_stream stream, - std::string_view target_host, - std::string_view target_port, + urls::url_view target, urls::url_view proxy) { - std::string host_port(target_host); + std::string host_port(target.encoded_host()); host_port += ':'; - host_port += target_port; + host_port += effective_port(target); http::request req(http::method::connect, host_port); req.set(http::field::host, host_port); diff --git a/src/detail/http_tunnel.hpp b/src/detail/http_tunnel.hpp index f92bba6..2092411 100644 --- a/src/detail/http_tunnel.hpp +++ b/src/detail/http_tunnel.hpp @@ -14,8 +14,6 @@ #include #include -#include - namespace boost { namespace burl @@ -26,8 +24,7 @@ namespace detail capy::io_task<> open_http_tunnel( capy::any_stream stream, - std::string_view target_host, - std::string_view target_port, + urls::url_view target, urls::url_view proxy); } // namespace detail diff --git a/src/detail/socks5_tunnel.cpp b/src/detail/socks5_tunnel.cpp index 3ea923f..339bdaa 100644 --- a/src/detail/socks5_tunnel.cpp +++ b/src/detail/socks5_tunnel.cpp @@ -11,11 +11,14 @@ #include +#include "effective_port.hpp" + #include #include #include #include +#include #include #include #include @@ -30,8 +33,7 @@ namespace detail capy::io_task<> open_socks5_tunnel( capy::any_stream stream, - std::string_view target_host, - std::string_view target_port, + urls::url_view target, urls::url_view proxy) { // Greeting: offer username/password auth only when credentials are present. @@ -97,14 +99,45 @@ open_socks5_tunnel( co_return { error::proxy_auth_failed }; } - // connection request - std::string conn_req = { 0x05, 0x01, 0x00, 0x03 }; + // connection request: VER, CMD=connect, RSV + std::string conn_req = { 0x05, 0x01, 0x00 }; - conn_req.push_back(static_cast(target_host.size())); - conn_req.append(target_host); + switch(target.host_type()) + { + case urls::host_type::ipv4: + { + conn_req.push_back(0x01); // ATYP: IPv4 address + auto bytes = target.host_ipv4_address().to_bytes(); + conn_req.append( + reinterpret_cast(bytes.data()), bytes.size()); + break; + } + case urls::host_type::ipv6: + { + conn_req.push_back(0x04); // ATYP: IPv6 address + auto bytes = target.host_ipv6_address().to_bytes(); + conn_req.append( + reinterpret_cast(bytes.data()), bytes.size()); + break; + } + case urls::host_type::name: + { + auto host = target.host_address(); // decoded, without brackets + if(host.empty() || host.size() > 255) + co_return { error::proxy_connect_failed }; + conn_req.push_back(0x03); // ATYP: domain name + conn_req.push_back(static_cast(host.size())); + conn_req.append(host); + break; + } + default: // host_type::none or host_type::ipvfuture + co_return { error::proxy_connect_failed }; + } - auto port = - static_cast(std::stoul(std::string(target_port))); + std::uint16_t port = 0; + auto port_str = effective_port(target); + std::from_chars( + port_str.data(), port_str.data() + port_str.size(), port); conn_req.push_back(static_cast((port >> 8) & 0xFF)); conn_req.push_back(static_cast(port & 0xFF)); diff --git a/src/detail/socks5_tunnel.hpp b/src/detail/socks5_tunnel.hpp index e07c8ad..d8a63f2 100644 --- a/src/detail/socks5_tunnel.hpp +++ b/src/detail/socks5_tunnel.hpp @@ -14,8 +14,6 @@ #include #include -#include - namespace boost { namespace burl @@ -26,8 +24,7 @@ namespace detail capy::io_task<> open_socks5_tunnel( capy::any_stream stream, - std::string_view target_host, - std::string_view target_port, + urls::url_view target, urls::url_view proxy); } // namespace detail diff --git a/test/unit/detail/connection_pool.cpp b/test/unit/detail/connection_pool.cpp index 3b2970d..3f9a187 100644 --- a/test/unit/detail/connection_pool.cpp +++ b/test/unit/detail/connection_pool.cpp @@ -535,7 +535,7 @@ class connection_pool_test } void - testSocks5Proxy() + testSocks5hProxy() { corosio::io_context ioc; loopback_server server{ ioc }; @@ -544,7 +544,7 @@ class connection_pool_test { auto s = co_await server.next(); - // socks5 handshake + // socks5h handshake { // greeting: VER, NMETHODS, METHODS... std::uint8_t greeting[3]; @@ -612,6 +612,86 @@ class connection_pool_test ioc.run(); } + void + testSocks5Proxy() + { + corosio::io_context ioc; + loopback_server server{ ioc }; + + auto server_task = [&]() -> capy::task<> + { + auto s = co_await server.next(); + + // socks5 handshake + { + // greeting: VER, NMETHODS, METHODS... + std::uint8_t greeting[3]; + auto [gec, gn] = + co_await capy::read(s, make_buffer(greeting)); + BOOST_TEST(!gec); + BOOST_TEST_EQ(greeting[0], 0x05); // SOCKS5 + BOOST_TEST_EQ(greeting[1], 0x01); // one method + BOOST_TEST_EQ(greeting[2], 0x00); // no authentication + + // reply: VER, METHOD (no authentication) + std::uint8_t method[2] = { 0x05, 0x00 }; + auto [mec, mn] = co_await s.write_some(make_buffer(method)); + BOOST_TEST(!mec); + + // request head: VER, CMD, RSV, ATYP + std::uint8_t head[4]; + auto [hec, hn] = co_await capy::read(s, make_buffer(head)); + BOOST_TEST(!hec); + BOOST_TEST_EQ(head[0], 0x05); // SOCKS5 + BOOST_TEST_EQ(head[1], 0x01); // CONNECT + BOOST_TEST_EQ(head[3], 0x01); // IPv4 (resolved locally) + + // request tail: 4-byte IPv4 address + port + std::uint8_t tail[6]; + auto [tec, tn] = co_await capy::read(s, make_buffer(tail)); + BOOST_TEST(!tec); + BOOST_TEST_EQ(tail[0], 0x7F); // 127 + BOOST_TEST_EQ(tail[1], 0x00); // 0 + BOOST_TEST_EQ(tail[2], 0x00); // 0 + BOOST_TEST_EQ(tail[3], 0x01); // 1 + BOOST_TEST_EQ(tail[4], 0x00); // port hi + BOOST_TEST_EQ(tail[5], 0x50); // 80 + + // reply success: VER, REP, RSV, ATYP, BND.ADDR, BND.PORT + std::uint8_t reply[10] = { + 0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0 }; + auto [rec, rn] = co_await s.write_some(make_buffer(reply)); + BOOST_TEST(!rec); + } + + co_await pong(&s); + co_await pong(&s); + }; + + auto client_task = [&]() -> capy::task<> + { + client::config cfg{}; + cfg.proxy = server.url("socks5"); + + auto pool = std::make_shared( + co_await capy::this_coro::executor, + corosio::tls_context{}, + std::move(cfg)); + + for(auto i : { 0, 1 }) + { + auto [aec, pc] = co_await pool->acquire("http://127.0.0.1"); + BOOST_TEST(!aec); + co_await ping(&pc); + pool->release(std::move(pc)); + } + }; + + capy::run_async(ioc.get_executor())(server_task()); + capy::run_async(ioc.get_executor())(client_task()); + ioc.run(); + } + void testHttpProxy() { @@ -694,6 +774,29 @@ class connection_pool_test } } + void + testUnsupportedUrlScheme() + { + scripted_net net; + + capy::test::run_blocking()([&]() -> capy::task<> + { + auto pool = std::make_shared( + co_await capy::this_coro::executor, + corosio::tls_context(), + net.config()); + + for(auto const* url : { "ftp://example.com", "ws://example.com" }) + { + auto [ec, pc] = co_await pool->acquire(url); + BOOST_TEST_EQ(ec, error::unsupported_url_scheme); + BOOST_TEST(!pc); + } + }()); + + BOOST_TEST_EQ(net.connects(), 0u); + } + void run() { @@ -710,8 +813,10 @@ class connection_pool_test testTlsConnectionReuse(); testTlsHandshakeFailure(); testSocks5Proxy(); + testSocks5hProxy(); testHttpProxy(); testUnsupportedProxyScheme(); + testUnsupportedUrlScheme(); } }; diff --git a/test/unit/detail/http_tunnel.cpp b/test/unit/detail/http_tunnel.cpp index 9b9aebe..82f3ec0 100644 --- a/test/unit/detail/http_tunnel.cpp +++ b/test/unit/detail/http_tunnel.cpp @@ -30,14 +30,13 @@ class http_tunnel_test static std::error_code run( capy::test::stream& client, - std::string_view host, - std::string_view port, + urls::url_view target, urls::url_view proxy) { std::error_code ret; capy::test::run_blocking( [&](capy::io_result<> rs){ ret = rs.ec;}) - (open_http_tunnel(&client, host, port, proxy)); + (open_http_tunnel(&client, target, proxy)); return ret; } @@ -50,8 +49,7 @@ class http_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "http://proxy:8080"); BOOST_TEST(!ec); @@ -75,8 +73,7 @@ class http_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "http://user:pass@proxy:8080"); BOOST_TEST(!ec); @@ -86,6 +83,28 @@ class http_tunnel_test std::string_view::npos); } + void + testSuccessIPv6Target() + { + auto [client, server] = capy::test::make_stream_pair(); + server.provide("HTTP/1.1 200 Connection established\r\n\r\n"); + + auto ec = run( + client, + "https://[2001:db8::1]:8443", + "http://proxy:8080"); + + BOOST_TEST(!ec); + + auto req = server.data(); + // IPv6 literals must stay bracketed in the request-target and Host. + BOOST_TEST(req.starts_with( + "CONNECT [2001:db8::1]:8443 HTTP/1.1\r\n")); + BOOST_TEST( + req.find("Host: [2001:db8::1]:8443\r\n") != + std::string_view::npos); + } + void testAuthRequired() { @@ -94,8 +113,7 @@ class http_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "http://proxy:8080"); BOOST_TEST(ec == error::proxy_auth_failed); @@ -109,8 +127,7 @@ class http_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "http://proxy:8080"); BOOST_TEST(ec == error::proxy_connect_failed); @@ -124,8 +141,7 @@ class http_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "http://proxy:8080"); BOOST_TEST(ec == error::proxy_connect_failed); @@ -142,8 +158,7 @@ class http_tunnel_test auto [ec] = co_await open_http_tunnel( &client, - "example.com", - "443", + "https://example.com", "http://proxy:8080"); if(ec) @@ -160,6 +175,7 @@ class http_tunnel_test { testSuccess(); testSuccessWithAuth(); + testSuccessIPv6Target(); testAuthRequired(); testConnectFailed(); testReadError(); diff --git a/test/unit/detail/socks5_tunnel.cpp b/test/unit/detail/socks5_tunnel.cpp index 95538da..c78fe38 100644 --- a/test/unit/detail/socks5_tunnel.cpp +++ b/test/unit/detail/socks5_tunnel.cpp @@ -43,14 +43,13 @@ class socks5_tunnel_test static std::error_code run( capy::test::stream& client, - std::string_view host, - std::string_view port, + urls::url_view target, urls::url_view proxy) { std::error_code ret; capy::test::run_blocking( [&](capy::io_result<> rs){ ret = rs.ec;}) - (open_socks5_tunnel(&client, host, port, proxy)); + (open_socks5_tunnel(&client, target, proxy)); return ret; } @@ -63,8 +62,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(!ec); @@ -87,8 +85,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://user:pass@proxy:1080"); BOOST_TEST(!ec); @@ -112,13 +109,55 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(!ec); } + void + testSuccessIPv4Target() + { + auto [client, server] = capy::test::make_stream_pair(); + server.provide(bytes({ 0x05, 0x00 }) + ipv4_reply()); + + auto ec = run( + client, + "https://192.168.0.1", + "socks5://proxy:1080"); + + BOOST_TEST(!ec); + + std::string expected = bytes({ 0x05, 0x01, 0x00 }); // greeting, no auth + // connect, ATYP=IPv4, address bytes, port 443 + expected += bytes( + { 0x05, 0x01, 0x00, 0x01, 0xC0, 0xA8, 0x00, 0x01, 0x01, 0xBB }); + BOOST_TEST(server.data() == expected); + } + + void + testSuccessIPv6Target() + { + auto [client, server] = capy::test::make_stream_pair(); + server.provide(bytes({ 0x05, 0x00 }) + ipv4_reply()); + + auto ec = run( + client, + "https://[2001:db8::1]", + "socks5://proxy:1080"); + + BOOST_TEST(!ec); + + std::string expected = bytes({ 0x05, 0x01, 0x00 }); // greeting, no auth + // connect, ATYP=IPv6 + expected += bytes({ 0x05, 0x01, 0x00, 0x04 }); + expected += bytes( + { 0x20, 0x01, 0x0D, 0xB8, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01 }); + expected += bytes({ 0x01, 0xBB }); // port 443 + BOOST_TEST(server.data() == expected); + } + void testUnsupportedVersion() { @@ -127,8 +166,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(ec == error::proxy_unsupported_version); @@ -142,8 +180,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://user:pass@proxy:1080"); BOOST_TEST(ec == error::proxy_auth_failed); @@ -157,8 +194,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(ec == error::proxy_auth_failed); @@ -173,8 +209,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(ec == error::proxy_connect_failed); @@ -192,8 +227,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(!ec); @@ -209,8 +243,7 @@ class socks5_tunnel_test auto ec = run( client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); BOOST_TEST(ec == error::proxy_connect_failed); @@ -227,8 +260,7 @@ class socks5_tunnel_test auto [ec] = co_await open_socks5_tunnel( &client, - "example.com", - "443", + "https://example.com", "socks5://proxy:1080"); if(ec) @@ -249,6 +281,8 @@ class socks5_tunnel_test testSuccessNoAuth(); testSuccessWithAuth(); testSuccessDomainReply(); + testSuccessIPv4Target(); + testSuccessIPv6Target(); testUnsupportedVersion(); testAuthFailed(); testNoAcceptableMethods(); From 123a56073380008d17a8462dc2097ec40a587334 Mon Sep 17 00:00:00 2001 From: Mohammad Nejati Date: Tue, 30 Jun 2026 23:01:41 +0330 Subject: [PATCH 2/2] serializer: add custom request serializer with explicit body eof --- example/nlohmann_json.cpp | 2 +- include/boost/burl/error.hpp | 8 + src/client.cpp | 22 +- src/detail/send_file.cpp | 8 +- src/detail/send_file.hpp | 3 +- src/detail/serializer.cpp | 206 ++++++++ src/detail/serializer.hpp | 173 +++++++ src/error.cpp | 2 + src/file.cpp | 2 +- src/json.cpp | 10 +- src/multipart_form.cpp | 6 +- src/string.cpp | 18 +- src/urlencoded_form.cpp | 8 +- test/unit/body_test.hpp | 15 +- test/unit/detail/serializer.cpp | 876 ++++++++++++++++++++++++++++++++ test/unit/error.cpp | 3 + test/unit/file.cpp | 5 +- test/unit/multipart_form.cpp | 5 +- 18 files changed, 1323 insertions(+), 49 deletions(-) create mode 100644 src/detail/serializer.cpp create mode 100644 src/detail/serializer.hpp create mode 100644 test/unit/detail/serializer.cpp diff --git a/example/nlohmann_json.cpp b/example/nlohmann_json.cpp index 65ffccc..5eec092 100644 --- a/example/nlohmann_json.cpp +++ b/example/nlohmann_json.cpp @@ -51,7 +51,7 @@ tag_invoke(burl::body_from_tag, const nlohmann::json& value) capy::io_task<> write(capy::any_buffer_sink& sink) const { - auto [ec, n] = co_await sink.write(capy::make_buffer(text_)); + auto [ec, n] = co_await sink.write_eof(capy::make_buffer(text_)); co_return { ec }; } }; diff --git a/include/boost/burl/error.hpp b/include/boost/burl/error.hpp index 36a282d..fa5d275 100644 --- a/include/boost/burl/error.hpp +++ b/include/boost/burl/error.hpp @@ -64,6 +64,14 @@ enum class error /** The proxy replied with an unsupported protocol version. */ proxy_unsupported_version, + + /** The request body size did not match its content length. + + The number of bytes produced by the request body + differed from the `Content-Length` declared for the + request. + */ + body_size_mismatch, }; /** Error conditions corresponding to sets of error codes. diff --git a/src/client.cpp b/src/client.cpp index 9000bd6..2cbe60e 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -15,6 +15,7 @@ #include "detail/connection_pool.hpp" #include "detail/drain_body.hpp" #include "detail/redirect.hpp" +#include "detail/serializer.hpp" #include #include @@ -27,7 +28,6 @@ #include #include #include -#include #include #include @@ -231,11 +231,8 @@ client::execute_impl( if(!headers.exists(field::accept_encoding)) set_accept_encoding(parser_cfg, headers, config_); - http::serializer serializer(http::make_serializer_config({})); - serializer.reset(); - serializer.set_message(headers); - - http::response_parser parser(http::make_parser_config(parser_cfg)); + http::response_parser parser( + http::make_parser_config(parser_cfg)); auto url = request.url; auto trusted = true; @@ -269,13 +266,16 @@ client::execute_impl( if(request.body.has_value()) { - serializer.start_buffers(); - capy::any_buffer_sink sink(serializer.sink_for(conn)); + capy::any_write_stream ws(&conn); + detail::serializer sr(ws, headers); + capy::any_buffer_sink sink(&sr); if(auto [wec] = co_await request.body.write(sink); wec) co_return { wec, {} }; - // The body only writes its bytes; finalize the sink here. - if(auto [wec] = co_await sink.write_eof(); wec) - co_return { wec, {} }; + if(!sr.is_done()) + { + if(auto [wec] = co_await sr.write_eof(); wec) + co_return { wec, {} }; + } } else { diff --git a/src/detail/send_file.cpp b/src/detail/send_file.cpp index 65e4ceb..26dbf8a 100644 --- a/src/detail/send_file.cpp +++ b/src/detail/send_file.cpp @@ -31,7 +31,8 @@ capy::io_task<> send_file( capy::any_buffer_sink& sink, std::filesystem::path const& path, - std::uint64_t size) + std::uint64_t size, + bool call_eof) { corosio::stream_file f(co_await capy::this_coro::executor); // TODO: switch to a non-throwing open() overload once available. @@ -61,6 +62,11 @@ send_file( auto take = clamp(remaining, n); if(take) { + if(call_eof && take == remaining) + { + if(auto [ec] = co_await sink.commit_eof(take); ec) + co_return { ec }; + } if(auto [ec] = co_await sink.commit(take); ec) co_return { ec }; remaining -= take; diff --git a/src/detail/send_file.hpp b/src/detail/send_file.hpp index 20cf2a0..74ca388 100644 --- a/src/detail/send_file.hpp +++ b/src/detail/send_file.hpp @@ -34,7 +34,8 @@ capy::io_task<> send_file( capy::any_buffer_sink& sink, std::filesystem::path const& path, - std::uint64_t size); + std::uint64_t size, + bool call_eof = false); } // namespace detail } // namespace burl diff --git a/src/detail/serializer.cpp b/src/detail/serializer.cpp new file mode 100644 index 0000000..c6ea42a --- /dev/null +++ b/src/detail/serializer.cpp @@ -0,0 +1,206 @@ +// +// Copyright (c) 2026 Mohammad Nejati +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/burl +// + +#include "serializer.hpp" + +#include + +#include +#include + +namespace boost +{ +namespace burl +{ +namespace detail +{ + +serializer::serializer( + capy::any_write_stream& stream, + http::request_base& req, + config cfg) + : stream_(stream) + , req_(req) + , cfg_(cfg) + , storage_(new unsigned char[cfg_.buffer_size + margin] + margin) +{ + BOOST_ASSERT( + req_.chunked() || req_.payload() == http::payload::size); +} + +serializer::~serializer() +{ + delete[](storage_ - margin); +} + +capy::io_task<> +serializer::write_eof() +{ + return commit_eof(0); +} + +std::span +serializer::prepare(std::span dest) +{ + if(dest.empty() || capacity() == 0) + return dest.first(0); + dest[0] = writable(); + return dest.first(1); +} + +capy::io_task<> +serializer::commit(std::size_t n) +{ + BOOST_ASSERT(n <= capacity()); + avail_ += n; + if(capacity() >= cfg_.min_prepare) + co_return {}; + auto [ec, _] = co_await drain({}, false); + co_return { ec }; +} + +capy::io_task<> +serializer::commit_eof(std::size_t n) +{ + BOOST_ASSERT(n <= capacity()); + avail_ += n; + decide_framing(0); + auto [ec, _] = co_await drain({}, true); + co_return { ec }; +} + +void +serializer::decide_framing(std::size_t remaining) noexcept +{ + if(!req_.chunked() || hdr_sent_) + return; + + BOOST_ASSERT(total_body_ == 0); + req_.erase(http::field::transfer_encoding); + req_.set_content_length(avail_ + remaining); +} + +capy::io_task +serializer::drain( + std::span tail, + bool eof) +{ + auto const tail_size = capy::buffer_size(tail); + auto const chunked = req_.chunked(); + + BOOST_ASSERT(eof || avail_ + tail_size != 0); + + capy::const_buffer vec[capy::detail::max_iovec_ + 3]; + std::size_t n = 0; + std::size_t sum = 0; + + if(!hdr_sent_) + { + vec[n++] = capy::make_buffer(req_.buffer()); + sum += req_.buffer().size(); + } + + if(chunked) + { + auto const buf = chunk_frame(tail_size); + vec[n++] = buf; + sum += buf.size(); + } + else + { + auto const declared = + req_.payload() == http::payload::size + ? req_.payload_size() : 0; + auto const produced = total_body_ + avail_ + tail_size; + + if(produced > declared || (eof && produced != declared)) + co_return { error::body_size_mismatch, 0 }; + + if(avail_ != 0) + { + vec[n++] = { storage_, avail_ }; + sum += avail_; + } + } + + auto const owned = sum; + + sum += tail_size; + for(auto const& b : tail) + vec[n++] = b; + + if(chunked && eof) + { + std::size_t const term = avail_ || tail_size ? 7 : 2; + vec[n++] = { "\r\n0\r\n\r\n", term }; + sum += term; + } + + auto const need = chunked || eof ? sum : owned + bool(tail_size); + auto [ec, written] = co_await write_at_least({ vec, n }, need); + if(ec) + co_return { ec, 0 }; + + auto const consumed = (std::min)(written - owned, tail_size); + total_body_ += avail_ + consumed; + avail_ = 0; + hdr_sent_ = true; + done_ = eof; + co_return { {}, consumed }; +} + +capy::io_task +serializer::write_at_least( + std::span buffers, + std::size_t bytes) +{ + BOOST_ASSERT(bytes <= capy::buffer_size(buffers)); + auto slice = capy::buffer_slice(buffers); + std::size_t written = 0; + while(written < bytes) + { + auto [ec, n] = co_await stream_.write_some(slice.data()); + written += n; + if(ec && written < bytes) + co_return { ec, written }; + slice.remove_prefix(n); + } + co_return { {}, written }; +} + +capy::const_buffer +serializer::chunk_frame(std::size_t tail_size) noexcept +{ + static constexpr char hex[] = "0123456789ABCDEF"; + + auto size = avail_ + tail_size; + auto* p = storage_; + + *--p = '\n'; + *--p = '\r'; + + do + { + *--p = hex[size & 0xF]; + size >>= 4; + } while(size != 0); + + // previous chunk's CRLF + if(total_body_ != 0) + { + *--p = '\n'; + *--p = '\r'; + } + + return { p, static_cast(storage_ - p) + avail_ }; +} + +} // namespace detail +} // namespace burl +} // namespace boost diff --git a/src/detail/serializer.hpp b/src/detail/serializer.hpp new file mode 100644 index 0000000..1589dfc --- /dev/null +++ b/src/detail/serializer.hpp @@ -0,0 +1,173 @@ +// +// Copyright (c) 2026 Mohammad Nejati +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/burl +// + +#ifndef BOOST_BURL_SRC_DETAIL_SERIALIZER_HPP +#define BOOST_BURL_SRC_DETAIL_SERIALIZER_HPP + +#include + +#include +#include +#include +#include +#include +#include + +namespace boost +{ +namespace burl +{ +namespace detail +{ + +class serializer +{ +public: + struct config + { + std::size_t buffer_size = 64 * 1024; + std::size_t min_prepare = 32 * 1024; + std::size_t min_direct = 8 * 1024; + }; + + serializer( + capy::any_write_stream& stream, + http::request_base& req) + : serializer(stream, req, {}) + { + } + + serializer( + capy::any_write_stream& stream, + http::request_base& req, + config cfg); + + ~serializer(); + + bool + is_done() const noexcept + { + return done_; + } + + template + capy::io_task + write_some(Buffers buffers) + { + auto const avail = capy::buffer_size(buffers); + + if(avail < cfg_.min_direct) + { + if(avail >= capacity()) + if(auto [ec, _] = co_await drain({}, false); ec) + co_return { ec, 0 }; + auto n = capy::buffer_copy(writable(), buffers); + avail_ += n; + co_return { {}, n }; + } + + capy::const_buffer_param bp(buffers); + co_return co_await drain(bp.data(), false); + } + + template + capy::io_task + write(Buffers buffers) + { + auto const avail = capy::buffer_size(buffers); + auto slice = capy::buffer_slice(buffers); + std::size_t written = 0; + while(written < avail) + { + auto [ec, n] = co_await write_some(slice.data()); + written += n; + if(ec) + co_return { ec, written }; + slice.remove_prefix(n); + } + co_return { {}, avail }; + } + + template + capy::io_task + write_eof(Buffers buffers) + { + decide_framing(capy::buffer_size(buffers)); + + capy::const_buffer_param bp(buffers); + + if(!bp.more()) + co_return co_await drain(bp.data(), true); + + auto [ec1, n] = co_await write(buffers); + if(ec1) + co_return { ec1, n }; + auto [ec2, _] = co_await drain({}, true); + co_return { ec2, n }; + } + + capy::io_task<> + write_eof(); + + std::span + prepare(std::span dest); + + capy::io_task<> + commit(std::size_t n); + + capy::io_task<> + commit_eof(std::size_t n); + +private: + std::size_t + capacity() const noexcept + { + return cfg_.buffer_size - avail_; + } + + capy::mutable_buffer + writable() const noexcept + { + return { storage_ + avail_, capacity() }; + } + + void + decide_framing(std::size_t remaining) noexcept; + + capy::io_task + drain( + std::span tail, + bool eof); + + // TODO: replace this with capy's version + capy::io_task + write_at_least( + std::span buffers, + std::size_t bytes); + + capy::const_buffer + chunk_frame(std::size_t tail_size) noexcept; + + static constexpr std::size_t margin = 24; + + capy::any_write_stream& stream_; + http::request_base& req_; + config cfg_; + unsigned char* storage_; + std::uint64_t total_body_ = 0; + std::size_t avail_ = 0; + bool hdr_sent_ = false; + bool done_ = false; +}; + +} // namespace detail +} // namespace burl +} // namespace boost + +#endif diff --git a/src/error.cpp b/src/error.cpp index 51f6842..f42b947 100644 --- a/src/error.cpp +++ b/src/error.cpp @@ -50,6 +50,8 @@ error_category::message(int ev) const return "proxy authentication failed"; case error::proxy_unsupported_version: return "unsupported proxy protocol version"; + case error::body_size_mismatch: + return "request body size did not match content length"; default: return "unknown error"; } diff --git a/src/file.cpp b/src/file.cpp index b3e8541..156aa29 100644 --- a/src/file.cpp +++ b/src/file.cpp @@ -63,7 +63,7 @@ class file_body capy::io_task<> write(capy::any_buffer_sink& sink) const { - return detail::send_file(sink, path_, size_); + return detail::send_file(sink, path_, size_, true); } }; diff --git a/src/json.cpp b/src/json.cpp index d7362c1..a4b9a6e 100644 --- a/src/json.cpp +++ b/src/json.cpp @@ -57,7 +57,7 @@ class json_body json::serializer sr; sr.reset(&value_); - while(!sr.done()) + for(;;) { capy::mutable_buffer arr[2]; auto dst = sink.prepare(arr); @@ -70,11 +70,15 @@ class json_body n += sr.read(static_cast(b.data()), b.size()).size(); } + if(sr.done()) + { + auto [ec] = co_await sink.commit_eof(n); + co_return { ec }; + } + if(auto [ec] = co_await sink.commit(n); ec) co_return { ec }; } - - co_return {}; } }; diff --git a/src/multipart_form.cpp b/src/multipart_form.cpp index ded3a06..0446b38 100644 --- a/src/multipart_form.cpp +++ b/src/multipart_form.cpp @@ -198,10 +198,8 @@ class multipart_form::body } auto const trailer = "--" + form_.boundary_ + "--\r\n"; - if(auto [ec, n] = co_await sink.write(make_buffer(trailer)); ec) - co_return { ec }; - - co_return {}; + auto [ec, n] = co_await sink.write_eof(make_buffer(trailer)); + co_return { ec }; } }; diff --git a/src/string.cpp b/src/string.cpp index 6c4e2e0..0f66997 100644 --- a/src/string.cpp +++ b/src/string.cpp @@ -8,6 +8,7 @@ // #include +#include "detail/util.hpp" #include #include @@ -49,11 +50,10 @@ class string_body capy::io_task<> write(capy::any_buffer_sink& sink) const { - if(auto [ec, n] = - co_await sink.write(capy::make_buffer(std::string_view(body_))); - ec) - co_return { ec }; - co_return {}; + auto [ec, n] = + co_await sink.write_eof( + capy::make_buffer(std::string_view(body_))); + co_return { ec }; } }; @@ -82,9 +82,9 @@ class string_view_body capy::io_task<> write(capy::any_buffer_sink& sink) const { - if(auto [ec, n] = co_await sink.write(capy::make_buffer(body_)); ec) - co_return { ec }; - co_return {}; + auto [ec, n] = co_await sink.write_eof( + capy::make_buffer(body_)); + co_return { ec }; } }; @@ -108,7 +108,7 @@ tag_invoke(body_to_tag, response& resp) std::string ret; if(auto cl = resp.content_length()) - ret.reserve(*cl); + ret.reserve(detail::clamp(*cl)); auto source = resp.as_read_source(); auto [ec, n] = diff --git a/src/urlencoded_form.cpp b/src/urlencoded_form.cpp index d0e3f5e..48cc257 100644 --- a/src/urlencoded_form.cpp +++ b/src/urlencoded_form.cpp @@ -80,11 +80,9 @@ class urlencoded_form::body capy::io_task<> write(capy::any_buffer_sink& sink) const { - auto [ec, n] = - co_await sink.write(capy::make_buffer(std::string_view(body_))); - if(ec) - co_return { ec }; - co_return {}; + auto [ec, n] = co_await sink.write_eof( + capy::make_buffer(std::string_view(body_))); + co_return { ec }; } }; diff --git a/test/unit/body_test.hpp b/test/unit/body_test.hpp index 52f7dca..ce490ad 100644 --- a/test/unit/body_test.hpp +++ b/test/unit/body_test.hpp @@ -45,24 +45,29 @@ check_body( co_return; BOOST_TEST_EQ(bs.data(), expected); + BOOST_TEST(bs.eof_called()); }); BOOST_TEST(r.success); } -inline std::error_code +inline void check_io_body( any_request_body const& body, - capy::test::buffer_sink& bs) + std::string_view expected) { corosio::io_context ioc; - std::error_code ret; + std::error_code ec; + capy::test::buffer_sink bs; capy::any_buffer_sink sink(&bs); capy::run_async( ioc.get_executor(), - [&](capy::io_result<> res) {ret = res.ec; }) + [&](capy::io_result<> res) {ec = res.ec; }) (body.write(sink)); ioc.run(); - return ret; + + BOOST_TEST(!ec); + BOOST_TEST_EQ(bs.data(), expected); + BOOST_TEST(bs.eof_called()); } } // namespace burl diff --git a/test/unit/detail/serializer.cpp b/test/unit/detail/serializer.cpp new file mode 100644 index 0000000..3d2caf4 --- /dev/null +++ b/test/unit/detail/serializer.cpp @@ -0,0 +1,876 @@ +// +// Copyright (c) 2026 Mohammad Nejati +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/burl +// + +// Test that header file is self-contained. +#include "src/detail/serializer.hpp" + +#include + +#include +#include +#include +#include +#include + +#include "test_suite.hpp" + +namespace boost +{ +namespace burl +{ +namespace detail +{ + +class serializer_test +{ + // A scaled-down config so that framing thresholds (staged + // vs. direct writes, commit-triggered flushes) are crossed + // with tiny bodies, and so tests keep exercising the same + // paths if the default config values change. + static constexpr serializer::config cfg{ + .buffer_size = 64, + .min_prepare = 32, + .min_direct = 16 }; + + static http::request + make_request() + { + http::request req; + req.set_chunked(true); + return req; + } + + static http::request + make_request(std::size_t cl) + { + http::request req; + req.set_content_length(cl); + return req; + } + +public: + void + testContentLengthSmallBody() + { + auto req = make_request(5); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + BOOST_TEST(!sr.is_done()); + + auto [ec1, n] = co_await sr.write( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n, 5u); + BOOST_TEST(!sr.is_done()); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + BOOST_TEST(sr.is_done()); + }()); + + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + "hello"); + } + + void + testContentLengthLargeBody() + { + // A body at the direct-write threshold bypasses staging. + std::string const body(cfg.min_direct, 'x'); + + auto req = make_request(body.size()); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec1, n] = co_await sr.write( + capy::make_buffer(body)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n, body.size()); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + }()); + + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + body); + } + + void + testContentLengthMultipleBuffers() + { + std::string const b1(4, 'a'); + std::string const b2(4, 'b'); + + auto req = make_request(b1.size() + b2.size()); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + std::array bufs{ + capy::make_buffer(b1), + capy::make_buffer(b2) }; + + auto [ec, n] = co_await sr.write_eof(bufs); + BOOST_TEST(!ec); + BOOST_TEST_EQ(n, b1.size() + b2.size()); + BOOST_TEST(sr.is_done()); + }()); + + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + b1 + b2); + } + + void + testBodySizeMismatch() + { + // fewer bytes than Content-Length + { + auto req = make_request(10); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec1, n] = co_await sr.write( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n, 5u); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(ec2 == error::body_size_mismatch); + BOOST_TEST(!sr.is_done()); + }()); + + // Nothing reaches the wire when the mismatch is + // detected before the header is flushed. + BOOST_TEST(server.data().empty()); + } + + // more bytes than Content-Length + { + auto req = make_request(3); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec, n] = co_await sr.write_eof( + capy::const_buffer("hello", 5)); + BOOST_TEST(ec == error::body_size_mismatch); + BOOST_TEST(!sr.is_done()); + }()); + + BOOST_TEST(server.data().empty()); + } + } + + void + testChunkedSmallBodyConvertsToContentLength() + { + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + // The body stays below min_direct, so it is fully + // buffered before the header goes out. + auto [ec1, n] = co_await sr.write( + capy::const_buffer("hello world", 11)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n, 11u); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + }()); + + // The entire body was buffered before the header was + // flushed, so chunked encoding is replaced with + // Content-Length and the body is sent unframed. + BOOST_TEST(!req.chunked()); + BOOST_TEST_EQ(req.payload_size(), 11u); + BOOST_TEST( + server.data().find("Transfer-Encoding") == + std::string_view::npos); + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + "hello world"); + } + + void + testChunkedWriteEofWithBody() + { + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec, n] = co_await sr.write_eof( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec); + BOOST_TEST_EQ(n, 5u); + }()); + + BOOST_TEST(!req.chunked()); + BOOST_TEST_EQ(req.payload_size(), 5u); + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + "hello"); + } + + void + testChunkedLargeBody() + { + std::string const body(cfg.min_direct, 'x'); + + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec1, n1] = co_await sr.write( + capy::make_buffer(body)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n1, body.size()); + + auto [ec2, n2] = co_await sr.write( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec2); + BOOST_TEST_EQ(n2, 5u); + BOOST_TEST(!sr.is_done()); + + auto [ec3] = co_await sr.write_eof(); + BOOST_TEST(!ec3); + BOOST_TEST(sr.is_done()); + }()); + + // The large write is gathered with the header and sent + // as its own chunk; the small write is buffered and + // flushed at eof as a chunk. Chunk sizes are always + // minimal-width; 16 == 0x10. + std::string expected(req.buffer()); + expected += "10\r\n" + body + "\r\n"; + expected += "5\r\nhello\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST(req.chunked()); + BOOST_TEST_EQ(server.data(), expected); + } + + void + testChunkedWriteEofWithTail() + { + // Once the header is on the wire, write_eof() with + // caller buffers must frame the staged bytes and the + // caller's bytes as the final chunk and terminate the + // body with the last-chunk in the same gather. + std::string const body(cfg.min_direct, 'x'); + + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec1, n1] = co_await sr.write( + capy::make_buffer(body)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n1, body.size()); + + auto [ec2, n2] = co_await sr.write( + capy::const_buffer("abc", 3)); + BOOST_TEST(!ec2); + BOOST_TEST_EQ(n2, 3u); + + auto [ec3, n3] = co_await sr.write_eof( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec3); + BOOST_TEST_EQ(n3, 5u); + }()); + + std::string expected(req.buffer()); + expected += "10\r\n" + body + "\r\n"; + expected += "8\r\nabchello\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST(req.chunked()); + BOOST_TEST_EQ(server.data(), expected); + } + + void + testChunkedWriteEofTailOnly() + { + // write_eof() with caller buffers while no bytes are + // staged must still terminate the body with the + // last-chunk after the final chunk. + std::string const body(cfg.min_direct, 'x'); + + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + auto [ec1, n1] = co_await sr.write( + capy::make_buffer(body)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n1, body.size()); + + auto [ec2, n2] = co_await sr.write_eof( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec2); + BOOST_TEST_EQ(n2, 5u); + }()); + + std::string expected(req.buffer()); + expected += "10\r\n" + body + "\r\n"; + expected += "5\r\nhello\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST(req.chunked()); + BOOST_TEST_EQ(server.data(), expected); + } + + void + testChunkedEmptyBody() + { + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req); + BOOST_TEST(!sr.is_done()); + + auto [ec] = co_await sr.write_eof(); + BOOST_TEST(!ec); + BOOST_TEST(sr.is_done()); + }()); + + BOOST_TEST(!req.chunked()); + BOOST_TEST( + server.data().find("Content-Length: 0\r\n") != + std::string_view::npos); + BOOST_TEST_EQ(server.data(), req.buffer()); + } + + void + testPrepareCommitContentLength() + { + auto req = make_request(5); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), + capy::const_buffer("hello", 5)); + BOOST_TEST_EQ(n, 5u); + + auto [ec] = co_await sr.commit_eof(n); + BOOST_TEST(!ec); + BOOST_TEST(sr.is_done()); + }()); + + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + "hello"); + } + + void + testPrepareCommitPartialFlush() + { + // Committing enough data to drop the remaining capacity + // below min_prepare triggers a partial flush. + std::string const body(60, 'z'); + + auto req = make_request(body.size()); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), capy::make_buffer(body)); + BOOST_TEST_EQ(n, body.size()); + + auto [ec1] = co_await sr.commit(n); + BOOST_TEST(!ec1); + BOOST_TEST(!sr.is_done()); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + BOOST_TEST(sr.is_done()); + }()); + + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + body); + } + + void + testPrepareCommitChunked() + { + // A drain while chunked emits the buffered bytes as a + // single chunk with a minimal-width size; 60 == 0x3C. + std::string const body(60, 'z'); + + auto req = make_request(); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), capy::make_buffer(body)); + BOOST_TEST_EQ(n, body.size()); + + auto [ec1] = co_await sr.commit(n); + BOOST_TEST(!ec1); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + }()); + + std::string expected(req.buffer()); + expected += "3C\r\n" + body + "\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST(req.chunked()); + BOOST_TEST_EQ(server.data(), expected); + } + + void + testPrepareCommitNoFlush() + { + // Committing a small amount keeps the remaining + // capacity above min_prepare, so commit() completes + // without flushing anything to the stream. + auto req = make_request(5); + auto [client, server] = capy::test::make_stream_pair(); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), + capy::const_buffer("hello", 5)); + BOOST_TEST_EQ(n, 5u); + + auto [ec1] = co_await sr.commit(n); + BOOST_TEST(!ec1); + BOOST_TEST(server.data().empty()); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + }()); + + BOOST_TEST_EQ( + server.data(), + std::string(req.buffer()) + "hello"); + } + + void + testPrepareEmptyDest() + { + // prepare() with no destination buffers reports no + // writable space. + auto req = make_request(5); + auto [client, server] = capy::test::make_stream_pair(); + + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + BOOST_TEST(sr.prepare({}).empty()); + + // The staging buffer is untouched and remains fully + // available. + capy::mutable_buffer tmp[2]; + auto dest = sr.prepare(tmp); + BOOST_TEST_EQ(dest.size(), 1u); + BOOST_TEST_EQ(capy::buffer_size(dest), cfg.buffer_size); + } + + void + testPrepareFullBuffer() + { + // A commit that fills the staging buffer triggers a + // drain. When the drain fails, the buffered bytes are + // kept; prepare() must report no writable space rather + // than hand out a zero-sized buffer. + capy::test::fuse f; + auto r = f.armed([&](capy::test::fuse&) -> capy::task<> + { + std::string const body(cfg.buffer_size, 'z'); + + auto req = make_request(body.size()); + capy::test::write_stream ws_impl(f); + + capy::any_write_stream ws(&ws_impl); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), capy::make_buffer(body)); + BOOST_TEST_EQ(n, body.size()); + + if(auto [ec] = co_await sr.commit(n); ec) + { + BOOST_TEST(sr.prepare(tmp).empty()); + co_return; + } + + // The successful drain empties the staging buffer, + // so the full capacity is writable again. + auto dest = sr.prepare(tmp); + BOOST_TEST_EQ(dest.size(), 1u); + BOOST_TEST_EQ( + capy::buffer_size(dest), cfg.buffer_size); + }); + BOOST_TEST(r.success); + } + + void + testChunkedShortWrites() + { + // The staging buffer holds raw body bytes only, so the + // full capacity is stageable as one chunk; 64 == 0x40. + std::string const body(cfg.buffer_size, 'z'); + + auto req = make_request(); + capy::test::write_stream ws_impl({}, 1); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&ws_impl); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), capy::make_buffer(body)); + BOOST_TEST_EQ(n, body.size()); + + auto [ec1] = co_await sr.commit(n); + BOOST_TEST(!ec1); + + // The drain writes the header and the staged chunk + // to completion, even over a stream that transfers + // one byte per write; the chunk's closing CRLF is + // deferred to the next write. + BOOST_TEST_EQ(ws_impl.data(), + std::string(req.buffer()) + + "40\r\n" + body); + + auto [ec2, m] = co_await sr.write( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec2); + BOOST_TEST_EQ(m, 5u); + + auto [ec3] = co_await sr.write_eof(); + BOOST_TEST(!ec3); + }()); + + std::string expected(req.buffer()); + expected += "40\r\n" + body + "\r\n"; + expected += "5\r\nhello\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST(req.chunked()); + BOOST_TEST_EQ(ws_impl.data(), expected); + } + + void + testContentLengthShortWrites() + { + // A sized body streamed through the direct path must + // arrive intact over a stream that transfers one byte + // per write: bytes already on the wire may not be sent + // again, and every call must consume at least one byte + // of the caller's buffer. + std::string const b1(cfg.min_direct, 'x'); + std::string const b2(cfg.min_direct, 'y'); + + auto req = make_request(b1.size() + b2.size()); + capy::test::write_stream ws_impl({}, 1); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&ws_impl); + serializer sr(ws, req, cfg); + + // The first drain writes the header in full and + // exactly one body byte. + auto [ec1, n1] = co_await sr.write_some( + capy::make_buffer(b1)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n1, 1u); + BOOST_TEST_EQ( + ws_impl.data(), + std::string(req.buffer()) + "x"); + + // The rest of the first buffer is below min_direct + // and is staged without touching the wire. + auto [ec2, n2] = co_await sr.write( + capy::const_buffer(b1.data() + 1, b1.size() - 1)); + BOOST_TEST(!ec2); + BOOST_TEST_EQ(n2, b1.size() - 1); + BOOST_TEST_EQ( + ws_impl.data().size(), + req.buffer().size() + 1); + + // Even when a full staged prefix must be flushed + // first, at least one byte of the caller's buffer + // is consumed. + auto [ec3, n3] = co_await sr.write_some( + capy::make_buffer(b2)); + BOOST_TEST(!ec3); + BOOST_TEST_EQ(n3, 1u); + BOOST_TEST_EQ( + ws_impl.data(), + std::string(req.buffer()) + b1 + "y"); + + auto [ec4, n4] = co_await sr.write( + capy::const_buffer(b2.data() + 1, b2.size() - 1)); + BOOST_TEST(!ec4); + BOOST_TEST_EQ(n4, b2.size() - 1); + + auto [ec5] = co_await sr.write_eof(); + BOOST_TEST(!ec5); + }()); + + BOOST_TEST_EQ( + ws_impl.data(), + std::string(req.buffer()) + b1 + b2); + } + + void + testPrepareCommitShortWrites() + { + // A commit-triggered drain has no caller buffers; the + // header and staged bytes are written to completion + // over a stream that transfers one byte per write. + std::string const body(60, 'z'); + + auto req = make_request(body.size()); + capy::test::write_stream ws_impl({}, 1); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&ws_impl); + serializer sr(ws, req, cfg); + + capy::mutable_buffer tmp[2]; + auto n = capy::buffer_copy( + sr.prepare(tmp), capy::make_buffer(body)); + BOOST_TEST_EQ(n, body.size()); + + auto [ec1] = co_await sr.commit(n); + BOOST_TEST(!ec1); + BOOST_TEST_EQ( + ws_impl.data(), + std::string(req.buffer()) + body); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + }()); + + BOOST_TEST_EQ( + ws_impl.data(), + std::string(req.buffer()) + body); + } + + void + testShortWriteEofDoesNotTruncate() + { + // Everything due at eof is written to completion, even + // over a stream that transfers one byte per write; the + // body must not be silently dropped after the header. + auto req = make_request(); + capy::test::write_stream ws_impl({}, 1); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&ws_impl); + serializer sr(ws, req, cfg); + + auto [ec1, n] = co_await sr.write( + capy::const_buffer("hello", 5)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n, 5u); + + auto [ec2] = co_await sr.write_eof(); + BOOST_TEST(!ec2); + }()); + + // The body was fully buffered before the header went + // out, so the request converts to Content-Length. + BOOST_TEST(!req.chunked()); + BOOST_TEST_EQ( + ws_impl.data(), + std::string(req.buffer()) + "hello"); + } + + void + testChunkedPreservesWriteOrder() + { + // A large write arriving while a small write is still + // staged must not overtake it: both leave in a single + // gather as one chunk with the staged bytes first, even + // when the stream accepts one byte at a time; + // 3 + 16 == 0x13. + std::string const body(cfg.min_direct, 'x'); + + auto req = make_request(); + capy::test::write_stream ws_impl({}, 1); + + capy::test::run_blocking()([&]() -> capy::task<> + { + capy::any_write_stream ws(&ws_impl); + serializer sr(ws, req, cfg); + + auto [ec1, n1] = co_await sr.write( + capy::const_buffer("abc", 3)); + BOOST_TEST(!ec1); + BOOST_TEST_EQ(n1, 3u); + + auto [ec2, n2] = co_await sr.write( + capy::make_buffer(body)); + BOOST_TEST(!ec2); + BOOST_TEST_EQ(n2, body.size()); + + auto [ec3] = co_await sr.write_eof(); + BOOST_TEST(!ec3); + }()); + + std::string expected(req.buffer()); + expected += "13\r\nabc" + body + "\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST(req.chunked()); + BOOST_TEST_EQ(ws_impl.data(), expected); + } + + void + testErrorInjection() + { + capy::test::fuse f; + auto r = f.armed([&](capy::test::fuse&) -> capy::task<> + { + std::string const body(cfg.min_direct, 'x'); + + auto req = make_request(); + + auto [client, server] = + capy::test::make_stream_pair(f); + + capy::any_write_stream ws(&client); + serializer sr(ws, req, cfg); + + if(auto [ec, n] = co_await sr.write( + capy::make_buffer(body)); ec) + { + BOOST_TEST(!sr.is_done()); + co_return; + } + + if(auto [ec, n] = co_await sr.write( + capy::const_buffer("hello", 5)); ec) + { + BOOST_TEST(!sr.is_done()); + co_return; + } + + if(auto [ec] = co_await sr.write_eof(); ec) + { + BOOST_TEST(!sr.is_done()); + co_return; + } + + BOOST_TEST(sr.is_done()); + + std::string expected(req.buffer()); + expected += "10\r\n" + body + "\r\n"; + expected += "5\r\nhello\r\n"; + expected += "0\r\n\r\n"; + BOOST_TEST_EQ(server.data(), expected); + }); + BOOST_TEST(r.success); + } + + void + run() + { + testContentLengthSmallBody(); + testContentLengthLargeBody(); + testContentLengthMultipleBuffers(); + testBodySizeMismatch(); + testChunkedSmallBodyConvertsToContentLength(); + testChunkedWriteEofWithBody(); + testChunkedLargeBody(); + testChunkedWriteEofWithTail(); + testChunkedWriteEofTailOnly(); + testChunkedEmptyBody(); + testPrepareCommitContentLength(); + testPrepareCommitPartialFlush(); + testPrepareCommitChunked(); + testPrepareCommitNoFlush(); + testPrepareEmptyDest(); + testPrepareFullBuffer(); + testChunkedShortWrites(); + testContentLengthShortWrites(); + testPrepareCommitShortWrites(); + testShortWriteEofDoesNotTruncate(); + testChunkedPreservesWriteOrder(); + testErrorInjection(); + } +}; + +TEST_SUITE(serializer_test, "boost.burl.detail.serializer"); + +} // namespace detail +} // namespace burl +} // namespace boost diff --git a/test/unit/error.cpp b/test/unit/error.cpp index 7f0ab4b..f5f1489 100644 --- a/test/unit/error.cpp +++ b/test/unit/error.cpp @@ -59,6 +59,9 @@ struct error_test BOOST_TEST_EQ( msg(error::proxy_unsupported_version), "unsupported proxy protocol version"); + BOOST_TEST_EQ( + msg(error::body_size_mismatch), + "request body size did not match content length"); BOOST_TEST_EQ( std::error_code(9999, burl_category()).message(), diff --git a/test/unit/file.cpp b/test/unit/file.cpp index c1cafc6..a938bb2 100644 --- a/test/unit/file.cpp +++ b/test/unit/file.cpp @@ -56,10 +56,7 @@ class file_test BOOST_TEST(cl.has_value()); BOOST_TEST_EQ(cl.value(), contents.size()); - capy::test::buffer_sink bs; - auto ec = check_io_body(body, bs); - BOOST_TEST(!ec); - BOOST_TEST_EQ(bs.data(), contents); + check_io_body(body, contents); } void diff --git a/test/unit/multipart_form.cpp b/test/unit/multipart_form.cpp index d4fef11..72cfce3 100644 --- a/test/unit/multipart_form.cpp +++ b/test/unit/multipart_form.cpp @@ -90,10 +90,7 @@ struct multipart_form_test BOOST_TEST(cl.has_value()); BOOST_TEST_EQ(cl.value(), expected.size()); - capy::test::buffer_sink bs; - auto ec = check_io_body(body, bs); - BOOST_TEST(!ec); - BOOST_TEST_EQ(bs.data(), expected); + check_io_body(body, expected); } void