Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/paperscout/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pathlib import Path
from typing import Any, cast

from .bolt_server import create_bolt_http_server
from .config import settings
from .db import init_db, init_pool, pool_status
from .health import start_health_server
Expand Down Expand Up @@ -168,6 +169,7 @@ async def _async_main() -> None:
shutdown_event = asyncio.Event()
shutdown_reason: list[str | None] = [None]
health_server = None
bolt_server = None
bolt_thread = None
mq = None
app = None
Expand Down Expand Up @@ -285,9 +287,10 @@ def _extra_health_fields() -> dict[str, Any]:
extra_fields_fn=_extra_health_fields,
)
log.info("Starting Slack Bolt app on port %d", settings.port)
bolt_server = create_bolt_http_server(app, settings.port)
bolt_thread = threading.Thread(
target=app.start,
kwargs={"port": settings.port},
target=bolt_server.serve_forever,
kwargs={"poll_interval": 0.05},
daemon=True,
name="bolt",
)
Expand All @@ -304,7 +307,7 @@ def _extra_health_fields() -> dict[str, Any]:
health_thread=(
getattr(health_server, "_paperscout_thread", None) if health_server else None
),
app=app,
bolt_server=bolt_server,
bolt_thread=bolt_thread,
mq_drain_timeout=settings.shutdown_mq_drain_timeout_seconds,
thread_join_timeout=settings.shutdown_thread_join_timeout_seconds,
Expand Down
78 changes: 78 additions & 0 deletions src/paperscout/bolt_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Slack Bolt HTTP server using public ``App.dispatch()`` only.

Mirrors slack-bolt's built-in development server without accessing private
``_development_server`` internals. Requires slack-bolt >= 1.28.0
(``App.dispatch``, ``BoltRequest``, ``BoltResponse``).
"""

from __future__ import annotations

import json
import logging
from collections.abc import Sequence
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any

from slack_bolt import App
from slack_bolt.request import BoltRequest
from slack_bolt.response import BoltResponse


def create_bolt_http_server(
app: App,
port: int,
path: str = "/slack/events",
bind_host: str = "0.0.0.0",
) -> HTTPServer:
"""Create an ``HTTPServer`` that dispatches Slack events via ``app.dispatch()``."""
bolt_path = path
bolt_app = app

class _BoltHandler(BaseHTTPRequestHandler):
def log_message(self, format: str, *args: Any) -> None:
if bolt_app.logger.isEnabledFor(logging.DEBUG):
bolt_app.logger.debug("bolt: %s", format % args)

def do_POST(self) -> None:
request_path, _, query = self.path.partition("?")
if request_path != bolt_path:
self._send_response(404, headers={})
return

len_header = self.headers.get("Content-Length") or 0
request_body = self.rfile.read(int(len_header)).decode("utf-8")
bolt_req = BoltRequest(
body=request_body,
query=query,
headers=self.headers,
)
bolt_resp = bolt_app.dispatch(bolt_req)
self._send_bolt_response(bolt_resp)

def do_GET(self) -> None:
self._send_response(404, headers={})

def _send_bolt_response(self, bolt_resp: BoltResponse) -> None:
self._send_response(
status=bolt_resp.status,
headers=bolt_resp.headers,
body=bolt_resp.body,
)

def _send_response(
self,
status: int,
headers: dict[str, Sequence[str]],
body: str | dict = "",
) -> None:
self.send_response(status)
response_body = body if isinstance(body, str) else json.dumps(body)
body_bytes = response_body.encode("utf-8")
for key, values in headers.items():
for value in values:
self.send_header(key, value)
self.send_header("Content-Length", str(len(body_bytes)))
self.end_headers()
self.wfile.write(body_bytes)

return HTTPServer((bind_host, port), _BoltHandler)
2 changes: 2 additions & 0 deletions src/paperscout/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def init_pool(dsn: str, minconn: int = 1, maxconn: int = 10) -> pg_pool.Threaded
def pool_status(p: pg_pool.ThreadedConnectionPool) -> dict[str, Any]:
"""Report pool reachability via documented ``getconn``/``putconn`` only.

