#!/usr/bin/env python
import base64
import io
import cgi
import collections
from collections.abc import Mapping
import hmac
import mimetypes
import re
import os
import secrets
import json
from typing import Any, Optional, overload
import itsdangerous
import requests
def fail(status: str, reason: str) -> None:
    """Send the generic error page as the CGI response and end the script.

    Writes the response headers to stdout: the given status, an HTML content
    type, an ``X-Reason`` diagnostic header and an ``X-Sendfile`` header that
    points at the static error page below ``SITE_DIRECTORY``. The header block
    is closed with an empty line and the process is terminated with exit code
    0, so this function never returns to its caller.

    Args:
        status: Complete HTTP status, for example ``"400 Bad Request"``.
        reason: Short diagnostic text for the ``X-Reason`` header. Line breaks
            are folded into single spaces before it is written.

    Raises:
        SystemExit: Always, with exit code 0.
    """
    # ``reason`` is frequently built from request data or exception text. A
    # line break inside it would end the header early and let the remainder be
    # interpreted as further headers or as the body, so fold it onto one line.
    safe_reason = " ".join(reason.splitlines())
    print(f"Status: {status}")
    print("Content-Type: text/html")
    print(f"X-Reason: {safe_reason}")
    print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html")
    print("")
    # Raise SystemExit directly: the ``exit`` helper is injected by the
    # ``site`` module for interactive use and is not guaranteed to exist.
    raise SystemExit(0)
def _read_site_config(path: str) -> Any:
    """Return the parsed JSON document stored at *path*.

    An ``IOError`` while opening or reading the file yields an empty dict
    instead; any other error (such as malformed JSON) propagates.
    """
    try:
        with open(path, "r") as handle:
            return json.load(handle)
    except IOError:
        return {}


# Optional site configuration. It is consulted further down to decide whether
# a form is currently closed.
CONFIG = _read_site_config("/config.json")
# Name of the decoy form field. Submissions that fill it in are treated as
# automated and are dropped later on.
HONEYPOT_FIELD_NAME = "addressline1"
# This regex merely validates what the in-browser form validation already
# checks and isn't all too strict.
EMAIL_REGEX = re.compile(r"^[^ ]+@[^ ]+\.[^ ]+$")
# Root directory of the generated site; form templates and the error page are
# resolved relative to it.
SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "")
# Requested path, lower-cased and without trailing slashes. It selects both the
# template to serve and the form that is being submitted.
request_uri = os.environ.get("REQUEST_URI", "").lower().rstrip("/")
# SECURITY: the key below signs the CSRF cookie. A literal that lives in the
# source lets anyone who can read this file forge valid cookies, so the key can
# be supplied through the CSRF_SECRET_KEY environment variable. The previous
# literal is kept as the fallback so existing deployments keep working
# unchanged.
# NOTE(review): set CSRF_SECRET_KEY in production and consider dropping the
# fallback once every deployment provides it.
serializer = itsdangerous.URLSafeSerializer(
    os.environ.get("CSRF_SECRET_KEY") or "secret key",
    "salt",
)
# Shared HTTP session used for all outgoing API requests.
session = requests.Session()
def parse_cookie_header(header: str) -> dict[str, str]:
    """Parse the value of a ``Cookie`` request header into a name/value dict.

    Entries are separated by ``;``. Each entry is split at its first ``=``; an
    entry without ``=`` is stored with an empty value. Surrounding whitespace
    is removed from names and values. Entries without a name (as produced by an
    empty header or a stray ``;``) are skipped. When a name occurs more than
    once, the last occurrence wins.

    Args:
        header: Raw header value, e.g. ``"a=1; b=2"``. May be empty.

    Returns:
        Mapping from cookie name to cookie value.
    """
    parsed = dict[str, str]()
    for entry in header.split(";"):
        name, _, value = entry.partition("=")
        name = name.strip()
        if not name:
            # Nothing usable in this segment; do not record a "" cookie.
            continue
        parsed[name] = value.strip()
    return parsed


# Cookies sent by the client, taken from the CGI environment.
cookies = parse_cookie_header(os.environ.get("HTTP_COOKIE", ""))
# Get the CSRF token. It is passed along according to the double-submit
# technique[1]: first as a hidden form input, second as a cookie which is
# signed on the server, so we can verify its authenticity. The cookie is also
# reused for subsequent requests so every session only gets one token.
# 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
csrf_token = None
if "__Host-csrftoken" in cookies:
    # We already have a token for this client - try and extract it.
    try:
        csrf_token = serializer.loads(cookies["__Host-csrftoken"])
    except Exception:
        # The cookie is client-controlled, so any failure to verify or decode
        # it simply means we issue a new token. Catching Exception (rather than
        # using a bare except) keeps SystemExit and KeyboardInterrupt intact.
        csrf_token = None
