/// pgator application module: reads the configuration, loads and prepares the
/// SQL methods described in the RPC table and serves them over HTTP
/// (JSON-RPC 2.0, JSON-RPC 2.0 batch mode and vibe.d REST style requests).
module pgator.app;

import pgator.rpc_table;
import pgator.sql_transaction;
import dpq2.oids: OidType;
import dpq2.value: ValueConvException;
import std.getopt;
import std.typecons: Tuple;
import std.exception: enforce;
import std.conv: to, ConvException;
import vibe.http.server;
import vibe.http.auth.basic_auth: checkBasicAuth;
import vibe.core.concurrency;
import vibe.core.log;
import vibe.data.json;
import vibe.data.bson;
import vibe.db.postgresql;

/// Path of the JSON configuration file (overridden by the `--config` option)
string configFileName = "/wrong/path/to/file.json";
/// Set by `--debug`: switches the log level to `LogLevel.debugV`
bool debugEnabled = false;
/// Set by `--check`: statements are loaded and prepared, but the HTTP server is not started
bool checkStatements = false;

/// Parses command line options into the module-level option variables.
///
/// Params:
///     args = program arguments as passed to `main`
///
/// Throws: rethrows any `Exception` raised by `getopt` after logging its message
void readOpts(string[] args)
{
    try
    {
        getopt(
                args,
                "debug", &debugEnabled,
                "check", &checkStatements,
                "config", &configFileName
            );
    }
    catch(Exception e)
    {
        logFatal(e.msg);
        throw e;
    }

    if(debugEnabled) setLogLevel(LogLevel.debugV);
}

// NOTE(review): not referenced anywhere else in this module
private Bson _cfg;

/// Reads the file named by `configFileName` and parses it as JSON.
///
/// Returns: the configuration converted to `Bson`
///
/// Throws: rethrows any `Exception` raised while reading or parsing, after logging its message
Bson readConfig()
{
    import std.file;

    Bson cfg;

    try
    {
        auto text = readText(configFileName);
        cfg = Bson(parseJsonString(text));
    }
    catch(Exception e)
    {
        logFatal(e.msg);
        throw e;
    }

    return cfg;
}

/// State shared between `main` and the (re)connect callback that prepares statements
private struct PrepareStatementsArgs
{
    const SQLVariablesNames varNames;
    bool methodsLoadedFlag = false; // needed for bootstrap: nothing is prepared until the methods are loaded
    Method[string] methods;
    size_t rpcTableLength;
    size_t failedCount;
    string tableName;
}

/// Program entry point.
///
/// Returns: 0 on success, 2 if at least one statement failed to load or prepare,
/// 1 if an exception escaped
int main(string[] args)
{
    try
    {
        readOpts(args);
        Bson cfg = readConfig();

        auto server = cfg["sqlServer"];
        const connString = server["connString"].get!string;
        auto maxConn = to!uint(server["maxConn"].get!long);

        PrepareStatementsArgs prepArgs = {
            varNames: SQLVariablesNames(
                cfg["sqlAuthVariables"]["username"].get!string,
                cfg["sqlAuthVariables"]["password"].get!string
            )
        };

        // Delegate handed to PostgresClient (and also called explicitly below):
        // prepares all known statements on the given connection
        void afterConnectOrReconnect(Connection conn)
        {
            if(prepArgs.methodsLoadedFlag)
            {
                logDebugV("Preparing");
                auto failedStatementsNames = prepareStatements(conn, prepArgs);
                prepArgs.failedCount += failedStatementsNames.length;

                // methods with at least one unprepared statement are dropped
                foreach(n; failedStatementsNames)
                    prepArgs.methods.remove(n);

                logInfo("Number of statements in the table "~prepArgs.tableName~": "~
                    prepArgs.rpcTableLength.to!string~", failed to prepare: "~prepArgs.failedCount.to!string);
            }
        }

        // connect to db
        auto client = new PostgresClient(connString, maxConn, &afterConnectOrReconnect);

        {
            auto conn = client.lockConnection();
            auto sqlPgatorTable = cfg["sqlPgatorTable"].get!string;

            // read pgator_rpc
            prepArgs.tableName = conn.escapeIdentifier(sqlPgatorTable);

            QueryParams p;
            p.sqlCommand = "SELECT * FROM "~prepArgs.tableName;
            auto answer = conn.execStatement(p);

            prepArgs.rpcTableLength = answer.length;
            auto readMethodsResult = readMethods(answer);
            prepArgs.methods = readMethodsResult.methods;
            prepArgs.methodsLoadedFlag = true;

            {
                prepArgs.failedCount = prepArgs.rpcTableLength - readMethodsResult.loaded;
                logDebugV("Number of statements in the table "~prepArgs.tableName~": "~prepArgs.rpcTableLength.to!string~", failed to load into pgator: "~prepArgs.failedCount.to!string);
            }

            // prepare statements for the connection that was used before the methods were loaded
            afterConnectOrReconnect(conn);

            destroy(conn);
        }

        if(!checkStatements)
        {
            immutable methods = cast(immutable) prepArgs.methods.dup;
            loop(cfg, client, methods);
        }

        return prepArgs.failedCount ? 2 : 0;
    }
    catch(Exception e)
    {
        logFatal(e.msg);

        return 1;
    }
}

