// Written in D programming language
/**
*   Contains database using logic
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: Zaramzan <shamyan.roman@gmail.com>
*/
module server.database;

import core.time;
import core.thread;

import std.string;
import std.range;
import std.array;
import std.algorithm;

import vibe.data.bson;

import pgator.db.pool;
import pgator.db.async.pool;
import pgator.db.connection;
import pgator.db.pq.connection;
import pgator.db.pq.libpq;

import json_rpc.error;
import json_rpc.request;
import json_rpc.response;

import server.cache;
import server.config;
import server.sql_json;

import dlogg.log;
import util;


/**
*   Represents the database layer: owns the connection pool, the json-rpc
*   method table loaded from the database and the response cache.
*
*   Authors:
*       Zaramzan <shamyan.roman@gmail.com>
*/
shared class Database
{
    /**
    *   Constructs the object from a logger and the application configuration.
    *
    *   Stores both arguments and calls $(B init), which creates the PostgreSQL
    *   API object and the connection pool. Servers are not added to the pool
    *   here; see $(B setupPool).
    *
    *   Params:
    *       logger    = logger used for all messages of this layer
    *       appConfig = application configuration
    */
    this(shared ILogger logger, immutable AppConfig appConfig)
    {
        this.logger = logger;

        this.appConfig = appConfig;

        init();
    }

    /**
    *   Configures the async pool: adds every server listed in
    *   $(B appConfig.sqlServers) with its connection count and applies the
    *   transaction logging flag from the configuration.
    */
    void setupPool() // called on every start / restart
    {
        foreach(server; appConfig.sqlServers)
        {
            logger.logInfo(text("Connecting to ", server.name, ". Adding ", server.maxConn, " connections."));
            pool.addServer(server.connString, server.maxConn);
        }
        pool.loggingAllTransactions = appConfig.logSqlTransactions;
    }

    /**
    *   Allocates the shared request cache, built from the currently loaded table.
    */
    void createCache()
    {
        cache = new shared RequestCache(table);
    }

    /**
    *   Loads the main json-rpc table from the database.
    *
    *   Runs $(B SELECT * FROM <appConfig.sqlJsonTable>), converts every row to
    *   an $(B Entry), stores the resulting table in $(B table) and builds its
    *   drop map.
    *
    *   Throws:
    *       On $(B ConnTimeoutException) the load is retried exactly once after a
    *       one second pause; an exception thrown by the second attempt is
    *       propagated to the caller.
    */
    void loadJsonSqlTable()
    {
        // Turns a column-oriented document (column name -> array of values)
        // into an array of row documents (one column name -> value map per row).
        Bson[] convertRowEchelon(const Bson from)
        {
            auto columns = from.deserializeBson!(Bson[][string]);
            Bson[string][] rows;
            foreach(colName, colVals; columns)
            {
                foreach(row, val; colVals)
                {
                    if(rows.length <= row)
                    {
                        // first column seen for this row index: start a new row
                        rows ~= [colName:val];
                    }
                    else
                    {
                        rows[row][colName] = val;
                    }
                }
            }
            return rows.map!(a => Bson(a)).array;
        }

        string queryStr = "SELECT * FROM "~appConfig.sqlJsonTable;

        void load()
        {
            // A fresh table per attempt: the retry below must not inherit
            // entries added by an attempt that failed half-way.
            shared SqlJsonTable sqlTable = new shared SqlJsonTable();

            auto arri = pool.execTransaction([queryStr]);

            foreach(ibson; arri)
            {
                foreach(v; convertRowEchelon(ibson))
                {
                    sqlTable.add(deserializeFromJson!Entry(v.toJson));
                }
            }

            table = sqlTable;
            table.makeDropMap();

            logger.logInfo("Table loaded");
        }

        try
        {
            load();
        }
        catch(ConnTimeoutException ex)
        {
            logger.logError("There is no free connections in the pool, retry over 1 sec...");

            Thread.sleep(1.seconds);

            // single retry; a second failure is not caught here
            load();
        }
    }

    /**
    *   Finalizes database resources.
    *
    *   Calls $(B finalize) on the connection pool and on the PostgreSQL API
    *   object, skipping whichever of them is null.
    */
    void finalize()
    {
        if(pool !is null)
            pool.finalize();
        if(api !is null)
            api.finalize();
    }

    /**
    *   Queries parsed request from async pool <br>
    *
    *   Also caches request if needed
    *
    *   The cache is consulted first; on a hit the cached response is returned
    *   with its id replaced by the id of $(B req). Otherwise the method is
    *   looked up in the table, the request is validated against the entry, the
    *   entry's SQL queries are executed as one transaction and the result is
    *   passed through result filtering and the one-row transformation.
    *
    *   Params:
    *       req = parsed json-rpc request
    *
    *   Returns:
    *       The response for $(B req). A $(B QueryProcessingException) is turned
    *       into an error response with code $(B RPC_ERROR_CODE.SERVER_ERROR)
    *       instead of being thrown.
    *
    *   Throws:
    *       $(B RpcMethodNotFound) if the method is not in the table, <br>
    *       $(B RpcInvalidParams) if the parameters do not validate, <br>
    *       $(B RpcServerError) if the table entry is inconsistent or any other
    *       $(B Exception) occurs while executing the queries.
    */
    RpcResponse query(ref RpcRequest req)
    {
        RpcResponse res;

        if (cache.get(req, res))
        {
            logger.logDebug("Found in cache");

            res.id = req.id;

            return res;
        }

        Entry entry;

        if (!table.methodFound(req.method, entry))
        {
            throw new RpcMethodNotFound();
        }

        // filled in by isValidParams, then used to validate the entry itself
        size_t expected;
        if (!entry.isValidParams(req.params, expected))
        {
            throw new RpcInvalidParams(text("Expected ", expected, " parameters, ",
                    "but got ", req.params.length, "!"));
        }
        if (!entry.isValidFilter(expected))
        {
            // explicit `~`: adjacent string literal concatenation is deprecated in D
            throw new RpcServerError(text("Json RPC table is invalid! result_filter should be empty or size "
                    ~ "of sql_queries, expected ", expected, " but got ", entry.result_filter.length));
        }
        if (!entry.isValidOneRowConstraint(expected))
        {
            throw new RpcServerError(text("Json RPC table is invalid! one_row_flags should be empty or size "
                    ~ "of sql_queries, expected ", expected, " but got ", entry.one_row_flags.length));
        }

        try
        {
            // Runs pool.execTransaction in a thread started with
            // std.concurrency.spawn and polls for its outcome, calling
            // yield() between polls.
            //
            // NOTE(review): the result and error slots are plain stack locals
            // whose addresses are cast to shared and written by the spawned
            // thread; they are read here without any visible synchronization.
            // NOTE(review): if execTransaction returns null without throwing,
            // the polling loop below never terminates.
            InputRange!(immutable Bson) extremeDirtyHuckRunInSeparateThreadPleaseRedoneThis(string[] queries, string[] params, uint[] arg_nums, string[string] vars, bool[] oneRowFlag)
            {
                InputRange!(immutable Bson) irangeRes = null;
                Throwable e = null;

                static void thread(shared IConnectionPool pool, shared InputRange!(immutable Bson)* resPtr, shared Throwable* ePtr, immutable string[] queries, immutable string[] params, immutable uint[] arg_nums, immutable string[string] vars, immutable bool[] oneRowFlag) {
                    try {
                        // mutable copies: the arguments arrive as immutable
                        string[] _queries = queries.dup;
                        string[] _params = params.dup;
                        uint[] _arg_nums = arg_nums.dup;
                        string[string] _vars = cast(string[string]) vars.dup;
                        bool[] _oneRowFlag = oneRowFlag.dup;

                        InputRange!(immutable Bson) res = pool.execTransaction(_queries, _params, _arg_nums, _vars, _oneRowFlag);
                        *resPtr = cast(shared)res;
                    } catch(Throwable th) {
                        // hand every failure over to the waiting caller
                        *ePtr = cast(shared)th;
                    }
                }

                std.concurrency.spawn(
                    &thread,
                    pool,
                    cast(shared)&irangeRes,
                    cast(shared)&e,
                    queries.idup,
                    params.idup,
                    arg_nums.idup,
                    cast(immutable) vars.dup,
                    oneRowFlag.idup
                );

                while(irangeRes is null) {
                    if(e !is null) throw e;
                    yield();
                }
                return irangeRes;
            }


            InputRange!(immutable Bson) irange = null;

            irange = extremeDirtyHuckRunInSeparateThreadPleaseRedoneThis(
                entry.sql_queries,
                req.params,
                entry.arg_nums,
                (entry.set_username ? req.auth : null),
                entry.one_row_flags
            );

            // Copies the query results into a mutable array; when the entry
            // needs result filtering, only results whose result_filter flag
            // is set are kept.
            Bson[] processResultFiltering(R)(R data)
                if(isInputRange!R && is(ElementType!R == immutable Bson))
            {
                auto builder = appender!(Bson[]);
                if(entry.needResultFiltering)
                {
                    foreach(i, ibson; data)
                    {
                        if(entry.result_filter[i])
                            builder.put(cast()ibson);
                    }
                }
                else
                {
                    foreach(i, ibson; data) builder.put(cast()ibson);
                }

                return builder.data;
            }

            // For results whose one_row_flags entry is set, replaces each
            // single-element column array by its only element. Results are
            // indexed by their position in `data`.
            Bson[] processOneRowConstraints(R)(R data)
                if(isInputRange!R && is(ElementType!R == Bson))
            {
                // Returns the input unchanged when it is not a document of
                // arrays or when any column does not hold exactly one value.
                Bson transformOneRow(Bson bson)
                {
                    Bson[string] columns;
                    try columns = bson.get!(Bson[string]);
                    catch(Exception e) return bson;

                    Bson[string] newColumns;
                    foreach(name, col; columns)
                    {
                        Bson[] row;
                        try row = col.get!(Bson[]);
                        catch(Exception e) return bson;
                        if(row.length != 1) return bson;
                        newColumns[name] = row[0];
                    }

                    return Bson(newColumns);
                }

                auto builder = appender!(Bson[]);
                if(entry.needOneRowCheck)
                {
                    foreach(i, bson; data)
                    {
                        if(entry.one_row_flags[i])
                        {
                            builder.put(transformOneRow(bson));
                        } else
                        {
                            builder.put(bson);
                        }
                    }
                    return builder.data;
                } else
                {
                    return data;
                }
            }

            auto resultBody = processOneRowConstraints(processResultFiltering(irange));
            RpcResult result = RpcResult(Bson(resultBody));
            res = RpcResponse(req.id, result);
        }
        catch (QueryProcessingException e)
        {
            res = RpcResponse(req.id, RpcError(RPC_ERROR_CODE.SERVER_ERROR, "Server error. " ~ e.msg));
        }
        catch (Exception e)
        {
            throw new RpcServerError(e.msg);
        }

        // NOTE(review): at this point res may be the error response built in
        // the QueryProcessingException handler above, and it is cached too -
        // confirm that caching failures is intended.
        if (table.need_cache(req.method))
        {
            shared RpcResponse cacheRes = res.toShared();
            logger.logDebug("Adding to cache");
            cache.add(req, cacheRes);
        }

        return res;
    }

    /**
    *   Params:
    *       method = json-rpc method name
    *
    *   Returns: true, if authorization required in json_rpc
    */
    bool needAuth(string method)
    {
        return table.needAuth(method);
    }

    /**
    *   Drop caches if needed
    *
    *   Resets the cache of every method the table lists in
    *   $(B needDrop) for $(B method).
    */
    void dropcaches(string method)
    {
        foreach(meth; table.needDrop(method))
        {
            logger.logDebug("Reseting method: "~meth);
            cache.reset(meth);
        }
    }

    /**
    *   Initializes database resources
    *
    *   Creates the PostgreSQL API object, a connection provider on top of it
    *   and the async pool. The configuration values $(B sqlTimeout),
    *   $(B aliveCheckTime) and $(B sqlReconnectTime) are interpreted as
    *   milliseconds; when $(B sqlReconnectTime) is not positive, the SQL
    *   timeout is used as the reconnect time.
    */
    private void init() //called once
    {
        Duration timeout = dur!"msecs"(appConfig.sqlTimeout);
        Duration aliveCheckTime = dur!"msecs"(appConfig.aliveCheckTime);
        Duration reTime;

        if (appConfig.sqlReconnectTime > 0)
        {
            reTime = dur!"msecs"(appConfig.sqlReconnectTime);
        }
        else
        {
            reTime = timeout;
        }

        api = new shared PostgreSQL(logger);
        auto provider = new shared PQConnProvider(logger, api);

        pool = new shared AsyncPool(logger, provider, reTime, timeout, aliveCheckTime);
    }

    private
    {
        shared IPostgreSQL api;
        shared ILogger logger;
        shared IConnectionPool pool;

        // json-rpc method table, set by loadJsonSqlTable
        SqlJsonTable table;
        // response cache, set by createCache
        RequestCache cache;
        immutable AppConfig appConfig;
    }
}