Diffstat (limited to 'watermark_worker/watermark.py')
-rw-r--r--  watermark_worker/watermark.py  143
1 file changed, 143 insertions, 0 deletions
diff --git a/watermark_worker/watermark.py b/watermark_worker/watermark.py
new file mode 100644
index 0000000..f2b02f0
--- /dev/null
+++ b/watermark_worker/watermark.py
@@ -0,0 +1,143 @@
+import io
+import json
+
+import beanstalkc
+import boto3
+import PIL.Image
+import PIL.ImageDraw
+import PIL.ImageFont
+import psycopg
+
+# FIXME: These connection functions should obviously load real connection settings in a full system.
+
+def connect_to_db():
+ return psycopg.connect("dbname=testdb user=postgres")
+
+def connect_to_s3():
+ return boto3.client("s3",
+ endpoint_url="http://localhost:9000",
+ aws_access_key_id="minioadmin",
+ aws_secret_access_key="minioadmin",
+                        verify=False)  # skip TLS verification; fine for local MinIO, never in prod
+
+def connect_to_beanstalkd():
+ return beanstalkc.Connection("localhost", 11300)
+
+def watermark(image: PIL.Image.Image) -> PIL.Image.Image:
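+    """Stamp the "Memmora" wordmark near the bottom-left corner.
+
+    Padding and font size scale with the shorter image edge, so the mark
+    stays proportional for portrait and landscape images alike.
+    """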
+ w, h = image.size
+ padding = 0.05 * min(w, h)
+ x = padding
+ y = h - padding
+ font_size = 0.1 * min(w, h)
+
+    watermark_image = image.copy()
+    draw = PIL.ImageDraw.Draw(watermark_image)
+    # load_default(size) needs Pillow >= 10.1 and returns a scalable font,
+    # which supports the anchor argument below.
+    font = PIL.ImageFont.load_default(font_size)
+    # anchor="ls": (x, y) marks the left end of the text baseline.
+    draw.text((x, y), "Memmora", font=font, anchor="ls")
+
+ return watermark_image
+
+def process_image(db_conn: psycopg.Connection, s3_client, image_id: int, s3_key: str):
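+    """Watermark one image in place: download it from S3, stamp it, upload
+    it back under the same key, then flag it as done in Postgres."""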
+ IMAGES_BUCKET = "images"
+
+ print(f"Processing image {image_id} ({s3_key})...")
+
+ # Retrieve image from S3
+ print(">> Retrieving image")
+ original_image_bytes = io.BytesIO()
+    s3_client.download_fileobj(IMAGES_BUCKET, s3_key, original_image_bytes)
+    # download_fileobj leaves the buffer positioned at EOF; rewind it so
+    # PIL.Image.open() can read the data back.
+    original_image_bytes.seek(0)
+
+ # Process image
+ print(">> Watermarking image")
+ original_image = PIL.Image.open(original_image_bytes)
+ watermarked_image = watermark(original_image)
+ watermarked_image_bytes = io.BytesIO()
+ watermarked_image.save(watermarked_image_bytes, format="png")
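+    # NB: this always re-encodes to PNG, even when the source was JPEG; the
+    # S3 key (and any content type derived from it) stays unchanged.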
+
+    # Update the S3 object in place. upload_fileobj() accepts any file-like
+    # object, so we can stream from the in-memory buffer; no temp file needed.
+    print(">> Uploading watermarked image to S3")
+    watermarked_image_bytes.seek(0)
+    s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key)
+
+ # Mark the image as processed in the Database
+ # FIXME: If this step fails, we might have duplicate processing. If this
+ # data was stored as custom metadata in S3, the upload operation would
+ # atomically set the flag. OTOH the frontend would have a harder time getting to it.
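+    # A sketch of that alternative, with a hypothetical "watermarked" key:
+    #   s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key,
+    #                            ExtraArgs={"Metadata": {"watermarked": "true"}})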
+ print(">> Updating image in DB")
+ db_conn.execute("""
+ UPDATE
+ images
+ SET
+ is_watermarked = TRUE
+ WHERE
+ id = %s
+ """, [image_id])
+ db_conn.commit()
+
+def process_assignment(assignment_id: int):
+ # Retrieve all UNPROCESSED images from DB.
+ # Some images may have already been processed by an earlier, partial job.
+ db_conn = connect_to_db()
+ images = db_conn.execute("""
+ SELECT
+ id,
+ s3_path
+ FROM
+ images
+ WHERE
+ assignment_id = %s AND is_watermarked = false
+ """, [assignment_id]).fetchall()
+
+    # Connect to S3. We will use this client to download and upload images.
+    # Just like app/src/lib/server/s3.ts, this would read from .env in prod.
+ s3_client = connect_to_s3()
+ for image_id, s3_key in images:
+ process_image(db_conn, s3_client, image_id, s3_key)
+
+ # Assuming all of the above worked, we have finished this stage of processing assignments.
+ db_conn.execute("""
+ UPDATE
+ assignments
+ SET
+            state = 'AWAITING_OWNER_NOTIFICATION'::assignment_state
+ WHERE
+ id = %s
+ """, [assignment_id])
+ db_conn.commit()
+
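+# Worker loop: reserve() blocks until a job arrives on the "watermarking" tube.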
+beanstalk = connect_to_beanstalkd()
+beanstalk.watch("watermarking")
+
+while True:
+ job = beanstalk.reserve()
+ if not job:
+ continue
+ print(f"!! Got job: {job.body}")
+
+ job_body = json.loads(job.body)
+ process_assignment(job_body["assignmentId"])
+
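+    # Delete only after the whole assignment succeeded; if the worker dies
+    # first, beanstalkd releases the job back to the ready queue, and the
+    # is_watermarked flag makes the retry skip already-processed images.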
+ job.delete()