mirror of
https://codeberg.org/angestoepselt/homepage.git
synced 2025-05-24 14:46:16 +00:00
82 lines
2.2 KiB
Python
82 lines
2.2 KiB
Python
#!/usr/bin/env python
|
||
|
||
import csv
|
||
import getpass
|
||
import sys
|
||
|
||
import requests
|
||
|
||
BASE_URL = "https://ticket.z31.it/api/v1"
|
||
|
||
session = requests.Session()
|
||
session.headers.update({
|
||
"Authorization": f"Token token={getpass.getpass(prompt='Zammad-Token: ')}"
|
||
})
|
||
|
||
if __name__ == "__main__":
|
||
tickets = []
|
||
|
||
print("Formular [CoderDojo-Anmeldung]: ", file=sys.stderr, end="", flush=True)
|
||
form_name = input() or "CoderDojo-Anmeldung"
|
||
print("Suche nach aktuellen Einträgen…", file=sys.stderr)
|
||
has_more = True
|
||
page = 1
|
||
page_size = 50
|
||
while has_more:
|
||
response = session.get(
|
||
f"{BASE_URL}/tickets/search",
|
||
params={
|
||
"query": f'tags:"Kontaktformular – {form_name}"',
|
||
"page": str(page),
|
||
"per_page": page_size,
|
||
}
|
||
)
|
||
response.raise_for_status()
|
||
|
||
ticket_dict = response.json().get("assets", {}).get("Ticket", {})
|
||
for ticket in ticket_dict.values():
|
||
if ticket["close_at"] is not None:
|
||
continue
|
||
|
||
assert isinstance(ticket["title"], str)
|
||
if not ticket["title"].endswith(f" – {form_name}"):
|
||
continue
|
||
|
||
assert isinstance(ticket["article_ids"], list)
|
||
if len(ticket["article_ids"]) < 2:
|
||
continue
|
||
|
||
assert isinstance(ticket["id"], int)
|
||
tickets.append((ticket["id"], ticket["created_at"]))
|
||
|
||
page += 1
|
||
has_more = len(ticket_dict) == page_size
|
||
|
||
print(f"{len(tickets)} Element(e) gefunden. Hole jetzt die Details…", file=sys.stderr)
|
||
|
||
fieldnames = ["Zeitstempel"]
|
||
rows = []
|
||
for ticket_id, ticket_timestamp in tickets:
|
||
row = {"Zeitstempel": ticket_timestamp}
|
||
|
||
response = session.get(f"{BASE_URL}/ticket_articles/by_ticket/{ticket_id}")
|
||
response.raise_for_status()
|
||
articles = response.json()
|
||
assert len(articles) >= 2
|
||
|
||
row.update({"Nachricht": articles[0].get("body", "").replace("\r", "").replace("\n", " ")})
|
||
for line in articles[1].get("body", "").split("\n"):
|
||
key, value = line.split(": ", 1)
|
||
row.update({key: value.replace("\r", "").replace("\n", " ")})
|
||
if key not in fieldnames:
|
||
fieldnames.append(key)
|
||
|
||
rows.append(row)
|
||
|
||
if "Nachricht" not in fieldnames:
|
||
fieldnames.append("Nachricht")
|
||
writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
|
||
writer.writeheader()
|
||
for row in rows:
|
||
writer.writerow(row)
|
||
sys.stdout.flush()
|