/// Builds a JSON-RPC 2.0 error response body for failures that are not bound to
/// one particular request (so the "id" member is null).
private Bson requestLevelErrorBody(JsonRpcErrorCode code, string msg)
{
    Bson err = Bson.emptyObject;

    err["id"] = Bson(null);
    err["jsonrpc"] = "2.0";
    err["error"] = Bson([
        "message": Bson(msg),
        "code": Bson(cast(int) code)
    ]);

    return err;
}

/// Configures the HTTP server from `cfg`, registers the request handler and runs the event loop.
///
/// Params:
///     cfg = configuration; "listenAddresses" and "listenPort" are read here
///     client = Postgres client used to execute the methods
///     methods = prepared methods, keyed by method name
void loop(in Bson cfg, PostgresClient client, immutable Method[string] methods)
{
    // http-server
    import vibe.core.core;

    void httpRequestHandler(scope HTTPServerRequest req, HTTPServerResponse res)
    {
        try
        {
            RpcRequestResults results = performRpcRequests(methods, client, req);

            final switch(results.type)
            {
                case RpcType.vibedREST:
                    auto result = &results.results[0];

                    if(result.exception is null)
                    {
                        // REST clients receive only the bare result value
                        res.writeJsonBody(result.responseBody["result"]);
                    }
                    else
                    {
                        res.writeJsonBody(result.responseBody, result.exception.httpCode);
                    }

                    break;

                case RpcType.jsonRpc:
                    auto result = &results.results[0];

                    if(result.exception is null)
                    {
                        if(result.isNotify)
                        {
                            res.statusCode = HTTPStatus.noContent;
                            res.writeVoidBody();
                        }
                        else
                        {
                            res.writeJsonBody(result.responseBody);
                        }
                    }
                    else // error handling
                    {
                        if(result.isNotify)
                        {
                            // notifications get no body; the error text goes into the status phrase
                            res.statusCode = HTTPStatus.noContent;
                            res.statusPhrase = result.exception.msg;
                            res.writeVoidBody();
                        }
                        else
                        {
                            res.writeJsonBody(result.responseBody, result.exception.httpCode);
                        }
                    }

                    break;

                case RpcType.jsonRpcBatchMode:
                    Bson[] ret;

                    foreach(ref r; results.results) // fill response array
                    {
                        if(!r.isNotify) // skip notify responses
                            ret ~= r.responseBody;
                    }

                    if(ret.length)
                    {
                        res.writeJsonBody(Bson(ret)); // normal batch response
                    }
                    else
                    {
                        res.statusCode = HTTPStatus.noContent;
                        res.writeVoidBody(); // empty response for batch with notifies only
                    }

                    break;
            }
        }
        catch(LoopException e)
        {
            // The request as a whole was rejected before any per-request result existed:
            // answer with a JSON-RPC 2.0 error object carrying a null id
            res.writeJsonBody(requestLevelErrorBody(e.jsonCode, e.msg), e.httpCode);

            logWarn(e.msg);
        }
        catch(Exception e)
        {
            logFatal(e.toString);

            // Report the failure to the client too, unless a response has already been started.
            // Exception details are kept in the log only.
            if(!res.headerWritten)
                res.writeJsonBody(
                    requestLevelErrorBody(JsonRpcErrorCode.internalError, "Internal error"),
                    HTTPStatus.internalServerError
                );
        }
    }

    //setupWorkerThreads(); // TODO: read number of threads from config

    auto settings = new HTTPServerSettings;
    // settings.options |= HTTPServerOption.distribute; // causes stuck on epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6
    settings.options |= HTTPServerOption.parseJsonBody;
    settings.bindAddresses = cfg["listenAddresses"].deserializeBson!(string[]);
    settings.port = to!ushort(cfg["listenPort"].get!long);

    auto listenHandler = listenHTTP(settings, &httpRequestHandler);

    runEventLoop();
}

