apb_duckdb_utils
Package apb_duckdb_utils
Modules that add functionality (spatial and generic) on top of DuckDB
Using all of the available functionality requires GDAL
library version 3.6 to 3.9 and the Oracle Instant Client to be installed.
To install:
pip install apb_duckdb_utils
Documentation here apb_duckdb_utils
1# coding=utf-8 2# 3# Author: Ernesto Arredondo Martinez (ernestone@gmail.com) 4# Created: 5# Copyright (c) 6""" 7.. include:: ../README.md 8""" 9from __future__ import annotations 10 11import os 12import warnings 13from typing import Iterable 14 15import duckdb 16import ibis 17from geopandas import GeoDataFrame 18from ibis.expr.datatypes import GeoSpatial 19from pandas import DataFrame 20 21from apb_extra_utils.misc import create_dir 22from apb_extra_utils.utils_logging import get_base_logger 23from apb_pandas_utils.geopandas_utils import df_geometry_columns 24 25# Suppress specific warning 26warnings.filterwarnings("ignore", message="Geometry column does not contain geometry") 27 28MEMORY_DDBB = ':memory:' 29CACHE_DUCK_DDBBS = {} 30CURRENT_DB_PATH = None 31GZIP = 'gzip' 32 33 34def set_current_db_path(db_path: str): 35 """ 36 Set used db path 37 38 Args: 39 db_path (str): path to duckdb database file 40 """ 41 global CURRENT_DB_PATH 42 CURRENT_DB_PATH = parse_path(db_path) 43 44 45def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False, 46 extensions: list[str] = None, 47 **connect_args) -> duckdb.DuckDBPyConnection: 48 """ 49 Get duckdb connection 50 51 Args: 52 db_path (str=None): path to duckdb database file. 
By default, use CURRENT_DB_PATH 53 as_current (bool=False): set db_path as current db path 54 no_cached (bool=False): not use cached connection 55 extensions (list[str]=None): list of extensions to load 56 **connect_args (dict): duckdb.connect args 57 58 Returns: 59 duckdb connection 60 """ 61 if not db_path: 62 if CURRENT_DB_PATH and not no_cached: 63 db_path = CURRENT_DB_PATH 64 else: 65 db_path = MEMORY_DDBB 66 67 parsed_path = parse_path(db_path) 68 k_path = parsed_path.lower() 69 if no_cached or not (conn_db := CACHE_DUCK_DDBBS.get(k_path)): 70 conn_db = CACHE_DUCK_DDBBS[k_path] = duckdb.connect(parsed_path, **connect_args) 71 72 if extensions: 73 for ext in extensions: 74 conn_db.install_extension(ext) 75 conn_db.load_extension(ext) 76 77 if as_current: 78 set_current_db_path(parsed_path) 79 80 return conn_db 81 82 83def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True): 84 """ 85 Save duckdb database to dir path as parquets or csvs files 86 87 Args: 88 dir_db (str=None): Path to save database 89 duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection. 
90 If None, get connection default with get_duckdb_connection 91 parquet (bool=True): Save as parquet file 92 """ 93 create_dir(dir_db) 94 95 if not duck_db_conn: 96 duck_db_conn = get_duckdb_connection() 97 98 if parquet: 99 format_db = "(FORMAT PARQUET)" 100 else: 101 format_db = "(FORMAT CSV, COMPRESSION 'GZIP')" 102 103 duck_db_conn.sql(f"EXPORT DATABASE '{parse_path(dir_db)}' {format_db}") 104 105 106def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str: 107 """ 108 Get current schema 109 110 Args: 111 conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb 112 113 Returns: 114 current schema 115 """ 116 if not conn_db: 117 conn_db = get_duckdb_connection() 118 119 return conn_db.sql("SELECT current_schema()").fetchone()[0] 120 121 122def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]: 123 """ 124 List tables in duckdb 125 126 Args: 127 conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb 128 schemas (tuple[str]=None): tuple schemas. If not informed, list all tables 129 130 Returns: 131 list of tables 132 """ 133 if not conn_db: 134 conn_db = get_duckdb_connection() 135 136 current_schema = current_schema_duckdb(conn_db) 137 138 def get_table_name(row): 139 schema = quote_name_duckdb(row['schema']) 140 table = quote_name_duckdb(row['name']) 141 return f"{schema}.{table}" if schema != current_schema else table 142 143 sql_tables = conn_db.sql(f"SHOW ALL TABLES") 144 if schemas: 145 sql_tables = sql_tables.filter(f"schema in {schemas}") 146 147 tables = [] 148 if sql_tables.count('name').fetchone()[0] > 0: 149 tables = sql_tables.df().apply(get_table_name, axis=1).tolist() 150 151 return tables 152 153 154def quote_name_duckdb(object_sql_name: str) -> str: 155 """ 156 Quote name to use on duckdb if has spaces 157 158 Args: 159 object_sql_name (str): name to quote (table, view, schema, index, ...) 
160 161 Returns: 162 quoted name (str) 163 """ 164 object_sql_name = object_sql_name.strip().replace('"', '') 165 166 res_name = '' 167 if '.' in object_sql_name: 168 schema, name_obj = object_sql_name.split('.') 169 res_name = f'"{schema}".' if " " in schema else f'{schema}.' 170 else: 171 name_obj = object_sql_name 172 173 name_obj = f'"{name_obj}"' if ' ' in name_obj else name_obj 174 175 res_name += name_obj 176 177 return res_name 178 179 180def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool: 181 """ 182 Check if table exists in duckdb 183 184 Args: 185 table_name (str): table name 186 conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb 187 188 Returns: 189 bool: True if table exists 190 """ 191 if not conn_db: 192 conn_db = get_duckdb_connection() 193 194 return quote_name_duckdb(table_name) in list_tables_duckdb(conn_db) 195 196 197def parse_path(path): 198 """ 199 Parse path to duckdb format 200 Args: 201 path (str): path to use on duckdb 202 203 Returns: 204 normalized path (str) 205 """ 206 normalize_path = os.path.normpath(path).replace('\\', '/') 207 return normalize_path 208 209 210def escape_name_table_view(name: str): 211 """ 212 Escape characters and lowercase name table/view to use on duckdb 213 Args: 214 name (str): name to use on duckdb 215 216 Returns: 217 normalized name (str) 218 """ 219 return name.replace( 220 '-', '_').replace( 221 '(', '').replace( 222 ')', '').lower() 223 224 225def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation): 226 """ 227 Check columns in sql relation 228 229 Args: 230 cols_to_check (Iterable[str]): list of columns to check 231 sql (duckdb.DuckDBPyRelation): sql relation 232 233 Returns: 234 list[str]: list of columns to check 235 """ 236 cols_to_check_lower = [cols_geom.lower() for cols_geom in cols_to_check] 237 cols_sql = [col.lower() for col in sql.columns] 238 239 if not all(col in cols_sql for col in cols_to_check_lower): 240 
raise ValueError(f"There are columns {cols_to_check} not found on sql columns {cols_sql}") 241 242 return cols_to_check 243 244 245def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation): 246 """ 247 Set columns to GEOMETRY from WKT text 248 249 Args: 250 cols_wkt (list[str]): list of columns WKT to set as geometry 251 sql (duckdb.DuckDBPyRelation): sql relation 252 253 Returns: 254 duckdb.DuckDBPyRelation: Duckdb database relation 255 """ 256 check_columns_in_sql(cols_wkt, sql) 257 258 cols_wkt_lower = [col.lower() for col in cols_wkt] 259 sql_wkt_cols = { 260 col: f'ST_GeomFromText("{col}")' 261 for col in sql.columns if col.lower() in cols_wkt_lower 262 } 263 return replace_cols_on_sql(sql_wkt_cols, sql) 264 265 266def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation): 267 """ 268 Replace columns by the passed sql function 269 270 Args: 271 replace_cols (dict[str]): dict of columns with the sql function to use instead 272 sql (duckdb.DuckDBPyRelation): sql relation 273 274 Returns: 275 duckdb.DuckDBPyRelation: Duckdb database relation 276 """ 277 check_columns_in_sql(replace_cols.keys(), sql) 278 279 sql_replace = ", ".join( 280 f'{sql_func} AS "{col}"' 281 for col, sql_func in replace_cols.items() 282 ) 283 return sql.select(f"* REPLACE ({sql_replace})") 284 285 286def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation): 287 """ 288 Rename columns in sql select 289 290 Args: 291 alias_cols (dict[str]): dict of renamed columns with alias 292 sql (duckdb.DuckDBPyRelation): sql relation 293 294 Returns: 295 duckdb.DuckDBPyRelation: Duckdb database relation 296 """ 297 check_columns_in_sql(alias_cols.keys(), sql) 298 alias_cols = {col.lower(): alias for col, alias in alias_cols.items()} 299 300 sql_cols = ", ".join( 301 f'"{col}" AS "{alias_col}"' if (alias_col := alias_cols.get(col.lower())) else f'"{col}"' 302 for col in sql.columns 303 ) 304 305 return 
sql.select(f"{sql_cols}") 306 307 308def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation): 309 """ 310 Exclude columns in sql select 311 312 Args: 313 excluded_cols (list[str]): list of columns to exclude 314 sql (duckdb.DuckDBPyRelation): sql relation 315 316 Returns: 317 duckdb.DuckDBPyRelation: Duckdb database relation 318 """ 319 check_columns_in_sql(excluded_cols, sql) 320 321 str_cols = ", ".join(f'"{col}"' for col in excluded_cols) 322 sql_cols = f'* EXCLUDE ({str_cols})' 323 324 return sql.select(f"{sql_cols}") 325 326 327def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None, 328 cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, 329 table_or_view_name: str = None, col_id: str = None, as_view: bool = False, 330 overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation: 331 """ 332 Set new config to a Duckdb SQL Relation 333 334 Args: 335 a_sql (duckdb.DuckDBPyRelation): Duckdb database relation 336 cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry 337 cols_exclude (list[str]=None): list of columns to exclude 338 cols_alias (dict[str, str]=None): dictionary of columns aliases 339 cols_replace (dict[str, str]=None): dictionary of columns to replace 340 table_or_view_name (str=None): table or view name. 
If informed, create table or view 341 col_id (str=None): column primary key and index unique if create table 342 as_view (bool=False): create sql as view instead of table 343 overwrite (bool=False): overwrite table_name if exists 344 conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view 345 346 Returns: 347 a_sql (duckdb.DuckDBPyRelation): Duckdb database relation 348 """ 349 if cols_exclude: 350 a_sql = exclude_cols_on_sql(cols_exclude, a_sql) 351 352 if cols_alias: 353 a_sql = rename_cols_on_sql(cols_alias, a_sql) 354 355 if cols_replace: 356 a_sql = replace_cols_on_sql(cols_replace, a_sql) 357 358 if cols_wkt: 359 a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql) 360 361 if table_or_view_name: 362 if not conn_db: 363 conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True) 364 365 table_or_view_name = quote_name_duckdb(table_or_view_name) 366 if as_view: 367 a_sql.create_view(table_or_view_name, replace=overwrite) 368 else: 369 if overwrite: 370 conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}") 371 372 if col_id := (cols_alias or {}).get(col_id, col_id): 373 a_sql = a_sql.order(f'"{col_id}"') 374 375 a_sql.create(table_or_view_name) 376 377 if col_id: 378 n_idx = quote_name_duckdb(f"{table_or_view_name}_{col_id}_idx") 379 conn_db.execute( 380 f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")' 381 ) 382 383 a_sql = conn_db.sql(f"FROM {table_or_view_name}") 384 385 return a_sql 386 387 388def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None, 389 cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, 390 cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, 391 conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.DuckDBPyRelation: 392 """ 393 Import csv file as table on duckdb 394 395 Args: 396 csv_path (str): path to csv 
file 397 table_or_view_name (str=None): table or view name. If informed, create table or view 398 header (bool=True): csv file has header 399 col_id (str=None): column primary key and index unique if create table 400 cols_wkt (list[str] = None): list of columns WKT to use as geometry 401 cols_exclude (list[str]=None): list of columns to exclude 402 cols_alias (dict[str, str]=None): dictionary of columns aliases 403 cols_replace (dict[str, str]=None): dictionary of columns to replace 404 conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb 405 as_view (bool=False): create table as view instead of table 406 overwrite (bool=False): overwrite table_name if exists 407 zipped (bool=False): compression type. If informed, use it 408 409 Returns: 410 a_sql (duckdb.DuckDBPyRelation): Duckdb database relation 411 """ 412 if not conn_db: 413 conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True) 414 415 compression = f', compression={GZIP}' if zipped else '' 416 417 a_sql = conn_db.sql( 418 f""" 419 FROM read_csv('{parse_path(csv_path)}', header={header}, auto_detect=True{compression}) 420 """ 421 ) 422 423 a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias, 424 cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id, 425 as_view=as_view, overwrite=overwrite, conn_db=conn_db) 426 427 return a_sql 428 429 430def import_gdal_file_to_duckdb( 431 path_file: str, 432 table_or_view_name: str = None, 433 gdal_open_options: list[str] = None, 434 col_id: str = None, 435 cols_exclude: list[str] = None, 436 cols_alias: dict[str, str] = None, 437 cols_replace: dict[str, str] = None, 438 conn_db: duckdb.DuckDBPyConnection = None, 439 as_view: bool = False, 440 overwrite: bool = False, 441 **st_read_kwargs) -> duckdb.DuckDBPyRelation: 442 """ 443 Load GDAL file driver on duckdb database 444 445 Args: 446 path_file (str): path to GDAL file 447 table_or_view_name (str=None): table or view 
name. If informed, create table or view 448 gdal_open_options (list[str]=None): list of GDAL open options. 449 See GDAL documentation (https://gdal.org/drivers/vector/index.html) 450 (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude']) 451 col_id (str=None): column primary key and index unique if create table 452 cols_exclude (list[str]=None): list of columns to exclude 453 cols_alias (dict[str, str]=None): dictionary of columns aliases 454 cols_replace (dict[str, str]=None): dictionary of columns to replace 455 conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection 456 as_view (bool=False): create sql as view instead of table 457 overwrite (bool=False): overwrite table_name if exists 458 **st_read_kwargs (str): ST_Read function kwargs. See ST_Read documentation 459 (https://duckdb.org/docs/extensions/spatial/functions.html#st_read) 460 461 Returns: 462 a_sql (duckdb.DuckDBPyRelation): Duckdb database relation 463 """ 464 params_st_read = f"'{parse_path(path_file)}'" 465 if gdal_open_options: 466 params_st_read = f"{params_st_read}, open_options={gdal_open_options}" 467 if st_read_kwargs: 468 params_st_read = f"{params_st_read}, {', '.join([f'{k}={v}' for k, v in st_read_kwargs.items()])}" 469 470 query = f""" 471 FROM ST_Read({params_st_read}) 472 """ 473 474 if not conn_db: 475 conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True) 476 477 a_sql = conn_db.sql(query) 478 479 a_sql = config_sql_duckdb(a_sql, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace, 480 table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view, 481 overwrite=overwrite, conn_db=conn_db) 482 483 return a_sql 484 485 486def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None, 487 cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, 488 cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, 489 
as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, 490 overwrite=False) -> duckdb.DuckDBPyRelation: 491 """ 492 Import DataFrame/GeoDataframe as table on duckdb 493 494 Args: 495 df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame 496 table_or_view_name (str=None): table or view name. If informed, create table or view 497 col_id (str=None): column primary key and index unique if create table 498 cols_wkt (list[str] = None): list of columns WKT to use as geometry 499 cols_exclude (list[str]=None): list of columns to exclude 500 cols_alias (dict[str, str]=None): dictionary of columns aliases 501 cols_replace (dict[str, str]=None): dictionary of columns to replace 502 conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb 503 as_view (bool=False): create table as view instead of table 504 overwrite (bool=False): overwrite table_name if exists 505 506 Returns: 507 a_sql (duckdb.DuckDBPyRelation): Duckdb database relation 508 """ 509 if not conn_db: 510 conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True) 511 512 for col in (cols_geom := df_geometry_columns(df)): 513 df[col] = df[col].to_wkb(include_srid=True) 514 515 cols_as_wkb = [f'ST_GeomFromWKB({col}) AS {col}' for col in cols_geom] 516 alias_cols_geom = ', '.join(cols_as_wkb) 517 exclude_geom_cols = ', '.join(cols_geom) 518 519 select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}" if cols_geom else "SELECT *" 520 a_sql = conn_db.sql(f""" 521 {select_expr} FROM df 522 """) 523 524 a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias, 525 cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id, 526 as_view=as_view, overwrite=overwrite, conn_db=conn_db) 527 528 return a_sql 529 530 531def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None, 532 cols_geom: list[str] = None, 533 cols_wkt: list[str] | dict = None, cols_exclude: list[str] = 
None, 534 cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, 535 as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, 536 **read_parquet_params) -> duckdb.DuckDBPyRelation: 537 """ 538 Import Parquet/Geoparquet file as table on duckdb 539 540 Args: 541 parquet_path (str): path to parquet/geoparquet file 542 table_or_view_name (str=None): table or view name. If informed, create table or view 543 col_id (str=None): column primary key and index unique if create table 544 cols_geom (list[str]=None): list of columns type geometry 545 cols_wkt (list[str] = None): list of columns WKT to use as geometry 546 cols_exclude (list[str]=None): list of columns to exclude 547 cols_alias (dict[str, str]=None): dictionary of columns aliases 548 cols_replace (dict[str, str]=None): dictionary of columns to replace 549 conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb 550 as_view (bool=False): create table as view instead of table 551 overwrite (bool=False): overwrite table_name if exists 552 **read_parquet_params (Any): read_parquet function kwargs. 
553 See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters 554 555 Returns: 556 a_sql (duckdb.DuckDBPyRelation): Duckdb database relation 557 """ 558 if not conn_db: 559 conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True) 560 561 select_expr = 'SELECT *' 562 if cols_geom: 563 alias_cols_geom = ', '.join(f'{col}::GEOMETRY AS {col}' for col in cols_geom) 564 exclude_geom_cols = ', '.join(cols_geom) 565 select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}" 566 567 sql_read_parquet_params = '' 568 if read_parquet_params: 569 sql_read_parquet_params += ', ' 570 sql_read_parquet_params += ', '.join([f'{k}={v}' for k, v in read_parquet_params.items()]) 571 572 a_sql = conn_db.sql( 573 f""" 574 {select_expr} 575 FROM read_parquet('{parse_path(parquet_path)}'{sql_read_parquet_params}) 576 """ 577 ) 578 579 a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias, 580 cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id, 581 as_view=as_view, overwrite=overwrite, conn_db=conn_db) 582 583 return a_sql 584 585 586def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table: 587 """ 588 Filter ibis table 589 590 Args: 591 table (ibis.Table): The table to filter 592 sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table 593 594 Returns: 595 table (ibis.Table): The table filtered 596 """ 597 if isinstance(sql_or_ibis_filter, str): # Filter by SQL on duckdb backend 598 # Check if geospatial fields to cast as geometry 599 cols_geom = [col for col, fld in table.schema().fields.items() 600 if isinstance(fld, GeoSpatial)] 601 if cols_geom: 602 cast_geom_cols = ', '.join([f"CAST({col} AS geometry) AS {col}" for col in cols_geom]) 603 sql_select_cols = f"* EXCLUDE({', '.join(cols_geom)}), {cast_geom_cols}" 604 else: 605 sql_select_cols = "*" 606 607 n_tab = table.get_name() 608 sql_str 
= f"SELECT {sql_select_cols} FROM {n_tab} WHERE {sql_or_ibis_filter}" 609 get_base_logger().debug(f"filter_ibis_table {n_tab}: {sql_str}") 610 res = table.sql(sql_str) 611 else: 612 res = table.filter(sql_or_ibis_filter) 613 614 return res
def set_current_db_path(db_path: str):
    """
    Register db_path (normalized with parse_path) as the module-wide current database path.

    Args:
        db_path (str): path to duckdb database file
    """
    global CURRENT_DB_PATH
    normalized_path = parse_path(db_path)
    CURRENT_DB_PATH = normalized_path
Set used db path
Arguments:
- db_path (str): path to duckdb database file
def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False,
                          extensions: list[str] = None,
                          **connect_args) -> duckdb.DuckDBPyConnection:
    """
    Return a duckdb connection for db_path, reusing the cached one unless no_cached is set.

    Args:
        db_path (str=None): path to duckdb database file. When empty, CURRENT_DB_PATH is used,
            or an in-memory database if there is no current path or no_cached is True
        as_current (bool=False): register db_path as the current db path
        no_cached (bool=False): open a new connection instead of reusing the cached one
            (the new connection is then stored in the cache)
        extensions (list[str]=None): extensions to install and load on the connection
        **connect_args (dict): extra arguments for duckdb.connect

    Returns:
        duckdb connection
    """
    if not db_path:
        use_current = bool(CURRENT_DB_PATH) and not no_cached
        db_path = CURRENT_DB_PATH if use_current else MEMORY_DDBB

    path_norm = parse_path(db_path)
    cache_key = path_norm.lower()

    connection = None if no_cached else CACHE_DUCK_DDBBS.get(cache_key)
    if not connection:
        connection = duckdb.connect(path_norm, **connect_args)
        CACHE_DUCK_DDBBS[cache_key] = connection

    for ext_name in extensions or []:
        connection.install_extension(ext_name)
        connection.load_extension(ext_name)

    if as_current:
        set_current_db_path(path_norm)

    return connection
Get duckdb connection
Arguments:
- db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
- as_current (bool=False): set db_path as current db path
- no_cached (bool=False): do not use a cached connection
- extensions (list[str]=None): list of extensions to load
- **connect_args (dict): duckdb.connect args
Returns:
duckdb connection
def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True):
    """
    Save the duckdb database to a directory as parquet or gzipped csv files

    Args:
        dir_db (str): Path to save database
        duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection.
            If None, get connection default with get_duckdb_connection
        parquet (bool=True): Save as parquet files (otherwise gzipped CSV)
    """
    create_dir(dir_db)

    if not duck_db_conn:
        duck_db_conn = get_duckdb_connection()

    format_db = "(FORMAT PARQUET)" if parquet else "(FORMAT CSV, COMPRESSION 'GZIP')"

    # Double embedded single quotes so a path like "O'Brien/db" does not break the SQL literal
    sql_dir = parse_path(dir_db).replace("'", "''")
    duck_db_conn.sql(f"EXPORT DATABASE '{sql_dir}' {format_db}")
Save the DuckDB database to a directory as Parquet or CSV files
Arguments:
- dir_db (str=None): Path to save database
- duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection. If None, get connection default with get_duckdb_connection
- parquet (bool=True): Save as parquet file
def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str:
    """
    Return the name of the schema currently in use on the connection.

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb; the default
            connection from get_duckdb_connection is used when not given

    Returns:
        current schema
    """
    connection = conn_db if conn_db else get_duckdb_connection()
    first_row = connection.sql("SELECT current_schema()").fetchone()
    return first_row[0]
Get current schema
Arguments:
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:
current schema
def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]:
    """
    List tables in duckdb

    Tables in the current schema are returned unqualified; tables in other schemas as "schema.table".

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
        schemas (tuple[str]=None): schemas to filter by (a tuple, list or a single str).
            If not given, list all tables

    Returns:
        list of tables
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    current_schema = current_schema_duckdb(conn_db)

    def get_table_name(row):
        table = quote_name_duckdb(row['name'])
        # Compare the raw schema name: a quoted schema with spaces never equals current_schema
        if row['schema'] == current_schema:
            return table
        return f"{quote_name_duckdb(row['schema'])}.{table}"

    sql_tables = conn_db.sql("SHOW ALL TABLES")
    if schemas:
        if isinstance(schemas, str):
            schemas = (schemas,)
        # Build the IN list explicitly: repr() of a list or 1-tuple is not valid SQL
        sql_schemas = ", ".join("'" + str(schema).replace("'", "''") + "'" for schema in schemas)
        sql_tables = sql_tables.filter(f"schema IN ({sql_schemas})")

    tables = []
    # DataFrame.apply on an empty frame does not return a Series, so skip it explicitly
    if sql_tables.count('name').fetchone()[0] > 0:
        tables = sql_tables.df().apply(get_table_name, axis=1).tolist()

    return tables
List tables in duckdb
Arguments:
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
- schemas (tuple[str]=None): tuple schemas. If not informed, list all tables
Returns:
list of tables
def quote_name_duckdb(object_sql_name: str) -> str:
    """
    Quote the parts of a (possibly dot-qualified) name that contain spaces, for use in duckdb

    Existing double quotes are removed first. Any number of parts is supported
    (e.g. "table", "schema.table", "catalog.schema.table").

    Args:
        object_sql_name (str): name to quote (table, view, schema, index, ...)

    Returns:
        quoted name (str)
    """
    object_sql_name = object_sql_name.strip().replace('"', '')

    # Quote each dot-separated part independently; previously more than one dot raised ValueError
    return '.'.join(
        f'"{part}"' if ' ' in part else part
        for part in object_sql_name.split('.')
    )
Quote a name for use in DuckDB if it contains spaces
Arguments:
- object_sql_name (str): name to quote (table, view, schema, index, ...)
Returns:
quoted name (str)
def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool:
    """
    Tell whether table_name (after quote_name_duckdb) is among the tables listed on the connection.

    Args:
        table_name (str): table name
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb; the default
            connection from get_duckdb_connection is used when not given

    Returns:
        bool: True if table exists
    """
    connection = conn_db if conn_db else get_duckdb_connection()
    wanted = quote_name_duckdb(table_name)
    known_tables = list_tables_duckdb(connection)
    return wanted in known_tables
Check if table exists in duckdb
Arguments:
- table_name (str): table name
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:
bool: True if table exists
def parse_path(path):
    """
    Normalize a filesystem path and express it with forward slashes, as duckdb expects.

    Args:
        path (str): path to use on duckdb

    Returns:
        normalized path (str)
    """
    normalized = os.path.normpath(path)
    return '/'.join(normalized.split('\\'))
Parse path to duckdb format
Arguments:
- path (str): path to use on duckdb
Returns:
normalized path (str)
def escape_name_table_view(name: str):
    """
    Make a table/view name duckdb-friendly: '-' becomes '_', parentheses are dropped,
    and the result is lowercased.

    Args:
        name (str): name to use on duckdb

    Returns:
        normalized name (str)
    """
    translation = str.maketrans({'-': '_', '(': None, ')': None})
    return name.translate(translation).lower()
Escape characters in a table/view name and lowercase it for use in DuckDB
Arguments:
- name (str): name to use on duckdb
Returns:
normalized name (str)
def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation):
    """
    Check (case-insensitively) that columns exist in a sql relation

    Args:
        cols_to_check (Iterable[str]): list of columns to check
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        list[str]: the columns passed in cols_to_check

    Raises:
        ValueError: if some column is not found; the message lists only the missing ones
    """
    cols_sql = [col.lower() for col in sql.columns]
    cols_sql_set = set(cols_sql)
    cols_missing = [col for col in cols_to_check if col.lower() not in cols_sql_set]

    if cols_missing:
        raise ValueError(f"There are columns {cols_missing} not found on sql columns {cols_sql}")

    return cols_to_check
Check columns in sql relation
Arguments:
- cols_to_check (Iterable[str]): list of columns to check
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
list[str]: list of columns to check
def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Convert the given WKT text columns to GEOMETRY with ST_GeomFromText.

    Column names are matched case-insensitively; the relation's own spelling is kept.

    Args:
        cols_wkt (list[str]): list of columns WKT to set as geometry
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(cols_wkt, sql)

    wanted = {name.lower() for name in cols_wkt}
    geom_exprs = {}
    for column in sql.columns:
        if column.lower() in wanted:
            geom_exprs[column] = f'ST_GeomFromText("{column}")'

    return replace_cols_on_sql(geom_exprs, sql)
Set columns to GEOMETRY from WKT text
Arguments:
- cols_wkt (list[str]): list of columns WKT to set as geometry
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Select every column, swapping each listed column for the given SQL expression.

    Args:
        replace_cols (dict[str, str]): mapping column -> SQL expression to use instead
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(replace_cols.keys(), sql)

    replacements = [f'{expr} AS "{name}"' for name, expr in replace_cols.items()]
    return sql.select("* REPLACE (" + ", ".join(replacements) + ")")
Replace columns by the passed sql function
Arguments:
- replace_cols (dict[str]): dict of columns with the sql function to use instead
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Select all columns in their current order, renaming those listed in alias_cols.

    Column names are matched case-insensitively.

    Args:
        alias_cols (dict[str, str]): mapping column -> alias
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(alias_cols.keys(), sql)

    aliases_by_lower = {}
    for name, alias in alias_cols.items():
        aliases_by_lower[name.lower()] = alias

    select_items = []
    for column in sql.columns:
        alias = aliases_by_lower.get(column.lower())
        select_items.append(f'"{column}" AS "{alias}"' if alias else f'"{column}"')

    return sql.select(", ".join(select_items))
Rename columns in sql select
Arguments:
- alias_cols (dict[str]): dict of renamed columns with alias
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Select every column except the ones listed.

    Args:
        excluded_cols (list[str]): list of columns to exclude
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(excluded_cols, sql)

    quoted_names = [f'"{name}"' for name in excluded_cols]
    return sql.select('* EXCLUDE (' + ", ".join(quoted_names) + ')')
Exclude columns in sql select
Arguments:
- excluded_cols (list[str]): list of columns to exclude
- sql (duckdb.DuckDBPyRelation): sql relation
Returns:
duckdb.DuckDBPyRelation: Duckdb database relation
def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None,
                      cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                      table_or_view_name: str = None, col_id: str = None, as_view: bool = False,
                      overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation:
    """
    Apply a new configuration to a Duckdb SQL Relation

    Steps are applied in order: exclude, rename, replace, WKT -> geometry, then (optionally)
    creation of a table or view.

    Args:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
        cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        table_or_view_name (str=None): table or view name. If given, create table or view
        col_id (str=None): primary key column (original name or its alias); the table is sorted by it
            and a unique index is created on it when creating a table
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view.
            It should be the connection a_sql belongs to

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if cols_exclude:
        a_sql = exclude_cols_on_sql(cols_exclude, a_sql)

    if cols_alias:
        a_sql = rename_cols_on_sql(cols_alias, a_sql)

    if cols_replace:
        a_sql = replace_cols_on_sql(cols_replace, a_sql)

    if cols_wkt:
        a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql)

    if table_or_view_name:
        if not conn_db:
            # NOTE(review): this opens a fresh in-memory connection, while a_sql.create/create_view
            # act on a_sql's own connection; the queries below only find the new object if both
            # are the same connection. The import_* helpers always pass conn_db.
            conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

        table_or_view_name = quote_name_duckdb(table_or_view_name)
        if as_view:
            a_sql.create_view(table_or_view_name, replace=overwrite)
        else:
            if overwrite:
                conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}")

            if col_id:
                # rename_cols_on_sql matches aliases case-insensitively; resolve col_id the same way
                aliases_lower = {col.lower(): alias for col, alias in (cols_alias or {}).items()}
                col_id = aliases_lower.get(col_id.lower(), col_id)

            if col_id:
                a_sql = a_sql.order(f'"{col_id}"')

            a_sql.create(table_or_view_name)

            if col_id:
                # Index names cannot be schema-qualified: derive it from the bare table name
                base_table_name = table_or_view_name.replace('"', '').split('.')[-1]
                n_idx = quote_name_duckdb(f"{base_table_name}_{col_id}_idx")
                conn_db.execute(
                    f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")'
                )

        a_sql = conn_db.sql(f"FROM {table_or_view_name}")

    return a_sql
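Apply a new configuration (excluded, aliased, replaced and WKT geometry columns) to a Duckdb SQL relation, optionally persisting it as a table or view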
Arguments:
- a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
- cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- table_or_view_name (str=None): table or view name. If informed, create table or view
- col_id (str=None): column primary key and index unique if create table
- as_view (bool=False): create sql as view instead of table
- overwrite (bool=False): overwrite table_name if exists
- conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None,
                         cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                         cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False,
                         conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                         zipped: bool = False) -> duckdb.DuckDBPyRelation:
    """
    Import a csv file as a table on duckdb

    The file is read with duckdb `read_csv` (auto-detection enabled) and the resulting relation is then
    configured and optionally persisted through `config_sql_duckdb`.

    Args:
        csv_path (str): path to csv file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        header (bool=True): csv file has header
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create a view instead of a table
        overwrite (bool=False): overwrite table_name if exists
        zipped (bool=False): if True, the csv file is read as gzip compressed

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    # Double single quotes so a path like "O'Brien/data.csv" cannot terminate the SQL literal early
    sql_path = parse_path(csv_path).replace("'", "''")
    compression = f", compression='{GZIP}'" if zipped else ''

    a_sql = conn_db.sql(
        f"""
        FROM read_csv('{sql_path}', header={header}, auto_detect=True{compression})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
Import a csv file as a table on duckdb
Arguments:
- csv_path (str): path to csv file
- table_or_view_name (str=None): table or view name. If informed, create table or view
- header (bool=True): csv file has header
- col_id (str=None): column primary key and index unique if create table
- cols_wkt (list[str] = None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
- as_view (bool=False): create a view instead of a table
- overwrite (bool=False): overwrite table_name if exists
- zipped (bool=False): if True, the csv file is read as gzip compressed
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
def import_gdal_file_to_duckdb(
        path_file: str,
        table_or_view_name: str = None,
        gdal_open_options: list[str] = None,
        col_id: str = None,
        cols_exclude: list[str] = None,
        cols_alias: dict[str, str] = None,
        cols_replace: dict[str, str] = None,
        conn_db: duckdb.DuckDBPyConnection = None,
        as_view: bool = False,
        overwrite: bool = False,
        **st_read_kwargs) -> duckdb.DuckDBPyRelation:
    """
    Load a file readable by a GDAL driver into a duckdb database (through the spatial ST_Read function)

    Args:
        path_file (str): path to GDAL file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        gdal_open_options (list[str]=None): list of GDAL open options.
            See GDAL documentation (https://gdal.org/drivers/vector/index.html)
            (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
        col_id (str=None): column primary key and index unique if create table
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **st_read_kwargs (str): ST_Read function kwargs, inserted verbatim as `key=value` (no quoting
            is applied to the values). See ST_Read documentation
            (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    # Double single quotes so they cannot terminate the SQL string literal early
    sql_path = parse_path(path_file).replace("'", "''")
    params_st_read = f"'{sql_path}'"
    if gdal_open_options:
        # Build the SQL list literal explicitly: Python's list repr switches to double quotes (which are
        # SQL identifiers) for options containing a single quote, and renders tuples as (...)
        sql_options = ', '.join("'" + str(opt).replace("'", "''") + "'" for opt in gdal_open_options)
        params_st_read = f"{params_st_read}, open_options=[{sql_options}]"
    if st_read_kwargs:
        params_st_read = f"{params_st_read}, {', '.join(f'{k}={v}' for k, v in st_read_kwargs.items())}"

    query = f"""
        FROM ST_Read({params_st_read})
        """

    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

    a_sql = conn_db.sql(query)

    a_sql = config_sql_duckdb(a_sql, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace,
                              table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view,
                              overwrite=overwrite, conn_db=conn_db)

    return a_sql
Load a file readable by a GDAL driver into a duckdb database (through the spatial ST_Read function)
Arguments:
- path_file (str): path to GDAL file
- table_or_view_name (str=None): table or view name. If informed, create table or view
- gdal_open_options (list[str]=None): list of GDAL open options. See GDAL documentation (https://gdal.org/drivers/vector/index.html) (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
- col_id (str=None): column primary key and index unique if create table
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
- as_view (bool=False): create sql as view instead of table
- overwrite (bool=False): overwrite table_name if exists
- **st_read_kwargs (str): ST_Read function kwargs. See ST_Read documentation (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None,
                               cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                               cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                               as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None,
                               overwrite=False) -> duckdb.DuckDBPyRelation:
    """
    Import a DataFrame/GeoDataFrame as a table on duckdb

    Geometry columns (as reported by `df_geometry_columns`) are transferred to duckdb as WKB (with SRID)
    and rebuilt there with `ST_GeomFromWKB`. The input DataFrame is not modified.

    Args:
        df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create a view instead of a table
        overwrite (bool=False): overwrite table_name if exists

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    cols_geom = df_geometry_columns(df)
    if cols_geom:
        # Convert on a copy: assigning WKB to the caller's frame would silently replace their geometries
        df = df.copy()
        for col in cols_geom:
            df[col] = df[col].to_wkb(include_srid=True)

        alias_cols_geom = ', '.join(f'ST_GeomFromWKB("{col}") AS "{col}"' for col in cols_geom)
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"
    else:
        select_expr = "SELECT *"

    # `FROM df` is resolved by duckdb against the local Python variable named `df`
    a_sql = conn_db.sql(f"""
        {select_expr} FROM df
        """)

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
Import a DataFrame/GeoDataFrame as a table on duckdb
Arguments:
- df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
- table_or_view_name (str=None): table or view name. If informed, create table or view
- col_id (str=None): column primary key and index unique if create table
- cols_wkt (list[str] = None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
- as_view (bool=False): create a view instead of a table
- overwrite (bool=False): overwrite table_name if exists
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None,
                             cols_geom: list[str] = None,
                             cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                             cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                             as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                             **read_parquet_params) -> duckdb.DuckDBPyRelation:
    """
    Import Parquet/Geoparquet file as table on duckdb

    Args:
        parquet_path (str): path to parquet/geoparquet file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_geom (list[str]=None): list of columns to cast to GEOMETRY type
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create a view instead of a table
        overwrite (bool=False): overwrite table_name if exists
        **read_parquet_params (Any): read_parquet function kwargs, inserted verbatim as `key=value`.
            See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    select_expr = 'SELECT *'
    if cols_geom:
        # Quote identifiers so column names with spaces or special characters are accepted
        alias_cols_geom = ', '.join(f'"{col}"::GEOMETRY AS "{col}"' for col in cols_geom)
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"

    # Each extra parameter becomes ", key=value" after the path argument
    sql_read_parquet_params = ''.join(f', {k}={v}' for k, v in read_parquet_params.items())

    # Double single quotes so they cannot terminate the SQL string literal early
    sql_path = parse_path(parquet_path).replace("'", "''")

    a_sql = conn_db.sql(
        f"""
        {select_expr}
        FROM read_parquet('{sql_path}'{sql_read_parquet_params})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
Import a Parquet/GeoParquet file as a table on duckdb
Arguments:
- parquet_path (str): path to parquet/geoparquet file
- table_or_view_name (str=None): table or view name. If informed, create table or view
- col_id (str=None): column primary key and index unique if create table
- cols_geom (list[str]=None): list of columns type geometry
- cols_wkt (list[str] = None): list of columns WKT to use as geometry
- cols_exclude (list[str]=None): list of columns to exclude
- cols_alias (dict[str, str]=None): dictionary of columns aliases
- cols_replace (dict[str, str]=None): dictionary of columns to replace
- conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
- as_view (bool=False): create a view instead of a table
- overwrite (bool=False): overwrite table_name if exists
- **read_parquet_params (Any): read_parquet function kwargs. See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters
Returns:
a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table:
    """
    Filter an ibis table with a SQL WHERE condition or an ibis boolean expression

    A string filter is run as SQL through the table's duckdb backend. Geospatial columns are explicitly
    cast to geometry in the select list. Any other filter is passed to `Table.filter`.

    Args:
        table (ibis.Table): The table to filter
        sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table

    Returns:
        table (ibis.Table): The table filtered
    """
    if not isinstance(sql_or_ibis_filter, str):
        # Native ibis expression: no SQL needed
        return table.filter(sql_or_ibis_filter)

    # SQL filter: geospatial fields are cast back to geometry so they keep their type in the result
    geo_names = [name for name, dtype in table.schema().fields.items() if isinstance(dtype, GeoSpatial)]
    select_part = "*"
    if geo_names:
        casts = ', '.join([f"CAST({name} AS geometry) AS {name}" for name in geo_names])
        excluded = ', '.join(geo_names)
        select_part = f"* EXCLUDE({excluded}), {casts}"

    table_name = table.get_name()
    query = f"SELECT {select_part} FROM {table_name} WHERE {sql_or_ibis_filter}"
    get_base_logger().debug(f"filter_ibis_table {table_name}: {query}")
    return table.sql(query)
Filter an ibis table with either a SQL WHERE condition (string) or an ibis boolean expression
Arguments:
- table (ibis.Table): The table to filter
- sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table
Returns:
table (ibis.Table): The table filtered