mirror of
https://codeberg.org/angestoepselt/homepage.git
synced 2025-05-24 14:46:16 +00:00
254 lines
7.9 KiB
Python
Executable file
254 lines
7.9 KiB
Python
Executable file
#!/usr/bin/env python
|
||
|
||
import base64
import cgi
import collections
import hmac
import io
import mimetypes
import os
import secrets
from typing import IO, Any, NoReturn, Optional, overload

import itsdangerous
import requests
|
||
|
||
|
||
def fail(status: str, reason: str) -> NoReturn:
    """Emit a CGI error response and terminate the script.

    The response has no body of its own; its ``X-Sendfile`` header points at
    the static error page below ``SITE_DIRECTORY``.

    Args:
        status: Value for the CGI ``Status`` header, e.g. ``"400 Bad Request"``.
        reason: Diagnostic text that is sent in the ``X-Reason`` header.

    Raises:
        SystemExit: Always, with exit code 0.
    """
    # Callers pass arbitrary text here (including ``str(exception)``). A line
    # break inside a header value would terminate the header block early or
    # inject additional headers, so collapse every run of whitespace
    # (including CR/LF) into a single space.
    safe_status = " ".join(status.split())
    safe_reason = " ".join(reason.split())

    print(f"Status: {safe_status}")
    print("Content-Type: text/html")
    print(f"X-Reason: {safe_reason}")
    print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html")
    print("")

    # Raise SystemExit directly instead of calling the ``exit`` helper, which
    # is only installed by the ``site`` module.
    raise SystemExit(0)
|
||
|
||
|
||
# Root directory of the static site; the form templates and the error page
# are addressed relative to this path.
SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "")

# Normalised request path that selects the form. REQUEST_URI also carries the
# query string, so cut that off first - otherwise a request such as
# "/kontakt?ref=mail" would not match any known form and would produce a
# bogus template path.
request_uri = (
    os.environ.get("REQUEST_URI", "").split("?", 1)[0].lower().rstrip("/")
)

# Signer for the CSRF cookie. The key is taken from the environment so it
# does not have to live in the (public) source code.
# NOTE(review): the fallback keeps the previous hard-coded key so existing
# deployments keep working; CSRF_SECRET_KEY should be set in production.
serializer = itsdangerous.URLSafeSerializer(
    os.environ.get("CSRF_SECRET_KEY") or "secret key", "salt"
)
|
||
|
||
|
||
def parse_cookie_header(header: str) -> dict[str, str]:
    """Parse a raw ``Cookie`` request header into a name -> value mapping.

    Entries are separated by ``;``. An entry without ``=`` maps to an empty
    string. Entries with an empty name (as produced by an empty header or a
    trailing ``;``) are skipped. If a name occurs more than once, the last
    occurrence wins.

    Args:
        header: The header value, e.g. ``"a=1; b=2"``.

    Returns:
        The cookies found in ``header``.
    """
    parsed: dict[str, str] = {}
    for entry in header.split(";"):
        # Only the first "=" separates name and value; the value itself may
        # contain further "=" characters.
        name, _, value = entry.strip().partition("=")
        if name:
            parsed[name] = value
    return parsed


# Cookies sent by the client, as handed over by the web server.
cookies = parse_cookie_header(os.environ.get("HTTP_COOKIE", ""))
|
||
|
||
|
||
# Get the CSRF token. This is passed along according to the double-submit
# technique[1]: first as a hidden form input, second as a cookie which is
# signed on the server so we can verify its authenticity. The cookie is also
# reused for subsequent requests, so every session only gets one token.
# 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
stored_csrf_token = None
if "__Host-csrftoken" in cookies:
    # We already have a token for this client - try and extract it.
    try:
        stored_csrf_token = serializer.loads(cookies["__Host-csrftoken"])
    except Exception:
        # The cookie is untrusted input: whatever goes wrong while verifying
        # or decoding it, fall back to a fresh token. Unlike a bare
        # ``except:`` this does not swallow SystemExit or KeyboardInterrupt.
        stored_csrf_token = None

# Only a non-empty string is usable as a token; a correctly signed cookie
# could still carry some other JSON value.
if isinstance(stored_csrf_token, str) and stored_csrf_token:
    csrf_token = stored_csrf_token
else:
    csrf_token = secrets.token_urlsafe(32)

