module pgator.app;

import pgator.rpc_table;
import pgator.sql_transaction;
import dpq2.oids: OidType;
import dpq2.value: ValueConvException;
import std.getopt;
import std.typecons: Tuple;
import std.exception: enforce;
import std.conv: to, ConvException;
import vibe.http.server;
import vibe.http.auth.basic_auth: checkBasicAuth;
import vibe.core.concurrency;
import vibe.core.log;
import vibe.data.json;
import vibe.data.bson;
import vibe.db.postgresql;

/// Path of the JSON config file, overridden by the "config" command line option
string configFileName = "/wrong/path/to/file.json";

/// Set by the "debug" command line option, switches log level to debugV
bool debugEnabled = false;

/// Set by the "check" command line option: statements are prepared, but HTTP server isn't started
bool checkStatements = false;

/// Fills configFileName, debugEnabled and checkStatements from command line arguments
///
/// Any getopt exception is logged and rethrown.
void readOpts(string[] args)
{
    try
    {
        getopt(
            args,
            "debug", &debugEnabled,
            "check", &checkStatements,
            "config", &configFileName
        );
    }
    catch(Exception e)
    {
        logFatal(e.msg);
        throw e;
    }

    if(debugEnabled) setLogLevel(LogLevel.debugV);
}

// NOTE(review): nothing in this module reads or writes this variable
private Bson _cfg;

/// Reads file configFileName, parses it as JSON and returns it as Bson
///
/// Any exception (file reading or parsing) is logged and rethrown.
Bson readConfig()
{
    import std.file: readText;

    Bson cfg;

    try
    {
        auto text = readText(configFileName);
        cfg = Bson(parseJsonString(text));
    }
    catch(Exception e)
    {
        logFatal(e.msg);
        throw e;
    }

    return cfg;
}

/// State shared between main() and the "after connect" callback
private struct PrepareStatementsArgs
{
    const SQLVariablesNames varNames; /// names of SQL variables used for passing of HTTP credentials
    bool methodsLoadedFlag = false; // need for bootstrap
    Method[string] methods; /// methods loaded from the table, by name
    size_t rpcTableLength; /// number of rows read from the table
    size_t failedCount; /// number of statements failed to load or to prepare
    string tableName; /// escaped name of the table
}

/// Entry point
///
/// Returns: 0 on success, 1 if exception occured, 2 if some statements
/// are failed to load or to prepare
int main(string[] args)
{
    try
    {
        readOpts(args);
        Bson cfg = readConfig();

        auto server = cfg["sqlServer"];
        const connString = server["connString"].get!string;
        auto maxConn = to!uint(server["maxConn"].get!long);

        PrepareStatementsArgs prepArgs = {
            varNames: SQLVariablesNames(
                cfg["sqlAuthVariables"]["username"].get!string,
                cfg["sqlAuthVariables"]["password"].get!string
            )
        };

        // delegate
        // Does nothing until methods are loaded from the table (see methodsLoadedFlag)
        void afterConnectOrReconnect(__Conn conn)
        {
            if(prepArgs.methodsLoadedFlag)
            {
                logDebugV("Preparing");
                auto failedStatementsNames = prepareStatements(conn, prepArgs);
                prepArgs.failedCount += failedStatementsNames.length;

                // methods with failed statements become unavailable
                foreach(n; failedStatementsNames)
                    prepArgs.methods.remove(n);

                logInfo("Number of statements in the table "~prepArgs.tableName~": "~
                    prepArgs.rpcTableLength.to!string~", failed to prepare: "~prepArgs.failedCount.to!string);
            }
        }

        // connect to db
        auto client = new PostgresClient(connString, maxConn, &afterConnectOrReconnect);

        {
            auto conn = client.lockConnection();
            auto sqlPgatorTable = cfg["sqlPgatorTable"].get!string;

            // read pgator_rpc
            prepArgs.tableName = conn.escapeIdentifier(sqlPgatorTable);

            QueryParams p;
            p.sqlCommand = "SELECT * FROM "~prepArgs.tableName;
            auto answer = conn.execStatement(p);

            prepArgs.rpcTableLength = answer.length;
            auto readMethodsResult = readMethods(answer);
            prepArgs.methods = readMethodsResult.methods;
            prepArgs.methodsLoadedFlag = true;

            {
                prepArgs.failedCount = prepArgs.rpcTableLength - readMethodsResult.loaded;
                logDebugV("Number of statements in the table "~prepArgs.tableName~": "~prepArgs.rpcTableLength.to!string~", failed to load into pgator: "~prepArgs.failedCount.to!string);
            }

            // prepare statements for previously used connection
            afterConnectOrReconnect(conn);

            // "delete" operator is deprecated, destroy() is used like for
            // the transaction in execMethod()
            destroy(conn);
        }

        if(!checkStatements)
        {
            // NOTE(review): .dup makes a shallow copy of the associative array,
            // so data referenced from Method values is presumably still
            // reachable through prepArgs.methods - confirm what it is safe
            immutable methods = cast(immutable) prepArgs.methods.dup;
            loop(cfg, client, methods);
        }

        return prepArgs.failedCount ? 2 : 0;
    }
    catch(Exception e)
    {
        logFatal(e.msg);

        return 1;
    }
}

