1 module pgator.app;
2 
3 import pgator.rpc_table;
4 import pgator.sql_transaction;
5 import dpq2.oids: OidType;
6 import dpq2.value: ValueConvException;
7 import std.getopt;
8 import std.typecons: Tuple;
9 import std.exception: enforce;
10 import std.conv: to, ConvException;
11 import vibe.http.server;
12 import vibe.http.auth.basic_auth: checkBasicAuth;
13 import vibe.core.concurrency;
14 import vibe.core.log;
15 import vibe.data.json;
16 import vibe.data.bson;
17 import vibe.db.postgresql;
18 
/// Path of the JSON configuration file. Overridden by the "config" command-line
/// option in readOpts(); the default value is a deliberately invalid placeholder.
string configFileName = "/wrong/path/to/file.json";
/// Set by the "debug" command-line option; when true readOpts() switches the
/// log level to LogLevel.debugV.
bool debugEnabled = false;
/// Set by the "check" command-line option; when true main() skips starting the
/// HTTP server loop after the statements have been prepared.
bool checkStatements = false;
22 
/**
 * Parses the command-line arguments into the module-level option variables
 * (debugEnabled, checkStatements, configFileName).
 *
 * Any exception raised by getopt is logged as fatal and rethrown.
 * When the "debug" option was given the log level is set to debugV.
 */
void readOpts(string[] args)
{
    try
    {
        // The returned help information is not used
        getopt(args,
            "debug", &debugEnabled,
            "check", &checkStatements,
            "config", &configFileName);
    }
    catch(Exception ex)
    {
        logFatal(ex.msg);
        throw ex;
    }

    if(debugEnabled)
    {
        setLogLevel(LogLevel.debugV);
    }
}
42 
// NOTE(review): nothing visible in this file reads or writes _cfg - confirm whether it is still needed.
private Bson _cfg;
44 
/**
 * Reads the file named by configFileName, parses it as JSON and returns
 * the content converted to Bson.
 *
 * Any exception (file access, JSON parsing) is logged as fatal and rethrown.
 */
Bson readConfig()
{
    import std.file;

    try
    {
        auto content = readText(configFileName);

        return Bson(parseJsonString(content));
    }
    catch(Exception ex)
    {
        logFatal(ex.msg);
        throw ex;
    }
}
64 
/// State shared between main() and the connect/reconnect callback that
/// (re)prepares statements on a connection.
private struct PrepareStatementsArgs
{
    /// Names of the SQL configuration variables that receive the username and password
    const SQLVariablesNames varNames;
    /// False until main() has loaded the methods table; the reconnect callback
    /// does nothing while it is false (needed during bootstrap)
    bool methodsLoadedFlag = false; // need for bootstrap
    /// Loaded methods, indexed by method name
    Method[string] methods;
    /// Number of rows read from the methods table
    size_t rpcTableLength;
    /// Running count of rows that failed to load plus statements that failed to prepare
    size_t failedCount;
    /// Escaped identifier of the methods table (used in the query and in log messages)
    string tableName;
}
74 
/**
 * Program entry point.
 *
 * Reads options and configuration, connects to Postgres, loads the methods
 * table, prepares the statements and, unless the "check" option was given,
 * runs the HTTP server loop.
 *
 * Returns: 0 on success, 2 if at least one statement failed to load or to
 * prepare, 1 if an exception was caught.
 */
