diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr index b70e9ef44..a134cbec1 100644 --- a/src/invidious/jobs/notification_job.cr +++ b/src/invidious/jobs/notification_job.cr @@ -35,14 +35,21 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) } # hash of channels to their videos (id+published) that need notifying - to_notify = Hash(String, Set(VideoNotification)).new(->(hash : Hash(String, Set(VideoNotification)), key : String) { hash[key] = Set(VideoNotification).new }) + to_notify = Hash(String, Set(VideoNotification)).new( + ->(hash : Hash(String, Set(VideoNotification)), key : String) { + hash[key] = Set(VideoNotification).new + } + ) + notify_mutex = Mutex.new() # fiber to locally cache all incoming notifications (from pubsub webhooks and refresh channels job) spawn do begin loop do notification = notification_channel.receive + notify_mutex.lock to_notify[notification.channel_id] << notification + notify_mutex.unlock end end end @@ -51,8 +58,10 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob loop do begin LOGGER.debug("NotificationJob: waking up") + notify_mutex.lock cloned = to_notify.clone to_notify.clear + notify_mutex.unlock cloned.each do |channel_id, notifications| if notifications.empty? @@ -63,14 +72,16 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob if CONFIG.enable_user_notifications video_ids = notifications.map { |n| n.video_id } Invidious::Database::Users.add_multiple_notifications(channel_id, video_ids) - notifications.each do |n| - # Deliver notifications to `/api/v1/auth/notifications` - payload = { - "topic" => n.channel_id, - "videoId" => n.video_id, - "published" => n.published.to_unix, - }.to_json - PG_DB.exec("NOTIFY notifications, E'#{payload}'") + PG_DB.using_connection do |conn| + notifications.each do |n| + # Deliver notifications to `/api/v1/auth/notifications` + payload = { + "topic" => n.channel_id, + "videoId" => n.video_id, + "published" => n.published.to_unix, + }.to_json + conn.exec("NOTIFY notifications, E'#{payload}'") + end end else Invidious::Database::Users.feed_needs_update(channel_id)