// Written in D programming language
/**
*   Database access layer of the server.
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: Zaramzan <shamyan.roman@gmail.com>
*/
module server.database;

import core.time;
import core.thread;

import std.string;
import std.range;
import std.array;
import std.algorithm;

import vibe.data.bson;

import pgator.db.pool;
import pgator.db.async.pool;
import pgator.db.connection;
import pgator.db.pq.connection;
import pgator.db.pq.libpq;

import json_rpc.error;
import json_rpc.request;
import json_rpc.response;

import server.cache;
import server.config;
import server.sql_json;

import dlogg.log;
import util;


/**
*   Represents the database layer: owns the connection pool, the table that
*   describes the available RPC methods and the response cache.
*
*   Authors:
*        Zaramzan <shamyan.roman@gmail.com>
*/
shared class Database
{
    /**
    *   Constructs the object from a logger and the application configuration,
    *   then creates the PostgreSQL API object and the connection pool (see $(B init)).
    *
    *   Params:
    *       logger    = logger used for all diagnostics of this layer
    *       appConfig = application configuration
    */
    this(shared ILogger logger, immutable AppConfig appConfig)
    {
        this.logger = logger;

        this.appConfig = appConfig;

        init();
    }

    /**
    *   Configures the async pool: registers every server listed in
    *   $(B appConfig.sqlServers) and applies the transaction logging flag.
    */
    void setupPool() // called on every start / restart
    {
        foreach(server; appConfig.sqlServers)
        {
            logger.logInfo(text("Connecting to ", server.name, ". Adding ", server.maxConn, " connections."));
            pool.addServer(server.connString, server.maxConn);
        }
        pool.loggingAllTransactions = appConfig.logSqlTransactions;
    }

    /**
    *   Allocates the shared request cache, built from the current method table.
    */
    void createCache()
    {
        cache = new shared RequestCache(table);
    }

    /**
    *   Loads the main method table from the database by selecting every row
    *   of $(B appConfig.sqlJsonTable).
    *
    *   Throws:
    *       On $(B ConnTimeoutException) the load is retried once after a one
    *       second pause; an exception from the second attempt is propagated.
    */
    void loadJsonSqlTable()
    {
        // Turns a column-oriented document (column name -> array of values)
        // into an array of row documents (one object per row index).
        Bson[] convertRowEchelon(const Bson from)
        {
            auto m = from.deserializeBson!(Bson[][string]);
            Bson[string][] result;
            foreach(colName, colVals; m)
            {
                foreach(row, val; colVals)
                {
                    if(result.length <= row)
                    {
                        // first time this row index is seen: start a new row
                        result ~= [colName:val];
                    }
                    else
                    {
                        result[row][colName] = val;
                    }
                }
            }
            return result.map!(a => Bson(a)).array;
        }

        string queryStr = "SELECT * FROM "~appConfig.sqlJsonTable;

        void load()
        {
            // A fresh table per attempt, so a retry never continues filling
            // a table left half-populated by a failed attempt.
            shared SqlJsonTable sqlTable = new shared SqlJsonTable();

            auto arri = pool.execTransaction([queryStr]);

            foreach(ibson; arri)
            {
                foreach(v; convertRowEchelon(ibson))
                {
                    sqlTable.add(deserializeFromJson!Entry(v.toJson));
                }
            }

            table = sqlTable;
            table.makeDropMap();

            logger.logInfo("Table loaded");
        }

        try
        {
            load();
        }
        catch(ConnTimeoutException ex)
        {
            logger.logError("There is no free connections in the pool, retry over 1 sec...");

            Thread.sleep(1.seconds);

            load();
        }
    }


    /**
    *   Finalizes database resources: the connection pool first, then the
    *   PostgreSQL API object. Members that are $(B null) are skipped.
    */
    void finalize()
    {
        if(pool !is null)
            pool.finalize();
        if(api !is null)
            api.finalize();
    }

    /**
    *   Executes a parsed request through the async pool. <br>
    *
    *   A response found in the cache is returned immediately (with its id set
    *   to the id of the request). Otherwise the method entry is looked up and
    *   validated, its queries are executed in one transaction, the results are
    *   filtered and post-processed, and the response is cached when the table
    *   asks for it.
    *
    *   Params:
    *       req = request to execute
    *
    *   Returns: response carrying either the query results or, when query
    *       processing failed, a server error.
    *
    *   Throws:
    *       $(B RpcMethodNotFound) when the table has no such method; <br>
    *       $(B RpcInvalidParams) when the parameters do not pass the entry check; <br>
    *       $(B RpcServerError) when the table entry is inconsistent or any
    *       exception other than $(B QueryProcessingException) is raised
    *       during execution.
    */
    RpcResponse query(ref RpcRequest req)
    {
        RpcResponse res;

        if (cache.get(req, res))
        {
            logger.logDebug("Found in cache");

            res.id = req.id;

            return res;
        }

        Entry entry;

        if (!table.methodFound(req.method, entry))
        {
            throw new RpcMethodNotFound();
        }
        else
        {
            size_t expected;
            if (!entry.isValidParams(req.params, expected))
            {
                throw new RpcInvalidParams(text("Expected ", expected, " parameters, ",
                        "but got ", req.params.length, "!"));
            }
            if (!entry.isValidFilter(expected))
            {
                throw new RpcServerError(text("Json RPC table is invalid! result_filter should be empty or size "
                        ~ "of sql_queries, expected ", expected, " but got ", entry.result_filter.length));
            }
            if (!entry.isValidOneRowConstraint(expected))
            {
                throw new RpcServerError(text("Json RPC table is invalid! one_row_flags should be empty or size "
                        ~ "of sql_queries, expected ", expected, " but got ", entry.one_row_flags.length));
            }

            try
            {
                InputRange!(immutable Bson) irange;

                // The request auth data is handed to the pool only when the
                // entry has set_username enabled.
                if (entry.set_username)
                {
                    irange = pool.execTransaction(entry.sql_queries, req.params, entry.arg_nums, req.auth, entry.one_row_flags);
                }
                else
                {
                    irange = pool.execTransaction(entry.sql_queries, req.params, entry.arg_nums, null, entry.one_row_flags);
                }

                // Collects the results into an array, keeping only those whose
                // result_filter flag is set when filtering is requested.
                Bson[] processResultFiltering(R)(R data)
                    if(isInputRange!R && is(ElementType!R == immutable Bson))
                {
                    auto builder = appender!(Bson[]);
                    if(entry.needResultFiltering)
                    {
                        foreach(i, ibson; data)
                        {
                            if(entry.result_filter[i])
                                builder.put(cast()ibson);
                        }
                    }
                    else
                    {
                        foreach(ibson; data) builder.put(cast()ibson);
                    }

                    return builder.data;
                }

                // For results flagged in one_row_flags, replaces every column
                // array holding exactly one value with that value.
                Bson[] processOneRowConstraints(R)(R data)
                    if(isInputRange!R && is(ElementType!R == Bson))
                {
                    // Returns the input untouched when it is not an object of
                    // arrays or when any column does not hold exactly one value.
                    Bson transformOneRow(Bson bson)
                    {
                        Bson[string] columns;
                        try columns = bson.get!(Bson[string]);
                        catch(Exception e) return bson;

                        Bson[string] newColumns;
                        foreach(name, col; columns)
                        {
                            Bson[] row;
                            try row = col.get!(Bson[]);
                            catch(Exception e) return bson;

                            if(row.length != 1) return bson;
                            newColumns[name] = row[0];
                        }

                        return Bson(newColumns);
                    }

                    auto builder = appender!(Bson[]);
                    if(entry.needOneRowCheck)
                    {
                        foreach(i, bson; data)
                        {
                            if(entry.one_row_flags[i])
                            {
                                builder.put(transformOneRow(bson));
                            } else
                            {
                                builder.put(bson);
                            }
                        }
                        return builder.data;
                    } else
                    {
                        return data;
                    }
                }

                auto resultBody = processOneRowConstraints(processResultFiltering(irange));
                RpcResult result = RpcResult(Bson(resultBody));
                res = RpcResponse(req.id, result);
            }
            catch (QueryProcessingException e)
            {
                res = RpcResponse(req.id, RpcError(RPC_ERROR_CODE.SERVER_ERROR, "Server error. " ~ e.msg));
            }
            catch (Exception e)
            {
                throw new RpcServerError(e.msg);
            }

            // NOTE(review): an error response built in the QueryProcessingException
            // handler above reaches this point as well and is cached like a
            // successful one - confirm that this is intended.
            if (table.need_cache(req.method))
            {
                shared RpcResponse cacheRes = res.toShared();
                logger.logDebug("Adding to cache");
                cache.add(req, cacheRes);
            }
        }

        return res;
    }

    /**
    *   Params:
    *       method = name of the RPC method
    *
    *   Returns: true, if authorization required in json_rpc
    */
    bool needAuth(string method)
    {
        return table.needAuth(method);
    }

    /**
    *   Resets the cache of every method that the table reports via
    *   $(B needDrop) for the given method.
    *
    *   Params:
    *       method = name of the RPC method that has been called
    */
    void dropcaches(string method)
    {
        foreach(meth; table.needDrop(method))
        {
            logger.logDebug("Reseting method: "~meth);
            cache.reset(meth);
        }
    }

    /**
    *   Initializes database resources: creates the PostgreSQL API object, the
    *   connection provider and the async pool.
    *
    *   The reconnect time falls back to the SQL timeout when
    *   $(B appConfig.sqlReconnectTime) is not positive.
    */
    private void init() //called once
    {
        Duration timeout = dur!"msecs"(appConfig.sqlTimeout);
        Duration aliveCheckTime = dur!"msecs"(appConfig.aliveCheckTime);
        Duration reTime;

        if (appConfig.sqlReconnectTime > 0)
        {
            reTime = dur!"msecs"(appConfig.sqlReconnectTime);
        }
        else
        {
            reTime = timeout;
        }

        api = new shared PostgreSQL(logger);
        auto provider = new shared PQConnProvider(logger, api);

        pool = new shared AsyncPool(logger, provider, reTime, timeout, aliveCheckTime);
    }

    private
    {
        shared IPostgreSQL api;
        shared ILogger logger;
        shared IConnectionPool pool;

        SqlJsonTable table;
        RequestCache cache;
        immutable AppConfig appConfig;
    }
}