apb_duckdb_utils
Package apb_duckdb_utils
Modules that add functionality (spatial and generic) on top of duckdb.
Full functionality requires the GDAL library (version 3.6 to 3.10) and Oracle Instant Client to be installed.
To install:
pip install apb_duckdb_utils
Documentation: apb_duckdb_utils
Module imports and constants:

```python
# coding=utf-8
#
# Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
# Created:
# Copyright (c)
"""
.. include:: ../README.md
"""
from __future__ import annotations

import os
import warnings
from typing import Iterable

import duckdb
import ibis
from geopandas import GeoDataFrame
from ibis.expr.datatypes import GeoSpatial
from pandas import DataFrame

from apb_extra_utils.misc import create_dir
from apb_extra_utils.utils_logging import get_base_logger
from apb_pandas_utils.geopandas_utils import df_geometry_columns

# Suppress specific warning
warnings.filterwarnings("ignore", message="Geometry column does not contain geometry")

MEMORY_DDBB = ':memory:'
CACHE_DUCK_DDBBS = {}
CURRENT_DB_PATH = None
GZIP = 'gzip'
SECRET_S3 = 'secret_s3'
```
```python
def set_current_db_path(db_path: str):
    """
    Set used db path

    Args:
        db_path (str): path to duckdb database file
    """
    global CURRENT_DB_PATH
    CURRENT_DB_PATH = parse_path(db_path)
```
Set the db path to use as the current database path
Arguments:
- db_path (str): path to duckdb database file
```python
def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False,
                          extensions: list[str] = None,
                          **connect_args) -> duckdb.DuckDBPyConnection:
    """
    Get duckdb connection

    Args:
        db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
        as_current (bool=False): set db_path as current db path
        no_cached (bool=False): not use cached connection
        extensions (list[str]=None): list of extensions to load
        **connect_args (dict): duckdb.connect args

    Returns:
        duckdb connection
    """
    if not db_path:
        if CURRENT_DB_PATH and not no_cached:
            db_path = CURRENT_DB_PATH
        else:
            db_path = MEMORY_DDBB

    parsed_path = parse_path(db_path)
    k_path = parsed_path.lower()
    if no_cached or not (conn_db := CACHE_DUCK_DDBBS.get(k_path)):
        conn_db = CACHE_DUCK_DDBBS[k_path] = duckdb.connect(parsed_path, **connect_args)

    if extensions:
        for ext in extensions:
            conn_db.install_extension(ext)
            conn_db.load_extension(ext)

    if as_current:
        set_current_db_path(parsed_path)

    return conn_db
```
Get duckdb connection
Arguments:
- db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
- as_current (bool=False): set db_path as current db path
- no_cached (bool=False): do not use a cached connection
- extensions (list[str]=None): list of extensions to load
- **connect_args (dict): duckdb.connect args
Returns:
duckdb connection
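A minimal usage sketch (the database path, extension list and connect kwargs are illustrative, and it assumes the helpers and constants are exposed at the package root):

```python
from apb_duckdb_utils import get_duckdb_connection, MEMORY_DDBB

# File-backed connection, loading the spatial extension and setting it as current
conn = get_duckdb_connection('data/ports.duckdb', as_current=True, extensions=['spatial'])

# Later calls without db_path reuse CURRENT_DB_PATH (and the cached connection)
same_conn = get_duckdb_connection()

# In-memory connection that bypasses the cache; extra kwargs go to duckdb.connect
tmp_conn = get_duckdb_connection(MEMORY_DDBB, no_cached=True, config={'threads': 4})
```

Connections are cached per normalized path, so repeated calls with the same path return the same object unless no_cached=True.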
```python
def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True):
    """
    Save duckdb database to dir path as parquets or csvs files

    Args:
        dir_db (str=None): Path to save database
        duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection.
            If None, get connection default with get_duckdb_connection
        parquet (bool=True): Save as parquet file
    """
    create_dir(dir_db)

    if not duck_db_conn:
        duck_db_conn = get_duckdb_connection()

    if parquet:
        format_db = "(FORMAT PARQUET)"
    else:
        format_db = "(FORMAT CSV, COMPRESSION 'GZIP')"

    duck_db_conn.sql(f"EXPORT DATABASE '{parse_path(dir_db)}' {format_db}")
```
Save the duckdb database to a directory as Parquet or CSV files
Arguments:
- dir_db (str): Path of the directory where the database will be exported
- duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection. If None, the default connection from get_duckdb_connection is used
- parquet (bool=True): Save as Parquet files; otherwise save as gzip-compressed CSV files
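A short sketch of a backup flow; the directory and database names are hypothetical:

```python
from apb_duckdb_utils import get_duckdb_connection, export_database

conn = get_duckdb_connection('data/ports.duckdb')

# Dump every table as Parquet files, plus the SQL scripts needed to rebuild the schema
export_database('backups/ports_parquet', duck_db_conn=conn)

# Or as gzip-compressed CSV files
export_database('backups/ports_csv', duck_db_conn=conn, parquet=False)
```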
```python
def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str:
    """
    Get current schema

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb

    Returns:
        current schema
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    return conn_db.sql("SELECT current_schema()").fetchone()[0]
```
Get current schema
Arguments:
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:
current schema
```python
def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]:
    """
    List tables in duckdb

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
        schemas (tuple[str]=None): tuple schemas. If not informed, list all tables

    Returns:
        list of tables
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    current_schema = current_schema_duckdb(conn_db)

    def get_table_name(row):
        """
        Get table name with schema if not current schema

        Args:
            row:

        Returns:
            str
        """
        schema = quote_name_duckdb(row['schema'])
        table = quote_name_duckdb(row['name'])
        return f"{schema}.{table}" if schema != current_schema else table

    sql_tables = conn_db.sql(f"SHOW ALL TABLES")
    if schemas:
        sql_tables = sql_tables.filter(f"schema in {schemas}")

    tables = []
    if sql_tables.count('name').fetchone()[0] > 0:
        tables = sql_tables.df().apply(get_table_name, axis=1).tolist()

    return tables
```
List tables in duckdb
Arguments:
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
- schemas (tuple[str]=None): tuple of schemas to filter by. If not provided, list tables from all schemas
Returns:
list of tables
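For example (the database and schema names are hypothetical):

```python
from apb_duckdb_utils import get_duckdb_connection, list_tables_duckdb

conn = get_duckdb_connection('data/ports.duckdb')

# All tables; names outside the current schema come back as schema.table
print(list_tables_duckdb(conn))

# Restrict the listing to a couple of schemas
print(list_tables_duckdb(conn, schemas=('main', 'staging')))
```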
```python
def quote_name_duckdb(object_sql_name: str) -> str:
    """
    Quote name to use on duckdb if has spaces

    Args:
        object_sql_name (str): name to quote (table, view, schema, index, ...)

    Returns:
        quoted name (str)
    """
    object_sql_name = object_sql_name.strip().replace('"', '')

    res_name = ''
    if '.' in object_sql_name:
        schema, name_obj = object_sql_name.split('.')
        res_name = f'"{schema}".' if " " in schema else f'{schema}.'
    else:
        name_obj = object_sql_name

    name_obj = f'"{name_obj}"' if ' ' in name_obj else name_obj

    res_name += name_obj

    return res_name
```
Quote a name for use in duckdb if it contains spaces
Arguments:
- object_sql_name (str): name to quote (table, view, schema, index, ...)
Returns:
quoted name (str)
```python
def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool:
    """
    Check if table exists in duckdb

    Args:
        table_name (str): table name
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb

    Returns:
        bool: True if table exists
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    return quote_name_duckdb(table_name) in list_tables_duckdb(conn_db)
```
Check if table exists in duckdb
Arguments:
- table_name (str): table name
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:
bool: True if table exists
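A typical guard before an import; the table and file names are hypothetical:

```python
from apb_duckdb_utils import get_duckdb_connection, exists_table_duckdb, import_csv_to_duckdb

conn = get_duckdb_connection('data/ports.duckdb')

# Only load the CSV the first time around
if not exists_table_duckdb('berths', conn_db=conn):
    import_csv_to_duckdb('data/berths.csv', table_or_view_name='berths', conn_db=conn)
```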
```python
def parse_path(path):
    """
    Parse path to duckdb format

    Args:
        path (str): path to use on duckdb

    Returns:
        normalized path (str)
    """
    if path.startswith('s3://'):
        normalize_path = path
    else:
        normalize_path = os.path.normpath(path).replace('\\', '/')

    return normalize_path
```
Parse path to duckdb format
Arguments:
- path (str): path to use on duckdb
Returns:
normalized path (str)
```python
def escape_name_table_view(name: str):
    """
    Escape characters and lowercase name table/view to use on duckdb

    Args:
        name (str): name to use on duckdb

    Returns:
        normalized name (str)
    """
    return name.replace(
        '-', '_').replace(
        '(', '').replace(
        ')', '').lower()
```
Escape characters and lowercase a table/view name for use in duckdb
Arguments:
- name (str): name to use on duckdb
Returns:
normalized name (str)
```python
def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation):
    """
    Check columns in sql relation

    Args:
        cols_to_check (Iterable[str]): list of columns to check
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        list[str]: list of columns to check
    """
    cols_to_check_lower = [cols_geom.lower() for cols_geom in cols_to_check]
    cols_sql = [col.lower() for col in sql.columns]

    if not all(col in cols_sql for col in cols_to_check_lower):
        raise ValueError(f"There are columns {cols_to_check} not found on sql columns {cols_sql}")

    return cols_to_check
```
Check that the given columns exist in the sql relation; raises ValueError if any is missing
Arguments:
- cols_to_check (Iterable[str]): list of columns to check
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
list[str]: list of columns to check
```python
def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Set columns to GEOMETRY from WKT text

    Args:
        cols_wkt (list[str]): list of columns WKT to set as geometry
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(cols_wkt, sql)

    cols_wkt_lower = [col.lower() for col in cols_wkt]
    sql_wkt_cols = {
        col: f'ST_GeomFromText("{col}")'
        for col in sql.columns if col.lower() in cols_wkt_lower
    }
    return replace_cols_on_sql(sql_wkt_cols, sql)
```
Set columns to GEOMETRY from WKT text
Arguments:
- cols_wkt (list[str]): list of columns WKT to set as geometry
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
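A sketch on an in-memory relation (the column names and WKT value are illustrative; the connection needs the spatial extension loaded):

```python
from apb_duckdb_utils import get_duckdb_connection, set_types_geom_to_cols_wkt_on_sql

conn = get_duckdb_connection(extensions=['spatial'])

# Relation with a WKT text column
rel = conn.sql("SELECT 1 AS id, 'POINT(2.18 41.37)' AS geom_wkt")

# Same relation with geom_wkt converted to GEOMETRY via ST_GeomFromText
rel_geom = set_types_geom_to_cols_wkt_on_sql(['geom_wkt'], rel)
print(rel_geom.types)
```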
```python
def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Replace columns by the passed sql function

    Args:
        replace_cols (dict[str]): dict of columns with the sql function to use instead
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(replace_cols.keys(), sql)

    sql_replace = ", ".join(
        f'{sql_func} AS "{col}"'
        for col, sql_func in replace_cols.items()
    )
    return sql.select(f"* REPLACE ({sql_replace})")
```
Replace columns with the given SQL expressions
Arguments:
- replace_cols (dict[str, str]): dict mapping each column name to the SQL expression to use in its place
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
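For example, keeping every column but rewriting two of them in place (column names and expressions are illustrative):

```python
from apb_duckdb_utils import get_duckdb_connection, replace_cols_on_sql

conn = get_duckdb_connection()
rel = conn.sql("SELECT 1 AS id, '  Bollard  ' AS name, 100.0 AS price")

# The relation keeps the same column names; values come from the expressions
rel2 = replace_cols_on_sql({'price': 'price * 1.21', 'name': 'trim(name)'}, rel)
print(rel2.fetchall())
```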
```python
def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Rename columns in sql select

    Args:
        alias_cols (dict[str]): dict of renamed columns with alias
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(alias_cols.keys(), sql)
    alias_cols = {col.lower(): alias for col, alias in alias_cols.items()}

    sql_cols = ", ".join(
        f'"{col}" AS "{alias_col}"' if (alias_col := alias_cols.get(col.lower())) else f'"{col}"'
        for col in sql.columns
    )

    return sql.select(f"{sql_cols}")
```
Rename columns in sql select
Arguments:
- alias_cols (dict[str, str]): dict mapping column names to their new aliases
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
```python
def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Exclude columns in sql select

    Args:
        excluded_cols (list[str]): list of columns to exclude
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(excluded_cols, sql)

    str_cols = ", ".join(f'"{col}"' for col in excluded_cols)
    sql_cols = f'* EXCLUDE ({str_cols})'

    return sql.select(f"{sql_cols}")
```
Exclude columns in sql select
Arguments:
- excluded_cols (list[str]): list of columns to exclude
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
```python
def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None,
                      cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                      table_or_view_name: str = None, col_id: str = None, as_view: bool = False,
                      overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation:
    """
    Set new config to a Duckdb SQL Relation

    Args:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
        cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if cols_exclude:
        a_sql = exclude_cols_on_sql(cols_exclude, a_sql)

    if cols_alias:
        a_sql = rename_cols_on_sql(cols_alias, a_sql)

    if cols_replace:
        a_sql = replace_cols_on_sql(cols_replace, a_sql)

    if cols_wkt:
        a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql)

    if table_or_view_name:
        if not conn_db:
            conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

        table_or_view_name = quote_name_duckdb(table_or_view_name)
        if as_view:
            a_sql.create_view(table_or_view_name, replace=overwrite)
        else:
            if overwrite:
                conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}")

            if col_id := (cols_alias or {}).get(col_id, col_id):
                a_sql = a_sql.order(f'"{col_id}"')

            a_sql.create(table_or_view_name)

            if col_id:
                n_idx = quote_name_duckdb(f"{table_or_view_name}_{col_id}_idx")
                conn_db.execute(
                    f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")'
                )

        a_sql = conn_db.sql(f"FROM {table_or_view_name}")

    return a_sql
```
Set new config to a Duckdb SQL Relation
Arguments:
- a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
- cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- table_or_view_name (str=None): table or view name. If informed, create table or view
- col_id (str=None): column primary key and index unique if create table
- as_view (bool=False): create sql as view instead of table
- overwrite (bool=False): overwrite table_name if exists
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
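A sketch chaining several options; the source file, table name and column names are all hypothetical:

```python
from apb_duckdb_utils import get_duckdb_connection, config_sql_duckdb

conn = get_duckdb_connection(extensions=['spatial'])
rel = conn.sql("SELECT * FROM read_csv('data/berths.csv')")

# Drop a column, rename another, cast a WKT column and materialize the result as a table
berths = config_sql_duckdb(
    rel,
    cols_exclude=['obsolete_col'],
    cols_alias={'berth_code': 'code'},
    cols_wkt=['geom_wkt'],
    table_or_view_name='berths',
    col_id='code',          # becomes the unique index of the created table
    overwrite=True,
    conn_db=conn,
)
```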
```python
def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None,
                         cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                         cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False,
                         conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.DuckDBPyRelation:
    """
    Import csv file as table on duckdb

    Args:
        csv_path (str): path to csv file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        header (bool=True): csv file has header
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        zipped (bool=False): compression type. If informed, use it

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    compression = f', compression={GZIP}' if zipped else ''

    a_sql = conn_db.sql(
        f"""
        FROM read_csv('{parse_path(csv_path)}', header={header}, auto_detect=True{compression})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
```
Import csv file as table on duckdb
Arguments:
- csv_path (str): path to csv file
- table_or_view_name (str=None): table or view name. If informed, create table or view
- header (bool=True): csv file has header
- col_id (str=None): column primary key and index unique if create table
- cols_wkt (list[str] = None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
- as_view (bool=False): create table as view instead of table
- overwrite (bool=False): overwrite table_name if exists
- zipped (bool=False): whether the csv file is gzip-compressed
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
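A sketch of a CSV import with a WKT geometry column; the path, table name and columns are hypothetical:

```python
from apb_duckdb_utils import get_duckdb_connection, import_csv_to_duckdb

conn = get_duckdb_connection('data/ports.duckdb', extensions=['spatial'])

# Load the CSV, cast the WKT column to GEOMETRY and persist it as a table
berths = import_csv_to_duckdb(
    'data/berths.csv',
    table_or_view_name='berths',
    col_id='berth_id',
    cols_wkt=['geom_wkt'],
    conn_db=conn,
    overwrite=True,
)
print(berths.columns)
```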
```python
def import_gdal_file_to_duckdb(
        path_file: str,
        table_or_view_name: str = None,
        gdal_open_options: list[str] = None,
        col_id: str = None,
        cols_exclude: list[str] = None,
        cols_alias: dict[str, str] = None,
        cols_replace: dict[str, str] = None,
        conn_db: duckdb.DuckDBPyConnection = None,
        as_view: bool = False,
        overwrite: bool = False,
        **st_read_kwargs) -> duckdb.DuckDBPyRelation:
    """
    Load GDAL file driver on duckdb database

    Args:
        path_file (str): path to GDAL file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        gdal_open_options (list[str]=None): list of GDAL open options.
            See GDAL documentation (https://gdal.org/drivers/vector/index.html)
            (e.g. ['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
        col_id (str=None): column primary key and index unique if create table
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **st_read_kwargs (str): ST_Read function kwargs. See ST_Read documentation
            (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    params_st_read = f"'{parse_path(path_file)}'"
    if gdal_open_options:
        params_st_read = f"{params_st_read}, open_options={gdal_open_options}"
    if st_read_kwargs:
        params_st_read = f"{params_st_read}, {', '.join([f'{k}={v}' for k, v in st_read_kwargs.items()])}"

    query = f"""
        FROM ST_Read({params_st_read})
    """

    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

    a_sql = conn_db.sql(query)

    a_sql = config_sql_duckdb(a_sql, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace,
                              table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view,
                              overwrite=overwrite, conn_db=conn_db)

    return a_sql
```
Load a file supported by a GDAL driver into the duckdb database
Arguments:
- path_file (str): path to GDAL file
- table_or_view_name (str=None): table or view name. If informed, create table or view
- gdal_open_options (list[str]=None): list of GDAL open options. See GDAL documentation (https://gdal.org/drivers/vector/index.html) (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
- col_id (str=None): column primary key and index unique if create table
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
- as_view (bool=False): create sql as view instead of table
- overwrite (bool=False): overwrite table_name if exists
- **st_read_kwargs (str): ST_Read function kwargs. See ST_Read documentation (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
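A sketch that reuses the open-options example from the docstring to read a point CSV through GDAL; the file and table names are hypothetical:

```python
from apb_duckdb_utils import get_duckdb_connection, import_gdal_file_to_duckdb

conn = get_duckdb_connection('data/ports.duckdb', extensions=['spatial'])

# GDAL builds the geometry from the Longitude/Latitude columns while reading the CSV
buoys = import_gdal_file_to_duckdb(
    'data/buoys.csv',
    table_or_view_name='buoys',
    gdal_open_options=['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'],
    conn_db=conn,
    overwrite=True,
)
```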
```python
def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None,
                               cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                               cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                               as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None,
                               overwrite=False) -> duckdb.DuckDBPyRelation:
    """
    Import DataFrame/GeoDataframe as table on duckdb

    Args:
        df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    for col in (cols_geom := df_geometry_columns(df)):
        df[col] = df[col].to_wkb(include_srid=True)

    cols_as_wkb = [f'ST_GeomFromWKB({col}) AS {col}' for col in cols_geom]
    alias_cols_geom = ', '.join(cols_as_wkb)
    exclude_geom_cols = ', '.join(cols_geom)

    select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}" if cols_geom else "SELECT *"
    a_sql = conn_db.sql(f"""
        {select_expr} FROM df
    """)

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
```
Import a DataFrame/GeoDataFrame as a table on duckdb
Arguments:
- df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
- table_or_view_name (str=None): table or view name. If informed, create table or view
- col_id (str=None): column primary key and index unique if create table
- cols_wkt (list[str] = None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
- as_view (bool=False): create table as view instead of table
- overwrite (bool=False): overwrite table_name if exists
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
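A sketch loading a GeoDataFrame read with geopandas; the source file, table name and id column are hypothetical:

```python
import geopandas as gpd
from apb_duckdb_utils import get_duckdb_connection, import_dataframe_to_duckdb

conn = get_duckdb_connection('data/ports.duckdb', extensions=['spatial'])

gdf = gpd.read_file('data/quays.gpkg')

# Geometry columns are converted to WKB and back to GEOMETRY inside duckdb
quays = import_dataframe_to_duckdb(
    gdf,
    table_or_view_name='quays',
    col_id='quay_id',
    conn_db=conn,
    overwrite=True,
)
```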
```python
def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None,
                             cols_geom: list[str] = None,
                             cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                             cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                             as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                             **read_parquet_params) -> duckdb.DuckDBPyRelation:
    """
    Import Parquet/Geoparquet file as table on duckdb

    Args:
        parquet_path (str): path to parquet/geoparquet file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_geom (list[str]=None): list of columns type geometry
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **read_parquet_params (Any): read_parquet function kwargs.
            See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    select_expr = 'SELECT *'
    if cols_geom:
        alias_cols_geom = ', '.join(f'{col}::GEOMETRY AS {col}' for col in cols_geom)
        exclude_geom_cols = ', '.join(cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"

    sql_read_parquet_params = ''
    if read_parquet_params:
        sql_read_parquet_params += ', '
        sql_read_parquet_params += ', '.join([f'{k}={v}' for k, v in read_parquet_params.items()])

    a_sql = conn_db.sql(
        f"""
        {select_expr}
        FROM read_parquet('{parse_path(parquet_path)}'{sql_read_parquet_params})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
```
Import Parquet/Geoparquet file as table on duckdb
Arguments:
- parquet_path (str): path to parquet/geoparquet file
- table_or_view_name (str=None): table or view name. If informed, create table or view
- col_id (str=None): column primary key and index unique if create table
- cols_geom (list[str]=None): list of columns of geometry type to cast back to GEOMETRY
- cols_wkt (list[str] = None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
- as_view (bool=False): create table as view instead of table
- overwrite (bool=False): overwrite table_name if exists
- **read_parquet_params (Any): read_parquet function kwargs. See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
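A sketch for a GeoParquet file whose geometry column duckdb can cast to GEOMETRY; the path, table name and columns are hypothetical:

```python
from apb_duckdb_utils import get_duckdb_connection, import_parquet_to_duckdb

conn = get_duckdb_connection('data/ports.duckdb', extensions=['spatial'])

moorings = import_parquet_to_duckdb(
    'data/moorings.parquet',
    table_or_view_name='moorings',
    col_id='mooring_id',
    cols_geom=['geom'],      # cast this column to GEOMETRY while selecting
    conn_db=conn,
    overwrite=True,
)
```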
```python
def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table:
    """
    Filter ibis table

    Args:
        table (ibis.Table): The table to filter
        sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table

    Returns:
        table (ibis.Table): The table filtered
    """
    if isinstance(sql_or_ibis_filter, str):  # Filter by SQL on duckdb backend
        # Check if geospatial fields to cast as geometry
        cols_geom = [col for col, fld in table.schema().fields.items()
                     if isinstance(fld, GeoSpatial)]
        if cols_geom:
            cast_geom_cols = ', '.join([f"CAST({col} AS geometry) AS {col}" for col in cols_geom])
            sql_select_cols = f"* EXCLUDE({', '.join(cols_geom)}), {cast_geom_cols}"
        else:
            sql_select_cols = "*"

        n_tab = table.get_name()
        sql_str = f"SELECT {sql_select_cols} FROM {n_tab} WHERE {sql_or_ibis_filter}"
        get_base_logger().debug(f"filter_ibis_table {n_tab}: {sql_str}")
        res = table.sql(sql_str)
    else:
        res = table.filter(sql_or_ibis_filter)

    return res
```
Filter ibis table
Arguments:
- table (ibis.Table): The table to filter
- sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table
Returns:
table (ibis.Table): The table filtered
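A sketch against an ibis duckdb backend; the database, table and column names are hypothetical:

```python
import ibis
from apb_duckdb_utils import filter_ibis_table

con = ibis.duckdb.connect('data/ports.duckdb')
quays = con.table('quays')

# SQL string filter, executed through the duckdb backend
long_quays = filter_ibis_table(quays, "ST_Length(geom) > 100")

# Equivalent ibis expression filter
named_quays = filter_ibis_table(quays, quays.name.like('%NORD%'))
```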
```python
def set_secret_s3_storage(duckdb_conn: duckdb.DuckDBPyConnection, endpoint: str, access_key_id: str,
                          secret_access_key: str, url_style: str = 'path', use_ssl: bool = False, region: str = None,
                          secret_name: str = SECRET_S3) -> bool:
    """
    Set secret S3 storage on duckdb connection

    Args:
        duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
        endpoint (str): endpoint url. (e.g. 's3.amazonaws.com')
        access_key_id (str): access key id
        secret_access_key (str): secret access key
        url_style (str='path'): url style. 'path' or 'virtual_hosted'
        use_ssl (bool=False): use ssl
        region (str=None): region name
        secret_name (str=SECRET_S3): secret name

    Returns:
        ok
    """
    ok = False
    try:
        duckdb_conn.execute(f"""
            CREATE OR REPLACE SECRET {secret_name} (
                TYPE S3,
                KEY_ID '{access_key_id}',
                SECRET '{secret_access_key}',
                ENDPOINT '{endpoint}',
                URL_STYLE '{url_style}',
                USE_SSL {use_ssl},
                REGION '{region}'
            );
        """)
        ok = True
    except Exception as e:
        get_base_logger().error(f"Error setting S3 access keys: {e}")

    return ok
```
Set secret S3 storage on duckdb connection
Arguments:
- duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
- endpoint (str): endpoint url. (e.g. 's3.amazonaws.com')
- access_key_id (str): access key id
- secret_access_key (str): secret access key
- url_style (str='path'): url style. 'path' or 'virtual_hosted'
- use_ssl (bool=False): use ssl
- region (str=None): region name
- secret_name (str=SECRET_S3): secret name
Returns:
ok (bool): True if the secret was created successfully
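A sketch for a MinIO-style endpoint; all credentials and names are placeholders, and loading httpfs here is an assumption about how the secret would then be used to read s3:// paths:

```python
from apb_duckdb_utils import get_duckdb_connection, set_secret_s3_storage, exists_secret

conn = get_duckdb_connection(extensions=['httpfs'])

ok = set_secret_s3_storage(
    conn,
    endpoint='minio.example.org:9000',
    access_key_id='my_key_id',
    secret_access_key='my_secret',
    use_ssl=True,
    region='eu-west-1',
)

if ok and exists_secret(conn):
    rel = conn.sql("FROM read_parquet('s3://my-bucket/moorings.parquet')")
```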
```python
def exists_secret(duckdb_conn: duckdb.DuckDBPyConnection, secret_name: str = SECRET_S3) -> bool:
    """
    Check if secret exists on duckdb connection

    Args:
        duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
        secret_name (str=SECRET_S3): secret name

    Returns:
        bool: True if secret exists
    """
    exists = False
    try:
        exists = duckdb_conn.execute(
            f"SELECT COUNT(*) > 0 AS existe FROM duckdb_secrets() WHERE name = '{secret_name}'").fetchone()[0]
    except Exception as e:
        get_base_logger().error(f"Error checking secret existence: {e}")

    return exists
```
Check if secret exists on duckdb connection
Arguments:
- duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
- secret_name (str=SECRET_S3): secret name
Returns:
bool: True if secret exists