1 // Written in D programming language
2 /**
* Contains the logic for working with the database
4 *
5 * Copyright: © 2014 DSoftOut
6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 * Authors: Zaramzan <shamyan.roman@gmail.com>
8 */
9 module server.database;
10 
11 import core.time;
12 import core.thread;
13 
14 import std.string;
15 import std.range;
16 import std.array;
17 import std.algorithm;
18 
19 import vibe.data.bson;
20 
21 import pgator.db.pool;
22 import pgator.db.async.pool;    
23 import pgator.db.connection;
24 import pgator.db.pq.connection;
25 import pgator.db.pq.libpq;
26 
27 import json_rpc.error;
28 import json_rpc.request;
29 import json_rpc.response;
30 
31 import server.cache;
32 import server.config;
33 import server.sql_json;
34 
35 import dlogg.log;
36 import util;
37 
38 
/**
* Represents the database layer: holds the connection pool, the JSON-RPC
* method table and the response cache.
*
* Authors:
*      Zaramzan <shamyan.roman@gmail.com>
*/
45 shared class Database
46 {
47     /**
48     *   Construct object from ILogger and configuration file.   
49     */
50     this(shared ILogger logger, immutable AppConfig appConfig)
51     {
52         this.logger = logger;
53         
54         this.appConfig = appConfig;
55         
56         init();
57     }
58     
59     /// configures async pool
60     void setupPool() // called on every start / restart
61     {        
62         foreach(server; appConfig.sqlServers)
63         {
64             logger.logInfo(text("Connecting to ", server.name, ". Adding ", server.maxConn, " connections."));
65             pool.addServer(server.connString, server.maxConn);    
66         }
67         pool.loggingAllTransactions = appConfig.logSqlTransactions;
68     }
69     
70     /// allocate shared cache
71     void createCache()
72     {
73         cache = new shared RequestCache(table); 
74     }
75     
76     /**
77     * Loads main table from database
78     *
79     * Throws:
80     *     on $(B ConnTimeoutException) tries to reconnect
81     */
82     void loadJsonSqlTable()
83     {
84         Bson[] convertRowEchelon(const Bson from)
85         {
86             auto m = from.deserializeBson!(Bson[][string]);
87             Bson[string][] result;
88             foreach(colName, colVals; m)
89             {
90                 foreach(row, val; colVals)
91                 {
92                     if(result.length <= row)
93                     {
94                         result ~= [colName:val];
95                     }
96                     else
97                     {
98                         result[row][colName] = val;
99                     }
100                 }
101             }
102             return result.map!(a => Bson(a)).array;
103         }
104         
105         string queryStr = "SELECT * FROM "~appConfig.sqlJsonTable;
106         
107         shared SqlJsonTable sqlTable = new shared SqlJsonTable();
108         
109         void load()
110         {
111             auto arri = pool.execTransaction([queryStr]);
112 
113             foreach(ibson; arri)
114             {            
115                 foreach(v; convertRowEchelon(ibson))
116                 {
117                     sqlTable.add(deserializeFromJson!Entry(v.toJson));
118                 }
119             }
120 
121             table = sqlTable;
122             table.makeDropMap();
123 
124             logger.logInfo("Table loaded");
125         }
126         
127         try
128         {
129             load();
130         }
131         catch(ConnTimeoutException ex)
132         {
133             logger.logError("There is no free connections in the pool, retry over 1 sec...");
134             
135             Thread.sleep(1.seconds);
136             
137             load();
138         }
139     }
140     
141     
142     /// finalize database resources
143     /**
144     *  TODO: docs here
145     */
146     void finalize()
147     {
148         if(pool !is null)
149             pool.finalize();
150         if(api !is null)
151             api.finalize();
152     }
153     
154     /**
155     * Queries parsed request from async pool <br>
156     *
157     * Also caches request if needed
158     */
159     RpcResponse query(ref RpcRequest req)
160     {    
161         RpcResponse res;
162         
163         if (cache.get(req, res))
164         {
165             logger.logDebug("Found in cache");
166             
167             res.id = req.id;
168             
169             return res;
170         }
171         
172         Entry entry;
173             
174         if (!table.methodFound(req.method, entry))
175         {
176             throw new RpcMethodNotFound();
177         }
178         else
179         {
180             size_t expected;
181             if (!entry.isValidParams(req.params, expected))
182             {
183                 throw new RpcInvalidParams(text("Expected ", expected, " parameters, ",
184                         "but got ", req.params.length, "!"));
185             }
186             if (!entry.isValidFilter(expected))
187             {
188                 throw new RpcServerError(text("Json RPC table is invalid! result_filter should be empty or size "
189                     "of sql_queries, expected ", expected, " but got ", entry.result_filter.length));
190             }    
191             if (!entry.isValidOneRowConstraint(expected))
192             {
193                 throw new RpcServerError(text("Json RPC table is invalid! one_row_flags should be empty or size "
194                         "of sql_querise, expected ", expected, " but got ", entry.one_row_flags.length));
195             }
196                         
197             try
198             {
199                 InputRange!(immutable Bson) irange;
200                 
201                 if (entry.set_username)
202                 {
203                     irange = pool.execTransaction(entry.sql_queries, req.params, entry.arg_nums, req.auth, entry.one_row_flags);
204                 }
205                 else
206                 {
207                     irange = pool.execTransaction(entry.sql_queries, req.params, entry.arg_nums, null, entry.one_row_flags);
208                 }
209                 
210                 Bson[] processResultFiltering(R)(R data)
211                     if(isInputRange!R && is(ElementType!R == immutable Bson))
212                 {
213                     auto builder = appender!(Bson[]);
214                     if(entry.needResultFiltering)
215                     {
216                         foreach(i, ibson; data)
217                         {
218                             if(entry.result_filter[i])
219                                 builder.put(cast()ibson);
220                         }
221                     } 
222                     else
223                     {
224                         foreach(i, ibson; data) builder.put(cast()ibson);
225                     }
226                     
227                     return builder.data;
228                 }
229                 
230                 Bson[] processOneRowConstraints(R)(R data)
231                     if(isInputRange!R && is(ElementType!R == Bson))
232                 {
233                     Bson transformOneRow(Bson bson)
234                     {
235                         Bson[string] columns;
236                         try columns = bson.get!(Bson[string]);
237                         catch(Exception e) return bson;
238                         
239                         Bson[string] newColumns;
240                         foreach(name, col; columns)
241                         {
242                             Bson[] row;
243                             try row = col.get!(Bson[]);
244                             catch(Exception e) return bson;
245                             if(row.length != 1) return bson;
246                             newColumns[name] = row[0];
247                         }
248                         
249                         return Bson(newColumns);
250                     }
251                     
252                     auto builder = appender!(Bson[]);
253                     if(entry.needOneRowCheck)
254                     {
255                         foreach(i, bson; data)
256                         {
257                             if(entry.one_row_flags[i])
258                             {
259                                 builder.put(transformOneRow(bson));
260                             } else
261                             {
262                                 builder.put(bson);
263                             }
264                         }
265                         return builder.data;
266                     } else
267                     {
268                         return data;
269                     }
270                 }
271                 
272                 auto resultBody = processOneRowConstraints(processResultFiltering(irange));
273                 RpcResult result = RpcResult(Bson(resultBody));
274                 res = RpcResponse(req.id, result);
275             }
276             catch (QueryProcessingException e)
277             {
278                 res = RpcResponse(req.id, RpcError(RPC_ERROR_CODE.SERVER_ERROR, "Server error. " ~ e.msg));
279             }
280             catch (Exception e)
281             {
282                 throw new RpcServerError(e.msg);
283             }
284             
285             if (table.need_cache(req.method))
286             {
287                 shared RpcResponse cacheRes = res.toShared();
288                 logger.logDebug("Adding to cache");
289                 cache.add(req, cacheRes);
290             }
291         }
292         
293         return res;
294     }
295     
296     /**
297     * Returns: true, if authorization required in json_rpc
298     */
299     bool needAuth(string method)
300     {
301         return table.needAuth(method);
302     }
303     
304     /**
305     * Drop caches if needed
306     */
307     void dropcaches(string method)
308     {
309         foreach(meth; table.needDrop(method))
310         {
311             logger.logDebug("Reseting method: "~meth);
312             cache.reset(meth);
313         }
314     }
315     
316     /**
317     * Initializes database resources
318     *
319     */
320     private void init() //called once
321     {
322         Duration timeout = dur!"msecs"(appConfig.sqlTimeout);
323         Duration aliveCheckTime = dur!"msecs"(appConfig.aliveCheckTime);
324         Duration reTime;
325         
326         if (appConfig.sqlReconnectTime > 0)
327         {
328             reTime = dur!"msecs"(appConfig.sqlReconnectTime);
329         }
330         else
331         {
332             reTime = timeout;
333         }
334 
335         api = new shared PostgreSQL(logger);
336         auto provider = new shared PQConnProvider(logger, api);
337         
338         pool = new shared AsyncPool(logger, provider, reTime, timeout, aliveCheckTime);
339     }
340     
341     private
342     {
343         shared IPostgreSQL api;
344         shared ILogger logger;
345         shared IConnectionPool pool;
346         
347         SqlJsonTable table;
348         RequestCache cache;
349         immutable AppConfig appConfig;
350     }
351 }