apb_extra_utils.postgres_pckg.psql_alchemy

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 31/03/2019
  5#  Copyright (c)
  6from apb_extra_utils.utils_logging import get_file_logger
  7from sqlalchemy.orm import sessionmaker
  8from sqlalchemy.engine import create_engine
  9from sqlalchemy import MetaData, Table, text
 10from collections import namedtuple
 11
 12
 13class EngPsqlAlchemy(object):
 14    """
 15    Clase que gestiona conexion sqlalchemy a Postgres
 16    """
 17    __slots__ = 'nom_con_db', 'eng_db', 'psw_con_db', 'logger', 'session_db'
 18
 19    def __init__(self, user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None):
 20        """
 21        Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado
 22
 23        Args:
 24            user (str):
 25            psw (str):
 26            srvr_db (str=None):
 27            port_db (int=None):
 28            db (str=None):
 29
 30        Returns:
 31            sqlalchemy.engine.base.Engine
 32        """
 33        nom_con = f"{user.upper()}@{db.upper()}"
 34        self.nom_con_db = nom_con
 35
 36        self.logger = a_logger
 37
 38        self.__set_logger()
 39        self.__set_conexion(user, psw, srvr_db, port_db, db)
 40
 41    def __set_logger(self):
 42        """
 43        Asigna el LOGGER po defecto si este no se ha informado al inicializar el gestor
 44
 45        Returns:
 46        """
 47        if self.logger is None:
 48            self.logger = get_file_logger(f'{self.__class__.__name__}({self.nom_con_db})')
 49
 50    def __set_conexion(self, user, psw, srvr_db, port_db, db):
 51        """
 52        Crea engine para database postgres con sqlalchemy
 53
 54        Args:
 55            user (str):
 56            psw (str):
 57            srvr_db (str):
 58            port_db (int):
 59            db (str):
 60        """
 61        str_conn = f'postgresql://{user}:{psw}@{srvr_db}:{port_db}/{db}'
 62        eng_db = create_engine(str_conn)
 63
 64        eng_db.connect()
 65
 66        self.eng_db = eng_db
 67        # self.eng_db.logger = self.logger
 68        self._set_session()
 69        self.psw_con_db = psw
 70
 71    def _set_session(self, eng_db=None):
 72        """
 73        Configura session para controlar workflow de transacciones (commit, rollback, close...)
 74
 75        Args:
 76            eng_db:
 77        """
 78        Session = sessionmaker()
 79        Session.configure(bind=eng_db if eng_db else self.eng_db)
 80        self.session_db = Session()
 81
 82    def __del__(self):
 83        """
 84        Cierra la conexion al matar la instancia
 85        """
 86        try:
 87            if hasattr(self, 'session_db'):
 88                self.session_db.close()
 89            if hasattr(self, "con_db"):
 90                self.eng_db = None
 91        except:
 92            pass
 93
 94    def commit(self):
 95        """
 96        Hace commit sobre la sesion actual
 97        """
 98        if self.session_db:
 99            self.session_db.commit()
