#StackBounty: #javascript #node.js #asynchronous #cache Learning asynchronous caching by implementing one

Bounty: 150

After implementing a serial CLOCK second-chance cache in C++ without having enough rope to shoot myself in the foot, I decided to dive into JavaScript through Node.js and write an asynchronous one. I think its basic functionality is nearly complete, but there might be some things that I’ve missed in the implementation and the unit tests. Also, what kind of design pattern would be better? I added Promise-based and multiple-key versions of the get and set methods.

Edit: using a Map instead of a plain object for the key-to-slot mapping doubled the cache-miss performance and lowered the latency of both reads and writes.

Implementation: (single file source )

'use strict';
/**
 * Asynchronous write-back cache with CLOCK (second-chance) eviction.
 *
 * Constructor parameters:
 *   cacheSize: number of slots in the cache (constant). Must be greater than or equal to the
 *       number of concurrent asynchronous accessors / in-flight cache misses.
 *   callbackBackingStoreLoad: user-given cache(read)-miss function that loads data from the datastore.
 *       Takes 2 parameters: key, callback. The callback must be called with the loaded value.
 *       example:
 *           function(key,callback){
 *               redis.get(key,function(err,data){
 *                   callback(data);
 *               });
 *           }
 *   elementLifeTimeMs: maximum milliseconds before an element is invalidated (default 1000).
 *       An element is only invalidated at the next get() or set() call with its key.
 *   callbackBackingStoreSave: user-given cache(write)-miss function that saves data to the datastore.
 *       Takes 3 parameters: key, value, callback. The callback must be called once the value is saved.
 *       It is called whenever an edited slot expires or is evicted, so it must be supplied
 *       if set() is used.
 *       example:
 *           function(key,value,callback){
 *               redis.set(key,value,function(err){
 *                   callback();
 *               });
 *           }
 *
 * Methods:
 *   get(key,callback): callback receives the cached (or freshly loaded) value.
 *   set(key,value,callback): stores the value in the cache (written back later); callback receives the value.
 *   getAwaitable / setAwaitable: Promise versions of get / set.
 *   getMultiple(callback,...keys) / setMultiple(callback,...pairs): callback receives an array of results
 *       in argument order; pairs have the form { key:foo, value:bar }.
 *   getMultipleAwaitable / setMultipleAwaitable: Promise versions of the above.
 *   flush(callback): waits for all in-flight get/set accesses, then writes all edited keys back to the
 *       backing store and marks every slot as out of date.
 *   reload(): marks every slot as out of date so values are reloaded from the backing store on next access.
 *   reloadKey(key): marks only the selected key as out of date.
 *
 * Note: the user callbacks have no error channel; backing-store errors must be handled by the caller's
 * load/save functions.
 */
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
    const me = this;
    const aTypeGet = 0;
    const aTypeSet = 1;
    const maxWait = elementLifeTimeMs;
    const size = parseInt(cacheSize,10);
    const mapping = new Map();              // key -> slot index
    const mappingInFlightMiss = new Map();  // keys that currently have an asynchronous load/save in flight
    const bufData = new Array(size);        // slot -> cached value
    const bufVisited = new Uint8Array(size);// slot -> second-chance bit
    const bufEdited = new Uint8Array(size); // slot -> 1 when the value must be written back
    const bufKey = new Array(size);         // slot -> key stored in it
    const bufTime = new Float64Array(size); // slot -> timestamp (ms) of last refresh, 0 = out of date
    const bufLocked = new Uint8Array(size); // slot -> 1 while an asynchronous operation owns it
    for(let i=0;i<size;i++)
    {
        // every slot starts with a random placeholder key so that the mapping and the
        // circular buffer are consistent from the beginning (typed arrays are already zeroed)
        const rnd = Math.random();
        mapping.set(rnd,i);
        bufData[i]="";
        bufKey[i]=rnd;
    }
    let ctr = 0;                            // second-chance hand of the CLOCK
    let ctrEvict = Math.floor(size/2);      // eviction hand, half a turn ahead
    const loadData = callbackBackingStoreLoad;
    const saveData = callbackBackingStoreSave;
    let inFlightMissCtr = 0;

    // refresh all items time-span in cache
    this.reload=function(){
        bufTime.fill(0);
    };

    // refresh item time-span in cache by triggering eviction
    this.reloadKey=function(key){
        if(mapping.has(key))
        {
            // mapping is a Map: it must be read with get(), property access always yields undefined
            bufTime[mapping.get(key)]=0;
        }
    };

    // get value by key
    this.get = function(keyPrm,callbackPrm){
        access(keyPrm,callbackPrm,aTypeGet);
    };

    // set value by key (callback returns same value)
    this.set = function(keyPrm,valuePrm,callbackPrm){
        access(keyPrm,callbackPrm,aTypeSet,valuePrm);
    };

    // re-issue the same access on a later event-loop turn.
    // the callback is passed through unchanged so repeated retries do not build a chain of wrappers.
    function retryLater(key,callback,aType,value){
        setTimeout(function(){
            access(key,callback,aType,value);
        },0);
    }

    // aType=0: get
    // aType=1: set
    function access(keyPrm,callbackPrm,aType,valuePrm){

        const key = keyPrm;
        const callback = callbackPrm;
        const value = valuePrm;

        // stop dead-lock when many async get calls are made:
        // with every slot busy the eviction loop below could never find a victim
        if(inFlightMissCtr>=size)
        {
            retryLater(key,callback,aType,value);
            return;
        }

        // if key is busy, then delay the request towards end of the cache-miss completion
        if(mappingInFlightMiss.has(key))
        {
            retryLater(key,callback,aType,value);
            return;
        }

        if(mapping.has(key))
        {
            // slot is an element in the circular buffer of CLOCK algorithm
            const slot = mapping.get(key);

            if((Date.now() - bufTime[slot]) > maxWait)
            {
                // slot life span has ended
                if(bufLocked[slot])
                {
                    // slot is locked by another operation, postpone the current operation
                    retryLater(key,callback,aType,value);
                }
                else if(bufEdited[slot] == 1)
                {
                    // it was edited: update the backing-store first
                    bufLocked[slot] = 1;
                    bufEdited[slot]=0;
                    mappingInFlightMiss.set(key,1); // lock key
                    inFlightMissCtr++;
                    // update backing-store, this is async
                    saveData(bufKey[slot],bufData[slot],function(){
                        mappingInFlightMiss.delete(key);    // unlock key
                        bufLocked[slot] = 0;
                        inFlightMissCtr--;

                        mapping.delete(key); // disable mapping for current key

                        // re-simulate the access (now a cache miss), async
                        access(key,callback,aType,value);
                    });
                }
                else
                {
                    mapping.delete(key); // disable mapping for current key
                    // re-simulate the access (now a cache miss)
                    access(key,callback,aType,value);
                }
            }
            else    // slot life span has not ended: RAM speed data
            {
                bufVisited[slot]=1;
                bufTime[slot] = Date.now();

                // if it is a "set" operation
                if(aType == aTypeSet)
                {
                    bufEdited[slot] = 1; // later used when data needs to be written to data-store (write-cache feature)
                    bufData[slot] = value;
                }
                callback(bufData[slot]);
            }
        }
        else
        {
            // datastore loading + cache eviction
            let ctrFound = -1;
            let oldVal = 0;
            let oldKey = 0;
            while(ctrFound===-1)
            {
                // give slot a second chance before eviction
                if(!bufLocked[ctr] && bufVisited[ctr])
                {
                    bufVisited[ctr]=0;
                }
                ctr++;
                if(ctr >= size)
                {
                    ctr=0;
                }

                // eviction conditions
                if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
                {
                    // eviction preparations, lock the slot
                    bufLocked[ctrEvict] = 1;
                    inFlightMissCtr++;
                    ctrFound = ctrEvict;
                    oldVal = bufData[ctrFound];
                    oldKey = bufKey[ctrFound];
                }

                ctrEvict++;
                if(ctrEvict >= size)
                {
                    ctrEvict=0;
                }
            }

            // user-requested key is now asynchronously in-flight & locked for other operations
            mappingInFlightMiss.set(key,1);

            // eviction function. victim data is gone, newest data is assigned
            const evict = function(res){

                // an expired key that was reloaded into another slot leaves its key behind in the
                // old slot; only remove the mapping when it still points at the slot being evicted,
                // otherwise the live entry (and any pending edit in it) would become unreachable
                if(mapping.get(bufKey[ctrFound]) === ctrFound)
                {
                    mapping.delete(bufKey[ctrFound]);
                }

                bufData[ctrFound]=res;
                bufVisited[ctrFound]=0;
                bufKey[ctrFound]=key;
                bufTime[ctrFound]=Date.now();
                bufLocked[ctrFound]=0;

                mapping.set(key,ctrFound);
                callback(res);
                inFlightMissCtr--;
                mappingInFlightMiss.delete(key);
            };

            // if old data was edited, send it to data-store first, then fetch new data
            if(bufEdited[ctrFound] == 1)
            {
                // a "set" keeps the edited flag because the new value is itself unsaved
                if(aType == aTypeGet)
                    bufEdited[ctrFound] = 0;

                // old edited data is sent back to data-store
                saveData(oldKey,oldVal,function(){
                    if(aType == aTypeGet)
                        loadData(key,evict);
                    else if(aType == aTypeSet)
                        evict(value);
                });
            }
            else
            {
                if(aType == aTypeSet)
                    bufEdited[ctrFound] = 1;
                if(aType == aTypeGet)
                    loadData(key,evict);
                else if(aType == aTypeSet)
                    evict(value);
            }
        }
    }

    // Promise version of get()
    this.getAwaitable = function(key){
        return new Promise(function(success){
            me.get(key,function(data){
                success(data);
            });
        });
    };

    // Promise version of set()
    this.setAwaitable = function(key,value){
        return new Promise(function(success){
            me.set(key,value,function(data){
                success(data);
            });
        });
    };

    // as many keys as required can be given, separated by commas
    this.getMultiple = function(callback, ... keys){
        // nothing to wait for: report the empty result instead of never calling back
        if(keys.length === 0)
        {
            callback([]);
            return;
        }
        const result = new Array(keys.length).fill(0);
        let remaining = keys.length;
        keys.forEach(function(key,index){
            me.get(key,function(data){
                result[index] = data;
                remaining--;
                if(remaining==0)
                {
                    callback(result);
                }
            });
        });
    };

    // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
    this.setMultiple = function(callback, ... keyValuePairs){
        // nothing to wait for: report the empty result instead of never calling back
        if(keyValuePairs.length === 0)
        {
            callback([]);
            return;
        }
        const result = new Array(keyValuePairs.length).fill(0);
        let remaining = keyValuePairs.length;
        keyValuePairs.forEach(function(pair,index){
            me.set(pair.key,pair.value,function(data){
                result[index] = data;
                remaining--;
                if(remaining==0)
                {
                    callback(result);
                }
            });
        });
    };

    // as many keys as required can be given, separated by commas
    this.getMultipleAwaitable = function(... keys){
        return new Promise(function(success){
            me.getMultiple(function(results){
                success(results);
            }, ... keys);
        });
    };

    // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
    this.setMultipleAwaitable = function(... keyValuePairs){
        return new Promise(function(success){
            me.setMultiple(function(results){
                success(results);
            }, ... keyValuePairs);
        });
    };

    // push all edited slots to backing-store and reset all slots lifetime to "out of date"
    this.flush = function(callback){

        function waitForReadWrite(callbackW){

            // if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
            const anySlotLocked = bufLocked.some(function(locked){ return locked !== 0; });
            if(mappingInFlightMiss.size > 0 || anySlotLocked)
            {
                setTimeout(()=>{ waitForReadWrite(callbackW); },10);
            }
            else
                callbackW();
        }
        waitForReadWrite(async function(){
            for(let i=0;i<size;i++)
            {
                bufTime[i]=0;
                if(bufEdited[i] == 1)
                {
                    // a set() on an out-of-date edited slot triggers its write-back;
                    // awaited one by one: less concurrency pressure, less failure
                    await me.setAwaitable(bufKey[i],bufData[i]);
                }
            }
            if(typeof callback === "function")
                callback(); // flush complete
        });
    };
};

