apb_duckdb_utils

Package apb_duckdb_utils

Modules that add functionality (spatial and generic) on top of DuckDB.

To use all the available functionality, the GDAL library (version 3.6 to 3.9) and Oracle Instant Client must be installed.

To install:

pip install apb_duckdb_utils

Documentation: apb_duckdb_utils

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 
  5#  Copyright (c)
  6"""
  7.. include:: ../README.md
  8"""
  9from __future__ import annotations
 10
 11import os
 12import warnings
 13from typing import Iterable
 14
 15import duckdb
 16import ibis
 17from geopandas import GeoDataFrame
 18from ibis.expr.datatypes import GeoSpatial
 19from pandas import DataFrame
 20
 21from apb_extra_utils.misc import create_dir
 22from apb_extra_utils.utils_logging import get_base_logger
 23from apb_pandas_utils.geopandas_utils import df_geometry_columns
 24
 25# Suppress specific warning
 26warnings.filterwarnings("ignore", message="Geometry column does not contain geometry")
 27
 28MEMORY_DDBB = ':memory:'
 29CACHE_DUCK_DDBBS = {}
 30CURRENT_DB_PATH = None
 31GZIP = 'gzip'
 32
 33
 34def set_current_db_path(db_path: str):
 35    """
 36    Set used db path
 37
 38    Args:
 39        db_path (str): path to duckdb database file
 40    """
 41    global CURRENT_DB_PATH
 42    CURRENT_DB_PATH = parse_path(db_path)
 43
 44
 45def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False,
 46                          extensions: list[str] = None,
 47                          **connect_args) -> duckdb.DuckDBPyConnection:
 48    """
 49    Get duckdb connection
 50
 51    Args:
 52        db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
 53        as_current (bool=False): set db_path as current db path
 54        no_cached (bool=False): not use cached connection
 55        extensions (list[str]=None): list of extensions to load
 56        **connect_args (dict): duckdb.connect args 
 57
 58    Returns:
 59         duckdb connection
 60    """
 61    if not db_path:
 62        if CURRENT_DB_PATH and not no_cached:
 63            db_path = CURRENT_DB_PATH
 64        else:
 65            db_path = MEMORY_DDBB
 66
 67    parsed_path = parse_path(db_path)
 68    k_path = parsed_path.lower()
 69    if no_cached or not (conn_db := CACHE_DUCK_DDBBS.get(k_path)):
 70        conn_db = CACHE_DUCK_DDBBS[k_path] = duckdb.connect(parsed_path, **connect_args)
 71
 72    if extensions:
 73        for ext in extensions:
 74            conn_db.install_extension(ext)
 75            conn_db.load_extension(ext)
 76
 77    if as_current:
 78        set_current_db_path(parsed_path)
 79
 80    return conn_db
 81
 82
 83def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True):
 84    """
 85    Save duckdb database to dir path as parquets or csvs files
 86
 87    Args:
 88        dir_db (str=None): Path to save database
 89        duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection.
 90            If None, get connection default with get_duckdb_connection
 91        parquet (bool=True): Save as parquet file
 92    """
 93    create_dir(dir_db)
 94
 95    if not duck_db_conn:
 96        duck_db_conn = get_duckdb_connection()
 97
 98    if parquet:
 99        format_db = "(FORMAT PARQUET)"
100    else:
101        format_db = "(FORMAT CSV, COMPRESSION 'GZIP')"
102
103    duck_db_conn.sql(f"EXPORT DATABASE '{parse_path(dir_db)}' {format_db}")
104
105
def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str:
    """
    Return the name of the current schema of a duckdb connection.

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb.
            If None, the default connection from get_duckdb_connection is used

    Returns:
        current schema (str)
    """
    connection = conn_db or get_duckdb_connection()
    first_row = connection.sql("SELECT current_schema()").fetchone()
    return first_row[0]