/// Starts HTTP server on addresses and port from the config and runs event loop
///
/// Params:
///   cfg = config, "listenAddresses" and "listenPort" values are used
///   client = Postgres client which will be used for requests processing
///   methods = available methods, by name
void loop(in Bson cfg, PostgresClient client, immutable Method[string] methods)
{
    debug(BreakUpSomeConnections) // for testing purposes
    {
        auto s1 = client.lockConnection();
        auto s2 = client.lockConnection();

        s1.socket.send([0x12, 0x13]); // just garbage
        s2.socket.send([0x12, 0x13]);

        // "delete" operator is deprecated
        destroy(s1);
        destroy(s2);
    }

    // http-server
    import vibe.core.core;

    // Performs request(s) and writes response in the form appropriate to the request type
    void httpRequestHandler(scope HTTPServerRequest req, HTTPServerResponse res)
    {
        try
        {
            RpcRequestResults results = performRpcRequests(methods, client, req);

            final switch(results.type)
            {
                case RpcType.vibedREST:
                    auto result = &results.results[0];

                    if(result.exception is null)
                    {
                        // REST response contains only the result value
                        res.writeJsonBody(result.responseBody["result"]);
                    }
                    else
                    {
                        res.writeJsonBody(result.responseBody, result.exception.httpCode);
                    }

                    break;

                case RpcType.jsonRpc:
                    auto result = &results.results[0];

                    if(result.exception is null)
                    {
                        if(result.isNotify)
                        {
                            res.statusCode = HTTPStatus.noContent;
                            res.writeVoidBody();
                        }
                        else
                        {
                            res.writeJsonBody(result.responseBody);
                        }
                    }
                    else // error handling
                    {
                        if(result.isNotify)
                        {
                            // no body for notify, error message is passed as status phrase
                            res.statusCode = HTTPStatus.noContent;
                            res.statusPhrase = result.exception.msg;
                            res.writeVoidBody();
                        }
                        else
                        {
                            res.writeJsonBody(result.responseBody, result.exception.httpCode);
                        }
                    }

                    break;

                case RpcType.jsonRpcBatchMode:
                    Bson[] ret;

                    foreach(ref r; results.results) // fill response array
                    {
                        if(!r.isNotify) // skip notify responses
                            ret ~= r.responseBody;
                    }

                    if(ret.length)
                    {
                        res.writeJsonBody(Bson(ret)); // normal batch response
                    }
                    else
                    {
                        res.statusCode = HTTPStatus.noContent;
                        res.writeVoidBody(); // empty response for batch with notifies only
                    }

                    break;
            }
        }
        catch(LoopException e)
        {
            res.writeJsonBody(Bson(e.msg), e.httpCode); // FIXME: wrong error body format

            logWarn(e.msg);
        }
        catch(Exception e)
        {
            logFatal(e.toString);

            // Otherwise handler returns without any response written by it.
            // Details of the exception are not sent to the client.
            if(!res.headerWritten)
                res.writeJsonBody(Bson("Internal server error"), HTTPStatus.internalServerError);
        }
    }

    //setupWorkerThreads(); // TODO: read number of threads from config

    auto settings = new HTTPServerSettings;
    // settings.options |= HTTPServerOption.distribute; // causes stuck on epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6
    settings.options |= HTTPServerOption.parseJsonBody;
    settings.bindAddresses = cfg["listenAddresses"].deserializeBson!(string[]);
    settings.port = to!ushort(cfg["listenPort"].get!long);

    auto listenHandler = listenHTTP(settings, &httpRequestHandler);

    runEventLoop();
}

