// fix-racy-store.mjs 4.41 KB
import { EventEmitter } from 'node:events'
import { isEqual } from 'lodash-es'

// Cache-entry modes used by the store below.
// READ: default mode of an entry; when its timer fires the entry is dropped.
const READ = Symbol('READ')
// TOUCH: when the entry's timer fires, the cached session is flushed to the backing store.
const TOUCH = Symbol('TOUCH')
// WRITE: no timer is scheduled for the entry while it is in this mode.
const WRITE = Symbol('WRITE')

export default function (session) {
  var Store = session.Store

  /**
   * Store wrapper that keeps a short-lived in-memory cache in front of
   * `options.store` and shares pending reads between concurrent callers.
   *
   * The base `Store` constructor is invoked with the same options, and the
   * prototype chain is linked to `Store.prototype` so base methods are inherited.
   *
   * @param {object} [options] - Configuration; `options.store` is the backing
   *   store every prototype method delegates to.
   */
  function FixRacyStore(options = {}) {
    Store.call(this, options)
    this.options = options
    // One event per session id; the listeners are callbacks of pending reads.
    this.readEvents = new EventEmitter()
    // Every concurrent get() for the same id adds a one-shot listener, so more
    // than ten parallel requests would trip the default leak warning even
    // though all listeners are removed as soon as the read completes.
    this.readEvents.setMaxListeners(0)
    // Not read by any code visible in this file; kept for compatibility.
    this.writeEvents = new EventEmitter()
    // sessionId -> { timerId, writes, session, mode }
    this.cache = {}
    // Not read by any code visible in this file; kept for compatibility.
    this.clearCache = {}
  }

  // Standard replacement for assigning the legacy `__proto__` accessor.
  Object.setPrototypeOf(FixRacyStore.prototype, Store.prototype)

  /**
   * Store `session` in the cache entry for `sessionId` and (re)arm its timer.
   *
   * The entry keeps its previous `timerId` and `writes`. Its mode becomes
   * `newMode` only when the previous mode was READ (the default for a missing
   * entry) or when `downgrade` is truthy; otherwise the previous mode is kept.
   *
   * A TOUCH on an entry that is already in TOUCH mode leaves the running timer
   * alone. In every other case the previous timer is cancelled and, unless the
   * resulting mode is WRITE, a new 1000 ms timer is started: when it fires, a
   * TOUCH entry is flushed and any other entry is removed from the cache.
   *
   * @param {string} sessionId - Cache key.
   * @param {*} session - Session value to cache.
   * @param {symbol} newMode - Requested mode (READ, TOUCH or WRITE).
   * @param {boolean} [downgrade] - Force the mode to `newMode`.
   * @returns {object} The new cache entry.
   */
  FixRacyStore.prototype._setCache = function(sessionId, session, newMode, downgrade) {
    const prior = this.cache[ sessionId ] === undefined ? {} : this.cache[ sessionId ]
    const priorMode = prior.mode === undefined ? READ : prior.mode

    let nextMode = priorMode
    if (priorMode === READ || downgrade) {
      nextMode = newMode
    }

    const entry = {
      timerId: prior.timerId,
      writes: prior.writes,
      session,
      mode: nextMode
    }
    this.cache[ sessionId ] = entry

    // Repeated touches must not keep postponing the pending flush.
    const repeatedTouch = newMode === TOUCH && priorMode === TOUCH
    if (repeatedTouch) {
      return entry
    }

    clearTimeout(prior.timerId)
    if (nextMode === WRITE) {
      return entry
    }

    const onExpiry = () => {
      try {
        if (this.cache[ sessionId ].mode === TOUCH) {
          this._flush(sessionId)
        } else {
          delete this.cache[ sessionId ]
        }
      } catch (e) {
        console.error(e)
      }
    }
    entry.timerId = setTimeout(onExpiry, 1000)
    return entry
  }

  /**
   * Read a session, answering from the in-memory cache when it holds one and
   * otherwise sharing a single backing-store read between all concurrent
   * callers asking for the same id.
   *
   * @param {string} sessionId - Session to read.
   * @param {function(Error|null, *=)} [callback] - Receives `(err, session)`;
   *   defaults to a no-op.
   */
  FixRacyStore.prototype.get = function(sessionId, callback = (err, session) => {}) {
    const { [ sessionId ]: { session: cachedSession } = {} } = this.cache
    if (cachedSession) {
      callback(null, cachedSession)
      return
    }
    this.readEvents.once(sessionId, callback)
    if (this.readEvents.listenerCount(sessionId) !== 1) {
      // A read for this id is already in flight; its result will be shared.
      return
    }
    this.options.store.get(sessionId, (err, session) => {
      if (this.readEvents.listenerCount(sessionId) === 0) {
        // Everyone waiting was already answered (or dropped) while the read was
        // in flight, e.g. by set() emitting newer data. The value that just
        // arrived is older than what the cache may hold, so it must not
        // overwrite it.
        return
      }
      if (!err) {
        // A failed read carries no session worth remembering.
        this._setCache(sessionId, session, READ)
      }
      this.readEvents.emit(sessionId, err, session)
    })
  }

  /**
   * Write the cached session for `sessionId` to the backing store.
   *
   * When the write completes, every callback in `callbacks` receives the
   * store's `(err, stored)` result. If further callbacks were queued on the
   * entry's `writes` list in the meantime, the (possibly newer) cached session
   * is written again on their behalf; otherwise the entry is downgraded to
   * READ mode with whatever value the store passed back.
   *
   * @param {string} sessionId - Session whose cache entry should be written.
   * @param {...function(Error|null, *=)} callbacks - Notified when this write
   *   completes.
   */
  FixRacyStore.prototype._flush = function (sessionId, ...callbacks) {
    const { session } = this.cache[ sessionId ]
    // `stored` is the store's own second callback argument; it is deliberately
    // not named `session` so it cannot be confused with the value written.
    this.options.store.set(sessionId, session, (err, stored) => {
      callbacks.forEach(callback => callback(err, stored))
      const cacheEntry = this.cache[ sessionId ]
      if (cacheEntry === undefined) {
        // The entry was evicted while the write was in flight; there is no
        // queue left to drain and nothing to downgrade.
        return
      }
      const { writes } = cacheEntry
      cacheEntry.writes = undefined
      if (writes === undefined || writes.length === 0) {
        this._setCache(sessionId, stored, READ, true)
      } else {
        this._flush(sessionId, ...writes)
      }
    })
  }

  /**
   * Cache `session` in WRITE mode, hand it to any readers still waiting on a
   * backing-store read, and write it out. While a write for the same id is in
   * flight, further callbacks are queued and served by a follow-up write.
   *
   * The write is skipped when the cache already holds a different object that
   * is deep-equal to `session`. The comparison is made against the cached
   * session itself (not the cache entry wrapping it). It is bypassed when the
   * caller passes the very object that is cached, because an object mutated in
   * place always compares equal to itself.
   * NOTE(review): a copy that still shares nested objects with the cached
   * session can also compare equal after a nested mutation — confirm that
   * callers do not rely on that pattern.
   *
   * @param {string} sessionId - Session to write.
   * @param {*} session - Session data.
   * @param {function(Error|null, *=)} callback - Called once the session is
   *   written, or immediately when the write is skipped.
   */
  FixRacyStore.prototype.set = function (sessionId, session, callback) {
    const { [ sessionId ]: { session: cachedSession } = {} } = this.cache
    const sameObject = cachedSession !== undefined && cachedSession === session
    if (!sameObject && isEqual(cachedSession, session)) {
      callback(null, session)
      return
    }
    const cacheEntry = this._setCache(sessionId, session, WRITE)
    // Readers waiting on the backing store get the newer data right away.
    this.readEvents.emit(sessionId, null, session)
    if (cacheEntry.writes === undefined) {
      // No write in flight: mark one as started and issue it.
      cacheEntry.writes = []
      this._flush(sessionId, callback)
    } else {
      cacheEntry.writes.push(callback)
    }
  }

  /**
   * Record `session` in the cache in TOUCH mode and report success at once.
   * The backing store is not called here; writing the touched session out is
   * left to the timer that `_setCache` arms for TOUCH entries.
   *
   * @param {string} sessionId - Session being touched.
   * @param {*} session - Session data to cache.
   * @param {function(Error|null, *=)} callback - Called synchronously with
   *   `(null, session)`.
   */
  FixRacyStore.prototype.touch = function (sessionId, session, callback) {
    this._setCache(sessionId, session, TOUCH)
    callback(null, session)
  }

  /**
   * Destroy a session in the backing store and invalidate everything this
   * wrapper still holds for it.
   *
   * - The cached session is blanked and the entry put back into READ mode, so
   *   it can no longer be served from memory and a pending TOUCH flush cannot
   *   write the destroyed session back. The entry itself is kept until its
   *   eviction timer fires so an in-flight write can still find it.
   * - Callbacks queued behind an in-flight write are answered now; leaving
   *   them queued would trigger a follow-up write for the destroyed session.
   * - Readers waiting on a backing-store read are answered with "no session"
   *   instead of being dropped without their callback ever being called.
   *
   * @param {string} sessionId - Session to destroy.
   * @param {function(Error|null)} callback - Forwarded to the backing store.
   */
  FixRacyStore.prototype.destroy = function (sessionId, callback) {
    if (this.cache[ sessionId ] !== undefined) {
      const { writes } = this._setCache(sessionId, undefined, READ, true)
      if (writes !== undefined) {
        // Empty the shared array in place so the in-flight write sees no queue.
        writes.splice(0).forEach(queued => queued(null, undefined))
      }
    }
    this.readEvents.emit(sessionId, null, undefined)
    this.readEvents.removeAllListeners(sessionId)
    this.options.store.destroy(sessionId, callback)
  }

  /**
   * Clear the backing store and invalidate everything this wrapper holds.
   *
   * Every cached session is blanked and its entry put back into READ mode, so
   * nothing cleared can be served from memory or written back by a pending
   * TOUCH flush. Callbacks queued behind in-flight writes are answered now
   * rather than triggering follow-up writes, and readers waiting on a
   * backing-store read are answered with "no session" instead of being
   * dropped without their callback ever being called.
   *
   * @param {function(Error|null)} callback - Forwarded to the backing store.
   */
  FixRacyStore.prototype.clear = function (callback) {
    for (const sessionId of Object.keys(this.cache)) {
      const { writes } = this._setCache(sessionId, undefined, READ, true)
      if (writes !== undefined) {
        // Empty the shared array in place so the in-flight write sees no queue.
        writes.splice(0).forEach(queued => queued(null, undefined))
      }
    }
    for (const sessionId of this.readEvents.eventNames()) {
      this.readEvents.emit(sessionId, null, undefined)
    }
    this.readEvents.removeAllListeners()
    this.options.store.clear(callback)
  }

  /**
   * Ask the backing store for its session count; the cache is not consulted.
   *
   * @param {function} callback - Passed straight through to the backing store.
   */
  FixRacyStore.prototype.length = function (callback) {
    const { store: backingStore } = this.options
    backingStore.length(callback)
  }

  /**
   * Forward a `list` request to the backing store; the cache is not consulted.
   *
   * @param {function} callback - Passed straight through to the backing store.
   */
  FixRacyStore.prototype.list = function (callback) {
    const { store: backingStore } = this.options
    backingStore.list(callback)
  }

  /**
   * Forward an `expired` request for one session to the backing store; the
   * cache is not consulted.
   *
   * @param {string} sessionId - Session to ask about.
   * @param {function} callback - Passed straight through to the backing store.
   */
  FixRacyStore.prototype.expired = function (sessionId, callback) {
    const { store: backingStore } = this.options
    backingStore.expired(sessionId, callback)
  }

  return FixRacyStore
}