1
0
mirror of https://git.sr.ht/~cadence/cloudtube synced 2024-11-14 12:27:28 +00:00
cloudtube/background/feed-update.js
2021-08-23 23:43:23 +12:00

146 lines
4.1 KiB
JavaScript

const Denque = require("denque")
/** @type {import("node-fetch").default} */
// @ts-ignore
const fetch = require("node-fetch")
const constants = require("../utils/constants")
const db = require("../utils/db")
/**
 * Statements used by the background refresher, prepared once when this module loads.
 * - video_insert: inserts a video row from an object with named fields; rows that
 *   violate a constraint (e.g. an already-stored video) are silently ignored.
 * - channel_refreshed_update: sets a channel's `refreshed` time (args: time, ucid).
 * - unsubscribe_all_from_channel: flags every subscription to a ucid with channel_missing = 1.
 */
const prepared = {
	video_insert: db.prepare([
		"INSERT OR IGNORE INTO Videos",
		" ( videoId, title, author, authorId, published, viewCountText, descriptionHtml)",
		" VALUES",
		" (@videoId, @title, @author, @authorId, @published, @viewCountText, @descriptionHtml)"
	].join("")),
	channel_refreshed_update: db.prepare("UPDATE Channels SET refreshed = ? WHERE ucid = ?"),
	unsubscribe_all_from_channel: db.prepare("UPDATE Subscriptions SET channel_missing = 1 WHERE ucid = ?")
}
/**
 * Ordered queue of channel IDs (ucids) waiting to be refreshed.
 *
 * `queue` holds the refresh order and `set` mirrors its contents, so each ucid is
 * queued at most once and membership can be checked without scanning the queue.
 */
class RefreshQueue {
	constructor() {
		this.set = new Set()
		this.queue = new Denque()
		// Date.now() of the most recent load(); 0 means "never loaded".
		this.lastLoadTime = 0
	}

	/** @returns {boolean} true when no channels are queued */
	isEmpty() {
		return this.queue.isEmpty()
	}

	/**
	 * Queue (at the back) every non-missing channel subscribed to by a token seen within
	 * the last `constants.caching.seen_token_subscriptions_eligible` ms, then record the load time.
	 */
	load() {
		const afterTime = Date.now() - constants.caching.seen_token_subscriptions_eligible
		const channels = db.prepare(
			"SELECT DISTINCT Subscriptions.ucid FROM SeenTokens INNER JOIN Subscriptions ON SeenTokens.token = Subscriptions.token AND SeenTokens.seen > ? WHERE Subscriptions.channel_missing = 0 ORDER BY SeenTokens.seen DESC"
		).pluck().all(afterTime)
		this.addLast(channels)
		this.lastLoadTime = Date.now()
	}

	/**
	 * Put items at the front of the queue so they are refreshed before anything already queued.
	 * Items are unshifted one at a time, so the last item of `items` ends up first.
	 * A ucid that is already queued is moved to the front instead of being queued twice.
	 * @param {Iterable<string>} items ucids
	 */
	addNext(items) {
		for (const i of items) {
			if (this.set.has(i)) {
				// Drop the existing copy so the channel is only refreshed once.
				const index = this.queue.toArray().indexOf(i)
				if (index !== -1) this.queue.remove(index, 1)
			}
			this.queue.unshift(i)
			this.set.add(i)
		}
	}

	/**
	 * Append items to the back of the queue. A ucid that is already queued keeps its
	 * current position rather than being queued a second time.
	 * @param {Iterable<string>} items ucids
	 */
	addLast(items) {
		for (const i of items) {
			if (this.set.has(i)) continue
			this.queue.push(i)
			this.set.add(i)
		}
	}

	/**
	 * Remove and return the ucid at the front of the queue.
	 * @returns {string}
	 * @throws {Error} when the queue is empty
	 */
	next() {
		if (this.isEmpty()) {
			throw new Error("Cannot get next of empty refresh queue")
		}
		const item = this.queue.shift()
		this.set.delete(item)
		return item
	}
}
/**
 * Background loop that stores the latest videos of subscribed channels.
 *
 * It takes ucids one at a time from a RefreshQueue, fetches each channel's latest videos
 * from the local instance and saves them. When the queue is empty it reloads it, but no
 * sooner than `constants.caching.subscriptions_refresh_loop_min` ms after the last load.
 *
 * `state` is one of `constants.symbols.refresher`:
 * - ACTIVE: a refresh (or the 10 s retry delay after a failed one) is in progress
 * - WAITING: the queue is empty and a timer will call next() once the interval elapses
 * - EMPTY: a reload found nothing to refresh; nothing in this class restarts the loop
 *   except skipWaiting()
 */