/// Names of SQL variables which receive HTTP basic auth credentials
struct SQLVariablesNames
{
    string username;
    string password;
}

/// Executes all statements of the method in one transaction
///
/// Params:
///   client = Postgres client
///   method = method to execute
///   rpcRequest = request, source of parameters and of auth credentials
///
/// Returns: formatted result of the statement or, for multi-statement
/// method, object with results of non-VOID statements by their result names
///
/// Throws: LoopException if number or types of parameters are wrong,
/// if method needs credentials but request haven't it, or if
/// timeout, connection or answer creation exception occured
private Bson execMethod(
    PostgresClient client,
    in Method method,
    in RpcRequest rpcRequest
)
{
    TransactionQueryParams qp;
    qp.auth = rpcRequest.auth;
    qp.queryParams.length = method.statements.length;
    size_t paramCounter = 0; // number of already consumed positional parameters

    foreach(i, statement; method.statements)
    {
        qp.queryParams[i].preparedStatementName = preparedName(method, statement);

        if(rpcRequest.positionParams.length == 0)
        {
            if(rpcRequest.namedParams !is null) // named parameters with types
            {
                qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParams);
            }
            else // named parameters without types
            {
                qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParamsStringValues);
            }
        }
        else // positional parameters
        {
            // positional parameters are distributed to statements sequentially
            if(rpcRequest.positionParams.length - paramCounter < statement.argsNames.length)
                throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too few", __FILE__, __LINE__);

            qp.queryParams[i].args = new Value[statement.argsNames.length];

            foreach(n, ref b; rpcRequest.positionParams[paramCounter .. paramCounter + statement.argsNames.length])
            {
                auto v = &qp.queryParams[i].args[n];
                const oid = statement.argsOids[n];

                *v = bsonToValue(b, oid);

                if(v.oidType != oid)
                    throw new LoopException(
                        JsonRpcErrorCode.invalidParams,
                        HTTPStatus.badRequest,
                        // index of the parameter in the request params array, not the statement index
                        "Parameter #"~(paramCounter + n).to!string~" type is "~v.oidType.to!string~", but expected "~oid.to!string,
                        __FILE__, __LINE__);
            }

            paramCounter += statement.argsNames.length;
        }
    }

    if(rpcRequest.positionParams.length != 0 && paramCounter != rpcRequest.positionParams.length)
    {
        assert(paramCounter < rpcRequest.positionParams.length);
        throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too big", __FILE__, __LINE__);
    }

    SQLTransaction trans = SQLTransaction(client);

    try
    {
        if(method.needAuthVariablesFlag && !qp.auth.authVariablesSet)
            throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.unauthorized, "Basic HTTP authentication need", __FILE__, __LINE__);

        trans.begin(method.readOnlyFlag);

        immutable answer = trans.execMethod(method, qp);

        enforce(answer.length == method.statements.length);

        Bson ret = Bson.emptyObject;

        if(!method.isMultiStatement)
        {
            ret = formatResult(answer[0], method.statements[0].resultFormat);
        }
        else
        {
            foreach(i, statement; method.statements)
            {
                if(statement.resultFormat != ResultFormat.VOID)
                    ret[statement.resultName] = formatResult(answer[i], statement.resultFormat);
            }
        }

        trans.commit();

        return ret;
    }
    catch(PostgresClientTimeoutException e)
    {
        trans.resetStart();
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
    }
    catch(ConnectionException e)
    {
        trans.resetStart();
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
    }
    catch(AnswerCreationException e)
    {
        // original exception is attached for error details in the response
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__, e);
    }
    finally
    {
        destroy(trans);
    }
}

