apb_pandas_utils

Package apb_pandas_utils

Modules to add functionality over pandas and geopandas

Requires GDAL library version between 3.6 and 3.10, and Oracle Instant Client installed.

To install:

pip install apb_pandas_utils

Documentation here apb_pandas_utils

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 
  5#  Copyright (c)
  6"""
  7.. include:: ../README.md
  8"""
  9from __future__ import annotations
 10
 11import re
 12from collections.abc import Iterable
 13from datetime import datetime, time, date
 14from typing import Union, Generator
 15
 16import numpy as np
 17import pandas as pd
 18import requests
 19from geopandas import GeoDataFrame
 20from pandas import DataFrame, Timestamp, NaT, CategoricalDtype
 21from requests.adapters import HTTPAdapter
 22from urllib3.util.retry import Retry
 23
 24from apb_extra_utils.utils_logging import get_base_logger
 25
logger = get_base_logger(__name__)

# Dtypes that optimize_df never converts to 'category'
EXCLUDED_TYPES_TO_CATEGORIZE = ['datetime', 'category', 'geometry']
# Default maximum ratio (0-1) of unique values over non-null rows for a column
# to be converted to category by optimize_df
DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY = 0.5
# Far-future sentinel substituted for NaT/None by convert_to_datetime_col_df when set_nat=True
MAX_DATETIME = datetime(2250, 12, 31, 23, 59, 59)
 31
 32
 33def optimize_df(df: DataFrame | GeoDataFrame, max_perc_unique_vals: float = DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY,
 34                floats_as_categ: bool = False) -> DataFrame | GeoDataFrame:
 35    """
 36    Retorna el pd.Dataframe optimizado segun columnas que encuentre
 37
 38    Args:
 39        df (Dataframe | GeoDataFrame): Dataframe a optimizar
 40        max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
 41        floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
 42
 43    Returns:
 44        opt_df (Dataframe | GeoDataFrame): Dataframe optimizado
 45    """
 46    opt_df = df.copy()
 47    df_ints = opt_df.select_dtypes(include=['int64'])
 48    opt_df[df_ints.columns] = df_ints.apply(pd.to_numeric, downcast='signed')
 49    df_floats = opt_df.select_dtypes(include='float')
 50    opt_df[df_floats.columns] = df_floats.apply(pd.to_numeric, downcast='float')
 51
 52    excl_types_cat = EXCLUDED_TYPES_TO_CATEGORIZE
 53    if not floats_as_categ:
 54        excl_types_cat.append('float')
 55
 56    for col in opt_df.select_dtypes(exclude=excl_types_cat).columns:
 57        try:
 58            unic_vals = opt_df[col].unique()
 59        except (pd.errors.DataError, TypeError):
 60            continue
 61
 62        num_unique_values = len(unic_vals)
 63        num_total_values = len(opt_df[col]) - len(opt_df.loc[opt_df[col].isnull()])
 64        if num_total_values > 0 and (num_unique_values / num_total_values) < max_perc_unique_vals:
 65            try:
 66                opt_df[col] = opt_df[col].astype(CategoricalDtype(ordered=True))
 67            except (NotImplementedError, TypeError):
 68                continue
 69
 70    return opt_df
 71
 72
