apb_extra_utils.postgres_pckg.psql_alchemy
1# coding=utf-8 2# 3# Author: Ernesto Arredondo Martinez (ernestone@gmail.com) 4# Created: 31/03/2019 5# Copyright (c) 6from apb_extra_utils.utils_logging import get_file_logger 7from sqlalchemy.orm import sessionmaker 8from sqlalchemy.engine import create_engine 9from sqlalchemy import MetaData, Table, text 10from collections import namedtuple 11 12 13class EngPsqlAlchemy(object): 14 """ 15 Clase que gestiona conexion sqlalchemy a Postgres 16 """ 17 __slots__ = 'nom_con_db', 'eng_db', 'psw_con_db', 'logger', 'session_db' 18 19 def __init__(self, user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None): 20 """ 21 Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado 22 23 Args: 24 user (str): 25 psw (str): 26 srvr_db (str=None): 27 port_db (int=None): 28 db (str=None): 29 30 Returns: 31 sqlalchemy.engine.base.Engine 32 """ 33 nom_con = f"{user.upper()}@{db.upper()}" 34 self.nom_con_db = nom_con 35 36 self.logger = a_logger 37 38 self.__set_logger() 39 self.__set_conexion(user, psw, srvr_db, port_db, db) 40 41 def __set_logger(self): 42 """ 43 Asigna el LOGGER po defecto si este no se ha informado al inicializar el gestor 44 45 Returns: 46 """ 47 if self.logger is None: 48 self.logger = get_file_logger(f'{self.__class__.__name__}({self.nom_con_db})') 49 50 def __set_conexion(self, user, psw, srvr_db, port_db, db): 51 """ 52 Crea engine para database postgres con sqlalchemy 53 54 Args: 55 user (str): 56 psw (str): 57 srvr_db (str): 58 port_db (int): 59 db (str): 60 """ 61 str_conn = f'postgresql://{user}:{psw}@{srvr_db}:{port_db}/{db}' 62 eng_db = create_engine(str_conn) 63 64 eng_db.connect() 65 66 self.eng_db = eng_db 67 # self.eng_db.logger = self.logger 68 self._set_session() 69 self.psw_con_db = psw 70 71 def _set_session(self, eng_db=None): 72 """ 73 Configura session para controlar workflow de transacciones (commit, rollback, close...) 74 75 Args: 76 eng_db: 77 """ 78 Session = sessionmaker() 79 Session.configure(bind=eng_db if eng_db else self.eng_db) 80 self.session_db = Session() 81 82 def __del__(self): 83 """ 84 Cierra la conexion al matar la instancia 85 """ 86 try: 87 if hasattr(self, 'session_db'): 88 self.session_db.close() 89 if hasattr(self, "con_db"): 90 self.eng_db = None 91 except: 92 pass 93 94 def commit(self): 95 """ 96 Hace commit sobre la sesion actual 97 """ 98 if self.session_db: 99 self.session_db.commit() 100 101 def rollback(self): 102 """ 103 Hace rollback sobre la sesion actual 104 """ 105 if self.session_db: 106 self.session_db.rollback() 107 108 def iter_rows_result(self, query_res, nom_row=None): 109 """ 110 Itera las filas del resultado como namedtuple con las claves de los campos seleccionados 111 112 Args: 113 query_res (sqlalchemy.engine.result.ResultProxy) 114 nom_row (str=None): nombre de la clase namedtuple si aplica 115 116 Yields: 117 namedtuple o tuple 118 """ 119 row_dd = None 120 if query_res.keys(): 121 row_dd = namedtuple(nom_row if nom_row else 'row_result', 122 [n_col.replace(" ", "_") for n_col in query_res.keys()]) 123 124 for row in query_res: 125 yield row_dd(*row) if row_dd else row 126 127 def table(self, nom_tab, schema=None): 128 """ 129 Retorna acceso a tabla sobre engine DB ( 130 131 Args: 132 nom_tab (str): 133 schema (str=None): 134 135 Returns: 136 137 """ 138 if not self.eng_db.has_table(nom_tab, schema): 139 raise Warning(f"No existe la tabla '{nom_tab}' " 140 f"{'para el esquema {} '.format(schema) if schema else ''}" 141 f"sobre el user@database '{self.nom_con_db}'") 142 143 meta = MetaData() 144 145 extra_args = {} 146 if schema: 147 extra_args['schema'] = schema 148 149 a_tab = Table(nom_tab, meta, autoload=True, autoload_with=self.eng_db, **extra_args) 150 151 self.session_db.bind_table(a_tab, self.eng_db) 152 153 return a_tab 154 155 def rows_table(self, nom_tab, sql_query=None, **table_args): 156 """ 157 Itera sobre los registros de la tabla 158 159 Args: 160 nom_tab (str): 161 sql_query (str=None): 162 **table_args: argumentos de la funcion table() 163 164 Yields: 165 namedtuple 166 """ 167 tab = self.table(nom_tab, **table_args) 168 169 query = tab.select(text(sql_query) if sql_query else None) 170 171 res = self.session_db.execute(query) 172 173 for row in self.iter_rows_result(res, tab.name): 174 yield row 175 176 def insert_rows_table(self, nom_tab, row_values, **table_args): 177 """ 178 Inserta los registros en la tabla 179 180 Args: 181 nom_tab (str): 182 row_values (list): lista de dicts con los valores de los campos a insertar por cada fila 183 **table_args: argumentos de la funcion table() 184 185 Returns: 186 list: lista de los rows_inserted (namedtuple) 187 """ 188 tab = self.table(nom_tab, **table_args) 189 query = tab.insert(values=row_values).returning(*tab.columns) 190 191 res = self.session_db.execute(query) 192 193 rows_inserted = [*self.iter_rows_result(res, tab.name)] 194 195 return rows_inserted 196 197 def update_rows_table(self, tab, values, sql_query=None, **table_args): 198 """ 199 Actualiza los registros de la tabla que cumplan con where_query (todos por defecto) 200 201 Args: 202 tab (str): 203 values (dict): 204 sql_query (str): 205 **table_args: argumentos de la funcion table() 206 207 Returns: 208 generator: iterador con la lista de elementos a devolver 209 """ 210 tab = self.table(tab, **table_args) 211 query = tab.update().where(text(sql_query)).values(values).returning(*tab.columns) 212 213 res = self.session_db.execute(query) 214 215 return res 216 217 def remove_rows_table(self, nom_tab, sql_query=None, **table_args): 218 """ 219 Borra los registros de la tabla que cumplan con where_query (todos por defecto) 220 221 Args: 222 nom_tab (str): 223 sql_query (str): 224 **table_args: argumentos de la funcion table() 225 226 Returns: 227 res 228 """ 229 tab = self.table(nom_tab, **table_args) 230 query = tab.delete() 231 232 if sql_query: 233 query = query.where(text(sql_query)) 234 235 res = self.session_db.execute(query) 236 237 return res 238 239 240if __name__ == '__main__': 241 from fire import Fire 242 243 Fire()
class
EngPsqlAlchemy:
14class EngPsqlAlchemy(object): 15 """ 16 Clase que gestiona conexion sqlalchemy a Postgres 17 """ 18 __slots__ = 'nom_con_db', 'eng_db', 'psw_con_db', 'logger', 'session_db' 19 20 def __init__(self, user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None): 21 """ 22 Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado 23 24 Args: 25 user (str): 26 psw (str): 27 srvr_db (str=None): 28 port_db (int=None): 29 db (str=None): 30 31 Returns: 32 sqlalchemy.engine.base.Engine 33 """ 34 nom_con = f"{user.upper()}@{db.upper()}" 35 self.nom_con_db = nom_con 36 37 self.logger = a_logger 38 39 self.__set_logger() 40 self.__set_conexion(user, psw, srvr_db, port_db, db) 41 42 def __set_logger(self): 43 """ 44 Asigna el LOGGER po defecto si este no se ha informado al inicializar el gestor 45 46 Returns: 47 """ 48 if self.logger is None: 49 self.logger = get_file_logger(f'{self.__class__.__name__}({self.nom_con_db})') 50 51 def __set_conexion(self, user, psw, srvr_db, port_db, db): 52 """ 53 Crea engine para database postgres con sqlalchemy 54 55 Args: 56 user (str): 57 psw (str): 58 srvr_db (str): 59 port_db (int): 60 db (str): 61 """ 62 str_conn = f'postgresql://{user}:{psw}@{srvr_db}:{port_db}/{db}' 63 eng_db = create_engine(str_conn) 64 65 eng_db.connect() 66 67 self.eng_db = eng_db 68 # self.eng_db.logger = self.logger 69 self._set_session() 70 self.psw_con_db = psw 71 72 def _set_session(self, eng_db=None): 73 """ 74 Configura session para controlar workflow de transacciones (commit, rollback, close...) 75 76 Args: 77 eng_db: 78 """ 79 Session = sessionmaker() 80 Session.configure(bind=eng_db if eng_db else self.eng_db) 81 self.session_db = Session() 82 83 def __del__(self): 84 """ 85 Cierra la conexion al matar la instancia 86 """ 87 try: 88 if hasattr(self, 'session_db'): 89 self.session_db.close() 90 if hasattr(self, "con_db"): 91 self.eng_db = None 92 except: 93 pass 94 95 def commit(self): 96 """ 97 Hace commit sobre la sesion actual 98 """ 99 if self.session_db: 100 self.session_db.commit() 101 102 def rollback(self): 103 """ 104 Hace rollback sobre la sesion actual 105 """ 106 if self.session_db: 107 self.session_db.rollback() 108 109 def iter_rows_result(self, query_res, nom_row=None): 110 """ 111 Itera las filas del resultado como namedtuple con las claves de los campos seleccionados 112 113 Args: 114 query_res (sqlalchemy.engine.result.ResultProxy) 115 nom_row (str=None): nombre de la clase namedtuple si aplica 116 117 Yields: 118 namedtuple o tuple 119 """ 120 row_dd = None 121 if query_res.keys(): 122 row_dd = namedtuple(nom_row if nom_row else 'row_result', 123 [n_col.replace(" ", "_") for n_col in query_res.keys()]) 124 125 for row in query_res: 126 yield row_dd(*row) if row_dd else row 127 128 def table(self, nom_tab, schema=None): 129 """ 130 Retorna acceso a tabla sobre engine DB ( 131 132 Args: 133 nom_tab (str): 134 schema (str=None): 135 136 Returns: 137 138 """ 139 if not self.eng_db.has_table(nom_tab, schema): 140 raise Warning(f"No existe la tabla '{nom_tab}' " 141 f"{'para el esquema {} '.format(schema) if schema else ''}" 142 f"sobre el user@database '{self.nom_con_db}'") 143 144 meta = MetaData() 145 146 extra_args = {} 147 if schema: 148 extra_args['schema'] = schema 149 150 a_tab = Table(nom_tab, meta, autoload=True, autoload_with=self.eng_db, **extra_args) 151 152 self.session_db.bind_table(a_tab, self.eng_db) 153 154 return a_tab 155 156 def rows_table(self, nom_tab, sql_query=None, **table_args): 157 """ 158 Itera sobre los registros de la tabla 159 160 Args: 161 nom_tab (str): 162 sql_query (str=None): 163 **table_args: argumentos de la funcion table() 164 165 Yields: 166 namedtuple 167 """ 168 tab = self.table(nom_tab, **table_args) 169 170 query = tab.select(text(sql_query) if sql_query else None) 171 172 res = self.session_db.execute(query) 173 174 for row in self.iter_rows_result(res, tab.name): 175 yield row 176 177 def insert_rows_table(self, nom_tab, row_values, **table_args): 178 """ 179 Inserta los registros en la tabla 180 181 Args: 182 nom_tab (str): 183 row_values (list): lista de dicts con los valores de los campos a insertar por cada fila 184 **table_args: argumentos de la funcion table() 185 186 Returns: 187 list: lista de los rows_inserted (namedtuple) 188 """ 189 tab = self.table(nom_tab, **table_args) 190 query = tab.insert(values=row_values).returning(*tab.columns) 191 192 res = self.session_db.execute(query) 193 194 rows_inserted = [*self.iter_rows_result(res, tab.name)] 195 196 return rows_inserted 197 198 def update_rows_table(self, tab, values, sql_query=None, **table_args): 199 """ 200 Actualiza los registros de la tabla que cumplan con where_query (todos por defecto) 201 202 Args: 203 tab (str): 204 values (dict): 205 sql_query (str): 206 **table_args: argumentos de la funcion table() 207 208 Returns: 209 generator: iterador con la lista de elementos a devolver 210 """ 211 tab = self.table(tab, **table_args) 212 query = tab.update().where(text(sql_query)).values(values).returning(*tab.columns) 213 214 res = self.session_db.execute(query) 215 216 return res 217 218 def remove_rows_table(self, nom_tab, sql_query=None, **table_args): 219 """ 220 Borra los registros de la tabla que cumplan con where_query (todos por defecto) 221 222 Args: 223 nom_tab (str): 224 sql_query (str): 225 **table_args: argumentos de la funcion table() 226 227 Returns: 228 res 229 """ 230 tab = self.table(nom_tab, **table_args) 231 query = tab.delete() 232 233 if sql_query: 234 query = query.where(text(sql_query)) 235 236 res = self.session_db.execute(query) 237 238 return res
Clase que gestiona conexion sqlalchemy a Postgres
EngPsqlAlchemy( user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None)
20 def __init__(self, user, psw, srvr_db='localhost', port_db=5432, db='postgres', a_logger=None): 21 """ 22 Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado 23 24 Args: 25 user (str): 26 psw (str): 27 srvr_db (str=None): 28 port_db (int=None): 29 db (str=None): 30 31 Returns: 32 sqlalchemy.engine.base.Engine 33 """ 34 nom_con = f"{user.upper()}@{db.upper()}" 35 self.nom_con_db = nom_con 36 37 self.logger = a_logger 38 39 self.__set_logger() 40 self.__set_conexion(user, psw, srvr_db, port_db, db)
Retorna engine para database postgres. Si no se informa ningun argumento retorna el cacheado
Arguments:
- user (str):
- psw (str):
- srvr_db (str=None):
- port_db (int=None):
- db (str=None):
Returns:
sqlalchemy.engine.base.Engine
def
commit(self):
95 def commit(self): 96 """ 97 Hace commit sobre la sesion actual 98 """ 99 if self.session_db: 100 self.session_db.commit()
Hace commit sobre la sesion actual
def
rollback(self):
102 def rollback(self): 103 """ 104 Hace rollback sobre la sesion actual 105 """ 106 if self.session_db: 107 self.session_db.rollback()
Hace rollback sobre la sesion actual
def
iter_rows_result(self, query_res, nom_row=None):
109 def iter_rows_result(self, query_res, nom_row=None): 110 """ 111 Itera las filas del resultado como namedtuple con las claves de los campos seleccionados 112 113 Args: 114 query_res (sqlalchemy.engine.result.ResultProxy) 115 nom_row (str=None): nombre de la clase namedtuple si aplica 116 117 Yields: 118 namedtuple o tuple 119 """ 120 row_dd = None 121 if query_res.keys(): 122 row_dd = namedtuple(nom_row if nom_row else 'row_result', 123 [n_col.replace(" ", "_") for n_col in query_res.keys()]) 124 125 for row in query_res: 126 yield row_dd(*row) if row_dd else row
Itera las filas del resultado como namedtuple con las claves de los campos seleccionados
Arguments:
- query_res (sqlalchemy.engine.result.ResultProxy)
- nom_row (str=None): nombre de la clase namedtuple si aplica
Yields:
namedtuple o tuple
def
table(self, nom_tab, schema=None):
128 def table(self, nom_tab, schema=None): 129 """ 130 Retorna acceso a tabla sobre engine DB ( 131 132 Args: 133 nom_tab (str): 134 schema (str=None): 135 136 Returns: 137 138 """ 139 if not self.eng_db.has_table(nom_tab, schema): 140 raise Warning(f"No existe la tabla '{nom_tab}' " 141 f"{'para el esquema {} '.format(schema) if schema else ''}" 142 f"sobre el user@database '{self.nom_con_db}'") 143 144 meta = MetaData() 145 146 extra_args = {} 147 if schema: 148 extra_args['schema'] = schema 149 150 a_tab = Table(nom_tab, meta, autoload=True, autoload_with=self.eng_db, **extra_args) 151 152 self.session_db.bind_table(a_tab, self.eng_db) 153 154 return a_tab
Retorna acceso a tabla sobre engine DB (
Arguments:
- nom_tab (str):
- schema (str=None):
Returns:
def
rows_table(self, nom_tab, sql_query=None, **table_args):
156 def rows_table(self, nom_tab, sql_query=None, **table_args): 157 """ 158 Itera sobre los registros de la tabla 159 160 Args: 161 nom_tab (str): 162 sql_query (str=None): 163 **table_args: argumentos de la funcion table() 164 165 Yields: 166 namedtuple 167 """ 168 tab = self.table(nom_tab, **table_args) 169 170 query = tab.select(text(sql_query) if sql_query else None) 171 172 res = self.session_db.execute(query) 173 174 for row in self.iter_rows_result(res, tab.name): 175 yield row
Itera sobre los registros de la tabla
Arguments:
- nom_tab (str):
- sql_query (str=None):
- **table_args: argumentos de la funcion table()
Yields:
namedtuple
def
insert_rows_table(self, nom_tab, row_values, **table_args):
177 def insert_rows_table(self, nom_tab, row_values, **table_args): 178 """ 179 Inserta los registros en la tabla 180 181 Args: 182 nom_tab (str): 183 row_values (list): lista de dicts con los valores de los campos a insertar por cada fila 184 **table_args: argumentos de la funcion table() 185 186 Returns: 187 list: lista de los rows_inserted (namedtuple) 188 """ 189 tab = self.table(nom_tab, **table_args) 190 query = tab.insert(values=row_values).returning(*tab.columns) 191 192 res = self.session_db.execute(query) 193 194 rows_inserted = [*self.iter_rows_result(res, tab.name)] 195 196 return rows_inserted
Inserta los registros en la tabla
Arguments:
- nom_tab (str):
- row_values (list): lista de dicts con los valores de los campos a insertar por cada fila
- **table_args: argumentos de la funcion table()
Returns:
list: lista de los rows_inserted (namedtuple)
def
update_rows_table(self, tab, values, sql_query=None, **table_args):
198 def update_rows_table(self, tab, values, sql_query=None, **table_args): 199 """ 200 Actualiza los registros de la tabla que cumplan con where_query (todos por defecto) 201 202 Args: 203 tab (str): 204 values (dict): 205 sql_query (str): 206 **table_args: argumentos de la funcion table() 207 208 Returns: 209 generator: iterador con la lista de elementos a devolver 210 """ 211 tab = self.table(tab, **table_args) 212 query = tab.update().where(text(sql_query)).values(values).returning(*tab.columns) 213 214 res = self.session_db.execute(query) 215 216 return res
Actualiza los registros de la tabla que cumplan con where_query (todos por defecto)
Arguments:
- tab (str):
- values (dict):
- sql_query (str):
- **table_args: argumentos de la funcion table()
Returns:
generator: iterador con la lista de elementos a devolver
def
remove_rows_table(self, nom_tab, sql_query=None, **table_args):
218 def remove_rows_table(self, nom_tab, sql_query=None, **table_args): 219 """ 220 Borra los registros de la tabla que cumplan con where_query (todos por defecto) 221 222 Args: 223 nom_tab (str): 224 sql_query (str): 225 **table_args: argumentos de la funcion table() 226 227 Returns: 228 res 229 """ 230 tab = self.table(nom_tab, **table_args) 231 query = tab.delete() 232 233 if sql_query: 234 query = query.where(text(sql_query)) 235 236 res = self.session_db.execute(query) 237 238 return res
Borra los registros de la tabla que cumplan con where_query (todos por defecto)
Arguments:
- nom_tab (str):
- sql_query (str):
- **table_args: argumentos de la funcion table()
Returns:
res