int main(string[] args)
{
    try
    {
        readOpts(args);
        Bson cfg = readConfig();

        // Connection settings are taken from the "sqlServer" section of the config
        auto server = cfg["sqlServer"];
        const connString = server["connString"].get!string;
        auto maxConn = to!uint(server["maxConn"].get!long);

        PrepareStatementsArgs prepArgs = {
            varNames: SQLVariablesNames(
                cfg["sqlAuthVariables"]["username"].get!string,
                cfg["sqlAuthVariables"]["password"].get!string
            )
        };

        // Callback handed to PostgresClient below; also invoked manually once
        // the methods table has been read.
        // delegate
        void afterConnectOrReconnect(__Conn conn)
        {
            // Nothing to prepare until the methods have been loaded
            if(prepArgs.methodsLoadedFlag)
            {
                logDebugV("Preparing");
                auto failedStatementsNames = prepareStatements(conn, prepArgs);
                prepArgs.failedCount += failedStatementsNames.length;

                // Methods with statements that could not be prepared are dropped
                foreach(n; failedStatementsNames)
                    prepArgs.methods.remove(n);

                logInfo("Number of statements in the table "~prepArgs.tableName~": "~
                    prepArgs.rpcTableLength.to!string~", failed to prepare: "~prepArgs.failedCount.to!string);
            }
        }

        // connect to db
        auto client = new PostgresClient(connString, maxConn, &afterConnectOrReconnect);

        {
            auto conn = client.lockConnection();
            auto sqlPgatorTable = cfg["sqlPgatorTable"].get!string;

            // read pgator_rpc
            // The table name is escaped as an identifier before being spliced into SQL
            prepArgs.tableName = conn.escapeIdentifier(sqlPgatorTable);

            QueryParams p;
            p.sqlCommand = "SELECT * FROM "~prepArgs.tableName;
            auto answer = conn.execStatement(p);

            prepArgs.rpcTableLength = answer.length;
            auto readMethodsResult = readMethods(answer);
            prepArgs.methods = readMethodsResult.methods;
            // From here on afterConnectOrReconnect actually prepares statements
            prepArgs.methodsLoadedFlag = true;

            {
                // Rows that could not be loaded count as failures
                prepArgs.failedCount = prepArgs.rpcTableLength - readMethodsResult.loaded;
                logDebugV("Number of statements in the table "~prepArgs.tableName~": "~prepArgs.rpcTableLength.to!string~", failed to load into pgator: "~prepArgs.failedCount.to!string);
            }

            // prepare statements for previously used connection
            afterConnectOrReconnect(conn);

            // NOTE(review): presumably releases the locked connection before
            // the server loop starts - confirm against the client library
            delete conn;
        }

        if(!checkStatements)
        {
            // The server loop works on an immutable copy of the methods table
            immutable methods = cast(immutable) prepArgs.methods.dup;
            loop(cfg, client, methods);
        }

        return prepArgs.failedCount ? 2 : 0;
    }
    catch(Exception e)
    {
        logFatal(e.msg);

        return 1;
    }
}
155 
/**
 * Starts the HTTP server and runs the event loop.
 *
 * Params:
 *   cfg = configuration; "listenAddresses" and "listenPort" are read from it
 *   client = Postgres client used to execute the requested methods
 *   methods = table of callable methods, indexed by method name
 *
 * Every HTTP request is passed to performRpcRequests() and the response is
 * written according to the recognized request type (vibe.d REST, single
 * JSON-RPC request or JSON-RPC batch).
 */