if not isinstance(csrf_token, str) or not csrf_token:
    # No cookie, an invalid cookie, or a signed payload that is not a usable
    # string: generate a fresh token.
    csrf_token = secrets.token_urlsafe(32)
# Re-sign the token so the cookie handed back to the client always carries a
# current signature.
# NOTE(review): the original comment here referred to keeping a timeout fresh,
# but the serializer is created as a plain URLSafeSerializer - confirm whether
# an expiring (timed) signature was intended.
signed_csrf_token = serializer.dumps(csrf_token)
# Decide whether the requested form is currently closed. Right now only the
# private computer application form can be closed, and only when the
# "applicationsClosed" configuration entry is truthy. The sign-up form under
# "/anmelden" is deliberately left open; this is the place to close it.
is_private_application = request_uri.startswith("/computer-beantragen/privat")
form_disabled = bool(is_private_application and CONFIG.get("applicationsClosed"))
# Dispatch on the HTTP method taken from the CGI environment. GET renders the
# requested form and ends the script, POST falls through to the submission
# handling below, and anything else is rejected.
match os.environ.get("REQUEST_METHOD", "").upper():
case "GET":
# For GET requests, serve the form that the user requested. The CSRF
# token will be added here as well. A disabled form is answered with
# status 503 instead of 200.
print(f"Status: {200 if not form_disabled else 503}")
print(f"Content-Type: text/html")
# Hand the signed token to the client as a cookie.
print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly")
# The empty line ends the header block; everything after it is the body.
print("")
# The template is the index.html of the requested path below
# SITE_DIRECTORY.
with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template:
for line in template.readlines():
if (
not form_disabled
# This is a very rudimentary check to ensure that we
# actually place the token *inside* the form. It requires
# adding a comment somewhere in the form that
# will be replaced with the hidden fields and hCaptcha.
# NOTE(review): the pattern below and the strings printed in
# this branch appear empty in this view - presumably the
# markup was lost; confirm against the original file.
and re.match(r"", line, re.IGNORECASE) is not None
):
print(f'')
print(f'')
print(f'')
print('')
print(f'
')
print(f'')
print(f'')
print(f'
')
else:
# Every other line is passed through as-is.
# NOTE(review): lines returned by readlines() keep their line ending
# and print() appends another one - confirm the extra blank lines in
# the output are acceptable.
print(line)
# The page has been sent completely; nothing below applies to GET.
exit(0)
case "POST":
# This is the main case where we handle the submitted form. This is
# continued below.
pass
case _:
# This case should never actually happen because lighttpd filters out
# requests accordingly.
fail("405 Method Not Allowed", "Method Not Allowed")
# Submissions to a closed form are rejected before any field is read.
if form_disabled:
fail("503 Service Unavailable", "Form is disabled")
# Parse the submitted form data; get_form_value() reads from this object.
# NOTE(review): the cgi module is deprecated since Python 3.11 and removed in
# Python 3.13 (PEP 594) - plan a replacement before upgrading.
form = cgi.FieldStorage()
@overload
def get_form_value(name: str, default: None = ..., *, cast: type[bytes]) -> tuple[str, bytes]:...
@overload
def get_form_value(name: str, default: Optional[int] = ..., *, cast: type[int]) -> int:...
@overload
def get_form_value(name: str, default: Optional[str] = ..., *, cast: type[str] = ...) -> str: ...
def get_form_value(
name: str,
default: Any = None,
*,
cast: type[str] | type[int] | type[bytes] = str,
) -> Any:
if name not in form:
if default is None:
fail("400 Bad Request", f"Missing required field: {name}")
else:
return default
if cast is bytes:
value_object = form[name]
if (
isinstance(value_object, list)
or not value_object.file
):
fail("400 Bad Request", f"Invalid value for field: {name}")
return (value_object.filename or "upload"), value_object.file.read()
else:
try:
result = cast(form.getfirst(name))
if isinstance(result, str):
result = result.strip()
return result
except (TypeError, ValueError):
fail("400 Bad Request", f"Invalid value for field: {name}")
# Make sure the provided CSRF token is actually valid. Note the usage of the
# constant-time comparison here.
given_csrf_token = get_form_value("csrftoken")
# compare_digest() accepts str arguments only when they are pure ASCII and
# raises TypeError otherwise. The submitted value is attacker-controlled, so
# both sides are compared as UTF-8 bytes; a non-ASCII token is then rejected
# cleanly instead of crashing the script.
if not isinstance(csrf_token, str) or not hmac.compare_digest(
    csrf_token.encode("utf-8"),
    given_csrf_token.encode("utf-8"),
):
    fail("400 Bad Request", "Invalid CSRF token")
# If the honeypot field was not empty, back off. The response deliberately
# uses "200 OK" so an automated submitter gets no hint that it was rejected.
if get_form_value(HONEYPOT_FIELD_NAME, ""):
    fail("200 OK", f"Invalid value for field: {HONEYPOT_FIELD_NAME}")
if not (hcaptcha_token := get_form_value("h-captcha-response", "")):
    fail("200 OK", "Empty hCaptcha token")
# Ask hCaptcha whether the submitted token is valid.
hcaptcha_data = None
try:
    response = session.post(
        "https://hcaptcha.com/siteverify",
        data={
            "secret": os.environ.get("HCAPTCHA_SECRET_KEY", ""),
            "response": hcaptcha_token,
        },
        # Without a timeout an unresponsive verification service would keep
        # this CGI process waiting indefinitely.
        timeout=10,
    )
    hcaptcha_data = response.json()
except (requests.RequestException, ValueError):
    # Network trouble or a reply that is not JSON: we cannot verify the
    # captcha, so answer with the error page rather than an unhandled
    # traceback.
    fail("500 Internal Server Error", "hCaptcha verification unavailable")
if not isinstance(hcaptcha_data, Mapping) or not hcaptcha_data.get("success", False):
    fail("200 OK", "hCaptcha fail")
# Extract all the actually provided form data. This is different from form to
# form (see the form-specific handling below).
contact_name = get_form_value("contactname")
# Split on runs of whitespace so that repeated spaces or tabs between name
# parts do not produce empty parts (which would leave stray spaces in the
# first name later on). Fall back to the raw value so the list is never empty.
contact_names = contact_name.split() or [contact_name]
contact_email = get_form_value("contactemail")
if not EMAIL_REGEX.fullmatch(contact_email):
    fail("400 Bad Request", "Invalid Email address")
message = get_form_value("message", "[Keine Nachricht hinterlassen]")
# Optional uploaded file as (filename, content); set by forms that have one.
attachment: Optional[tuple[str, bytes]] = None
# Extra details that end up in the ticket, in insertion order.
ticket_details = collections.OrderedDict[str, str | int]()
ticket_details["Kontaktperson"] = contact_name
ticket_details["Email"] = contact_email
# Every known form path maps to the form's display name and the ticket group
# its submissions are filed under.
form_routes = {
    "/kontakt": ("Allgemein", "csw-Allgemein"),
    "/kontakt/problem": ("Problem-Support", "csw-Allgemein"),
    "/spenden": ("Geldspende", "csw-Allgemein"),
    "/mitmachen": ("Mitgliederwerbung", "csw-Allgemein"),
    "/computer-beantragen/organisation": ("Computerantrag (Organisation)", "csw-Anfragen"),
    "/computer-beantragen/privat": ("Computerantrag (privat)", "csw-Anfragen"),
    "/hardware-spenden/organisation": ("Hardwarespende (Organisation)", "csw-Hardwarespenden"),
    "/hardware-spenden/privat/laptop": ("Laptopspende (privat)", "csw-Hardwarespenden"),
    # The following forms are only available on the CoderDojo site and will
    # be blocked on the server side in other cases.
    "/anmelden": ("CoderDojo-Anmeldung", "CoderDojo"),
    "/party": ("CoderDojo Minecraft LAN", "CoderDojo"),
    "/freizeit": ("CoderCamp Umfrage", "CoderDojo"),
}
form_group = "csw-Allgemein"
if request_uri not in form_routes:
    # This case should never actually happen because lighttpd filters out
    # requests accordingly.
    fail("400 Bad Request", "Invalid form")
form_name, form_group = form_routes[request_uri]

# Postal address fields shared by both computer application forms, as
# (ticket label, form field) pairs in the order they are recorded.
address_fields = (
    ("Adresse", "addressline"),
    ("PLZ", "postalcode"),
    ("Stadt", "city"),
)

# Collect the fields that are specific to the submitted form. The four general
# forms carry no extra fields.
if request_uri == "/computer-beantragen/organisation":
    ticket_details["Organisation"] = get_form_value("organization")
    for detail_label, field_name in address_fields:
        ticket_details[detail_label] = get_form_value(field_name)
    # Device counts are optional and default to zero.
    for detail_label, field_name in (
        ("Anzahl Desktops", "desktopcount"),
        ("Anzahl Laptops", "laptopcount"),
        ("Anzahl Drucker", "printercount"),
    ):
        ticket_details[detail_label] = get_form_value(field_name, 0, cast=int)
elif request_uri == "/computer-beantragen/privat":
    ticket_details["Gewünschte Hardware"] = get_form_value("hardware", default="Unbekannt")
    for detail_label, field_name in address_fields:
        ticket_details[detail_label] = get_form_value(field_name)
    attachment = get_form_value("document", cast=bytes)
    # Note: the actual form contains a checkbox for whether the user has
    # read the guidelines, but we don't actually bother checking that here.
elif request_uri == "/hardware-spenden/organisation":
    ticket_details["Organisation"] = get_form_value("organization")
    attachment = get_form_value("inventory", cast=bytes)
elif request_uri == "/hardware-spenden/privat/laptop":
    ticket_details["Gerätedetails"] = get_form_value("device")
elif request_uri == "/anmelden":
    ticket_details["Anmelde-Typ"] = get_form_value("mode")
    ticket_details["Teilnehmenden-Name"] = get_form_value("participantname", "-")
    ticket_details["Telefonnummer"] = get_form_value("contactphone", "-")
    ticket_details["Fotos?"] = get_form_value("photos")
elif request_uri == "/party":
    for detail_label, field_name in (
        ("Java-Spielername", "javaname"),
        ("Bedrock-Spielername", "bedrockname"),
    ):
        ticket_details[detail_label] = get_form_value(field_name, "")
elif request_uri == "/freizeit":
    for detail_label, field_name in (
        ("Interesse (9-12)", "interested-kids"),
        ("Interesse (12-15)", "interested-teens"),
        ("Interesse (16+)", "interested-youngadults"),
    ):
        ticket_details[detail_label] = get_form_value(field_name, "")
    ticket_details["Gewünschtes Format"] = get_form_value("format")
    ticket_details["Übernachtung"] = get_form_value("overnight")

# Record which form the submission came from as the last detail.
ticket_details["Kontaktformular"] = form_name
# Allow the form group to be overridden using the environment variable (for
# testing).
form_group = os.environ.get("ZAMMAD_GROUP", "") or form_group
ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "").rstrip("/")
ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "")
# Timeout in seconds handed to every Zammad request. Without one, a stalled
# server would keep this CGI process (and the visitor) waiting indefinitely.
ZAMMAD_TIMEOUT = 30
session.headers.update(Authorization=f"Token token={ZAMMAD_TOKEN}")
try:
    # Create a new user for the client. For some reason, using "guess:{email}" as the
    # customer_id when creating the ticket doesn't really work, as described in the
    # Zammad documentation [1]. Instead, we sometimes need to explicitly create the
    # user beforehand. See this discussion [2] for more details.
    # [1]: https://docs.zammad.org/en/latest/api/ticket/index.html#create
    # [2]: https://codeberg.org/angestoepselt/homepage/issues/141
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/users",
        json=dict(
            # Yes, yes... This goes against pretty much all best practices for parsing
            # names. But: it's only internal and we save the name verbatim again below
            # so we're going to go ahead and do it anyway.
            firstname=" ".join(contact_names[:-1]) if len(contact_names) >= 2 else "?",
            lastname=contact_names[-1],
            email=contact_email,
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    if response.status_code == 422:
        # This email address is already in use by another user.
        customer_id = f"guess:{contact_email}"
    else:
        response.raise_for_status()
        customer_id = response.json()["id"]
    # Validate explicitly instead of with assert, which is stripped when Python
    # runs with -O. The error is handled by the except clause below.
    if not isinstance(customer_id, (str, int)):
        raise TypeError("Unexpected customer id in Zammad response")
    # Add the actual ticket to the system.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/tickets",
        headers={
            "X-On-Behalf-Of": contact_email,
        },
        json=dict(
            title=f"Kontaktformular {contact_name} – {form_name}",
            group=form_group,
            customer_id=customer_id,
            article=dict(
                type="web",
                internal=True,
                content_type="text/plain",
                subject=f"Kontaktformular-Anfrage {contact_name}",
                body=message,
                attachments=[] if attachment is None else [{
                    "filename": attachment[0],
                    "data": base64.b64encode(attachment[1]).decode(),
                    "mime-type": mimetypes.guess_type(attachment[0])[0] or "text/plain",
                }],
            ),
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()
    ticket_id = response.json()["id"]
    if not isinstance(ticket_id, int):
        raise TypeError("Unexpected ticket id in Zammad response")
    # Add a second article to the ticket that contains all the other information
    # from the contact form.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/ticket_articles",
        json=dict(
            ticket_id=ticket_id,
            type="note",
            content_type="text/plain",
            internal=True,
            body="\n".join(
                f"{key}: {value}" for key, value in ticket_details.items()
            ),
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()
    # Add a tag to the ticket, denoting which contact form it came from.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/tags/add",
        json=dict(
            object="Ticket",
            o_id=ticket_id,
            item=f"Kontaktformular – {form_name}",
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()
    # Everything was stored; send the visitor on to the confirmation page.
    print("Status: 302 Found")
    print("Content-Type: text/html")
    print("Location: /kontakt/fertig")
    print("")
except Exception as e:
    # Top-level boundary: any failure while talking to Zammad is reported to
    # the client through the generic error page.
    fail("500 Internal Server Error", str(e))