# Re-sign the token so the cookie sent back to the client always carries a
# valid signature.
signed_csrf_token = serializer.dumps(csrf_token)
|
||
|
||
|
||
match os.environ.get("REQUEST_METHOD", "").upper():
|
||
case "GET":
|
||
# For GET requests, serve the form that the user requested. The CSRF
|
||
# token will be added here as well.
|
||
print("Status: 200")
|
||
print("Content-Type: text/html")
|
||
print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly")
|
||
print("")
|
||
|
||
with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template:
|
||
for line in template.readlines():
|
||
print(line)
|
||
# This is a very rudimentary check to ensure that we actually
|
||
# place the token *inside* the form. It assumes that there is
|
||
# a) only one form on the site and
|
||
# b) the <form> tag doesn't end on the same line.
|
||
if "<form" in line.lower():
|
||
print(f'<input type="hidden" name="csrftoken" value="{csrf_token}" />')
|
||
|
||
exit(0)
|
||
|
||
case "POST":
|
||
# This is the main case where we handle the submitted form. This is
|
||
# continued below.
|
||
pass
|
||
|
||
case _:
|
||
# This case should never actually happen because lighttpd filters out
|
||
# requests accordingly.
|
||
fail("405 Method Not Allowed", "Method Not Allowed")
|
||
|
||
|
||
form = cgi.FieldStorage()
|
||
|
||
@overload
def get_form_value(name: str, default: Optional[str] = ..., cast: type[str] = ...) -> str: ...
@overload
def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int: ...
@overload
def get_form_value(name: str, default: None, cast: type[bytes]) -> tuple[str, bytes]: ...
@overload
def get_form_value(name: str, default: None = ..., *, cast: type[bytes]) -> tuple[str, bytes]: ...
def get_form_value(
    name: str,
    default: Any = None,
    cast: type[str] | type[int] | type[bytes] = str,
) -> Any:
    """Read one field from the submitted form.

    Args:
        name: Name of the form field.
        default: Value returned when the field is absent. ``None`` marks the
            field as required.
        cast: ``str`` or ``int`` to convert the first submitted value, or
            ``bytes`` to read the field as a file upload.

    Returns:
        The converted value, or a ``(filename, content)`` tuple when ``cast``
        is ``bytes``. The filename falls back to ``"upload"`` if the client
        did not send one.

    The function does not return on invalid input: a missing required field
    or a value that cannot be converted ends the request through ``fail()``
    with a "400 Bad Request" response.
    """
    if name not in form:
        if default is None:
            fail("400 Bad Request", f"Missing required field: {name}")
        return default

    if cast is bytes:
        field = form[name]
        # Indexing yields a list when the field was submitted several times;
        # exactly one upload is expected.
        if isinstance(field, list) or not field.file:
            fail("400 Bad Request", f"Invalid value for field: {name}")
        content = field.file.read()
        # A field that was not sent as a file upload is read as text; encode
        # it so the result always matches the documented ``bytes`` type.
        if isinstance(content, str):
            content = content.encode("utf-8")
        return (field.filename or "upload"), content

    try:
        return cast(form.getfirst(name))
    except (TypeError, ValueError):
        fail("400 Bad Request", f"Invalid value for field: {name}")
|
||
|
||
|
||
# Make sure the provided CSRF token is actually valid. Note the usage of the
# constant-time comparison here. Both values are compared as bytes because
# hmac.compare_digest() raises a TypeError for str arguments that contain
# non-ASCII characters, and the submitted token is client-controlled.
given_csrf_token = get_form_value("csrftoken")
if not hmac.compare_digest(
    csrf_token.encode("utf-8", "surrogatepass"),
    given_csrf_token.encode("utf-8", "surrogatepass"),
):
    fail("400 Bad Request", "Invalid CSRF token")
|
||
|
||
|
||
# Fields that every form submits. Anything specific to a single form is read
# in the match block further down.
contact_name, contact_email, message = (
    get_form_value("contactname"),
    get_form_value("contactemail"),
    get_form_value("message"),
)

# Optional upload as a (filename, content) pair; only some forms set it.
attachment: Optional[tuple[str, bytes]] = None