/// Converts answer to Bson according to the result format
///
/// CELL: single value, ROW: object "column name -> value",
/// TABLE: object "column name -> array of values",
/// ROTATED: array of row objects, VOID: empty object
///
/// Throws: LoopException if CELL or ROW constraint is failed,
/// ValueConvException (with column name and row number added to the
/// message) if value can't be converted to Bson
private Bson formatResult(immutable Answer answer, ResultFormat format)
{
    Bson getValue(size_t rowNum, size_t colNum)
    {
        string columnName = answer.columnName(colNum);

        try
        {
            return answer[rowNum][colNum].as!Bson;
        }
        catch(ValueConvException e)
        {
            e.msg = "Column "~columnName~" ("~rowNum.to!string~" row): "~e.msg;
            throw e;
        }
    }

    with(ResultFormat)
    final switch(format)
    {
        case CELL:
        {
            if(answer.length != 1 || answer.columnCount != 1)
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One cell flag constraint failed", __FILE__, __LINE__);

            return getValue(0, 0);
        }

        case ROW:
        {
            if(answer.length != 1)
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One row flag constraint failed", __FILE__, __LINE__);

            Bson ret = Bson.emptyObject;

            foreach(colNum; 0 .. answer.columnCount)
                ret[answer.columnName(colNum)] = getValue(0, colNum);

            return ret;
        }

        case TABLE:
        {
            Bson ret = Bson.emptyObject;

            foreach(colNum; 0 .. answer.columnCount)
            {
                Bson[] col = new Bson[answer.length];

                foreach(rowNum; 0 .. answer.length)
                    col[rowNum] = getValue(rowNum, colNum);

                ret[answer.columnName(colNum)] = col;
            }

            return ret;
        }

        case ROTATED:
        {
            Bson[] ret = new Bson[answer.length];

            foreach(rowNum; 0 .. answer.length)
            {
                Bson row = Bson.emptyObject;

                foreach(colNum; 0 .. answer.columnCount)
                    row[answer.columnName(colNum)] = getValue(rowNum, colNum);

                ret[rowNum] = row;
            }

            return Bson(ret);
        }

        case VOID:
        {
            return Bson.emptyObject;
        }
    }
}

/// Places named parameters in the order of the statement arguments
///
/// Bson values are converted with types checking, string values
/// are passed as text.
///
/// Throws: LoopException if parameter is missing or Bson parameter
/// type isn't equal to the statement argument type
Value[] named2positionalParameters(T)(in Statement method, in T[string] namedParams)
if(is(T == Bson) || is(T == string))
{
    Value[] ret = new Value[method.argsNames.length];

    foreach(i, argName; method.argsNames)
    {
        auto argValue = argName in namedParams;

        if(argValue)
        {
            const oid = method.argsOids[i];
            Value v;

            static if(is(T == Bson))
            {
                v = bsonToValue(*argValue, oid);

                if(v.oidType != oid)
                    throw new LoopException(
                        JsonRpcErrorCode.invalidParams,
                        HTTPStatus.badRequest,
                        argName~" parameter type is "~v.oidType.to!string~", but expected "~oid.to!string,
                        __FILE__, __LINE__);
            }
            else // T == string, unknown parameter type
            {
                // Using Postgres ability to determine argument type
                v = toValue(*argValue, ValueFormat.TEXT);
            }

            ret[i] = v;
        }
        else
        {
            throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Missing required parameter "~argName, __FILE__, __LINE__);
        }
    }

    return ret;
}

/// HTTP basic auth credentials of the request
private struct AuthorizationCredentials
{
    bool authVariablesSet = false; /// result of checkBasicAuth() for the request
    string username;
    string password;
}

