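"""Watermarking worker.

Watches the "watermarking" beanstalkd tube, stamps every unprocessed image
of the referenced assignment, re-uploads the results to S3, and advances the
assignment to AWAITING_OWNER_NOTIFICATION.
"""
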
import io
import json

import PIL.Image
import PIL.ImageDraw
import beanstalkc
import boto3
import psycopg

# FIXME: In a full system these helpers should load connection details from
# configuration instead of the hardcoded local-development values below.
def connect_to_db():
    return psycopg.connect("dbname=testdb user=postgres")

def connect_to_s3():
    return boto3.client("s3",
                        endpoint_url="http://localhost:9000",
                        aws_access_key_id="minioadmin",
                        aws_secret_access_key="minioadmin",
                        verify=False)  # skip TLS cert verification (local dev endpoint)

def connect_to_beanstalkd():
    return beanstalkc.Connection("localhost", 11300)

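# A minimal sketch of environment-driven configuration (the variable name is
# hypothetical, not part of this codebase):
#
#   import os
#   def connect_to_db():
#       return psycopg.connect(os.environ["DATABASE_URL"])
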
def watermark(image: PIL.Image.Image) -> PIL.Image.Image:
    """Return a copy of `image` stamped with "Memmora" in the bottom-left corner."""
    w, h = image.size
    # Scale the padding and font size with the image so the mark looks the
    # same at any resolution.
    padding = 0.05 * min(w, h)
    x = padding
    y = h - padding
    font_size = 0.1 * min(w, h)
    watermark_image = image.copy()
    draw = PIL.ImageDraw.Draw(watermark_image)
    # anchor="ls" puts the left end of the text baseline at (x, y); passing
    # font_size without a font requires Pillow >= 10.1.
    draw.text((x, y), "Memmora", font_size=font_size, anchor="ls")
    return watermark_image

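# Quick manual check (the file name is hypothetical):
#
#   watermark(PIL.Image.open("sample.jpg")).show()
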
def process_image(db_conn: psycopg.Connection, s3_client, image_id: int, s3_key: str):
    IMAGES_BUCKET = "images"
    print(f"Processing image {image_id} ({s3_key})...")
    # Retrieve image from S3
    print(">> Retrieving image")
    original_image_bytes = io.BytesIO()
    s3_client.download_fileobj(IMAGES_BUCKET, s3_key, original_image_bytes)
    # Process image
    print(">> Watermarking image")
    original_image = PIL.Image.open(original_image_bytes)
    watermarked_image = watermark(original_image)
    watermarked_image_bytes = io.BytesIO()
    watermarked_image.save(watermarked_image_bytes, format="PNG")
    # Overwrite the S3 object, streaming straight from memory; upload_fileobj
    # reads from the buffer's current position, so rewind it first.
    print(">> Uploading watermarked image to S3")
    watermarked_image_bytes.seek(0)
    s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key)
    # Mark the image as processed in the database.
    # FIXME: If this step fails, the image may be watermarked twice. If the
    # flag lived in custom S3 object metadata instead, the upload above would
    # set it atomically; on the other hand, the frontend would have a harder
    # time getting to it.
    print(">> Updating image in DB")
    db_conn.execute("""
        UPDATE
            images
        SET
            is_watermarked = TRUE
        WHERE
            id = %s
    """, [image_id])
    db_conn.commit()

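# A minimal sketch of the metadata alternative mentioned in the FIXME above
# (the "is-watermarked" key is hypothetical). boto3 can attach custom object
# metadata in the same upload request, so the new bytes and the flag would
# land atomically:
#
#   s3_client.upload_fileobj(
#       watermarked_image_bytes, IMAGES_BUCKET, s3_key,
#       ExtraArgs={"Metadata": {"is-watermarked": "true"}})
#
# Readers would then consult head_object(...)["Metadata"] instead of the DB.
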
def process_assignment(assignment_id: int):
    # Use the connection as a context manager so the long-running worker
    # does not leak one connection per job.
    with connect_to_db() as db_conn:
        # Retrieve all UNPROCESSED images from the DB. Some images may have
        # already been processed by an earlier, partial job.
        images = db_conn.execute("""
            SELECT
                id,
                s3_path
            FROM
                images
            WHERE
                assignment_id = %s AND is_watermarked = false
        """, [assignment_id]).fetchall()
        # Connect to S3. We will use this client to download images.
        # Just like app/src/lib/server/s3.ts, this would read from .env in prod.
        s3_client = connect_to_s3()
        for image_id, s3_key in images:
            process_image(db_conn, s3_client, image_id, s3_key)
        # Assuming all of the above worked, this stage of the assignment is done.
        db_conn.execute("""
            UPDATE
                assignments
            SET
                state = 'AWAITING_OWNER_NOTIFICATION'::assignment_state
            WHERE
                id = %s
        """, [assignment_id])
        db_conn.commit()

def main():
    beanstalk = connect_to_beanstalkd()
    beanstalk.watch("watermarking")
    while True:
        # reserve() blocks until a job is available; the None check only
        # matters if a timeout is ever passed.
        job = beanstalk.reserve()
        if not job:
            continue
        print(f"!! Got job: {job.body}")
        job_body = json.loads(job.body)
        process_assignment(job_body["assignmentId"])
        # Delete the job only after processing succeeded; if an exception
        # kills the worker first, beanstalkd releases the reserved job back
        # to the ready queue.
        job.delete()

if __name__ == "__main__":
    main()

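# Producer side, for reference (a minimal sketch; the tube name matches the
# watch() above and the payload shape matches process_assignment):
#
#   producer = connect_to_beanstalkd()
#   producer.use("watermarking")
#   producer.put(json.dumps({"assignmentId": 42}))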