Implemented Refresh-Ahead caching for Redis
This updates the AdapterCacheRedis instance to be able to handle updating itself when reading from the cache. For this to work we need to pass a `fetchData` function to the `get` method. In the case of a cache miss, we will read the data via the `fetchData` function, and store it in the cache, before returning the value to the caller. When coupled with a `refreshAheadFactor` config, we will go a step further and implement the "Refresh Ahead" caching strategy. What this means is that we will refresh the contents of the cache in the background, this happens on a cache read and only once the data in the cache has only a certain percentage of the TTL left, which is set as a decimal value between 0 and 1. e.g. ttl = 100s refreshAheadFactor = 0.2; Any read from the cache that happens _after_ 80s will do a background refresh
This commit is contained in:
parent
ef999c4fd4
commit
f34999a51f
@ -17,6 +17,7 @@ class AdapterCacheRedis extends BaseCacheAdapter {
|
||||
* @param {Object} [config.clusterConfig] - redis cluster config used in case no cache instance provided
|
||||
* @param {Object} [config.storeConfig] - extra redis client config used in case no cache instance provided
|
||||
* @param {Number} [config.ttl] - default cached value Time To Live (expiration) in *seconds*
|
||||
* @param {Number} [config.refreshAheadFactor] - 0-1 number to use to determine how old (as a percentage of ttl) an entry should be before refreshing it
|
||||
* @param {String} [config.keyPrefix] - prefix to use when building a unique cache key, e.g.: 'some_id:image-sizes:'
|
||||
* @param {Boolean} [config.reuseConnection] - specifies if the redis store/connection should be reused within the process
|
||||
*/
|
||||
@ -57,6 +58,9 @@ class AdapterCacheRedis extends BaseCacheAdapter {
|
||||
});
|
||||
}
|
||||
|
||||
this.ttl = config.ttl;
|
||||
this.refreshAheadFactor = config.refreshAheadFactor || 0;
|
||||
this.currentlyExecutingBackgroundRefreshes = new Set();
|
||||
this.keyPrefix = config.keyPrefix;
|
||||
this._keysPattern = config.keyPrefix ? `${config.keyPrefix}*` : '';
|
||||
this.redisClient = this.cache.store.getClient();
|
||||
@ -134,11 +138,67 @@ class AdapterCacheRedis extends BaseCacheAdapter {
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {String} key
|
||||
* @param {String} internalKey
|
||||
*/
|
||||
async get(key) {
|
||||
async shouldRefresh(internalKey) {
|
||||
if (this.refreshAheadFactor === 0) {
|
||||
debug(`shouldRefresh ${internalKey}: false - refreshAheadFactor = 0`);
|
||||
return false;
|
||||
}
|
||||
if (this.refreshAheadFactor === 1) {
|
||||
debug(`shouldRefresh ${internalKey}: true - refreshAheadFactor = 1`);
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
return await this.cache.get(this._buildKey(key));
|
||||
const ttlRemainingForEntry = await this.cache.ttl(internalKey);
|
||||
const shouldRefresh = ttlRemainingForEntry < this.refreshAheadFactor * this.ttl;
|
||||
debug(`shouldRefresh ${internalKey}: ${shouldRefresh} - TTL remaining = ${ttlRemainingForEntry}`);
|
||||
return shouldRefresh;
|
||||
} catch (err) {
|
||||
logging.error(err);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} key
|
||||
* @param {() => Promise<any>} [fetchData] An optional function to fetch the data, which will be used in the case of a cache MISS or a background refresh
|
||||
*/
|
||||
async get(key, fetchData) {
|
||||
const internalKey = this._buildKey(key);
|
||||
try {
|
||||
const result = await this.cache.get(internalKey);
|
||||
debug(`get ${internalKey}: Cache ${result ? 'HIT' : 'MISS'}`);
|
||||
if (!fetchData) {
|
||||
return result;
|
||||
}
|
||||
if (result) {
|
||||
const shouldRefresh = await this.shouldRefresh(internalKey);
|
||||
const isRefreshing = this.currentlyExecutingBackgroundRefreshes.has(internalKey);
|
||||
if (isRefreshing) {
|
||||
debug(`Background refresh already happening for ${internalKey}`);
|
||||
}
|
||||
if (!isRefreshing && shouldRefresh) {
|
||||
debug(`Doing background refresh for ${internalKey}`);
|
||||
this.currentlyExecutingBackgroundRefreshes.add(internalKey);
|
||||
fetchData().then(async (data) => {
|
||||
await this.set(key, data); // We don't use `internalKey` here because `set` handles it
|
||||
this.currentlyExecutingBackgroundRefreshes.delete(internalKey);
|
||||
}).catch((error) => {
|
||||
this.currentlyExecutingBackgroundRefreshes.delete(internalKey);
|
||||
logging.error({
|
||||
message: 'There was an error refreshing cache data in the background',
|
||||
error: error
|
||||
});
|
||||
});
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
const data = await fetchData();
|
||||
await this.set(key, data); // We don't use `internalKey` here because `set` handles it
|
||||
return data;
|
||||
}
|
||||
} catch (err) {
|
||||
logging.error(err);
|
||||
}
|
||||
@ -150,9 +210,10 @@ class AdapterCacheRedis extends BaseCacheAdapter {
|
||||
* @param {*} value
|
||||
*/
|
||||
async set(key, value) {
|
||||
debug('set', key);
|
||||
const internalKey = this._buildKey(key);
|
||||
debug('set', internalKey);
|
||||
try {
|
||||
return await this.cache.set(this._buildKey(key), value);
|
||||
return await this.cache.set(internalKey, value);
|
||||
} catch (err) {
|
||||
logging.error(err);
|
||||
}
|
||||
|
@ -59,6 +59,127 @@ describe('Adapter Cache Redis', function () {
|
||||
|
||||
assert.equal(value, 'value from cache');
|
||||
});
|
||||
it('can update the cache in the case of a cache miss', async function () {
|
||||
const KEY = 'update-cache-on-miss';
|
||||
let cachedValue = null;
|
||||
const redisCacheInstanceStub = {
|
||||
get: function (key) {
|
||||
assert(key === KEY);
|
||||
return cachedValue;
|
||||
},
|
||||
set: function (key, value) {
|
||||
assert(key === KEY);
|
||||
cachedValue = value;
|
||||
},
|
||||
store: {
|
||||
getClient: sinon.stub().returns({
|
||||
on: sinon.stub()
|
||||
})
|
||||
}
|
||||
};
|
||||
const cache = new RedisCache({
|
||||
cache: redisCacheInstanceStub
|
||||
});
|
||||
|
||||
const fetchData = sinon.stub().resolves('Da Value');
|
||||
|
||||
checkFirstRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 1);
|
||||
assert.equal(value, 'Da Value');
|
||||
break checkFirstRead;
|
||||
}
|
||||
|
||||
checkSecondRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 1);
|
||||
assert.equal(value, 'Da Value');
|
||||
break checkSecondRead;
|
||||
}
|
||||
});
|
||||
|
||||
it('Can do a background update of the cache', async function () {
|
||||
const KEY = 'update-cache-in-background';
|
||||
let cachedValue = null;
|
||||
let remainingTTL = 100;
|
||||
|
||||
const redisCacheInstanceStub = {
|
||||
get: function (key) {
|
||||
assert(key === KEY);
|
||||
return cachedValue;
|
||||
},
|
||||
set: function (key, value) {
|
||||
assert(key === KEY);
|
||||
cachedValue = value;
|
||||
},
|
||||
ttl: function (key) {
|
||||
assert(key === KEY);
|
||||
return remainingTTL;
|
||||
},
|
||||
store: {
|
||||
getClient: sinon.stub().returns({
|
||||
on: sinon.stub()
|
||||
})
|
||||
}
|
||||
};
|
||||
const cache = new RedisCache({
|
||||
cache: redisCacheInstanceStub,
|
||||
ttl: 100,
|
||||
refreshAheadFactor: 0.2
|
||||
});
|
||||
|
||||
const fetchData = sinon.stub();
|
||||
fetchData.onFirstCall().resolves('First Value');
|
||||
fetchData.onSecondCall().resolves('Second Value');
|
||||
|
||||
checkFirstRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 1);
|
||||
assert.equal(value, 'First Value');
|
||||
break checkFirstRead;
|
||||
}
|
||||
|
||||
// We simulate having been in the cache for 15 seconds
|
||||
remainingTTL = 85;
|
||||
|
||||
checkSecondRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 1);
|
||||
assert.equal(value, 'First Value');
|
||||
break checkSecondRead;
|
||||
}
|
||||
|
||||
// We simulate having been in the cache for 30 seconds
|
||||
remainingTTL = 70;
|
||||
|
||||
checkThirdRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 1);
|
||||
assert.equal(value, 'First Value');
|
||||
break checkThirdRead;
|
||||
}
|
||||
|
||||
// We simulate having been in the cache for 85 seconds
|
||||
// This should trigger a background refresh
|
||||
remainingTTL = 15;
|
||||
|
||||
checkFourthRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 2);
|
||||
assert.equal(value, 'First Value');
|
||||
break checkFourthRead;
|
||||
}
|
||||
|
||||
// We reset the TTL to 100 for the most recent write
|
||||
remainingTTL = 100;
|
||||
|
||||
checkFifthRead: {
|
||||
const value = await cache.get(KEY, fetchData);
|
||||
assert.equal(fetchData.callCount, 2);
|
||||
assert.equal(value, 'Second Value');
|
||||
break checkFifthRead;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('set', function () {
|
||||
|
@ -238,8 +238,7 @@ const pipeline = (apiController, apiUtils, apiType) => {
|
||||
const cacheKey = stringify(cacheKeyData);
|
||||
|
||||
if (apiImpl.cache) {
|
||||
const response = await apiImpl.cache.get(cacheKey);
|
||||
|
||||
const response = await apiImpl.cache.get(cacheKey, getResponse);
|
||||
if (response) {
|
||||
return Promise.resolve(response);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user