1 module pgator.app;
2 
3 import pgator.rpc_table;
4 import pgator.sql_transaction;
5 import dpq2.oids: OidType;
6 import dpq2.value: ValueConvException;
7 import std.getopt;
8 import std.typecons: Tuple;
9 import std.exception: enforce;
10 import std.conv: to, ConvException;
11 import vibe.http.server;
12 import vibe.http.auth.basic_auth: checkBasicAuth;
13 import vibe.core.concurrency;
14 import vibe.core.log;
15 import vibe.data.json;
16 import vibe.data.bson;
17 import vibe.db.postgresql;
18 
19 string configFileName = "/wrong/path/to/file.json";
20 bool debugEnabled = false;
21 bool checkStatements = false;
22 
23 void readOpts(string[] args)
24 {
25     try
26     {
27         auto helpInformation = getopt(
28                 args,
29                 "debug", &debugEnabled,
30                 "check", &checkStatements,
31                 "config", &configFileName
32             );
33     }
34     catch(Exception e)
35     {
36         logFatal(e.msg);
37         throw e;
38     }
39 
40     if(debugEnabled) setLogLevel = LogLevel.debugV;
41 }
42 
43 private Bson _cfg;
44 
45 Bson readConfig()
46 {
47     import std.file;
48 
49     Bson cfg;
50 
51     try
52     {
53         auto text = readText(configFileName);
54         cfg = Bson(parseJsonString(text));
55     }
56     catch(Exception e)
57     {
58         logFatal(e.msg);
59         throw e;
60     }
61 
62     return cfg;
63 }
64 
65 private struct PrepareStatementsArgs
66 {
67     const SQLVariablesNames varNames;
68     bool methodsLoadedFlag = false; // need for bootstrap
69     Method[string] methods;
70     size_t rpcTableLength;
71     size_t failedCount;
72     string tableName;
73 }
74 
75 int main(string[] args)
76 {
77     try
78     {
79         readOpts(args);
80         Bson cfg = readConfig();
81 
82         auto server = cfg["sqlServer"];
83         const connString = server["connString"].get!string;
84         auto maxConn = to!uint(server["maxConn"].get!long);
85 
86         PrepareStatementsArgs prepArgs = {
87             varNames: SQLVariablesNames(
88                 cfg["sqlAuthVariables"]["username"].get!string,
89                 cfg["sqlAuthVariables"]["password"].get!string
90             )
91         };
92 
93         // delegate
94         void afterConnectOrReconnect(Connection conn)
95         {
96             if(prepArgs.methodsLoadedFlag)
97             {
98                 logDebugV("Preparing");
99                 auto failedStatementsNames = prepareStatements(conn, prepArgs);
100                 prepArgs.failedCount += failedStatementsNames.length;
101 
102                 foreach(n; failedStatementsNames)
103                     prepArgs.methods.remove(n);
104 
105                 logInfo("Number of statements in the table "~prepArgs.tableName~": "~
106                     prepArgs.rpcTableLength.to!string~", failed to prepare: "~prepArgs.failedCount.to!string);
107             }
108         }
109 
110         // connect to db
111         auto client = new PostgresClient(connString, maxConn, &afterConnectOrReconnect);
112 
113         {
114             auto conn = client.lockConnection();
115             auto sqlPgatorTable = cfg["sqlPgatorTable"].get!string;
116 
117             // read pgator_rpc
118             prepArgs.tableName = conn.escapeIdentifier(sqlPgatorTable);
119 
120             QueryParams p;
121             p.sqlCommand = "SELECT * FROM "~prepArgs.tableName;
122             auto answer = conn.execStatement(p);
123 
124             prepArgs.rpcTableLength = answer.length;
125             auto readMethodsResult = readMethods(answer);
126             prepArgs.methods = readMethodsResult.methods;
127             prepArgs.methodsLoadedFlag = true;
128 
129             {
130                 prepArgs.failedCount = prepArgs.rpcTableLength - readMethodsResult.loaded;
131                 logDebugV("Number of statements in the table "~prepArgs.tableName~": "~prepArgs.rpcTableLength.to!string~", failed to load into pgator: "~prepArgs.failedCount.to!string);
132             }
133 
134             // prepare statements for previously used connection
135             afterConnectOrReconnect(conn);
136 
137             destroy(conn);
138         }
139 
140         if(!checkStatements)
141         {
142             immutable methods = cast(immutable) prepArgs.methods.dup;
143             loop(cfg, client, methods);
144         }
145 
146         return prepArgs.failedCount ? 2 : 0;
147     }
148     catch(Exception e)
149     {
150         logFatal(e.msg);
151 
152         return 1;
153     }
154 }
155 
156 void loop(in Bson cfg, PostgresClient client, immutable Method[string] methods)
157 {
158     // http-server
159     import vibe.core.core;
160 
161     void httpRequestHandler(scope HTTPServerRequest req, HTTPServerResponse res)
162     {
163         try
164         {
165             RpcRequestResults results = performRpcRequests(methods, client, req);
166 
167             final switch(results.type)
168             {
169                 case RpcType.vibedREST:
170                     auto result = &results.results[0];
171 
172                     if(result.exception is null)
173                     {
174                         res.writeJsonBody(result.responseBody["result"]);
175                     }
176                     else
177                     {
178                         res.writeJsonBody(result.responseBody, result.exception.httpCode);
179                     }
180 
181                     break;
182 
183                 case RpcType.jsonRpc:
184                     auto result = &results.results[0];
185 
186                     if(result.exception is null)
187                     {
188                         if(result.isNotify)
189                         {
190                             res.statusCode = HTTPStatus.noContent;
191                             res.writeVoidBody();
192                         }
193                         else
194                         {
195                             res.writeJsonBody(result.responseBody);
196                         }
197                     }
198                     else // error handling
199                     {
200                         if(result.isNotify)
201                         {
202                             res.statusCode = HTTPStatus.noContent;
203                             res.statusPhrase = result.exception.msg;
204                             res.writeVoidBody();
205                         }
206                         else
207                         {
208                             res.writeJsonBody(result.responseBody, result.exception.httpCode);
209                         }
210                     }
211 
212                     break;
213 
214                 case RpcType.jsonRpcBatchMode:
215                     Bson[] ret;
216 
217                     foreach(ref r; results.results) // fill response array
218                     {
219                         if(!r.isNotify) // skip notify responses
220                             ret ~= r.responseBody;
221                     }
222 
223                     if(ret.length)
224                     {
225                         res.writeJsonBody(Bson(ret)); // normal batch response
226                     }
227                     else
228                     {
229                         res.statusCode = HTTPStatus.noContent;
230                         res.writeVoidBody(); // empty response for batch with notifies only
231                     }
232 
233                     break;
234             }
235         }
236         catch(LoopException e)
237         {
238             res.writeJsonBody(Bson(e.msg), e.httpCode); // FIXME: wrong error body format
239 
240             logWarn(e.msg);
241         }
242         catch(Exception e)
243         {
244             logFatal(e.toString);
245         }
246     }
247 
248     //setupWorkerThreads(); // TODO: read number of threads from config
249 
250     auto settings = new HTTPServerSettings;
251     // settings.options |= HTTPServerOption.distribute; // causes stuck on epoll_wait () from /lib/x86_64-linux-gnu/libc.so.6
252     settings.options |= HTTPServerOption.parseJsonBody;
253     settings.bindAddresses = cfg["listenAddresses"].deserializeBson!(string[]);
254     settings.port = to!ushort(cfg["listenPort"].get!long);
255 
256     auto listenHandler = listenHTTP(settings, &httpRequestHandler);
257 
258     runEventLoop();
259 }
260 
261 struct SQLVariablesNames
262 {
263     string username;
264     string password;
265 }
266 
267 private Bson execMethod(
268     PostgresClient client,
269     in Method method,
270     in RpcRequest rpcRequest
271 )
272 {
273     TransactionQueryParams qp;
274     qp.auth = rpcRequest.auth;
275     qp.queryParams.length = method.statements.length;
276     size_t paramCounter = 0;
277 
278     foreach(i, statement; method.statements)
279     {
280         qp.queryParams[i].preparedStatementName = preparedName(method, statement);
281 
282         if(rpcRequest.positionParams.length == 0)
283         {
284             if(rpcRequest.namedParams !is null) // named parameters with types
285             {
286                 qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParams);
287             }
288             else // named parameters without types
289             {
290                 qp.queryParams[i].args = named2positionalParameters(statement, rpcRequest.namedParamsStringValues);
291             }
292         }
293         else // positional parameters
294         {
295             if(rpcRequest.positionParams.length - paramCounter < statement.argsNames.length)
296                 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too few", __FILE__, __LINE__);
297 
298             qp.queryParams[i].args = new Value[statement.argsNames.length];
299 
300             foreach(n, ref b; rpcRequest.positionParams[paramCounter .. paramCounter + statement.argsNames.length])
301             {
302                 auto v = &qp.queryParams[i].args[n];
303                 const oid = statement.argsOids[n];
304 
305                 *v = bsonToValue(b, oid);
306 
307                 if(v.oidType != oid)
308                     throw new LoopException(
309                         JsonRpcErrorCode.invalidParams,
310                         HTTPStatus.badRequest,
311                         "Parameter #"~i.to!string~" type is "~v.oidType.to!string~", but expected "~oid.to!string,
312                         __FILE__, __LINE__);
313             }
314 
315             paramCounter += statement.argsNames.length;
316         }
317     }
318 
319     if(rpcRequest.positionParams.length != 0 && paramCounter != rpcRequest.positionParams.length)
320     {
321         assert(paramCounter < rpcRequest.positionParams.length);
322         throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Parameters number is too big", __FILE__, __LINE__);
323     }
324 
325     SQLTransaction trans = SQLTransaction(client);
326 
327     try
328     {
329         if(method.needAuthVariablesFlag && !qp.auth.authVariablesSet)
330             throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.unauthorized, "Basic HTTP authentication need", __FILE__, __LINE__);
331 
332         trans.begin(method.readOnlyFlag);
333 
334         immutable answer = trans.execMethod(method, qp);
335 
336         enforce(answer.length == method.statements.length);
337 
338         Bson ret = Bson.emptyObject;
339 
340         if(!method.isMultiStatement)
341         {
342             ret = formatResult(answer[0], method.statements[0].resultFormat);
343         }
344         else
345         {
346             foreach(i, statement; method.statements)
347             {
348                 if(statement.resultFormat != ResultFormat.VOID)
349                     ret[statement.resultName] = formatResult(answer[i], statement.resultFormat);
350             }
351         }
352 
353         trans.commit();
354 
355         return ret;
356     }
357     catch(PostgresClientTimeoutException e)
358     {
359         trans.resetStart();
360         throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
361     }
362     catch(ConnectionException e)
363     {
364         trans.resetStart();
365         throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
366     }
367     catch(AnswerCreationException e)
368     {
369         throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__, e);
370     }
371     finally
372     {
373         destroy(trans);
374     }
375 }
376 
377 private Bson formatResult(immutable Answer answer, ResultFormat format)
378 {
379     Bson getValue(size_t rowNum, size_t colNum)
380     {
381         string columnName = answer.columnName(colNum);
382 
383         try
384         {
385             return answer[rowNum][colNum].as!Bson;
386         }
387         catch(ValueConvException e)
388         {
389             e.msg = "Column "~columnName~" ("~rowNum.to!string~" row): "~e.msg;
390             throw e;
391         }
392     }
393 
394     with(ResultFormat)
395     final switch(format)
396     {
397         case CELL:
398         {
399             if(answer.length != 1 || answer.columnCount != 1)
400                 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One cell flag constraint failed", __FILE__, __LINE__);
401 
402             return getValue(0, 0);
403         }
404 
405         case ROW:
406         {
407             if(answer.length != 1)
408                 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, "One row flag constraint failed", __FILE__, __LINE__);
409 
410             Bson ret = Bson.emptyObject;
411 
412             foreach(colNum; 0 .. answer.columnCount)
413                 ret[answer.columnName(colNum)] = getValue(0, colNum);
414 
415             return ret;
416         }
417 
418         case TABLE:
419         {
420             Bson ret = Bson.emptyObject;
421 
422             foreach(colNum; 0 .. answer.columnCount)
423             {
424                 Bson[] col = new Bson[answer.length];
425 
426                 foreach(rowNum; 0 .. answer.length)
427                     col[rowNum] = getValue(rowNum, colNum);
428 
429                 ret[answer.columnName(colNum)] = col;
430             }
431 
432             return ret;
433         }
434 
435         case ROTATED:
436         {
437             Bson[] ret = new Bson[answer.length];
438 
439             foreach(rowNum; 0 .. answer.length)
440             {
441                 Bson row = Bson.emptyObject;
442 
443                 foreach(colNum; 0 .. answer.columnCount)
444                     row[answer.columnName(colNum)] = getValue(rowNum, colNum);
445 
446                 ret[rowNum] = row;
447             }
448 
449             return Bson(ret);
450         }
451 
452         case VOID:
453         {
454             return Bson.emptyObject;
455         }
456     }
457 }
458 
459 Value[] named2positionalParameters(T)(in Statement method, in T[string] namedParams)
460 if(is(T == Bson) || is(T == string))
461 {
462     Value[] ret = new Value[method.argsNames.length];
463 
464     foreach(i, argName; method.argsNames)
465     {
466         auto argValue = argName in namedParams;
467 
468         if(argValue)
469         {
470             const oid = method.argsOids[i];
471             Value v;
472 
473             static if(is(T == Bson))
474             {
475                 v = bsonToValue(*argValue, oid);
476 
477                 if(v.oidType != oid)
478                     throw new LoopException(
479                         JsonRpcErrorCode.invalidParams,
480                         HTTPStatus.badRequest,
481                         argName~" parameter type is "~v.oidType.to!string~", but expected "~oid.to!string,
482                         __FILE__, __LINE__);
483             }
484             else // T == string, unknown parameter type
485             {
486                 // Using Postgres ability to determine argument type
487                 v = toValue(*argValue, ValueFormat.TEXT);
488             }
489 
490             ret[i] = v;
491         }
492         else
493         {
494             throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Missing required parameter "~argName, __FILE__, __LINE__);
495         }
496     }
497 
498     return ret;
499 }
500 
501 private struct AuthorizationCredentials
502 {
503     bool authVariablesSet = false;
504     string username;
505     string password;
506 }
507 
508 RpcRequestResults performRpcRequests(immutable Method[string] methods, PostgresClient client, scope HTTPServerRequest req)
509 {
510     RpcRequestResults ret;
511 
512     // Recognition of request type
513     RpcRequest[] dbRequests;
514 
515     if(req.method == HTTPMethod.GET)
516     {
517         ret.type = RpcType.vibedREST;
518 
519         dbRequests.length = 1;
520         dbRequests[0] = RpcRequest.vibeRestGetToRpcRequest(req);
521         dbRequests[0].type = RpcType.vibedREST;
522     }
523     else
524     {
525         if(req.contentType != "application/json")
526             throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.unsupportedMediaType, "Supported only application/json content type", __FILE__, __LINE__);
527 
528         Json j = req.json;
529 
530         switch(j.type)
531         {
532             case Json.Type.array:
533             {
534                 if(!j.length)
535                     throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Empty JSON-RPC 2.0 batch array", __FILE__, __LINE__);
536 
537                 ret.type = RpcType.jsonRpcBatchMode;
538                 dbRequests.length = j.length;
539 
540                 foreach(i, ref request; dbRequests)
541                 {
542                     if(!RpcRequest.isValidJsonRpcRequest(j[i]))
543                         throw new LoopException(JsonRpcErrorCode.invalidRequest, HTTPStatus.badRequest, "Isn't JSON-RPC 2.0 protocol", __FILE__, __LINE__);
544 
545                     request = RpcRequest.jsonToRpcRequest(j[i], req);
546                     request.type = RpcType.jsonRpcBatchMode;
547                 }
548 
549                 break;
550             }
551 
552             case Json.Type.object:
553                 dbRequests.length = 1;
554 
555                 if(RpcRequest.isValidJsonRpcRequest(j))
556                 {
557                     dbRequests[0] = RpcRequest.jsonToRpcRequest(j, req);
558                     dbRequests[0].type = RpcType.jsonRpc;
559                     ret.type = RpcType.jsonRpc;
560                 }
561                 else // JSON vibe.d REST POST
562                 {
563                     dbRequests[0] = RpcRequest.vibeRestToRpcRequest(j, req);
564                     dbRequests[0].type = RpcType.vibedREST;
565                     ret.type = RpcType.vibedREST;
566                 }
567 
568                 break;
569 
570             default:
571                 throw new LoopException(JsonRpcErrorCode.parseError, HTTPStatus.badRequest, "Parse error", __FILE__, __LINE__);
572         }
573     }
574 
575     ret.results.length = dbRequests.length;
576 
577     foreach(i, const ref request; dbRequests)
578     {
579         ret.results[i] = async({
580             return request.performRpcRequest(methods, client);
581         });
582     }
583 
584     return ret;
585 }
586 
587 struct RpcRequest
588 {
589     Bson id;
590     string methodName;
591     Bson[string] namedParams = null;
592     string[string] namedParamsStringValues = null; /// used if types of parameters is unknown
593     Bson[] positionParams = null;
594     AuthorizationCredentials auth;
595     RpcType type; // used only for pretty error formatting
596 
597     invariant()
598     {
599         size_t count = 0;
600 
601         if(namedParams !is null) count++;
602         if(namedParamsStringValues !is null) count++;
603         if(positionParams !is null) count++;
604 
605         assert(count <= 1);
606     }
607 
608     bool isNotify() const
609     {
610         return id.type == Bson.Type.undefined;
611     }
612 
613     private static bool isValidJsonRpcRequest(scope Json j)
614     {
615         return j["jsonrpc"] == "2.0";
616     }
617 
618     private static RpcRequest jsonToRpcRequest(scope Json j, scope HTTPServerRequest req)
619     {
620         RpcRequest r;
621 
622         r.id = j["id"];
623         r.methodName = j["method"].get!string;
624 
625         Json params = j["params"];
626 
627         switch(params.type)
628         {
629             case Json.Type.undefined: // params omitted
630                 break;
631 
632             case Json.Type.object:
633                 foreach(string key, value; params)
634                     r.namedParams[key] = value;
635 
636                 break;
637 
638             case Json.Type.array:
639                 foreach(value; params)
640                     r.positionParams ~= Bson(value);
641 
642                 break;
643 
644             default:
645                 throw new LoopException(JsonRpcErrorCode.invalidParams, HTTPStatus.badRequest, "Unexpected params type", __FILE__, __LINE__);
646         }
647 
648         // pick out name and password from the request
649         {
650             bool pwcheck(string _username, string _password)
651             {
652                 r.auth.username = _username;
653                 r.auth.password = _password;
654 
655                 return true;
656             }
657 
658             r.auth.authVariablesSet = checkBasicAuth(req, &pwcheck);
659         }
660 
661         return r;
662     }
663 
664     /// Converts Vibe.d REST client GET request to RpcRequest
665     private static RpcRequest vibeRestGetToRpcRequest(ref HTTPServerRequest req)
666     {
667         RpcRequest r;
668 
669         enforce(req.path.length > 0);
670         r.methodName = req.path[1..$]; // strips first '/'
671 
672         foreach(string key, ref value; req.query)
673             r.namedParamsStringValues[key] = value;
674 
675         r.id = Bson("REST request"); // Means what it isn't JSON-RPC "notify"
676 
677         return r;
678     }
679 
680 
681     /// Converts Vibe.d REST client request to RpcRequest
682     private static RpcRequest vibeRestToRpcRequest(scope Json j, ref HTTPServerRequest req)
683     {
684         RpcRequest r;
685 
686         enforce(req.path.length > 0);
687         r.methodName = req.path[1..$]; // strips first '/'
688 
689         foreach(string key, ref value; j)
690             r.namedParams[key] = value;
691 
692         r.id = Bson("REST request"); // Means what it isn't JSON-RPC "notify"
693 
694         return r;
695     }
696 
697     RpcRequestResult performRpcRequest(immutable Method[string] methods, PostgresClient client) const
698     {
699         try
700         {
701             try
702             {
703                 const method = (methodName in methods);
704 
705                 if(method is null)
706                     throw new LoopException(JsonRpcErrorCode.methodNotFound, HTTPStatus.badRequest, "Method "~methodName~" not found", __FILE__, __LINE__);
707 
708                 RpcRequestResult ret;
709                 ret.isNotify = isNotify;
710 
711                 if(!ret.isNotify)
712                 {
713                     ret.responseBody = Bson(["id": id]);
714                     ret.responseBody["jsonrpc"] = "2.0";
715                     ret.responseBody["result"] = client.execMethod(*method, this);
716                 }
717                 else // JSON-RPC 2.0 Notification
718                 {
719                     client.execMethod(*method, this);
720                     ret.responseBody = Bson.emptyObject;
721                 }
722 
723                 return ret;
724             }
725             catch(ValueConvException e)
726             {
727                 throw new LoopException(JsonRpcErrorCode.internalError, HTTPStatus.internalServerError, e.msg, __FILE__, __LINE__);
728             }
729         }
730         catch(LoopException e)
731         {
732             Bson err = Bson.emptyObject;
733 
734             if(type != RpcType.vibedREST)
735             {
736                 err["id"] = id;
737                 err["jsonrpc"] = "2.0";
738             }
739 
740             if(e.answerException is null)
741             {
742                 err["error"] = Bson([
743                     "message": Bson(e.msg),
744                     "code": Bson(e.jsonCode)
745                 ]);
746             }
747             else
748             {
749                 Bson data = Bson([
750                     "hint":    Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_HINT)),
751                     "detail":  Bson(e.answerException.resultErrorField(PG_DIAG_MESSAGE_DETAIL)),
752                     "errcode": Bson(e.answerException.resultErrorField(PG_DIAG_SQLSTATE))
753                 ]);
754 
755                 err["error"] = Bson([
756                     "message": Bson(e.msg),
757                     "code": Bson(e.jsonCode),
758                     "data": data
759                 ]);
760             }
761 
762             RpcRequestResult ret;
763             ret.isNotify = isNotify;
764             ret.responseBody = err;
765             ret.exception = e;
766 
767             logWarn(methodName~": "~e.httpCode.to!string~" "~err.toString);
768 
769             return ret;
770         }
771     }
772 }
773 
774 private struct RpcRequestResult
775 {
776     Bson responseBody;
777     LoopException exception;
778     bool isNotify;
779 
780     void opAssign(shared RpcRequestResult s) shared
781     {
782         synchronized
783         {
784             // This need because Bson don't have shared opAssign
785             Bson copy = s.responseBody;
786             (cast() this.responseBody) = copy;
787 
788             this.exception = s.exception;
789             this.isNotify = s.isNotify;
790         }
791     }
792 }
793 
794 private struct RpcRequestResults
795 {
796     Future!RpcRequestResult[] results;
797     RpcType type;
798 }
799 
800 private enum RpcType
801 {
802     jsonRpc, /// Normal JSON mode response
803     jsonRpcBatchMode, /// Batch JSON mode response
804     vibedREST /// Vibe.d REST client mode response
805 }
806 
807 enum JsonRpcErrorCode : short
808 {
809     /// Invalid JSON was received by the server.
810     /// An error occurred on the server while parsing the JSON text
811     parseError = -32700,
812 
813     /// The JSON sent is not a valid Request object.
814     invalidRequest = -32600,
815 
816     /// Statement not found
817     methodNotFound = -32601,
818 
819     /// Invalid params
820     invalidParams = -32602,
821 
822     /// Internal error
823     internalError = -32603,
824 }
825 
826 class LoopException : Exception
827 {
828     const JsonRpcErrorCode jsonCode;
829     const HTTPStatus httpCode;
830     const AnswerCreationException answerException;
831 
832     this(JsonRpcErrorCode jsonCode, HTTPStatus httpCode, string msg, string file, size_t line, AnswerCreationException ae = null) pure
833     {
834         this.jsonCode = jsonCode;
835         this.httpCode = httpCode;
836         this.answerException = ae;
837 
838         super(msg, file, line);
839     }
840 }
841 
842 /// returns names of unprepared methods, but length is number of unprepared statements
843 private string[] prepareStatements(Connection conn, ref PrepareStatementsArgs args)
844 {
845     {
846         logDebugV("try to prepare internal statements");
847 
848         conn.prepareStatement(BuiltInPrep.BEGIN, "BEGIN");
849         conn.prepareStatement(BuiltInPrep.BEGIN_RO, "BEGIN READ ONLY");
850         conn.prepareStatement(BuiltInPrep.COMMIT, "COMMIT");
851         conn.prepareStatement(BuiltInPrep.ROLLBACK, "ROLLBACK");
852         conn.prepareStatement(BuiltInPrep.SET_AUTH_VARS,
853             "SELECT set_config("~conn.escapeLiteral(args.varNames.username)~", $1, true), "~
854             "set_config("~conn.escapeLiteral(args.varNames.password)~", $2, true)");
855 
856         logDebugV("internal statements prepared");
857     }
858 
859     string[] failedStatements;
860 
861     foreach(ref method; args.methods.byValue)
862     {
863         foreach(ref statement; method.statements)
864         {
865             const prepName = preparedName(method, statement);
866 
867             logDebugV("try to prepare statement "~prepName~": "~statement.sqlCommand);
868 
869             try
870             {
871                 conn.prepareStatement(prepName, statement.sqlCommand);
872                 statement.argsOids = conn.retrieveArgsTypes(prepName);
873 
874                 logDebugV("statement "~prepName~" prepared");
875             }
876             catch(ConnectionException e)
877             {
878                 throw e;
879             }
880             catch(Exception e)
881             {
882                 logWarn("Skipping "~prepName~": "~e.msg);
883                 failedStatements ~= method.name;
884             }
885         }
886     }
887 
888     return failedStatements;
889 }
890 
891 string preparedName(in Method method, in Statement statement)
892 {
893     if(statement.statementNum < 0)
894     {
895         return method.name;
896     }
897     else
898     {
899         import std.conv: to;
900 
901         return method.name~"_"~statement.statementNum.to!string;
902     }
903 }
904 
905 private OidType[] retrieveArgsTypes(Connection conn, string preparedStatementName)
906 {
907     auto desc = conn.describePreparedStatement(preparedStatementName);
908 
909     OidType[] ret = new OidType[desc.nParams];
910 
911     argsLoop:
912     foreach(i, ref t; ret)
913     {
914         t = desc.paramType(i);
915 
916         foreach(sup; argsSupportedTypes)
917         {
918             try
919                 if(t == sup || t == oidConvTo!"array"(sup)) continue argsLoop;
920             catch(ValueConvException)
921             {}
922         }
923 
924         throw new Exception("unsupported parameter $"~(i+1).to!string~" type: "~t.to!string, __FILE__, __LINE__);
925     }
926 
927     // Result fields type check
928     resultTypesLoop:
929     foreach(i; 0 .. desc.columnCount)
930     {
931         auto t = desc.OID(i);
932 
933         foreach(sup; resultSupportedTypes)
934         {
935             try
936                 if(t == sup || t == oidConvTo!"array"(sup)) continue resultTypesLoop;
937             catch(ValueConvException)
938             {}
939         }
940 
941         throw new Exception("unsupported result field "~desc.columnName(i)~" ("~i.to!string~") type: "~t.to!string, __FILE__, __LINE__);
942     }
943 
944     return ret;
945 }
946 
947 // Actually this is types what described in BSON specification
948 private immutable OidType[] argsSupportedTypes =
949 [
950     OidType.Bool,
951     OidType.Int4,
952     OidType.Int8,
953     OidType.Float8,
954     OidType.Text,
955     OidType.Json
956 ];
957 
958 // Types what can be converted to BSON
959 private immutable OidType[] resultSupportedTypes = argsSupportedTypes ~
960 [
961     OidType.Numeric,
962     OidType.FixedString,
963     OidType.Jsonb,
964     //OidType.UUID
965 ];