void loop(in Bson cfg, PostgresClient client, immutable Method[string] methods)
{
    debug(BreakUpSomeConnections) // for testing purposes
    {
        auto s1 = client.lockConnection();
        auto s2 = client.lockConnection();

        s1.socket.send([0x12, 0x13]); // just garbage
        s2.socket.send([0x12, 0x13]);

        delete s1;
        delete s2;
    }

    // http-server
    import vibe.core.core;

    void httpRequestHandler(scope HTTPServerRequest req, HTTPServerResponse res)
    {
        try
        {
            RpcRequestResults results = performRpcRequests(methods, client, req);

            final switch(results.type)
            {
                case RpcType.vibedREST:
                    auto result = &results.results[0];

                    if(result.exception is null)
                    {
                        // REST clients receive only the bare result value
                        res.writeJsonBody(result.responseBody["result"]);
                    }
                    else
                    {
                        res.writeJsonBody(result.responseBody, result.exception.httpCode);
                    }

                    break;

                case RpcType.jsonRpc:
                    auto result = &results.results[0];

                    if(result.exception is null)
                    {
                        if(result.isNotify)
                        {
                            // Notifications get no response body
                            res.statusCode = HTTPStatus.noContent;
                            res.writeVoidBody();
                        }
                        else
                        {
                            res.writeJsonBody(result.responseBody);
                        }
                    }
                    else // error handling
                    {
                        if(result.isNotify)
                        {
                            // The error text of a failed notification is
                            // reported through the status phrase only
                            res.statusCode = HTTPStatus.noContent;
                            res.statusPhrase = result.exception.msg;
                            res.writeVoidBody();
                        }
                        else
                        {
                            res.writeJsonBody(result.responseBody, result.exception.httpCode);
                        }
                    }

                    break;

                case RpcType.jsonRpcBatchMode:
                    Bson[] ret;

                    foreach(ref r; results.results) // fill response array
                    {
                        if(!r.isNotify) // skip notify responses
                            ret ~= r.responseBody;
                    }

                    if(ret.length)
                    {
                        res.writeJsonBody(Bson(ret)); // normal batch response
                    }
                    else
                    {
                        res.statusCode = HTTPStatus.noContent;
                        res.writeVoidBody(); // empty response for batch with notifies only
                    }

                    break;
            }
        }
        catch(LoopException e)
        {
            res.writeJsonBody(Bson(e.msg), e.httpCode); // FIXME: wrong error body format

            logWarn(e.msg);
        }
        catch(Exception e)
        {
            logFatal(e.toString);

            // Previously this branch only logged, so the handler returned
            // without writing any response of its own. Report an internal
            // error explicitly, unless a response has already been started.
            // The exception text is deliberately not sent to the client.
            if(!res.headerWritten)
                res.writeJsonBody(Bson("Internal server error"), HTTPStatus.internalServerError);
        }
    }

    //setupWorkerThreads(); // TODO: read number of threads from config

    auto settings = new HTTPServerSettings;
    // settings.options |= HTTPServerOption.distribute; // causes stuck on epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6
    settings.options |= HTTPServerOption.parseJsonBody;
    settings.bindAddresses = cfg["listenAddresses"].deserializeBson!(string[]);
    settings.port = to!ushort(cfg["listenPort"].get!long);

    auto listenHandler = listenHTTP(settings, &httpRequestHandler);

    runEventLoop();
}
272 
/// Names of the SQL configuration variables that are filled with the
/// credentials of the requesting user (see the set_config() statement built
/// in prepareStatements()).
struct SQLVariablesNames
{
    /// Name of the variable that receives the username
    string username;
    /// Name of the variable that receives the password
    string password;
}
278 
/**
 * Executes all statements of a method inside one SQL transaction and
 * returns the formatted result.
 *
 * Params:
 *   client = Postgres client used to run the transaction
 *   method = method to execute
 *   rpcRequest = request that carries the arguments and the credentials
 *
 * Returns: the result of the single statement, or, for multi-statement
 * methods, an object holding the results of the non-VOID statements
 * under their result names.
 *
 * Throws: LoopException on invalid parameters, missing authentication,
 * connection problems, timeouts and SQL errors.
 */
private Bson execMethod(
    PostgresClient client,
    in Method method,
    in RpcRequest rpcRequest
)
{
    TransactionQueryParams qp;
    qp.auth = rpcRequest.auth;
    qp.queryParams.length = method.statements.length;
    size_t paramCounter = 0; // number of positional parameters consumed so far

    foreach(i, statement; method.statements)
    {
        qp.queryParams[i].preparedStatementName = preparedName(method, statement);

        if(rpcRequest.positionParams.length == 0)
        {
            if(rpcRequest.namedParams !is null) // named parameters with types
            {
                qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParams);
            }
            else // named parameters without types
            {
                qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParamsStringValues);
            }
        }
        else // positional parameters
        {
            // Positional parameters are distributed over the statements in order
            if(rpcRequest.positionParams.length - paramCounter < statement.argsNames.length)
                throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too few", __FILE__, __LINE__);

            qp.queryParams[i].args = new Value[statement.argsNames.length];

            foreach(n, ref b; rpcRequest.positionParams[paramCounter .. paramCounter + statement.argsNames.length])
            {
                auto v = &qp.queryParams[i].args[n];
                const oid = statement.argsOids[n];

                *v = bsonToValue(b, oid);

                // Report the zero-based position of the offending value inside
                // the request's params array (not the statement index)
                if(v.oidType != oid)
                    throw new LoopException(
                        JsonRpcErrorCode.invalidParams,
                        HTTPStatus.badRequest,
                        "Parameter #"~(paramCounter + n).to!string~" type is "~v.oidType.to!string~", but expected "~oid.to!string,
                        __FILE__, __LINE__);
            }

            paramCounter += statement.argsNames.length;
        }
    }

    // All positional parameters must have been consumed
    if(rpcRequest.positionParams.length != 0 && paramCounter != rpcRequest.positionParams.length)
    {
        assert(paramCounter < rpcRequest.positionParams.length);
        throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too big", __FILE__, __LINE__);
    }

    SQLTransaction trans = SQLTransaction(client);

    try
    {
        if(method.needAuthVariablesFlag && !qp.auth.authVariablesSet)
            throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.unauthorized, "Basic HTTP authentication need", __FILE__, __LINE__);

        trans.begin(method.readOnlyFlag);

        immutable answer = trans.execMethod(method, qp);

        enforce(answer.length == method.statements.length);

        Bson ret = Bson.emptyObject;

        if(!method.isMultiStatement)
        {
            ret = formatResult(answer[0], method.statements[0].resultFormat);
        }
        else
        {
            foreach(i, statement; method.statements)
            {
                // VOID statements contribute nothing to the result object
                if(statement.resultFormat != ResultFormat.VOID)
                    ret[statement.resultName] = formatResult(answer[i], statement.resultFormat);
            }
        }

        trans.commit();

        return ret;
    }
    catch(PostgresClientTimeoutException e)
    {
        trans.resetStart();
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
    }
    catch(ConnectionException e)
    {
        trans.resetStart();
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
    }
    catch(AnswerCreationException e)
    {
        // The original exception is kept so that SQL error details can be reported
        throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__, e);
    }
    finally
    {
        destroy(trans);
    }
}
388 
/**
 * Converts a query answer to Bson according to the requested result format.
 *
 * CELL returns the single value, ROW an object (column name -> value),
 * TABLE an object of columns (column name -> array of values), ROTATED an
 * array of row objects and VOID an empty object.
 *
 * Throws: LoopException if the answer does not have the shape required by
 * CELL or ROW; ValueConvException (with the cell position prepended to the
 * message) if a value can not be converted.
 */
