#!/usr/bin/env python import io import cgi import collections import hmac import os import secrets from typing import Any, Optional, overload import itsdangerous import requests def fail(status: str, reason: str) -> None: print(f"Status: {status}") print("Content-Type: text/html") print(f"X-Reason: {reason}") print(f"X-Sendfile: {SITE_DIRECTORY}/kontakt/fehler/index.html") print("") exit(0) SITE_DIRECTORY = os.environ.get("SITE_DIRECTORY", "") if SITE_DIRECTORY == "": fail("503 Service Unavailable", "Cannot open site directory") request_uri = os.environ.get("REQUEST_URI", "").lower().rstrip("/") serializer = itsdangerous.URLSafeSerializer("secret key", "salt") cookies = dict[str, str]() for entry in os.environ.get("HTTP_COOKIE", "").split(";"): name, *value = entry.lstrip(" ").split("=", 1) cookies[name] = value[0] if len(value) == 1 else "" # Get the CSRF token. This is pass along according to the double-submit # technique[1]. First as a hidden form input. Second as a cookie, which is # signed on the server, so we can verify its authenticity. The latter is also # used for subsequent requests so every session only gets one token. # 1: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie if "__Host-csrftoken" in cookies: # We already have a token for this client - try and extract it. signed_csrf_token = cookies["__Host-csrftoken"] try: csrf_token = serializer.loads(signed_csrf_token) except: # The token wasn't valid, so we need to generate a new one. csrf_token = secrets.token_urlsafe(32) else: csrf_token = secrets.token_urlsafe(32) # Re-sign the token so it's fresh and the timeout doesn't waive. signed_csrf_token = serializer.dumps(csrf_token) match os.environ.get("REQUEST_METHOD", "").upper(): case "GET": # For GET requests, serve the form that the user requested. The CSRF # token will be added here as well. print("Status: 200") print("Content-Type: text/html") print(f"Set-Cookie: __Host-csrftoken={signed_csrf_token}; path=/; Secure; SameSite=Strict; HttpOnly") print("") with open(f"{SITE_DIRECTORY}/{request_uri.strip('/')}/index.html", "r") as template: for line in template.readlines(): print(line) # This is a very rudimentary check to ensure that we actually # place the token *inside* the form. It assumes that there is # a) only one form on the site and # b) the
') exit(0) case "POST": # This is the main case where we handle the submitted form. This is # continued below. pass case _: # This case should never actually happen because lighttpd filters out # requests accordingly. fail("405 Method Not Allowed", "Method Not Allowed") form = cgi.FieldStorage() @overload def get_form_value(name: str, default: Optional[str], cast: type[str] = str) -> str: ... @overload def get_form_value(name: str, default: Optional[int], cast: type[int]) -> int:... @overload def get_form_value(name: str, default: None = ..., cast: type[io.BytesIO] = ...) -> io.BytesIO:... def get_form_value( name: str, default: Any = None, cast: type[str] | type[int] | type[io.BytesIO] = str, ) -> Any: if name not in form: if default is None: fail("400 Bad Request", f"Missing required field: {name}") else: return default if cast is io.BytesIO: value_object = form[name] if isinstance(value_object, list) or not value_object.file: fail("400 Bad Request", f"Invalid value for field: {name}") return io.BytesIO(value_object.file) else: try: return cast(form.getfirst(name)) except (TypeError, ValueError): fail("400 Bad Request", f"Invalid value for field: {name}") # Make sure the provided CSRF token is actually valid. Note the usage of the # constant-time string comparison here. given_csrf_token = get_form_value("csrftoken") if not hmac.compare_digest(csrf_token, given_csrf_token): fail("400 Bad Request", f"Invalid CSRF token") # Extract all the actually provided form data. This is different from form to # form (see the match block below). contact_name = get_form_value("contactname") contact_email = get_form_value("contactemail") message = get_form_value("message") ticket_details = collections.OrderedDict() ticket_details["Kontaktperson"] = contact_name ticket_details["Email"] = contact_email match request_uri: case "/kontakt": form_name = "Allgemein" case "/kontakt/problem": form_name = "Problem-Support" case "/geld": form_name = "Geldspende" case "/mitmachen": form_name = "Mitgliederwerbung" case "/computer-beantragen/organisation": form_name = "Computerantrag (Organisation)" ticket_details["Organisation"] = get_form_value("organization") ticket_details["Adresse"] = get_form_value("addressline") ticket_details["PLZ"] = get_form_value("postalcode") ticket_details["Stadt"] = get_form_value("city") ticket_details["Anzahl Desktops"] = get_form_value("desktopcount", 0, int) ticket_details["Anzahl Laptops"] = get_form_value("laptopcount", 0, int) ticket_details["Anzahl Drucker"] = get_form_value("printercount", 0, int) case "/computer-beantragen/privat": form_name = "Computerantrag (privat)" ticket_details["Adresse"] = get_form_value("addressline") ticket_details["PLZ"] = get_form_value("postalcode") ticket_details["Stadt"] = get_form_value("city") #document = get_form_value("document") # Note: the actual form contains a checkbox for whether the user has # read the guidelines, but we don't actually bother checking that here. case "/hardware-spenden/organisation": form_name = "Hardwarespende (Organisation)" ticket_details["Organisation"] = get_form_value("organization") #inventory = get_form_value("inventory") case "/hardware-spenden/privat/laptop": form_name = "Laptopspende (privat)" ticket_details["Gerätedetails"] = get_form_value("device") case _: # This case should never actually happen because lighttpd filters out # requests accordingly. fail("400 Bad Request", "Invalid form") ticket_details["Kontaktformular"] = form_name ZAMMAD_URL = os.environ.get("ZAMMAD_URL", "https://ticket.z31.it").rstrip("/") ZAMMAD_TOKEN = os.environ.get("ZAMMAD_TOKEN", "") if ZAMMAD_TOKEN == "": fail("503 Service Unavailable", "Could not get Zammad token") ZAMMAD_GROUP = os.environ.get("ZAMMAD_GROUP", "testgruppe") session = requests.Session() session.headers.update(Authorization=f"Token token={ZAMMAD_TOKEN}") try: # Add the actual ticket to the system. response = session.post( f"{ZAMMAD_URL}/api/v1/tickets", json=dict( title=f"Kontaktformular {contact_name} – {form_name}", group=ZAMMAD_GROUP, customer_id=f"guess:{contact_email}", article=dict( type="web", content_type="text/plain", subject=f"Kontaktformular-Anfrage {contact_name}", body=message ) ) ) response.raise_for_status() ticket_id = response.json()["id"] assert isinstance(ticket_id, int) # Add a second article to the ticket that contains all the other information # from the contact form. response = session.post( f"{ZAMMAD_URL}/api/v1/ticket_articles", json=dict( ticket_id=ticket_id, type="note", content_type="text/plain", internal=True, body="\n".join( f"{key}: {value}" for key, value in ticket_details.items() ) ) ) response.raise_for_status() # Add a tag to the ticket, denoting which contact form it came from. response = session.post( f"{ZAMMAD_URL}/api/v1/tags/add", json=dict( object="Ticket", o_id=ticket_id, item=f"Kontaktformular – {form_name}", ) ) response.raise_for_status() print("Status: 302 Found") print("Content-Type: text/html") print("Location: /kontakt/fertig") print("") except: fail("500 Internal Server Error", "Backend error")