def df_filtered_by_prop(df: DataFrame | GeoDataFrame, filter_prop: dict[str, object]) -> DataFrame | GeoDataFrame | None:
    """
    Filter the dataframe with the passed dict, where each key refers to a column and the value (or list of
    values) is what the filter applies. If the key/column does not exist it is discarded. If the key/column
    starts with one of the signs "=, !, -, >, <" the corresponding filter operation is applied.
    For "!" and "-" the same negation operation is applied (does not equal / does not contain the passed
    value or values). The "<" and ">" filters do not apply to text columns and are discarded there.
    The same column can be passed with different operators and values to apply different filters

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to filter
        filter_prop (dict[str, object]): Filter properties

    Returns:
        DataFrame | GeoDataFrame: Filtered DataFrame
    """
    if df is None or not filter_prop:
        return df

    # Move named index levels into regular columns so they can be filtered too;
    # the index is restored at the end
    idx_names = [idx_col for idx_col in df.index.names if idx_col]
    if idx_names:
        df = df.reset_index()

    def _df_individual_filter(_df_ind: DataFrame, type_col_ind, column: str, value, col_operator: str = '='):
        # Apply one (column, operator, value) filter and return the filtered frame.
        # For categorical columns compare against the dtype of the categories themselves
        type_column = type_col_ind.categories.dtype if (type_col_name := type_col_ind.name) == 'category' else type_col_ind

        if type_col_name == 'object':
            # Text columns: case-insensitive substring match; '<'/'>' are silently ignored here
            if col_operator == '=':
                _df_ind = _df_ind[_df_ind[column].str.contains(str(value), case=False, na=False)]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind[~_df_ind[column].str.contains(str(value), case=False, na=False)]
        else:
            # Cast the raw value to the column's scalar type before comparing
            value = type_column.type(value)
            if col_operator == '=':
                _df_ind = _df_ind.loc[_df_ind[column] == value]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind.loc[_df_ind[column] != value]
            elif col_operator == '>':
                _df_ind = _df_ind.loc[_df_ind[column] > value]
            elif col_operator == '<':
                _df_ind = _df_ind.loc[_df_ind[column] < value]

        return _df_ind

    col_names = df.columns.values.tolist()
    for k, v in filter_prop.items():
        k_operator = "="
        if k.startswith(('-', '=', '<', '>', '!')):
            k_operator = k[0:1]
            k = k[1:]
        # Match the key case-insensitively against column and index names
        if k.upper() in (col_names + idx_names):
            k = k.upper()
        elif k.lower() in (col_names + idx_names):
            k = k.lower()

        if k in col_names and v is not None:
            type_col = df.dtypes.get(k)
            if isinstance(v, list):
                # done with a loop instead of isin because isin does not work for floats
                df_list = None
                for ind_val in v:
                    df_temp = _df_individual_filter(df, type_col, k, ind_val, k_operator)
                    if df_list is None:
                        df_list = df_temp
                    elif k_operator == '=':
                        # Inclusive filters accumulate the union of the per-value matches
                        df_list = pd.concat([df_list, df_temp])
                    if k_operator != '=':
                        # for excluding operators, keep filtering the already-filtered resulting df
                        df = df_list = df_temp
                df = df_list
            else:
                df = _df_individual_filter(df, type_col, k, v, k_operator)

    if idx_names:
        df.set_index(idx_names, inplace=True)

    return df
