diff --git a/src/paperscout/__main__.py b/src/paperscout/__main__.py index fc6e7bd..52ad908 100644 --- a/src/paperscout/__main__.py +++ b/src/paperscout/__main__.py @@ -13,6 +13,7 @@ from pathlib import Path from typing import Any, cast +from .bolt_server import create_bolt_http_server from .config import settings from .db import init_db, init_pool, pool_status from .health import start_health_server @@ -168,6 +169,7 @@ async def _async_main() -> None: shutdown_event = asyncio.Event() shutdown_reason: list[str | None] = [None] health_server = None + bolt_server = None bolt_thread = None mq = None app = None @@ -285,9 +287,10 @@ def _extra_health_fields() -> dict[str, Any]: extra_fields_fn=_extra_health_fields, ) log.info("Starting Slack Bolt app on port %d", settings.port) + bolt_server = create_bolt_http_server(app, settings.port) bolt_thread = threading.Thread( - target=app.start, - kwargs={"port": settings.port}, + target=bolt_server.serve_forever, + kwargs={"poll_interval": 0.05}, daemon=True, name="bolt", ) @@ -304,7 +307,7 @@ def _extra_health_fields() -> dict[str, Any]: health_thread=( getattr(health_server, "_paperscout_thread", None) if health_server else None ), - app=app, + bolt_server=bolt_server, bolt_thread=bolt_thread, mq_drain_timeout=settings.shutdown_mq_drain_timeout_seconds, thread_join_timeout=settings.shutdown_thread_join_timeout_seconds, diff --git a/src/paperscout/bolt_server.py b/src/paperscout/bolt_server.py new file mode 100644 index 0000000..b2ebf3e --- /dev/null +++ b/src/paperscout/bolt_server.py @@ -0,0 +1,78 @@ +"""Slack Bolt HTTP server using public ``App.dispatch()`` only. + +Mirrors slack-bolt's built-in development server without accessing private +``_development_server`` internals. Requires slack-bolt >= 1.28.0 +(``App.dispatch``, ``BoltRequest``, ``BoltResponse``). +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Sequence +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Any + +from slack_bolt import App +from slack_bolt.request import BoltRequest +from slack_bolt.response import BoltResponse + + +def create_bolt_http_server( + app: App, + port: int, + path: str = "/slack/events", + bind_host: str = "0.0.0.0", +) -> HTTPServer: + """Create an ``HTTPServer`` that dispatches Slack events via ``app.dispatch()``.""" + bolt_path = path + bolt_app = app + + class _BoltHandler(BaseHTTPRequestHandler): + def log_message(self, format: str, *args: Any) -> None: + if bolt_app.logger.isEnabledFor(logging.DEBUG): + bolt_app.logger.debug("bolt: %s", format % args) + + def do_POST(self) -> None: + request_path, _, query = self.path.partition("?") + if request_path != bolt_path: + self._send_response(404, headers={}) + return + + len_header = self.headers.get("Content-Length") or 0 + request_body = self.rfile.read(int(len_header)).decode("utf-8") + bolt_req = BoltRequest( + body=request_body, + query=query, + headers=self.headers, + ) + bolt_resp = bolt_app.dispatch(bolt_req) + self._send_bolt_response(bolt_resp) + + def do_GET(self) -> None: + self._send_response(404, headers={}) + + def _send_bolt_response(self, bolt_resp: BoltResponse) -> None: + self._send_response( + status=bolt_resp.status, + headers=bolt_resp.headers, + body=bolt_resp.body, + ) + + def _send_response( + self, + status: int, + headers: dict[str, Sequence[str]], + body: str | dict = "", + ) -> None: + self.send_response(status) + response_body = body if isinstance(body, str) else json.dumps(body) + body_bytes = response_body.encode("utf-8") + for key, values in headers.items(): + for value in values: + self.send_header(key, value) + self.send_header("Content-Length", str(len(body_bytes))) + self.end_headers() + self.wfile.write(body_bytes) + + return HTTPServer((bind_host, port), _BoltHandler) diff --git a/src/paperscout/db.py b/src/paperscout/db.py index 91b9f91..bbe8f90 100644 --- a/src/paperscout/db.py +++ b/src/paperscout/db.py @@ -52,6 +52,8 @@ def init_pool(dsn: str, minconn: int = 1, maxconn: int = 10) -> pg_pool.Threaded def pool_status(p: pg_pool.ThreadedConnectionPool) -> dict[str, Any]: """Report pool reachability via documented ``getconn``/``putconn`` only. + Requires psycopg2-binary >= 2.9.12 (``ThreadedConnectionPool`` public API). + Borrows and immediately returns one connection as a liveness probe. Does not read undocumented pool attributes or private internals. """ diff --git a/src/paperscout/shutdown.py b/src/paperscout/shutdown.py index 7a7560f..69adadd 100644 --- a/src/paperscout/shutdown.py +++ b/src/paperscout/shutdown.py @@ -7,30 +7,12 @@ from http.server import HTTPServer from psycopg2 import pool as pg_pool -from slack_bolt import App from .scout import MessageQueue log = logging.getLogger("paperscout") -def stop_bolt_server(app: App) -> None: - """Stop Slack Bolt HTTP dev server started via ``app.start()``. - - Bolt has no public graceful-shutdown API for the dev server; this uses the - private ``_development_server._server`` handle (slack-bolt pinned in uv.lock). - """ - dev = getattr(app, "_development_server", None) - if dev is None: - return - server = getattr(dev, "_server", None) - if server is not None: - try: - server.shutdown() - except Exception: - log.exception("shutdown: bolt server shutdown failed") - - def _join_thread(thread: threading.Thread | None, timeout: float, label: str) -> None: """Wait for *thread* to finish; log a warning if it exceeds *timeout*.""" if thread is None or not thread.is_alive(): @@ -46,7 +28,7 @@ def shutdown_services( mq: MessageQueue | None, health_server: HTTPServer | None, health_thread: threading.Thread | None, - app: App | None, + bolt_server: HTTPServer | None, bolt_thread: threading.Thread | None, mq_drain_timeout: float, thread_join_timeout: float, @@ -70,11 +52,12 @@ def shutdown_services( except Exception: log.exception("shutdown: health thread join failed") - if app is not None: + if bolt_server is not None: try: - stop_bolt_server(app) + bolt_server.shutdown() + bolt_server.server_close() except Exception: - log.exception("shutdown: bolt server stop failed") + log.exception("shutdown: bolt server shutdown failed") try: _join_thread(bolt_thread, thread_join_timeout, "bolt") except Exception: diff --git a/tests/test_bolt_server.py b/tests/test_bolt_server.py new file mode 100644 index 0000000..e32559c --- /dev/null +++ b/tests/test_bolt_server.py @@ -0,0 +1,97 @@ +"""Tests for paperscout.bolt_server.""" + +from __future__ import annotations + +import json +import socket +import threading +import urllib.error +import urllib.request +from unittest.mock import MagicMock + +import pytest +from slack_bolt.response import BoltResponse + +from paperscout.bolt_server import create_bolt_http_server + + +def _find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +@pytest.fixture() +def bolt_server_url(): + app = MagicMock() + app.logger.isEnabledFor.return_value = False + app.dispatch.return_value = BoltResponse( + status=200, + body="ok", + headers={"Content-Type": ["text/plain"]}, + ) + port = _find_free_port() + server = create_bolt_http_server(app, port, bind_host="127.0.0.1") + thread = threading.Thread( + target=server.serve_forever, + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + thread.start() + yield f"http://127.0.0.1:{port}", app + server.shutdown() + server.server_close() + thread.join() + assert not thread.is_alive() + + +class TestBoltHttpServer: + def test_post_slack_events_dispatches_to_app(self, bolt_server_url): + base_url, app = bolt_server_url + req = urllib.request.Request( + f"{base_url}/slack/events", + data=b'{"type":"url_verification"}', + method="POST", + headers={"Content-Type": "application/json"}, + ) + resp = urllib.request.urlopen(req) + assert resp.status == 200 + assert resp.read() == b"ok" + app.dispatch.assert_called_once() + bolt_req = app.dispatch.call_args[0][0] + assert bolt_req.body == {"type": "url_verification"} + + def test_wrong_path_returns_404(self, bolt_server_url): + base_url, app = bolt_server_url + req = urllib.request.Request( + f"{base_url}/wrong", + data=b"{}", + method="POST", + headers={"Content-Type": "application/json"}, + ) + with pytest.raises(urllib.error.HTTPError) as exc_info: + urllib.request.urlopen(req) + assert exc_info.value.code == 404 + app.dispatch.assert_not_called() + + def test_get_returns_404(self, bolt_server_url): + base_url, app = bolt_server_url + with pytest.raises(urllib.error.HTTPError) as exc_info: + urllib.request.urlopen(f"{base_url}/slack/events") + assert exc_info.value.code == 404 + app.dispatch.assert_not_called() + + def test_dispatch_json_body_response(self, bolt_server_url): + base_url, app = bolt_server_url + app.dispatch.return_value = BoltResponse( + status=200, + body={"challenge": "abc"}, + headers={}, + ) + req = urllib.request.Request( + f"{base_url}/slack/events", + data=b"{}", + method="POST", + ) + resp = urllib.request.urlopen(req) + assert json.loads(resp.read()) == {"challenge": "abc"} diff --git a/tests/test_main_health_merge.py b/tests/test_main_health_merge.py index 4a3d69d..41190e9 100644 --- a/tests/test_main_health_merge.py +++ b/tests/test_main_health_merge.py @@ -6,6 +6,7 @@ from unittest.mock import MagicMock from paperscout.__main__ import _merge_extra_health_fields, _mq_health_fields +from paperscout.db import pool_status from paperscout.scout import MessageQueue @@ -71,3 +72,14 @@ def test_merge_includes_allowlisted_mq_fields(): assert out["mq_max_size"] == 1000 assert out["mq_utilization"] == 0.002 assert out["mq_circuit_state"] == "closed" + + +def test_merge_includes_db_pool_from_pool_status(): + pool = MagicMock() + conn = MagicMock() + pool.getconn.return_value = conn + scheduler = {"last_updated": None, "poll_count": 0} + out = _merge_extra_health_fields(scheduler, {}, pool_status(pool)) + assert out["db_pool"] == {"reachable": True} + pool.getconn.assert_called_once() + pool.putconn.assert_called_once_with(conn) diff --git a/tests/test_shutdown.py b/tests/test_shutdown.py index 838184a..98198ae 100644 --- a/tests/test_shutdown.py +++ b/tests/test_shutdown.py @@ -6,7 +6,7 @@ from http.server import HTTPServer from unittest.mock import MagicMock -from paperscout.shutdown import shutdown_services, stop_bolt_server +from paperscout.shutdown import shutdown_services class TestShutdownServices: @@ -19,7 +19,7 @@ def test_shutdown_services_drains_mq_and_logs(self, caplog): mq=mq, health_server=None, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0, @@ -34,18 +34,25 @@ def test_shutdown_services_skips_none_handles(self): mq=None, health_server=None, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0, ) - def test_stop_bolt_server_calls_shutdown(self): - app = MagicMock() - server = MagicMock() - app._development_server = MagicMock(_server=server) - stop_bolt_server(app) - server.shutdown.assert_called_once() + def test_shutdown_services_stops_bolt_server(self): + bolt_server = MagicMock(spec=HTTPServer) + shutdown_services( + reason="SIGINT", + mq=None, + health_server=None, + health_thread=None, + bolt_server=bolt_server, + bolt_thread=None, + mq_drain_timeout=30.0, + thread_join_timeout=5.0, + ) + bolt_server.shutdown.assert_called_once() def test_shutdown_services_stops_health_server(self): health_server = MagicMock(spec=HTTPServer) @@ -54,7 +61,7 @@ def test_shutdown_services_stops_health_server(self): mq=None, health_server=health_server, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0, @@ -71,7 +78,7 @@ def test_shutdown_services_continues_after_mq_drain_failure(self, caplog): mq=mq, health_server=health_server, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0, @@ -88,7 +95,7 @@ def test_shutdown_services_closes_pool(self, caplog): mq=None, health_server=None, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0, @@ -108,7 +115,7 @@ def test_shutdown_services_pool_close_raises_continues(self, caplog): mq=mq, health_server=None, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0, @@ -123,7 +130,7 @@ def test_shutdown_services_skips_pool_when_none(self): mq=None, health_server=None, health_thread=None, - app=None, + bolt_server=None, bolt_thread=None, mq_drain_timeout=30.0, thread_join_timeout=5.0,