#!/usr/bin/env python
import base64
import cgi
import collections
import hmac
import io
import mimetypes
import os
import re
import secrets
import sys
from collections.abc import Mapping
from typing import Any, NoReturn, Optional, overload

import itsdangerous
import requests
def fail(status: str, reason: str) -> NoReturn:
    """Emit a CGI error response and terminate the script.

    The response carries the given status, a diagnostic ``X-Reason`` header
    and an ``X-Sendfile`` header pointing at
    ``<SITE_DIRECTORY>/kontakt/fehler/index.html``.

    Args:
        status: Value for the CGI ``Status`` header, e.g. ``"400 Bad Request"``.
        reason: Short diagnostic text placed in the ``X-Reason`` header.

    Raises:
        SystemExit: Always, with exit code 0, once the headers are written.
    """
    # Header values must stay on a single line. ``reason`` can carry arbitrary
    # text (for example a stringified exception), so fold any line breaks into
    # spaces instead of letting them start a new header or end the header
    # block early.
    status_line = " ".join(status.splitlines())
    reason_line = " ".join(reason.splitlines())
    print(f"Status: {status_line}")
    print("Content-Type: text/html")
    print(f"X-Reason: {reason_line}")
    print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html")
    print("")
    # sys.exit() is always available, unlike the ``exit`` helper that the
    # ``site`` module installs for interactive use.
    sys.exit(0)
# Name of the hidden decoy form field; a submission that fills it in is
# rejected further down.
HONEYPOT_FIELD_NAME = "addressline1"
SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "")
# Lower-cased request path without a trailing slash; used to pick the form.
request_uri = os.environ.get("REQUEST_URI", "").lower().rstrip("/")
# NOTE(review): the signing key and salt are hard-coded literals. If these are
# the values used in production, anyone can mint a validly signed CSRF cookie;
# confirm that the deployment substitutes a real secret.
serializer = itsdangerous.URLSafeSerializer("secret key", "salt")
session = requests.Session()

# Parse the Cookie request header into a name -> value mapping. Entries with
# an empty name (such as the single empty entry produced by a missing header)
# are skipped; a cookie without "=" maps to the empty string.
cookies: dict[str, str] = {}
for entry in os.environ.get("HTTP_COOKIE", "").split(";"):
    name, _, value = entry.lstrip(" ").partition("=")
    if name:
        cookies[name] = value

# Get the CSRF token. This is passed along according to the double-submit
# technique[1]. First as a hidden form input. Second as a cookie, which is
# signed on the server, so we can verify its authenticity. The latter is also
# used for subsequent requests so every session only gets one token.
# 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
csrf_token: Optional[str] = None
if "__Host-csrftoken" in cookies:
    # We already have a token for this client - try and extract it.
    try:
        candidate = serializer.loads(cookies["__Host-csrftoken"])
    except Exception:
        # Any failure to verify or decode the cookie simply means that a new
        # token is issued. Catching Exception (rather than a bare except)
        # keeps SystemExit and KeyboardInterrupt propagating.
        candidate = None
    # Only a string can be compared against the submitted form value later
    # on; discard any other payload type.
    if isinstance(candidate, str):
        csrf_token = candidate
if csrf_token is None:
    # No usable token for this client, so generate a new one.
    csrf_token = secrets.token_urlsafe(32)
