author | Linnnus <[email protected]> | 2025-02-22 06:50:02 +0100
---|---|---
committer | Linnnus <[email protected]> | 2025-02-22 06:50:02 +0100
commit | 6412f46a45d3b66c85c0cc3952206ad9cca0a110 (patch) |
tree | 903016674595a980e2f443aec075d9c92a36c205 /watermark_worker |
parent | b42bfa3abcd29cb977fbdc41a02d9f7f1ffeb1a2 (diff) |
Add watermarking service, fix everything
Diffstat (limited to 'watermark_worker')
-rw-r--r-- | watermark_worker/watermark.py | 122 |
1 file changed, 122 insertions, 0 deletions
diff --git a/watermark_worker/watermark.py b/watermark_worker/watermark.py
new file mode 100644
index 0000000..f2b02f0
--- /dev/null
+++ b/watermark_worker/watermark.py
@@ -0,0 +1,122 @@
+import PIL.Image
+import PIL.ImageDraw
+import PIL.ImageFont
+import boto3
+import json
+import psycopg
+import io
+import tempfile
+import beanstalkc
+
+# FIXME: These connection functions should load real configuration in a full system.
+
+def connect_to_db():
+    return psycopg.connect("dbname=testdb user=postgres")
+
+def connect_to_s3():
+    return boto3.client("s3",
+                        endpoint_url="http://localhost:9000",
+                        aws_access_key_id="minioadmin",
+                        aws_secret_access_key="minioadmin",
+                        verify=False)  # disable SSL verification for the local dev endpoint
+
+def connect_to_beanstalkd():
+    return beanstalkc.Connection("localhost", 11300)
+
+def watermark(image: PIL.Image.Image) -> PIL.Image.Image:
+    w, h = image.size
+    padding = 0.05 * min(w, h)  # inset by 5% of the short side
+    x = padding
+    y = h - padding  # place the text in the bottom-left corner
+    font_size = 0.1 * min(w, h)  # scale the text to 10% of the short side
+
+    watermark_image = image.copy()
+    draw = PIL.ImageDraw.Draw(watermark_image)
+    draw.text((x, y), "Memmora", font_size=font_size, anchor="ls")
+
+    return watermark_image
+
+def process_image(db_conn: psycopg.Connection, s3_client, image_id: int, s3_key: str):
+    IMAGES_BUCKET = "images"
+
+    print(f"Processing image {image_id} ({s3_key})...")
+
+    # Retrieve image from S3
+    print(">> Retrieving image")
+    original_image_bytes = io.BytesIO()
+    s3_client.download_fileobj(IMAGES_BUCKET, s3_key, original_image_bytes)
+
+    # Process image
+    print(">> Watermarking image")
+    original_image = PIL.Image.open(original_image_bytes)
+    watermarked_image = watermark(original_image)
+    watermarked_image_bytes = io.BytesIO()
+    watermarked_image.save(watermarked_image_bytes, format="png")
+
+    # Update S3 object
+    # FIXME: Clunky API; avoid writing a temp file. At least /tmp should be ramfs.
+    print(">> Uploading watermarked image to S3")
+    with tempfile.NamedTemporaryFile("wb", delete_on_close=False) as fp:
+        fp.write(watermarked_image_bytes.getbuffer())
+        fp.close()
+        s3_client.upload_file(fp.name, IMAGES_BUCKET, s3_key)
+
+    # Mark the image as processed in the database
+    # FIXME: If this step fails, we might have duplicate processing. If this
+    # data were stored as custom metadata in S3, the upload operation would
+    # atomically set the flag. OTOH the frontend would have a harder time getting to it.
+    print(">> Updating image in DB")
+    db_conn.execute("""
+        UPDATE
+            images
+        SET
+            is_watermarked = TRUE
+        WHERE
+            id = %s
+    """, [image_id])
+    db_conn.commit()
+
+def process_assignment(assignment_id: int):
+    # Retrieve all UNPROCESSED images from the DB.
+    # Some images may have already been processed by an earlier, partial job.
+    db_conn = connect_to_db()
+    images = db_conn.execute("""
+        SELECT
+            id,
+            s3_path
+        FROM
+            images
+        WHERE
+            assignment_id = %s AND is_watermarked = false
+    """, [assignment_id]).fetchall()
+
+    # Connect to S3. We will use this client to download images.
+    # Just like app/src/lib/server/s3.ts, this would read from .env in prod.
+    s3_client = connect_to_s3()
+    for image_id, s3_key in images:
+        process_image(db_conn, s3_client, image_id, s3_key)
+
+    # Assuming all of the above worked, we have finished this stage of processing the assignment.
+    db_conn.execute("""
+        UPDATE
+            assignments
+        SET
+            state = 'AWAITING_OWNER_NOTIFICATION'::assignment_state
+        WHERE
+            id = %s
+    """, [assignment_id])
+    db_conn.commit()
+
+beanstalk = connect_to_beanstalkd()
+beanstalk.watch("watermarking")
+
+while True:
+    job = beanstalk.reserve()
+    if not job:
+        continue
+    print(f"!! Got job: {job.body}")
+
+    job_body = json.loads(job.body)
+    process_assignment(job_body["assignmentId"])
+
+    job.delete()
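The FIXME about the temporary file could plausibly be addressed with boto3's `upload_fileobj`, which streams from any file-like object. A minimal sketch of that one step, assuming the same `watermarked_image_bytes` buffer, `s3_client`, `IMAGES_BUCKET`, and `s3_key` as above (not part of this commit):

```python
# Hypothetical replacement for the temp-file upload in process_image (sketch only).
watermarked_image_bytes.seek(0)  # rewind: save() left the position at the end of the buffer
s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key)
```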
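For context, the worker consumes jobs from the `watermarking` tube whose body is a JSON object with an `assignmentId` field. A minimal producer sketch, assuming the same beanstalkc client and a local beanstalkd; the real producer presumably lives in the app and is not part of this diff:

```python
import json
import beanstalkc

# Hypothetical producer: tube name and body format match what the worker above consumes.
conn = beanstalkc.Connection("localhost", 11300)
conn.use("watermarking")                    # the worker calls watch("watermarking")
conn.put(json.dumps({"assignmentId": 42}))  # process_assignment reads "assignmentId"
conn.close()
```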