summaryrefslogtreecommitdiff
path: root/watermark_worker/watermark.py
blob: 5d6388be3ef9a9b13c8dfc1e41f0b3b7d34356c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import PIL.Image
import PIL.ImageDraw
import PIL.ImageFont
import boto3
from botocore.credentials import json
import psycopg
import io
import tempfile
import beanstalkc

# FIXME: These connection helpers hard-code local development settings. In a full system
#        they should load hosts and credentials from configuration (e.g. environment variables).

def connect_to_db():
    """Open and return a new PostgreSQL connection using the hard-coded local DSN."""
    dsn = "dbname=testdb user=postgres"
    return psycopg.connect(dsn)

def connect_to_s3():
    """Create and return a boto3 S3 client for the hard-coded local endpoint."""
    client_options = {
        "endpoint_url": "http://localhost:9000",
        "aws_access_key_id": "minioadmin",
        "aws_secret_access_key": "minioadmin",
        # Skip TLS certificate validation (the endpoint above is plain http).
        "verify": False,
    }
    return boto3.client("s3", **client_options)

def connect_to_beanstalkd():
    """Open and return a beanstalkd connection to localhost:11300 using UTF-8 job bodies."""
    host, port = "localhost", 11300
    return beanstalkc.Connection(host, port, encoding="utf-8")

def watermark(image: PIL.Image.Image) -> PIL.Image.Image:
    """Return a copy of *image* with the text "Memmora" drawn near its bottom-left corner.

    The input image is left untouched; drawing happens on a copy. Both the
    margin (5 %) and the font size (10 %) scale with the image's shorter side.
    """
    width, height = image.size
    shorter_side = min(width, height)
    margin = 0.05 * shorter_side
    text_size = 0.1 * shorter_side

    stamped = image.copy()
    # anchor="ls" places the left end of the text baseline at the given point,
    # i.e. one margin in from the left edge and one margin up from the bottom.
    baseline_origin = (margin, height - margin)
    canvas = PIL.ImageDraw.ImageDraw(stamped)
    canvas.text(baseline_origin, "Memmora", font_size=text_size, anchor="ls")

    return stamped

def process_image(db_conn: psycopg.Connection, s3_client, image_id: int, s3_key: str):
    """Watermark one image stored in S3 and flag it as processed in the database.

    The object at *s3_key* in the "images" bucket is downloaded, watermarked,
    re-encoded as PNG and uploaded back under the same key, after which the
    matching ``images`` row is marked ``is_watermarked = TRUE`` and committed.

    Args:
        db_conn: Open database connection used for the UPDATE and commit.
        s3_client: boto3 S3 client used for the download and upload.
        image_id: Primary key of the row in ``images`` to flag.
        s3_key: Key of the image object inside the bucket.
    """
    IMAGES_BUCKET = "images"

    print(f"Processing image {image_id} ({s3_key})...")

    # Retrieve image from S3
    print(">> Retrieving image")
    original_image_bytes = io.BytesIO()
    s3_client.download_fileobj(IMAGES_BUCKET, s3_key, original_image_bytes)
    # Writing the download advanced the buffer position; rewind explicitly
    # instead of relying on the decoder to do it.
    original_image_bytes.seek(0)

    # Process image
    print(">> Watermarking image")
    original_image = PIL.Image.open(original_image_bytes)
    watermarked_image = watermark(original_image)
    watermarked_image_bytes = io.BytesIO()
    watermarked_image.save(watermarked_image_bytes, format="png")
    # Rewind so the upload reads the encoded PNG from its first byte.
    watermarked_image_bytes.seek(0)

    # Update S3 object.
    # upload_fileobj streams straight from the in-memory buffer, so no
    # temporary file has to be written to disk.
    print(">> Uploading watermarked image to S3")
    s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key)

    # Mark the image as processed in the Database
    # FIXME: If this step fails, we might have duplicate processing. If this
    #        data was stored as custom metadata in S3, the upload operation would
    #        atomically set the flag. OTOH the frontend would have a harder time getting to it.
    print(">> Updating image in DB")
    db_conn.execute("""
        UPDATE
            images
        SET
            is_watermarked = TRUE
        WHERE
            id = %s
    """, [image_id])
    db_conn.commit()