# Sign the token for the cookie that is sent back to the client.
# NOTE(review): URLSafeSerializer output carries no timestamp, so re-signing
# does not refresh any expiry - confirm whether a timed serializer was
# intended here.
signed_csrf_token = serializer.dumps(csrf_token)
# Dispatch on the HTTP method: GET serves the requested form page and exits,
# POST falls through to the submission handling below, and anything else is
# rejected via fail().
match os.environ.get("REQUEST_METHOD", "").upper():
    case "GET":
        # For GET requests, serve the form that the user requested. The CSRF
        # token will be added here as well.
        # The private computer-request form is switched off: its page is
        # served with status 503 and without the injected fields.
        form_disabled = request_uri.startswith("/computer-beantragen/privat")
        print(f"Status: {200 if not form_disabled else 503}")
        print(f"Content-Type: text/html")
        print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly")
        print("")
        # NOTE(review): the request URI is interpolated into the file path
        # as-is; this presumably relies on the web server only routing known
        # form paths (without query strings) to this script - confirm.
        with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template:
            for line in template.readlines():
                if (
                    not form_disabled
                    # This is a very rudimentary check to ensure that we
                    # actually place the token *inside* the form. It requires
                    # adding a comment somewhere in the form that
                    # will be replaced with the hidden fields and hCaptcha.
                    # NOTE(review): in this copy the pattern below is empty
                    # (so it matches every line) and the markup in the print
                    # calls that follow is empty or split across lines, which
                    # does not parse. The literals look truncated and must be
                    # restored from the original file.
                    and re.match(r"", line, re.IGNORECASE) is not None
                ):
                    print(f'')
                    print(f'')
                    print(f'')
                    print('')
                    print(f'
')
                    print(f'')
                    print(f'')
                    print(f'
')
                else:
                    # NOTE(review): `line` still ends with its own newline, so
                    # print() emits an additional blank line after each one.
                    print(line)
        exit(0)
    case "POST":
        # This is the main case where we handle the submitted form. This is
        # continued below.
        pass
    case _:
        # This case should never actually happen because lighttpd filters out
        # requests accordingly.
        fail("405 Method Not Allowed", "Method Not Allowed")
# Only POST requests get this far; parse the submitted form fields.
form = cgi.FieldStorage()
@overload
def get_form_value(name: str, default: Optional[str], cast: type[str] = str) -> str: ...
@overload
def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int:...
@overload
def get_form_value(name: str, default: None = ..., cast: type[bytes] = ...) -> tuple[str, bytes]:...
def get_form_value(
name: str,
default: Any = None,
cast: type[str] | type[int] | type[io.BytesIO] = str,
) -> Any:
if name not in form:
if default is None:
fail("400 Bad Request", f"Missing required field: {name}")
else:
return default
if cast is bytes:
value_object = form[name]
if (
isinstance(value_object, list)
or not value_object.file
):
fail("400 Bad Request", f"Invalid value for field: {name}")
return (value_object.filename or "upload"), value_object.file.read()
else:
try:
return cast(form.getfirst(name))
except (TypeError, ValueError):
fail("400 Bad Request", f"Invalid value for field: {name}")
# Make sure the provided CSRF token is actually valid. The comparison is
# constant-time. Both sides are encoded to bytes first because
# hmac.compare_digest() raises TypeError for str arguments that contain
# non-ASCII characters, which would let a client crash the script with a
# crafted form value.
given_csrf_token = get_form_value("csrftoken")
if not hmac.compare_digest(
    csrf_token.encode("utf-8"),
    given_csrf_token.encode("utf-8"),
):
    fail("400 Bad Request", "Invalid CSRF token")

# If the honeypot field was not empty, back off.
# NOTE(review): this and the hCaptcha rejections answer with "200 OK",
# presumably so automated clients do not see an error status - confirm.
if get_form_value(HONEYPOT_FIELD_NAME, ""):
    fail("200 OK", f"Invalid value for field: {HONEYPOT_FIELD_NAME}")

if not (hcaptcha_token := get_form_value("h-captcha-response", "")):
    fail("200 OK", "Empty hCaptcha token")

# Ask hCaptcha to verify the token. The timeout keeps an unresponsive
# verification service from hanging the CGI process, and transport or decoding
# errors are reported as a server error instead of an unhandled traceback.
try:
    response = session.post(
        "https://hcaptcha.com/siteverify",
        data={
            "secret": os.environ.get("HCAPTCHA_SECRET_KEY", ""),
            "response": hcaptcha_token,
        },
        timeout=10,
    )
    hcaptcha_data = response.json()
except (requests.RequestException, ValueError) as e:
    # Fold whitespace so the message stays on one header line.
    fail(
        "500 Internal Server Error",
        "hCaptcha verification failed: " + " ".join(str(e).split()),
    )
if not isinstance(hcaptcha_data, Mapping) or not hcaptcha_data.get("success", False):
    fail("200 OK", "hCaptcha fail")
# Read the fields that every form submits. Anything beyond these depends on
# which form was used and is collected per form below.
contact_name = get_form_value("contactname")
contact_email = get_form_value("contactemail")
message = get_form_value("message")
attachment: Optional[tuple[str, bytes]] = None
# Key/value details for the ticket, kept in insertion order.
ticket_details = collections.OrderedDict(
    [
        ("Kontaktperson", contact_name),
        ("Email", contact_email),
    ]
)
form_group = "csw-Allgemein"

# Forms that carry nothing but the common fields and stay in the default
# group only differ in their display name.
plain_forms = {
    "/kontakt": "Allgemein",
    "/kontakt/problem": "Problem-Support",
    "/spenden": "Geldspende",
    "/mitmachen": "Mitgliederwerbung",
}

if request_uri in plain_forms:
    form_name = plain_forms[request_uri]
elif request_uri == "/computer-beantragen/organisation":
    form_name = "Computerantrag (Organisation)"
    form_group = "csw-Anfragen"
    ticket_details.update({
        "Organisation": get_form_value("organization"),
        "Adresse": get_form_value("addressline"),
        "PLZ": get_form_value("postalcode"),
        "Stadt": get_form_value("city"),
        "Anzahl Desktops": get_form_value("desktopcount", 0, int),
        "Anzahl Laptops": get_form_value("laptopcount", 0, int),
        "Anzahl Drucker": get_form_value("printercount", 0, int),
    })
elif request_uri == "/computer-beantragen/privat":
    # The form is switched off: fail() ends the request, so the statements
    # after it do not run while this call is in place.
    fail("503 Service Unavailable", "Form disabled")
    form_name = "Computerantrag (privat)"
    form_group = "csw-Anfragen"
    ticket_details.update({
        "Adresse": get_form_value("addressline"),
        "PLZ": get_form_value("postalcode"),
        "Stadt": get_form_value("city"),
    })
    attachment = get_form_value("document", cast=bytes)
    # Note: the actual form contains a checkbox for whether the user has
    # read the guidelines, but we don't actually bother checking that here.
elif request_uri == "/hardware-spenden/organisation":
    form_name = "Hardwarespende (Organisation)"
    form_group = "csw-Hardwarespenden"
    ticket_details["Organisation"] = get_form_value("organization")
    attachment = get_form_value("inventory", cast=bytes)
elif request_uri == "/hardware-spenden/privat/laptop":
    form_name = "Laptopspende (privat)"
    form_group = "csw-Hardwarespenden"
    ticket_details["Gerätedetails"] = get_form_value("device")
elif request_uri == "/anmelden":
    # This is only available on the CoderDojo site and will be blocked on the
    # server side in other cases.
    form_name = "CoderDojo-Anmeldung"
    form_group = "CoderDojo"
    ticket_details.update({
        "Anmelde-Typ": get_form_value("mode"),
        "Teilnehmenden-Name": get_form_value("participantname", "-"),
        "Telefonnummer": get_form_value("contactphone"),
        "Fotos?": get_form_value("photos"),
    })
else:
    # This case should never actually happen because lighttpd filters out
    # requests accordingly.
    fail("400 Bad Request", "Invalid form")

ticket_details["Kontaktformular"] = form_name

# Allow the form group to be overridden using the environment variable (for
# testing). An unset or empty variable keeps the group chosen above.
group_override = os.environ.get("ZAMMAD_GROUP", "")
if group_override:
    form_group = group_override
ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "").rstrip("/")
ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "")
# Upper bound (in seconds) for each Zammad API call, so an unresponsive
# ticket system cannot keep the CGI process waiting indefinitely.
ZAMMAD_TIMEOUT = 30
# The API token is attached to the session only at this point, after the
# hCaptcha request made with the same session has already been sent.
session.headers.update(Authorization=f"Token token={ZAMMAD_TOKEN}")
try:
    # Add the actual ticket to the system.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/tickets",
        json=dict(
            title=f"Kontaktformular {contact_name} – {form_name}",
            group=form_group,
            customer_id=f"guess:{contact_email}",
            article=dict(
                type="web",
                internal=True,
                content_type="text/plain",
                subject=f"Kontaktformular-Anfrage {contact_name}",
                body=message,
                attachments=[] if attachment is None else [{
                    "filename": attachment[0],
                    "data": base64.b64encode(attachment[1]).decode(),
                    "mime-type": mimetypes.guess_type(attachment[0])[0] or "text/plain",
                }],
            ),
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()
    ticket_id = response.json()["id"]
    # Validate explicitly instead of using assert, which is stripped when
    # Python runs with -O.
    if not isinstance(ticket_id, int):
        raise TypeError(f"Unexpected ticket id in Zammad response: {ticket_id!r}")

    # Add a second article to the ticket that contains all the other information
    # from the contact form.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/ticket_articles",
        json=dict(
            ticket_id=ticket_id,
            type="note",
            content_type="text/plain",
            internal=True,
            body="\n".join(
                f"{key}: {value}" for key, value in ticket_details.items()
            ),
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()

    # Add a tag to the ticket, denoting which contact form it came from.
    response = session.post(
        f"{ZAMMAD_URL}/api/v1/tags/add",
        json=dict(
            object="Ticket",
            o_id=ticket_id,
            item=f"Kontaktformular – {form_name}",
        ),
        timeout=ZAMMAD_TIMEOUT,
    )
    response.raise_for_status()

    # Everything was stored; redirect the client to the confirmation page.
    print("Status: 302 Found")
    print("Content-Type: text/html")
    print("Location: /kontakt/fertig")
    print("")
except Exception as e:
    # Any failure while talking to Zammad ends in the generic error response.
    # The exception text goes into a response header, so fold whitespace
    # (including line breaks) to keep it on a single line.
    fail("500 Internal Server Error", " ".join(str(e).split()))