diff --git a/job_thread/main.py b/job_thread/main.py index e5bc6ee..2e3ccf4 100644 --- a/job_thread/main.py +++ b/job_thread/main.py @@ -1,7 +1,8 @@ +import json import logging -import sqlite3 +from copy import deepcopy from dataclasses import dataclass -from os import environ +from os import environ, fsync from time import time import feedparser @@ -11,7 +12,7 @@ SECONDS_IN_WEEK = 60 * 60 * 24 * 7 class Config: - DB_PATH = "posts.db" + DB_PATH = "db.json" SUBREDDIT = "developersindia" POST_FLAIR = "Hiring" @@ -34,34 +35,79 @@ Stay tuned for updates on the latest job openings, and apply for the ones that i FEED_URL = "https://developersindia.in/?feed=job_feed" -class Database: - def __init__(self, db_path): - self.conn = sqlite3.connect(db_path) - self.conn.row_factory = sqlite3.Row +@dataclass +class Post: + post_id: str + epoch: int - self.cur = self.conn.cursor() + +def dict_raise_or_set(d, key, value): + if d.get(key) is not None: + raise ValueError(f"Key {key} already present in dictionary") + + d[key] = value + + +class Database: + POSTS = "postid_epoch" + COMMENTS = "jobid_commentid" + + def __init__(self, db_path): + try: + self._fp = open(db_path, "r+") + self._db = json.loads(self._fp.read() or "{}") + except FileNotFoundError: + self._fp = open(db_path, "w") + self._db = {} + + self._copy = None self._create() + def __enter__(self): + self._copy = deepcopy(self._db) + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # We re-write the DB on each transaction as we cannot guarantee "atomicity" + # when dealing with external APIs. Eg, we can locally roll back a transaction + # inserting multiple posts into the DB in case of an exception, but the + # posts obviously won't be deleted on the reddit side. So we write + # as much as possible incrementally to prevent losing track of already created + # posts, preventing their re-creation server side in case of a crash. + if exc_type: + self._db = self._copy + # If a change was actually made + elif self._db != self._copy: + self._fp.seek(0) + self._fp.write(json.dumps(self._db, indent=4)) + self._fp.truncate() # Trim the file to the desired size + fsync(self._fp) + + self._copy = None + def _create(self): - with self.conn: - self.cur.execute( - "CREATE TABLE IF NOT EXISTS Posts" - "(post_id TEXT PRIMARY KEY, time INTEGER NOT NULL DEFAULT(UNIXEPOCH()))" - ) + with self: + self._db.setdefault(self.POSTS, {}) + self._db.setdefault(self.COMMENTS, {}) - def get_latest_post(self): - self.cur.execute("SELECT post_id, time from Posts ORDER BY time DESC") + def get_latest_post(self) -> Post | None: + # {"id": 1234, "id2": "5678"} -> ("id2", "5678") (Descending) + try: + result = sorted( + self._db[self.POSTS].items(), + key=lambda item: item[1], + reverse=True, + )[0] + except IndexError: + return None - if (result := self.cur.fetchone()) is not None: - return dict(result) + return Post(post_id=result[0], epoch=result[1]) - def insert_post(self, post_id: str, timestamp: int): - with self.conn: - self.cur.execute( - "INSERT INTO Posts (post_id, time) VALUES ((?), (?))", - (post_id, timestamp), - ) + def insert_post(self, post: Post): + with self: + dict_raise_or_set(self._db[self.POSTS], post.post_id, post.epoch) @dataclass @@ -94,14 +140,14 @@ def get_job_entries(feed_url): ] -def should_create_new_post(latest_post): +def should_create_new_post(latest_post: Post) -> bool: if latest_post is not None: - return (time() - latest_post["time"]) >= SECONDS_IN_WEEK + return (time() - latest_post.epoch) >= SECONDS_IN_WEEK return True -def create_job_post(subreddit): +def create_job_post(subreddit) -> Post: # https://old.reddit.com/r/redditdev/comments/ovte4q/praw_flair_a_post/h7doqmd/?context=3 flair = next( filter( @@ -117,7 +163,7 @@ def create_job_post(subreddit): ) submission.mod.sticky() - return submission + return Post(post_id=submission.id, epoch=submission.created_utc) def main(): @@ -141,24 +187,22 @@ def main(): if should_create_new_post(maybe_old_post): # Un-stick/pin the old post if maybe_old_post is not None: - logging.info(f"Un-pinning old post") + logging.info(f"Un-pinning old post {maybe_old_post}") try: - reddit.submission(maybe_old_post["post_id"]).mod.sticky( + reddit.submission(maybe_old_post.post_id).mod.sticky( state=False ) except Exception: logging.warning(f"Failed to un-pin post!", exc_info=True) - new_submission = create_job_post(subreddit) + new_post = create_job_post(subreddit) - logging.info( - f"Created new post {new_submission.id} at {new_submission.created_utc}" - ) + logging.info(f"Created new post {new_post}") - db.insert_post(new_submission.id, new_submission.created_utc) + db.insert_post(new_post) - submission = reddit.submission(db.get_latest_post()["post_id"]) + submission = reddit.submission(db.get_latest_post().post_id) logging.info(f"Fetched latest submission {submission.id}")