# Additional facts about the request, kept in insertion order.
ticket_details = collections.OrderedDict(
    [
        ("Kontaktperson", contact_name),
        ("Email", contact_email),
    ]
)
|
||
|
||
match request_uri:
|
||
case "/kontakt":
|
||
form_name = "Allgemein"
|
||
|
||
case "/kontakt/problem":
|
||
form_name = "Problem-Support"
|
||
|
||
case "/spenden":
|
||
form_name = "Geldspende"
|
||
|
||
case "/mitmachen":
|
||
form_name = "Mitgliederwerbung"
|
||
|
||
case "/computer-beantragen/organisation":
|
||
form_name = "Computerantrag (Organisation)"
|
||
ticket_details["Organisation"] = get_form_value("organization")
|
||
ticket_details["Adresse"] = get_form_value("addressline")
|
||
ticket_details["PLZ"] = get_form_value("postalcode")
|
||
ticket_details["Stadt"] = get_form_value("city")
|
||
ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, int)
|
||
ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, int)
|
||
ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, int)
|
||
|
||
case "/computer-beantragen/privat":
|
||
form_name = "Computerantrag (privat)"
|
||
ticket_details["Adresse"] = get_form_value("addressline")
|
||
ticket_details["PLZ"] = get_form_value("postalcode")
|
||
ticket_details["Stadt"] = get_form_value("city")
|
||
attachment = get_form_value("document")
|
||
|
||
# Note: the actual form contains a checkbox for whether the user has
|
||
# read the guidelines, but we don't actually bother checking that here.
|
||
|
||
case "/hardware-spenden/organisation":
|
||
form_name = "Hardwarespende (Organisation)"
|
||
ticket_details["Organisation"] = get_form_value("organization")
|
||
attachment = get_form_value("inventory", cast=bytes)
|
||
|
||
case "/hardware-spenden/privat/laptop":
|
||
form_name = "Laptopspende (privat)"
|
||
ticket_details["Gerätedetails"] = get_form_value("device")
|
||
|
||
case _:
|
||
# This case should never actually happen because lighttpd filters out
|
||
# requests accordingly.
|
||
fail("400 Bad Request", "Invalid form")
|
||
|
||
ticket_details["Kontaktformular"] = form_name
|
||
|
||
|
||
# Settings for the Zammad API, taken from the environment. The base URL is
# stored without a trailing slash because request paths are appended to it.
ZAMMAD_URL = os.getenv("ZAMMAD_URL", "").rstrip("/")
ZAMMAD_TOKEN = os.getenv("ZAMMAD_TOKEN", "")
ZAMMAD_GROUP = os.getenv("ZAMMAD_GROUP", "")

# One HTTP session for all API calls; every request carries the token header.
session = requests.Session()
session.headers.update({"Authorization": f"Token token={ZAMMAD_TOKEN}"})
|
||
|
||
# Upper bound in seconds for every API call. requests applies no timeout by
# default, so a stalled connection would otherwise keep this CGI process
# hanging indefinitely.
ZAMMAD_TIMEOUT = 30

try:
    # Add the actual ticket to the system.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/tickets",
        json={
            "title": f"Kontaktformular {contact_name} – {form_name}",
            "group": ZAMMAD_GROUP,
            "customer_id": f"guess:{contact_email}",
            "article": {
                "type": "web",
                "content_type": "text/plain",
                "subject": f"Kontaktformular-Anfrage {contact_name}",
                "body": message,
                "attachments": [] if attachment is None else [{
                    "filename": attachment[0],
                    "data": base64.b64encode(attachment[1]).decode(),
                    "mime-type": mimetypes.guess_type(attachment[0])[0] or "text/plain",
                }],
            },
        },
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()
    ticket_id = response.json()["id"]
    # Validate explicitly instead of with ``assert``, which is stripped when
    # Python runs with -O.
    if not isinstance(ticket_id, int):
        raise ValueError("Unexpected ticket id in API response")

    # Add a second article to the ticket that contains all the other
    # information from the contact form.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/ticket_articles",
        json={
            "ticket_id": ticket_id,
            "type": "note",
            "content_type": "text/plain",
            "internal": True,
            "body": "\n".join(
                f"{key}: {value}" for key, value in ticket_details.items()
            ),
        },
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()

    # Add a tag to the ticket, denoting which contact form it came from.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/tags/add",
        json={
            "object": "Ticket",
            "o_id": ticket_id,
            "item": f"Kontaktformular – {form_name}",
        },
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()

    # Everything was stored - redirect the client to the confirmation page.
    print("Status: 302 Found")
    print("Content-Type: text/html")
    print("Location: /kontakt/fertig")
    print("")
except Exception as e:
    # Any failure while talking to the API (connection error, timeout, HTTP
    # error status, unexpected payload) ends in the generic error response.
    fail("500 Internal Server Error", str(e))
|
||
|