mirror of
https://codeberg.org/angestoepselt/homepage.git
synced 2025-05-24 14:46:16 +00:00
338 lines
11 KiB
Python
Executable file
338 lines
11 KiB
Python
Executable file
#!/usr/bin/env python
|
||
|
||
import base64
import cgi
import collections
import hmac
import html
import io
import json
import mimetypes
import os
import re
import secrets
import sys
from collections.abc import Mapping
from typing import Any, NoReturn, Optional, overload

import itsdangerous
import requests
|
||
|
||
|
||
def fail(status: str, reason: str) -> NoReturn:
    """Emit a CGI error response and terminate the script.

    The response carries the given status, exposes the reason in an
    ``X-Reason`` header and points ``X-Sendfile`` at the static error page
    below ``SITE_DIRECTORY``.

    Args:
        status: Value for the CGI ``Status`` header, e.g. ``"400 Bad Request"``.
        reason: Human-readable explanation, sent as the ``X-Reason`` header.

    Raises:
        SystemExit: Always, with exit code 0, once the headers are written.
    """
    # Header values must stay on one line. Reasons can contain text taken from
    # exceptions or from the request, so collapse line breaks (and any other
    # whitespace runs) to keep them from ending the header block early or
    # injecting additional headers.
    safe_status = " ".join(status.split())
    safe_reason = " ".join(reason.split())

    print(f"Status: {safe_status}")
    print("Content-Type: text/html")
    print(f"X-Reason: {safe_reason}")
    print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html")
    print("")
    sys.exit(0)
|
||
|
||
|
||
# Optional site configuration. A missing, unreadable or malformed file must
# not take the contact forms down, so every such case falls back to an empty
# configuration.
try:
    with open("/config.json", "r", encoding="utf-8") as config_file:
        CONFIG = json.load(config_file)
except (OSError, ValueError):
    # OSError: the file is absent or unreadable. ValueError: invalid JSON
    # (json.JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
    CONFIG = {}

# The configuration is queried with CONFIG.get(...), so anything other than a
# JSON object is unusable.
if not isinstance(CONFIG, dict):
    CONFIG = {}
|
||
|
||
|
||
# Name of the decoy form field that is expected to stay empty.
HONEYPOT_FIELD_NAME = "addressline1"

# Root directory of the static site, taken from the environment (empty when
# the variable is not set).
SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "")

# Requested path, lowercased and without trailing slashes, so it can be
# compared against the known form routes.
request_uri = os.environ.get("REQUEST_URI", "")
request_uri = request_uri.lower().rstrip("/")

# Signs and verifies the CSRF token that is stored in the client's cookie.
# NOTE(review): the signing key is a literal in the source - confirm whether
# it is meant to be replaced by a real secret at deploy time.
serializer = itsdangerous.URLSafeSerializer(secret_key="secret key", salt="salt")

# HTTP session shared by the outgoing API requests of this script.
session = requests.Session()
|
||
|
||
def parse_cookie_header(header: str) -> dict[str, str]:
    """Split a raw ``Cookie`` request header into a name-to-value mapping.

    Pairs are separated by ``;``. Everything after the first ``=`` of a pair
    is the value; a pair without ``=`` gets an empty value. Segments without
    a name (as produced by an empty header or a stray ``;``) are skipped, and
    when a name occurs more than once the last occurrence wins.

    Args:
        header: The header value, e.g. ``"a=1; b=2"``.

    Returns:
        The cookies keyed by name.
    """
    parsed = dict[str, str]()
    for entry in header.split(";"):
        name, _, value = entry.lstrip(" ").partition("=")
        if name:
            parsed[name] = value
    return parsed


# Cookies sent by the client, keyed by cookie name.
cookies = parse_cookie_header(os.environ.get("HTTP_COOKIE", ""))
|
||
|
||
|
||
# Get the CSRF token. This is passed along according to the double-submit
# technique[1]: first as a hidden form input, second as a cookie, which is
# signed on the server so we can verify its authenticity. The cookie's token
# is reused for subsequent requests so every session only gets one token.
# 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
csrf_token = None
if "__Host-csrftoken" in cookies:
    # We already have a token for this client - try and extract it.
    try:
        csrf_token = serializer.loads(cookies["__Host-csrftoken"])
    except Exception:
        # Deliberately broad: a tampered or undecodable cookie must only ever
        # cost the client its old token. Unlike a bare ``except:``, this still
        # lets SystemExit and KeyboardInterrupt propagate.
        csrf_token = None

# Only a non-empty string can be compared against the submitted form value;
# in every other case (no cookie, invalid cookie, unexpected payload) issue a
# fresh token.
if not isinstance(csrf_token, str) or not csrf_token:
    csrf_token = secrets.token_urlsafe(32)

