Files
Oleks 4c7dcd9279
Some checks failed
ci/woodpecker/tag/woodpecker Pipeline failed
alertmanager-gotify-bridge: initial commit with bridge and CI
2026-03-16 17:26:06 +02:00

87 lines
2.7 KiB
Python

#!/usr/bin/env python3
"""Alertmanager webhook receiver that forwards alerts to Gotify."""
import json
import os
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.error import URLError
from urllib.parse import quote
from urllib.request import Request, urlopen
# --- Configuration, read from the environment once at import time ---------
# Required settings use os.environ[...], so a missing variable raises
# KeyError as soon as the module is loaded.

# Base URL of the Gotify server; send_gotify() appends "/message?token=..." to it.
GOTIFY_URL = os.environ["GOTIFY_URL"]
# Gotify token used for alerts whose severity label is "critical".
DISASTER_TOKEN = os.environ["DISASTER_TOKEN"]
# Gotify token used for alerts whose severity label is "warning".
WARNING_TOKEN = os.environ["WARNING_TOKEN"]
# Optional message priorities; int() raises ValueError if a value is not an integer.
DISASTER_PRIORITY = int(os.environ.get("DISASTER_PRIORITY", "8"))
WARNING_PRIORITY = int(os.environ.get("WARNING_PRIORITY", "4"))
# Maps the alert's "severity" label to a (token, priority) pair.
# Handler.do_POST falls back to the "warning" entry for any other severity.
SEVERITY_MAP = {
"critical": (DISASTER_TOKEN, DISASTER_PRIORITY),
"warning": (WARNING_TOKEN, WARNING_PRIORITY),
}
def send_gotify(token, title, message, priority, timeout=10):
    """Post a single notification to the Gotify server.

    Delivery is best-effort: failures are logged to stderr and reported
    through the return value instead of being raised.

    Args:
        token: Gotify token, sent as the ``token`` query parameter.
        title: Notification title.
        message: Notification body text.
        priority: Gotify message priority.
        timeout: Seconds to wait on the Gotify connection before giving up.

    Returns:
        True if the request completed without error, False otherwise.
    """
    payload = json.dumps({
        "title": title,
        "message": message,
        "priority": priority,
    }).encode()
    # Tolerate a trailing slash in GOTIFY_URL (avoids ".../​/message") and
    # escape the token so reserved characters cannot corrupt the query string.
    base_url = GOTIFY_URL.rstrip("/")
    req = Request(
        f"{base_url}/message?token={quote(str(token), safe='')}",
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    try:
        # The timeout keeps a stalled Gotify from blocking the server forever;
        # the context manager closes the response socket.
        with urlopen(req, timeout=timeout):
            pass
    except OSError as e:
        # URLError and HTTPError are OSError subclasses, as are socket
        # timeouts and connection resets that URLError alone would miss.
        print(f"ERROR sending to Gotify: {e}", file=sys.stderr)
        return False
    return True
class Handler(BaseHTTPRequestHandler):
    """HTTP handler for the bridge.

    POST (any path) accepts an Alertmanager webhook payload and forwards each
    alert to Gotify; GET /health answers a liveness probe.
    """

    def do_POST(self):
        """Validate the webhook payload and forward every alert in it.

        Responds 400 for a malformed payload, 502 when at least one alert
        could not be delivered, and 200 with body ``ok`` otherwise.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
            if length < 0:
                # rfile.read(-1) would read until EOF instead of the body.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(length))
            if not isinstance(body, dict):
                raise ValueError("payload must be a JSON object")
            # A missing or null "alerts" key means there is nothing to forward.
            alerts = body.get("alerts") or []
            if not isinstance(alerts, list) or not all(
                isinstance(alert, dict) for alert in alerts
            ):
                raise ValueError("'alerts' must be a list of objects")
        except ValueError as e:
            # json.JSONDecodeError is a ValueError subclass.
            self._reply(400, f"Bad request: {e}".encode())
            return

        failures = 0
        for alert in alerts:
            labels = self._section(alert, "labels")
            annotations = self._section(alert, "annotations")
            severity = labels.get("severity", "warning")
            # Unknown severities are routed like warnings.
            token, priority = SEVERITY_MAP.get(severity, SEVERITY_MAP["warning"])
            status = alert.get("status", "firing")
            alertname = labels.get("alertname", "Unknown")
            summary = annotations.get("summary", "")
            prefix = "RESOLVED" if status == "resolved" else "FIRING"
            title = f"[{prefix}] {alertname}"
            message = summary or f"{alertname} is {status}"
            # Only an explicit False return counts as a delivery failure;
            # a None return is treated as delivered.
            if send_gotify(token, title, message, priority) is False:
                failures += 1
                continue
            print(f"Forwarded: {title} -> severity={severity} priority={priority}")

        if failures:
            # A 5xx status lets the sender know delivery failed so it can
            # retry, rather than silently dropping the alert.
            self._reply(502, f"Failed to forward {failures} alert(s)".encode())
            return
        self._reply(200, b"ok")

    def do_GET(self):
        """Answer 200 ``ok`` on /health and 404 on every other path."""
        if self.path == "/health":
            self._reply(200, b"ok")
            return
        self._reply(404)

    def log_message(self, format, *args):
        """Write the access-log line to stdout via print()."""
        print(format % args)

    def _reply(self, status, payload=b""):
        """Send a header-less response with the given status and optional body."""
        self.send_response(status)
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    @staticmethod
    def _section(alert, key):
        """Return alert[key] when it is a JSON object, otherwise an empty dict."""
        value = alert.get(key)
        return value if isinstance(value, dict) else {}
if __name__ == "__main__":
    # Script entry point: bind on all interfaces and serve until the process
    # is stopped. The listening port comes from PORT (default 8080).
    port = int(os.getenv("PORT", "8080"))
    bind_address = ("0.0.0.0", port)
    httpd = HTTPServer(bind_address, Handler)
    print(f"Bridge listening on :{port}")
    httpd.serve_forever()