apb_duckdb_utils

Package apb_duckdb_utils

Modules that add spatial and generic functionality on top of DuckDB.

Using all of the available functionality requires the GDAL library (version 3.6 to 3.10) and the Oracle Instant Client to be installed.

To install:

pip install apb_duckdb_utils

Documentation: apb_duckdb_utils

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 
  5#  Copyright (c)
  6"""
  7.. include:: ../README.md
  8"""
  9from __future__ import annotations
 10
 11import os
 12import warnings
 13from typing import Iterable
 14
 15import duckdb
 16import ibis
 17from geopandas import GeoDataFrame
 18from ibis.expr.datatypes import GeoSpatial
 19from pandas import DataFrame
 20
 21from apb_extra_utils.misc import create_dir
 22from apb_extra_utils.utils_logging import get_base_logger
 23from apb_pandas_utils.geopandas_utils import df_geometry_columns
 24
 25# Suppress specific warning
 26warnings.filterwarnings("ignore", message="Geometry column does not contain geometry")
 27
 28MEMORY_DDBB = ':memory:'
 29CACHE_DUCK_DDBBS = {}
 30CURRENT_DB_PATH = None
 31GZIP = 'gzip'
 32SECRET_S3 = 'secret_s3'
 33
 34
 35def set_current_db_path(db_path: str):
 36    """
 37    Set used db path
 38
 39    Args:
 40        db_path (str): path to duckdb database file
 41    """
 42    global CURRENT_DB_PATH
 43    CURRENT_DB_PATH = parse_path(db_path)
 44
 45
 46def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False,
 47                          extensions: list[str] = None,
 48                          **connect_args) -> duckdb.DuckDBPyConnection:
 49    """
 50    Get duckdb connection
 51
 52    Args:
 53        db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
 54        as_current (bool=False): set db_path as current db path
 55        no_cached (bool=False): not use cached connection
 56        extensions (list[str]=None): list of extensions to load
 57        **connect_args (dict): duckdb.connect args 
 58
 59    Returns:
 60         duckdb connection
 61    """
 62    if not db_path:
 63        if CURRENT_DB_PATH and not no_cached:
 64            db_path = CURRENT_DB_PATH
 65        else:
 66            db_path = MEMORY_DDBB
 67
 68    parsed_path = parse_path(db_path)
 69    k_path = parsed_path.lower()
 70    if no_cached or not (conn_db := CACHE_DUCK_DDBBS.get(k_path)):
 71        conn_db = CACHE_DUCK_DDBBS[k_path] = duckdb.connect(parsed_path, **connect_args)
 72
 73    if extensions:
 74        for ext in extensions:
 75            conn_db.install_extension(ext)
 76            conn_db.load_extension(ext)
 77
 78    if as_current:
 79        set_current_db_path(parsed_path)
 80
 81    return conn_db
 82
 83
 84def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True):
 85    """
 86    Save duckdb database to dir path as parquets or csvs files
 87
 88    Args:
 89        dir_db (str=None): Path to save database
 90        duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection.
 91            If None, get connection default with get_duckdb_connection
 92        parquet (bool=True): Save as parquet file
 93    """
 94    create_dir(dir_db)
 95
 96    if not duck_db_conn:
 97        duck_db_conn = get_duckdb_connection()
 98
 99    if parquet:
100        format_db = "(FORMAT PARQUET)"
101    else:
102        format_db = "(FORMAT CSV, COMPRESSION 'GZIP')"
103
104    duck_db_conn.sql(f"EXPORT DATABASE '{parse_path(dir_db)}' {format_db}")
105
106
def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str:
    """
    Return the name of the schema currently in use

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb. If None, the default
            connection from get_duckdb_connection is used

    Returns:
        str: result of duckdb's current_schema()
    """
    connection = conn_db or get_duckdb_connection()
    first_row = connection.sql("SELECT current_schema()").fetchone()
    return first_row[0]
