1 // Written in D programming language
2 /**
3 * Contains database using logic
4 *
5 * Copyright: © 2014 DSoftOut
6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 * Authors: Zaramzan <shamyan.roman@gmail.com>
8 */
9 module server.database;
10 
11 import core.time;
12 import core.thread;
13 
14 import std.string;
15 import std.range;
16 import std.array;
17 import std.algorithm;
18 
19 import vibe.data.bson;
20 
21 import pgator.db.pool;
22 import pgator.db.async.pool;    
23 import pgator.db.connection;
24 import pgator.db.pq.connection;
25 import pgator.db.pq.libpq;
26 
27 import json_rpc.error;
28 import json_rpc.request;
29 import json_rpc.response;
30 
31 import server.cache;
32 import server.config;
33 import server.sql_json;
34 
35 import dlogg.log;
36 import util;
37 
38 
39 /**
40 * Represent database layer
41 *
42 * Authors:
43 *      Zaramzan <shamyan.roman@gmail.com>
44 */
45 shared class Database
46 {
47     /**
48     *   Construct object from ILogger and configuration file.   
49     */
50     this(shared ILogger logger, immutable AppConfig appConfig)
51     {
52         this.logger = logger;
53         
54         this.appConfig = appConfig;
55         
56         init();
57     }
58     
59     /// configures async pool
60     void setupPool() // called on every start / restart
61     {        
62         foreach(server; appConfig.sqlServers)
63         {
64             logger.logInfo(text("Connecting to ", server.name, ". Adding ", server.maxConn, " connections."));
65             pool.addServer(server.connString, server.maxConn);    
66         }
67         pool.loggingAllTransactions = appConfig.logSqlTransactions;
68     }
69     
70     /// allocate shared cache
71     void createCache()
72     {
73         cache = new shared RequestCache(table); 
74     }
75     
76     /**
77     * Loads main table from database
78     *
79     * Throws:
80     *     on $(B ConnTimeoutException) tries to reconnect
81     */
82     void loadJsonSqlTable()
83     {
84         Bson[] convertRowEchelon(const Bson from)
85         {
86             auto m = from.deserializeBson!(Bson[][string]);
87             Bson[string][] result;
88             foreach(colName, colVals; m)
89             {
90                 foreach(row, val; colVals)
91                 {
92                     if(result.length <= row)
93                     {
94                         result ~= [colName:val];
95                     }
96                     else
97                     {
98                         result[row][colName] = val;
99                     }
100                 }
101             }
102             return result.map!(a => Bson(a)).array;
103         }
104         
105         string queryStr = "SELECT * FROM "~appConfig.sqlJsonTable;
106         
107         shared SqlJsonTable sqlTable = new shared SqlJsonTable();
108         
109         void load()
110         {
111             auto arri = pool.execTransaction([queryStr]);
112 
113             foreach(ibson; arri)
114             {            
115                 foreach(v; convertRowEchelon(ibson))
116                 {
117                     sqlTable.add(deserializeFromJson!Entry(v.toJson));
118                 }
119             }
120 
121             table = sqlTable;
122             table.makeDropMap();
123 
124             logger.logInfo("Table loaded");
125         }
126         
127         try
128         {
129             load();
130         }
131         catch(ConnTimeoutException ex)
132         {
133             logger.logError("There is no free connections in the pool, retry over 1 sec...");
134             
135             Thread.sleep(1.seconds);
136             
137             load();
138         }
139     }
140     
141     
142     /// finalize database resources
143     /**
144     *  TODO: docs here
145     */
146     void finalize()
147     {
148         if(pool !is null)
149             pool.finalize();
150         if(api !is null)
151             api.finalize();
152     }
153     
154     /**
155     * Queries parsed request from async pool <br>
156     *
157     * Also caches request if needed
158     */
159     RpcResponse query(ref RpcRequest req)
160     {    
161         RpcResponse res;
162         
163         if (cache.get(req, res))
164         {
165             logger.logDebug("Found in cache");
166             
167             res.id = req.id;
168             
169             return res;
170         }
171         
172         Entry entry;
173             
174         if (!table.methodFound(req.method, entry))
175         {
176             throw new RpcMethodNotFound();
177         }
178         else
179         {
180             size_t expected;
181             if (!entry.isValidParams(req.params, expected))
182             {
183                 throw new RpcInvalidParams(text("Expected ", expected, " parameters, ",
184                         "but got ", req.params.length, "!"));
185             }
186             if (!entry.isValidFilter(expected))
187             {
188                 throw new RpcServerError(text("Json RPC table is invalid! result_filter should be empty or size "
189                     "of sql_queries, expected ", expected, " but got ", entry.result_filter.length));
190             }    
191             if (!entry.isValidOneRowConstraint(expected))
192             {
193                 throw new RpcServerError(text("Json RPC table is invalid! one_row_flags should be empty or size "
194                         "of sql_querise, expected ", expected, " but got ", entry.one_row_flags.length));
195             }
196                         
197             try
198             {
199                 InputRange!(immutable Bson) extremeDirtyHuckRunInSeparateThreadPleaseRedoneThis(string[] queries, string[] params, uint[] arg_nums, string[string] vars, bool[] oneRowFlag)
200                 {
201                     InputRange!(immutable Bson) irangeRes = null;
202                     Throwable e = null;
203 
204                     static void thread(shared IConnectionPool pool, shared InputRange!(immutable Bson)* resPtr, shared Throwable* ePtr, immutable string[] queries, immutable string[] params, immutable uint[] arg_nums, immutable string[string] vars, immutable bool[] oneRowFlag) {
205                         try {
206                             string[] _queries = queries.dup;
207                             string[] _params = params.dup;
208                             uint[] _arg_nums = arg_nums.dup;
209                             string[string] _vars = cast(string[string]) vars.dup;
210                             bool[] _oneRowFlag = oneRowFlag.dup;
211 
212                             InputRange!(immutable Bson) res = pool.execTransaction(_queries, _params, _arg_nums, _vars, _oneRowFlag);
213                             *resPtr = cast(shared)res;
214                         } catch(Throwable th) {
215                             *ePtr = cast(shared)th;
216                         }
217                     }
218 
219                     std.concurrency.spawn(
220                             &thread,
221                             pool,
222                             cast(shared)&irangeRes,
223                             cast(shared)&e,
224                             queries.idup,
225                             params.idup,
226                             arg_nums.idup,
227                             cast(immutable) vars.dup,
228                             oneRowFlag.idup
229                         );
230 
231                     while(irangeRes is null) {
232                         if(e !is null) throw e;
233                         yield();
234                     }
235                     return irangeRes;
236                 }
237 
238 
239                 InputRange!(immutable Bson) irange = null;
240 
241                 irange = extremeDirtyHuckRunInSeparateThreadPleaseRedoneThis(
242                         entry.sql_queries,
243                         req.params,
244                         entry.arg_nums,
245                         (entry.set_username ? req.auth : null),
246                         entry.one_row_flags
247                     );
248 
249                 Bson[] processResultFiltering(R)(R data)
250                     if(isInputRange!R && is(ElementType!R == immutable Bson))
251                 {
252                     auto builder = appender!(Bson[]);
253                     if(entry.needResultFiltering)
254                     {
255                         foreach(i, ibson; data)
256                         {
257                             if(entry.result_filter[i])
258                                 builder.put(cast()ibson);
259                         }
260                     } 
261                     else
262                     {
263                         foreach(i, ibson; data) builder.put(cast()ibson);
264                     }
265                     
266                     return builder.data;
267                 }
268                 
269                 Bson[] processOneRowConstraints(R)(R data)
270                     if(isInputRange!R && is(ElementType!R == Bson))
271                 {
272                     Bson transformOneRow(Bson bson)
273                     {
274                         Bson[string] columns;
275                         try columns = bson.get!(Bson[string]);
276                         catch(Exception e) return bson;
277                         
278                         Bson[string] newColumns;
279                         foreach(name, col; columns)
280                         {
281                             Bson[] row;
282                             try row = col.get!(Bson[]);
283                             catch(Exception e) return bson;
284                             if(row.length != 1) return bson;
285                             newColumns[name] = row[0];
286                         }
287                         
288                         return Bson(newColumns);
289                     }
290                     
291                     auto builder = appender!(Bson[]);
292                     if(entry.needOneRowCheck)
293                     {
294                         foreach(i, bson; data)
295                         {
296                             if(entry.one_row_flags[i])
297                             {
298                                 builder.put(transformOneRow(bson));
299                             } else
300                             {
301                                 builder.put(bson);
302                             }
303                         }
304                         return builder.data;
305                     } else
306                     {
307                         return data;
308                     }
309                 }
310                 
311                 auto resultBody = processOneRowConstraints(processResultFiltering(irange));
312                 RpcResult result = RpcResult(Bson(resultBody));
313                 res = RpcResponse(req.id, result);
314             }
315             catch (QueryProcessingException e)
316             {
317                 res = RpcResponse(req.id, RpcError(RPC_ERROR_CODE.SERVER_ERROR, "Server error. " ~ e.msg));
318             }
319             catch (Exception e)
320             {
321                 throw new RpcServerError(e.msg);
322             }
323             
324             if (table.need_cache(req.method))
325             {
326                 shared RpcResponse cacheRes = res.toShared();
327                 logger.logDebug("Adding to cache");
328                 cache.add(req, cacheRes);
329             }
330         }
331         
332         return res;
333     }
334     
335     /**
336     * Returns: true, if authorization required in json_rpc
337     */
338     bool needAuth(string method)
339     {
340         return table.needAuth(method);
341     }
342     
343     /**
344     * Drop caches if needed
345     */
346     void dropcaches(string method)
347     {
348         foreach(meth; table.needDrop(method))
349         {
350             logger.logDebug("Reseting method: "~meth);
351             cache.reset(meth);
352         }
353     }
354     
355     /**
356     * Initializes database resources
357     *
358     */
359     private void init() //called once
360     {
361         Duration timeout = dur!"msecs"(appConfig.sqlTimeout);
362         Duration aliveCheckTime = dur!"msecs"(appConfig.aliveCheckTime);
363         Duration reTime;
364         
365         if (appConfig.sqlReconnectTime > 0)
366         {
367             reTime = dur!"msecs"(appConfig.sqlReconnectTime);
368         }
369         else
370         {
371             reTime = timeout;
372         }
373 
374         api = new shared PostgreSQL(logger);
375         auto provider = new shared PQConnProvider(logger, api);
376         
377         pool = new shared AsyncPool(logger, provider, reTime, timeout, aliveCheckTime);
378     }
379     
380     private
381     {
382         shared IPostgreSQL api;
383         shared ILogger logger;
384         shared IConnectionPool pool;
385         
386         SqlJsonTable table;
387         RequestCache cache;
388         immutable AppConfig appConfig;
389     }
390 }