100
101    def rollback(self):
102        """
103        Hace rollback sobre la sesion actual
104        """
105        if self.session_db:
106            self.session_db.rollback()
107
108    def iter_rows_result(self, query_res, nom_row=None):
109        """
110        Itera las filas del resultado como namedtuple con las claves de los campos seleccionados
111
112        Args:
113            query_res (sqlalchemy.engine.result.ResultProxy)
114            nom_row (str=None): nombre de la clase namedtuple si aplica
115
116        Yields:
117            namedtuple o tuple
118        """
119        row_dd = None
120        if query_res.keys():
121            row_dd = namedtuple(nom_row if nom_row else 'row_result',
122                                [n_col.replace(" ", "_") for n_col in query_res.keys()])
123
124        for row in query_res:
125            yield row_dd(*row) if row_dd else row
126
127    def table(self, nom_tab, schema=None):
128        """
129        Retorna acceso a tabla sobre engine DB (
130
131        Args:
132            nom_tab (str):
133            schema (str=None):
134
135        Returns:
136
137        """
138        if not self.eng_db.has_table(nom_tab, schema):
139            raise Warning(f"No existe la tabla '{nom_tab}' "
140                          f"{'para el esquema {} '.format(schema) if schema else ''}"
141                          f"sobre el user@database '{self.nom_con_db}'")
142
143        meta = MetaData()
144
145        extra_args = {}
146        if schema:
147            extra_args['schema'] = schema
148
149        a_tab = Table(nom_tab, meta, autoload=True, autoload_with=self.eng_db, **extra_args)
150
151        self.session_db.bind_table(a_tab, self.eng_db)
152
153        return a_tab
154
155    def rows_table(self, nom_tab, sql_query=None, **table_args):
156        """
157        Itera sobre los registros de la tabla
158
159        Args:
160            nom_tab (str):
161            sql_query (str=None):
162            **table_args: argumentos de la funcion table()
163
164        Yields:
165            namedtuple
166        """
167        tab = self.table(nom_tab, **table_args)
168
169        query = tab.select(text(sql_query) if sql_query else None)
170
171        res = self.session_db.execute(query)
172
173        for row in self.iter_rows_result(res, tab.name):
174            yield row
175
176    def insert_rows_table(self, nom_tab, row_values, **table_args):
177        """
178        Inserta los registros en la tabla
179
180        Args:
181            nom_tab (str):
182            row_values (list): lista de dicts con los valores de los campos a insertar por cada fila
183            **table_args: argumentos de la funcion table()
184
185        Returns:
186            list: lista de los rows_inserted (namedtuple)
187        """
188        tab = self.table(nom_tab, **table_args)
189        query = tab.insert(values=row_values).returning(*tab.columns)
190
191        res = self.session_db.execute(query)
192
193        rows_inserted = [*self.iter_rows_result(res, tab.name)]
194
195        return rows_inserted
196
197    def update_rows_table(self, tab, values, sql_query=None, **table_args):
198        """
199        Actualiza los registros de la tabla que cumplan con where_query (todos por defecto)
200
201        Args:
202            tab (str):
203            values (dict):
204            sql_query (str):
205            **table_args: argumentos de la funcion table()
206
207        Returns:
208            generator: iterador con la lista de elementos a devolver
209        """
210        tab = self.table(tab, **table_args)
211        query = tab.update().where(text(sql_query)).values(values).returning(*tab.columns)
212
213        res = self.session_db.execute(query)
214
215        return res
216
217    def remove_rows_table(self, nom_tab, sql_query=None, **table_args):
218        """
219        Borra los registros de la tabla que cumplan con where_query (todos por defecto)
220
221        Args:
222            nom_tab (str):
223            sql_query (str):
224            **table_args: argumentos de la funcion table()
225
226        Returns:
227            res
228        """
229        tab = self.table(nom_tab, **table_args)
230        query = tab.delete()
231
232        if sql_query:
233            query = query.where(text(sql_query))
234
235        res = self.session_db.execute(query)
236
237        return res
238
239
240if __name__ == '__main__':
241    from fire import Fire
242
243    Fire()
class EngPsqlAlchemy:
 14class EngPsqlAlchemy(object):
 15    """
 16    Clase que gestiona conexion sqlalchemy a Postgres
 17    """
 18    __slots__ = 'nom_con_db', 'eng_db', 'psw_con_db', 'logger', 'session_db'
 19
 20    def __init__(self, user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None):
 21        """
 22        Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado
 23
 24        Args:
 25            user (str):
 26            psw (str):
 27            srvr_db (str=None):
 28            port_db (int=None):
 29            db (str=None):
 30
 31        Returns:
 32            sqlalchemy.engine.base.Engine
 33        """
 34        nom_con = f"{user.upper()}@{db.upper()}"
 35        self.nom_con_db = nom_con
 36
 37        self.logger = a_logger
 38
 39        self.__set_logger()
 40        self.__set_conexion(user, psw, srvr_db, port_db, db)
 41
 42    def __set_logger(self):
 43        """
 44        Asigna el LOGGER po defecto si este no se ha informado al inicializar el gestor
 45
 46        Returns:
 47        """
 48        if self.logger is None:
 49            self.logger = get_file_logger(f'{self.__class__.__name__}({self.nom_con_db})')
 50
 51    def __set_conexion(self, user, psw, srvr_db, port_db, db):
 52        """
 53        Crea engine para database postgres con sqlalchemy
 54
 55        Args:
 56            user (str):
 57            psw (str):
 58            srvr_db (str):
 59            port_db (int):
 60            db (str):
 61        """
 62        str_conn = f'postgresql://{user}:{psw}@{srvr_db}:{port_db}/{db}'
 63        eng_db = create_engine(str_conn)
 64
 65        eng_db.connect()
 66
 67        self.eng_db = eng_db
 68        # self.eng_db.logger = self.logger
 69        self._set_session()
 70        self.psw_con_db = psw
 71
 72    def _set_session(self, eng_db=None):
 73        """
 74        Configura session para controlar workflow de transacciones (commit, rollback, close...)
 75
 76        Args:
 77            eng_db:
 78        """
 79        Session = sessionmaker()
 80        Session.configure(bind=eng_db if eng_db else self.eng_db)
 81        self.session_db = Session()
 82
 83    def __del__(self):
 84        """
 85        Cierra la conexion al matar la instancia
 86        """
 87        try:
 88            if hasattr(self, 'session_db'):
 89                self.session_db.close()
 90            if hasattr(self, "con_db"):
 91                self.eng_db = None
 92        except:
 93            pass
 94
 95    def commit(self):
 96        """
 97        Hace commit sobre la sesion actual
 98        """
 99        if self.session_db:
