1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import subprocess
import json
import dataclasses
import typing as t
import urllib.parse
@dataclasses.dataclass(kw_only=True, slots=True)
class Event:
id: t.Optional[str | bytes] = None
event: t.Optional[str | bytes] = None
data: t.Optional[bytes | bytes] = None
retry: t.Optional[int] = None
def __post_init__(self):
if (self.id is None and
self.event is None and
self.data is None and
self.retry is None):
raise ValueError("At least one property of event must be non-None: id, event, data, retry")
def encode(self) -> bytes:
"""Returns the on-line representation of this event."""
def to_bytes(s: str | bytes | int) -> bytes:
if isinstance(s, str):
return s.encode()
elif isinstance(s, int):
return str(s).encode()
else:
return s
# We know the result won't be empty because of the invariant that at least one field is non-None.
result = b""
if self.id: result += b"id: " + to_bytes(self.id) + b"\n"
if self.event: result += b"event: " + to_bytes(self.event) + b"\n"
if self.data: result += b"data: " + to_bytes(self.data) + b"\n"
if self.retry: result += b"retry: " + to_bytes(self.retry) + b"\n"
# With this final newline, the encoding will end with two newlines, signifying end of event.
result += b"\n"
return result
def app(environ, start_response):
print(f"{environ=} {start_response=}") # NOCOMMIT
path = environ["PATH_INFO"].lstrip("/")
method = environ["REQUEST_METHOD"]
if method == "GET" and path == "stream":
return send_stream(environ, start_response)
else:
return send_404(environ, start_response)
def send_stream(environ, start_response):
status = "200 OK"
headers = [("Content-Type", "text/event-stream"),
("Cache-Control", "no-cache"),
("X-Accel-Buffering", "no")]
start_response(status, headers)
# Set the retry rate for when the client looses connection.
retry_event = Event(retry=2_000)
yield retry_event.encode()
# Figure out if the client is reconnecting.
last_event_id = None
if "HTTP_LAST_EVENT_ID" in environ:
last_event_id = environ["HTTP_LAST_EVENT_ID"]
else:
query = urllib.parse.parse_qs(environ["QUERY_STRING"])
if "lastEventId" in query:
last_event_id = query["lastEventId"][0]
# FIXME: We should also send heartbeat events to avoid NGINX killing our connection.
UNITS = [ "minecraft-listen.socket", "minecraft-listen.service", "minecraft-server.socket",
"minecraft-server.service", "minecraft-hook.service", "minecraft-stop.timer",
"minecraft-stop.service" ]
for event in get_log_entries(UNITS, last_event_id):
yield event.encode()
def get_log_entries(units, last_event_id = None) -> t.Generator[Event, None, None]:
# TODO: We could save some work by only selecting the fields we're interested in with `--fields`.
args = [
"/run/current-system/sw/bin/journalctl",
# We want a stream
"--follow",
"--lines=20",
# A JSON line for each entry
"--output=json",
# Use UTC timestamps to avoid tricky timezone issues on the client
"--utc",
# Log entries from any of the units (logical OR)
*(f"--unit={u}" for u in units)
]
# Since we use the cursor as the SSE event ID, the client will send the
# last cursor when retrying connections.
if last_event_id:
# If this is such a connection, we can avoid including duplicate entries by
# starting just after the given cursor.
args.append("--after-cursor=" + last_event_id)
try:
process = subprocess.Popen(args, stdout=subprocess.PIPE)
assert process.stdout is not None
for raw_line in process.stdout:
assert raw_line[-2:] == b"}\n", "Raw line ends in single newline"
parsed = json.loads(raw_line)
event = Event(id=parsed["__CURSOR"],
event="entry",
data=raw_line.rstrip(b"\n"))
yield event
except Exception as e:
print("Reading (mega sus) journalctl failed", e)
raise e
def send_404(environ, start_response):
status = "404 Not Found"
headers = [("Content-type", "text/plain")]
start_response(status, headers)
return [b"The requested resource was not found."]
|