#!/usr/bin/env python import base64 import io import cgi import collections import hmac import mimetypes import os import secrets from typing import Any, Optional, overload import itsdangerous import requests def fail(status: str, reason: str) -> None: print(f"Status: {status}") print("Content-Type: text/html") print(f"X-Reason: {reason}") print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html") print("") exit(0) HONEYPOT_FIELD_NAME = "addressline1" CAPTCHA_FIELD_NAME = "question" CAPTCHA_FIELD_QUESTION = "Welcher Fluss fließt durch Würzburg?" CAPTCHA_FIELD_VALUE = "Main" SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "") request_uri = os.environ.get("REQUEST_URI", "").lower().rstrip("/") serializer = itsdangerous.URLSafeSerializer("secret key", "salt") cookies = dict[str, str]() for entry in os.environ.get("HTTP_COOKIE", "").split(";"): name, *value = entry.lstrip(" ").split("=", 1) cookies[name] = value[0] if len(value) == 1 else "" # Get the CSRF token. This is pass along according to the double-submit # technique[1]. First as a hidden form input. Second as a cookie, which is # signed on the server, so we can verify its authenticity. The latter is also # used for subsequent requests so every session only gets one token. # 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie if "__Host-csrftoken" in cookies: # We already have a token for this client - try and extract it. signed_csrf_token = cookies["__Host-csrftoken"] try: csrf_token = serializer.loads(signed_csrf_token) except: # The token wasn't valid, so we need to generate a new one. csrf_token = secrets.token_urlsafe(32) else: csrf_token = secrets.token_urlsafe(32) # Re-sign the token, so it's fresh and the timeout doesn't waive. signed_csrf_token = serializer.dumps(csrf_token) match os.environ.get("REQUEST_METHOD", "").upper(): case "GET": # For GET requests, serve the form that the user requested. The CSRF # token will be added here as well. print("Status: 200") print("Content-Type: text/html") print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly") print("") with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template: for line in template.readlines(): # This is a very rudimentary check to ensure that we actually # place the token *inside* the form. It assumes that there is # a) only one form on the site and # b) the
tag doesn't end on the same line. if "') print(f'') print(f'') print(f'') print(line) exit(0) case "POST": # This is the main case where we handle the submitted form. This is # continued below. pass case _: # This case should never actually happen because lighttpd filters out # requests accordingly. fail("405 Method Not Allowed", "Method Not Allowed") form = cgi.FieldStorage() @overload def get_form_value(name: str, default: Optional[str], cast: type[str] = str) -> str: ... @overload def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int:... @overload def get_form_value(name: str, default: None = ..., cast: type[bytes] = ...) -> tuple[str, bytes]:... def get_form_value( name: str, default: Any = None, cast: type[str] | type[int] | type[io.BytesIO] = str, ) -> Any: if name not in form: if default is None: fail("400 Bad Request", f"Missing required field: {name}") else: return default if cast is bytes: value_object = form[name] if ( isinstance(value_object, list) or not value_object.file ): fail("400 Bad Request", f"Invalid value for field: {name}") return (value_object.filename or "upload"), value_object.file.read() else: try: return cast(form.getfirst(name)) except (TypeError, ValueError): fail("400 Bad Request", f"Invalid value for field: {name}") # Make sure the provided CSRF token is actually valid. Note the usage of the # constant-time string comparison here. given_csrf_token = get_form_value("csrftoken") if not hmac.compare_digest(csrf_token, given_csrf_token): fail("400 Bad Request", f"Invalid CSRF token") # If the honeypot field was not empty, back off. if ( get_form_value(HONEYPOT_FIELD_NAME, "") or get_form_value(CAPTCHA_FIELD_NAME, "").lower().strip() != CAPTCHA_FIELD_VALUE ): fail("200 OK", f"Invalid value for field: {CAPTCHA_FIELD_NAME}") # Extract all the actually provided form data. This is different from form to # form (see the match block below). contact_name = get_form_value("contactname") contact_email = get_form_value("contactemail") message = get_form_value("message") attachment: Optional[tuple[str, bytes]] = None ticket_details = collections.OrderedDict() ticket_details["Kontaktperson"] = contact_name ticket_details["Email"] = contact_email form_group = "csw-Allgemein" match request_uri: case "/kontakt": form_name = "Allgemein" case "/kontakt/problem": form_name = "Problem-Support" case "/spenden": form_name = "Geldspende" case "/mitmachen": form_name = "Mitgliederwerbung" case "/computer-beantragen/organisation": form_name = "Computerantrag (Organisation)" form_group = "csw-Anfragen" ticket_details["Organisation"] = get_form_value("organization") ticket_details["Adresse"] = get_form_value("addressline") ticket_details["PLZ"] = get_form_value("postalcode") ticket_details["Stadt"] = get_form_value("city") ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, int) ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, int) ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, int) case "/computer-beantragen/privat": form_name = "Computerantrag (privat)" form_group = "csw-Anfragen" ticket_details["Adresse"] = get_form_value("addressline") ticket_details["PLZ"] = get_form_value("postalcode") ticket_details["Stadt"] = get_form_value("city") attachment = get_form_value("document", cast=bytes) # Note: the actual form contains a checkbox for whether the user has # read the guidelines, but we don't actually bother checking that here. case "/hardware-spenden/organisation": form_name = "Hardwarespende (Organisation)" form_group = "csw-Hardwarespenden" ticket_details["Organisation"] = get_form_value("organization") attachment = get_form_value("inventory", cast=bytes) case "/hardware-spenden/privat/laptop": form_name = "Laptopspende (privat)" form_group = "csw-Hardwarespenden" ticket_details["Gerätedetails"] = get_form_value("device") case _: # This case should never actually happen because lighttpd filters out # requests accordingly. fail("400 Bad Request", "Invalid form") ticket_details["Kontaktformular"] = form_name # Allow the form group to be overriden using the environment variable (for # testing). form_group = os.environ.get("ZAMMAD_GROUP", "") or form_group ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "").rstrip("/") ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "") session = requests.Session() session.headers.update(Authorization=f"Token token={ZAMMAD_TOKEN}") try: # Add the actual ticket to the system. response = session.post( f"{ZAMMAD_URL}/api/v1/tickets", json=dict( title=f"Kontaktformular {contact_name} – {form_name}", group=form_group, customer_id=f"guess:{contact_email}", article=dict( type="web", internal=True, content_type="text/plain", subject=f"Kontaktformular-Anfrage {contact_name}", body=message, attachments=[] if attachment is None else [{ "filename": attachment[0], "data": base64.b64encode(attachment[1]).decode(), "mime-type": mimetypes.guess_type(attachment[0])[0] or "text/plain", }], ), ), ) response.raise_for_status() ticket_id = response.json()["id"] assert isinstance(ticket_id, int) # Add a second article to the ticket that contains all the other information # from the contact form. response = session.post( f"{ZAMMAD_URL}/api/v1/ticket_articles", json=dict( ticket_id=ticket_id, type="note", content_type="text/plain", internal=True, body="\n".join( f"{key}: {value}" for key, value in ticket_details.items() ) ) ) response.raise_for_status() # Add a tag to the ticket, denoting which contact form it came from. response = session.post( f"{ZAMMAD_URL}/api/v1/tags/add", json=dict( object="Ticket", o_id=ticket_id, item=f"Kontaktformular – {form_name}", ) ) response.raise_for_status() print("Status: 302 Found") print("Content-Type: text/html") print("Location: /kontakt/fertig") print("") except Exception as e: fail("500 Internal Server Error", str(e))