From 740c2fff3f19b4eb724be6db13966a38a294e052 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yannik=20R=C3=B6del?= Date: Sun, 14 Apr 2024 16:36:26 +0200 Subject: [PATCH] Reapply pull request 'Diverse Formular-Verbessungen' (#155) This reverts commit a054bc1b07e307976c4e2493908c23e93edd22c7. --- cgi-bin/form.py | 99 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 22 deletions(-) diff --git a/cgi-bin/form.py b/cgi-bin/form.py index 8087cc0..f047682 100755 --- a/cgi-bin/form.py +++ b/cgi-bin/form.py @@ -1,17 +1,17 @@ #!/usr/bin/env python import base64 -import io import cgi import collections from collections.abc import Mapping import hmac -import mimetypes -import re -import os -import secrets import json +import mimetypes +import os +import re +import secrets from typing import Any, Optional, overload +from urllib.parse import urljoin import itsdangerous import requests @@ -34,6 +34,20 @@ except IOError: HONEYPOT_FIELD_NAME = "addressline1" +# This regex merely validates what the in-browser form validation already checks and +# isn't all too strict. +EMAIL_REGEX = re.compile(r"^[^ ]+@[^ ]+\.[^ ]+$") + +# Mapping from site-defined devices (see sites/angestoepselt/_data/config.json in this +# repository) to the corresponding Zammad categories: +# https://codeberg.org/angestoepselt/homepage/issues/120#issuecomment-1727768 +# This is a (str | int) -> str map because some keys we check against below might be +# integers and it's just easier to type this way. +FORM_CATEGORY_MAP: dict[str | int, str] = { + "Laptop": "laptop", + "Laptop ohne Akku": "laptop-battery-missing", + "Desktop-Computer": "desktop", +} SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "") request_uri = os.environ.get("REQUEST_URI", "").lower().rstrip("/") @@ -128,15 +142,16 @@ if form_disabled: form = cgi.FieldStorage() @overload -def get_form_value(name: str, default: Optional[str], cast: type[str] = str) -> str: ... +def get_form_value(name: str, default: None = ..., *, cast: type[bytes]) -> tuple[str, bytes]:... @overload -def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int:... +def get_form_value(name: str, default: Optional[int] = ..., *, cast: type[int]) -> int:... @overload -def get_form_value(name: str, default: None = ..., cast: type[bytes] = ...) -> tuple[str, bytes]:... +def get_form_value(name: str, default: Optional[str] = ..., *, cast: type[str] = ...) -> str: ... def get_form_value( name: str, default: Any = None, - cast: type[str] | type[int] | type[io.BytesIO] = str, + *, + cast: type[str] | type[int] | type[bytes] = str, ) -> Any: if name not in form: if default is None: @@ -154,7 +169,10 @@ def get_form_value( return (value_object.filename or "upload"), value_object.file.read() else: try: - return cast(form.getfirst(name)) + result = cast(form.getfirst(name)) + if isinstance(result, str): + result = result.strip() + return result except (TypeError, ValueError): fail("400 Bad Request", f"Invalid value for field: {name}") @@ -163,7 +181,7 @@ def get_form_value( # constant-time string comparison here. given_csrf_token = get_form_value("csrftoken") if not hmac.compare_digest(csrf_token, given_csrf_token): - fail("400 Bad Request", f"Invalid CSRF token") + fail("400 Bad Request", "Invalid CSRF token") # If the honeypot field was not empty, back off. @@ -183,15 +201,21 @@ if not isinstance(hcaptcha_data, Mapping) or not hcaptcha_data.get("success", Fa # Extract all the actually provided form data. This is different from form to # form (see the match block below). contact_name = get_form_value("contactname") +contact_names = contact_name.split(" ") + contact_email = get_form_value("contactemail") +if not EMAIL_REGEX.fullmatch(contact_email): + fail("400 Bad Request", "Invalid Email address") + message = get_form_value("message", "[Keine Nachricht hinterlassen]") attachment: Optional[tuple[str, bytes]] = None -ticket_details = collections.OrderedDict() +ticket_details = collections.OrderedDict[str, str | int]() ticket_details["Kontaktperson"] = contact_name ticket_details["Email"] = contact_email form_group = "csw-Allgemein" +form_category: str | None = None match request_uri: case "/kontakt": @@ -213,14 +237,17 @@ match request_uri: ticket_details["Adresse"] = get_form_value("addressline") ticket_details["PLZ"] = get_form_value("postalcode") ticket_details["Stadt"] = get_form_value("city") - ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, int) - ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, int) - ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, int) + ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, cast=int) + ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, cast=int) + ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, cast=int) case "/computer-beantragen/privat": form_name = "Computerantrag (privat)" form_group = "csw-Anfragen" + ticket_details["Gewünschte Hardware"] = get_form_value("hardware", default="Unbekannt") + form_category = FORM_CATEGORY_MAP.get(ticket_details["Gewünschte Hardware"], None) + ticket_details["Adresse"] = get_form_value("addressline") ticket_details["PLZ"] = get_form_value("postalcode") ticket_details["Stadt"] = get_form_value("city") @@ -249,13 +276,13 @@ match request_uri: ticket_details["Teilnehmenden-Name"] = get_form_value("participantname", "-") ticket_details["Telefonnummer"] = get_form_value("contactphone", "-") ticket_details["Fotos?"] = get_form_value("photos") - + case "/party": form_name = "CoderDojo Minecraft LAN" form_group = "CoderDojo" ticket_details["Java-Spielername"] = get_form_value("javaname", "") ticket_details["Bedrock-Spielername"] = get_form_value("bedrockname", "") - + case "/freizeit": form_name = "CoderCamp Umfrage" form_group = "CoderDojo" @@ -276,18 +303,46 @@ ticket_details["Kontaktformular"] = form_name # testing). form_group = os.environ.get("ZAMMAD_GROUP", "") or form_group -ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "").rstrip("/") +ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "") ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "") session.headers.update(Authorization=f"Token token={ZAMMAD_TOKEN}") try: + # Create a new user for the client. For some reason, using "guess:{email}" as the + # customer_id when creating the ticket doesn't really work, as described in the + # Zammad documentation [1]. Instead, we sometimes need to explictily create the + # user beforehand. See this discussion [2] for more details. + # [1]: https://docs.zammad.org/en/latest/api/ticket/index.html#create + # [2]: https://codeberg.org/angestoepselt/homepage/issues/141 + response = session.post( + urljoin(ZAMMAD_URL, "api/v1/users"), + json=dict( + # Yes, yes... This goes against pretty much all best practices for parsing + # names. But: it's only internal and we save the name verbatim again below + # so we're going to go ahead and do it anyway. + firstname=" ".join(contact_names[:-1]) if len(contact_names) >= 2 else "?", + lastname=contact_names[-1], + email=contact_email, + ) + ) + if response.status_code == 422: + # This email address is already in use by another user. + customer_id = f"guess:{contact_email}" + else: + response.raise_for_status() + customer_id = response.json()["id"] + assert isinstance(customer_id, (str, int)) + # Add the actual ticket to the system. response = session.post( - f"{ZAMMAD_URL}/api/v1/tickets", + urljoin(ZAMMAD_URL, "api/v1/tickets"), + headers={ + "X-On-Behalf-Of": contact_email, + }, json=dict( title=f"Kontaktformular {contact_name} – {form_name}", group=form_group, - customer_id=f"guess:{contact_email}", + customer_id=customer_id, article=dict( type="web", internal=True, @@ -309,7 +364,7 @@ try: # Add a second article to the ticket that contains all the other information # from the contact form. response = session.post( - f"{ZAMMAD_URL}/api/v1/ticket_articles", + urljoin(ZAMMAD_URL, "api/v1/ticket_articles"), json=dict( ticket_id=ticket_id, type="note", @@ -324,7 +379,7 @@ try: # Add a tag to the ticket, denoting which contact form it came from. response = session.post( - f"{ZAMMAD_URL}/api/v1/tags/add", + urljoin(ZAMMAD_URL, "api/v1/tags/add"), json=dict( object="Ticket", o_id=ticket_id,