Source: Common/Services/Db.cs

using System.Data;
using Npgsql;
using Dapper;
using Newtonsoft.Json;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Options;
using ExcelDataReader;
using PublicAPI.Common.Extensions;
using PublicAPI.Specific.Services;
using Newtonsoft.Json.Linq;
using NpgsqlTypes;

namespace PublicAPI.Common.Services {

    /**
     * Database related operations   
     * @module Db
    */

    public class Db {

        private readonly ILogger<Db> _logger;
        private readonly IDistributedCache _cache;
        private readonly CurrentUserInfo _currentUserInfo;
        private readonly AppOptions _options;

        public Db(ILogger<Db> logger, IDistributedCache cache, CurrentUserInfo currentUserInfo, IOptions<AppOptions> options) {
            _logger = logger;
            _cache = cache;
            _currentUserInfo = currentUserInfo;
            _options = options.Value;
        }

        /**
            * Create database context with current user and language
            * @param {object} sender - Sender
            * @param {StateChangeEventArgs} e - Event arguments
        */
        private void CreateContext(object sender, StateChangeEventArgs e) {
            if (e.CurrentState == ConnectionState.Open) {
                NpgsqlConnection conn = sender as NpgsqlConnection;
                using var cmd = conn.CreateCommand();
                cmd.CommandText = "auth.create_public_context";
                cmd.CommandType = CommandType.StoredProcedure;
                //cmd.Parameters.Add(new NpgsqlParameter("PersonId", NpgsqlDbType.Integer) { Value = _currentUserInfo.PersonId });
                cmd.Parameters.AddWithValue("PersonId", _currentUserInfo.PersonId ?? (object)DBNull.Value);
                cmd.Parameters.AddWithValue("LangId", _currentUserInfo.LangId);
                cmd.Parameters.AddWithValue("IsAdmin", _currentUserInfo.IsAdmin);
                cmd.ExecuteNonQuery();
            }
        }

        /**
            * Create database connection
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @return {IDbConnection} - Connection
        */
        public IDbConnection CreateConnection(string connectionString, bool withContext = false) {
            var conn = new NpgsqlConnection(connectionString);
            if (withContext) {
                conn.StateChange += CreateContext;
            }
            return conn;
        }

        /**
            * Normalize procedure name
            * @param {string} procName - Procedure name
            * @return {string} - Normalized procedure name
        */
        public string NormalizeProcedureName(string procName) {
            return procName;
        }

