2020-09-23 11:45:02 +00:00
const Denque = require ( "denque" )
const fetch = require ( "node-fetch" )
2020-09-23 12:05:02 +00:00
const constants = require ( "../utils/constants" )
const db = require ( "../utils/db" )
2020-08-31 13:22:16 +00:00
2020-09-23 11:45:02 +00:00
const prepared = {
video _insert : db . prepare (
"INSERT OR IGNORE INTO Videos"
2020-09-23 12:05:02 +00:00
+ " ( videoId, title, author, authorId, published, viewCountText, descriptionHtml)"
2020-09-23 11:45:02 +00:00
+ " VALUES"
2020-09-23 12:05:02 +00:00
+ " (@videoId, @title, @author, @authorId, @published, @viewCountText, @descriptionHtml)"
2020-09-23 12:48:32 +00:00
) ,
channel _refreshed _update : db . prepare (
"UPDATE Channels SET refreshed = ? WHERE ucid = ?"
2020-12-06 02:40:04 +00:00
) ,
unsubscribe _all _from _channel : db . prepare (
"DELETE FROM Subscriptions WHERE ucid = ?"
2020-09-23 11:45:02 +00:00
)
}
class RefreshQueue {
constructor ( ) {
this . set = new Set ( )
this . queue = new Denque ( )
this . lastLoadTime = 0
}
isEmpty ( ) {
return this . queue . isEmpty ( )
}
load ( ) {
// get the next set of scheduled channels to refresh
const afterTime = Date . now ( ) - constants . caching . seen _token _subscriptions _eligible
const channels = db . prepare (
"SELECT DISTINCT Subscriptions.ucid FROM SeenTokens INNER JOIN Subscriptions ON SeenTokens.token = Subscriptions.token AND SeenTokens.seen > ? ORDER BY SeenTokens.seen DESC"
) . pluck ( ) . all ( afterTime )
this . addLast ( channels )
this . lastLoadTime = Date . now ( )
}
addNext ( items ) {
for ( const i of items ) {
this . queue . unshift ( i )
this . set . add ( i )
}
}
addLast ( items ) {
for ( const i of items ) {
this . queue . push ( i )
this . set . add ( i )
}
}
next ( ) {
2020-09-23 12:05:02 +00:00
if ( this . isEmpty ( ) ) {
throw new Error ( "Cannot get next of empty refresh queue" )
}
2020-09-23 11:45:02 +00:00
const item = this . queue . shift ( )
this . set . delete ( item )
return item
}
}
2020-10-02 12:32:22 +00:00
class Refresher {
constructor ( ) {
this . sym = constants . symbols . refresher
this . refreshQueue = new RefreshQueue ( )
this . state = this . sym . ACTIVE
this . waitingTimeout = null
this . next ( )
}
2020-09-23 11:45:02 +00:00
2020-10-02 12:32:22 +00:00
refreshChannel ( ucid ) {
return fetch ( ` http://localhost:3000/api/v1/channels/ ${ ucid } /latest ` ) . then ( res => res . json ( ) ) . then ( root => {
if ( Array . isArray ( root ) ) {
root . forEach ( video => {
// organise
video . descriptionHtml = video . descriptionHtml . replace ( /<a /g , '<a tabindex="-1" ' ) // should be safe
video . viewCountText = null //TODO?
// store
prepared . video _insert . run ( video )
} )
// update channel refreshed
prepared . channel _refreshed _update . run ( Date . now ( ) , ucid )
console . log ( ` updated ${ root . length } videos for channel ${ ucid } ` )
} else if ( root . identifier === "PUBLISHED_DATES_NOT_PROVIDED" ) {
return [ ] // nothing we can do. skip this iteration.
2020-12-06 02:40:04 +00:00
} else if ( root . identifier === "NOT_FOUND" ) {
// the channel does not exist. we should unsubscribe all users so we don't try again.
console . log ( ` channel ${ ucid } does not exist, unsubscribing all users ` )
prepared . unsubscribe _all _from _channel . run ( ucid )
2020-10-02 12:32:22 +00:00
} else {
throw new Error ( root . error )
}
} )
}
next ( ) {
if ( this . refreshQueue . isEmpty ( ) ) {
const timeSinceLastLoop = Date . now ( ) - this . refreshQueue . lastLoadTime
if ( timeSinceLastLoop < constants . caching . subscriptions _refresh _loop _min ) {
const timeToWait = constants . caching . subscriptions _refresh _loop _min - timeSinceLastLoop
console . log ( ` waiting ${ timeToWait } before next loop ` )
this . state = this . sym . WAITING
this . waitingTimeout = setTimeout ( ( ) => this . next ( ) , timeToWait )
return
} else {
this . refreshQueue . load ( )
}
2020-09-23 11:45:02 +00:00
}
2020-10-02 12:32:22 +00:00
if ( ! this . refreshQueue . isEmpty ( ) ) {
this . state = this . sym . ACTIVE
const ucid = this . refreshQueue . next ( )
this . refreshChannel ( ucid ) . then ( ( ) => this . next ( ) )
2020-09-23 11:45:02 +00:00
} else {
2020-10-02 12:32:22 +00:00
this . state = this . sym . EMPTY
2020-09-23 11:45:02 +00:00
}
}
2020-10-02 12:32:22 +00:00
skipWaiting ( ) {
if ( this . state !== this . sym . ACTIVE ) {
clearTimeout ( this . waitingTimeout )
this . refreshQueue . lastLoadTime = 0
this . next ( )
}
}
2020-09-23 11:45:02 +00:00
}
2020-10-02 12:32:22 +00:00
const refresher = new Refresher ( )
module . exports . refresher = refresher