121
122
def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]:
    """
    List tables in duckdb

    Tables of the current schema are returned by (quoted) name only; tables of any other
    schema are returned as "schema.table" (each part quoted with quote_name_duckdb).

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
        schemas (tuple[str]=None): schemas to filter by (any iterable of str, or a single str).
            If not informed, list all tables

    Returns:
        list of tables
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    current_schema = current_schema_duckdb(conn_db)

    sql_tables = conn_db.sql("SHOW ALL TABLES")
    if schemas:
        if isinstance(schemas, str):
            schemas = (schemas,)
        # Build the IN list explicitly instead of relying on the Python repr of the container,
        # which is not valid SQL for every input (list brackets, double-quoted strings with "'")
        in_values = ", ".join("'{}'".format(str(schema).replace("'", "''")) for schema in schemas)
        sql_tables = sql_tables.filter(f'"schema" IN ({in_values})')

    tables = []
    for schema, name in sql_tables.select('"schema", "name"').fetchall():
        table = quote_name_duckdb(name)
        # Compare the raw schema name: the quoted form never equals current_schema when it has spaces
        tables.append(table if schema == current_schema else f"{quote_name_duckdb(schema)}.{table}")

    return tables
162
163
def quote_name_duckdb(object_sql_name: str) -> str:
    """
    Quote name to use on duckdb if it has spaces

    The name may be qualified with dots ("schema.table" or "catalog.schema.table");
    each part containing spaces is double-quoted on its own. Existing double quotes
    are removed first, so already-quoted names are normalized.

    Args:
        object_sql_name (str): name to quote (table, view, schema, index, ...)

    Returns:
        quoted name (str)
    """
    parts = object_sql_name.strip().replace('"', '').split('.')
    return '.'.join(f'"{part}"' if ' ' in part else part for part in parts)
188
189
def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool:
    """
    Tell whether a table is present in duckdb

    The name is normalized with quote_name_duckdb and looked up in list_tables_duckdb.
    NOTE: tables of the current schema are listed without schema prefix, so pass them unqualified.

    Args:
        table_name (str): table name
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb

    Returns:
        bool: True if table exists
    """
    connection = conn_db if conn_db else get_duckdb_connection()
    wanted = quote_name_duckdb(table_name)
    return wanted in list_tables_duckdb(connection)
205
206
def parse_path(path):
    """
    Parse path to duckdb format

    Remote URLs (any "scheme://" prefix such as s3://, https://, gs://) are returned untouched,
    because os.path.normpath would collapse their double slash ("https://h" -> "https:/h").
    Local paths are normalized and use forward slashes.

    Args:
        path (str): path to use on duckdb

    Returns:
        normalized path (str)
    """
    scheme, sep, _ = path.partition('://')
    # A one-letter "scheme" is a Windows drive letter, which must still be normalized
    if sep and len(scheme) > 1 and scheme.isalnum():
        return path

    return os.path.normpath(path).replace('\\', '/')
222
223
def escape_name_table_view(name: str):
    """
    Make a name usable as duckdb table/view name

    Hyphens become underscores, parentheses are removed and the result is lowercased.

    Args:
        name (str): name to use on duckdb

    Returns:
        normalized name (str)
    """
    # One translation pass: '-' -> '_', and '(' / ')' deleted
    table = str.maketrans('-', '_', '()')
    return name.translate(table).lower()
237
238
def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation):
    """
    Check columns exist (case-insensitively) in sql relation

    Args:
        cols_to_check (Iterable[str]): columns to check. Any iterable is accepted (list, dict keys,
            generator); it is consumed only once
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        list[str]: list of columns checked

    Raises:
        ValueError: if some of the columns is not found on sql columns (the missing ones are reported)
    """
    # Materialize once: a generator would otherwise be exhausted by the check, leaving
    # a useless return value and error message
    cols = list(cols_to_check)
    cols_sql = [col.lower() for col in sql.columns]
    cols_sql_set = set(cols_sql)

    missing = [col for col in cols if col.lower() not in cols_sql_set]
    if missing:
        raise ValueError(f"There are columns {missing} not found on sql columns {cols_sql}")

    return cols
257
258
def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Convert WKT text columns to GEOMETRY with ST_GeomFromText

    Column names are matched case-insensitively; the relation's own spelling is kept.

    Args:
        cols_wkt (list[str]): WKT columns to convert to geometry
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(cols_wkt, sql)

    wanted = {name.lower() for name in cols_wkt}
    sql_wkt_cols = {}
    for col in sql.columns:
        if col.lower() in wanted:
            sql_wkt_cols[col] = f'ST_GeomFromText("{col}")'

    return replace_cols_on_sql(sql_wkt_cols, sql)
278
279
def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Replace columns by SQL expressions using SELECT * REPLACE

    Args:
        replace_cols (dict[str, str]): column name -> SQL expression used instead of it
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(replace_cols.keys(), sql)

    replacements = [f'{expr} AS "{name}"' for name, expr in replace_cols.items()]
    select_expr = "* REPLACE (" + ", ".join(replacements) + ")"
    return sql.select(select_expr)
298
299
def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Select every column of the relation, renaming the ones given in alias_cols

    Column names are matched case-insensitively; column order is preserved.

    Args:
        alias_cols (dict[str, str]): column name -> alias
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(alias_cols.keys(), sql)
    alias_by_lower = {name.lower(): alias for name, alias in alias_cols.items()}

    select_items = []
    for col in sql.columns:
        alias = alias_by_lower.get(col.lower())
        select_items.append(f'"{col}" AS "{alias}"' if alias else f'"{col}"')

    return sql.select(", ".join(select_items))
320
321
def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Drop columns from the relation using SELECT * EXCLUDE

    Args:
        excluded_cols (list[str]): columns to leave out
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(excluded_cols, sql)

    quoted = ", ".join(map('"{}"'.format, excluded_cols))
    return sql.select("* EXCLUDE (" + quoted + ")")
339
340
def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None,
                      cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                      table_or_view_name: str = None, col_id: str = None, as_view: bool = False,
                      overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation:
    """
    Set new config to a Duckdb SQL Relation

    Transformations are applied in this order: exclude, rename (alias), replace, WKT to geometry.
    So cols_replace and cols_wkt must use the column names resulting from cols_alias.

    Args:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
        cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table. If it is renamed
            in cols_alias, its original name can be given
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view.
            NOTE(review): the table/view is created through a_sql itself, but indexed and read back
            through conn_db, so this should presumably be the connection a_sql was built on. If None,
            a new non-cached connection is opened — confirm it can see the created object.

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation (read from the created table or
            view when table_or_view_name is informed)
    """
    if cols_exclude:
        a_sql = exclude_cols_on_sql(cols_exclude, a_sql)

    if cols_alias:
        a_sql = rename_cols_on_sql(cols_alias, a_sql)

    if cols_replace:
        a_sql = replace_cols_on_sql(cols_replace, a_sql)

    if cols_wkt:
        a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql)

    if table_or_view_name:
        if not conn_db:
            conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

        table_or_view_name = quote_name_duckdb(table_or_view_name)
        if as_view:
            a_sql.create_view(table_or_view_name, replace=overwrite)
        else:
            if overwrite:
                conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}")

            # col_id may have been renamed through cols_alias: use the resulting column name
            if col_id := (cols_alias or {}).get(col_id, col_id):
                a_sql = a_sql.order(f'"{col_id}"')

            a_sql.create(table_or_view_name)

            if col_id:
                # The index name must not be schema-qualified (it lives in the table's schema),
                # so build it from the bare table name
                bare_table = table_or_view_name.replace('"', '').split('.')[-1]
                n_idx = quote_name_duckdb(f"{bare_table}_{col_id}_idx")
                conn_db.execute(
                    f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")'
                )

        a_sql = conn_db.sql(f"FROM {table_or_view_name}")

    return a_sql
400
401
def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None,
                         cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                         cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False,
                         conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.DuckDBPyRelation:
    """
    Import csv file as table on duckdb

    Args:
        csv_path (str): path to csv file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        header (bool=True): csv file has header
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        zipped (bool=False): the csv file is gzip compressed

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    # The codec must be a SQL string literal; unquoted (compression=gzip) it is parsed as an identifier
    compression = f", compression='{GZIP}'" if zipped else ''
    # Double single quotes so a path containing "'" does not break the SQL string literal
    sql_csv_path = parse_path(csv_path).replace("'", "''")

    a_sql = conn_db.sql(
        f"""
        FROM read_csv('{sql_csv_path}', header={header}, auto_detect=True{compression})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
442
443
def import_gdal_file_to_duckdb(
        path_file: str,
        table_or_view_name: str = None,
        gdal_open_options: list[str] = None,
        col_id: str = None,
        cols_exclude: list[str] = None,
        cols_alias: dict[str, str] = None,
        cols_replace: dict[str, str] = None,
        conn_db: duckdb.DuckDBPyConnection = None,
        as_view: bool = False,
        overwrite: bool = False,
        **st_read_kwargs) -> duckdb.DuckDBPyRelation:
    """
    Read a GDAL-supported file with the spatial ST_Read function and optionally store it on duckdb

    Args:
        path_file (str): path to GDAL file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        gdal_open_options (list[str]=None): GDAL open options, inserted into the SQL using the Python
                    list repr. See GDAL documentation (https://gdal.org/drivers/vector/index.html)
                    (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
        col_id (str=None): column primary key and index unique if create table
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **st_read_kwargs (str): ST_Read named parameters, inserted verbatim as key=value into the SQL
            (so string values must carry their own SQL quotes). See ST_Read documentation
            (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    st_read_args = [f"'{parse_path(path_file)}'"]
    if gdal_open_options:
        st_read_args.append(f"open_options={gdal_open_options}")
    st_read_args.extend(f'{key}={value}' for key, value in st_read_kwargs.items())
    params_sql = ', '.join(st_read_args)

    query = f"""
        FROM ST_Read({params_sql})
        """

    connection = conn_db if conn_db else get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

    relation = connection.sql(query)

    return config_sql_duckdb(relation, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace,
                             table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view,
                             overwrite=overwrite, conn_db=connection)
498
499
def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None,
                               cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                               cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                               as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None,
                               overwrite=False) -> duckdb.DuckDBPyRelation:
    """
    Import DataFrame/GeoDataframe as table on duckdb

    Geometry columns are sent to duckdb as WKB (with SRID) and rebuilt with ST_GeomFromWKB.
    The given DataFrame is not modified: the conversion is done on a copy.

    Args:
        df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    cols_geom = df_geometry_columns(df)
    if cols_geom:
        # Work on a copy so the caller's (Geo)DataFrame keeps its geometries
        df = df.copy()
        for col in cols_geom:
            df[col] = df[col].to_wkb(include_srid=True)
        alias_cols_geom = ', '.join(f'ST_GeomFromWKB("{col}") AS "{col}"' for col in cols_geom)
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"
    else:
        select_expr = "SELECT *"

    # "FROM df" refers to the local variable df (the copy when geometries were converted);
    # duckdb resolves it by name from the Python scope
    a_sql = conn_db.sql(f"""
        {select_expr} FROM df
        """)

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql
543
544
def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None,
                             cols_geom: list[str] = None,
                             cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                             cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                             as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                             **read_parquet_params) -> duckdb.DuckDBPyRelation:
    """
    Read a Parquet/Geoparquet file with read_parquet and optionally store it on duckdb

    Args:
        parquet_path (str): path to parquet/geoparquet file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_geom (list[str]=None): columns cast to GEOMETRY (moved to the end of the select list)
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **read_parquet_params (Any): read_parquet named parameters, inserted verbatim as key=value.
                See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    if cols_geom:
        casts = ', '.join(f'{col}::GEOMETRY AS {col}' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({', '.join(cols_geom)}), {casts}"
    else:
        select_expr = 'SELECT *'

    # Each extra parameter contributes ", key=value"
    extra_params = ''.join(f', {key}={value}' for key, value in read_parquet_params.items())

    relation = conn_db.sql(
        f"""
        {select_expr}
        FROM read_parquet('{parse_path(parquet_path)}'{extra_params})
        """
    )

    return config_sql_duckdb(relation, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                             cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                             as_view=as_view, overwrite=overwrite, conn_db=conn_db)
598
599
def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table:
    """
    Filter an ibis table with a SQL condition or an ibis expression

    A string is used as the WHERE clause of a SQL query over the table (duckdb backend),
    in which geospatial columns are explicitly cast to geometry. Anything else is passed
    to ibis Table.filter.

    Args:
        table (ibis.Table): The table to filter
        sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table

    Returns:
        table (ibis.Table): The table filtered
    """
    if not isinstance(sql_or_ibis_filter, str):
        return table.filter(sql_or_ibis_filter)

    geom_cols = [name for name, dtype in table.schema().fields.items() if isinstance(dtype, GeoSpatial)]
    if geom_cols:
        casts = ', '.join([f"CAST({name} AS geometry) AS {name}" for name in geom_cols])
        select_cols = f"* EXCLUDE({', '.join(geom_cols)}), {casts}"
    else:
        select_cols = "*"

    table_name = table.get_name()
    sql_str = f"SELECT {select_cols} FROM {table_name} WHERE {sql_or_ibis_filter}"
    get_base_logger().debug(f"filter_ibis_table {table_name}: {sql_str}")
    return table.sql(sql_str)
629
630
def set_secret_s3_storage(duckdb_conn: duckdb.DuckDBPyConnection, endpoint: str, access_key_id: str,
                          secret_access_key: str, url_style: str = 'path', use_ssl: bool = False, region: str = None,
                          secret_name: str = SECRET_S3) -> bool:
    """
    Set secret S3 storage on duckdb connection (CREATE OR REPLACE SECRET ... TYPE S3)

    Args:
        duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
        endpoint (str): endpoint url. (e.g. 's3.amazonaws.com')
        access_key_id (str): access key id
        secret_access_key (str): secret access key
        url_style (str='path'): url style. 'path' or 'virtual_hosted'
        use_ssl (bool=False): use ssl
        region (str=None): region name. If None, the REGION option is not sent
        secret_name (str=SECRET_S3): secret name

    Returns:
        bool: True if the secret was created, False if duckdb raised an error (which is logged)
    """
    def _sql_str(value) -> str:
        """Render value as a SQL string literal, doubling embedded single quotes"""
        return "'{}'".format(str(value).replace("'", "''"))

    options = [
        "TYPE S3",
        f"KEY_ID {_sql_str(access_key_id)}",
        f"SECRET {_sql_str(secret_access_key)}",
        f"ENDPOINT {_sql_str(endpoint)}",
        f"URL_STYLE {_sql_str(url_style)}",
        f"USE_SSL {use_ssl}",
    ]
    # Without this guard a missing region would be sent as the literal string 'None'
    if region is not None:
        options.append(f"REGION {_sql_str(region)}")

    ok = False
    try:
        duckdb_conn.execute(f"CREATE OR REPLACE SECRET {secret_name} ({', '.join(options)});")
        ok = True
    except Exception as e:
        get_base_logger().error(f"Error setting S3 access keys: {e}")

    return ok
668
669
def exists_secret(duckdb_conn: duckdb.DuckDBPyConnection, secret_name: str = SECRET_S3) -> bool:
    """
    Tell whether a secret with the given name is registered on the duckdb connection

    Errors while querying duckdb_secrets() are logged and reported as False.

    Args:
        duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
        secret_name (str=SECRET_S3): secret name

    Returns:
        bool: True if secret exists
    """
    try:
        query = f"SELECT COUNT(*) > 0 AS existe FROM duckdb_secrets() WHERE name = '{secret_name}'"
        return duckdb_conn.execute(query).fetchone()[0]
    except Exception as e:
        get_base_logger().error(f"Error checking secret existence: {e}")
        return False
# Path that makes duckdb.connect open an in-memory database
MEMORY_DDBB: str = ':memory:'
# Opened connections, keyed by the lower-cased normalized database path
CACHE_DUCK_DDBBS: dict = {}
# Default database path used by get_duckdb_connection when none is given
CURRENT_DB_PATH = None
# Compression codec name used for CSV reading
GZIP: str = 'gzip'
# Default name of the S3 secret created on duckdb connections
SECRET_S3: str = 'secret_s3'
def set_current_db_path(db_path: str):
    """
    Register the database path that connections use by default

    Args:
        db_path (str): path to duckdb database file. It is normalized with parse_path
            before being stored in CURRENT_DB_PATH
    """
    global CURRENT_DB_PATH
    normalized = parse_path(db_path)
    CURRENT_DB_PATH = normalized

Set the database path used by default

Arguments:
  • db_path (str): path to duckdb database file
def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False,
                          extensions: list[str] = None,
                          **connect_args) -> duckdb.DuckDBPyConnection:
    """
    Return a duckdb connection, reusing a cached one when possible

    Connections are cached by their normalized, lower-cased database path. With
    no_cached=True a brand-new connection is always opened, and it replaces the
    cached entry for that path.

    Args:
        db_path (str=None): path to duckdb database file. If empty, CURRENT_DB_PATH is used
            (unless no_cached is True), otherwise an in-memory database
        as_current (bool=False): register the resolved path as CURRENT_DB_PATH
        no_cached (bool=False): always open a new connection instead of reusing a cached one
        extensions (list[str]=None): extensions to install and load on the connection
        **connect_args (dict): duckdb.connect args (only used when a new connection is opened)

    Returns:
        duckdb.DuckDBPyConnection: the duckdb connection
    """
    if db_path:
        target = db_path
    elif CURRENT_DB_PATH and not no_cached:
        target = CURRENT_DB_PATH
    else:
        target = MEMORY_DDBB

    parsed_path = parse_path(target)
    cache_key = parsed_path.lower()

    conn_db = None if no_cached else CACHE_DUCK_DDBBS.get(cache_key)
    if not conn_db:
        conn_db = duckdb.connect(parsed_path, **connect_args)
        CACHE_DUCK_DDBBS[cache_key] = conn_db

    for ext in extensions or ():
        conn_db.install_extension(ext)
        conn_db.load_extension(ext)

    if as_current:
        set_current_db_path(parsed_path)

    return conn_db

Get duckdb connection

Arguments:
  • db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
  • as_current (bool=False): set db_path as current db path
  • no_cached (bool=False): not use cached connection
  • extensions (list[str]=None): list of extensions to load
  • **connect_args (dict): duckdb.connect args
Returns:

duckdb connection

def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True):
    """
    Save duckdb database to a directory as parquet or gzipped csv files

    Args:
        dir_db (str): Path of the directory to save the database into (create_dir is called on it first)
        duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection.
            If None, get connection default with get_duckdb_connection
        parquet (bool=True): Save tables as parquet files. If False, save them as CSV compressed with GZIP
    """
    create_dir(dir_db)

    if not duck_db_conn:
        duck_db_conn = get_duckdb_connection()

    format_db = "(FORMAT PARQUET)" if parquet else "(FORMAT CSV, COMPRESSION 'GZIP')"

    # Double single quotes so a directory name containing "'" does not break the SQL string literal
    sql_dir = parse_path(dir_db).replace("'", "''")
    duck_db_conn.sql(f"EXPORT DATABASE '{sql_dir}' {format_db}")

Save the DuckDB database to a directory as Parquet or CSV files

Arguments:
  • dir_db (str): Path of the directory where the database is saved
  • duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection. If None, get connection default with get_duckdb_connection
  • parquet (bool=True): Save as parquet file
def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str:
    """
    Return the name of the schema currently in use

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb. If None, the default
            connection from get_duckdb_connection is used

    Returns:
        str: result of duckdb's current_schema()
    """
    connection = conn_db or get_duckdb_connection()
    first_row = connection.sql("SELECT current_schema()").fetchone()
    return first_row[0]

Get current schema

Arguments:
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:

current schema

def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]:
    """
    List tables in duckdb

    Tables of the current schema are returned by (quoted) name only; tables of any other
    schema are returned as "schema.table" (each part quoted with quote_name_duckdb).

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
        schemas (tuple[str]=None): schemas to filter by (any iterable of str, or a single str).
            If not informed, list all tables

    Returns:
        list of tables
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    current_schema = current_schema_duckdb(conn_db)

    sql_tables = conn_db.sql("SHOW ALL TABLES")
    if schemas:
        if isinstance(schemas, str):
            schemas = (schemas,)
        # Build the IN list explicitly instead of relying on the Python repr of the container,
        # which is not valid SQL for every input (list brackets, double-quoted strings with "'")
        in_values = ", ".join("'{}'".format(str(schema).replace("'", "''")) for schema in schemas)
        sql_tables = sql_tables.filter(f'"schema" IN ({in_values})')

    tables = []
    for schema, name in sql_tables.select('"schema", "name"').fetchall():
        table = quote_name_duckdb(name)
        # Compare the raw schema name: the quoted form never equals current_schema when it has spaces
        tables.append(table if schema == current_schema else f"{quote_name_duckdb(schema)}.{table}")

    return tables

List tables in duckdb

Arguments:
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
  • schemas (tuple[str]=None): tuple schemas. If not informed, list all tables
Returns:

list of tables

def quote_name_duckdb(object_sql_name: str) -> str:
def quote_name_duckdb(object_sql_name: str) -> str:
    """
    Quote name to use on duckdb if it has spaces

    Surrounding whitespace and any existing double quotes are removed first. The name may be
    qualified with dots (e.g. "schema.table" or "catalog.schema.table"). Each dot-separated
    part is double-quoted only if it contains a space.

    Args:
        object_sql_name (str): name to quote (table, view, schema, index, ...)

    Returns:
        quoted name (str)
    """
    object_sql_name = object_sql_name.strip().replace('"', '')

    # Splitting on every dot (instead of unpacking exactly two parts) also supports
    # catalog-qualified names, which previously raised ValueError
    return '.'.join(
        f'"{part}"' if ' ' in part else part
        for part in object_sql_name.split('.')
    )

Quote a name for use on duckdb if it has spaces

Arguments:
  • object_sql_name (str): name to quote (table, view, schema, index, ...)
Returns:

quoted name (str)

def exists_table_duckdb(table_name: str, conn_db: _duckdb.DuckDBPyConnection = None) -> bool:
def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool:
    """
    Tell whether a table is present in duckdb

    The name is normalized with quote_name_duckdb() and looked up among the names
    returned by list_tables_duckdb() (all schemas).

    Args:
        table_name (str): table name, optionally qualified with its schema
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb. If not informed,
            get_duckdb_connection() is used

    Returns:
        bool: True if table exists
    """
    connection = conn_db or get_duckdb_connection()
    wanted_name = quote_name_duckdb(table_name)
    return wanted_name in list_tables_duckdb(connection)

Check if table exists in duckdb

Arguments:
  • table_name (str): table name
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:

bool: True if table exists

def parse_path(path):
def parse_path(path):
    """
    Parse path to duckdb format

    Remote URLs with a scheme (e.g. 's3://', 'https://', 'gs://') are returned untouched,
    because os.path.normpath would collapse the '//' after the scheme. Local paths are
    normalized with os.path.normpath and use '/' as separator.

    Args:
        path (str | os.PathLike): path to use on duckdb

    Returns:
        normalized path (str)
    """
    import re

    path = os.fspath(path)
    # Scheme of at least two characters, so Windows drive letters ("C:...") are never taken as URLs
    if re.match(r'^[A-Za-z][A-Za-z0-9+.\-]+://', path):
        return path

    return os.path.normpath(path).replace('\\', '/')

Parse path to duckdb format

Arguments:
  • path (str): path to use on duckdb
Returns:

normalized path (str)

def escape_name_table_view(name: str):
def escape_name_table_view(name: str):
    """
    Normalize a table/view name for duckdb: '-' becomes '_', parentheses are dropped
    and the result is lowercased

    Args:
        name (str): name to use on duckdb

    Returns:
        normalized name (str)
    """
    translation = str.maketrans({'-': '_', '(': None, ')': None})
    return name.translate(translation).lower()

Escape characters in a table/view name and lowercase it for use on duckdb

Arguments:
  • name (str): name to use on duckdb
Returns:

normalized name (str)

def check_columns_in_sql(cols_to_check: Iterable[str], sql: _duckdb.DuckDBPyRelation):
def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation):
    """
    Check columns in sql relation

    The comparison is case-insensitive.

    Args:
        cols_to_check (Iterable[str]): columns that must exist in the relation
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        Iterable[str]: the same cols_to_check object passed in

    Raises:
        ValueError: if some of cols_to_check are not columns of the relation (the message lists only the missing ones)
    """
    cols_sql = [col.lower() for col in sql.columns]
    cols_sql_set = set(cols_sql)

    missing_cols = [col for col in cols_to_check if col.lower() not in cols_sql_set]
    if missing_cols:
        raise ValueError(f"There are columns {missing_cols} not found on sql columns {cols_sql}")

    return cols_to_check

Check columns in sql relation

Arguments:
  • cols_to_check (Iterable[str]): list of columns to check
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

list[str]: list of columns to check

def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: _duckdb.DuckDBPyRelation):
def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Convert WKT text columns to GEOMETRY with ST_GeomFromText

    Column names are matched case-insensitively; the relation's own spelling is used in the SQL.

    Args:
        cols_wkt (list[str]): columns holding WKT text
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: relation with those columns replaced by geometries

    Raises:
        ValueError: (from check_columns_in_sql) if some column is not in the relation
    """
    check_columns_in_sql(cols_wkt, sql)

    wanted = {name.lower() for name in cols_wkt}
    geom_exprs = {}
    for column in sql.columns:
        if column.lower() in wanted:
            geom_exprs[column] = f'ST_GeomFromText("{column}")'

    return replace_cols_on_sql(geom_exprs, sql)

Set columns to GEOMETRY from WKT text

Arguments:
  • cols_wkt (list[str]): list of columns WKT to set as geometry
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def replace_cols_on_sql(replace_cols: dict[str, str], sql: _duckdb.DuckDBPyRelation):
def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Replace columns by the passed sql function

    Args:
        replace_cols (dict[str, str]): column name -> sql expression to use instead of it
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation. The same relation if replace_cols is empty

    Raises:
        ValueError: (from check_columns_in_sql) if some column is not in the relation
    """
    # "* REPLACE ()" is not valid SQL: nothing to replace means nothing to do
    if not replace_cols:
        return sql

    check_columns_in_sql(replace_cols.keys(), sql)

    sql_replace = ", ".join(
        f'{sql_func} AS "{col}"'
        for col, sql_func in replace_cols.items()
    )
    return sql.select(f"* REPLACE ({sql_replace})")

Replace columns by the passed sql function

Arguments:
  • replace_cols (dict[str]): dict of columns with the sql function to use instead
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def rename_cols_on_sql(alias_cols: dict[str, str], sql: _duckdb.DuckDBPyRelation):
def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Rename columns in sql select

    Keys of alias_cols are matched case-insensitively against the relation columns. Columns
    without an alias (or with an empty one) keep their name, and column order is preserved.

    Args:
        alias_cols (dict[str, str]): column name -> alias
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation

    Raises:
        ValueError: (from check_columns_in_sql) if some column is not in the relation
    """
    check_columns_in_sql(alias_cols.keys(), sql)

    alias_by_lower = {}
    for original_name, alias in alias_cols.items():
        alias_by_lower[original_name.lower()] = alias

    select_items = []
    for column in sql.columns:
        new_name = alias_by_lower.get(column.lower())
        if new_name:
            select_items.append(f'"{column}" AS "{new_name}"')
        else:
            select_items.append(f'"{column}"')

    return sql.select(", ".join(select_items))

Rename columns in sql select

Arguments:
  • alias_cols (dict[str]): dict of renamed columns with alias
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def exclude_cols_on_sql(excluded_cols: list[str], sql: _duckdb.DuckDBPyRelation):
def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Exclude columns in sql select

    Args:
        excluded_cols (list[str]): list of columns to exclude
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation. The same relation if excluded_cols is empty

    Raises:
        ValueError: (from check_columns_in_sql) if some column is not in the relation
    """
    # Materialize first: a one-shot iterable would otherwise be consumed by the check below
    excluded_cols = list(excluded_cols or [])

    # "* EXCLUDE ()" is not valid SQL: nothing to exclude means nothing to do
    if not excluded_cols:
        return sql

    check_columns_in_sql(excluded_cols, sql)

    str_cols = ", ".join(f'"{col}"' for col in excluded_cols)
    return sql.select(f"* EXCLUDE ({str_cols})")

Exclude columns in sql select

Arguments:
  • excluded_cols (list[str]): list of columns to exclude
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def config_sql_duckdb( a_sql: _duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, table_or_view_name: str = None, col_id: str = None, as_view: bool = False, overwrite: bool = False, conn_db=None) -> _duckdb.DuckDBPyRelation:
def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None,
                      cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                      table_or_view_name: str = None, col_id: str = None, as_view: bool = False,
                      overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation:
    """
    Set new config to a Duckdb SQL Relation

    Transformations are applied in this order: exclude columns, rename (alias), replace,
    and finally WKT -> geometry. If table_or_view_name is informed, the result is materialized
    as a view or table, and a relation over it is returned.

    Args:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
        cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table. It may be given
            by its original name when cols_alias renames it (matched case-insensitively)
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if cols_exclude:
        a_sql = exclude_cols_on_sql(cols_exclude, a_sql)

    if cols_alias:
        a_sql = rename_cols_on_sql(cols_alias, a_sql)

    if cols_replace:
        a_sql = replace_cols_on_sql(cols_replace, a_sql)

    if cols_wkt:
        a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql)

    if table_or_view_name:
        if not conn_db:
            # NOTE(review): presumably create_view()/create() below act on the connection a_sql was
            # built from. If that is not the same database as this new connection, the DROP,
            # CREATE INDEX and final FROM may not see the new object. Confirm, or pass conn_db explicitly.
            conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

        table_or_view_name = quote_name_duckdb(table_or_view_name)
        if as_view:
            a_sql.create_view(table_or_view_name, replace=overwrite)
        else:
            if overwrite:
                conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}")

            # col_id may name a column renamed by cols_alias. Resolve it the way rename_cols_on_sql
            # matches aliases (case-insensitive keys; empty aliases do not rename)
            if col_id and cols_alias:
                aliases_lower = {col.lower(): alias for col, alias in cols_alias.items()}
                col_id = aliases_lower.get(col_id.lower()) or col_id

            if col_id:
                a_sql = a_sql.order(f'"{col_id}"')

            a_sql.create(table_or_view_name)

            if col_id:
                n_idx = quote_name_duckdb(f"{table_or_view_name}_{col_id}_idx")
                conn_db.execute(
                    f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")'
                )

        a_sql = conn_db.sql(f"FROM {table_or_view_name}")

    return a_sql