120
121
def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]:
    """
    List tables in duckdb

    Tables of the current schema are returned by name only; tables of other schemas are
    returned as "schema.table". Each name part is passed through quote_name_duckdb.

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
        schemas (tuple[str]=None): schemas to list tables from (a single str or any iterable
            of str is accepted). If not informed, list all tables

    Returns:
        list of tables
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    current_schema = current_schema_duckdb(conn_db)

    sql_tables = conn_db.sql("SHOW ALL TABLES")
    if schemas:
        if isinstance(schemas, str):
            schemas = (schemas,)
        # Build the IN list explicitly: the Python repr of a 1-tuple ("('main',)") or of a
        # list ("['main']") is not a portable SQL IN list, and embedded quotes must be escaped.
        in_list = ", ".join("'" + str(schema).replace("'", "''") + "'" for schema in schemas)
        sql_tables = sql_tables.filter(f'"schema" IN ({in_list})')

    tables = []
    for schema, name in sql_tables.select('"schema", "name"').fetchall():
        table = quote_name_duckdb(name)
        # Compare the raw schema name: its quoted form never equals current_schema()
        # when the schema name contains spaces.
        tables.append(table if schema == current_schema else f"{quote_name_duckdb(schema)}.{table}")

    return tables
152
153
def quote_name_duckdb(object_sql_name: str) -> str:
    """
    Quote name to use on duckdb if has spaces

    The name may be dot-qualified (``schema.name`` or ``catalog.schema.name``). Existing
    double quotes are removed first, then each part is wrapped in double quotes only when
    it contains a space.

    Args:
        object_sql_name (str): name to quote (table, view, schema, index, ...)

    Returns:
        quoted name (str)
    """
    parts = object_sql_name.strip().replace('"', '').split('.')
    return '.'.join(f'"{part}"' if ' ' in part else part for part in parts)
178
179
def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool:
    """
    Check if table exists in duckdb

    The name is normalized with quote_name_duckdb and looked up in list_tables_duckdb,
    which lists tables of the current schema without a schema prefix.

    Args:
        table_name (str): table name
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb

    Returns:
        bool: True if table exists
    """
    connection = conn_db if conn_db else get_duckdb_connection()
    normalized_name = quote_name_duckdb(table_name)
    return normalized_name in list_tables_duckdb(connection)
195
196
def parse_path(path):
    """
    Normalize a path (os.path.normpath) and use forward slashes, as DuckDB expects.

    Args:
        path (str): path to use on duckdb

    Returns:
        normalized path (str)
    """
    normalized = os.path.normpath(path)
    return "/".join(normalized.split("\\"))
208
209
def escape_name_table_view(name: str):
    """
    Make a table/view name safe for duckdb: '-' becomes '_', parentheses are removed
    and the result is lowercased.

    Args:
        name (str): name to use on duckdb

    Returns:
        normalized name (str)
    """
    translation = str.maketrans({'-': '_', '(': None, ')': None})
    return name.translate(translation).lower()
223
224
def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation):
    """
    Check (case-insensitively) that columns exist in a sql relation

    Args:
        cols_to_check (Iterable[str]): list of columns to check
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        list[str]: the columns checked (a one-shot iterator is returned materialized as a list)

    Raises:
        ValueError: if any of the columns is not found on the relation
    """
    # A one-shot iterator (e.g. a generator) would be exhausted by the check below
    if iter(cols_to_check) is cols_to_check:
        cols_to_check = list(cols_to_check)

    cols_sql = {col.lower() for col in sql.columns}
    missing_cols = [col for col in cols_to_check if col.lower() not in cols_sql]
    if missing_cols:
        raise ValueError(f"There are columns {missing_cols} not found on sql columns {list(sql.columns)}")

    return cols_to_check
243
244
def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Convert WKT text columns to GEOMETRY with ST_GeomFromText

    Column names are matched case-insensitively; the relation's own spelling is kept.

    Args:
        cols_wkt (list[str]): list of columns WKT to set as geometry
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(cols_wkt, sql)

    wanted = {col.lower() for col in cols_wkt}
    sql_wkt_cols = {}
    for col in sql.columns:
        if col.lower() in wanted:
            sql_wkt_cols[col] = f'ST_GeomFromText("{col}")'
    return replace_cols_on_sql(sql_wkt_cols, sql)
264
265
def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Replace columns by the passed sql expressions using ``SELECT * REPLACE (...)``

    Args:
        replace_cols (dict[str, str]): column name -> sql expression to use instead
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(replace_cols.keys(), sql)

    replace_exprs = []
    for col, sql_func in replace_cols.items():
        replace_exprs.append(f'{sql_func} AS "{col}"')
    return sql.select("* REPLACE ({})".format(", ".join(replace_exprs)))
284
285
def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Rename columns in sql select, keeping the original column order

    Column names are matched case-insensitively; columns without an alias are kept as is.

    Args:
        alias_cols (dict[str, str]): column name -> alias
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(alias_cols.keys(), sql)
    alias_by_lower = {col.lower(): alias for col, alias in alias_cols.items()}

    select_items = []
    for col in sql.columns:
        alias_col = alias_by_lower.get(col.lower())
        select_items.append(f'"{col}" AS "{alias_col}"' if alias_col else f'"{col}"')

    return sql.select(", ".join(select_items))
306
307
def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Exclude columns in sql select using ``SELECT * EXCLUDE (...)``

    Args:
        excluded_cols (list[str]): list of columns to exclude
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation
    """
    check_columns_in_sql(excluded_cols, sql)

    quoted_cols = [f'"{col}"' for col in excluded_cols]
    return sql.select("* EXCLUDE ({})".format(", ".join(quoted_cols)))
325
326
def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None,
                      cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                      table_or_view_name: str = None, col_id: str = None, as_view: bool = False,
                      overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation:
    """
    Set new config to a Duckdb SQL Relation

    Transformations are applied in this order: exclude columns, rename (alias), replace
    expressions, convert WKT columns to GEOMETRY. So ``cols_replace`` and ``cols_wkt`` must
    use the aliased names.

    Args:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
        cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table (original or aliased name)
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view.
            It should be the connection ``a_sql`` belongs to: the table/view is created through
            ``a_sql`` but dropped, indexed and re-read through ``conn_db``. If None, the default
            cached connection from get_duckdb_connection is used.

    Returns:
        a_sql (duckdb.DuckDBPyRelation): the configured relation or, if table_or_view_name
            is informed, a relation reading the created table/view
    """
    if cols_exclude:
        a_sql = exclude_cols_on_sql(cols_exclude, a_sql)

    if cols_alias:
        a_sql = rename_cols_on_sql(cols_alias, a_sql)

    if cols_replace:
        a_sql = replace_cols_on_sql(cols_replace, a_sql)

    if cols_wkt:
        a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql)

    if not table_or_view_name:
        return a_sql

    if not conn_db:
        # A brand-new (no_cached) in-memory connection can never see the table/view created
        # through a_sql's own connection; presumably a_sql comes from the default cached one.
        conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'])

    table_or_view_name = quote_name_duckdb(table_or_view_name)
    if as_view:
        a_sql.create_view(table_or_view_name, replace=overwrite)
    else:
        if overwrite:
            conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}")

        # col_id may be given by its original name: resolve it through the aliases
        if col_id := (cols_alias or {}).get(col_id, col_id):
            a_sql = a_sql.order(f'"{col_id}"')

        a_sql.create(table_or_view_name)

        if col_id:
            # Index names cannot be schema-qualified: build it from the bare table name
            bare_table = table_or_view_name.replace('"', '').split('.')[-1]
            n_idx = quote_name_duckdb(f"{bare_table}_{col_id}_idx")
            conn_db.execute(
                f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")'
            )

    return conn_db.sql(f"FROM {table_or_view_name}")