private Bson formatResult(immutable Answer answer, ResultFormat format)
{
    // Converts one cell; conversion errors get the cell position prepended
    Bson cell(size_t row, size_t col)
    {
        string name = answer.columnName(col);

        try
        {
            return answer[row][col].as!Bson;
        }
        catch(ValueConvException ex)
        {
            ex.msg = "Column "~name~" ("~row.to!string~" row): "~ex.msg;
            throw ex;
        }
    }

    // Builds an object (column name -> value) from one row
    Bson rowObject(size_t row)
    {
        Bson obj = Bson.emptyObject;

        foreach(col; 0 .. answer.columnCount)
            obj[answer.columnName(col)] = cell(row, col);

        return obj;
    }

    with(ResultFormat)
    final switch(format)
    {
        case CELL:
        {
            const isSingleCell = answer.length == 1 && answer.columnCount == 1;

            if(!isSingleCell)
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One cell flag constraint failed", __FILE__, __LINE__);

            return cell(0, 0);
        }

        case ROW:
        {
            if(answer.length != 1)
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One row flag constraint failed", __FILE__, __LINE__);

            return rowObject(0);
        }

        case TABLE:
        {
            Bson table = Bson.emptyObject;

            foreach(col; 0 .. answer.columnCount)
            {
                Bson[] values = new Bson[answer.length];

                foreach(row, ref value; values)
                    value = cell(row, col);

                table[answer.columnName(col)] = values;
            }

            return table;
        }

        case ROTATED:
        {
            Bson[] rows = new Bson[answer.length];

            foreach(row, ref obj; rows)
                obj = rowObject(row);

            return Bson(rows);
        }

        case VOID:
        {
            return Bson.emptyObject;
        }
    }
}
470 
/**
 * Arranges named parameters in the order expected by a statement.
 *
 * Params:
 *   method = statement whose argsNames / argsOids describe the arguments
 *   namedParams = parameter values indexed by name; either typed Bson
 *                 values or plain strings of unknown type
 *
 * Returns: argument values ordered as in method.argsNames.
 *
 * Throws: LoopException if a required parameter is missing or, for Bson
 * values, if the converted value has an unexpected type.
 */