/// Recognizes type of the HTTP request, converts it to RPC request(s)
/// and starts async execution of every RPC request
///
/// GET is vibe.d REST request, JSON array is JSON-RPC batch, JSON object
/// is JSON-RPC request if it contains "jsonrpc": "2.0" or vibe.d REST
/// request otherwise.
///
/// Throws: LoopException if content type isn't application/json,
/// if batch array is empty or contains not a JSON-RPC request,
/// or if body is neither array nor object
RpcRequestResults performRpcRequests(immutable Method[string] methods, PostgresClient client, scope HTTPServerRequest req)
{
    RpcRequestResults ret;

    // Recognition of request type
    RpcRequest[] dbRequests;

    if(req.method == HTTPMethod.GET)
    {
        ret.type = RpcType.vibedREST;

        dbRequests.length = 1;
        dbRequests[0] = RpcRequest.vibeRestGetToRpcRequest(req);
        dbRequests[0].type = RpcType.vibedREST;
    }
    else
    {
        if(req.contentType != "application/json")
            throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.unsupportedMediaType, "Supported only application/json content type", __FILE__, __LINE__);

        Json j = req.json;

        switch(j.type)
        {
            case Json.Type.array:
            {
                if(!j.length)
                    throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Empty JSON-RPC 2.0 batch array", __FILE__, __LINE__);

                ret.type = RpcType.jsonRpcBatchMode;
                dbRequests.length = j.length;

                foreach(i, ref request; dbRequests)
                {
                    if(!RpcRequest.isValidJsonRpcRequest(j[i]))
                        throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Isn't JSON-RPC 2.0 protocol", __FILE__, __LINE__);

                    request = RpcRequest.jsonToRpcRequest(j[i], req);
                    request.type = RpcType.jsonRpcBatchMode;
                }

                break;
            }

            case Json.Type.object:
                dbRequests.length = 1;

                if(RpcRequest.isValidJsonRpcRequest(j))
                {
                    dbRequests[0] = RpcRequest.jsonToRpcRequest(j, req);
                    dbRequests[0].type = RpcType.jsonRpc;
                    ret.type = RpcType.jsonRpc;
                }
                else // JSON vibe.d REST POST
                {
                    dbRequests[0] = RpcRequest.vibeRestToRpcRequest(j, req);
                    dbRequests[0].type = RpcType.vibedREST;
                    ret.type = RpcType.vibedREST;
                }

                break;

            default:
                throw new LoopException(JsonRpcErrorCode.parseError, HTTPStatus.badRequest, "Parse error", __FILE__, __LINE__);
        }
    }

    ret.results.length = dbRequests.length;

    // Every call of this function creates a separate context for the
    // delegate, so each task is bound to its own request. Delegate literal
    // placed directly into the foreach body captures the loop variable
    // which is shared between all iterations.
    auto startRequest(const(RpcRequest)* request)
    {
        return async({
            return request.performRpcRequest(methods, client);
        });
    }

    foreach(i, ref request; dbRequests)
        ret.results[i] = startRequest(&request);

    return ret;
}

/// One RPC request: JSON-RPC 2.0 request or vibe.d REST request
struct RpcRequest
{
    Bson id; /// JSON-RPC request id, undefined for notify
    string methodName;
    Bson[string] namedParams = null;
    string[string] namedParamsStringValues = null; /// used if types of parameters is unknown
    Bson[] positionParams = null;
    AuthorizationCredentials auth;
    RpcType type; // used only for pretty error formatting

    // No more than one kind of parameters can be set
    invariant()
    {
        size_t count = 0;

        if(namedParams !is null) count++;
        if(namedParamsStringValues !is null) count++;
        if(positionParams !is null) count++;

        assert(count <= 1);
    }

    /// Is it request without id?
    bool isNotify() const
    {
        return id.type == Bson.Type.undefined;
    }

    /// Returns: true if j is an object with "jsonrpc" field equal to "2.0"
    private static bool isValidJsonRpcRequest(scope Json j)
    {
        // Batch array can contain values of any type,
        // only objects can be indexed by the field name
        return j.type == Json.Type.object && j["jsonrpc"] == "2.0";
    }

    /// Converts JSON-RPC request object to RpcRequest
    ///
    /// Throws: LoopException if "method" isn't a string or "params"
    /// is neither omitted nor object nor array
    private static RpcRequest jsonToRpcRequest(scope Json j, scope HTTPServerRequest req)
    {
        RpcRequest r;

        r.id = j["id"];

        // Absent or non-string method name is the client error
        if(j["method"].type != Json.Type.string)
            throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Method name must be a string", __FILE__, __LINE__);

        r.methodName = j["method"].get!string;

        Json params = j["params"];

        switch(params.type)
        {
            case Json.Type.undefined: // params omitted
                break;

            case Json.Type.object:
                foreach(string key, value; params)
                    r.namedParams[key] = value;

                break;

            case Json.Type.array:
                foreach(value; params)
                    r.positionParams ~= Bson(value);

                break;

            default:
                throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Unexpected params type", __FILE__, __LINE__);
        }

        // pick out name and password from the request
        {
            // accepts any credentials, just stores them
            bool pwcheck(string _username, string _password)
            {
                r.auth.username = _username;
                r.auth.password = _password;

                return true;
            }

            r.auth.authVariablesSet = checkBasicAuth(req, &pwcheck);
        }

        return r;
    }