386
387
def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None,
                         cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                         cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False,
                         conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.DuckDBPyRelation:
    """
    Import csv file as table on duckdb

    Args:
        csv_path (str): path to csv file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        header (bool=True): csv file has header
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] | dict = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        as_view (bool=False): create table as view instead of table
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        overwrite (bool=False): overwrite table_name if exists
        zipped (bool=False): if True, read the file as GZIP compressed

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    # Pass the codec as an explicit string literal
    compression = f", compression='{GZIP}'" if zipped else ''
    # Double single quotes so the path is a valid SQL string literal
    sql_path = parse_path(csv_path).replace("'", "''")

    a_sql = conn_db.sql(
        f"""
        FROM read_csv('{sql_path}', header={header}, auto_detect=True{compression})
        """
    )

    return config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                             cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                             as_view=as_view, overwrite=overwrite, conn_db=conn_db)
428
429
def import_gdal_file_to_duckdb(
        path_file: str,
        table_or_view_name: str = None,
        gdal_open_options: list[str] = None,
        col_id: str = None,
        cols_exclude: list[str] = None,
        cols_alias: dict[str, str] = None,
        cols_replace: dict[str, str] = None,
        conn_db: duckdb.DuckDBPyConnection = None,
        as_view: bool = False,
        overwrite: bool = False,
        **st_read_kwargs) -> duckdb.DuckDBPyRelation:
    """
    Load GDAL file driver on duckdb database

    Args:
        path_file (str): path to GDAL file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        gdal_open_options (list[str]=None): list of GDAL open options.
                    See GDAL documentation (https://gdal.org/drivers/vector/index.html)
                    (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
        col_id (str=None): column primary key and index unique if create table
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **st_read_kwargs (str): ST_Read function kwargs, written verbatim as ``key=value``.
            See ST_Read documentation (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    def sql_str(value) -> str:
        # SQL string literal with embedded single quotes doubled
        return "'" + str(value).replace("'", "''") + "'"

    params_st_read = [sql_str(parse_path(path_file))]
    if gdal_open_options:
        # Built explicitly: Python's list repr switches to double quotes (SQL identifiers)
        # for options containing a single quote.
        params_st_read.append(f"open_options=[{', '.join(sql_str(opt) for opt in gdal_open_options)}]")
    params_st_read.extend(f'{k}={v}' for k, v in st_read_kwargs.items())

    query = f"""
        FROM ST_Read({', '.join(params_st_read)})
        """

    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

    a_sql = conn_db.sql(query)

    return config_sql_duckdb(a_sql, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace,
                             table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view,
                             overwrite=overwrite, conn_db=conn_db)
484
485
def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None,
                               cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                               cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                               as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None,
                               overwrite=False) -> duckdb.DuckDBPyRelation:
    """
    Import DataFrame/GeoDataframe as table on duckdb

    Geometry columns (as detected by df_geometry_columns) are sent to duckdb as WKB
    (with SRID) and converted back with ST_GeomFromWKB. The input DataFrame is not modified.

    Args:
        df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] | dict = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        as_view (bool=False): create table as view instead of table
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        overwrite (bool=False): overwrite table_name if exists

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    cols_geom = df_geometry_columns(df)
    if cols_geom:
        # Work on a copy so the caller's geometries are not overwritten with WKB bytes.
        # The copy keeps the name "df" because the query below reads it via DuckDB's
        # replacement scan of the local Python variable "df".
        df = df.copy()
        for col in cols_geom:
            df[col] = df[col].to_wkb(include_srid=True)

        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        alias_cols_geom = ', '.join(f'ST_GeomFromWKB("{col}") AS "{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"
    else:
        select_expr = "SELECT *"

    a_sql = conn_db.sql(f"""
        {select_expr} FROM df
        """)

    return config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                             cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                             as_view=as_view, overwrite=overwrite, conn_db=conn_db)
529
530
def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None,
                             cols_geom: list[str] = None,
                             cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                             cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                             as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                             **read_parquet_params) -> duckdb.DuckDBPyRelation:
    """
    Import Parquet/Geoparquet file as table on duckdb

    Args:
        parquet_path (str): path to parquet/geoparquet file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_geom (list[str]=None): list of columns to cast to GEOMETRY
        cols_wkt (list[str] | dict = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        as_view (bool=False): create table as view instead of table
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        overwrite (bool=False): overwrite table_name if exists
        **read_parquet_params (Any): read_parquet function kwargs, written verbatim as ``key=value``.
                See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    select_expr = 'SELECT *'
    if cols_geom:
        # Quote column names so geometry columns with spaces or special characters work
        alias_cols_geom = ', '.join(f'"{col}"::GEOMETRY AS "{col}"' for col in cols_geom)
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"

    sql_read_parquet_params = ''.join(f', {k}={v}' for k, v in read_parquet_params.items())

    # Double single quotes so the path is a valid SQL string literal
    sql_path = parse_path(parquet_path).replace("'", "''")
    a_sql = conn_db.sql(
        f"""
        {select_expr}
        FROM read_parquet('{sql_path}'{sql_read_parquet_params})
        """
    )

    return config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                             cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                             as_view=as_view, overwrite=overwrite, conn_db=conn_db)
584
585
def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table:
    """
    Filter ibis table

    A str filter is used verbatim as a SQL WHERE clause (do not pass untrusted input).
    Geospatial columns are cast to GEOMETRY in the resulting SQL select.
    Any other filter is passed to ``table.filter``.

    Args:
        table (ibis.Table): The table to filter
        sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table

    Returns:
        table (ibis.Table): The table filtered
    """
    if not isinstance(sql_or_ibis_filter, str):
        return table.filter(sql_or_ibis_filter)

    # Filter by SQL on duckdb backend; geospatial columns are cast explicitly as geometry
    cols_geom = [col for col, fld in table.schema().fields.items()
                 if isinstance(fld, GeoSpatial)]
    if cols_geom:
        # Quote column names so columns with spaces or special characters work
        cast_geom_cols = ', '.join(f'CAST("{col}" AS geometry) AS "{col}"' for col in cols_geom)
        excluded_cols = ', '.join(f'"{col}"' for col in cols_geom)
        sql_select_cols = f"* EXCLUDE({excluded_cols}), {cast_geom_cols}"
    else:
        sql_select_cols = "*"

    n_tab = table.get_name()
    sql_str = f"SELECT {sql_select_cols} FROM {n_tab} WHERE {sql_or_ibis_filter}"
    get_base_logger().debug(f"filter_ibis_table {n_tab}: {sql_str}")
    return table.sql(sql_str)
# Special DuckDB path that opens an in-memory database.
MEMORY_DDBB: str = ':memory:'
# Connections opened by get_duckdb_connection, keyed by lower-cased parsed path.
CACHE_DUCK_DDBBS: dict = {}
# Database path used by get_duckdb_connection when no db_path is given (None -> in-memory).
CURRENT_DB_PATH: str | None = None
# Compression codec name used when reading zipped CSV files.
GZIP: str = 'gzip'
def set_current_db_path(db_path: str):
    """
    Make ``db_path`` the default database used by get_duckdb_connection.

    Args:
        db_path (str): path to duckdb database file; it is stored normalized with parse_path
    """
    global CURRENT_DB_PATH
    normalized_path = parse_path(db_path)
    CURRENT_DB_PATH = normalized_path

Set used db path

Arguments:
  • db_path (str): path to duckdb database file
def get_duckdb_connection(db_path: str = None, as_current: bool = False, no_cached: bool = False,
                          extensions: list[str] = None,
                          **connect_args) -> duckdb.DuckDBPyConnection:
    """
    Get duckdb connection

    Connections are cached in CACHE_DUCK_DDBBS, keyed by the lower-cased parsed path, so
    repeated calls for the same database return the same connection object.

    Args:
        db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
            (or an in-memory database when it is not set or ``no_cached`` is True)
        as_current (bool=False): set db_path as current db path
        no_cached (bool=False): always open a new connection (which then replaces the cached one)
        extensions (list[str]=None): list of extensions to install and load
        **connect_args (dict): duckdb.connect args

    Returns:
         duckdb connection
    """
    if db_path:
        target_path = db_path
    elif CURRENT_DB_PATH and not no_cached:
        target_path = CURRENT_DB_PATH
    else:
        target_path = MEMORY_DDBB

    parsed_path = parse_path(target_path)
    cache_key = parsed_path.lower()

    conn_db = None if no_cached else CACHE_DUCK_DDBBS.get(cache_key)
    if not conn_db:
        conn_db = duckdb.connect(parsed_path, **connect_args)
        CACHE_DUCK_DDBBS[cache_key] = conn_db

    for ext in extensions or ():
        conn_db.install_extension(ext)
        conn_db.load_extension(ext)

    if as_current:
        set_current_db_path(parsed_path)

    return conn_db

Get duckdb connection

Arguments:
  • db_path (str=None): path to duckdb database file. By default, use CURRENT_DB_PATH
  • as_current (bool=False): set db_path as current db path
  • no_cached (bool=False): not use cached connection
  • extensions (list[str]=None): list of extensions to load
  • **connect_args (dict): duckdb.connect args
Returns:

duckdb connection

def export_database(dir_db: str, duck_db_conn: duckdb.DuckDBPyConnection = None, parquet: bool = True):
    """
    Save duckdb database to dir path as parquet or gzipped csv files

    Runs DuckDB's ``EXPORT DATABASE`` statement on the given connection.

    Args:
        dir_db (str): Path of the directory to save the database to (passed to create_dir first)
        duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection.
            If None, get connection default with get_duckdb_connection
        parquet (bool=True): Save tables as parquet files; if False, as GZIP-compressed CSV files
    """
    create_dir(dir_db)

    if not duck_db_conn:
        duck_db_conn = get_duckdb_connection()

    format_db = "(FORMAT PARQUET)" if parquet else "(FORMAT CSV, COMPRESSION 'GZIP')"

    # Double any single quote so paths like "O'Neil/backup" form a valid SQL string literal
    sql_dir = parse_path(dir_db).replace("'", "''")
    duck_db_conn.sql(f"EXPORT DATABASE '{sql_dir}' {format_db}")

Save duckdb database to dir path as parquets or csvs files

Arguments:
  • dir_db (str): Path of the directory to save the database to
  • duck_db_conn (duckdb.DuckDBPyConnection=None): Duckdb database connection. If None, get connection default with get_duckdb_connection
  • parquet (bool=True): Save as parquet file
def current_schema_duckdb(conn_db: duckdb.DuckDBPyConnection = None) -> str:
    """
    Return the name of the current schema of a duckdb connection.

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb.
            If None, the default connection from get_duckdb_connection is used

    Returns:
        current schema (str)
    """
    connection = conn_db or get_duckdb_connection()
    first_row = connection.sql("SELECT current_schema()").fetchone()
    return first_row[0]

Get current schema

Arguments:
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:

current schema

def list_tables_duckdb(conn_db: duckdb.DuckDBPyConnection = None, schemas: tuple[str] = None) -> list[str]:
    """
    List tables in duckdb

    Tables of the current schema are returned by name only; tables of other schemas are
    returned as "schema.table". Each name part is passed through quote_name_duckdb.

    Args:
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
        schemas (tuple[str]=None): schemas to list tables from (a single str or any iterable
            of str is accepted). If not informed, list all tables

    Returns:
        list of tables
    """
    if not conn_db:
        conn_db = get_duckdb_connection()

    current_schema = current_schema_duckdb(conn_db)

    sql_tables = conn_db.sql("SHOW ALL TABLES")
    if schemas:
        if isinstance(schemas, str):
            schemas = (schemas,)
        # Build the IN list explicitly: the Python repr of a 1-tuple ("('main',)") or of a
        # list ("['main']") is not a portable SQL IN list, and embedded quotes must be escaped.
        in_list = ", ".join("'" + str(schema).replace("'", "''") + "'" for schema in schemas)
        sql_tables = sql_tables.filter(f'"schema" IN ({in_list})')

    tables = []
    for schema, name in sql_tables.select('"schema", "name"').fetchall():
        table = quote_name_duckdb(name)
        # Compare the raw schema name: its quoted form never equals current_schema()
        # when the schema name contains spaces.
        tables.append(table if schema == current_schema else f"{quote_name_duckdb(schema)}.{table}")

    return tables

List tables in duckdb

Arguments:
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
  • schemas (tuple[str]=None): tuple schemas. If not informed, list all tables
Returns:

list of tables

def quote_name_duckdb(object_sql_name: str) -> str:
    """
    Quote name to use on duckdb if has spaces

    The name may be dot-qualified (``schema.name`` or ``catalog.schema.name``). Existing
    double quotes are removed first, then each part is wrapped in double quotes only when
    it contains a space.

    Args:
        object_sql_name (str): name to quote (table, view, schema, index, ...)

    Returns:
        quoted name (str)
    """
    parts = object_sql_name.strip().replace('"', '').split('.')
    return '.'.join(f'"{part}"' if ' ' in part else part for part in parts)

Quote a name for use in DuckDB if it contains spaces.

Arguments:
  • object_sql_name (str): name to quote (table, view, schema, index, ...)
Returns:

quoted name (str)

def exists_table_duckdb(table_name: str, conn_db: duckdb.DuckDBPyConnection = None) -> bool:
    """
    Check if table exists in duckdb

    The name is normalized with quote_name_duckdb and looked up in list_tables_duckdb,
    which lists tables of the current schema without a schema prefix.

    Args:
        table_name (str): table name
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb

    Returns:
        bool: True if table exists
    """
    connection = conn_db if conn_db else get_duckdb_connection()
    normalized_name = quote_name_duckdb(table_name)
    return normalized_name in list_tables_duckdb(connection)

Check if table exists in duckdb

Arguments:
  • table_name (str): table name
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb
Returns:

bool: True if table exists

def parse_path(path):
    """
    Normalize a path (os.path.normpath) and use forward slashes, as DuckDB expects.

    Args:
        path (str): path to use on duckdb

    Returns:
        normalized path (str)
    """
    normalized = os.path.normpath(path)
    return "/".join(normalized.split("\\"))

Normalize a path to the format DuckDB expects (normalized, with forward slashes).

Arguments:
  • path (str): path to use on duckdb
Returns:

normalized path (str)

def escape_name_table_view(name: str):
def escape_name_table_view(name: str):
    """
    Normalize a table/view name for use on duckdb

    Hyphens become underscores, parentheses are removed and the result is lowercased.

    Args:
        name (str): name to use on duckdb

    Returns:
        normalized name (str)
    """
    substitutions = str.maketrans({'-': '_', '(': None, ')': None})
    return name.translate(substitutions).lower()

Escape characters in a table/view name and lowercase it for use in DuckDB

Arguments:
  • name (str): name to use on duckdb
Returns:

normalized name (str)

def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.duckdb.DuckDBPyRelation):
def check_columns_in_sql(cols_to_check: Iterable[str], sql: duckdb.DuckDBPyRelation):
    """
    Check that columns exist in a sql relation (comparison is case-insensitive)

    Args:
        cols_to_check (Iterable[str]): columns to check. One-shot iterators (e.g. generators)
            are materialized into a list first
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        list[str]: the checked columns (the same object passed, or a list if a one-shot
            iterator was passed)

    Raises:
        ValueError: if some column is not found in the relation (the message lists the
            missing ones)
    """
    # A one-shot iterator would be exhausted by the check below, leaving nothing for the
    # error message or for the returned value.
    if iter(cols_to_check) is cols_to_check:
        cols_to_check = list(cols_to_check)

    cols_sql = [col.lower() for col in sql.columns]
    cols_sql_set = set(cols_sql)  # O(1) membership tests
    missing = [col for col in cols_to_check if col.lower() not in cols_sql_set]
    if missing:
        raise ValueError(f"There are columns {missing} not found on sql columns {cols_sql}")

    return cols_to_check

Check columns in sql relation

Arguments:
  • cols_to_check (Iterable[str]): list of columns to check
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

list[str]: list of columns to check

def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.duckdb.DuckDBPyRelation):
def set_types_geom_to_cols_wkt_on_sql(cols_wkt: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Convert WKT text columns of a sql relation into GEOMETRY columns

    Columns are matched case-insensitively; the relation's own spelling of each column is
    kept and its value is replaced by ST_GeomFromText over it.

    Args:
        cols_wkt (list[str]): list of columns WKT to set as geometry
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation

    Raises:
        ValueError: if some column in cols_wkt is not found in the relation
    """
    check_columns_in_sql(cols_wkt, sql)

    wkt_names = {name.lower() for name in cols_wkt}
    geom_exprs = {}
    for column in sql.columns:
        if column.lower() not in wkt_names:
            continue
        geom_exprs[column] = f'ST_GeomFromText("{column}")'

    return replace_cols_on_sql(geom_exprs, sql)

Set columns to GEOMETRY from WKT text

Arguments:
  • cols_wkt (list[str]): list of columns WKT to set as geometry
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.duckdb.DuckDBPyRelation):
def replace_cols_on_sql(replace_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Replace columns by the passed sql expression

    Each column keeps its name but its value is computed by the given sql expression
    (SELECT * REPLACE (...)).

    Args:
        replace_cols (dict[str, str]): dict of column name -> sql expression to use instead.
            If empty, the relation is returned unchanged
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation

    Raises:
        ValueError: if some column is not found in the relation
    """
    check_columns_in_sql(replace_cols.keys(), sql)

    # "* REPLACE ()" is not valid SQL: without columns there is nothing to do
    if not replace_cols:
        return sql

    # Double embedded double quotes so each name stays a single quoted identifier
    sql_replace = ", ".join(
        '{} AS "{}"'.format(sql_func, col.replace('"', '""'))
        for col, sql_func in replace_cols.items()
    )
    return sql.select(f"* REPLACE ({sql_replace})")

Replace columns by the passed sql function

Arguments:
  • replace_cols (dict[str]): dict of columns with the sql function to use instead
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.duckdb.DuckDBPyRelation):
def rename_cols_on_sql(alias_cols: dict[str, str], sql: duckdb.DuckDBPyRelation):
    """
    Rename columns of a sql relation through a select with aliases

    Column matching is case-insensitive. Every column of the relation is kept in its
    original order; only those found in alias_cols (with a non-empty alias) are renamed.

    Args:
        alias_cols (dict[str, str]): dict of column name -> new name (alias)
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation

    Raises:
        ValueError: if some column in alias_cols is not found in the relation
    """
    check_columns_in_sql(alias_cols.keys(), sql)

    alias_by_lower_name = {}
    for original_name, new_name in alias_cols.items():
        alias_by_lower_name[original_name.lower()] = new_name

    select_items = []
    for column in sql.columns:
        new_name = alias_by_lower_name.get(column.lower())
        if new_name:
            select_items.append(f'"{column}" AS "{new_name}"')
        else:
            select_items.append(f'"{column}"')

    return sql.select(", ".join(select_items))