150
151
def rename_and_drop_columns(df: Union[DataFrame, GeoDataFrame], map_old_new_col_names: dict[str, str],
                            drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[
    DataFrame, GeoDataFrame]:
    """
    Rename and optionally remove/reorder columns of a dataframe.

    Args:
        df: dataframe to rename and prune
        map_old_new_col_names: the key is the current name and the value is the new name
        drop_col: True to remove columns that are not included in the map
        strict: if True the full mapping is passed to rename (names absent from the
            columns are ignored by pandas); if False only names present as columns are used
        reordered: True to reorder (and restrict) the columns to the mapped new names

    Returns: modified DataFrame; the input dataframe unchanged when there is nothing to do

    """
    if df is None or not map_old_new_col_names:
        # No-op: previously this fell through and returned None, silently losing the dataframe
        return df

    col_names = df.columns.values.tolist()
    col_names_to_drop = col_names.copy()
    # Keep only map entries whose key is an actual column
    final_map = {}
    for old_name, new_name in map_old_new_col_names.items():
        if old_name in col_names:
            final_map[old_name] = new_name
            col_names_to_drop.remove(old_name)
    if drop_col:
        df = df.drop(col_names_to_drop, axis=1)
    if strict:
        final_map = map_old_new_col_names
    df = df.rename(columns=final_map)
    if reordered:
        # Select the mapped new names, in mapping order, that survived the rename
        new_cols = list(map_old_new_col_names.values())
        act_cols = df.columns.tolist()
        df = df[[value for value in new_cols if value in act_cols]]
    return df
188
189
def set_null_and_default_values(df: DataFrame | GeoDataFrame) -> DataFrame | GeoDataFrame:
    """
    Replace every NaN value in the dataframe with ``None``.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame whose NaN values are replaced with None

    Returns:
        DataFrame | GeoDataFrame: New DataFrame with NaN values replaced with None
    """
    return df.replace({np.nan: None})
201
202
def replace_values_with_null(df: Union[DataFrame | GeoDataFrame], dict_col_values: dict) -> Union[
    DataFrame | GeoDataFrame]:
    """
    Replace the given values with None in the given columns of a DataFrame (in place).

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to replace values with None
        dict_col_values (dict): Dictionary with the column name and the value (or list of
            values) to replace with None. Unknown column names are skipped, consistent with
            how df_filtered_by_prop discards unknown keys.

    Returns:
        DataFrame | GeoDataFrame: DataFrame with values replaced with None
    """
    if df is not None and not df.empty and dict_col_values:
        for name_col, value in dict_col_values.items():
            if name_col not in df.columns:
                # Skip columns that do not exist instead of raising KeyError
                continue
            # Use the dict form of replace: `replace(value, None)` triggers pandas'
            # legacy value=None -> method='pad' semantics (forward-fill) instead of nulling
            if isinstance(value, (list, tuple, set)):
                mapping = dict.fromkeys(value, None)
            else:
                mapping = {value: None}
            df[name_col] = df[name_col].replace(mapping)
    return df
218
219
def convert_to_datetime_col_df(df: DataFrame, cols: list[str],
                               set_end_day: bool = False, set_nat: bool = False) -> DataFrame:
    """
    Force the given date columns into datetime values (in place).

    Plain ``date`` values become datetimes at midnight. When ``set_end_day`` is True,
    any datetime whose time is exactly midnight is moved to 23:59:59.

    Args:
        df (DataFrame): DataFrame whose columns are converted
        cols (list[str]): Columns to convert
        set_end_day (bool=False): If True, midnight datetimes are set to 23:59:59
        set_nat (bool=False): If True, NaT/None values are replaced with MAX_DATETIME

    Returns:
        DataFrame: The same DataFrame with converted datetime columns
    """
    end_of_day = time(23, 59, 59) if set_end_day else time.min

    def _coerce(val):
        # Plain dates (not datetimes) become datetimes at midnight
        if type(val) is date:
            val = datetime.combine(val, time.min)

        if set_nat and (val is NaT or val is None):
            return MAX_DATETIME
        if val is NaT:
            return val
        if isinstance(val, (Timestamp, datetime)) and set_end_day and val.time() == time.min:
            return datetime.combine(val, end_of_day)
        return val

    for name_col in cols:
        df[name_col] = df[name_col].apply(_coerce)

    return df
258
259
def df_memory_usage(df: DataFrame | GeoDataFrame) -> float:
    """
    Return the deep memory footprint of a DataFrame, in megabytes.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to measure

    Returns:
        float: Memory usage in MB
    """
    total_bytes = df.memory_usage(deep=True).sum()
    return total_bytes / (1024 * 1024)
271
272
def extract_operator(input_string):
    """
    Extract the leading SQL/filter operator from an input string.

    Args:
        input_string (str): Field name, optionally prefixed with an operator.

    Returns:
        str | None: The leading operator, or None when the string starts with a word character.

    Raises:
        ValueError: If the leading symbols are not a supported operator.
    """
    special_opers = ('-', '!')  # Special operators for negation
    sql_opers = ('=', '!=', '<>', '>', '<', '>=', '<=')  # SQL operators

    match = re.match(r'^(\W+)', input_string)
    if not match:
        return None

    symbols = match.group(1)
    if symbols not in special_opers and symbols not in sql_opers:
        raise ValueError(f"Operator '{symbols}' not supported")

    return symbols


def sql_from_filter_by_props(**filter_by_props: dict) -> str:
    """
    Get SQL from filter by properties.

    Each keyword is a field name optionally prefixed with an operator
    (see :func:`extract_operator`); conditions are joined with AND.
    The negation pseudo-operators '-' and '!' are translated to valid SQL
    ('!=' for scalars, 'NOT IN' for iterables).

    Args:
        **filter_by_props: The filter by properties

    Returns:
        sql (str): The SQL from filter by properties

    Raises:
        ValueError: If an operator is not supported for an iterable value.
    """
    sql_parts = []
    for k_fld_oper, value in filter_by_props.items():
        if k_operator := extract_operator(k_fld_oper):
            # Strip only the leading operator; str.replace would also remove
            # occurrences of the operator inside the field name
            k_fld = k_fld_oper.removeprefix(k_operator)
        else:
            k_operator = "="
            k_fld = k_fld_oper

        if isinstance(value, str):
            value = f"'{value}'"
        elif isinstance(value, Iterable):
            value = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in value])
            value = f"({value})"
            if k_operator in ('=', '!=', '-', '!'):
                k_operator = "IN" if k_operator == "=" else "NOT IN"
            else:
                raise ValueError(f"Operator '{k_operator}' not supported for iterable values")

        if k_operator in ('-', '!'):
            # Negation pseudo-operators are not valid SQL for scalars; use '!='
            # (previously this emitted invalid SQL like "field - 'x'")
            k_operator = '!='

        sql_parts.append(f"{k_fld} {k_operator} {value}")

    sql_filter = ' AND '.join(sql_parts)

    return sql_filter
