// Written in the D programming language
2 /**
3 * Caching system
4 *
5 * Copyright: © 2014 DSoftOut
6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 * Authors: Zaramzan <shamyan.roman@gmail.com>
8 *
9 */
10 module server.cache;
11 
12 import core.atomic;
13 import core.sync.rwmutex;
14 
15 import std.concurrency;
16 import std.exception;
17 import std.digest.md;
18 
19 import vibe.data.json;
20 
21 import json_rpc.response;
22 import json_rpc.request;
23 import json_rpc.error;
24 
25 import server.sql_json;
26 
27 import util;
28 
/// Intended upper bound for the cache, in bytes (2^30 = 1 Gbyte).
private enum MAX_CACHE_SIZE = 1 << 30;

/// Responses stored for one method, keyed by the request hash.
private alias stash = RpcResponse[string];

/// The whole cache: one stash per method name.
private alias CacheType = stash[string];

/// Module-wide reader/writer lock used by RequestCache to guard its storage.
private __gshared ReadWriteMutex mutex;
36 
/**
* Compute the cache key of a request.
*
* The method name, every parameter and every value of the auth table are fed
* into an MD5 digest. Each field is written as a one-byte tag, an 8-byte
* little-endian length and then the field bytes, so that two different
* requests can never produce the same byte stream just because their fields
* concatenate to the same text (e.g. method "ab" + param "c" versus
* method "a" + param "bc").
*
* Params:
*	req = request to fingerprint
*
* Returns:
*	hex string of the MD5 digest
*/
private string getHash(in RpcRequest req)
{
	import std.bitmanip : nativeToLittleEndian;

	MD5 md5;

	md5.start();

	// Feeds one self-delimiting field (tag, length, payload) into the digest.
	void putField(char tag, const(ubyte)[] data)
	{
		// Fixed-width, fixed-endianness length keeps the encoding unambiguous.
		ubyte[ulong.sizeof] len = nativeToLittleEndian(cast(ulong) data.length);

		md5.put(cast(ubyte) tag);
		md5.put(len[]);
		md5.put(data);
	}

	putField('m', cast(const(ubyte)[]) req.method);

	foreach(str; req.params)
	{
		putField('p', cast(const(ubyte)[]) str);
	}

	// NOTE(review): only the values of req.auth are hashed (not the keys), and
	// they are visited in byValue() order - confirm this is what callers expect.
	foreach(str; req.auth.byValue())
	{
		putField('a', cast(const(ubyte)[]) str);
	}

	auto hash = md5.finish();

	return toHexString(hash).idup;
}
63 
/**
* In-memory cache of RPC responses.
*
* Responses are stored per method name and, within a method, per request hash
* (see getHash). Every operation takes the module-level read/write mutex:
* lookups take the reader side, modifications take the writer side.
*
* Authors: Zaramzan <shamyan.roman@gmail.com>
*/
shared class RequestCache
{
	/**
	* Construct caching system
	*
	* Params:
	* 	table = describes methods and caching rules
	*/
	this(shared SqlJsonTable table)
	{
		this.table = table;

		// NOTE(review): mutex is a module-level __gshared variable, so every
		// newly constructed RequestCache replaces the lock object used by any
		// instance created earlier.
		mutex = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_READERS);
	}

	/**
	* Drop the cached response of a single request.
	*
	* Params:
	*	req = request whose cached response should be removed
	*
	* Returns:
	*	true if an entry was removed, false if nothing was cached for it
	*/
	bool reset(RpcRequest req)
	{
		synchronized (mutex.writer)
		{
			if (req.method in cache)
			{
				return cache[req.method].remove(req.getHash);
			}
		}

		return false;
	}

	/**
	* Drop every cached response of a method.
	*
	* Params:
	*	method = method name
	*
	* Returns:
	*	true if the method had cached responses, otherwise false
	*/
	bool reset(string method)
	{
		synchronized (mutex.writer)
		{
			return cache.remove(method);
		}
	}

	/**
	* Store a response for a request, replacing any previous one.
	*
	* Nothing is stored when ifMaxSize reports that the cache is full.
	*
	* Params:
	*	req = request the response belongs to
	*	res = response to remember
	*/
	void add(in RpcRequest req, shared RpcResponse res)
	{
		synchronized (mutex.writer)
		{
			if (ifMaxSize) return;

			string key = req.getHash;

			if ((req.method in cache) is null)
			{
				// First response for this method: create its stash.
				shared stash aa;

				aa[key] = res;

				cache[req.method] = aa;
			}
			else
			{
				cache[req.method][key] = res;
			}
		}
	}

	/**
	* Search cache by request in memory.
	*
	* Params:
	*	req = RPC request
	*	res = if cache found in memory, res will be assigned
	*
	* Returns:
	*	true, if found in memory, otherwise returns false
	*/
	bool get(in RpcRequest req, out RpcResponse res)
	{
		synchronized (mutex.reader)
		{
			if (auto methodEntries = req.method in cache)
			{
				// The request is hashed once and the pointers returned by
				// `in` are reused, instead of hashing and indexing again.
				if (auto cached = req.getHash in *methodEntries)
				{
					res = cast(RpcResponse) *cached;

					return true;
				}
			}
		}

		return false;
	}

	/**
	* Tell whether the cache has outgrown MAX_CACHE_SIZE.
	*
	* NOTE(review): cache.sizeof is the compile-time size of the associative
	* array handle, not the amount of cached data, so this comparison never
	* becomes true and the limit is not actually enforced.
	*/
	private bool ifMaxSize()
	{
		return cache.sizeof > MAX_CACHE_SIZE;
	}

	/// method name -> (request hash -> response)
	private CacheType cache;

	/// Table passed to the constructor; not read anywhere in this class.
	private SqlJsonTable table;

	/// Not used anywhere in this class.
	private Tid tid;

}
179 
180 
181 
version(unittest)
{
	/// Cache instance used by the unittest and by the threads it spawns.
	shared RequestCache cache;

	/// Creates the cache under test from `table`.
	void initCache()
	{
		cache = new shared RequestCache(table);
	}

	/// Looks up each fixture request once; the outcome is deliberately ignored.
	void get()
	{
		RpcResponse found;

		cache.get(normalReq, found);

		cache.get(notificationReq, found);

		cache.get(methodNotFoundReq, found);

		cache.get(invalidParamsReq, found);
	}

	/// Thread body: get -> reset -> get, then sends 1 to the owner thread.
	void foo()
	{
		// Any exception escaping this function becomes an assertion failure.
		scope(failure) assert(false, "foo exception");

		get();

		// Drop the entry of every fixture request ...
		cache.reset(normalReq);
		cache.reset(notificationReq);
		cache.reset(methodNotFoundReq);
		cache.reset(invalidParamsReq);

		// ... and look them up again.
		get();

		import std.concurrency;
		send(ownerTid, 1);
	}
}
247 
// Fills the cache with the fixture responses, then lets several threads
// read and reset it concurrently.
unittest
{
	initTable();
	initCache();
	initResponses();

	// One cached response per fixture request.
	cache.add(normalReq, normalRes.toShared);
	cache.add(notificationReq, notificationRes.toShared);
	cache.add(methodNotFoundReq, mnfRes.toShared);
	cache.add(invalidParamsReq, invalidParasmRes.toShared);

	import std.concurrency;
	import core.thread;
	import core.time;

	enum workers = 10;

	// Start the workers, then wait for the int each one sends when it is done.
	foreach (_; 0 .. workers) spawn(&foo);
	foreach (_; 0 .. workers) receiveOnly!int();

	// Short pause so the last worker thread can finish.
	Thread.sleep(dur!"msecs"(10));
}