Value[] named2positionalParameters(T)(in Statement method, in T[string] namedParams)
if(is(T == Bson) || is(T == string))
{
    auto positional = new Value[method.argsNames.length];

    foreach(idx, name; method.argsNames)
    {
        auto found = name in namedParams;

        if(found is null)
            throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Missing required parameter "~name, __FILE__, __LINE__);

        const expected = method.argsOids[idx];
        Value converted;

        static if(is(T == string))
        {
            // Parameter type is unknown:
            // using Postgres ability to determine argument type
            converted = toValue(*found, ValueFormat.TEXT);
        }
        else // T == Bson
        {
            converted = bsonToValue(*found, expected);

            if(converted.oidType != expected)
                throw new LoopException(
                    JsonRpcErrorCode.invalidParams,
                    HTTPStatus.badRequest,
                    name~" parameter type is "~converted.oidType.to!string~", but expected "~expected.to!string,
                    __FILE__, __LINE__);
        }

        positional[idx] = converted;
    }

    return positional;
}
512 
/// Credentials picked out of the HTTP request by RpcRequest.jsonToRpcRequest().
private struct AuthorizationCredentials
{
    /// Result of checkBasicAuth() for the request; stays false if it was never called
    bool authVariablesSet = false;
    /// Username passed to the password-check callback
    string username;
    /// Password passed to the password-check callback
    string password;
}
519 
/**
 * Recognizes the type of an HTTP request (vibe.d REST, single JSON-RPC
 * request or JSON-RPC batch), converts it to one or more RpcRequest values
 * and starts their execution.
 *
 * Params:
 *   methods = table of callable methods, indexed by method name
 *   client = Postgres client used to execute the methods
 *   req = incoming HTTP request
 *
 * Returns: the recognized request type and one future per request.
 *
 * Throws: LoopException if the content type is not application/json, the
 * batch array is empty, a batch item is not a JSON-RPC 2.0 request or the
 * body is neither an array nor an object.
 */
RpcRequestResults performRpcRequests(immutable Method[string] methods, PostgresClient client, scope HTTPServerRequest req)
{
    RpcRequestResults ret;

    // Recognition of request type
    RpcRequest[] dbRequests;

    if(req.method == HTTPMethod.GET)
    {
        // GET requests are always treated as vibe.d REST calls
        ret.type = RpcType.vibedREST;

        dbRequests.length = 1;
        dbRequests[0] = RpcRequest.vibeRestGetToRpcRequest(req);
        dbRequests[0].type = RpcType.vibedREST;
    }
    else
    {
        if(req.contentType != "application/json")
            throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.unsupportedMediaType, "Supported only application/json content type", __FILE__, __LINE__);

        Json j = req.json;

        switch(j.type)
        {
            // JSON array: JSON-RPC batch
            case Json.Type.array:
            {
                if(!j.length)
                    throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Empty JSON-RPC 2.0 batch array", __FILE__, __LINE__);

                ret.type = RpcType.jsonRpcBatchMode;
                dbRequests.length = j.length;

                foreach(i, ref request; dbRequests)
                {
                    // One invalid item rejects the whole batch
                    if(!RpcRequest.isValidJsonRpcRequest(j[i]))
                        throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Isn't JSON-RPC 2.0 protocol", __FILE__, __LINE__);

                    request = RpcRequest.jsonToRpcRequest(j[i], req);
                    request.type = RpcType.jsonRpcBatchMode;
                }

                break;
            }

            // JSON object: single JSON-RPC request or vibe.d REST POST
            case Json.Type.object:
                dbRequests.length = 1;

                if(RpcRequest.isValidJsonRpcRequest(j))
                {
                    dbRequests[0] = RpcRequest.jsonToRpcRequest(j, req);
                    dbRequests[0].type = RpcType.jsonRpc;
                    ret.type = RpcType.jsonRpc;
                }
                else // JSON vibe.d REST POST
                {
                    dbRequests[0] = RpcRequest.vibeRestToRpcRequest(j, req);
                    dbRequests[0].type = RpcType.vibedREST;
                    ret.type = RpcType.vibedREST;
                }

                break;

            default:
                throw new LoopException(JsonRpcErrorCode.parseError, HTTPStatus.badRequest, "Parse error", __FILE__, __LINE__);
        }
    }

    ret.results.length = dbRequests.length;

    foreach(i, const ref request; dbRequests)
    {
        // NOTE(review): the closure refers to the loop variable `request`;
        // presumably async() begins executing it before the next iteration
        // rebinds the variable - confirm against vibe.core.concurrency.
        ret.results[i] = async({
            return request.performRpcRequest(methods, client);
        });
    }

    return ret;
}
598 
/**
 * One remote procedure call extracted from an HTTP request.
 *
 * At most one of namedParams, namedParamsStringValues and positionParams
 * is non-null (checked by the invariant).
 */