Rename columns in sql select

Arguments:
  • alias_cols (dict[str]): dict of renamed columns with alias
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.duckdb.DuckDBPyRelation):
def exclude_cols_on_sql(excluded_cols: list[str], sql: duckdb.DuckDBPyRelation):
    """
    Exclude columns in sql select

    Args:
        excluded_cols (list[str]): list of columns to exclude. If empty, the relation is
            returned unchanged
        sql (duckdb.DuckDBPyRelation): sql relation

    Returns:
        duckdb.DuckDBPyRelation: Duckdb database relation

    Raises:
        ValueError: if some column is not found in the relation
    """
    # Materialize first: the check below would otherwise exhaust a one-shot iterator and
    # leave nothing to exclude
    excluded_cols = list(excluded_cols)
    check_columns_in_sql(excluded_cols, sql)

    # "* EXCLUDE ()" is not valid SQL: without columns there is nothing to do
    if not excluded_cols:
        return sql

    # Double embedded double quotes so each name stays a single quoted identifier
    str_cols = ", ".join('"{}"'.format(col.replace('"', '""')) for col in excluded_cols)
    return sql.select(f"* EXCLUDE ({str_cols})")

Exclude columns in sql select

Arguments:
  • excluded_cols (list[str]): list of columns to exclude
  • sql (duckdb.DuckDBPyRelation): sql relation