        /**
            * Query json asynchroneously 
            * @param {string} cmd - Command returning json
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryJsonAsync(string cmd, object paramObject, string connectionString, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            string ret;
            try {
                using var connection = CreateConnection(connectionString, withContext);
                ret = (string)await connection.ExecuteScalarAsync(cmd, paramObject, commandType: cmdType);
            } catch (Exception ex) {
                ret = Helper.HandleException(_logger, ex);
            }
            return ret;
        }

        /**
            * Query json asynchroneously with caching
            * @param {string} cmd - Command returning json
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {string} cacheType - Cache type
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryJsonAsyncCached(string cmd, object paramObject, string connectionString, string cacheType, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            string key = $"{cmd}_{JsonConvert.SerializeObject(paramObject)}";
            if (withContext) key += "_" + _currentUserInfo.PersonId.ToString() + "_" + _currentUserInfo.LangId.ToString();
            string ret = await _cache.GetAsync<string>(key);
            if (ret == null) {
                try {
                    using var connection = CreateConnection(connectionString, withContext);
                    ret = (string)await connection.ExecuteScalarAsync(cmd, paramObject, commandType: cmdType);
                    await _cache.SetAsync(key, ret, cacheType);
                } catch (Exception ex) {
                    ret = Helper.HandleException(_logger, ex);
                }
            }
            return ret;
        }

        /**
            * Execute asynchroneously
            * @param {string} cmd - Command
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> ExecuteAsync(string cmd, object paramObject, string connectionString, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            string ret = "";
            try {
                using var connection = CreateConnection(connectionString, withContext);
                //await connection.ExecuteAsync("delete from meta.route where id = @Id", new { Id = 224 }, commandType: CommandType.Text);
                var result = await connection.ExecuteAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
            } catch (Exception ex) {
                ret = Helper.HandleException(_logger, ex);
            }
            return ret;
        }

        /**
            * Query asynchroneously
            * @param {string} cmd - Command returning table
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {bool} multiple - Multiple
            * @param {bool} camelCase - Camel case
            * @param {HashSet<string>} keys - Keys
            * @param {string} keyExpr - Key expression
            * @param {string[]} tableNames - Table names
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryAsync(string cmd, object paramObject, string connectionString, bool withContext = false, bool multiple = false, bool camelCase = false, HashSet<string> keys = null, string keyExpr = null, string[] tableNames = null, CommandType cmdType = CommandType.StoredProcedure) {
            string ret;
            try {
                using var connection = CreateConnection(connectionString, withContext);
                using var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                ret = Helper.SerializeReader(reader, multiple, camelCase, keys, keyExpr, tableNames);
            } catch (Exception ex) {
                ret = Helper.HandleException(_logger, ex);
            }
            return ret;
        }

        /**
            * Query asynchroneously
            * @param {string} cmd - Command returning frugal table (optimized for large tables)
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {bool} multiple - Multiple
            * @param {bool} camelCase - Camel case
            * @param {HashSet<string>} keys - Keys
            * @param {string} keyExpr - Key expression
            * @param {string[]} tableNames - Table names
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryAsyncFrugal(string cmd, object paramObject, string connectionString, bool withContext = false, bool multiple = false, bool camelCase = false, HashSet<string> keys = null, string keyExpr = null, string[] tableNames = null, CommandType cmdType = CommandType.StoredProcedure) {
            string ret;
            try {
                using var connection = CreateConnection(connectionString, withContext);
                using var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                ret = Helper.SerializeReaderFrugal(reader);
            } catch (Exception ex) {
                ret = Helper.HandleException(_logger, ex);
            }
            return ret;
        }

        /**
            * Query asynchroneously with caching
            * @param {string} cmd - Command returning frugal table (optimized for large tables)
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {string} cacheType - Cache type
            * @param {bool} withContext - With context
            * @param {bool} multiple - Multiple
            * @param {bool} camelCase - Camel case
            * @param {HashSet<string>} keys - Keys
            * @param {string} keyExpr - Key expression
            * @param {string[]} tableNames - Table names
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryAsyncFrugalCached(string cmd, object paramObject, string connectionString, string cacheType, bool withContext = false, bool multiple = false, bool camelCase = false, HashSet<string> keys = null, string keyExpr = null, string[] tableNames = null, CommandType cmdType = CommandType.StoredProcedure) {
            string key = $"{cmd}_{JsonConvert.SerializeObject(paramObject)}";
            if (withContext) key += "_" + _currentUserInfo.PersonId.ToString() + "_" + _currentUserInfo.LangId.ToString();
            string ret = await _cache.GetAsync<string>(key);
            if (ret == null) {
                try {
                    using var connection = CreateConnection(connectionString, withContext);
                    using var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                    ret = Helper.SerializeReaderFrugal(reader);
                    await _cache.SetAsync(key, ret, cacheType);
                } catch (Exception ex) {
                    ret = Helper.HandleException(_logger, ex);
                }
            }
            return ret;
        }

        /**
            * Query asynchroneously with caching
            * @param {string} cmd - Command returning table
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {string} cacheType - Cache type
            * @param {bool} withContext - With context
            * @param {bool} multiple - Multiple
            * @param {bool} camelCase - Camel case
            * @param {HashSet<string>} keys - Keys
            * @param {string} keyExpr - Key expression
            * @param {string[]} tableNames - Table names
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryAsyncCached(string cmd, object paramObject, string connectionString, string cacheType, bool withContext = false, bool multiple = false, bool camelCase = false, HashSet<string> keys = null, string keyExpr = null, string[] tableNames = null, CommandType cmdType = CommandType.StoredProcedure) {
            string key = $"{cmd}_{JsonConvert.SerializeObject(paramObject)}";
            if (withContext) key += "_" + _currentUserInfo.PersonId.ToString() + "_" + _currentUserInfo.LangId.ToString();
            string ret = await _cache.GetAsync<string>(key);
            if (ret == null) {
                try {
                    using var connection = CreateConnection(connectionString, withContext);
                    using var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                    ret = Helper.SerializeReader(reader, multiple, camelCase, keys, keyExpr, tableNames);
                    await _cache.SetAsync(key, ret, cacheType);
                } catch (Exception ex) {
                    ret = Helper.HandleException(_logger, ex);
                }
            }
            return ret;
        }

        /**
            * Query byte array asynchroneously
            * @param {string} cmd - Command returning byte array
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<byte[]>} - Result of the operation
        */
        public async Task<byte[]> QueryByteAsync(string cmd, object paramObject, string connectionString, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            using var connection = CreateConnection(connectionString, withContext);
            var scalar = await connection.ExecuteScalarAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
            return (byte[])scalar;
        }

        /**
            * Query table asynchroneously
            * @param {string} cmd - Command returning reader
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<DataTable>} - Result of the operation
        */
        public async Task<DataTable> QueryTableAsync(string cmd, object paramObject, string connectionString, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            using var connection = CreateConnection(connectionString, withContext);
            var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
            DataTable ret = new DataTable();
            ret.Load(reader);
            return ret;
        }