struct RpcRequest
{
    Bson id; /// request id; Bson undefined for a JSON-RPC notification
    string methodName; /// name of the method to call
    Bson[string] namedParams = null; /// named parameters with known types
    string[string] namedParamsStringValues = null; /// used if types of parameters is unknown
    Bson[] positionParams = null; /// positional parameters
    AuthorizationCredentials auth; /// credentials taken from the HTTP request
    RpcType type; // used only for pretty error formatting

    invariant()
    {
        // Only one kind of parameters may be set at the same time
        size_t count = 0;

        if(namedParams !is null) count++;
        if(namedParamsStringValues !is null) count++;
        if(positionParams !is null) count++;

        assert(count <= 1);
    }

    /// Returns: true if the request has no id (its type is Bson undefined)
    bool isNotify() const
    {
        return id.type == Bson.Type.undefined;
    }

    /// Returns: true if the "jsonrpc" member of j is equal to "2.0"
    private static bool isValidJsonRpcRequest(scope Json j)
    {
        return j["jsonrpc"] == "2.0";
    }

    /**
     * Converts a JSON-RPC 2.0 request object to RpcRequest and picks the
     * Basic authentication credentials out of the HTTP request.
     *
     * Throws: LoopException if "method" is not a string or "params" is
     * neither omitted, an object nor an array.
     */
    private static RpcRequest jsonToRpcRequest(scope Json j, scope HTTPServerRequest req)
    {
        RpcRequest r;

        r.id = j["id"];

        // A missing or non-string method name is reported as an invalid
        // request instead of surfacing as a generic conversion exception
        Json methodField = j["method"];

        if(methodField.type != Json.Type.string)
            throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Method name must be a string", __FILE__, __LINE__);

        r.methodName = methodField.get!string;

        Json params = j["params"];

        switch(params.type)
        {
            case Json.Type.undefined: // params omitted
                break;

            case Json.Type.object:
                foreach(string key, value; params)
                    r.namedParams[key] = value;

                break;

            case Json.Type.array:
                foreach(value; params)
                    r.positionParams ~= Bson(value);

                break;

            default:
                throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Unexpected params type", __FILE__, __LINE__);
        }

        // pick out name and password from the request
        {
            // Accepts any credentials: they are only stored here, not verified
            bool pwcheck(string _username, string _password)
            {
                r.auth.username = _username;
                r.auth.password = _password;

                return true;
            }

            r.auth.authVariablesSet = checkBasicAuth(req, &pwcheck);
        }

        return r;
    }

    /// Converts Vibe.d REST client GET request to RpcRequest
    private static RpcRequest vibeRestGetToRpcRequest(ref HTTPServerRequest req)
    {
        RpcRequest r;

        enforce(req.path.length > 0);
        r.methodName = req.path[1..$]; // strips first '/'

        // Query values are strings, so parameter types are unknown here
        foreach(string key, ref value; req.query)
            r.namedParamsStringValues[key] = value;

        r.id = Bson("REST request"); // Means that it isn't JSON-RPC "notify"

        return r;
    }


    /// Converts Vibe.d REST client request to RpcRequest
    private static RpcRequest vibeRestToRpcRequest(scope Json j, ref HTTPServerRequest req)
    {
        RpcRequest r;

        enforce(req.path.length > 0);
        r.methodName = req.path[1..$]; // strips first '/'

        foreach(string key, ref value; j)
            r.namedParams[key] = value;

        r.id = Bson("REST request"); // Means that it isn't JSON-RPC "notify"

        return r;
    }