Returns:

duckdb.DuckDBPyRelation: Duckdb database relation

def config_sql_duckdb( a_sql: duckdb.duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, table_or_view_name: str = None, col_id: str = None, as_view: bool = False, overwrite: bool = False, conn_db=None) -> duckdb.duckdb.DuckDBPyRelation:
def config_sql_duckdb(a_sql: duckdb.DuckDBPyRelation, cols_wkt: list[str] = None, cols_exclude: list[str] = None,
                      cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                      table_or_view_name: str = None, col_id: str = None, as_view: bool = False,
                      overwrite: bool = False, conn_db=None) -> duckdb.DuckDBPyRelation:
    """
    Set new config to a Duckdb SQL Relation

    Transformations are applied in this order: exclude columns, rename columns, replace
    columns and convert WKT columns to geometry. Hence cols_replace and cols_wkt must use
    the aliased names of renamed columns. If table_or_view_name is informed, the result is
    persisted as a table (or view) and a relation reading from it is returned.

    Args:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
        cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        table_or_view_name (str=None): table or view name (optionally schema-qualified).
            If informed, create table or view
        col_id (str=None): column primary key and index unique if create table. If it is a
            key of cols_alias, its alias is used
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if cols_exclude:
        a_sql = exclude_cols_on_sql(cols_exclude, a_sql)

    if cols_alias:
        a_sql = rename_cols_on_sql(cols_alias, a_sql)

    if cols_replace:
        a_sql = replace_cols_on_sql(cols_replace, a_sql)

    if cols_wkt:
        a_sql = set_types_geom_to_cols_wkt_on_sql(cols_wkt, a_sql)

    if table_or_view_name:
        if not conn_db:
            # NOTE(review): this opens a new connection, while create_view/create below
            # presumably run on the connection that owns a_sql. Unless both point to the same
            # database, the final "FROM ..." may not find the object; callers should pass conn_db.
            conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

        table_or_view_name = quote_name_duckdb(table_or_view_name)
        if as_view:
            a_sql.create_view(table_or_view_name, replace=overwrite)
        else:
            if overwrite:
                conn_db.execute(f"DROP TABLE IF EXISTS {table_or_view_name}")

            # col_id may be given by its original name: translate it to its alias if renamed
            if col_id := (cols_alias or {}).get(col_id, col_id):
                a_sql = a_sql.order(f'"{col_id}"')

            a_sql.create(table_or_view_name)

            if col_id:
                # Index names cannot be schema-qualified (the index is created in its table's
                # schema), so build it from the bare table name only
                bare_table_name = table_or_view_name.replace('"', '').split('.')[-1]
                n_idx = quote_name_duckdb(f"{bare_table_name}_{col_id}_idx")
                conn_db.execute(
                    f'CREATE UNIQUE INDEX {n_idx} ON {table_or_view_name} ("{col_id}")'
                )

        a_sql = conn_db.sql(f"FROM {table_or_view_name}")

    return a_sql

Set new config to a Duckdb SQL Relation

Arguments:
  • a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
  • cols_wkt (list[str] | dict=None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • col_id (str=None): column primary key and index unique if create table
  • as_view (bool=False): create sql as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
  • conn_db (duckdb.DuckDBPyConnection=None): connection to duckdb if create table or view
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_csv_to_duckdb( csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None, cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, conn_db: duckdb.duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.duckdb.DuckDBPyRelation:
def import_csv_to_duckdb(csv_path: str, table_or_view_name: str = None, header: bool = True, col_id: str = None,
                         cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                         cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False,
                         conn_db: duckdb.DuckDBPyConnection = None, overwrite=False, zipped: bool = False) -> duckdb.DuckDBPyRelation:
    """
    Import csv file as table on duckdb

    Args:
        csv_path (str): path to csv file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        header (bool=True): csv file has header
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create a view instead of a table
        overwrite (bool=False): overwrite table_name if exists
        zipped (bool=False): if True, read the csv file as gzip-compressed

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    # Single quotes must be doubled to embed the path in a SQL string literal
    # (otherwise a path like "O'Brien/data.csv" breaks the query)
    sql_csv_path = parse_path(csv_path).replace("'", "''")
    # Pass the compression as a string literal rather than a bare identifier
    compression = f", compression='{GZIP}'" if zipped else ''

    a_sql = conn_db.sql(
        f"""
        FROM read_csv('{sql_csv_path}', header={header}, auto_detect=True{compression})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql

Import csv file as table on duckdb

Arguments:
  • csv_path (str): path to csv file
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • header (bool=True): csv file has header
  • col_id (str=None): column primary key and index unique if create table
  • cols_wkt (list[str] = None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
  • as_view (bool=False): create a view instead of a table
  • overwrite (bool=False): overwrite table_name if exists
  • zipped (bool=False): if True, read the CSV file as gzip-compressed
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_gdal_file_to_duckdb( path_file: str, table_or_view_name: str = None, gdal_open_options: list[str] = None, col_id: str = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, conn_db: duckdb.duckdb.DuckDBPyConnection = None, as_view: bool = False, overwrite: bool = False, **st_read_kwargs) -> duckdb.duckdb.DuckDBPyRelation:
def import_gdal_file_to_duckdb(
        path_file: str,
        table_or_view_name: str = None,
        gdal_open_options: list[str] = None,
        col_id: str = None,
        cols_exclude: list[str] = None,
        cols_alias: dict[str, str] = None,
        cols_replace: dict[str, str] = None,
        conn_db: duckdb.DuckDBPyConnection = None,
        as_view: bool = False,
        overwrite: bool = False,
        **st_read_kwargs) -> duckdb.DuckDBPyRelation:
    """
    Load GDAL file driver on duckdb database

    Args:
        path_file (str): path to GDAL file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        gdal_open_options (list[str]=None): list of GDAL open options.
                    See GDAL documentation (https://gdal.org/drivers/vector/index.html)
                    (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
        col_id (str=None): column primary key and index unique if create table
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
        as_view (bool=False): create sql as view instead of table
        overwrite (bool=False): overwrite table_name if exists
        **st_read_kwargs (str): ST_Read function kwargs, inserted verbatim as `key=value`
            (string values should be passed already SQL-quoted). See ST_Read documentation
            (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)

    Returns:
        a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    def _sql_literal(value) -> str:
        # Render value as a SQL string literal, doubling embedded single quotes
        return "'" + str(value).replace("'", "''") + "'"

    params_st_read = [_sql_literal(parse_path(path_file))]
    if gdal_open_options:
        # Build the DuckDB list literal explicitly: Python's list repr switches to double quotes
        # for items containing a single quote, which DuckDB would parse as identifiers
        sql_open_options = ', '.join(_sql_literal(opt) for opt in gdal_open_options)
        params_st_read.append(f"open_options=[{sql_open_options}]")
    if st_read_kwargs:
        params_st_read.extend(f'{k}={v}' for k, v in st_read_kwargs.items())

    query = f"""
        FROM ST_Read({', '.join(params_st_read)})
        """

    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial', 'parquet'], no_cached=True)

    a_sql = conn_db.sql(query)

    a_sql = config_sql_duckdb(a_sql, cols_exclude=cols_exclude, cols_alias=cols_alias, cols_replace=cols_replace,
                              table_or_view_name=table_or_view_name, col_id=col_id, as_view=as_view,
                              overwrite=overwrite, conn_db=conn_db)

    return a_sql

Load GDAL file driver on duckdb database

Arguments:
  • path_file (str): path to GDAL file
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • gdal_open_options (list[str]=None): list of GDAL open options. See GDAL documentation (https://gdal.org/drivers/vector/index.html) (e.g.['HEADERS=TRUE', 'X_POSSIBLE_NAMES=Longitude', 'Y_POSSIBLE_NAMES=Latitude'])
  • col_id (str=None): column primary key and index unique if create table
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection=None): Duckdb database connection
  • as_view (bool=False): create sql as view instead of table
  • overwrite (bool=False): overwrite table_name if exists
  • **st_read_kwargs (str): ST_Read function kwargs. See ST_Read documentation (https://duckdb.org/docs/extensions/spatial/functions.html#st_read)
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_dataframe_to_duckdb( df: pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame, table_or_view_name: str = None, col_id: str = None, cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, conn_db: duckdb.duckdb.DuckDBPyConnection = None, overwrite=False) -> duckdb.duckdb.DuckDBPyRelation:
def import_dataframe_to_duckdb(df: DataFrame | GeoDataFrame, table_or_view_name: str = None, col_id: str = None,
                               cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                               cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                               as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None,
                               overwrite=False) -> duckdb.DuckDBPyRelation:
    """
    Import DataFrame/GeoDataframe as table on duckdb

    Geometry columns are serialized to WKB (with SRID) on a copy of the frame and rebuilt
    on duckdb with ST_GeomFromWKB. The passed DataFrame is not modified.

    Args:
        df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create a view instead of a table
        overwrite (bool=False): overwrite table_name if exists

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    cols_geom = df_geometry_columns(df)
    if cols_geom:
        # Convert on a copy: assigning the WKB columns in place would silently turn the
        # caller's geometries into bytes
        df = df.copy()
        for col in cols_geom:
            df[col] = df[col].to_wkb(include_srid=True)

    # Quote identifiers so column names with spaces or reserved words do not break the SQL
    cols_as_wkb = [f'ST_GeomFromWKB("{col}") AS "{col}"' for col in cols_geom]
    alias_cols_geom = ', '.join(cols_as_wkb)
    exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)

    select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}" if cols_geom else "SELECT *"
    # NOTE(review): "FROM df" presumably relies on duckdb resolving the local variable `df`
    # (replacement scan), so keep that name for the frame that must be read
    a_sql = conn_db.sql(f"""
        {select_expr} FROM df
        """)

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql

Import DataFrame/GeoDataframe as table on duckdb

Arguments:
  • df (DataFrame | GeoDataFrame): A DataFrame/GeoDataFrame
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • col_id (str=None): column primary key and index unique if create table
  • cols_wkt (list[str] = None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
  • as_view (bool=False): create a view instead of a table
  • overwrite (bool=False): overwrite table_name if exists
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def import_parquet_to_duckdb( parquet_path: str, table_or_view_name: str = None, col_id: str = None, cols_geom: list[str] = None, cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None, cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None, as_view: bool = False, conn_db: duckdb.duckdb.DuckDBPyConnection = None, overwrite=False, **read_parquet_params) -> duckdb.duckdb.DuckDBPyRelation:
def import_parquet_to_duckdb(parquet_path: str, table_or_view_name: str = None, col_id: str = None,
                             cols_geom: list[str] = None,
                             cols_wkt: list[str] | dict = None, cols_exclude: list[str] = None,
                             cols_alias: dict[str, str] = None, cols_replace: dict[str, str] = None,
                             as_view: bool = False, conn_db: duckdb.DuckDBPyConnection = None, overwrite=False,
                             **read_parquet_params) -> duckdb.DuckDBPyRelation:
    """
    Import Parquet/Geoparquet file as table on duckdb

    Args:
        parquet_path (str): path to parquet/geoparquet file
        table_or_view_name (str=None): table or view name. If informed, create table or view
        col_id (str=None): column primary key and index unique if create table
        cols_geom (list[str]=None): list of columns type geometry (cast with ::GEOMETRY)
        cols_wkt (list[str] = None): list of columns WKT to use as geometry
        cols_exclude (list[str]=None): list of columns to exclude
        cols_alias (dict[str, str]=None): dictionary of columns aliases
        cols_replace (dict[str, str]=None): dictionary of columns to replace
        conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
        as_view (bool=False): create a view instead of a table
        overwrite (bool=False): overwrite table_name if exists
        **read_parquet_params (Any): read_parquet function kwargs, inserted verbatim as `key=value`.
                See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters

    Returns:
         a_sql (duckdb.DuckDBPyRelation): Duckdb database relation
    """
    if not conn_db:
        conn_db = get_duckdb_connection(extensions=['spatial'], no_cached=True)

    select_expr = 'SELECT *'
    if cols_geom:
        # Quote identifiers so column names with spaces or reserved words do not break the SQL
        alias_cols_geom = ', '.join(f'"{col}"::GEOMETRY AS "{col}"' for col in cols_geom)
        exclude_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
        select_expr = f"SELECT * EXCLUDE({exclude_geom_cols}), {alias_cols_geom}"

    sql_read_parquet_params = ''
    if read_parquet_params:
        sql_read_parquet_params = ', ' + ', '.join(f'{k}={v}' for k, v in read_parquet_params.items())

    # Single quotes must be doubled to embed the path in a SQL string literal
    sql_parquet_path = parse_path(parquet_path).replace("'", "''")

    a_sql = conn_db.sql(
        f"""
        {select_expr}
        FROM read_parquet('{sql_parquet_path}'{sql_read_parquet_params})
        """
    )

    a_sql = config_sql_duckdb(a_sql, cols_wkt=cols_wkt, cols_exclude=cols_exclude, cols_alias=cols_alias,
                              cols_replace=cols_replace, table_or_view_name=table_or_view_name, col_id=col_id,
                              as_view=as_view, overwrite=overwrite, conn_db=conn_db)

    return a_sql

Import Parquet/Geoparquet file as table on duckdb

Arguments:
  • parquet_path (str): path to parquet/geoparquet file
  • table_or_view_name (str=None): table or view name. If informed, create table or view
  • col_id (str=None): column primary key and index unique if create table
  • cols_geom (list[str]=None): list of columns type geometry
  • cols_wkt (list[str] = None): list of columns WKT to use as geometry
  • cols_exclude (list[str]=None): list of columns to exclude
  • cols_alias (dict[str, str]=None): dictionary of columns aliases
  • cols_replace (dict[str, str]=None): dictionary of columns to replace
  • conn_db (duckdb.DuckDBPyConnection = None): connection to duckdb
  • as_view (bool=False): create a view instead of a table
  • overwrite (bool=False): overwrite table_name if exists
  • **read_parquet_params (Any): read_parquet function kwargs. See duckdb read_parquet documentation on https://duckdb.org/docs/data/parquet/overview.html#parameters
Returns:

a_sql (duckdb.DuckDBPyRelation): Duckdb database relation

def filter_ibis_table( table: ibis.expr.types.relations.Table, sql_or_ibis_filter: 'str | ibis.expr') -> ibis.expr.types.relations.Table:
def filter_ibis_table(table: ibis.Table, sql_or_ibis_filter: str | ibis.expr) -> ibis.Table:
    """
    Filter ibis table

    A str filter is inserted verbatim into the WHERE clause of a SQL query run through
    table.sql (do not pass untrusted input); any other value is passed to table.filter.

    Args:
        table (ibis.Table): The table to filter
        sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table

    Returns:
        table (ibis.Table): The table filtered
    """
    if isinstance(sql_or_ibis_filter, str):  # Filter by SQL on duckdb backend
        # Geospatial fields are explicitly cast AS geometry in the select list
        cols_geom = [col for col, fld in table.schema().fields.items()
                     if isinstance(fld, GeoSpatial)]
        if cols_geom:
            # Quote identifiers so column names with spaces or reserved words do not break the SQL
            cast_geom_cols = ', '.join(f'CAST("{col}" AS geometry) AS "{col}"' for col in cols_geom)
            excluded_geom_cols = ', '.join(f'"{col}"' for col in cols_geom)
            sql_select_cols = f"* EXCLUDE({excluded_geom_cols}), {cast_geom_cols}"
        else:
            sql_select_cols = "*"

        n_tab = table.get_name()
        sql_str = f"SELECT {sql_select_cols} FROM {n_tab} WHERE {sql_or_ibis_filter}"
        get_base_logger().debug(f"filter_ibis_table {n_tab}: {sql_str}")
        res = table.sql(sql_str)
    else:
        res = table.filter(sql_or_ibis_filter)

    return res

Filter ibis table

Arguments:
  • table (ibis.Table): The table to filter
  • sql_or_ibis_filter (str | ibis.expr): The filter to apply to the table
Returns:

table (ibis.Table): The table filtered