1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
import io
import json
import tempfile

import PIL.Image
import PIL.ImageDraw
import PIL.ImageFont
import beanstalkc
import boto3
import psycopg
# NOTE(review): this re-imports the stdlib `json` module through botocore's
# namespace; the direct `import json` above is the one to rely on.  Kept so
# no existing reference breaks.
from botocore.credentials import json
# FIXME: These connection functions should obviously load the correct data in a full system.
def connect_to_db():
    """Open a connection to the local development Postgres database."""
    dsn = "dbname=testdb user=postgres"
    return psycopg.connect(dsn)
def connect_to_s3():
    """Create a boto3 S3 client pointed at the local MinIO instance."""
    return boto3.client(
        "s3",
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
        verify=False,  # local MinIO endpoint has no valid TLS certificate
    )
def connect_to_beanstalkd():
    """Connect to the local beanstalkd work queue on its default port."""
    host, port = "localhost", 11300
    return beanstalkc.Connection(host, port, encoding="utf-8")
def watermark(image: PIL.Image.Image) -> PIL.Image.Image:
    """Return a copy of *image* stamped with the "Memmora" watermark.

    The text is placed near the bottom-left corner, inset by 5% of the
    smaller image dimension, anchored at its left baseline ("ls"), with a
    font size of 10% of that dimension.  The input image is not modified.
    """
    width, height = image.size
    smaller_side = min(width, height)
    inset = 0.05 * smaller_side

    stamped = image.copy()
    drawer = PIL.ImageDraw.ImageDraw(stamped)
    drawer.text(
        (inset, height - inset),
        "Memmora",
        font_size=0.1 * smaller_side,
        anchor="ls",
    )
    return stamped
def process_image(db_conn: psycopg.Connection, s3_client, image_id: int, s3_key: str):
    """Watermark one image stored in S3 and flag it as processed in the DB.

    Downloads the object at *s3_key* from the "images" bucket, stamps the
    watermark onto it, re-uploads the PNG result under the same key, and
    finally sets ``images.is_watermarked`` for *image_id*.

    FIXME: If the final DB update fails, we might have duplicate processing.
    If this data was stored as custom metadata in S3, the upload operation
    would atomically set the flag. OTOH the frontend would have a harder
    time getting to it.
    """
    IMAGES_BUCKET = "images"
    print(f"Processing image {image_id} ({s3_key})...")

    # Retrieve image from S3.
    print(">> Retrieving image")
    original_image_bytes = io.BytesIO()
    s3_client.download_fileobj(IMAGES_BUCKET, s3_key, original_image_bytes)

    # Process image.
    print(">> Watermarking image")
    original_image = PIL.Image.open(original_image_bytes)
    watermarked_image = watermark(original_image)
    watermarked_image_bytes = io.BytesIO()
    watermarked_image.save(watermarked_image_bytes, format="png")

    # Update the S3 object.  upload_fileobj streams straight from the
    # in-memory buffer, so no temporary file on disk is needed (this
    # resolves the old FIXME about the tmp-file round trip).
    print(">> Uploading watermarked image to S3")
    watermarked_image_bytes.seek(0)  # rewind: save() left the cursor at EOF
    s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key)

    # Mark the image as processed in the database.
    print(">> Updating image in DB")
    db_conn.execute("""
        UPDATE
            images
        SET
            is_watermarked = TRUE
        WHERE
            id = %s
        """, [image_id])
    db_conn.commit()
def notify_owner(beanstakd_conn: beanstalkc.Connection, db_conn: psycopg.Connection, assignment_id: int):
    """Queue an owner notification for a completed assignment.

    Looks up the cemetery plot and its owner for *assignment_id*, then puts
    a JSON job on the beanstalkd "notification" tube for the notification
    worker to deliver.

    Raises:
        LookupError: if the assignment (or its plot) does not exist.
    """
    result = db_conn.execute("""
        SELECT
            c.owner_id,
            a.cemetary_plot_id
        FROM assignments AS a
        INNER JOIN cemetary_plots AS c ON c.id = a.cemetary_plot_id
        WHERE a.id = %s;
        """, [assignment_id]).fetchone()
    # An `assert` would be stripped under `python -O`; raise explicitly so a
    # missing assignment is always reported.
    if result is None:
        raise LookupError(f"assignment {assignment_id} has no matching cemetery plot")
    owner_id, cemetary_plot_id = result

    beanstakd_conn.use("notification")
    body = json.dumps({
        "userId": owner_id,
        # Danish: "We have visited your grave site"
        "message": "Vi har besøgt dit gravsted",
        "url": f"http://localhost:8000/cemetary_plots/{cemetary_plot_id}",
    })
    beanstakd_conn.put(body)
def process_assignment(beanstakd_conn: beanstalkc.Connection, assignment_id: int):
    """Watermark all unprocessed images of an assignment, advance its state,
    and notify the plot owner.

    Idempotent with respect to partial earlier runs: only images with
    ``is_watermarked = false`` are (re)processed.
    """
    # Retrieve all UNPROCESSED images from DB.
    # Some images may have already been processed by an earlier, partial job.
    db_conn = connect_to_db()
    try:
        images = db_conn.execute("""
            SELECT
                id,
                s3_path
            FROM
                images
            WHERE
                assignment_id = %s AND is_watermarked = false
            """, [assignment_id]).fetchall()

        # Connect to S3. We will use this client to download images.
        # Just like app/src/lib/server/s3.ts, this would read from .env in prod.
        s3_client = connect_to_s3()
        for image_id, s3_key in images:
            process_image(db_conn, s3_client, image_id, s3_key)

        # Assuming all of the above worked, we have finished this stage of
        # processing assignments.
        db_conn.execute("""
            UPDATE
                assignments
            SET
                state = 'AWAITING_DONE' :: assignment_state
            WHERE
                id = %s
            """, [assignment_id])
        db_conn.commit()
        notify_owner(beanstakd_conn, db_conn, assignment_id)
    finally:
        # The connection was previously leaked (one per job); always release it.
        db_conn.close()
# Worker entry point: consume watermarking jobs from beanstalkd forever.
beanstalk = connect_to_beanstalkd()
beanstalk.watch("watermarking")
while True:
    job = beanstalk.reserve()
    if job is None:
        continue
    print(f"!! Got job: {job.body}")
    payload = json.loads(job.body)
    process_assignment(beanstalk, payload["assignmentId"])
    # Only delete the job once processing finished without raising.
    job.delete()
|