331
332
def _build_session(max_retries: int = 3) -> requests.Session:
    """
    Build a :class:`requests.Session` that retries transient failures with exponential backoff.

    Args:
        max_retries (int): How many times GET requests are retried on transient
            HTTP errors (429, 500, 502, 503, 504). Defaults to ``3``.

    Returns:
        requests.Session: Configured session with retry logic on both schemes.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"],
            raise_on_status=False,
        )
    )
    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, retrying_adapter)
    return session
356
357
def _iter_fetch_pages(
    url_rest_api: str,
    api_params: dict | None = None,
    headers: dict | None = None,
    next_key: str = 'next',
    timeout: int | tuple[int, int] = (10, 30),
    max_retries: int = 3,
    session: requests.Session | None = None,
) -> Generator[dict | list]:
    """
    Internal helper: yield every page of a paginated REST API as its raw JSON body.

    Handles session lifecycle, retry logic and pagination automatically.
    Pagination follows the URL found under ``next_key`` in dict responses;
    a list response ends the pagination after that page.

    Args:
        url_rest_api (str): The base URL of the API endpoint.
        api_params (dict, optional): Query parameters for the first request only.
        headers (dict, optional): HTTP headers added to the session.
        next_key (str): Key in JSON dict responses that carries the next-page URL.
            Defaults to ``'next'``.
        timeout (int | tuple[int, int]): Request timeout ``(connect, read)`` in seconds.
            Defaults to ``(10, 30)``.
        max_retries (int): Retries on transient errors. Defaults to ``3``.
        session (requests.Session, optional): Existing session to reuse.
            If None, a new session is created and closed after all pages are fetched.

    Yields:
        dict | list: Raw JSON responses, one element per page.

    Raises:
        requests.HTTPError: If any HTTP request returns an error status.
        requests.ConnectionError: If the connection fails after all retries.
    """
    owns_session = session is None
    if owns_session:
        session = _build_session(max_retries)

    if headers:
        session.headers.update(headers)

    try:
        next_url: str | None = url_rest_api
        first_params = api_params or {}
        page_num = 0
        while next_url:
            page_num += 1
            logger.debug(f"Fetching page {page_num}: {next_url}")
            response = session.get(
                next_url,
                # Query parameters apply to the first request only; next-page
                # URLs already embed their own parameters
                params=first_params if page_num == 1 else None,
                timeout=timeout,
            )
            response.raise_for_status()
            body = response.json()
            yield body

            # Advance only when the response is a dict carrying a next link
            next_url = body.get(next_key) if isinstance(body, dict) else None
    finally:
        # Close the session only if this helper created it
        if owns_session:
            session.close()
421
422
def df_from_url(
    url_rest_api: str,
    api_params: dict | None = None,
    headers: dict | None = None,
    results_key: str | None = 'results',
    next_key: str = 'next',
    timeout: int | tuple[int, int] = (10, 30),
    max_retries: int = 3,
    session: requests.Session | None = None,
) -> DataFrame | None:
    """
    Fetch paginated JSON from a REST API and return a Pandas DataFrame.

    Delegates HTTP handling and pagination to :func:`_iter_fetch_pages`.

    Args:
        url_rest_api (str): The base URL of the API endpoint.
        api_params (dict, optional): Query parameters for the initial request.
        headers (dict, optional): HTTP headers for the request.
        results_key (str | None, optional): Key in the JSON response that contains
            the data list. If ``None``, the entire response body is wrapped as a
            single record. If the key is absent, the first list value found in the
            dict is used as fallback. Defaults to ``'results'``.
        next_key (str): Key in the JSON response containing the next-page URL.
            Defaults to ``'next'``.
        timeout (int | tuple[int, int]): Request timeout ``(connect, read)`` in seconds.
            Defaults to ``(10, 30)``.
        max_retries (int): Retries on transient HTTP errors. Defaults to ``3``.
        session (requests.Session, optional): Existing session to reuse.
            If None, a new session is created and closed after use.

    Returns:
        DataFrame | None: A DataFrame with all collected data, or ``None`` if empty.

    Raises:
        requests.HTTPError: If any HTTP request returns an error status.
        requests.ConnectionError: If the connection fails after all retries.
        ValueError: If the JSON response has an unexpected structure.
    """
    def _records_from(body):
        # Extract the list of records from one page's JSON body
        if isinstance(body, list):
            return body
        if isinstance(body, dict):
            if results_key and results_key in body:
                return body[results_key]
            if results_key is None:
                return [body]
            # Fallback: first list-valued entry found in the dict, else the dict itself
            return next((v for v in body.values() if isinstance(v, list)), [body])
        raise ValueError(
            f"Unexpected JSON structure: expected list or dict, got {type(body).__name__}"
        )

    all_data: list = []
    for body in _iter_fetch_pages(url_rest_api, api_params, headers, next_key, timeout, max_retries, session):
        page_data = _records_from(body)
        all_data.extend(page_data)
        logger.debug(f"Got {len(page_data)} records (total so far: {len(all_data)})")

    return DataFrame(all_data) if all_data else None
logger = <Logger apb_pandas_utils (DEBUG)>
EXCLUDED_TYPES_TO_CATEGORIZE = ['datetime', 'category', 'geometry']
DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY = 0.5
MAX_DATETIME = datetime.datetime(2250, 12, 31, 23, 59, 59)
def optimize_df( df: pandas.DataFrame | geopandas.geodataframe.GeoDataFrame, max_perc_unique_vals: float = 0.5, floats_as_categ: bool = False) -> pandas.DataFrame | geopandas.geodataframe.GeoDataFrame:
34def optimize_df(df: DataFrame | GeoDataFrame, max_perc_unique_vals: float = DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY,
35                floats_as_categ: bool = False) -> DataFrame | GeoDataFrame:
36    """
37    Retorna el pd.Dataframe optimizado segun columnas que encuentre
38
39    Args:
40        df (Dataframe | GeoDataFrame): Dataframe a optimizar
41        max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
42        floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
43
44    Returns:
45        opt_df (Dataframe | GeoDataFrame): Dataframe optimizado
46    """
47    opt_df = df.copy()
48    df_ints = opt_df.select_dtypes(include=['int64'])
49    opt_df[df_ints.columns] = df_ints.apply(pd.to_numeric, downcast='signed')
50    df_floats = opt_df.select_dtypes(include='float')
51    opt_df[df_floats.columns] = df_floats.apply(pd.to_numeric, downcast='float')
52
53    excl_types_cat = EXCLUDED_TYPES_TO_CATEGORIZE
54    if not floats_as_categ:
55        excl_types_cat.append('float')
56
57    for col in opt_df.select_dtypes(exclude=excl_types_cat).columns:
58        try:
59            unic_vals = opt_df[col].unique()
60        except (pd.errors.DataError, TypeError):
61            continue
62
63        num_unique_values = len(unic_vals)
64        num_total_values = len(opt_df[col]) - len(opt_df.loc[opt_df[col].isnull()])
65        if num_total_values > 0 and (num_unique_values / num_total_values) < max_perc_unique_vals:
66            try:
67                opt_df[col] = opt_df[col].astype(CategoricalDtype(ordered=True))
68            except (NotImplementedError, TypeError):
69                continue
70
71    return opt_df

Retorna el pd.Dataframe optimizado segun columnas que encuentre

Arguments:
  • df (Dataframe | GeoDataFrame): Dataframe a optimizar
  • max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
  • floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
Returns:

opt_df (Dataframe | GeoDataFrame): Dataframe optimizado

def df_filtered_by_prop( df: pandas.DataFrame | geopandas.geodataframe.GeoDataFrame, filter_prop: dict[str, object]) -> pandas.DataFrame | geopandas.geodataframe.GeoDataFrame | None:
 74def df_filtered_by_prop(df: DataFrame | GeoDataFrame, filter_prop: dict[str, object]) -> DataFrame | GeoDataFrame | None:
 75    """
 76    Filtra el dataframe amb el diccionari passat, on la clau fa referència a la columna i el valor o llistat de valors
 77    separats per comes son els que s’apliquen al filtre. Si la clau/columna no existeix es desestima. Si la clau/columna
 78    comença per alguns d’aquest signes “=, !, -, >, <” s’aplica la corresponent operació de filtre.
 79    En el cas de “!” i “–“ s’aplica la mateixa operació de negat o que no contingui el valor o valors passats.
 80    Els filtres “<“ i “>” no apliquen a camps text i es desestimen. Es poden passar la mateixa columna amb operadors
 81    i valors distints per aplicar filtres diferents
 82
 83    Args:
 84        df (DataFrame | GeoDataFrame): DataFrame a filtrar
 85        filter_prop (dict[str, object]): Propietats de filtrat
 86
 87    Returns:
 88        DataFrame | GeoDataFrame: DataFrame filtrat
 89    """
 90    if df is None or not filter_prop:
 91        return df
 92
 93    idx_names = [idx_col for idx_col in df.index.names if idx_col]
 94    if idx_names:
 95        df = df.reset_index()
 96
 97    def _df_individual_filter(_df_ind: DataFrame, type_col_ind, column: str, value, col_operator: str = '='):
 98        type_column = type_col_ind.categories.dtype if (type_col_name := type_col_ind.name) == 'category' else type_col_ind
 99
100        if type_col_name == 'object':
101            if col_operator == '=':
102                _df_ind = _df_ind[_df_ind[column].str.contains(str(value), case=False, na=False)]
103            elif col_operator == '-' or col_operator == '!':
104                _df_ind = _df_ind[~_df_ind[column].str.contains(str(value), case=False, na=False)]
105        else:
106            value = type_column.type(value)
107            if col_operator == '=':
108                _df_ind = _df_ind.loc[_df_ind[column] == value]
109            elif col_operator == '-' or col_operator == '!':
110                _df_ind = _df_ind.loc[_df_ind[column] != value]
111            elif col_operator == '>':
112                _df_ind = _df_ind.loc[_df_ind[column] > value]
113            elif col_operator == '<':
114                _df_ind = _df_ind.loc[_df_ind[column] < value]
115
116        return _df_ind
117
118    col_names = df.columns.values.tolist()
119    for k, v in filter_prop.items():
120        k_operator = "="
121        if k.startswith(('-', '=', '<', '>', '!')):
122            k_operator = k[0:1]
123            k = k[1:]
124        if k.upper() in (col_names + idx_names):
125            k = k.upper()
126        elif k.lower() in (col_names + idx_names):
127            k = k.lower()
128
129        if k in col_names and v is not None:
130            type_col = df.dtypes.get(k)
131            if isinstance(v, list):
132                # es fa amb un bucle i no amb isin perque no val per floats
133                df_list = None
134                for ind_val in v:
135                    df_temp = _df_individual_filter(df, type_col, k, ind_val, k_operator)
136                    if df_list is None:
137                        df_list = df_temp
138                    elif k_operator == '=':
139                        df_list = pd.concat([df_list, df_temp])
140                    if k_operator != '=':
141                        # per als operadors que exclouen s'ha de treballar sobre el df filtrat resultant
142                        df = df_list = df_temp
143                df = df_list
144            else:
145                df = _df_individual_filter(df, type_col, k, v, k_operator)
146
147    if idx_names:
148        df.set_index(idx_names, inplace=True)
149
150    return df

Filtra el dataframe amb el diccionari passat, on la clau fa referència a la columna i el valor o llistat de valors separats per comes son els que s’apliquen al filtre. Si la clau/columna no existeix es desestima. Si la clau/columna comença per alguns d’aquest signes “=, !, -, >, <” s’aplica la corresponent operació de filtre. En el cas de “!” i “–“ s’aplica la mateixa operació de negat o que no contingui el valor o valors passats. Els filtres “<“ i “>” no apliquen a camps text i es desestimen. Es poden passar la mateixa columna amb operadors i valors distints per aplicar filtres diferents

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame a filtrar
  • filter_prop (dict[str, object]): Propietats de filtrat
Returns:

DataFrame | GeoDataFrame: DataFrame filtrat

def rename_and_drop_columns( df: Union[pandas.DataFrame, geopandas.geodataframe.GeoDataFrame], map_old_new_col_names: dict[str, str], drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[pandas.DataFrame, geopandas.geodataframe.GeoDataFrame]:
153def rename_and_drop_columns(df: Union[DataFrame, GeoDataFrame], map_old_new_col_names: dict[str, str],
154                            drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[
155    DataFrame, GeoDataFrame]:
156    """
157    Function to rename and remove columns from a dataframe. If the drop_col parameter is True,
158    the columns that are not in the map will be removed. If the strict parameter is True,
159    the names that do not exist in the map as a column will be skipped.
160    Args:
161        df: to remove and rename
162        map_old_new_col_names: the key is the actual name and the value is the new name
163        drop_col: True to remove columns that are not included in the map
164        strict: False to skip names that are not included in the map
165        reordered: True to reorder columns of dataframe
166
167    Returns: modified DataFrame
168
169    """
170    if df is not None and map_old_new_col_names:
171        col_names = df.columns.values.tolist()
172        col_names_to_drop = col_names.copy()
173        final_map = {}
174        for k, v in map_old_new_col_names.items():
175            if k in col_names:
176                final_map[k] = v
177                col_names_to_drop.remove(k)
178        if drop_col:
179            df = df.drop(col_names_to_drop, axis=1)
180        if strict:
181            final_map = map_old_new_col_names
182        df = df.rename(columns=final_map)
183        if reordered:
184            new_cols = list(map_old_new_col_names.values())
185            act_cols = df.columns.tolist()
186            reord_cols = [value for value in new_cols if value in act_cols]
187            df = df[reord_cols]
188        return df

Function to rename and remove columns from a dataframe. If the drop_col parameter is True, the columns that are not in the map will be removed. If the strict parameter is True, the names that do not exist in the map as a column will be skipped.

Arguments:
  • df: to remove and rename
  • map_old_new_col_names: the key is the actual name and the value is the new name
  • drop_col: True to remove columns that are not included in the map
  • strict: False to skip names that are not included in the map
  • reordered: True to reorder columns of dataframe

Returns: modified DataFrame

def set_null_and_default_values( df: pandas.DataFrame | geopandas.geodataframe.GeoDataFrame) -> pandas.DataFrame | geopandas.geodataframe.GeoDataFrame:
191def set_null_and_default_values(df: DataFrame | GeoDataFrame) -> DataFrame | GeoDataFrame:
192    """
193    Function to replace NaN values with None in a DataFrame
194    Args:
195        df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None
196
197    Returns:
198        DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None
199    """
200    df = df.replace({np.nan: None})
201    return df

Function to replace NaN values with None in a DataFrame

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None
Returns:

DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None

def replace_values_with_null( df: Union[pandas.DataFrame, geopandas.geodataframe.GeoDataFrame], dict_col_values: dict) -> Union[pandas.DataFrame, geopandas.geodataframe.GeoDataFrame]:
204def replace_values_with_null(df: Union[DataFrame | GeoDataFrame], dict_col_values: dict) -> Union[
205    DataFrame | GeoDataFrame]:
206    """
207    Function to replace values with None in a DataFrame
208    Args:
209        df (DataFrame | GeoDataFrame): DataFrame to replace values with None
210        dict_col_values (dict): Dictionary with the column name and the value to replace with None
211
212    Returns:
213        DataFrame | GeoDataFrame: DataFrame with values replaced with None
214    """
215    if df is not None and not df.empty and dict_col_values:
216        for name_col, value in dict_col_values.items():
217            df[name_col] = df[name_col].replace(value, None)
218    return df

Function to replace values with None in a DataFrame

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame to replace values with None
  • dict_col_values (dict): Dictionary with the column name and the value to replace with None
Returns:

DataFrame | GeoDataFrame: DataFrame with values replaced with None

def convert_to_datetime_col_df(df: DataFrame, cols: list[str],
                               set_end_day: bool = False, set_nat: bool = False) -> DataFrame:
    """
    Force convert date columns to datetime format.

    Plain `date` values are promoted to `datetime`; when `set_end_day` is True,
    midnight values are pushed to 23:59:59, otherwise they keep 00:00:00.

    Args:
        df (DataFrame): DataFrame
        cols (list[str]): Columns to convert
        set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
        set_nat (bool=False): If True set NaT to MAX_DATETIME

    Returns:
        DataFrame: DataFrame with datetime columns
    """
    end_of_day = time(23, 59, 59) if set_end_day else time.min

    def _coerce(val):
        # A plain `date` (strictly, not a datetime/Timestamp subclass) is first
        # promoted to a datetime at midnight.
        if type(val) is date:
            val = datetime.combine(val, time.min)

        if set_nat and (val is NaT or val is None):
            return MAX_DATETIME
        if val is NaT:
            return val
        if isinstance(val, (Timestamp, datetime)) and set_end_day and val.time() == time.min:
            return datetime.combine(val, end_of_day)
        return val

    for name_col in cols:
        df[name_col] = df[name_col].apply(_coerce)

    return df

Force convert date columns to datetime format. If set_end_day is False the time is set to 00:00:00, otherwise to 23:59:59

Arguments:
  • df (DataFrame): DataFrame
  • cols (list[str]): Columns to convert
  • set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
  • set_nat (bool=False): If True set NaT to MAX_DATETIME
Returns:

DataFrame: DataFrame with datetime columns

def df_memory_usage(df: DataFrame | GeoDataFrame) -> float:
    """
    Compute the memory footprint of a DataFrame in megabytes.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to measure

    Returns:
        float: Deep memory usage in MB
    """
    total_bytes = df.memory_usage(deep=True).sum()
    return total_bytes / (1024 * 1024)

Return the memory usage of a DataFrame in MB

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame
Returns:

float: Memory usage in MB

def extract_operator(input_string):
    """
    Extract sql operator from input string

    Args:
        input_string: String that may start with a negation or SQL comparison operator

    Returns:
        The leading operator if present, otherwise None

    Raises:
        ValueError: If the leading symbols are not a supported operator
    """
    negation_opers = ('-', '!')  # Special operators for negation
    comparison_opers = ('=', '!=', '<>', '>', '<', '>=', '<=')  # SQL operators

    leading = re.match(r'^(\W+)', input_string)
    if leading is None:
        return None

    oper = leading.group(1)
    if oper in negation_opers or oper in comparison_opers:
        return oper

    raise ValueError(f"Operator '{oper}' not supported")

Extract sql operator from input string

Arguments:
  • input_string (str): String that may start with a SQL comparison or negation operator

Returns:
  • str | None: The leading operator if present, otherwise None; raises ValueError for unsupported symbols

def sql_from_filter_by_props(**filter_by_props) -> str:
    """
    Get SQL from filter by properties.

    Each keyword may be prefixed with an operator (e.g. ``'>=num'``); without a
    prefix, equality is assumed. Non-string iterable values are rendered as
    ``IN``/``NOT IN`` lists.

    Args:
        **filter_by_props: The filter by properties (field name, optionally
            operator-prefixed, mapped to its value)

    Returns:
        sql (str): The SQL from filter by properties, conditions joined with AND

    Raises:
        ValueError: If an unsupported operator is combined with an iterable value
    """
    sql_parts = []
    for k_fld_oper, value in filter_by_props.items():
        if k_operator := extract_operator(k_fld_oper):
            # Strip only the leading operator; str.replace would also remove
            # matching characters occurring later inside the field name.
            k_fld = k_fld_oper[len(k_operator):]
        else:
            k_operator = "="
            k_fld = k_fld_oper

        if isinstance(value, str):
            value = f"'{value}'"
        elif isinstance(value, Iterable):
            value = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in value])
            value = f"({value})"
            if k_operator in ('=', '!=', '-'):
                k_operator = "IN" if k_operator == "=" else "NOT IN"
            else:
                raise ValueError(f"Operator '{k_operator}' not supported for iterable values")
        # NOTE(review): a scalar value with the '-'/'!' negation prefix is emitted
        # verbatim (e.g. "fld - 'x'"), which is unlikely to be valid SQL — confirm
        # the intended semantics with callers.

        sql_parts.append(f"{k_fld} {k_operator} {value}")

    sql_filter = ' AND '.join(sql_parts)

    return sql_filter

Get SQL from filter by properties

Arguments:
  • **filter_by_props: The filter by properties
Returns:

sql (str): The SQL from filter by properties

def df_from_url(
    url_rest_api: str,
    api_params: dict | None = None,
    headers: dict | None = None,
    results_key: str | None = 'results',
    next_key: str = 'next',
    timeout: int | tuple[int, int] = (10, 30),
    max_retries: int = 3,
    session: requests.Session | None = None,
) -> DataFrame | None:
    """
    Fetch paginated JSON from a REST API and return a Pandas DataFrame.

    Delegates HTTP handling and pagination to :func:`_iter_fetch_pages`.

    Args:
        url_rest_api (str): The base URL of the API endpoint.
        api_params (dict, optional): Query parameters for the initial request.
        headers (dict, optional): HTTP headers for the request.
        results_key (str | None, optional): Key in the JSON response that contains
            the data list. If ``None``, the entire response body is wrapped as a
            single record. If the key is absent, the first list value found in the
            dict is used as fallback. Defaults to ``'results'``.
        next_key (str): Key in the JSON response containing the next-page URL.
            Defaults to ``'next'``.
        timeout (int | tuple[int, int]): Request timeout ``(connect, read)`` in seconds.
            Defaults to ``(10, 30)``.
        max_retries (int): Retries on transient HTTP errors. Defaults to ``3``.
        session (requests.Session, optional): Existing session to reuse.
            If None, a new session is created and closed after use.

    Returns:
        DataFrame | None: A DataFrame with all collected data, or ``None`` if empty.

    Raises:
        requests.HTTPError: If any HTTP request returns an error status.
        requests.ConnectionError: If the connection fails after all retries.
        ValueError: If the JSON response has an unexpected structure.
    """
    all_data: list = []

    for data in _iter_fetch_pages(url_rest_api, api_params, headers, next_key, timeout, max_retries, session):
        if isinstance(data, list):
            page_data = data
        elif isinstance(data, dict):
            if results_key and results_key in data:
                page_data = data[results_key]
            elif results_key is None:
                # Explicitly no results key: wrap the whole body as one record.
                page_data = [data]
            else:
                # Fallback: first list-typed value found in the dict.
                page_data = next(
                    (v for v in data.values() if isinstance(v, list)), [data]
                )
        else:
            raise ValueError(
                f"Unexpected JSON structure: expected list or dict, got {type(data).__name__}"
            )

        all_data.extend(page_data)
        # Lazy %-style args: the message is only formatted when DEBUG is enabled.
        logger.debug("Got %d records (total so far: %d)", len(page_data), len(all_data))

    return DataFrame(all_data) if all_data else None

Fetch paginated JSON from a REST API and return a Pandas DataFrame.

Delegates HTTP handling and pagination to _iter_fetch_pages().

Arguments:
  • url_rest_api (str): The base URL of the API endpoint.
  • api_params (dict, optional): Query parameters for the initial request.
  • headers (dict, optional): HTTP headers for the request.
  • results_key (str | None, optional): Key in the JSON response that contains the data list. If None, the entire response body is wrapped as a single record. If the key is absent, the first list value found in the dict is used as fallback. Defaults to 'results'.
  • next_key (str): Key in the JSON response containing the next-page URL. Defaults to 'next'.
  • timeout (int | tuple[int, int]): Request timeout (connect, read) in seconds. Defaults to (10, 30).
  • max_retries (int): Retries on transient HTTP errors. Defaults to 3.
  • session (requests.Session, optional): Existing session to reuse. If None, a new session is created and closed after use.
Returns:

DataFrame | None: A DataFrame with all collected data, or None if empty.

Raises:
  • requests.HTTPError: If any HTTP request returns an error status.
  • requests.ConnectionError: If the connection fails after all retries.
  • ValueError: If the JSON response has an unexpected structure.