/// Names of the SQL configuration variables that receive the HTTP basic auth credentials
struct SQLVariablesNames
{
    string username;
    string password;
}

/// Executes all statements of `method` in one transaction and formats the answers.
///
/// Params:
///     client = Postgres client
///     method = method to execute
///     rpcRequest = request carrying the parameters and credentials
///
/// Returns: for a single-statement method the formatted result of that statement; for a
/// multi-statement method an object keyed by each non-VOID statement's result name
///
/// Throws: `LoopException` on wrong parameter count or type, missing authentication,
/// timeout, connection or answer creation errors
private Bson execMethod(
    PostgresClient client,
    in Method method,
    in RpcRequest rpcRequest
)
{
    TransactionQueryParams qp;
    qp.auth = rpcRequest.auth;
    qp.queryParams.length = method.statements.length;
    size_t paramCounter = 0; // number of positional parameters consumed so far

    foreach(i, statement; method.statements)
    {
        qp.queryParams[i].preparedStatementName = preparedName(method, statement);

        if(rpcRequest.positionParams.length == 0)
        {
            if(rpcRequest.namedParams !is null) // named parameters with types
            {
                qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParams);
            }
            else // named parameters without types
            {
                qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParamsStringValues);
            }
        }
        else // positional parameters
        {
            if(rpcRequest.positionParams.length - paramCounter < statement.argsNames.length)
                throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too few", __FILE__, __LINE__);

            qp.queryParams[i].args = new Value[statement.argsNames.length];

            // each statement consumes the next argsNames.length positional parameters
            foreach(n, ref b; rpcRequest.positionParams[paramCounter .. paramCounter + statement.argsNames.length])
            {
                auto v = &qp.queryParams[i].args[n];
                const oid = statement.argsOids[n];

                *v = bsonToValue(b, oid);

                if(v.oidType != oid)
                    throw new LoopException(
                        JsonRpcErrorCode.invalidParams,
                        HTTPStatus.badRequest,
                        // index of the offending parameter within the request's "params" array
                        "Parameter #"~(paramCounter + n).to!string~" type is "~v.oidType.to!string~", but expected "~oid.to!string,
                        __FILE__, __LINE__);
            }

            paramCounter += statement.argsNames.length;
        }
    }

    if(rpcRequest.positionParams.length != 0 && paramCounter != rpcRequest.positionParams.length)
    {
        assert(paramCounter < rpcRequest.positionParams.length);
        throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too big", __FILE__, __LINE__);
    }

    SQLTransaction trans = SQLTransaction(client);

    try
    {
        if(method.needAuthVariablesFlag && !qp.auth.authVariablesSet)
            throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.unauthorized, "Basic HTTP authentication need", __FILE__, __LINE__);

        trans.begin(method.readOnlyFlag);

        immutable answer = trans.execMethod(method, qp);

        enforce(answer.length == method.statements.length);

        Bson ret = Bson.emptyObject;

        if(!method.isMultiStatement)
        {
            ret = formatResult(answer[0], method.statements[0].resultFormat);
        }
        else
        {
            foreach(i, statement; method.statements)
            {
                if(statement.resultFormat != ResultFormat.VOID)
                    ret[statement.resultName] = formatResult(answer[i], statement.resultFormat);
            }
        }

        trans.commit();

        return ret;
    }
    catch(PostgresClientTimeoutException e)
    {
        trans.resetStart();
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
    }
    catch(ConnectionException e)
    {
        trans.resetStart();
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
    }
    catch(AnswerCreationException e)
    {
        // the original exception is attached so that its diagnostic fields can be reported
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__, e);
    }
    finally
    {
        destroy(trans);
    }
}

