1 module pgator.app; 2 3 import pgator.rpc_table; 4 import pgator.sql_transaction; 5 import dpq2.oids: OidType; 6 import dpq2.value: ValueConvException; 7 import std.getopt; 8 import std.typecons: Tuple; 9 import std.exception: enforce; 10 import std.conv: to, ConvException; 11 import vibe.http.server; 12 import vibe.http.auth.basic_auth: checkBasicAuth; 13 import vibe.core.concurrency; 14 import vibe.core.log; 15 import vibe.data.json; 16 import vibe.data.bson; 17 import vibe.db.postgresql; 18 19 string configFileName = "/wrong/path/to/file.json"; 20 bool debugEnabled = false; 21 bool checkStatements = false; 22 23 void readOpts(string[] args) 24 { 25 try 26 { 27 auto helpInformation = getopt( 28 args, 29 "debug", &debugEnabled, 30 "check", &checkStatements, 31 "config", &configFileName 32 ); 33 } 34 catch(Exception e) 35 { 36 logFatal(e.msg); 37 throw e; 38 } 39 40 if(debugEnabled) setLogLevel = LogLevel.debugV; 41 } 42 43 private Bson _cfg; 44 45 Bson readConfig() 46 { 47 import std.file; 48 49 Bson cfg; 50 51 try 52 { 53 auto text = readText(configFileName); 54 cfg = Bson(parseJsonString(text)); 55 } 56 catch(Exception e) 57 { 58 logFatal(e.msg); 59 throw e; 60 } 61 62 return cfg; 63 } 64 65 private struct PrepareStatementsArgs 66 { 67 const SQLVariablesNames varNames; 68 bool methodsLoadedFlag = false; // need for bootstrap 69 Method[string] methods; 70 size_t rpcTableLength; 71 size_t failedCount; 72 string tableName; 73 } 74 75 int main(string[] args) 76 { 77 try 78 { 79 readOpts(args); 80 Bson cfg = readConfig(); 81 82 auto server = cfg["sqlServer"]; 83 const connString = server["connString"].get!string; 84 auto maxConn = to!uint(server["maxConn"].get!long); 85 86 PrepareStatementsArgs prepArgs = { 87 varNames: SQLVariablesNames( 88 cfg["sqlAuthVariables"]["username"].get!string, 89 cfg["sqlAuthVariables"]["password"].get!string 90 ) 91 }; 92 93 // delegate 94 void afterConnectOrReconnect(Connection conn) 95 { 96 if(prepArgs.methodsLoadedFlag) 97 { 98 logDebugV("Preparing"); 99 auto failedStatementsNames = prepareStatements(conn, prepArgs); 100 prepArgs.failedCount += failedStatementsNames.length; 101 102 foreach(n; failedStatementsNames) 103 prepArgs.methods.remove(n); 104 105 logInfo("Number of statements in the table "~prepArgs.tableName~": "~ 106 prepArgs.rpcTableLength.to!string~", failed to prepare: "~prepArgs.failedCount.to!string); 107 } 108 } 109 110 // connect to db 111 auto client = new PostgresClient(connString, maxConn, &afterConnectOrReconnect); 112 113 { 114 auto conn = client.lockConnection(); 115 auto sqlPgatorTable = cfg["sqlPgatorTable"].get!string; 116 117 // read pgator_rpc 118 prepArgs.tableName = conn.escapeIdentifier(sqlPgatorTable); 119 120 QueryParams p; 121 p.sqlCommand = "SELECT * FROM "~prepArgs.tableName; 122 auto answer = conn.execStatement(p); 123 124 prepArgs.rpcTableLength = answer.length; 125 auto readMethodsResult = readMethods(answer); 126 prepArgs.methods = readMethodsResult.methods; 127 prepArgs.methodsLoadedFlag = true; 128 129 { 130 prepArgs.failedCount = prepArgs.rpcTableLength - readMethodsResult.loaded; 131 logDebugV("Number of statements in the table "~prepArgs.tableName~": "~prepArgs.rpcTableLength.to!string~", failed to load into pgator: "~prepArgs.failedCount.to!string); 132 } 133 134 // prepare statements for previously used connection 135 afterConnectOrReconnect(conn); 136 137 destroy(conn); 138 } 139 140 if(!checkStatements) 141 { 142 immutable methods = cast(immutable) prepArgs.methods.dup; 143 loop(cfg, client, methods); 144 } 145 146 return prepArgs.failedCount ? 2 : 0; 147 } 148 catch(Exception e) 149 { 150 logFatal(e.msg); 151 152 return 1; 153 } 154 } 155 156 void loop(in Bson cfg, PostgresClient client, immutable Method[string] methods) 157 { 158 // http-server 159 import vibe.core.core; 160 161 void httpRequestHandler(scope HTTPServerRequest req, HTTPServerResponse res) 162 { 163 try 164 { 165 RpcRequestResults results = performRpcRequests(methods, client, req); 166 167 final switch(results.type) 168 { 169 case RpcType.vibedREST: 170 auto result = &results.results[0]; 171 172 if(result.exception is null) 173 { 174 res.writeJsonBody(result.responseBody["result"]); 175 } 176 else 177 { 178 res.writeJsonBody(result.responseBody, result.exception.httpCode); 179 } 180 181 break; 182 183 case RpcType.jsonRpc: 184 auto result = &results.results[0]; 185 186 if(result.exception is null) 187 { 188 if(result.isNotify) 189 { 190 res.statusCode = HTTPStatus.noContent; 191 res.writeVoidBody(); 192 } 193 else 194 { 195 res.writeJsonBody(result.responseBody); 196 } 197 } 198 else // error handling 199 { 200 if(result.isNotify) 201 { 202 res.statusCode = HTTPStatus.noContent; 203 res.statusPhrase = result.exception.msg; 204 res.writeVoidBody(); 205 } 206 else 207 { 208 res.writeJsonBody(result.responseBody, result.exception.httpCode); 209 } 210 } 211 212 break; 213 214 case RpcType.jsonRpcBatchMode: 215 Bson[] ret; 216 217 foreach(ref r; results.results) // fill response array 218 { 219 if(!r.isNotify) // skip notify responses 220 ret ~= r.responseBody; 221 } 222 223 if(ret.length) 224 { 225 res.writeJsonBody(Bson(ret)); // normal batch response 226 } 227 else 228 { 229 res.statusCode = HTTPStatus.noContent; 230 res.writeVoidBody(); // empty response for batch with notifies only 231 } 232 233 break; 234 } 235 } 236 catch(LoopException e) 237 { 238 res.writeJsonBody(Bson(e.msg), e.httpCode); // FIXME: wrong error body format 239 240 logWarn(e.msg); 241 } 242 catch(Exception e) 243 { 244 logFatal(e.toString); 245 } 246 } 247 248 //setupWorkerThreads(); // TODO: read number of threads from config 249 250 auto settings = new HTTPServerSettings; 251 // settings.options |= HTTPServerOption.distribute; // causes stuck on epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6 252 settings.bindAddresses = cfg["listenAddresses"].deserializeBson!(string[]); 253 settings.port = to!ushort(cfg["listenPort"].get!long); 254 255 auto listenHandler = listenHTTP(settings, &httpRequestHandler); 256 257 runEventLoop(); 258 } 259 260 struct SQLVariablesNames 261 { 262 string username; 263 string password; 264 } 265 266 private Bson execMethod( 267 PostgresClient client, 268 in Method method, 269 in RpcRequest rpcRequest 270 ) 271 { 272 TransactionQueryParams qp; 273 qp.auth = rpcRequest.auth; 274 qp.queryParams.length = method.statements.length; 275 size_t paramCounter = 0; 276 277 foreach(i, statement; method.statements) 278 { 279 qp.queryParams[i].preparedStatementName = preparedName(method, statement); 280 281 if(rpcRequest.positionParams.length == 0) 282 { 283 if(rpcRequest.namedParams !is null) // named parameters with types 284 { 285 qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParams); 286 } 287 else // named parameters without types 288 { 289 qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParamsStringValues); 290 } 291 } 292 else // positional parameters 293 { 294 if(rpcRequest.positionParams.length - paramCounter < statement.argsNames.length) 295 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too few", __FILE__, __LINE__); 296 297 qp.queryParams[i].args = new Value[statement.argsNames.length]; 298 299 foreach(n, ref b; rpcRequest.positionParams[paramCounter .. paramCounter + statement.argsNames.length]) 300 { 301 auto v = &qp.queryParams[i].args[n]; 302 const oid = statement.argsOids[n]; 303 304 *v = bsonToValue(b, oid); 305 306 if(v.oidType != oid) 307 throw new LoopException( 308 JsonRpcErrorCode.invalidParams, 309 HTTPStatus.badRequest, 310 "Parameter #"~i.to!string~" type is "~v.oidType.to!string~", but expected "~oid.to!string, 311 __FILE__, __LINE__); 312 } 313 314 paramCounter += statement.argsNames.length; 315 } 316 } 317 318 if(rpcRequest.positionParams.length != 0 && paramCounter != rpcRequest.positionParams.length) 319 { 320 assert(paramCounter < rpcRequest.positionParams.length); 321 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too big", __FILE__, __LINE__); 322 } 323 324 SQLTransaction trans = SQLTransaction(client); 325 326 try 327 { 328 if(method.needAuthVariablesFlag && !qp.auth.authVariablesSet) 329 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.unauthorized, "Basic HTTP authentication need", __FILE__, __LINE__); 330 331 trans.begin(method.readOnlyFlag); 332 333 immutable answer = trans.execMethod(method, qp); 334 335 enforce(answer.length == method.statements.length); 336 337 Bson ret = Bson.emptyObject; 338 339 if(!method.isMultiStatement) 340 { 341 ret = formatResult(answer[0], method.statements[0].resultFormat); 342 } 343 else 344 { 345 foreach(i, statement; method.statements) 346 { 347 if(statement.resultFormat != ResultFormat.VOID) 348 ret[statement.resultName] = formatResult(answer[i], statement.resultFormat); 349 } 350 } 351 352 trans.commit(); 353 354 return ret; 355 } 356 catch(PostgresClientTimeoutException e) 357 { 358 trans.resetStart(); 359 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__); 360 } 361 catch(ConnectionException e) 362 { 363 trans.resetStart(); 364 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__); 365 } 366 catch(AnswerCreationException e) 367 { 368 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__, e); 369 } 370 finally 371 { 372 destroy(trans); 373 } 374 } 375 376 private Bson formatResult(immutable Answer answer, ResultFormat format) 377 { 378 Bson getValue(size_t rowNum, size_t colNum) 379 { 380 string columnName = answer.columnName(colNum); 381 382 try 383 { 384 return answer[rowNum][colNum].as!Bson; 385 } 386 catch(ValueConvException e) 387 { 388 e.msg = "Column "~columnName~" ("~rowNum.to!string~" row): "~e.msg; 389 throw e; 390 } 391 } 392 393 with(ResultFormat) 394 final switch(format) 395 { 396 case CELL: 397 { 398 if(answer.length != 1 || answer.columnCount != 1) 399 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One cell flag constraint failed", __FILE__, __LINE__); 400 401 return getValue(0, 0); 402 } 403 404 case ROW: 405 { 406 if(answer.length != 1) 407 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One row flag constraint failed", __FILE__, __LINE__); 408 409 Bson ret = Bson.emptyObject; 410 411 foreach(colNum; 0 .. answer.columnCount) 412 ret[answer.columnName(colNum)] = getValue(0, colNum); 413 414 return ret; 415 } 416 417 case TABLE: 418 { 419 Bson ret = Bson.emptyObject; 420 421 foreach(colNum; 0 .. answer.columnCount) 422 { 423 Bson[] col = new Bson[answer.length]; 424 425 foreach(rowNum; 0 .. answer.length) 426 col[rowNum] = getValue(rowNum, colNum); 427 428 ret[answer.columnName(colNum)] = col; 429 } 430 431 return ret; 432 } 433 434 case ROTATED: 435 { 436 Bson[] ret = new Bson[answer.length]; 437 438 foreach(rowNum; 0 .. answer.length) 439 { 440 Bson row = Bson.emptyObject; 441 442 foreach(colNum; 0 .. answer.columnCount) 443 row[answer.columnName(colNum)] = getValue(rowNum, colNum); 444 445 ret[rowNum] = row; 446 } 447 448 return Bson(ret); 449 } 450 451 case VOID: 452 { 453 return Bson.emptyObject; 454 } 455 } 456 } 457 458 Value[] named2positionalParameters(T)(in Statement method, in T[string] namedParams) 459 if(is(T == Bson) || is(T == string)) 460 { 461 Value[] ret = new Value[method.argsNames.length]; 462 463 foreach(i, argName; method.argsNames) 464 { 465 auto argValue = argName in namedParams; 466 467 if(argValue) 468 { 469 const oid = method.argsOids[i]; 470 Value v; 471 472 static if(is(T == Bson)) 473 { 474 v = bsonToValue(*argValue, oid); 475 476 if(v.oidType != oid) 477 throw new LoopException( 478 JsonRpcErrorCode.invalidParams, 479 HTTPStatus.badRequest, 480 argName~" parameter type is "~v.oidType.to!string~", but expected "~oid.to!string, 481 __FILE__, __LINE__); 482 } 483 else // T == string, unknown parameter type 484 { 485 // Using Postgres ability to determine argument type 486 v = toValue(*argValue, ValueFormat.TEXT); 487 } 488 489 ret[i] = v; 490 } 491 else 492 { 493 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Missing required parameter "~argName, __FILE__, __LINE__); 494 } 495 } 496 497 return ret; 498 } 499 500 private struct AuthorizationCredentials 501 { 502 bool authVariablesSet = false; 503 string username; 504 string password; 505 } 506 507 RpcRequestResults performRpcRequests(immutable Method[string] methods, PostgresClient client, scope HTTPServerRequest req) 508 { 509 RpcRequestResults ret; 510 511 // Recognition of request type 512 RpcRequest[] dbRequests; 513 514 if(req.method == HTTPMethod.GET) 515 { 516 ret.type = RpcType.vibedREST; 517 518 dbRequests.length = 1; 519 dbRequests[0] = RpcRequest.vibeRestGetToRpcRequest(req); 520 dbRequests[0].type = RpcType.vibedREST; 521 } 522 else 523 { 524 if(req.contentType != "application/json") 525 throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.unsupportedMediaType, "Supported only application/json content type", __FILE__, __LINE__); 526 527 Json j = req.json; 528 529 switch(j.type) 530 { 531 case Json.Type.array: 532 { 533 if(!j.length) 534 throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Empty JSON-RPC 2.0 batch array", __FILE__, __LINE__); 535 536 ret.type = RpcType.jsonRpcBatchMode; 537 dbRequests.length = j.length; 538 539 foreach(i, ref request; dbRequests) 540 { 541 if(!RpcRequest.isValidJsonRpcRequest(j[i])) 542 throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Isn't JSON-RPC 2.0 protocol", __FILE__, __LINE__); 543 544 request = RpcRequest.jsonToRpcRequest(j[i], req); 545 request.type = RpcType.jsonRpcBatchMode; 546 } 547 548 break; 549 } 550 551 case Json.Type.object: 552 dbRequests.length = 1; 553 554 if(RpcRequest.isValidJsonRpcRequest(j)) 555 { 556 dbRequests[0] = RpcRequest.jsonToRpcRequest(j, req); 557 dbRequests[0].type = RpcType.jsonRpc; 558 ret.type = RpcType.jsonRpc; 559 } 560 else // JSON vibe.d REST POST 561 { 562 dbRequests[0] = RpcRequest.vibeRestToRpcRequest(j, req); 563 dbRequests[0].type = RpcType.vibedREST; 564 ret.type = RpcType.vibedREST; 565 } 566 567 break; 568 569 default: 570 throw new LoopException(JsonRpcErrorCode.parseError, HTTPStatus.badRequest, "Parse error", __FILE__, __LINE__); 571 } 572 } 573 574 ret.results.length = dbRequests.length; 575 576 foreach(i, const ref request; dbRequests) 577 { 578 ret.results[i] = async({ 579 return request.performRpcRequest(methods, client); 580 }); 581 } 582 583 return ret; 584 } 585 586 struct RpcRequest 587 { 588 Bson id; 589 string methodName; 590 Bson[string] namedParams = null; 591 string[string] namedParamsStringValues = null; /// used if types of parameters is unknown 592 Bson[] positionParams = null; 593 AuthorizationCredentials auth; 594 RpcType type; // used only for pretty error formatting 595 596 invariant() 597 { 598 size_t count = 0; 599 600 if(namedParams !is null) count++; 601 if(namedParamsStringValues !is null) count++; 602 if(positionParams !is null) count++; 603 604 assert(count <= 1); 605 } 606 607 bool isNotify() const 608 { 609 return id.type == Bson.Type.undefined; 610 } 611 612 private static bool isValidJsonRpcRequest(scope Json j) 613 { 614 return j["jsonrpc"] == "2.0"; 615 } 616 617 private static RpcRequest jsonToRpcRequest(scope Json j, scope HTTPServerRequest req) 618 { 619 RpcRequest r; 620 621 r.id = j["id"]; 622 r.methodName = j["method"].get!string; 623 624 Json params = j["params"]; 625 626 switch(params.type) 627 { 628 case Json.Type.undefined: // params omitted 629 break; 630 631 case Json.Type.object: 632 foreach(string key, value; params) 633 r.namedParams[key] = value; 634 635 break; 636 637 case Json.Type.array: 638 foreach(value; params) 639 r.positionParams ~= Bson(value); 640 641 break; 642 643 default: 644 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Unexpected params type", __FILE__, __LINE__); 645 } 646 647 // pick out name and password from the request 648 { 649 bool pwcheck(string _username, string _password) 650 { 651 r.auth.username = _username; 652 r.auth.password = _password; 653 654 return true; 655 } 656 657 r.auth.authVariablesSet = checkBasicAuth(req, &pwcheck); 658 } 659 660 return r; 661 } 662 663 /// Converts Vibe.d REST client GET request to RpcRequest 664 private static RpcRequest vibeRestGetToRpcRequest(ref HTTPServerRequest req) 665 { 666 RpcRequest r; 667 668 enforce(req.path.length > 0); 669 r.methodName = req.path[1..$]; // strips first '/' 670 671 foreach(string key, ref value; req.query.byKeyValue) 672 r.namedParamsStringValues[key] = value; 673 674 r.id = Bson("REST request"); // Means what it isn't JSON-RPC "notify" 675 676 return r; 677 } 678 679 680 /// Converts Vibe.d REST client request to RpcRequest 681 private static RpcRequest vibeRestToRpcRequest(scope Json j, ref HTTPServerRequest req) 682 { 683 RpcRequest r; 684 685 enforce(req.path.length > 0); 686 r.methodName = req.path[1..$]; // strips first '/' 687 688 foreach(string key, ref value; j) 689 r.namedParams[key] = value; 690 691 r.id = Bson("REST request"); // Means what it isn't JSON-RPC "notify" 692 693 return r; 694 } 695 696 RpcRequestResult performRpcRequest(immutable Method[string] methods, PostgresClient client) const 697 { 698 try 699 { 700 try 701 { 702 const method = (methodName in methods); 703 704 if(method is null) 705 throw new LoopException(JsonRpcErrorCode.methodNotFound, HTTPStatus.badRequest, "Method "~methodName~" not found", __FILE__, __LINE__); 706 707 RpcRequestResult ret; 708 ret.isNotify = isNotify; 709 710 if(!ret.isNotify) 711 { 712 ret.responseBody = Bson(["id": id]); 713 ret.responseBody["jsonrpc"] = "2.0"; 714 ret.responseBody["result"] = client.execMethod(*method, this); 715 } 716 else // JSON-RPC 2.0 Notification 717 { 718 client.execMethod(*method, this); 719 ret.responseBody = Bson.emptyObject; 720 } 721 722 return ret; 723 } 724 catch(ValueConvException e) 725 { 726 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__); 727 } 728 } 729 catch(LoopException e) 730 { 731 Bson err = Bson.emptyObject; 732 733 if(type != RpcType.vibedREST) 734 { 735 err["id"] = id; 736 err["jsonrpc"] = "2.0"; 737 } 738 739 if(e.answerException is null) 740 { 741 err["error"] = Bson([ 742 "message": Bson(e.msg), 743 "code": Bson(e.jsonCode) 744 ]); 745 } 746 else 747 { 748 Bson data = Bson([ 749 "hint": Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_HINT)), 750 "detail": Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_DETAIL)), 751 "errcode": Bson(e.answerException.resultErrorField(PG_DIAG_SQLSTATE)) 752 ]); 753 754 err["error"] = Bson([ 755 "message": Bson(e.msg), 756 "code": Bson(e.jsonCode), 757 "data": data 758 ]); 759 } 760 761 RpcRequestResult ret; 762 ret.isNotify = isNotify; 763 ret.responseBody = err; 764 ret.exception = e; 765 766 logWarn(methodName~": "~e.httpCode.to!string~" "~err.toString); 767 768 return ret; 769 } 770 } 771 } 772 773 private struct RpcRequestResult 774 { 775 Bson responseBody; 776 LoopException exception; 777 bool isNotify; 778 779 void opAssign(shared RpcRequestResult s) shared 780 { 781 synchronized 782 { 783 // This need because Bson don't have shared opAssign 784 Bson copy = s.responseBody; 785 (cast() this.responseBody) = copy; 786 787 this.exception = s.exception; 788 this.isNotify = s.isNotify; 789 } 790 } 791 } 792 793 private struct RpcRequestResults 794 { 795 Future!RpcRequestResult[] results; 796 RpcType type; 797 } 798 799 private enum RpcType 800 { 801 jsonRpc, /// Normal JSON mode response 802 jsonRpcBatchMode, /// Batch JSON mode response 803 vibedREST /// Vibe.d REST client mode response 804 } 805 806 enum JsonRpcErrorCode : short 807 { 808 /// Invalid JSON was received by the server. 809 /// An error occurred on the server while parsing the JSON text 810 parseError = -32700, 811 812 /// The JSON sent is not a valid Request object. 813 invalidRequest = -32600, 814 815 /// Statement not found 816 methodNotFound = -32601, 817 818 /// Invalid params 819 invalidParams = -32602, 820 821 /// Internal error 822 internalError = -32603, 823 } 824 825 class LoopException : Exception 826 { 827 const JsonRpcErrorCode jsonCode; 828 const HTTPStatus httpCode; 829 const AnswerCreationException answerException; 830 831 this(JsonRpcErrorCode jsonCode, HTTPStatus httpCode, string msg, string file, size_t line, AnswerCreationException ae = null) pure 832 { 833 this.jsonCode = jsonCode; 834 this.httpCode = httpCode; 835 this.answerException = ae; 836 837 super(msg, file, line); 838 } 839 } 840 841 /// returns names of unprepared methods, but length is number of unprepared statements 842 private string[] prepareStatements(Connection conn, ref PrepareStatementsArgs args) 843 { 844 { 845 logDebugV("try to prepare internal statements"); 846 847 conn.prepareStatement(BuiltInPrep.BEGIN, "BEGIN"); 848 conn.prepareStatement(BuiltInPrep.BEGIN_RO, "BEGIN READ ONLY"); 849 conn.prepareStatement(BuiltInPrep.COMMIT, "COMMIT"); 850 conn.prepareStatement(BuiltInPrep.ROLLBACK, "ROLLBACK"); 851 conn.prepareStatement(BuiltInPrep.SET_AUTH_VARS, 852 "SELECT set_config("~conn.escapeLiteral(args.varNames.username)~", $1, true), "~ 853 "set_config("~conn.escapeLiteral(args.varNames.password)~", $2, true)"); 854 855 logDebugV("internal statements prepared"); 856 } 857 858 string[] failedStatements; 859 860 foreach(ref method; args.methods.byValue) 861 { 862 foreach(ref statement; method.statements) 863 { 864 const prepName = preparedName(method, statement); 865 866 logDebugV("try to prepare statement "~prepName~": "~statement.sqlCommand); 867 868 try 869 { 870 conn.prepareStatement(prepName, statement.sqlCommand); 871 statement.argsOids = conn.retrieveArgsTypes(prepName); 872 873 logDebugV("statement "~prepName~" prepared"); 874 } 875 catch(ConnectionException e) 876 { 877 throw e; 878 } 879 catch(Exception e) 880 { 881 logWarn("Skipping "~prepName~": "~e.msg); 882 failedStatements ~= method.name; 883 } 884 } 885 } 886 887 return failedStatements; 888 } 889 890 string preparedName(in Method method, in Statement statement) 891 { 892 if(statement.statementNum < 0) 893 { 894 return method.name; 895 } 896 else 897 { 898 import std.conv: to; 899 900 return method.name~"_"~statement.statementNum.to!string; 901 } 902 } 903 904 private OidType[] retrieveArgsTypes(Connection conn, string preparedStatementName) 905 { 906 auto desc = conn.describePreparedStatement(preparedStatementName); 907 908 OidType[] ret = new OidType[desc.nParams]; 909 910 argsLoop: 911 foreach(i, ref t; ret) 912 { 913 t = desc.paramType(i); 914 915 foreach(sup; argsSupportedTypes) 916 { 917 try 918 if(t == sup || t == oidConvTo!"array"(sup)) continue argsLoop; 919 catch(ValueConvException) 920 {} 921 } 922 923 throw new Exception("unsupported parameter $"~(i+1).to!string~" type: "~t.to!string, __FILE__, __LINE__); 924 } 925 926 // Result fields type check 927 resultTypesLoop: 928 foreach(i; 0 .. desc.columnCount) 929 { 930 auto t = desc.OID(i); 931 932 foreach(sup; resultSupportedTypes) 933 { 934 try 935 if(t == sup || t == oidConvTo!"array"(sup)) continue resultTypesLoop; 936 catch(ValueConvException) 937 {} 938 } 939 940 throw new Exception("unsupported result field "~desc.columnName(i)~" ("~i.to!string~") type: "~t.to!string, __FILE__, __LINE__); 941 } 942 943 return ret; 944 } 945 946 // Actually this is types what described in BSON specification 947 private immutable OidType[] argsSupportedTypes = 948 [ 949 OidType.Bool, 950 OidType.Int4, 951 OidType.Int8, 952 OidType.Float8, 953 OidType.Text, 954 OidType.Json 955 ]; 956 957 // Types what can be converted to BSON 958 private immutable OidType[] resultSupportedTypes = argsSupportedTypes ~ 959 [ 960 OidType.Numeric, 961 OidType.FixedString, 962 OidType.Jsonb, 963 //OidType.UUID 964 ];