100            self.session_db.commit()
101
102    def rollback(self):
103        """
104        Hace rollback sobre la sesion actual
105        """
106        if self.session_db:
107            self.session_db.rollback()
108
109    def iter_rows_result(self, query_res, nom_row=None):
110        """
111        Itera las filas del resultado como namedtuple con las claves de los campos seleccionados
112
113        Args:
114            query_res (sqlalchemy.engine.result.ResultProxy)
115            nom_row (str=None): nombre de la clase namedtuple si aplica
116
117        Yields:
118            namedtuple o tuple
119        """
120        row_dd = None
121        if query_res.keys():
122            row_dd = namedtuple(nom_row if nom_row else 'row_result',
123                                [n_col.replace(" ", "_") for n_col in query_res.keys()])
124
125        for row in query_res:
126            yield row_dd(*row) if row_dd else row
127
128    def table(self, nom_tab, schema=None):
129        """
130        Retorna acceso a tabla sobre engine DB (
131
132        Args:
133            nom_tab (str):
134            schema (str=None):
135
136        Returns:
137
138        """
139        if not self.eng_db.has_table(nom_tab, schema):
140            raise Warning(f"No existe la tabla '{nom_tab}' "
141                          f"{'para el esquema {} '.format(schema) if schema else ''}"
142                          f"sobre el user@database '{self.nom_con_db}'")
143
144        meta = MetaData()
145
146        extra_args = {}
147        if schema:
148            extra_args['schema'] = schema
149
150        a_tab = Table(nom_tab, meta, autoload=True, autoload_with=self.eng_db, **extra_args)
151
152        self.session_db.bind_table(a_tab, self.eng_db)
153
154        return a_tab
155
156    def rows_table(self, nom_tab, sql_query=None, **table_args):
157        """
158        Itera sobre los registros de la tabla
159
160        Args:
161            nom_tab (str):
162            sql_query (str=None):
163            **table_args: argumentos de la funcion table()
164
165        Yields:
166            namedtuple
167        """
168        tab = self.table(nom_tab, **table_args)
169
170        query = tab.select(text(sql_query) if sql_query else None)
171
172        res = self.session_db.execute(query)
173
174        for row in self.iter_rows_result(res, tab.name):
175            yield row
176
177    def insert_rows_table(self, nom_tab, row_values, **table_args):
178        """
179        Inserta los registros en la tabla
180
181        Args:
182            nom_tab (str):
183            row_values (list): lista de dicts con los valores de los campos a insertar por cada fila
184            **table_args: argumentos de la funcion table()
185
186        Returns:
187            list: lista de los rows_inserted (namedtuple)
188        """
189        tab = self.table(nom_tab, **table_args)
190        query = tab.insert(values=row_values).returning(*tab.columns)
191
192        res = self.session_db.execute(query)
193
194        rows_inserted = [*self.iter_rows_result(res, tab.name)]
195
196        return rows_inserted
197
198    def update_rows_table(self, tab, values, sql_query=None, **table_args):
199        """
200        Actualiza los registros de la tabla que cumplan con where_query (todos por defecto)
201
202        Args:
203            tab (str):
204            values (dict):
205            sql_query (str):
206            **table_args: argumentos de la funcion table()
207
208        Returns:
209            generator: iterador con la lista de elementos a devolver
210        """
211        tab = self.table(tab, **table_args)
212        query = tab.update().where(text(sql_query)).values(values).returning(*tab.columns)
213
214        res = self.session_db.execute(query)
215
216        return res
217
218    def remove_rows_table(self, nom_tab, sql_query=None, **table_args):
219        """
220        Borra los registros de la tabla que cumplan con where_query (todos por defecto)
221
222        Args:
223            nom_tab (str):
224            sql_query (str):
225            **table_args: argumentos de la funcion table()
226
227        Returns:
228            res
229        """
230        tab = self.table(nom_tab, **table_args)
231        query = tab.delete()
232
233        if sql_query:
234            query = query.where(text(sql_query))
235
236        res = self.session_db.execute(query)
237
238        return res

