path: root/hosts/ahmed/minecraft-log-server/minecraft_log_server.py
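
"""A small WSGI application that streams journald log entries for the
Minecraft systemd units to browsers as Server-Sent Events (SSE). The journal
cursor doubles as the SSE event ID, so reconnecting clients resume right after
the last entry they saw."""
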
import subprocess
import json
import dataclasses
import typing as t
import urllib.parse

@dataclasses.dataclass(kw_only=True, slots=True)
class Event:
    id: t.Optional[str | bytes] = None
    event: t.Optional[str | bytes] = None
    data: t.Optional[str | bytes] = None
    retry: t.Optional[int] = None

    def __post_init__(self):
        if (self.id is None and
            self.event is None and
            self.data is None and
            self.retry is None):
            raise ValueError("At least one property of event must be non-None: id, event, data, retry")

    def encode(self) -> bytes:
        """Returns the on-line representation of this event."""

        def to_bytes(s: str | bytes | int) -> bytes:
            if isinstance(s, str):
                return s.encode()
            elif isinstance(s, int):
                return str(s).encode()
            else:
                return s

        # We know the result won't be empty because of the invariant that at least one field is non-None.
        result = b""

        # Compare against None explicitly so falsy values (e.g. retry=0) are still emitted.
        if self.id is not None:    result += b"id: "    + to_bytes(self.id)    + b"\n"
        if self.event is not None: result += b"event: " + to_bytes(self.event) + b"\n"
        if self.data is not None:  result += b"data: "  + to_bytes(self.data)  + b"\n"
        if self.retry is not None: result += b"retry: " + to_bytes(self.retry) + b"\n"

        # With this final newline, the encoding will end with two newlines, signifying end of event.
        result += b"\n"

        return result
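
# For example (values illustrative):
#
#     Event(id="c1", event="entry", data=b"{}").encode()
#     # => b"id: c1\nevent: entry\ndata: {}\n\n"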

def app(environ, start_response):
    print(f"{environ=} {start_response=}") # NOCOMMIT
    path = environ["PATH_INFO"].lstrip("/")
    method = environ["REQUEST_METHOD"]

    if method == "GET" and path == "stream":
        return send_stream(environ, start_response)
    else:
        return send_404(environ, start_response)
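
# A minimal way to run this app locally with only the standard library (a
# sketch for testing; the real deployment presumably sits behind NGINX, given
# the X-Accel-Buffering header below):
#
#     from wsgiref.simple_server import make_server
#     make_server("127.0.0.1", 8000, app).serve_forever()
#
# Note that wsgiref serves one request at a time, so a single open stream
# blocks all other clients.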

def send_stream(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/event-stream"),
               ("Cache-Control", "no-cache"),
               ("X-Accel-Buffering", "no")]
    start_response(status, headers)

    # Set the retry delay for when the client loses the connection.
    retry_event = Event(retry=2_000)
    yield retry_event.encode()

    # Figure out if the client is reconnecting.
    last_event_id = None
    if "HTTP_LAST_EVENT_ID" in environ:
        last_event_id = environ["HTTP_LAST_EVENT_ID"]
    else:
        query = urllib.parse.parse_qs(environ.get("QUERY_STRING", ""))
        if "lastEventId" in query:
            last_event_id = query["lastEventId"][0]

    # FIXME: We should also send heartbeat events to avoid NGINX killing our connection.
    UNITS = [ "minecraft-listen.socket", "minecraft-listen.service", "minecraft-server.socket",
              "minecraft-server.service", "minecraft-hook.service", "minecraft-stop.timer",
              "minecraft-stop.service" ]
    for event in get_log_entries(UNITS, last_event_id):
        yield event.encode()
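
# One way to address the heartbeat FIXME above (a sketch, not wired in): SSE
# comment lines start with ":" and are ignored by clients, so emitting one
# periodically keeps NGINX's read timeout from closing an idle stream. If
# get_log_entries were changed to yield None whenever no entry arrives within
# some timeout, the loop above could become:
#
#     for event in get_log_entries(UNITS, last_event_id):
#         yield b": heartbeat\n\n" if event is None else event.encode()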

def get_log_entries(units, last_event_id=None) -> t.Generator[Event, None, None]:
    # TODO: We could save some work by only selecting the fields we're interested in with `--fields`.
    args = [
            "/run/current-system/sw/bin/journalctl",
            # We want a stream
            "--follow",
            "--lines=20",
            # A JSON line for each entry
            "--output=json",
            # Use UTC timestamps to avoid tricky timezone issues on the client
            "--utc",
            # Log entries from any of the units (logical OR)
            *(f"--unit={u}" for u in units)
    ]
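
    # For reference, the list above amounts to a command line roughly like:
    #     journalctl --follow --lines=20 --output=json --utc \
    #         --unit=minecraft-listen.socket ... --unit=minecraft-stop.service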

    # Since we use the cursor as the SSE event ID, the client will send the
    # last cursor when retrying connections.
    if last_event_id:
        # If this is such a connection, we can avoid including duplicate entries by
        # starting just after the given cursor.
        args.append("--after-cursor=" + last_event_id)

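    # journalctl emits one flat JSON object per line; an abbreviated,
    # illustrative example of what each raw line looks like:
    #     {"__CURSOR": "s=...;i=...", "MESSAGE": "...", "_SYSTEMD_UNIT": "...", ...}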
    process = subprocess.Popen(args, stdout=subprocess.PIPE)
    assert process.stdout is not None
    try:
        for raw_line in process.stdout:
            assert raw_line.endswith(b"}\n"), "journalctl should emit one JSON object per line, ending in a single newline"
            parsed = json.loads(raw_line)
            event = Event(id=parsed["__CURSOR"],
                          event="entry",
                          data=raw_line.rstrip(b"\n"))
            yield event
    except Exception as e:
        print("Reading journalctl output failed:", e)
        # Bare `raise` preserves the original traceback.
        raise
    finally:
        # Make sure journalctl doesn't outlive the response, e.g. when the
        # client disconnects and the server closes this generator.
        process.terminate()
        process.wait()

def send_404(environ, start_response):
    status = "404 Not Found"
    headers = [("Content-Type", "text/plain")]
    start_response(status, headers)
    return [b"The requested resource was not found."]