/// Converts a query answer to `Bson` according to the requested result format.
///
/// Params:
///     answer = query result
///     format = CELL (single value), ROW (object of one row), TABLE (object of column arrays),
///              ROTATED (array of row objects) or VOID (empty object)
///
/// Throws: `LoopException` if CELL or ROW constraints are not met; `ValueConvException`
/// (with column name and row number prepended to the message) if a value cannot be converted
private Bson formatResult(immutable Answer answer, ResultFormat format)
{
    Bson getValue(size_t rowNum, size_t colNum)
    {
        string columnName = answer.columnName(colNum);

        try
        {
            return answer[rowNum][colNum].as!Bson;
        }
        catch(ValueConvException e)
        {
            e.msg = "Column "~columnName~" ("~rowNum.to!string~" row): "~e.msg;
            throw e;
        }
    }

    with(ResultFormat)
    final switch(format)
    {
        case CELL:
        {
            if(answer.length != 1 || answer.columnCount != 1)
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One cell flag constraint failed", __FILE__, __LINE__);

            return getValue(0, 0);
        }

        case ROW:
        {
            if(answer.length != 1)
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One row flag constraint failed", __FILE__, __LINE__);

            Bson ret = Bson.emptyObject;

            foreach(colNum; 0 .. answer.columnCount)
                ret[answer.columnName(colNum)] = getValue(0, colNum);

            return ret;
        }

        case TABLE:
        {
            Bson ret = Bson.emptyObject;

            foreach(colNum; 0 .. answer.columnCount)
            {
                Bson[] col = new Bson[answer.length];

                foreach(rowNum; 0 .. answer.length)
                    col[rowNum] = getValue(rowNum, colNum);

                ret[answer.columnName(colNum)] = col;
            }

            return ret;
        }

        case ROTATED:
        {
            Bson[] ret = new Bson[answer.length];

            foreach(rowNum; 0 .. answer.length)
            {
                Bson row = Bson.emptyObject;

                foreach(colNum; 0 .. answer.columnCount)
                    row[answer.columnName(colNum)] = getValue(rowNum, colNum);

                ret[rowNum] = row;
            }

            return Bson(ret);
        }

        case VOID:
        {
            return Bson.emptyObject;
        }
    }
}