    /**
     * Looks up the requested method, executes it and builds the response.
     *
     * A LoopException raised on the way is not propagated: it is converted
     * to an error response body and stored in the returned result.
     */
    RpcRequestResult performRpcRequest(immutable Method[string] methods, PostgresClient client) const
    {
        try
        {
            try
            {
                const method = (methodName in methods);

                if(method is null)
                    throw new LoopException(JsonRpcErrorCode.methodNotFound, HTTPStatus.badRequest, "Method "~methodName~" not found", __FILE__, __LINE__);

                RpcRequestResult ret;
                ret.isNotify = isNotify;

                if(!ret.isNotify)
                {
                    ret.responseBody = Bson(["id": id]);
                    ret.responseBody["jsonrpc"] = "2.0";
                    ret.responseBody["result"] = client.execMethod(*method, this);
                }
                else // JSON-RPC 2.0 Notification
                {
                    // The method is executed, but its result is discarded
                    client.execMethod(*method, this);
                    ret.responseBody = Bson.emptyObject;
                }

                return ret;
            }
            catch(ValueConvException e)
            {
                // Rethrown as LoopException to be handled by the outer catch
                throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
            }
        }
        catch(LoopException e)
        {
            Bson err = Bson.emptyObject;

            // REST error bodies carry no JSON-RPC envelope members
            if(type != RpcType.vibedREST)
            {
                err["id"] = id;
                err["jsonrpc"] = "2.0";
            }

            if(e.answerException is null)
            {
                err["error"] = Bson([
                    "message": Bson(e.msg),
                    "code": Bson(e.jsonCode)
                ]);
            }
            else
            {
                // SQL error: add the diagnostic fields of the server answer
                Bson data = Bson([
                    "hint":    Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_HINT)),
                    "detail":  Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_DETAIL)),
                    "errcode": Bson(e.answerException.resultErrorField(PG_DIAG_SQLSTATE))
                ]);

                err["error"] = Bson([
                    "message": Bson(e.msg),
                    "code": Bson(e.jsonCode),
                    "data": data
                ]);
            }

            RpcRequestResult ret;
            ret.isNotify = isNotify;
            ret.responseBody = err;
            ret.exception = e;

            logWarn(methodName~": "~e.httpCode.to!string~" "~err.toString);

            return ret;
        }
    }
}
785 
/// Outcome of one RpcRequest: the response body and, on failure, the exception.
private struct RpcRequestResult
{
    /// Response body (result or error object)
    Bson responseBody;
    /// Set by RpcRequest.performRpcRequest() when the request failed, null otherwise
    LoopException exception;
    /// True if the request was a notification
    bool isNotify;

    /// Member-wise assignment for shared instances, performed inside a synchronized block
    void opAssign(shared RpcRequestResult s) shared
    {
        synchronized
        {
            // This is needed because Bson doesn't have a shared opAssign
            Bson copy = s.responseBody;
            (cast() this.responseBody) = copy;

            this.exception = s.exception;
            this.isNotify = s.isNotify;
        }
    }
}
805 
/// Results of all requests extracted from one HTTP request.
private struct RpcRequestResults
{
    /// One future per request, in the order of the requests
    Future!RpcRequestResult[] results;
    /// Recognized type of the HTTP request; selects the response format
    RpcType type;
}
811 
/// Kind of request recognized by performRpcRequests(); selects the response format.
private enum RpcType
{
    jsonRpc, /// Normal JSON mode response
    jsonRpcBatchMode, /// Batch JSON mode response
    vibedREST /// Vibe.d REST client mode response
}
818 
/// Error codes placed into the "code" member of error responses.
enum JsonRpcErrorCode : short
{
    /// Invalid JSON was received by the server.
    /// An error occurred on the server while parsing the JSON text
    parseError = -32700,

    /// The JSON sent is not a valid Request object.
    invalidRequest = -32600,

    /// The requested method was not found
    methodNotFound = -32601,

    /// Invalid params
    invalidParams = -32602,

    /// Internal error
    internalError = -32603,
}
837 
/// Exception that carries everything needed to build an error response:
/// the JSON-RPC error code, the HTTP status and, optionally, the SQL
/// exception that caused it.
class LoopException : Exception
{
    const JsonRpcErrorCode jsonCode; /// code for the "error" object of the response
    const HTTPStatus httpCode; /// HTTP status of the response
    const AnswerCreationException answerException; /// originating SQL exception or null

    /// Stores the codes and the optional originating exception `ae`
    this(JsonRpcErrorCode jsonCode, HTTPStatus httpCode, string msg, string file, size_t line, AnswerCreationException ae = null) pure
    {
        super(msg, file, line);

        this.answerException = ae;
        this.httpCode = httpCode;
        this.jsonCode = jsonCode;
    }
}
853 
/**
 * Prepares the built-in statements and all statements of the loaded
 * methods on the given connection, and stores the retrieved argument
 * types into each statement.
 *
 * Returns: names of the methods that could not be prepared; a name is
 * added once per failed statement, so the length is the number of
 * unprepared statements.
 *
 * Throws: ConnectionException is passed through; any other exception
 * raised while preparing a method statement is logged and recorded.
 */