# (Re-)sign the token for the cookie that is sent back to the client.
signed_csrf_token = serializer.dumps(csrf_token)
|
||
|
||
# A form is disabled when its route starts with "/anmelden", or when it is the
# private computer application form and the configuration has a truthy
# "applicationsClosed" entry.
form_disabled = request_uri.startswith("/anmelden") or (
    request_uri.startswith("/computer-beantragen/privat")
    and bool(CONFIG.get("applicationsClosed"))
)
|
||
|
||
match os.environ.get("REQUEST_METHOD", "").upper():
|
||
case "GET":
|
||
# For GET requests, serve the form that the user requested. The CSRF
|
||
# token will be added here as well.
|
||
|
||
print(f"Status: {200 if not form_disabled else 503}")
|
||
print(f"Content-Type: text/html")
|
||
print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly")
|
||
print("")
|
||
|
||
with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template:
|
||
for line in template.readlines():
|
||
if (
|
||
not form_disabled
|
||
# This is a very rudimentary check to ensure that we
|
||
# actually place the token *inside* the form. It requires
|
||
# adding a <!-- FORM --> comment somewhere in the form that
|
||
# will be replaced with the hidden fields and hCaptcha.
|
||
and re.match(r"<!--\s*form\s*-->", line, re.IGNORECASE) is not None
|
||
):
|
||
print(f'<input type="hidden" name="csrftoken" value="{csrf_token}" />')
|
||
|
||
print(f'<label class="form-input">')
|
||
print(f'<span>Bitte lasse dieses Feld leer:</span>')
|
||
print(f'<input type="text" name="{HONEYPOT_FIELD_NAME}" value="" placeholder="Hier nichts eingeben." tabindex="-1" autocomplete="off" />')
|
||
print(f'</label>')
|
||
|
||
print(f'<script type="text/javascript">')
|
||
print(f'document.querySelector("input[name={HONEYPOT_FIELD_NAME}]").parentNode.classList.add("isolated")')
|
||
print(f'</script>')
|
||
|
||
print('<script src="https://js.hcaptcha.com/1/api.js" async defer></script>')
|
||
print(f'<div class="form-input">')
|
||
print(f'<div aria-hidden="true"></div>')
|
||
print(f'<div class="h-captcha" data-sitekey="{os.environ.get("HCAPTCHA_SITE_KEY", "")}"></div>')
|
||
print(f'</div>')
|
||
else:
|
||
print(line)
|
||
|
||
exit(0)
|
||
|
||
case "POST":
|
||
# This is the main case where we handle the submitted form. This is
|
||
# continued below.
|
||
pass
|
||
|
||
case _:
|
||
# This case should never actually happen because lighttpd filters out
|
||
# reprquests accordingly.
|
||
fail("405 Method Not Allowed", "Method Not Allowed")
|
||
|
||
if form_disabled:
|
||
fail("503 Service Unavailable", "Form is disabled")
|
||
|
||
form = cgi.FieldStorage()
|
||
|
||
@overload
def get_form_value(name: str, default: Optional[str] = ..., cast: type[str] = ...) -> str: ...
@overload
def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int: ...
@overload
def get_form_value(name: str, default: None = ..., *, cast: type[bytes]) -> tuple[str, bytes]: ...
def get_form_value(
    name: str,
    default: Any = None,
    cast: type[str] | type[int] | type[bytes] = str,
) -> Any:
    """Read one field from the submitted form.

    Args:
        name: Name of the form field.
        default: Value returned as-is when the field is absent. ``None``
            marks the field as required.
        cast: ``str`` or ``int`` to convert the first submitted value, or
            ``bytes`` to read the field as a file upload.

    Returns:
        The converted value, ``default`` when the field is absent, or for
        ``cast=bytes`` a ``(filename, content)`` tuple where the filename
        falls back to ``"upload"``.

    A missing required field, a value that cannot be converted, or an invalid
    upload ends the request with a ``400 Bad Request`` via ``fail``.
    """
    if name not in form:
        if default is None:
            fail("400 Bad Request", f"Missing required field: {name}")
        return default

    if cast is bytes:
        value_object = form[name]
        # A list means the field was submitted several times.
        if isinstance(value_object, list) or not value_object.file:
            fail("400 Bad Request", f"Invalid value for field: {name}")
        content = value_object.file.read()
        if isinstance(content, str):
            # Fields submitted without a filename are read as text; encode
            # them so callers always receive bytes.
            content = content.encode("utf-8")
        return (value_object.filename or "upload"), content

    try:
        return cast(form.getfirst(name))
    except (TypeError, ValueError):
        fail("400 Bad Request", f"Invalid value for field: {name}")
