"""Worker that reserves notification jobs from beanstalkd and sends them."""

import contextlib
import json
import subprocess

import beanstalkc
import psycopg

# Demo endpoint the notification is POSTed to until real push delivery
# (see the TODOs in handle_job) is implemented.
NOTIFICATION_ENDPOINT = (
    "https://notifications.linus.onl/api/send-notification/40a466025b3aac7"
)


def connect_to_beanstalkd():
    """Return a beanstalkd connection to localhost:11300 with YAML parsing off."""
    return beanstalkc.Connection(
        "localhost", 11300, parse_yaml=False, encoding="utf-8"
    )


def connect_to_db():
    """Return a psycopg connection to the ``testdb`` database as ``postgres``."""
    return psycopg.connect("dbname=testdb user=postgres")


def handle_job(db_conn: psycopg.Connection, job: beanstalkc.Job):
    """Send the notification described by a reserved job.

    The job body must be a JSON object with the keys ``userId``, ``message``
    and ``url``.

    Args:
        db_conn: Database connection. Currently unused; kept for the pending
            push-credential lookup (see the TODOs below).
        job: Reserved beanstalkd job whose ``body`` holds the JSON payload.

    Raises:
        json.JSONDecodeError: If the job body is not valid JSON.
        KeyError: If a required key is missing from the body.
        OSError: If the ``curl`` executable cannot be started.
    """
    body = json.loads(job.body)
    user_id = body["userId"]
    message = body["message"]
    url = body["url"]

    # TODO: Fetch push creds from DB by userId
    # TODO: Send event to push server

    # Super ugly hack for demo
    # The payload is serialized with json.dumps and handed to curl as a single
    # argv entry (no shell), so quotes or shell metacharacters in the queued
    # message/url can neither break the JSON nor inject commands. The payload
    # always starts with "{", so curl never treats it as an "@file" reference.
    payload = json.dumps({"title": message, "url": url})
    result = subprocess.run(
        [
            "curl",
            NOTIFICATION_ENDPOINT,
            "--request", "POST",
            "--header", "Content-Type: application/json",
            "--data", payload,
        ],
        check=False,  # delivery stays best-effort; failures are only reported
    )
    if result.returncode != 0:
        print(f"curl exited with status {result.returncode} for {user_id=}")

    print(f"PRETEND SENDIGN MESSAGE {user_id=} {message=} {url=}")


def main():
    """Process jobs from the ``notification`` tube until interrupted.

    Successfully handled jobs are deleted. A job whose handling raises is
    buried so it is not handed out again and cannot crash the worker in a
    loop. Both connections are closed when the loop exits.
    """
    with contextlib.closing(connect_to_beanstalkd()) as beanstalkc_conn, \
            contextlib.closing(connect_to_db()) as db_conn:
        beanstalkc_conn.watch("notification")
        while True:
            job = beanstalkc_conn.reserve()
            if not job:
                continue
            try:
                handle_job(db_conn, job)
            except Exception as exc:
                # Top-level worker boundary: report the failure and park the
                # job instead of letting one bad payload stop the worker.
                print(f"Job {job.jid} failed ({exc!r}); burying it")
                job.bury()
            else:
                job.delete()


if __name__ == "__main__":
    main()