    /// Converts Vibe.d REST client GET request to RpcRequest
    private static RpcRequest vibeRestGetToRpcRequest(ref HTTPServerRequest req)
    {
        RpcRequest r;

        enforce(req.path.length > 0);
        r.methodName = req.path[1..$]; // strips first '/'

        foreach(string key, ref value; req.query)
            r.namedParamsStringValues[key] = value;

        r.id = Bson("REST request"); // Means what it isn't JSON-RPC "notify"

        return r;
    }


    /// Converts Vibe.d REST client request to RpcRequest
    private static RpcRequest vibeRestToRpcRequest(scope Json j, ref HTTPServerRequest req)
    {
        RpcRequest r;

        enforce(req.path.length > 0);
        r.methodName = req.path[1..$]; // strips first '/'

        foreach(string key, ref value; j)
            r.namedParams[key] = value;

        r.id = Bson("REST request"); // Means what it isn't JSON-RPC "notify"

        return r;
    }

    /// Executes the request
    ///
    /// LoopException and ValueConvException aren't thrown: they are
    /// converted to the error response body, LoopException is stored
    /// in the result.
    RpcRequestResult performRpcRequest(immutable Method[string] methods, PostgresClient client) const
    {
        try
        {
            try
            {
                const method = (methodName in methods);

                if(method is null)
                    throw new LoopException(JsonRpcErrorCode.methodNotFound, HTTPStatus.badRequest, "Method "~methodName~" not found", __FILE__, __LINE__);

                RpcRequestResult ret;
                ret.isNotify = isNotify;

                if(!ret.isNotify)
                {
                    ret.responseBody = Bson(["id": id]);
                    ret.responseBody["jsonrpc"] = "2.0";
                    ret.responseBody["result"] = client.execMethod(*method, this);
                }
                else // JSON-RPC 2.0 Notification
                {
                    client.execMethod(*method, this);
                    ret.responseBody = Bson.emptyObject;
                }

                return ret;
            }
            catch(ValueConvException e)
            {
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
            }
        }
        catch(LoopException e)
        {
            Bson err = Bson.emptyObject;

            if(type != RpcType.vibedREST)
            {
                err["id"] = id;
                err["jsonrpc"] = "2.0";
            }

            if(e.answerException is null)
            {
                err["error"] = Bson([
                    "message": Bson(e.msg),
                    "code": Bson(e.jsonCode)
                ]);
            }
            else
            {
                // error fields from the attached exception
                Bson data = Bson([
                    "hint":    Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_HINT)),
                    "detail":  Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_DETAIL)),
                    "errcode": Bson(e.answerException.resultErrorField(PG_DIAG_SQLSTATE))
                ]);

                err["error"] = Bson([
                    "message": Bson(e.msg),
                    "code": Bson(e.jsonCode),
                    "data": data
                ]);
            }

            RpcRequestResult ret;
            ret.isNotify = isNotify;
            ret.responseBody = err;
            ret.exception = e;

            logWarn(methodName~": "~e.httpCode.to!string~" "~err.toString);

            return ret;
        }
    }
}

/// Result of one RPC request
private struct RpcRequestResult
{
    Bson responseBody; /// response or error body
    LoopException exception; /// null if request is successful
    bool isNotify; /// request was a notify

    void opAssign(shared RpcRequestResult s) shared
    {
        synchronized
        {
            // This need because Bson don't have shared opAssign
            Bson copy = s.responseBody;
            (cast() this.responseBody) = copy;

            this.exception = s.exception;
            this.isNotify = s.isNotify;
        }
    }
}

/// Results of all RPC requests of one HTTP request
private struct RpcRequestResults
{
    Future!RpcRequestResult[] results;
    RpcType type;
}

private enum RpcType
{
    jsonRpc, /// Normal JSON mode response
    jsonRpcBatchMode, /// Batch JSON mode response
    vibedREST /// Vibe.d REST client mode response
}

enum JsonRpcErrorCode : short
{
    /// Invalid JSON was received by the server.
    /// An error occurred on the server while parsing the JSON text
    parseError = -32700,

    /// The JSON sent is not a valid Request object.
    invalidRequest = -32600,

    /// Statement not found
    methodNotFound = -32601,

    /// Invalid params
    invalidParams = -32602,

    /// Internal error
    internalError = -32603,
}

/// Exception which carries JSON-RPC error code and HTTP status of the
/// response, optionally with the answer creation exception attached
class LoopException : Exception
{
    const JsonRpcErrorCode jsonCode;
    const HTTPStatus httpCode;
    const AnswerCreationException answerException;