        /**
            * Query scalar asynchroneously cached
            * @param {string} cmd - Command returning scalar
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {string} cacheType - type of cache e.g. long, short etc.
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryScalarAsyncCached(string cmd, object paramObject, string connectionString, string cacheType, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            string key = $"{cmd}_{JsonConvert.SerializeObject(paramObject)}";
            if (withContext) key += "_" + _currentUserInfo.PersonId.ToString() + "_" + _currentUserInfo.LangId.ToString();
            string ret = await _cache.GetAsync<string>(key);
            if (ret == null) {
                try {
                    using var connection = CreateConnection(connectionString, withContext);
                    var scalar = await connection.ExecuteScalarAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                    ret = (scalar ?? "").ToString(); // JsonConvert.SerializeObject(scalar);
                    await _cache.SetAsync(key, ret, cacheType);
                } catch (Exception ex) {
                    ret = Helper.HandleException(_logger, ex);
                }
            }
            return ret;
        }


        /**
            * Query scalar asynchroneously 
            * @param {string} cmd - Command returning scalar
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QueryScalarAsync(string cmd, object paramObject, string connectionString, bool withContext = false, CommandType cmdType = CommandType.StoredProcedure) {
            string ret;
            try {
                using var connection = CreateConnection(connectionString, withContext);
                var scalar = await connection.ExecuteScalarAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                ret = (scalar ?? "").ToString(); // JsonConvert.SerializeObject(scalar);
            } catch (Exception ex) {
                ret = Helper.HandleException(_logger, ex);
            }
            return ret;
        }

        /**
            * Query single row asynchroneously 
            * @param {string} cmd - Command returning single row
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QuerySingleAsync(string cmd, object paramObject, string connectionString, bool withContext = false, bool camelCase = false, CommandType cmdType = CommandType.StoredProcedure, bool returnAttributesIfEmpty = false) {
            string ret;
            try {
                using var connection = CreateConnection(connectionString, withContext);
                using var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                ret = Helper.SerializeSingle(reader, camelCase, returnAttributesIfEmpty);
            } catch (Exception ex) {
                ret = Helper.HandleException(_logger, ex);
            }
            return ret;
        }

        /**
            * Query single row asynchroneously cached
            * @param {string} cmd - Command returning single row
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {string} cacheType - type of cache e.g. long, short etc.
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<string>} - Result of the operation
        */
        public async Task<string> QuerySingleAsyncCached(string cmd, object paramObject, string connectionString, string cacheType, bool withContext = false, bool camelCase = false, CommandType cmdType = CommandType.StoredProcedure, bool returnAttributesIfEmpty = false) {
            string key = $"{cmd}_{JsonConvert.SerializeObject(paramObject)}";
            if (withContext) key += "_" + _currentUserInfo.PersonId.ToString() + "_" + _currentUserInfo.LangId.ToString();
            string ret = await _cache.GetAsync<string>(key);
            if (ret == null) {
                try {
                    using var connection = CreateConnection(connectionString, withContext);
                    using var reader = await connection.ExecuteReaderAsync(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
                    ret = Helper.SerializeSingle(reader, camelCase, returnAttributesIfEmpty);
                    await _cache.SetAsync(key, ret, cacheType);
                } catch (Exception ex) {
                    ret = Helper.HandleException(_logger, ex);
                }
            }
            return ret;
        }

        /**
            * Query single object of type T asyncronously
            * @param {string} cmd - Command returning single row compatible with T. Must return TABLE.
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<T>} - Result of the operation
        */
        public async Task<T> QuerySingleAsync<T>(string cmd, object paramObject, string connectionString, bool withContext = false, bool camelCase = false, CommandType cmdType = CommandType.StoredProcedure) {
            using var connection = CreateConnection(connectionString, withContext);
            connection.Open();
            var ret = await connection.QuerySingleAsync<T>(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
            return ret;
        }

        /**
            * Query list of type T asyncronously
            * @param {string} cmd - Command returning recordset compatible with T. Must return TABLE.
            * @param {object} paramObject - Parameters
            * @param {string} connectionString - Connection string
            * @param {bool} withContext - With context
            * @param {CommandType} cmdType - Command type
            * @return {Task<T>} - Result of the operation
        */
        public async Task<IEnumerable<List<T>>> QueryAsync<T>(string cmd, object paramObject, string connectionString, bool withContext = false, bool camelCase = false, CommandType cmdType = CommandType.StoredProcedure) {
            using var connection = CreateConnection(connectionString, withContext);
            connection.Open();
            var ret = await connection.QueryAsync<List<T>>(NormalizeProcedureName(cmd), paramObject, commandType: cmdType);
            return ret;
        }

        public async Task<List<T>> QueryListAsync<T>(string cmd, object paramObject, string connectionString, bool withContext = false, bool camelCase = false, CommandType cmdType = CommandType.StoredProcedure) {
            using var connection = CreateConnection(connectionString, withContext);
            connection.Open();
            var ret = (await connection.QueryAsync<T>(NormalizeProcedureName(cmd), paramObject, commandType: cmdType)).AsList();
            return ret;
        }

        /**
         * Check if the function can be safely called
         * @param {string} procName - Name of the function
         * @param {string} connectionString - Connection string
         * @return {Task<string>} - Result of the operation
         */
        public async Task<string> IsFunctionSafe(string procName, string connectionString) {
            using var connection = CreateConnection(connectionString);
            var ret = await QueryScalarAsyncCached("meta.is_function_safe", new { Function = procName }, connectionString, "long");
            return ret;
        }

        /**
            * Imports excel file into database
            * @param {string} filename - Name of the file
            * @param {string} sheet - Name of the sheet (can be null)
            * @param {string} preprocess - Name of the function that preprocesses the data
            * @param {Stream} file - Data stream
            * @param {string} procName - procedure what handles JSON data 
            * @param {string} pars - function parameters in JSON format, 
            * @param {string} connectionString 
            * @param {bool} withContext - With context
            * @return {Task<string>} - Status of the operation
        */
        public async Task<string> ImportExcel(string filename, Stream file, string pars, string connectionString, bool withContext = false) {
            
            var p = JsonConvert.DeserializeObject<Dictionary<string, string>>(pars);
            string sheet = null, preprocess = null, procName = null, analyze = null;
            
            p.TryGetValue("sheet", out sheet);
            p.TryGetValue("preprocess", out preprocess);
            p.TryGetValue("proc_name", out procName);
            p.TryGetValue("analyze", out analyze);
           
            if (procName != null && await IsFunctionSafe(procName, connectionString) != "True") return Helper.CreateError(_logger, "Unauthorized function " + procName);

            IExcelDataReader reader = null;
            DataTableReader r = null;

            try {
                
                System.Text.Encoding.RegisterProvider(System.Text.CodePagesEncodingProvider.Instance);
                if (filename.EndsWith(".csv")) {
                    reader = ExcelReaderFactory.CreateCsvReader(file);
                } else {
                    reader = ExcelReaderFactory.CreateReader(file);
                }
                var ds = reader.AsDataSet(new ExcelDataSetConfiguration()
                {
                    ConfigureDataTable = (_) => new ExcelDataTableConfiguration()
                    {
                        UseHeaderRow = true
                    }
                });

                DataTable t = null;
                if (sheet != "null" && sheet != null) {
                    t = ds.Tables[sheet];
                }
                if (t == null) {
                    t = ds.Tables[0];
                }

                string jsonColumns = "";

                if (analyze != "true") {  // real import
                    
                    r = t.CreateDataReader();
                    var jsonData = Helper.SerializeReader(r, excelReader: false);
                    PreprocessService.Preprocess(preprocess, ref jsonData, ref jsonColumns);
                    jsonData = jsonData.Replace("[]", "");
                    return await ExecuteAsync(procName, new { Params = pars, Json = jsonData }, connectionString, withContext);
                
                } else {  // analyze - create columns in import_detail

                    if (preprocess != "null") {
                        r = t.CreateDataReader();
                        var jsonData = Helper.SerializeReader(r, excelReader: false);
                        PreprocessService.Preprocess(preprocess, ref jsonData, ref jsonColumns);
                    } else { 
                        var stream = new StringWriter();
                        using var writer = new JsonTextWriter(stream);
                        writer.WriteStartArray();
                        foreach (DataColumn c in t.Columns) {
                            writer.WriteStartObject();
                            writer.WritePropertyName("col_name");
                            writer.WriteValue(c.ColumnName);
                            writer.WritePropertyName("col_type");
                            writer.WriteValue(c.DataType.ToString());
                            writer.WriteEndObject();
                        }
                        writer.WriteEndArray();
                        jsonColumns = stream.ToString();
                    }

                    using var connection = CreateConnection(connectionString, withContext);
                    var ret = await ExecuteAsync("meta.analyze_file",
                    new { Id = Int32.Parse(p["id"]), JsonData = jsonColumns }, connectionString, withContext);
                    return ret;
                }
            } catch (Exception ex) {
                return Helper.CreateError(_logger, ex.Message);
            } finally {
                reader.Close();
                r.Close();

            }
        }
    }
}