/// Arranges named parameters in the order expected by the statement.
///
/// Params:
///     method = statement whose `argsNames` / `argsOids` define the order and types
///     namedParams = parameters keyed by name; `Bson` values are type-checked against the
///                   statement, `string` values are passed as text
///
/// Throws: `LoopException` if a required parameter is missing or a `Bson` value has the wrong type
Value[] named2positionalParameters(T)(in Statement method, in T[string] namedParams)
if(is(T == Bson) || is(T == string))
{
    Value[] ret = new Value[method.argsNames.length];

    foreach(i, argName; method.argsNames)
    {
        auto argValue = argName in namedParams;

        if(argValue)
        {
            const oid = method.argsOids[i];
            Value v;

            static if(is(T == Bson))
            {
                v = bsonToValue(*argValue, oid);

                if(v.oidType != oid)
                    throw new LoopException(
                        JsonRpcErrorCode.invalidParams,
                        HTTPStatus.badRequest,
                        argName~" parameter type is "~v.oidType.to!string~", but expected "~oid.to!string,
                        __FILE__, __LINE__);
            }
            else // T == string, unknown parameter type
            {
                // Using Postgres ability to determine argument type
                v = toValue(*argValue, ValueFormat.TEXT);
            }

            ret[i] = v;
        }
        else
        {
            throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Missing required parameter "~argName, __FILE__, __LINE__);
        }
    }

    return ret;
}

/// HTTP basic auth credentials picked out of a request
private struct AuthorizationCredentials
{
    bool authVariablesSet = false;
    string username;
    string password;
}

/// Starts one RPC request through `async`.
///
/// The delegate is created here rather than inside the caller's loop so that it closes
/// over this call's own `request` parameter instead of a loop variable shared by all
/// iterations.
private Future!RpcRequestResult startRpcRequest(
    const(RpcRequest)* request,
    immutable Method[string] methods,
    PostgresClient client
)
{
    return async({
        return request.performRpcRequest(methods, client);
    });
}

/// Recognizes the kind of the incoming HTTP request, converts it to one or more
/// `RpcRequest`s and starts their execution.
///
/// GET requests are treated as vibe.d REST requests; other requests must be
/// "application/json" and are JSON-RPC 2.0 (object), JSON-RPC 2.0 batch (array) or
/// vibe.d REST POST (object without the JSON-RPC marker).
///
/// Throws: `LoopException` if the request as a whole is rejected
RpcRequestResults performRpcRequests(immutable Method[string] methods, PostgresClient client, scope HTTPServerRequest req)
{
    RpcRequestResults ret;

    // Recognition of request type
    RpcRequest[] dbRequests;

    if(req.method == HTTPMethod.GET)
    {
        ret.type = RpcType.vibedREST;

        dbRequests.length = 1;
        dbRequests[0] = RpcRequest.vibeRestGetToRpcRequest(req);
        dbRequests[0].type = RpcType.vibedREST;
    }
    else
    {
        if(req.contentType != "application/json")
            throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.unsupportedMediaType, "Supported only application/json content type", __FILE__, __LINE__);

        Json j = req.json;

        switch(j.type)
        {
            case Json.Type.array:
            {
                if(!j.length)
                    throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Empty JSON-RPC 2.0 batch array", __FILE__, __LINE__);

                ret.type = RpcType.jsonRpcBatchMode;
                dbRequests.length = j.length;

                foreach(i, ref request; dbRequests)
                {
                    if(!RpcRequest.isValidJsonRpcRequest(j[i]))
                        throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Isn't JSON-RPC 2.0 protocol", __FILE__, __LINE__);

                    request = RpcRequest.jsonToRpcRequest(j[i], req);
                    request.type = RpcType.jsonRpcBatchMode;
                }

                break;
            }

            case Json.Type.object:
                dbRequests.length = 1;

                if(RpcRequest.isValidJsonRpcRequest(j))
                {
                    dbRequests[0] = RpcRequest.jsonToRpcRequest(j, req);
                    dbRequests[0].type = RpcType.jsonRpc;
                    ret.type = RpcType.jsonRpc;
                }
                else // JSON vibe.d REST POST
                {
                    dbRequests[0] = RpcRequest.vibeRestToRpcRequest(j, req);
                    dbRequests[0].type = RpcType.vibedREST;
                    ret.type = RpcType.vibedREST;
                }

                break;

            default:
                throw new LoopException(JsonRpcErrorCode.parseError, HTTPStatus.badRequest, "Parse error", __FILE__, __LINE__);
        }
    }

    ret.results.length = dbRequests.length;

    foreach(i, const ref request; dbRequests)
        ret.results[i] = startRpcRequest(&request, methods, client);

    return ret;
}