exports.Lru = Lru;

Unit testing:

"use strict";
console.log("tests will take several seconds. results will be shown at end.");
let { Lru } = require("./lrucache.js");

// counters shared between the backing-store loader and the individual tests
let benchData = {
    hits:0,
    misses:0,
    total:0,
    expires:0,
    evict:0,
    evictCtr:0,
    access50:0,
    miss50:0
};

// every test starts as "failed" and is switched to "ok" by the test that proves it
let errorCheck = {
    "cache_hit_test":"failed",
    "cache_miss_test":"failed",
    "cache_expire_test":"failed",
    "cache_eviction_test":"failed",
    "cache_50%_hit_ratio":"failed"
};

// the report is printed once, when the event loop has drained and the process exits
process.on('exit',() => {
    console.log(errorCheck);
});


// Cache under test: 1000 slots, 1000 ms element lifetime, read-only (no save callback).
// The loader doubles as test instrumentation: it is only called on a cache miss, so the
// counters it updates tell the tests how often the backing store was really hit.
// In every branch the cache callback runs BEFORE the counters are updated.
let cache = new Lru(1000, async function(key,callback){
    const reply = key+" processed";
    const isEvictionKey = key.indexOf("cache_eviction_test")!==-1;
    const isHitRatioKey = key.indexOf("cache_50_hit_ratio_test")!==-1;

    // miss / hit / expire tests: simulate a slow backing store (1 second per load)
    if(!isEvictionKey && !isHitRatioKey)
    {
        setTimeout(function(){
            callback(reply);
            switch(key)
            {
                case "cache_miss_test":
                    errorCheck[key]="ok";
                    break;
                case "cache_hit_test":
                    benchData.misses+=1;
                    break;
                case "cache_expire_test":
                    benchData.expires+=1;
                    if(benchData.expires===2)
                        errorCheck[key]="ok";
                    break;
            }
        },1000);
        return;
    }

    // eviction test: the key itself must be loaded twice (before and after being evicted)
    if(key === "cache_eviction_test")
    {
        callback(reply);
        benchData.evict+=1;
        if(benchData.evict === 2)
            errorCheck[key]="ok";
        return;
    }

    // filler keys of the eviction test and keys of the hit-ratio test: immediate load
    callback(reply);
    if(isHitRatioKey)
    {
        benchData.miss50+=1;
    }
},1000);