    this(JsonRpcErrorCode jsonCode, HTTPStatus httpCode, string msg, string file, size_t line, AnswerCreationException ae = null) pure
    {
        this.jsonCode = jsonCode;
        this.httpCode = httpCode;
        this.answerException = ae;

        super(msg, file, line);
    }
}

/// Prepares built-in statements and statements of all methods on the
/// connection, fills argsOids of successfully prepared statements
///
/// ConnectionException is rethrown, statements failed by any other
/// exception are skipped with a warning.
///
/// returns names of unprepared methods, but length is number of unprepared statements
private string[] prepareStatements(__Conn conn, ref PrepareStatementsArgs args)
{
    {
        logDebugV("try to prepare internal statements");

        conn.prepareStatement(BuiltInPrep.BEGIN, "BEGIN");
        conn.prepareStatement(BuiltInPrep.BEGIN_RO, "BEGIN READ ONLY");
        conn.prepareStatement(BuiltInPrep.COMMIT, "COMMIT");
        conn.prepareStatement(BuiltInPrep.ROLLBACK, "ROLLBACK");
        conn.prepareStatement(BuiltInPrep.SET_AUTH_VARS,
            "SELECT set_config("~conn.escapeLiteral(args.varNames.username)~", $1, true), "~
            "set_config("~conn.escapeLiteral(args.varNames.password)~", $2, true)");

        logDebugV("internal statements prepared");
    }

    string[] failedStatements;

    foreach(ref method; args.methods.byValue)
    {
        foreach(ref statement; method.statements)
        {
            const prepName = preparedName(method, statement);

            logDebugV("try to prepare statement "~prepName~": "~statement.sqlCommand);

            try
            {
                conn.prepareStatement(prepName, statement.sqlCommand);
                statement.argsOids = conn.retrieveArgsTypes(prepName);

                logDebugV("statement "~prepName~" prepared");
            }
            catch(ConnectionException e)
            {
                throw e;
            }
            catch(Exception e)
            {
                logWarn("Skipping "~prepName~": "~e.msg);
                // method name is added once per every failed statement
                failedStatements ~= method.name;
            }
        }
    }

    return failedStatements;
}

/// Returns: name of the prepared statement: method name if statementNum
/// is negative, otherwise method name with "_" and statementNum appended
string preparedName(in Method method, in Statement statement)
{
    if(statement.statementNum < 0)
    {
        return method.name;
    }
    else
    {
        import std.conv: to;

        return method.name~"_"~statement.statementNum.to!string;
    }
}

/// Returns: types of the prepared statement parameters
///
/// Throws: Exception if type of some parameter or of some result field
/// is not found in supported types lists (as is or as array of it)
private OidType[] retrieveArgsTypes(__Conn conn, string preparedStatementName)
{
    auto desc = conn.describePreparedStatement(preparedStatementName);

    OidType[] ret = new OidType[desc.nParams];

    argsLoop:
    foreach(i, ref t; ret)
    {
        t = desc.paramType(i);

        foreach(sup; argsSupportedTypes)
        {
            // ValueConvException from oidConvTo is ignored, next type will be tried
            try
                if(t == sup || t == oidConvTo!"array"(sup)) continue argsLoop;
            catch(ValueConvException)
            {}
        }

        throw new Exception("unsupported parameter $"~(i+1).to!string~" type: "~t.to!string, __FILE__, __LINE__);
    }

    // Result fields type check
    resultTypesLoop:
    foreach(i; 0 .. desc.columnCount)
    {
        auto t = desc.OID(i);

        foreach(sup; resultSupportedTypes)
        {
            try
                if(t == sup || t == oidConvTo!"array"(sup)) continue resultTypesLoop;
            catch(ValueConvException)
            {}
        }

        throw new Exception("unsupported result field "~desc.columnName(i)~" ("~i.to!string~") type: "~t.to!string, __FILE__, __LINE__);
    }

    return ret;
}

// Actually this is types what described in BSON specification
private immutable OidType[] argsSupportedTypes =
[
    OidType.Bool,
    OidType.Int4,
    OidType.Int8,
    OidType.Float8,
    OidType.Text,
    OidType.Json
];

// Types what can be converted to BSON
private immutable OidType[] resultSupportedTypes = argsSupportedTypes ~
[
    OidType.Numeric,
    OidType.FixedString,
    OidType.Jsonb,
    //OidType.UUID
];