Requires psycopg2-binary >= 2.9.12 (``ThreadedConnectionPool`` public API).

Borrows and immediately returns one connection as a liveness probe.
Does not read undocumented pool attributes or private internals.
"""
Expand Down
27 changes: 5 additions & 22 deletions src/paperscout/shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,12 @@
from http.server import HTTPServer

from psycopg2 import pool as pg_pool
from slack_bolt import App

from .scout import MessageQueue

log = logging.getLogger("paperscout")


def stop_bolt_server(app: App) -> None:
"""Stop Slack Bolt HTTP dev server started via ``app.start()``.

Bolt has no public graceful-shutdown API for the dev server; this uses the
private ``_development_server._server`` handle (slack-bolt pinned in uv.lock).
"""
dev = getattr(app, "_development_server", None)
if dev is None:
return
server = getattr(dev, "_server", None)
if server is not None:
try:
server.shutdown()
except Exception:
log.exception("shutdown: bolt server shutdown failed")


def _join_thread(thread: threading.Thread | None, timeout: float, label: str) -> None:
"""Wait for *thread* to finish; log a warning if it exceeds *timeout*."""
if thread is None or not thread.is_alive():
Expand All @@ -46,7 +28,7 @@ def shutdown_services(
mq: MessageQueue | None,
health_server: HTTPServer | None,
health_thread: threading.Thread | None,
app: App | None,
bolt_server: HTTPServer | None,
bolt_thread: threading.Thread | None,
mq_drain_timeout: float,
thread_join_timeout: float,
Expand All @@ -70,11 +52,12 @@ def shutdown_services(
except Exception:
log.exception("shutdown: health thread join failed")

if app is not None:
if bolt_server is not None:
try:
stop_bolt_server(app)
bolt_server.shutdown()
bolt_server.server_close()
except Exception:
log.exception("shutdown: bolt server stop failed")
log.exception("shutdown: bolt server shutdown failed")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
try:
_join_thread(bolt_thread, thread_join_timeout, "bolt")
except Exception:
Expand Down
97 changes: 97 additions & 0 deletions tests/test_bolt_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Tests for paperscout.bolt_server."""

from __future__ import annotations

import json
import socket
import threading
import urllib.error
import urllib.request
from unittest.mock import MagicMock

import pytest
from slack_bolt.response import BoltResponse

from paperscout.bolt_server import create_bolt_http_server


def _find_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]


@pytest.fixture()
def bolt_server_url():
app = MagicMock()
app.logger.isEnabledFor.return_value = False
app.dispatch.return_value = BoltResponse(
status=200,
body="ok",
headers={"Content-Type": ["text/plain"]},
)
port = _find_free_port()
server = create_bolt_http_server(app, port, bind_host="127.0.0.1")
thread = threading.Thread(
target=server.serve_forever,
kwargs={"poll_interval": 0.05},
daemon=True,
)
thread.start()
yield f"http://127.0.0.1:{port}", app
server.shutdown()
server.server_close()
thread.join()
assert not thread.is_alive()


class TestBoltHttpServer:
def test_post_slack_events_dispatches_to_app(self, bolt_server_url):
base_url, app = bolt_server_url
req = urllib.request.Request(
f"{base_url}/slack/events",
data=b'{"type":"url_verification"}',
method="POST",
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req)
assert resp.status == 200
assert resp.read() == b"ok"
app.dispatch.assert_called_once()
bolt_req = app.dispatch.call_args[0][0]
assert bolt_req.body == {"type": "url_verification"}

def test_wrong_path_returns_404(self, bolt_server_url):
base_url, app = bolt_server_url
req = urllib.request.Request(
f"{base_url}/wrong",
data=b"{}",
method="POST",
headers={"Content-Type": "application/json"},
)
with pytest.raises(urllib.error.HTTPError) as exc_info:
urllib.request.urlopen(req)
assert exc_info.value.code == 404
app.dispatch.assert_not_called()