/// One RPC call: method name, parameters (at most one of the three parameter kinds is set),
/// credentials and the protocol it arrived through
struct RpcRequest
{
    Bson id;
    string methodName;
    Bson[string] namedParams = null;
    string[string] namedParamsStringValues = null; /// used if the types of parameters are unknown
    Bson[] positionParams = null;
    AuthorizationCredentials auth;
    RpcType type; // used only for pretty error formatting

    invariant()
    {
        size_t count = 0;

        if(namedParams !is null) count++;
        if(namedParamsStringValues !is null) count++;
        if(positionParams !is null) count++;

        assert(count <= 1);
    }

    /// A request without an id is a JSON-RPC 2.0 notification
    bool isNotify() const
    {
        return id.type == Bson.Type.undefined;
    }

    /// Returns: true if `j` is an object whose "jsonrpc" member equals "2.0"
    private static bool isValidJsonRpcRequest(scope Json j)
    {
        // Batch elements can be of any JSON type; only objects may be indexed by key
        return j.type == Json.Type.object && j["jsonrpc"] == "2.0";
    }

    /// Converts a JSON-RPC 2.0 request object to RpcRequest
    ///
    /// Throws: `LoopException` if "method" is not a string or "params" is neither
    /// omitted, an object nor an array
    private static RpcRequest jsonToRpcRequest(scope Json j, scope HTTPServerRequest req)
    {
        RpcRequest r;

        r.id = j["id"];

        Json methodField = j["method"];

        // Reject a missing or non-string method name as an invalid request
        if(methodField.type != Json.Type.string)
            throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Method name must be a string", __FILE__, __LINE__);

        r.methodName = methodField.get!string;

        Json params = j["params"];

        switch(params.type)
        {
            case Json.Type.undefined: // params omitted
                break;

            case Json.Type.object:
                foreach(string key, value; params)
                    r.namedParams[key] = value;

                break;

            case Json.Type.array:
                foreach(value; params)
                    r.positionParams ~= Bson(value);

                break;

            default:
                throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Unexpected params type", __FILE__, __LINE__);
        }

        // pick out name and password from the request
        {
            // accepts any credentials: they are only stored here, not verified
            bool pwcheck(string _username, string _password)
            {
                r.auth.username = _username;
                r.auth.password = _password;

                return true;
            }

            r.auth.authVariablesSet = checkBasicAuth(req, &pwcheck);
        }

        return r;
    }

    /// Converts Vibe.d REST client GET request to RpcRequest
    private static RpcRequest vibeRestGetToRpcRequest(ref HTTPServerRequest req)
    {
        RpcRequest r;

        enforce(req.path.length > 0);
        r.methodName = req.path[1..$]; // strips first '/'

        foreach(string key, ref value; req.query)
            r.namedParamsStringValues[key] = value;

        r.id = Bson("REST request"); // Means that it isn't a JSON-RPC "notify"

        return r;
    }


    /// Converts Vibe.d REST client request to RpcRequest
    private static RpcRequest vibeRestToRpcRequest(scope Json j, ref HTTPServerRequest req)
    {
        RpcRequest r;

        enforce(req.path.length > 0);
        r.methodName = req.path[1..$]; // strips first '/'

        foreach(string key, ref value; j)
            r.namedParams[key] = value;

        r.id = Bson("REST request"); // Means that it isn't a JSON-RPC "notify"

        return r;
    }