|
||
|
||
|
||
# Make sure the provided CSRF token is actually valid. Note the usage of the
# constant-time comparison here. Both sides are compared as bytes because
# hmac.compare_digest() raises TypeError for str arguments containing
# non-ASCII characters, which would let a client crash the script.
given_csrf_token = get_form_value("csrftoken")
if not hmac.compare_digest(
    str(csrf_token).encode("utf-8", "surrogatepass"),
    str(given_csrf_token).encode("utf-8", "surrogatepass"),
):
    fail("400 Bad Request", "Invalid CSRF token")


# If the honeypot field was not empty, back off.
if get_form_value(HONEYPOT_FIELD_NAME, ""):
    fail("200 OK", f"Invalid value for field: {HONEYPOT_FIELD_NAME}")

# Verify the hCaptcha response token with the hCaptcha API.
if not (hcaptcha_token := get_form_value("h-captcha-response", "")):
    fail("200 OK", "Empty hCaptcha token")
try:
    response = session.post(
        "https://hcaptcha.com/siteverify",
        data={
            "secret": os.environ.get("HCAPTCHA_SECRET_KEY", ""),
            "response": hcaptcha_token,
        },
        # Without a timeout an unresponsive API would block the script forever.
        timeout=10,
    )
    hcaptcha_data = response.json()
except (requests.RequestException, ValueError) as error:
    # Network problems, timeouts or a non-JSON answer: report a proper error
    # page instead of dying with a traceback.
    fail("500 Internal Server Error", f"hCaptcha verification failed: {type(error).__name__}")
if not isinstance(hcaptcha_data, Mapping) or not hcaptcha_data.get("success", False):
    fail("200 OK", "hCaptcha fail")
|
||
|
||
# Read the fields that are common to every form. Additional, form-specific
# fields are collected in the match block on the request path.
contact_name = get_form_value("contactname")
contact_email = get_form_value("contactemail")
message = get_form_value("message", "[Keine Nachricht hinterlassen]")

# Optional upload as (filename, content); set by the forms that have one.
attachment: Optional[tuple[str, bytes]] = None

# Labelled details, kept in insertion order.
ticket_details = collections.OrderedDict(
    [
        ("Kontaktperson", contact_name),
        ("Email", contact_email),
    ]
)

# Default group; some forms pick a different one.
form_group = "csw-Allgemein"
|
||
|
||
match request_uri:
|
||
case "/kontakt":
|
||
form_name = "Allgemein"
|
||
|
||
case "/kontakt/problem":
|
||
form_name = "Problem-Support"
|
||
|
||
case "/spenden":
|
||
form_name = "Geldspende"
|
||
|
||
case "/mitmachen":
|
||
form_name = "Mitgliederwerbung"
|
||
|
||
case "/computer-beantragen/organisation":
|
||
form_name = "Computerantrag (Organisation)"
|
||
form_group = "csw-Anfragen"
|
||
ticket_details["Organisation"] = get_form_value("organization")
|
||
ticket_details["Adresse"] = get_form_value("addressline")
|
||
ticket_details["PLZ"] = get_form_value("postalcode")
|
||
ticket_details["Stadt"] = get_form_value("city")
|
||
ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, int)
|
||
ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, int)
|
||
ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, int)
|
||
|
||
case "/computer-beantragen/privat":
|
||
form_name = "Computerantrag (privat)"
|
||
form_group = "csw-Anfragen"
|
||
ticket_details["Gewünschte Hardware"] = get_form_value("hardware", default="Unbekannt")
|
||
ticket_details["Adresse"] = get_form_value("addressline")
|
||
ticket_details["PLZ"] = get_form_value("postalcode")
|
||
ticket_details["Stadt"] = get_form_value("city")
|
||
attachment = get_form_value("document", cast=bytes)
|
||
|
||
# Note: the actual form contains a checkbox for whether the user has
|
||
# read the guidelines, but we don't actually bother checking that here.
|
||
|
||
case "/hardware-spenden/organisation":
|
||
form_name = "Hardwarespende (Organisation)"
|
||
form_group = "csw-Hardwarespenden"
|
||
ticket_details["Organisation"] = get_form_value("organization")
|
||
attachment = get_form_value("inventory", cast=bytes)
|
||
|
||
case "/hardware-spenden/privat/laptop":
|
||
form_name = "Laptopspende (privat)"
|
||
form_group = "csw-Hardwarespenden"
|
||
ticket_details["Gerätedetails"] = get_form_value("device")
|
||
|
||
# This is only available on the CoderDojo site and will be blocked on the
|
||
# server side in other cases.
|
||
case "/anmelden":
|
||
form_name = "CoderDojo-Anmeldung"
|
||
form_group = "CoderDojo"
|
||
ticket_details["Anmelde-Typ"] = get_form_value("mode")
|
||
ticket_details["Teilnehmenden-Name"] = get_form_value("participantname", "-")
|
||
ticket_details["Telefonnummer"] = get_form_value("contactphone", "-")
|
||
ticket_details["Fotos?"] = get_form_value("photos")
|
||
|
||
case "/party":
|
||
form_name = "CoderDojo Minecraft LAN"
|
||
form_group = "CoderDojo"
|
||
ticket_details["Java-Spielername"] = get_form_value("javaname", "")
|
||
ticket_details["Bedrock-Spielername"] = get_form_value("bedrockname", "")
|
||
|
||
case "/freizeit":
|
||
form_name = "CoderCamp Umfrage"
|
||
form_group = "CoderDojo"
|
||
ticket_details["Interesse (9-12)"] = get_form_value("interested-kids", "")
|
||
ticket_details["Interesse (12-15)"] = get_form_value("interested-teens", "")
|
||
ticket_details["Interesse (16+)"] = get_form_value("interested-youngadults", "")
|
||
ticket_details["Gewünschtes Format"] = get_form_value("format")
|
||
ticket_details["Übernachtung"] = get_form_value("overnight")
|
||
|
||
case _:
|
||
# This case should never actually happen because lighttpd filters out
|
||
# requests accordingly.
|
||
fail("400 Bad Request", "Invalid form")
|
||
|
||
# Record which contact form the request came from.
ticket_details["Kontaktformular"] = form_name