Clase que gestiona conexion sqlalchemy a Postgres

EngPsqlAlchemy( user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None)
20    def __init__(self, user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None):
21        """
22        Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado
23
24        Args:
25            user (str):
26            psw (str):
27            srvr_db (str=None):
28            port_db (int=None):
29            db (str=None):
30
31        Returns:
32            sqlalchemy.engine.base.Engine
33        """
34        nom_con = f"{user.upper()}@{db.upper()}"
35        self.nom_con_db = nom_con
36
37        self.logger = a_logger
38
39        self.__set_logger()
40        self.__set_conexion(user, psw, srvr_db, port_db, db)

Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado

Arguments:
  • user (str):
  • psw (str):
  • srvr_db (str=None):
  • port_db (int=None):
  • db (str=None):
Returns:

sqlalchemy.engine.base.Engine

nom_con_db
logger
def commit(self):
 95    def commit(self):
 96        """
 97        Hace commit sobre la sesion actual
 98        """
 99        if self.session_db:
100            self.session_db.commit()

Hace commit sobre la sesion actual

def rollback(self):
102    def rollback(self):
103        """
104        Hace rollback sobre la sesion actual
105        """
106        if self.session_db:
107            self.session_db.rollback()

Hace rollback sobre la sesion actual

def iter_rows_result(self, query_res, nom_row=None):
109    def iter_rows_result(self, query_res, nom_row=None):
110        """
111        Itera las filas del resultado como namedtuple con las claves de los campos seleccionados
112
113        Args:
114            query_res (sqlalchemy.engine.result.ResultProxy)
115            nom_row (str=None): nombre de la clase namedtuple si aplica
116
117        Yields:
118            namedtuple o tuple
119        """
120        row_dd = None
121        if query_res.keys():
122            row_dd = namedtuple(nom_row if nom_row else 'row_result',
123                                [n_col.replace(" ", "_") for n_col in query_res.keys()])
124
125        for row in query_res:
126            yield row_dd(*row) if row_dd else row

Itera las filas del resultado como namedtuple con las claves de los campos seleccionados

Arguments:
  • query_res (sqlalchemy.engine.result.ResultProxy)
  • nom_row (str=None): nombre de la clase namedtuple si aplica
Yields:

namedtuple o tuple

def table(self, nom_tab, schema=None):
128    def table(self, nom_tab, schema=None):
129        """
130        Retorna acceso a tabla sobre engine DB (
131
132        Args:
133            nom_tab (str):
134            schema (str=None):
135
136        Returns:
137
138        """
139        if not self.eng_db.has_table(nom_tab, schema):
140            raise Warning(f"No existe la tabla '{nom_tab}' "
141                          f"{'para el esquema {} '.format(schema) if schema else ''}"
142                          f"sobre el user@database '{self.nom_con_db}'")
143
144        meta = MetaData()
145
146        extra_args = {}
147        if schema:
148            extra_args['schema'] = schema
149
150        a_tab = Table(nom_tab, meta, autoload=True, autoload_with=self.eng_db, **extra_args)
151
152        self.session_db.bind_table(a_tab, self.eng_db)
153
154        return a_tab