class Refresher {
	constructor() {
		this.sym = constants.symbols.refresher
		this.refreshQueue = new RefreshQueue()
		this.state = this.sym.ACTIVE
		this.waitingTimeout = null
		// start the loop immediately
		this.next()
	}

	/**
	 * Fetch the latest videos of one channel from the local instance and store them.
	 *
	 * - list of videos: each is inserted and the channel's refreshed time is updated
	 * - identifier PUBLISHED_DATES_NOT_PROVIDED: nothing is stored (resolves to [])
	 * - identifier NOT_FOUND: every subscription to the channel is flagged as missing
	 * - anything else: rejects with the response's `error`, or a description of the response
	 *
	 * @param {string} ucid channel ID
	 * @returns {Promise<void | []>}
	 */
	refreshChannel(ucid) {
		return fetch(`${constants.server_setup.local_instance_origin}/api/v1/channels/${ucid}/latest`).then(res => res.json()).then(root => {
			if (Array.isArray(root)) {
				root.forEach(video => {
					// take description links out of the keyboard tab order
					video.descriptionHtml = video.descriptionHtml.replace(/<a /g, '<a tabindex="-1" ') // should be safe
					video.viewCountText = null //TODO?
					prepared.video_insert.run(video)
				})
				prepared.channel_refreshed_update.run(Date.now(), ucid)
				return
			}
			// `root &&` keeps a null/primitive body from crashing with an unrelated TypeError
			const identifier = root && root.identifier
			if (identifier === "PUBLISHED_DATES_NOT_PROVIDED") {
				return [] // nothing we can do. skip this iteration.
			}
			if (identifier === "NOT_FOUND") {
				// the channel does not exist. flag its subscriptions so we don't try again.
				prepared.unsubscribe_all_from_channel.run(ucid)
				return
			}
			// Without a fallback, a response lacking `error` would produce an empty, unattributable message.
			throw new Error((root && root.error) || `Unexpected response when refreshing channel ${ucid}: ${JSON.stringify(root)}`)
		})
	}

	/**
	 * Advance the loop by one step: refresh the next queued channel, first reloading the
	 * queue if it is empty and the minimum loop interval has passed. If the interval has
	 * not passed, schedule this method to run again when it does.
	 */
	next() {
		if (this.refreshQueue.isEmpty()) {
			const timeSinceLastLoop = Date.now() - this.refreshQueue.lastLoadTime
			if (timeSinceLastLoop < constants.caching.subscriptions_refresh_loop_min) {
				const timeToWait = constants.caching.subscriptions_refresh_loop_min - timeSinceLastLoop
				this.state = this.sym.WAITING
				this.waitingTimeout = setTimeout(() => this.next(), timeToWait)
				return
			}
			this.refreshQueue.load()
		}
		if (this.refreshQueue.isEmpty()) {
			this.state = this.sym.EMPTY
			return
		}
		this.state = this.sym.ACTIVE
		const ucid = this.refreshQueue.next()
		this.refreshChannel(ucid).then(() => this.next()).catch(error => {
			// Problems related to fetching from the instance?
			// All we can do is retry later.
			// However, skip this channel this time in case the problem will occur every time.
			console.error("Error in background refresh:\n", error)
			setTimeout(() => {
				this.next()
			}, 10e3)
		})
	}

	/**
	 * Restart the loop now if it is idle (not ACTIVE): cancel any pending WAITING timer and
	 * reset the last load time so that, if the queue is empty, it is reloaded immediately.
	 * Does nothing while a refresh is ACTIVE.
	 */
	skipWaiting() {
		if (this.state === this.sym.ACTIVE) return
		clearTimeout(this.waitingTimeout)
		this.refreshQueue.lastLoadTime = 0
		this.next()
	}
}
// Single shared Refresher instance; constructing it kicks off the refresh loop.
const refresher = new Refresher()
Object.assign(module.exports, { refresher })