// miss test: the loader itself marks the test as passed when it is asked for this key
cache.get("cache_miss_test",() => {});

// hit test: five requests for one key must cause exactly one load and four hits
const onHitTestResult = () => {
    benchData.total+=1;
    const hits = benchData.total - benchData.misses;
    if(hits === 4 && benchData.misses === 1)
    {
        errorCheck["cache_hit_test"]="ok";
    }
};
for(let attempt=0;attempt<5;attempt+=1)
{
    cache.get("cache_hit_test",onHitTestResult);
}

// expire test: ask again 1500 ms after the first answer, i.e. after the 1000 ms lifetime,
// so the loader has to be called a second time
cache.get("cache_expire_test",() => {
    setTimeout(() => {
        cache.get("cache_expire_test",() => {});
    },1500);
});

// eviction test (starts after 3 seconds): load the key, push 999 other keys through the
// cache, then request the key again; the loader counts how often the key was loaded
setTimeout(() => {
    const fillerCount = 999;

    const onFillerLoaded = () => {
        benchData.evictCtr+=1;
        if(benchData.evictCtr===fillerCount)
            cache.get("cache_eviction_test",() => {});
    };

    cache.get("cache_eviction_test",() => {
        for(let n=0;n<fillerCount;n+=1)
        {
            cache.get("cache_eviction_test_count"+n,onFillerLoaded);
        }
    });
},3000);