private string[] prepareStatements(__Conn conn, ref PrepareStatementsArgs args)
{
    logDebugV("try to prepare internal statements");

    conn.prepareStatement(BuiltInPrep.BEGIN, "BEGIN");
    conn.prepareStatement(BuiltInPrep.BEGIN_RO, "BEGIN READ ONLY");
    conn.prepareStatement(BuiltInPrep.COMMIT, "COMMIT");
    conn.prepareStatement(BuiltInPrep.ROLLBACK, "ROLLBACK");

    // Sets both authentication variables in one statement
    conn.prepareStatement(
        BuiltInPrep.SET_AUTH_VARS,
        "SELECT set_config("~conn.escapeLiteral(args.varNames.username)~", $1, true), "~
        "set_config("~conn.escapeLiteral(args.varNames.password)~", $2, true)"
    );

    logDebugV("internal statements prepared");

    string[] failed;

    foreach(ref m; args.methods.byValue)
    {
        foreach(ref st; m.statements)
        {
            const name = preparedName(m, st);

            logDebugV("try to prepare statement "~name~": "~st.sqlCommand);

            try
            {
                conn.prepareStatement(name, st.sqlCommand);
                st.argsOids = conn.retrieveArgsTypes(name);

                logDebugV("statement "~name~" prepared");
            }
            catch(ConnectionException ex)
            {
                // Connection problems are not a fault of the statement
                throw ex;
            }
            catch(Exception ex)
            {
                logWarn("Skipping "~name~": "~ex.msg);
                failed ~= m.name;
            }
        }
    }

    return failed;
}
902 
/// Returns the name under which a statement of a method is prepared:
/// the method name alone if the statement number is negative, otherwise
/// the method name followed by '_' and the statement number.
string preparedName(in Method method, in Statement statement)
{
    import std.conv: to;

    return statement.statementNum < 0
        ? method.name
        : method.name~"_"~statement.statementNum.to!string;
}
916 
/**
 * Describes a prepared statement, checks that all of its parameter and
 * result field types are supported and returns the parameter types.
 *
 * Throws: Exception naming the first unsupported parameter or result field.
 */
private OidType[] retrieveArgsTypes(__Conn conn, string preparedStatementName)
{
    // True if the type is one of the listed types or an array of one of them
    static bool isSupported(OidType type, in OidType[] supported)
    {
        foreach(candidate; supported)
        {
            try
            {
                if(type == candidate || type == oidConvTo!"array"(candidate))
                    return true;
            }
            catch(ValueConvException)
            {
                // conversion to the array type failed: try the next candidate
            }
        }

        return false;
    }

    auto desc = conn.describePreparedStatement(preparedStatementName);

    auto argTypes = new OidType[desc.nParams];

    foreach(idx, ref argType; argTypes)
    {
        argType = desc.paramType(idx);

        if(!isSupported(argType, argsSupportedTypes))
            throw new Exception("unsupported parameter $"~(idx+1).to!string~" type: "~argType.to!string, __FILE__, __LINE__);
    }

    // Result fields type check
    foreach(col; 0 .. desc.columnCount)
    {
        auto fieldType = desc.OID(col);

        if(!isSupported(fieldType, resultSupportedTypes))
            throw new Exception("unsupported result field "~desc.columnName(col)~" ("~col.to!string~") type: "~fieldType.to!string, __FILE__, __LINE__);
    }

    return argTypes;
}
958 
// Types accepted for statement parameters (checked in retrieveArgsTypes).
// These are the types that are described in the BSON specification.
private immutable OidType[] argsSupportedTypes =
[
    OidType.Bool,
    OidType.Int4,
    OidType.Int8,
    OidType.Float8,
    OidType.Text,
    OidType.Json
];

// Types accepted for result fields (checked in retrieveArgsTypes):
// the parameter types plus further types that can be converted to BSON.
private immutable OidType[] resultSupportedTypes = argsSupportedTypes ~
[
    OidType.Numeric,
    OidType.FixedString,
    OidType.Jsonb,
    //OidType.UUID
];