    /// Looks up and executes the requested method.
    ///
    /// Returns: the result with a filled response body; a `LoopException` (including a
    /// converted `ValueConvException`) is not propagated but stored in the result
    /// together with a formatted error body
    RpcRequestResult performRpcRequest(immutable Method[string] methods, PostgresClient client) const
    {
        try
        {
            try
            {
                const method = (methodName in methods);

                if(method is null)
                    throw new LoopException(JsonRpcErrorCode.methodNotFound, HTTPStatus.badRequest, "Method "~methodName~" not found", __FILE__, __LINE__);

                RpcRequestResult ret;
                ret.isNotify = isNotify;

                if(!ret.isNotify)
                {
                    ret.responseBody = Bson(["id": id]);
                    ret.responseBody["jsonrpc"] = "2.0";
                    ret.responseBody["result"] = client.execMethod(*method, this);
                }
                else // JSON-RPC 2.0 Notification
                {
                    client.execMethod(*method, this);
                    ret.responseBody = Bson.emptyObject;
                }

                return ret;
            }
            catch(ValueConvException e)
            {
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
            }
        }
        catch(LoopException e)
        {
            Bson err = Bson.emptyObject;

            // REST responses do not carry the JSON-RPC envelope members
            if(type != RpcType.vibedREST)
            {
                err["id"] = id;
                err["jsonrpc"] = "2.0";
            }

            if(e.answerException is null)
            {
                err["error"] = Bson([
                    "message": Bson(e.msg),
                    "code": Bson(e.jsonCode)
                ]);
            }
            else
            {
                // pass the Postgres diagnostic fields on to the client
                Bson data = Bson([
                    "hint":    Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_HINT)),
                    "detail":  Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_DETAIL)),
                    "errcode": Bson(e.answerException.resultErrorField(PG_DIAG_SQLSTATE))
                ]);

                err["error"] = Bson([
                    "message": Bson(e.msg),
                    "code": Bson(e.jsonCode),
                    "data": data
                ]);
            }

            RpcRequestResult ret;
            ret.isNotify = isNotify;
            ret.responseBody = err;
            ret.exception = e;

            logWarn(methodName~": "~e.httpCode.to!string~" "~err.toString);

            return ret;
        }
    }
}

/// Outcome of one RPC request: response body and, on failure, the exception
private struct RpcRequestResult
{
    Bson responseBody;
    LoopException exception;
    bool isNotify;

    void opAssign(shared RpcRequestResult s) shared
    {
        synchronized
        {
            // This is needed because Bson doesn't have a shared opAssign
            Bson copy = s.responseBody;
            (cast() this.responseBody) = copy;

            this.exception = s.exception;
            this.isNotify = s.isNotify;
        }
    }
}

/// Results of all RPC requests carried by one HTTP request
private struct RpcRequestResults
{
    Future!RpcRequestResult[] results;
    RpcType type;
}

private enum RpcType
{
    jsonRpc, /// Normal JSON mode response
    jsonRpcBatchMode, /// Batch JSON mode response
    vibedREST /// Vibe.d REST client mode response
}

enum JsonRpcErrorCode : short
{
    /// Invalid JSON was received by the server.
    /// An error occurred on the server while parsing the JSON text
    parseError = -32700,

    /// The JSON sent is not a valid Request object.
    invalidRequest = -32600,

    /// Statement not found
    methodNotFound = -32601,

    /// Invalid params
    invalidParams = -32602,

    /// Internal error
    internalError = -32603,
}

/// Request processing error carrying both the JSON-RPC error code and the HTTP status
/// to respond with, and optionally the Postgres answer exception that caused it
class LoopException : Exception
{
    const JsonRpcErrorCode jsonCode;
    const HTTPStatus httpCode;
    const AnswerCreationException answerException;

    this(JsonRpcErrorCode jsonCode, HTTPStatus httpCode, string msg, string file, size_t line, AnswerCreationException ae = null) pure
    {
        this.jsonCode = jsonCode;
        this.httpCode = httpCode;
        this.answerException = ae;

        super(msg, file, line);
    }
}