# Allow the form group to be overridden using the environment variable (for
# testing). An unset or empty variable keeps the group chosen above.
if group_override := os.environ.get("ZAMMAD_GROUP", ""):
    form_group = group_override

# Base URL (without trailing slash) and API token of the Zammad instance.
ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "").rstrip("/")
ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "")

# Send the token with every request made through the shared session.
session.headers["Authorization"] = f"Token token={ZAMMAD_TOKEN}"
|
||
|
||
# Upper bound in seconds for every single Zammad request, so that an
# unresponsive server cannot block the script indefinitely.
ZAMMAD_TIMEOUT_SECONDS = 30


def zammad_post(path: str, payload: dict[str, Any]) -> requests.Response:
    """POST *payload* as JSON to the Zammad API endpoint at *path*.

    Args:
        path: Endpoint path below ``ZAMMAD_URL``, starting with ``/``.
        payload: JSON-serialisable request body.

    Returns:
        The response of the request.

    Raises:
        requests.RequestException: On connection problems, timeouts, or when
            the server answers with an HTTP error status.
    """
    result = session.post(
        f"{ZAMMAD_URL}{path}",
        json=payload,
        timeout=ZAMMAD_TIMEOUT_SECONDS,
    )
    result.raise_for_status()
    return result


try:
    # Add the actual ticket to the system.
    response = zammad_post(
        "/api/v1/tickets",
        dict(
            title=f"Kontaktformular {contact_name} – {form_name}",
            group=form_group,
            customer_id=f"guess:{contact_email}",
            article=dict(
                type="web",
                internal=True,
                content_type="text/plain",
                subject=f"Kontaktformular-Anfrage {contact_name}",
                body=message,
                attachments=[] if attachment is None else [{
                    "filename": attachment[0],
                    "data": base64.b64encode(attachment[1]).decode(),
                    "mime-type": mimetypes.guess_type(attachment[0])[0] or "text/plain",
                }],
            ),
        ),
    )
    ticket_id = response.json()["id"]
    # Explicit check instead of ``assert``, which is removed when Python runs
    # with -O.
    if not isinstance(ticket_id, int):
        raise TypeError(f"Unexpected ticket id in Zammad response: {ticket_id!r}")

    # Add a second article to the ticket that contains all the other information
    # from the contact form.
    response = zammad_post(
        "/api/v1/ticket_articles",
        dict(
            ticket_id=ticket_id,
            type="note",
            content_type="text/plain",
            internal=True,
            body="\n".join(
                f"{key}: {value}" for key, value in ticket_details.items()
            ),
        ),
    )

    # Add a tag to the ticket, denoting which contact form it came from.
    response = zammad_post(
        "/api/v1/tags/add",
        dict(
            object="Ticket",
            o_id=ticket_id,
            item=f"Kontaktformular – {form_name}",
        ),
    )

    # Everything went through - redirect to the confirmation page.
    print("Status: 302 Found")
    print("Content-Type: text/html")
    print("Location: /kontakt/fertig")
    print("")
except Exception as e:
    # Top-level boundary: any failure while talking to Zammad ends in the
    # error page. The message goes into a response header, so it is collapsed
    # to a single line first.
    fail("500 Internal Server Error", " ".join(str(e).split()))
|