def test_get_returns_404(self, bolt_server_url):
base_url, app = bolt_server_url
with pytest.raises(urllib.error.HTTPError) as exc_info:
urllib.request.urlopen(f"{base_url}/slack/events")
assert exc_info.value.code == 404
app.dispatch.assert_not_called()

def test_dispatch_json_body_response(self, bolt_server_url):
base_url, app = bolt_server_url
app.dispatch.return_value = BoltResponse(
status=200,
body={"challenge": "abc"},
headers={},
)
req = urllib.request.Request(
f"{base_url}/slack/events",
data=b"{}",
method="POST",
)
resp = urllib.request.urlopen(req)
assert json.loads(resp.read()) == {"challenge": "abc"}
12 changes: 12 additions & 0 deletions tests/test_main_health_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from unittest.mock import MagicMock

from paperscout.__main__ import _merge_extra_health_fields, _mq_health_fields
from paperscout.db import pool_status
from paperscout.scout import MessageQueue


Expand Down Expand Up @@ -71,3 +72,14 @@ def test_merge_includes_allowlisted_mq_fields():
assert out["mq_max_size"] == 1000
assert out["mq_utilization"] == 0.002
assert out["mq_circuit_state"] == "closed"


def test_merge_includes_db_pool_from_pool_status():
pool = MagicMock()
conn = MagicMock()
pool.getconn.return_value = conn
scheduler = {"last_updated": None, "poll_count": 0}
out = _merge_extra_health_fields(scheduler, {}, pool_status(pool))
assert out["db_pool"] == {"reachable": True}
pool.getconn.assert_called_once()
pool.putconn.assert_called_once_with(conn)
35 changes: 21 additions & 14 deletions tests/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from http.server import HTTPServer
from unittest.mock import MagicMock

from paperscout.shutdown import shutdown_services, stop_bolt_server
from paperscout.shutdown import shutdown_services


class TestShutdownServices:
Expand All @@ -19,7 +19,7 @@ def test_shutdown_services_drains_mq_and_logs(self, caplog):
mq=mq,
health_server=None,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
Expand All @@ -34,18 +34,25 @@ def test_shutdown_services_skips_none_handles(self):
mq=None,
health_server=None,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
)

def test_stop_bolt_server_calls_shutdown(self):
app = MagicMock()
server = MagicMock()
app._development_server = MagicMock(_server=server)
stop_bolt_server(app)
server.shutdown.assert_called_once()
def test_shutdown_services_stops_bolt_server(self):
bolt_server = MagicMock(spec=HTTPServer)
shutdown_services(
reason="SIGINT",
mq=None,
health_server=None,
health_thread=None,
bolt_server=bolt_server,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
)
bolt_server.shutdown.assert_called_once()

def test_shutdown_services_stops_health_server(self):
health_server = MagicMock(spec=HTTPServer)
Expand All @@ -54,7 +61,7 @@ def test_shutdown_services_stops_health_server(self):
mq=None,
health_server=health_server,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
Expand All @@ -71,7 +78,7 @@ def test_shutdown_services_continues_after_mq_drain_failure(self, caplog):
mq=mq,
health_server=health_server,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
Expand All @@ -88,7 +95,7 @@ def test_shutdown_services_closes_pool(self, caplog):
mq=None,
health_server=None,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
Expand All @@ -108,7 +115,7 @@ def test_shutdown_services_pool_close_raises_continues(self, caplog):
mq=mq,
health_server=None,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
Expand All @@ -123,7 +130,7 @@ def test_shutdown_services_skips_pool_when_none(self):
mq=None,
health_server=None,
health_thread=None,
app=None,
bolt_server=None,
bolt_thread=None,
mq_drain_timeout=30.0,
thread_join_timeout=5.0,
Expand Down