/// Prepares the built-in statements and every statement of every method on `conn`,
/// and stores the retrieved argument types into the statements.
///
/// Returns: names of unprepared methods, but length is number of unprepared statements
/// (a method name is appended once per failed statement)
///
/// Throws: `ConnectionException` is rethrown; other exceptions raised while preparing a
/// method statement are logged and the statement is skipped
private string[] prepareStatements(Connection conn, ref PrepareStatementsArgs args)
{
    {
        logDebugV("try to prepare internal statements");

        conn.prepareStatement(BuiltInPrep.BEGIN, "BEGIN");
        conn.prepareStatement(BuiltInPrep.BEGIN_RO, "BEGIN READ ONLY");
        conn.prepareStatement(BuiltInPrep.COMMIT, "COMMIT");
        conn.prepareStatement(BuiltInPrep.ROLLBACK, "ROLLBACK");
        conn.prepareStatement(BuiltInPrep.SET_AUTH_VARS,
            "SELECT set_config("~conn.escapeLiteral(args.varNames.username)~", $1, true), "~
            "set_config("~conn.escapeLiteral(args.varNames.password)~", $2, true)");

        logDebugV("internal statements prepared");
    }

    string[] failedStatements;

    foreach(ref method; args.methods.byValue)
    {
        foreach(ref statement; method.statements)
        {
            const prepName = preparedName(method, statement);

            logDebugV("try to prepare statement "~prepName~": "~statement.sqlCommand);

            try
            {
                conn.prepareStatement(prepName, statement.sqlCommand);
                statement.argsOids = conn.retrieveArgsTypes(prepName);

                logDebugV("statement "~prepName~" prepared");
            }
            catch(ConnectionException e)
            {
                throw e;
            }
            catch(Exception e)
            {
                logWarn("Skipping "~prepName~": "~e.msg);
                failedStatements ~= method.name;
            }
        }
    }

    return failedStatements;
}

/// Returns: the prepared statement name: the method name itself if the statement number
/// is negative, otherwise the method name followed by '_' and the statement number
string preparedName(in Method method, in Statement statement)
{
    if(statement.statementNum < 0)
    {
        return method.name;
    }
    else
    {
        import std.conv: to;

        return method.name~"_"~statement.statementNum.to!string;
    }
}

/// Describes a prepared statement and checks that all its parameter and result
/// column types (or arrays of them) are supported.
///
/// Returns: parameter types of the prepared statement
///
/// Throws: `Exception` on the first unsupported parameter or result field type
private OidType[] retrieveArgsTypes(Connection conn, string preparedStatementName)
{
    auto desc = conn.describePreparedStatement(preparedStatementName);

    OidType[] ret = new OidType[desc.nParams];

    argsLoop:
    foreach(i, ref t; ret)
    {
        t = desc.paramType(i);

        foreach(sup; argsSupportedTypes)
        {
            // a conversion failure for one supported type just means "no match" for that type
            try
                if(t == sup || t == oidConvTo!"array"(sup)) continue argsLoop;
            catch(ValueConvException)
            {}
        }

        throw new Exception("unsupported parameter $"~(i+1).to!string~" type: "~t.to!string, __FILE__, __LINE__);
    }

    // Result fields type check
    resultTypesLoop:
    foreach(i; 0 .. desc.columnCount)
    {
        auto t = desc.OID(i);

        foreach(sup; resultSupportedTypes)
        {
            try
                if(t == sup || t == oidConvTo!"array"(sup)) continue resultTypesLoop;
            catch(ValueConvException)
            {}
        }

        throw new Exception("unsupported result field "~desc.columnName(i)~" ("~i.to!string~") type: "~t.to!string, __FILE__, __LINE__);
    }

    return ret;
}

// These are the types described in the BSON specification
private immutable OidType[] argsSupportedTypes =
[
    OidType.Bool,
    OidType.Int4,
    OidType.Int8,
    OidType.Float8,
    OidType.Text,
    OidType.Json
];

// Types that can be converted to BSON
private immutable OidType[] resultSupportedTypes = argsSupportedTypes ~
[
    OidType.Numeric,
    OidType.FixedString,
    OidType.Jsonb,
    //OidType.UUID
];