// 50% hit-ratio test (starts after 5 seconds): 2000 distinct random keys are requested
// through the 1000-slot cache, in ctrMax rounds of 900 concurrent requests each.
// The loader counts the misses in benchData.miss50.
setTimeout(function(){

    const ctrMax = 100; // don't pick too low value, causes some uncertainity on 50% hit ratio
    const accessesPerRound = 900;

    function repeat(cur){
        if(cur<=0)
            return;
        for(let i=0;i<accessesPerRound;i++)
        {
            const key = "cache_50_hit_ratio_test"+Math.floor(Math.random()*2000);
            cache.get(key,function(data){
                benchData.access50++;
                if(benchData.access50!==accessesPerRound)
                    return;

                // round complete
                benchData.access50=0;
                if(cur>1)
                {
                    repeat(cur-1);
                    return;
                }

                // judge only after the final round: miss50 is cumulative, so checking it after
                // every round would mark the test "ok" as soon as the count passes through the
                // window, even if the final miss ratio is far above 50%
                if(benchData.miss50>400*ctrMax && benchData.miss50< 500*ctrMax)
                    errorCheck["cache_50%_hit_ratio"]="ok";
            });
        }
    }

    repeat(ctrMax);
},5000);

Caching Redis For Get/Set Operations:

"use strict";

