// Written in D programming language
/**
*   Caching system
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: Zaramzan <shamyan.roman@gmail.com>
*
*/
module server.cache;

import core.atomic;
import core.sync.rwmutex;

import std.concurrency;
import std.exception;
import std.digest.md;

import vibe.data.json;

import json_rpc.response;
import json_rpc.request;
import json_rpc.error;

import server.sql_json;

import util;

private enum MAX_CACHE_SIZE = 1024 * 1024 * 1024; //1 Gbyte

/// Cached responses of a single method, keyed by request hash (see getHash).
private alias RpcResponse[string] stash;

/// Whole cache: method name -> stash of that method.
private alias stash[string] CacheType;

/// Module-wide lock guarding every access to a RequestCache's storage.
private __gshared ReadWriteMutex mutex;

/**
*   Computes the cache key of a request: the upper-case hex MD5 digest of the
*   request method, its params and the values of its auth map.
*
*   Every field is fed to the digest prefixed with its byte length, and the
*   params section is closed with a marker, so that different requests cannot
*   produce the same byte stream (e.g. params ["ab", "c"] vs ["a", "bc"], or a
*   value moved from params to auth).
*
*   Params:
*   req = RPC request to hash
*
*   Returns:
*   hex string of the MD5 digest
*/
private string getHash(in RpcRequest req)
{
    MD5 md5;

    md5.start();

    // Feeds one length-prefixed field into the digest. Native endianness is
    // fine here: the key is only ever used inside this process.
    void putField(const(ubyte)[] data)
    {
        ulong len = data.length;
        md5.put((cast(const(ubyte)*) &len)[0 .. len.sizeof]);
        md5.put(data);
    }

    // Marks the end of a group of fields. ulong.max can never be the length
    // prefix of a real field, so the marker is unambiguous.
    void putSectionEnd()
    {
        ulong marker = ulong.max;
        md5.put((cast(const(ubyte)*) &marker)[0 .. marker.sizeof]);
    }

    putField(cast(const(ubyte)[]) req.method);

    foreach(str; req.params)
    {
        putField(cast(const(ubyte)[]) str);
    }
    putSectionEnd();

    // NOTE(review): only the values of req.auth are hashed, in byValue()
    // iteration order. If that order can differ between two equal maps the
    // only effect is a cache miss, but confirm whether the keys should be
    // hashed too (in sorted order).
    foreach(str; req.auth.byValue())
    {
        putField(cast(const(ubyte)[]) str);
    }

    auto hash = md5.finish();

    return toHexString(hash).idup;
}

/**
*   Represent caching system
*
*   Responses are stored in memory per method name and per request hash.
*   All operations are serialized through a module-wide read/write mutex.
*
*   Authors: Zaramzan <shamyan.roman@gmail.com>
*/
shared class RequestCache
{
    /**
    *   Construct caching system
    *
    *   Params:
    *   table = describes methods and caching rules
    */
    this(shared SqlJsonTable table)
    {
        this.table = table;

        // The mutex is module-wide. Create it only once: replacing it while
        // another instance is in use would let threads holding the old mutex
        // run concurrently with threads locking the new one.
        synchronized
        {
            if (mutex is null)
            {
                mutex = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_READERS);
            }
        }
    }

    /**
    *   Drop cache by request
    *
    *   Returns:
    *   true if a cached response for this exact request was removed
    */
    bool reset(RpcRequest req)
    {
        synchronized (mutex.writer)
        {
            if (req.method in cache)
            {
                return cache[req.method].remove(req.getHash);
            }
        }

        return false;
    }

    /**
    *   Drop all method cache
    *
    *   Returns:
    *   true if the method had cached responses that were removed
    */
    bool reset(string method)
    {
        synchronized (mutex.writer)
        {
            return cache.remove(method);
        }
    }

    /**
    *   Add cache by request
    *
    *   Does nothing when ifMaxSize reports that the cache is full.
    *
    *   Params:
    *   req = RPC request the response belongs to
    *   res = response to store
    */
    void add(in RpcRequest req, shared RpcResponse res)
    {
        synchronized (mutex.writer)
        {
            if (ifMaxSize) return;

            if ((req.method in cache) is null)
            {
                // First response for this method: create its stash.
                shared stash aa;

                aa[req.getHash] = res;

                cache[req.method] = aa;
            }
            else
            {
                cache[req.method][req.getHash] = res;
            }
        }
    }

    /**
    *   Search cache by request in memory.
    *
    *   Params:
    *   req = RPC request
    *   res = if cache found in memory, res will be assigned
    *
    *   Returns:
    *   true, if found in memory, otherwise returns false
    */
    bool get(in RpcRequest req, out RpcResponse res)
    {
        // Hash once, and before taking the lock: the key depends only on req.
        auto key = req.getHash;

        synchronized(mutex.reader)
        {
            auto p1 = req.method in cache;
            if (p1)
            {
                auto p2 = key in *p1;

                if (p2)
                {
                    res = cast(RpcResponse) *p2;

                    return true;
                }
            }
        }

        return false;
    }

    /**
    *   Tells add() whether the cache has reached its size limit.
    *
    *   FIXME: cache.sizeof is the compile-time size of the associative array
    *   handle, not the memory used by the stored responses, so this check can
    *   never be true and the cache is effectively unbounded. A real limit
    *   needs explicit accounting of the stored data.
    */
    private bool ifMaxSize()
    {
        return cache.sizeof > MAX_CACHE_SIZE;
    }

    private CacheType cache;

    private SqlJsonTable table;

    private Tid tid;

}



version(unittest)
{
    shared RequestCache cache;

    void initCache()
    {
        cache = new shared RequestCache(table);
    }

    // Looks up every test request; the results are intentionally ignored,
    // the point is to exercise concurrent readers.
    void get()
    {
        import std.stdio;
        RpcResponse res;

        //writeln("into get");
        if (cache.get(normalReq, res))
        {
            //writeln(res.toJson);
        }

        if (cache.get(notificationReq, res))
        {
            //writeln(res.toJson);
        }

        if (cache.get(methodNotFoundReq, res))
        {
            //writeln(res.toJson);
        }

        if (cache.get(invalidParamsReq, res))
        {
            //writeln(res.toJson);
        }
    }

    // get -> reset -> get, then report completion to the owner thread
    void foo()
    {
        scope(failure)
        {
            assert(false, "foo exception");
        }

        get();

        //std.stdio.writeln("Reseting cache");

        cache.reset(normalReq);

        cache.reset(notificationReq);

        cache.reset(methodNotFoundReq);

        cache.reset(invalidParamsReq);

        //std.stdio.writeln("Trying to get");

        get();

        import std.concurrency;
        send(ownerTid, 1);
    }
}

unittest
{
    initTable();

    initCache();

    initResponses();

    cache.add(normalReq, normalRes.toShared);

    cache.add(notificationReq, notificationRes.toShared);

    cache.add(methodNotFoundReq, mnfRes.toShared);

    cache.add(invalidParamsReq, invalidParasmRes.toShared);

    import std.concurrency;
    import core.thread;
    import core.time;

    enum count = 10;
    foreach(i; 0 .. count) spawn(&foo);
    foreach(i; 0 .. count) receiveOnly!int();
    Thread.sleep(10.dur!"msecs"); // give the last worker thread time to exit
}