Set new config to a Duckdb SQL Relation

Arguments:
  • a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
  • cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • col_id (str=None): column primary key and index unique if create table
  • as_view (bool=False): create sql as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_csv_to_duckdb( csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None, cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, conn_db: _duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> _duckdb.DuckDBPyRelation:
def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None,
                         cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                         cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False,
                         conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.DuckDBPyRelation:
    """
    Import csv file as table on duckdb

    The file is read with duckdb read_csv (auto_detect=True), and the result is passed through
    config_sql_duckdb with the remaining options.

    Args:
        csv_path (str): path to csv file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        header (bool=True): csv file has header
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        zipped (bool=False): if True, read the file as gzip compressed

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    # Double any single quote so the path stays a valid SQL string literal
    sql_path = parse_path(csv_path).replace("'", "''")
    compression = f", compression='{GZIP}'" if zipped else ''

    a_sql = conn_db.sql(
        f"""
        FROM read_csv('{sql_path}', header={header}, auto_detect=True{compression})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql

Import csv file as table on duckdb

Arguments:
  • csv_path (str): path to csv file
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • header (bool=True): csv file has header
  • col_id (str=None): column primary key and index unique if create table
  • cols_wkt (list[str] = None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
  • as_view (bool=False): create table as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
  • zipped (bool=False): if True, read the file as gzip compressed
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_gdal_file_to_duckdb( path_file: str, table_or_view_name: str = None, gdal_open_options: list[str] = None, col_id: str = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, conn_db: _duckdb.DuckDBPyConnection = None, as_view: bool = False, overwrite: bool = False, **st_read_kwargs) -> _duckdb.DuckDBPyRelation:
def import_gdal_file_to_duckdb(
        path_file: str,
        table_or_view_name: str = None,
        gdal_open_options: list[str] = None,
        col_id: str = None,
        cols_exclude: list[str] = None,
        cols_alias: dict[str, str] = None,
        cols_replace: dict[str, str] = None,
        conn_db: duckdb.DuckDBPyConnection = None,
        as_view: bool = False,
        overwrite: bool = False,
        **st_read_kwargs) -> duckdb.DuckDBPyRelation:
    """
    Load GDAL file driver on duckdb database

    The file is read with the spatial extension's ST_Read, and the result is passed through
    config_sql_duckdb with the remaining options.

    Args:
        path_file (str): path to GDAL file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        gdal_open_options (list[str]=None): list of GDAL open options.
                    See GDAL documentation (https://gdal.org/drivers/vector/index.html)
                    (e.g. ['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
        col_id (str=None): column primary key and index unique if create table
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **st_read_kwargs (str): ST_Read function kwargs, written verbatim as key=value in the SQL
            (string values may need their own SQL quotes). See ST_Read documentation
            (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    # SQL string literals: single quotes, with embedded single quotes doubled
    params_st_read = "'{}'".format(parse_path(path_file).replace("'", "''"))
    if gdal_open_options:
        # Build the DuckDB list literal explicitly. A Python list repr switches to double quotes
        # (an identifier in SQL) for options that contain a single quote.
        sql_open_options = ", ".join("'{}'".format(str(opt).replace("'", "''")) for opt in gdal_open_options)
        params_st_read = f"{params_st_read}, open_options=[{sql_open_options}]"
    if st_read_kwargs:
        params_st_read = f"{params_st_read}, {', '.join(f'{k}={v}' for k, v in st_read_kwargs.items())}"

    query = f"""
        FROM ST_Read({params_st_read})
        """

    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

    a_sql = conn_db.sql(query)

    a_sql = config_sql_duckdb(a_sql, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace,
                              table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view,
                              overwrite=overwrite, conn_db=conn_db)

    return a_sql

Load GDAL file driver on duckdb database

Arguments:
  • path_file (str): path to GDAL file
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • gdal_open_options (list[str]=None): list of GDAL open options. See GDAL documentation (https://gdal.org/drivers/vector/index.html) (e.g. ['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
  • col_id (str=None): column primary key and index unique if create table
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
  • as_view (bool=False): create sql as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
  • **st_read_kwargs (str): ST_Read function kwargs. See ST_Read documentation (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_dataframe_to_duckdb( df: pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame, table_or_view_name: str = None, col_id: str = None, cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, conn_db: _duckdb.DuckDBPyConnection = None, overwrite=False) -> _duckdb.DuckDBPyRelation:
def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None,
                               cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                               cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                               as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None,
                               overwrite=False) -> duckdb.DuckDBPyRelation:
    """
    Import DataFrame/GeoDataframe as table on duckdb

    Geometry columns (as found by df_geometry_columns) are sent to duckdb as WKB (with SRID)
    and rebuilt there with ST_GeomFromWKB. The caller's DataFrame is not modified.

    Args:
        df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    cols_geom = df_geometry_columns(df)

    select_expr = "SELECT *"
    if cols_geom:
        # Convert on a shallow copy so the caller's frame keeps its geometries; assigning whole
        # columns replaces them in the copy only. The name `df` is kept on purpose, since the
        # query below reads it as "FROM df" (presumably via duckdb's replacement scan of Python variables).
        df = df.copy(deep=False)
        for col in cols_geom:
            df[col] = df[col].to_wkb(include_srid=True)

        # Quote identifiers so geometry columns with spaces or reserved names still work
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        alias_cols_geom = ', '.join(f'ST_GeomFromWKB("{col}") AS "{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"

    a_sql = conn_db.sql(f"""
        {select_expr} FROM df
        """)

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql

Import DataFrame/GeoDataframe as table on duckdb

Arguments:
  • df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • col_id (str=None): column primary key and index unique if create table
  • cols_wkt (list[str] = None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
  • as_view (bool=False): create table as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_parquet_to_duckdb( parquet_path: str, table_or_view_name: str = None, col_id: str = None, cols_geom: list[str] = None, cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, conn_db: _duckdb.DuckDBPyConnection = None, overwrite=False, **read_parquet_params) -> _duckdb.DuckDBPyRelation:
def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None,
                             cols_geom: list[str] = None,
                             cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                             cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                             as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                             **read_parquet_params) -> duckdb.DuckDBPyRelation:
    """
    Import Parquet/Geoparquet file as table on duckdb

    The file is read with duckdb read_parquet. Columns in cols_geom are cast to GEOMETRY, and the
    result is passed through config_sql_duckdb with the remaining options.

    Args:
        parquet_path (str): path to parquet/geoparquet file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_geom (list[str]=None): list of columns type geometry
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create table as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **read_parquet_params (Any): read_parquet function kwargs, written verbatim as key=value in the SQL.
                See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    select_expr = 'SELECT *'
    if cols_geom:
        # Quote identifiers so geometry columns with spaces or reserved names still work
        alias_cols_geom = ', '.join(f'"{col}"::GEOMETRY AS "{col}"' for col in cols_geom)
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"

    sql_read_parquet_params = ''
    if read_parquet_params:
        sql_read_parquet_params = ', ' + ', '.join(f'{k}={v}' for k, v in read_parquet_params.items())

    # Double any single quote so the path stays a valid SQL string literal
    sql_path = parse_path(parquet_path).replace("'", "''")

    a_sql = conn_db.sql(
        f"""
        {select_expr}
        FROM read_parquet('{sql_path}'{sql_read_parquet_params})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql

Import Parquet/Geoparquet file as table on duckdb

Arguments:
  • parquet_path (str): path to parquet/geoparquet file
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • col_id (str=None): column primary key and index unique if create table
  • cols_geom (list[str]=None): list of columns type geometry
  • cols_wkt (list[str] = None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
  • as_view (bool=False): create table as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
  • **read_parquet_params (Any): read_parquet function kwargs. See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def filter_ibis_table( table: ibis.expr.types.relations.Table, sql_or_ibis_filter: 'str | ibis.expr') -> ibis.expr.types.relations.Table:
def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table:
    """
    Filter ibis table

    A str filter becomes the WHERE clause of a SQL query run with table.sql(); in that case
    GeoSpatial columns are CAST to geometry in the select list. Any other filter is handed
    to table.filter().

    Args:
        table (ibis.Table): The table to filter
        sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table

    Returns:
        table (ibis.Table): The table filtered
    """
    if not isinstance(sql_or_ibis_filter, str):
        return table.filter(sql_or_ibis_filter)

    # Filter by SQL on duckdb backend: geospatial fields are cast back to geometry
    geom_cols = [name for name, dtype in table.schema().fields.items() if isinstance(dtype, GeoSpatial)]
    if geom_cols:
        casts = ', '.join(f"CAST({name} AS geometry) AS {name}" for name in geom_cols)
        select_cols = f"* EXCLUDE({', '.join(geom_cols)}), {casts}"
    else:
        select_cols = "*"

    table_name = table.get_name()
    query = f"SELECT {select_cols} FROM {table_name} WHERE {sql_or_ibis_filter}"
    get_base_logger().debug(f"filter_ibis_table {table_name}: {query}")
    return table.sql(query)

Filter ibis table

Arguments:
  • table (ibis.Table): The table to filter
  • sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table
Returns:

table (ibis.Table): The table filtered

def set_secret_s3_storage( duckdb_conn: _duckdb.DuckDBPyConnection, endpoint: str, access_key_id: str, secret_access_key: str, url_style: str = 'path', use_ssl: bool = False, region: str = None, secret_name: str = 'secret_s3') -> bool:
def set_secret_s3_storage(duckdb_conn: duckdb.DuckDBPyConnection, endpoint: str, access_key_id: str,
                          secret_access_key: str, url_style: str = 'path', use_ssl: bool = False, region: str = None,
                          secret_name: str = SECRET_S3) -> bool:
    """
    Set secret S3 storage on duckdb connection

    Runs CREATE OR REPLACE SECRET ... (TYPE S3, ...) on the connection.

    Args:
        duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
        endpoint (str): endpoint url. (e.g. 's3.amazonaws.com')
        access_key_id (str): access key id
        secret_access_key (str): secret access key
        url_style (str='path'): url style. 'path' or 'virtual_hosted'
        use_ssl (bool=False): use ssl
        region (str=None): region name. If not informed, REGION is not set on the secret
        secret_name (str=SECRET_S3): secret name

    Returns:
        bool: True if the statement ran; False if it raised (the error is logged)
    """
    def _sql_literal(value) -> str:
        # Single-quoted SQL string literal with embedded single quotes doubled
        return "'{}'".format(str(value).replace("'", "''"))

    options = [
        "TYPE S3",
        f"KEY_ID {_sql_literal(access_key_id)}",
        f"SECRET {_sql_literal(secret_access_key)}",
        f"ENDPOINT {_sql_literal(endpoint)}",
        f"URL_STYLE {_sql_literal(url_style)}",
        f"USE_SSL {use_ssl}",
    ]
    # Without this check a missing region would be sent as the literal string 'None'
    if region:
        options.append(f"REGION {_sql_literal(region)}")

    sql_options = ",\n            ".join(options)

    ok = False
    try:
        duckdb_conn.execute(f"""
        CREATE OR REPLACE SECRET {secret_name} (
            {sql_options}
        );
        """)
        ok = True
    except Exception as e:
        # Deliberate best-effort: report failure through the return value and the log
        get_base_logger().error(f"Error setting S3 access keys: {e}")

    return ok

Set secret S3 storage on duckdb connection

Arguments:
  • duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
  • endpoint (str): endpoint url. (e.g. 's3.amazonaws.com')
  • access_key_id (str): access key id
  • secret_access_key (str): secret access key
  • url_style (str='path'): url style. 'path' or 'virtual_hosted'
  • use_ssl (bool=False): use ssl
  • region (str=None): region name
  • secret_name (str=SECRET_S3): secret name
Returns:

ok

def exists_secret( duckdb_conn: _duckdb.DuckDBPyConnection, secret_name: str = 'secret_s3') -> bool:
def exists_secret(duckdb_conn: duckdb.DuckDBPyConnection, secret_name: str = SECRET_S3) -> bool:
    """
    Check if secret exists on duckdb connection

    Looks the name up in duckdb_secrets().

    Args:
        duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
        secret_name (str=SECRET_S3): secret name

    Returns:
        bool: True if secret exists. False if it does not, or if the query failed (the error is logged)
    """
    exists = False
    try:
        # Bind the name as a parameter instead of formatting it into the SQL text
        row = duckdb_conn.execute(
            "SELECT COUNT(*) > 0 AS existe FROM duckdb_secrets() WHERE name = ?", [secret_name]
        ).fetchone()
        exists = bool(row[0])
    except Exception as e:
        get_base_logger().error(f"Error checking secret existence: {e}")

    return exists

Check if secret exists on duckdb connection

Arguments:
  • duckdb_conn (duckdb.DuckDBPyConnection): duckdb connection
  • secret_name (str=SECRET_S3): secret name
Returns:

bool: True if secret exists