"""Watermarking worker.

Consumes jobs from the ``watermarking`` beanstalkd tube. For each job it
downloads the assignment's unprocessed images from S3, stamps a "Memmora"
watermark on them, re-uploads them, records progress in PostgreSQL, and
finally queues a notification to the plot owner.
"""

import io
import json

import beanstalkc
import boto3
import PIL.Image
import PIL.ImageDraw
import psycopg

# FIXME: These connection functions should obviously load the correct
# endpoints/credentials (e.g. from the environment) in a full system.


def connect_to_db() -> psycopg.Connection:
    """Open a connection to the local development database."""
    return psycopg.connect("dbname=testdb user=postgres")


def connect_to_s3():
    """Create an S3 client pointed at a local MinIO instance.

    NOTE: SSL verification is disabled — acceptable for local dev only.
    """
    return boto3.client(
        "s3",
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
        verify=False,  # disable SSL (local MinIO only)
    )


def connect_to_beanstalkd() -> beanstalkc.Connection:
    """Connect to the local beanstalkd job queue."""
    return beanstalkc.Connection("localhost", 11300, encoding="utf-8")


def watermark(image: PIL.Image.Image) -> PIL.Image.Image:
    """Return a copy of *image* with a "Memmora" watermark.

    The text is placed near the bottom-left corner; padding and font size
    scale with the image's smaller dimension so the mark looks consistent
    across image sizes. The input image is not modified.
    """
    w, h = image.size
    padding = 0.05 * min(w, h)
    x = padding
    y = h - padding
    font_size = 0.1 * min(w, h)

    watermark_image = image.copy()
    draw = PIL.ImageDraw.Draw(watermark_image)
    # anchor="ls" puts (x, y) at the left end of the text baseline.
    draw.text((x, y), "Memmora", font_size=font_size, anchor="ls")
    return watermark_image


def process_image(db_conn: psycopg.Connection, s3_client, image_id: int, s3_key: str) -> None:
    """Watermark a single image stored in S3 and mark it processed.

    Downloads the object at *s3_key*, watermarks it, overwrites the S3
    object with the watermarked PNG, then sets ``is_watermarked`` for the
    row *image_id*.

    FIXME: If the final DB update fails, we might have duplicate
    processing. If this data was stored as custom metadata in S3, the
    upload operation would atomically set the flag. OTOH the frontend
    would have a harder time getting to it.
    """
    IMAGES_BUCKET = "images"

    print(f"Processing image {image_id} ({s3_key})...")

    # Retrieve image from S3.
    print(">> Retrieving image")
    original_image_bytes = io.BytesIO()
    s3_client.download_fileobj(IMAGES_BUCKET, s3_key, original_image_bytes)

    # Process image.
    print(">> Watermarking image")
    original_image = PIL.Image.open(original_image_bytes)
    watermarked_image = watermark(original_image)
    watermarked_image_bytes = io.BytesIO()
    watermarked_image.save(watermarked_image_bytes, format="png")

    # Update the S3 object. upload_fileobj streams straight from the
    # in-memory buffer, so no temporary file is needed; rewind first since
    # save() left the cursor at the end.
    print(">> Uploading watermarked image to S3")
    watermarked_image_bytes.seek(0)
    s3_client.upload_fileobj(watermarked_image_bytes, IMAGES_BUCKET, s3_key)

    # Mark the image as processed in the database.
    print(">> Updating image in DB")
    db_conn.execute(""" UPDATE images SET is_watermarked = TRUE WHERE id = %s """, [image_id])
    db_conn.commit()


def notify_owner(beanstalkd_conn: beanstalkc.Connection, db_conn: psycopg.Connection, assignment_id: int) -> None:
    """Queue a notification to the owner of the plot for *assignment_id*.

    Raises:
        ValueError: if the assignment does not exist.
    """
    result = db_conn.execute(""" SELECT c.owner_id, a.cemetary_plot_id FROM assignments AS a INNER JOIN cemetary_plots AS c ON c.id = a.cemetary_plot_id WHERE a.id = %s; """, [assignment_id]).fetchone()
    # `assert` would be stripped under `python -O`; fail loudly instead.
    if result is None:
        raise ValueError(f"assignment {assignment_id} not found")
    owner_id, cemetary_plot_id = result

    beanstalkd_conn.use("notification")
    body = json.dumps({
        "userId": owner_id,
        "message": "Vi har besøgt dit gravsted",
        "url": f"http://localhost:8000/cemetary_plots/{cemetary_plot_id}",
    })
    beanstalkd_conn.put(body)


def process_assignment(beanstalkd_conn: beanstalkc.Connection, assignment_id: int) -> None:
    """Watermark every unprocessed image of an assignment, then notify the owner.

    Retrieves only UNPROCESSED images from the DB — some images may have
    already been handled by an earlier, partial job, making re-runs safe.
    """
    db_conn = connect_to_db()
    images = db_conn.execute(""" SELECT id, s3_path FROM images WHERE assignment_id = %s AND is_watermarked = false """, [assignment_id]).fetchall()

    # Connect to S3. We will use this client to download images.
    # Just like app/src/lib/server/s3.ts, this would read from .env in prod.
    s3_client = connect_to_s3()

    for image_id, s3_key in images:
        process_image(db_conn, s3_client, image_id, s3_key)

    # Assuming all of the above worked, we have finished this stage of
    # processing assignments.
    db_conn.execute(""" UPDATE assignments SET state = 'AWAITING_DONE' :: assignment_state WHERE id = %s """, [assignment_id])
    db_conn.commit()

    notify_owner(beanstalkd_conn, db_conn, assignment_id)


def main() -> None:
    """Block on the ``watermarking`` tube forever, processing jobs as they arrive."""
    beanstalk = connect_to_beanstalkd()
    beanstalk.watch("watermarking")
    while True:
        job = beanstalk.reserve()
        if not job:
            continue
        print(f"!! Got job: {job.body}")
        job_body = json.loads(job.body)
        process_assignment(beanstalk, job_body["assignmentId"])
        # Delete only after successful processing so a crash leaves the job
        # to be re-reserved.
        job.delete()


if __name__ == "__main__":
    main()