homepage/cgi-bin/form.py

328 lines
11 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
import base64
import io
import cgi
import collections
from collections.abc import Mapping
import hmac
import mimetypes
import re
import os
import secrets
from typing import Any, Optional, overload
import itsdangerous
import requests
def fail(status: str, reason: str) -> None:
print(f"Status: {status}")
print("Content-Type: text/html")
print(f"X-Reason: {reason}")
print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html")
print("")
exit(0)
HONEYPOT_FIELD_NAME = "addressline1"
SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "")
request_uri = os.environ.get("REQUEST_URI", "").lower().rstrip("/")
serializer = itsdangerous.URLSafeSerializer("secret key", "salt")
session = requests.Session()
cookies = dict[str, str]()
for entry in os.environ.get("HTTP_COOKIE", "").split(";"):
name, *value = entry.lstrip(" ").split("=", 1)
cookies[name] = value[0] if len(value) == 1 else ""
# Get the CSRF token. This is pass along according to the double-submit
# technique[1]. First as a hidden form input. Second as a cookie, which is
# signed on the server, so we can verify its authenticity. The latter is also
# used for subsequent requests so every session only gets one token.
# 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
if "__Host-csrftoken" in cookies:
# We already have a token for this client - try and extract it.
signed_csrf_token = cookies["__Host-csrftoken"]
try:
csrf_token = serializer.loads(signed_csrf_token)
except:
# The token wasn't valid, so we need to generate a new one.
csrf_token = secrets.token_urlsafe(32)
else:
csrf_token = secrets.token_urlsafe(32)
# Re-sign the token, so it's fresh and the timeout doesn't waive.
signed_csrf_token = serializer.dumps(csrf_token)
form_disabled = request_uri.startswith("/anmelden")
match os.environ.get("REQUEST_METHOD", "").upper():
case "GET":
# For GET requests, serve the form that the user requested. The CSRF
# token will be added here as well.
print(f"Status: {200 if not form_disabled else 503}")
print(f"Content-Type: text/html")
print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly")
print("")
with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template:
for line in template.readlines():
if (
not form_disabled
# This is a very rudimentary check to ensure that we
# actually place the token *inside* the form. It requires
# adding a <!-- FORM --> comment somewhere in the form that
# will be replaced with the hidden fields and hCaptcha.
and re.match(r"<!--\s*form\s*-->", line, re.IGNORECASE) is not None
):
print(f'<input type="hidden" name="csrftoken" value="{csrf_token}" />')
print(f'<label class="form-input">')
print(f'<span>Bitte lasse dieses Feld leer:</span>')
print(f'<input type="text" name="{HONEYPOT_FIELD_NAME}" value="" placeholder="Hier nichts eingeben." tabindex="-1" autocomplete="off" />')
print(f'</label>')
print(f'<script type="text/javascript">')
print(f'document.querySelector("input[name={HONEYPOT_FIELD_NAME}]").parentNode.classList.add("isolated")')
print(f'</script>')
print('<script src="https://js.hcaptcha.com/1/api.js" async defer></script>')
print(f'<div class="form-input">')
print(f'<div aria-hidden="true"></div>')
print(f'<div class="h-captcha" data-sitekey="{os.environ.get("HCAPTCHA_SITE_KEY", "")}"></div>')
print(f'</div>')
else:
print(line)
exit(0)
case "POST":
# This is the main case where we handle the submitted form. This is
# continued below.
pass
case _:
# This case should never actually happen because lighttpd filters out
# reprquests accordingly.
fail("405 Method Not Allowed", "Method Not Allowed")
if form_disabled:
fail("503 Service Unavailable", "Form is disabled")
form = cgi.FieldStorage()
@overload
def get_form_value(name: str, default: Optional[str], cast: type[str] = str) -> str: ...
@overload
def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int:...
@overload
def get_form_value(name: str, default: None = ..., cast: type[bytes] = ...) -> tuple[str, bytes]:...
def get_form_value(
name: str,
default: Any = None,
cast: type[str] | type[int] | type[io.BytesIO] = str,
) -> Any:
if name not in form:
if default is None:
fail("400 Bad Request", f"Missing required field: {name}")
else:
return default
if cast is bytes:
value_object = form[name]
if (
isinstance(value_object, list)
or not value_object.file
):
fail("400 Bad Request", f"Invalid value for field: {name}")
return (value_object.filename or "upload"), value_object.file.read()
else:
try:
return cast(form.getfirst(name))
except (TypeError, ValueError):
fail("400 Bad Request", f"Invalid value for field: {name}")
# Make sure the provided CSRF token is actually valid. Note the usage of the
# constant-time string comparison here.
given_csrf_token = get_form_value("csrftoken")
if not hmac.compare_digest(csrf_token, given_csrf_token):
fail("400 Bad Request", f"Invalid CSRF token")
# If the honeypot field was not empty, back off.
if get_form_value(HONEYPOT_FIELD_NAME, ""):
fail("200 OK", f"Invalid value for field: {HONEYPOT_FIELD_NAME}")
if not (hcaptcha_token := get_form_value("h-captcha-response", "")):
fail("200 OK", "Empty hCaptcha token")
response = session.post("https://hcaptcha.com/siteverify", data={
"secret": os.environ.get("HCAPTCHA_SECRET_KEY", ""),
"response": hcaptcha_token,
})
hcaptcha_data = response.json()
if not isinstance(hcaptcha_data, Mapping) or not hcaptcha_data.get("success", False):
fail("200 OK", "hCaptcha fail")
# Extract all the actually provided form data. This is different from form to
# form (see the match block below).
contact_name = get_form_value("contactname")
contact_email = get_form_value("contactemail")
message = get_form_value("message", "[Keine Nachricht hinterlassen]")
attachment: Optional[tuple[str, bytes]] = None
ticket_details = collections.OrderedDict()
ticket_details["Kontaktperson"] = contact_name
ticket_details["Email"] = contact_email
form_group = "csw-Allgemein"
match request_uri:
case "/kontakt":
form_name = "Allgemein"
case "/kontakt/problem":
form_name = "Problem-Support"
case "/spenden":
form_name = "Geldspende"
case "/mitmachen":
form_name = "Mitgliederwerbung"
case "/computer-beantragen/organisation":
form_name = "Computerantrag (Organisation)"
form_group = "csw-Anfragen"
ticket_details["Organisation"] = get_form_value("organization")
ticket_details["Adresse"] = get_form_value("addressline")
ticket_details["PLZ"] = get_form_value("postalcode")
ticket_details["Stadt"] = get_form_value("city")
ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, int)
ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, int)
ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, int)
case "/computer-beantragen/privat":
form_name = "Computerantrag (privat)"
form_group = "csw-Anfragen"
ticket_details["Gewünschte Hardware"] = get_form_value("hardware", default="Unbekannt")
ticket_details["Adresse"] = get_form_value("addressline")
ticket_details["PLZ"] = get_form_value("postalcode")
ticket_details["Stadt"] = get_form_value("city")
attachment = get_form_value("document", cast=bytes)
# Note: the actual form contains a checkbox for whether the user has
# read the guidelines, but we don't actually bother checking that here.
case "/hardware-spenden/organisation":
form_name = "Hardwarespende (Organisation)"
form_group = "csw-Hardwarespenden"
ticket_details["Organisation"] = get_form_value("organization")
attachment = get_form_value("inventory", cast=bytes)
case "/hardware-spenden/privat/laptop":
form_name = "Laptopspende (privat)"
form_group = "csw-Hardwarespenden"
ticket_details["Gerätedetails"] = get_form_value("device")
# This is only available on the CoderDojo site and will be blocked on the
# server side in other cases.
case "/anmelden":
form_name = "CoderDojo-Anmeldung"
form_group = "CoderDojo"
ticket_details["Anmelde-Typ"] = get_form_value("mode")
ticket_details["Teilnehmenden-Name"] = get_form_value("participantname", "-")
ticket_details["Telefonnummer"] = get_form_value("contactphone", "-")
ticket_details["Fotos?"] = get_form_value("photos")
case "/party":
form_name = "CoderDojo Minecraft LAN"
form_group = "CoderDojo"
ticket_details["Java-Spielername"] = get_form_value("javaname", "")
ticket_details["Bedrock-Spielername"] = get_form_value("bedrockname", "")
case "/freizeit":
form_name = "CoderCamp Umfrage"
form_group = "CoderDojo"
ticket_details["Interesse (9-12)"] = get_form_value("interested-kids", "")
ticket_details["Interesse (12-15)"] = get_form_value("interested-teens", "")
ticket_details["Interesse (16+)"] = get_form_value("interested-youngadults", "")
ticket_details["Gewünschtes Format"] = get_form_value("format")
ticket_details["Übernachtung"] = get_form_value("overnight")
case _:
# This case should never actually happen because lighttpd filters out
# requests accordingly.
fail("400 Bad Request", "Invalid form")
ticket_details["Kontaktformular"] = form_name
# Allow the form group to be overriden using the environment variable (for
# testing).
form_group = os.environ.get("ZAMMAD_GROUP", "") or form_group
ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "").rstrip("/")
ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "")
session.headers.update(Authorization=f"Token token={ZAMMAD_TOKEN}")
try:
# Add the actual ticket to the system.
response = session.post(
f"{ZAMMAD_URL}/api/v1/tickets",
json=dict(
title=f"Kontaktformular {contact_name} {form_name}",
group=form_group,
customer_id=f"guess:{contact_email}",
article=dict(
type="web",
internal=True,
content_type="text/plain",
subject=f"Kontaktformular-Anfrage {contact_name}",
body=message,
attachments=[] if attachment is None else [{
"filename": attachment[0],
"data": base64.b64encode(attachment[1]).decode(),
"mime-type": mimetypes.guess_type(attachment[0])[0] or "text/plain",
}],
),
),
)
response.raise_for_status()
ticket_id = response.json()["id"]
assert isinstance(ticket_id, int)
# Add a second article to the ticket that contains all the other information
# from the contact form.
response = session.post(
f"{ZAMMAD_URL}/api/v1/ticket_articles",
json=dict(
ticket_id=ticket_id,
type="note",
content_type="text/plain",
internal=True,
body="\n".join(
f"{key}: {value}" for key, value in ticket_details.items()
)
)
)
response.raise_for_status()
# Add a tag to the ticket, denoting which contact form it came from.
response = session.post(
f"{ZAMMAD_URL}/api/v1/tags/add",
json=dict(
object="Ticket",
o_id=ticket_id,
item=f"Kontaktformular {form_name}",
)
)
response.raise_for_status()
print("Status: 302 Found")
print("Content-Type: text/html")
print("Location: /kontakt/fertig")
print("")
except Exception as e:
fail("500 Internal Server Error", str(e))