def notify_owner(beanstakd_conn: beanstalkc.Connection, db_conn: psycopg.Connection, assignment_id: int):
    """Queue a notification for the owner of the plot belonging to an assignment.

    Looks up the plot owner for *assignment_id* and puts a JSON job with the
    keys ``userId``, ``message`` and ``url`` on the "notification" tube.

    Args:
        beanstakd_conn: beanstalkd connection; its "use" tube is switched to
            "notification" as a side effect.
        db_conn: Open database connection used for the lookup.
        assignment_id: Primary key of the assignment that was processed.

    Raises:
        LookupError: If the query returns no row, i.e. the assignment does not
            exist or has no matching cemetary plot.
    """
    result = db_conn.execute("""
        SELECT
            c.owner_id,
            a.cemetary_plot_id
        FROM assignments AS a
        INNER JOIN cemetary_plots AS c ON c.id = a.cemetary_plot_id
        WHERE a.id = %s;
    """, [assignment_id]).fetchone()
    # Raise explicitly instead of asserting: asserts are stripped under -O,
    # which would turn a missing row into an opaque unpacking TypeError.
    if result is None:
        raise LookupError(
            f"No assignment with id {assignment_id} and a matching cemetary plot was found"
        )
    owner_id, cemetary_plot_id = result

    beanstakd_conn.use("notification")
    body = json.dumps({
        "userId": owner_id,
        "message": "Vi har besøgt dit gravsted",
        "url": f"http://localhost:8000/cemetary_plots/{cemetary_plot_id}",
    })
    beanstakd_conn.put(body)

def process_assignment(beanstakd_conn: beanstalkc.Connection, assignment_id: int):
    """Watermark every pending image of an assignment, then advance and announce it.

    Steps, in order:
      1. Select the assignment's images that are not yet watermarked.
      2. Watermark each one via ``process_image``.
      3. Set the assignment's state to ``AWAITING_DONE`` and commit.
      4. Queue a notification for the plot owner via ``notify_owner``.

    Any exception aborts the remaining steps and propagates to the caller.

    Args:
        beanstakd_conn: beanstalkd connection used to queue the notification.
        assignment_id: Primary key of the assignment to process.
    """
    # Scope the connection to this job: the context manager closes it on exit
    # (committing on success, rolling back on error), so connections no longer
    # pile up, one per processed job.
    with connect_to_db() as db_conn:
        # Retrieve all UNPROCESSED images from DB.
        # Some images may have already been processed by an earlier, partial job.
        images = db_conn.execute("""
            SELECT
                id,
                s3_path
            FROM
                images
            WHERE
                assignment_id = %s AND is_watermarked = false
        """, [assignment_id]).fetchall()

        # Connect to S3. We will use this client to download and upload images.
        # Just like app/src/lib/server/s3.ts, this would read from .env in prod.
        s3_client = connect_to_s3()
        for image_id, s3_key in images:
            process_image(db_conn, s3_client, image_id, s3_key)

        # Assuming all of the above worked, we have finished this stage of processing assignments.
        db_conn.execute("""
            UPDATE
                assignments
            SET
                state = 'AWAITING_DONE' :: assignment_state
            WHERE
                id = %s
        """, [assignment_id])
        db_conn.commit()

        notify_owner(beanstakd_conn, db_conn, assignment_id)

# Worker entry point: consume jobs from the "watermarking" tube forever.
beanstalk = connect_to_beanstalkd()
beanstalk.watch("watermarking")
# watch() only ADDS a tube to the watch list; a fresh connection also watches
# "default". Drop it so this worker never reserves unrelated jobs.
beanstalk.ignore("default")

while True:
    job = beanstalk.reserve()
    # Guard against an empty result from reserve() and simply poll again.
    if not job:
        continue
    print(f"!! Got job: {job.body}")

    # Job bodies are JSON objects carrying the assignment to process.
    job_body = json.loads(job.body)
    process_assignment(beanstalk, job_body["assignmentId"])

    # Delete only after processing finished; if an exception propagates above,
    # this line is never reached and the job is not deleted.
    job.delete()