Retorna acceso a tabla sobre engine DB (

Arguments:
  • nom_tab (str):
  • schema (str=None):

Returns:

def rows_table(self, nom_tab, sql_query=None, **table_args):
156    def rows_table(self, nom_tab, sql_query=None, **table_args):
157        """
158        Itera sobre los registros de la tabla
159
160        Args:
161            nom_tab (str):
162            sql_query (str=None):
163            **table_args: argumentos de la funcion table()
164
165        Yields:
166            namedtuple
167        """
168        tab = self.table(nom_tab, **table_args)
169
170        query = tab.select(text(sql_query) if sql_query else None)
171
172        res = self.session_db.execute(query)
173
174        for row in self.iter_rows_result(res, tab.name):
175            yield row

Itera sobre los registros de la tabla

Arguments:
  • nom_tab (str):
  • sql_query (str=None):
  • **table_args: argumentos de la funcion table()
Yields:

namedtuple

def insert_rows_table(self, nom_tab, row_values, **table_args):
177    def insert_rows_table(self, nom_tab, row_values, **table_args):
178        """
179        Inserta los registros en la tabla
180
181        Args:
182            nom_tab (str):
183            row_values (list): lista de dicts con los valores de los campos a insertar por cada fila
184            **table_args: argumentos de la funcion table()
185
186        Returns:
187            list: lista de los rows_inserted (namedtuple)
188        """
189        tab = self.table(nom_tab, **table_args)
190        query = tab.insert(values=row_values).returning(*tab.columns)
191
192        res = self.session_db.execute(query)
193
194        rows_inserted = [*self.iter_rows_result(res, tab.name)]
195
196        return rows_inserted

Inserta los registros en la tabla

Arguments:
  • nom_tab (str):
  • row_values (list): lista de dicts con los valores de los campos a insertar por cada fila
  • **table_args: argumentos de la funcion table()
Returns:

list: lista de los rows_inserted (namedtuple)

def update_rows_table(self, tab, values, sql_query=None, **table_args):
198    def update_rows_table(self, tab, values, sql_query=None, **table_args):
199        """
200        Actualiza los registros de la tabla que cumplan con where_query (todos por defecto)
201
202        Args:
203            tab (str):
204            values (dict):
205            sql_query (str):
206            **table_args: argumentos de la funcion table()
207
208        Returns:
209            generator: iterador con la lista de elementos a devolver
210        """
211        tab = self.table(tab, **table_args)
212        query = tab.update().where(text(sql_query)).values(values).returning(*tab.columns)
213
214        res = self.session_db.execute(query)
215
216        return res

Actualiza los registros de la tabla que cumplan con where_query (todos por defecto)

Arguments:
  • tab (str):
  • values (dict):
  • sql_query (str):
  • **table_args: argumentos de la funcion table()
Returns:

generator: iterador con la lista de elementos a devolver

def remove_rows_table(self, nom_tab, sql_query=None, **table_args):
218    def remove_rows_table(self, nom_tab, sql_query=None, **table_args):
219        """
220        Borra los registros de la tabla que cumplan con where_query (todos por defecto)
221
222        Args:
223            nom_tab (str):
224            sql_query (str):
225            **table_args: argumentos de la funcion table()
226
227        Returns:
228            res
229        """
230        tab = self.table(nom_tab, **table_args)
231        query = tab.delete()
232
233        if sql_query:
234            query = query.where(text(sql_query))
235
236        res = self.session_db.execute(query)
237
238        return res

Borra los registros de la tabla que cumplan con where_query (todos por defecto)

Arguments:
  • nom_tab (str):
  • sql_query (str):
  • **table_args: argumentos de la funcion table()
Returns:

res

eng_db
psw_con_db
session_db