// backing-store
const Redis = require("ioredis");
const redis = new Redis();

// LRU cache
let { Lru } = require("./lrucache.js");
let num_cache_elements = 1500;
let element_life_time_miliseconds = 1000;

// cache(read)-miss handler: fetch the value from Redis and hand it to the cache
const loadFromRedis = async (key,callback) => {
    redis.get(key,(err,result) => {
        callback(result);
    });
};

// cache(write)-miss handler: store the value in Redis, then tell the cache it is done
const saveToRedis = async (key,value,callback) => {
    redis.set(key,value,(err,result) => {
        callback();
    });
};

let cache = new Lru(
    num_cache_elements,
    loadFromRedis,
    element_life_time_miliseconds,
    saveToRedis
);

// benchmark parameters
const N_repeat = 20;        // how many times each benchmark is repeated
const N_bench = 20000;      // set+get round trips per benchmark run
const N_concurrency = 100;  // round trips started together in one batch
const N_dataset = 1000;     // number of distinct keys

// uniformly distributed integer key in [0, N_dataset)
function randomKey(){
    const scaled = Math.random()*N_dataset;
    return Math.floor(scaled);
}

// without LRU caching
// Benchmark talking to Redis directly. Runs N_bench set+get round trips in batches of
// N_concurrency; a batch must finish completely before the next one starts.
// Calls callback (no arguments) when all batches are done.
async function benchWithout(callback){

    // starts one batch and resolves when the last of its get() calls has answered
    const runBatch = (batchValue) => new Promise((resolve) => {
        let finished = 0;
        const onGetDone = () => {
            finished += 1;
            if(finished === N_concurrency)
            {
                resolve(1);
            }
        };
        for(let n=0;n<N_concurrency;n+=1)
        {
            redis.set(randomKey(),batchValue,() => {
                redis.get(randomKey(),onGetDone);
            });
        }
    });

    let offset = 0;
    while(offset<N_bench)
    {
        await runBatch(offset);
        offset += N_concurrency;
    }
    callback();
}

// with LRU caching
// Same workload as the direct Redis benchmark, but every set/get goes through the LRU cache.
// Runs N_bench set+get round trips in batches of N_concurrency and calls callback
// (no arguments) when all batches are done.
async function benchWith(callback){

    // starts one batch and resolves when the last of its get() calls has answered
    const runBatch = (batchValue) => new Promise((resolve) => {
        let finished = 0;
        const onGetDone = () => {
            finished += 1;
            if(finished === N_concurrency)
            {
                resolve(1);
            }
        };
        for(let n=0;n<N_concurrency;n+=1)
        {
            cache.set(randomKey(),batchValue,() => {
                cache.get(randomKey(),onGetDone);
            });
        }
    });

    let offset = 0;
    while(offset<N_bench)
    {
        await runBatch(offset);
        offset += N_concurrency;
    }
    callback();
}

// number of completed runs of the benchmark that is currently being repeated
let ctr = 0;

// Runs the direct-Redis benchmark N_repeat times, printing the duration of each run,
// then resets the run counter and calls callback.
function restartWithoutLRU(callback){
    const startedAt = Date.now();
    benchWithout(() => {
        const elapsed = Date.now() - startedAt;
        console.log("without LRU: "+elapsed+" milliseconds");
        ctr += 1;
        if(ctr == N_repeat)
        {
            ctr = 0;
            callback();
            return;
        }
        restartWithoutLRU(callback);
    });
}

// Runs the cached benchmark N_repeat times, printing the duration of each run.
function restartWithLRU(){
    const startedAt = Date.now();
    benchWith(() => {
        const elapsed = Date.now() - startedAt;
        console.log("with LRU: "+elapsed+" milliseconds");
        ctr += 1;
        if(ctr == N_repeat)
        {
            return;
        }
        restartWithLRU();
    });
}


// first all runs without the cache, then all runs with it
restartWithoutLRU(restartWithLRU);

On my low-end computer (2 GHz Bulldozer CPU, single-channel DDR3-1600), API overhead limits performance to around 1500 cache (read/write) hits per millisecond and about 850 cache (read/write) misses per millisecond. I couldn’t find a better way to stay asynchronous and run faster. Would it make sense to shard the cache across multiple cores to get a multi-core cache, or would a single-threaded cache server serving all cores be better?


Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.