apb_pandas_utils
Package apb_pandas_utils
Modules to add functionality over pandas and geopandas
Requires the GDAL library (version between 3.6 and 3.10) and the Oracle Instant Client installed.
To install:
pip install apb_pandas_utils
Documentation here apb_pandas_utils
# coding=utf-8
#
# Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
# Created:
# Copyright (c)
"""
.. include:: ../README.md
"""
from __future__ import annotations

import re
from collections.abc import Iterable
from datetime import datetime, time, date
from typing import Union, Generator

import numpy as np
import pandas as pd
import requests
from geopandas import GeoDataFrame
from pandas import DataFrame, Timestamp, NaT, CategoricalDtype
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from apb_extra_utils.utils_logging import get_base_logger

logger = get_base_logger(__name__)

# Dtype names that never make sense as pandas categories
EXCLUDED_TYPES_TO_CATEGORIZE = ['datetime', 'category', 'geometry']
# Default maximum ratio of unique values over non-null rows to categorize a column
DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY = 0.5
# Sentinel used instead of NaT/None when a concrete datetime is required
MAX_DATETIME = datetime(2250, 12, 31, 23, 59, 59)


def optimize_df(df: DataFrame | GeoDataFrame, max_perc_unique_vals: float = DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY,
                floats_as_categ: bool = False) -> DataFrame | GeoDataFrame:
    """
    Return a memory-optimized copy of the dataframe.

    Integer columns are downcast to the smallest signed integer dtype, float
    columns to the smallest float dtype, and low-cardinality columns are
    converted to ordered categories.

    Args:
        df (DataFrame | GeoDataFrame): Dataframe to optimize
        max_perc_unique_vals (float): Maximum ratio of unique values over
            non-null rows for a column to become a category, expressed
            between 0 and 1 (default 0.5 -> 50%)
        floats_as_categ (bool): If True, float columns may also be converted
            to category

    Returns:
        opt_df (DataFrame | GeoDataFrame): Optimized copy of the dataframe
    """
    opt_df = df.copy()
    df_ints = opt_df.select_dtypes(include=['int64'])
    opt_df[df_ints.columns] = df_ints.apply(pd.to_numeric, downcast='signed')
    df_floats = opt_df.select_dtypes(include='float')
    opt_df[df_floats.columns] = df_floats.apply(pd.to_numeric, downcast='float')

    # Work on a copy of the module constant: appending to it directly would
    # grow EXCLUDED_TYPES_TO_CATEGORIZE on every call (bug in the original)
    excl_types_cat = list(EXCLUDED_TYPES_TO_CATEGORIZE)
    if not floats_as_categ:
        excl_types_cat.append('float')

    for col in opt_df.select_dtypes(exclude=excl_types_cat).columns:
        try:
            unic_vals = opt_df[col].unique()
        except (pd.errors.DataError, TypeError):
            continue  # unhashable/incomparable values cannot be categorized

        num_unique_values = len(unic_vals)
        num_total_values = len(opt_df[col]) - len(opt_df.loc[opt_df[col].isnull()])
        if num_total_values > 0 and (num_unique_values / num_total_values) < max_perc_unique_vals:
            try:
                opt_df[col] = opt_df[col].astype(CategoricalDtype(ordered=True))
            except (NotImplementedError, TypeError):
                continue

    return opt_df


def df_filtered_by_prop(df: DataFrame | GeoDataFrame, filter_prop: dict[str, object]) -> DataFrame | GeoDataFrame | None:
    """
    Filter the dataframe with the given dictionary, where each key refers to a
    column and the value (or list of values) is applied as the filter. Keys
    that do not match an existing column are ignored. If a key starts with one
    of the signs "=, !, -, >, <" the corresponding filter operation is applied;
    "!" and "-" both negate the match (not equal / does not contain the given
    value or values). The "<" and ">" filters do not apply to text columns and
    are ignored for them. The same column can be passed with different
    operators and values to combine several filters.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to filter
        filter_prop (dict[str, object]): Filter properties

    Returns:
        DataFrame | GeoDataFrame: Filtered DataFrame
    """
    if df is None or not filter_prop:
        return df

    # Move named index levels to columns so they can be filtered too
    idx_names = [idx_col for idx_col in df.index.names if idx_col]
    if idx_names:
        df = df.reset_index()

    def _df_individual_filter(_df_ind: DataFrame, type_col_ind, column: str, value, col_operator: str = '='):
        """Apply a single column/operator/value filter and return the result."""
        # For categorical columns compare against the underlying category dtype
        type_column = type_col_ind.categories.dtype if (type_col_name := type_col_ind.name) == 'category' else type_col_ind

        if type_col_name == 'object':
            # Text columns: case-insensitive substring match; '<'/'>' ignored
            if col_operator == '=':
                _df_ind = _df_ind[_df_ind[column].str.contains(str(value), case=False, na=False)]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind[~_df_ind[column].str.contains(str(value), case=False, na=False)]
        else:
            # Cast the raw value to the column dtype before comparing
            value = type_column.type(value)
            if col_operator == '=':
                _df_ind = _df_ind.loc[_df_ind[column] == value]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind.loc[_df_ind[column] != value]
            elif col_operator == '>':
                _df_ind = _df_ind.loc[_df_ind[column] > value]
            elif col_operator == '<':
                _df_ind = _df_ind.loc[_df_ind[column] < value]

        return _df_ind

    col_names = df.columns.values.tolist()
    for k, v in filter_prop.items():
        k_operator = "="
        if k.startswith(('-', '=', '<', '>', '!')):
            k_operator = k[0:1]
            k = k[1:]
        # Match the column name case-insensitively
        if k.upper() in (col_names + idx_names):
            k = k.upper()
        elif k.lower() in (col_names + idx_names):
            k = k.lower()

        if k in col_names and v is not None:
            type_col = df.dtypes.get(k)
            if isinstance(v, list):
                # done with a loop instead of isin because isin does not work for floats
                df_list = None
                for ind_val in v:
                    df_temp = _df_individual_filter(df, type_col, k, ind_val, k_operator)
                    if df_list is None:
                        df_list = df_temp
                    elif k_operator == '=':
                        df_list = pd.concat([df_list, df_temp])
                    if k_operator != '=':
                        # exclusion operators work cumulatively on the already-filtered frame
                        df = df_list = df_temp
                df = df_list
            else:
                df = _df_individual_filter(df, type_col, k, v, k_operator)

    if idx_names:
        # Restore the original index
        df.set_index(idx_names, inplace=True)

    return df


def rename_and_drop_columns(df: Union[DataFrame, GeoDataFrame], map_old_new_col_names: dict[str, str],
                            drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[
    DataFrame, GeoDataFrame]:
    """
    Rename and optionally remove columns from a dataframe.

    Args:
        df: DataFrame whose columns are renamed/dropped
        map_old_new_col_names: the key is the current name and the value is the new name
        drop_col: True to remove columns that are not included in the map
        strict: True to pass the whole map to ``rename`` even for keys that
            are not present as columns; False to only rename existing columns
        reordered: True to reorder columns following the order of the map values

    Returns: modified DataFrame

    """
    if df is not None and map_old_new_col_names:
        col_names = df.columns.values.tolist()
        col_names_to_drop = col_names.copy()
        final_map = {}
        for k, v in map_old_new_col_names.items():
            if k in col_names:
                final_map[k] = v
                col_names_to_drop.remove(k)
        if drop_col:
            df = df.drop(col_names_to_drop, axis=1)
        if strict:
            final_map = map_old_new_col_names
        df = df.rename(columns=final_map)
        if reordered:
            new_cols = list(map_old_new_col_names.values())
            act_cols = df.columns.tolist()
            reord_cols = [value for value in new_cols if value in act_cols]
            df = df[reord_cols]
    return df


def set_null_and_default_values(df: DataFrame | GeoDataFrame) -> DataFrame | GeoDataFrame:
    """
    Replace NaN values with None in a DataFrame.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None

    Returns:
        DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None
    """
    df = df.replace({np.nan: None})
    return df


def replace_values_with_null(df: Union[DataFrame | GeoDataFrame], dict_col_values: dict) -> Union[
    DataFrame | GeoDataFrame]:
    """
    Replace the given per-column values with None in a DataFrame.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to replace values with None
        dict_col_values (dict): Dictionary with the column name and the value to replace with None

    Returns:
        DataFrame | GeoDataFrame: DataFrame with values replaced with None
    """
    if df is not None and not df.empty and dict_col_values:
        for name_col, value in dict_col_values.items():
            df[name_col] = df[name_col].replace(value, None)
    return df


def convert_to_datetime_col_df(df: DataFrame, cols: list[str],
                               set_end_day: bool = False, set_nat: bool = False) -> DataFrame:
    """
    Force convert date columns to datetime format.

    If set_end_day is False the time part stays at 00:00:00; if True, values
    at midnight are pushed to 23:59:59.

    Args:
        df (DataFrame): DataFrame (columns are converted in place)
        cols (list[str]): Columns to convert
        set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
        set_nat (bool=False): If True set NaT to MAX_DATETIME

    Returns:
        DataFrame: DataFrame with datetime columns
    """
    if not set_end_day:
        delta_time = time.min
    else:
        delta_time = time(23, 59, 59)

    def _convert_date(value):
        # Promote plain dates to datetimes at midnight first
        if type(value) is date:
            value = datetime.combine(value, time.min)

        if set_nat and (value is NaT or value is None):
            return MAX_DATETIME
        elif value is NaT:
            return value
        elif ((isinstance(value, Timestamp) or isinstance(value, datetime))
              and set_end_day and value.time() == time.min):
            return datetime.combine(value, delta_time)
        else:
            return value

    for col in cols:
        df[col] = df[col].apply(_convert_date)

    return df


def df_memory_usage(df: DataFrame | GeoDataFrame) -> float:
    """
    Return the memory usage of a DataFrame in MB

    Args:
        df (DataFrame | GeoDataFrame): DataFrame

    Returns:
        float: Memory usage in MB
    """
    return df.memory_usage(deep=True).sum() / 1024 ** 2


def extract_operator(input_string):
    """
    Extract a leading SQL/filter operator from the input string.

    Args:
        input_string (str): String possibly prefixed with an operator

    Returns:
        str | None: The operator prefix, or None when the string starts with
            a word character

    Raises:
        ValueError: If the leading symbols are not a supported operator
    """
    special_opers = ('-', '!')  # Special operators for negation
    sql_opers = ('=', '!=', '<>', '>', '<', '>=', '<=')  # SQL operators

    match = re.match(r'^(\W+)', input_string)
    if match:
        symbols = match.group(1)

        if symbols not in special_opers and symbols not in sql_opers:
            raise ValueError(f"Operator '{symbols}' not supported")

        return symbols

    return None


def sql_from_filter_by_props(**filter_by_props) -> str:
    """
    Get SQL from filter by properties.

    Each keyword may be prefixed with an operator; without one, equality is
    assumed. Iterable values are rendered as IN / NOT IN lists.

    Args:
        **filter_by_props: The filter by properties

    Returns:
        sql (str): The SQL from filter by properties
    """
    sql_parts = []
    for k_fld_oper, value in filter_by_props.items():
        if k_operator := extract_operator(k_fld_oper):
            # Strip only the leading operator; str.replace could also remove
            # matching characters further inside the key
            k_fld = k_fld_oper[len(k_operator):]
        else:
            k_operator = "="
            k_fld = k_fld_oper

        if isinstance(value, str):
            value = f"'{value}'"
        elif isinstance(value, Iterable):
            value = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in value])
            value = f"({value})"
            if k_operator in ('=', '!=', '-'):
                k_operator = "IN" if k_operator == "=" else "NOT IN"
            else:
                raise ValueError(f"Operator '{k_operator}' not supported for iterable values")

        sql_parts.append(f"{k_fld} {k_operator} {value}")

    sql_filter = ' AND '.join(sql_parts)

    return sql_filter


def _build_session(max_retries: int = 3) -> requests.Session:
    """
    Create a :class:`requests.Session` with a retry strategy and exponential backoff.

    Args:
        max_retries (int): Number of retries on transient HTTP errors
            (429, 500, 502, 503, 504). Defaults to ``3``.

    Returns:
        requests.Session: Configured session with retry logic.
    """
    session = requests.Session()
    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def _iter_fetch_pages(
        url_rest_api: str,
        api_params: dict | None = None,
        headers: dict | None = None,
        next_key: str = 'next',
        timeout: int | tuple[int, int] = (10, 30),
        max_retries: int = 3,
        session: requests.Session | None = None,
) -> Generator[dict | list, None, None]:
    """
    Internal helper: fetch all pages from a paginated REST API.

    Handles session lifecycle, retry logic and pagination automatically.
    Each yielded element is the raw JSON body of one page.

    Args:
        url_rest_api (str): The base URL of the API endpoint.
        api_params (dict, optional): Query parameters for the first request only.
        headers (dict, optional): HTTP headers added to the session.
        next_key (str): Key in JSON dict responses that carries the next-page URL.
            Defaults to ``'next'``.
        timeout (int | tuple[int, int]): Request timeout ``(connect, read)`` in seconds.
            Defaults to ``(10, 30)``.
        max_retries (int): Retries on transient errors. Defaults to ``3``.
        session (requests.Session, optional): Existing session to reuse.
            If None, a new session is created and closed after all pages are fetched.

    Yields:
        dict | list: Raw JSON responses, one element per page.

    Raises:
        requests.HTTPError: If any HTTP request returns an error status.
        requests.ConnectionError: If the connection fails after all retries.
    """
    own_session = session is None
    if own_session:
        session = _build_session(max_retries)

    if headers:
        session.headers.update(headers)

    url: str | None = url_rest_api
    params = api_params or {}
    page = 0

    try:
        while url:
            page += 1
            logger.debug(f"Fetching page {page}: {url}")
            response = session.get(
                url,
                # the next-page URL already carries its own query string
                params=params if page == 1 else None,
                timeout=timeout,
            )
            response.raise_for_status()
            data = response.json()
            yield data

            # Advance to next page only when response is a dict with a next link
            url = data.get(next_key) if isinstance(data, dict) else None
    finally:
        if own_session:
            session.close()


def df_from_url(
        url_rest_api: str,
        api_params: dict | None = None,
        headers: dict | None = None,
        results_key: str | None = 'results',
        next_key: str = 'next',
        timeout: int | tuple[int, int] = (10, 30),
        max_retries: int = 3,
        session: requests.Session | None = None,
) -> DataFrame | None:
    """
    Fetch paginated JSON from a REST API and return a Pandas DataFrame.

    Delegates HTTP handling and pagination to :func:`_iter_fetch_pages`.

    Args:
        url_rest_api (str): The base URL of the API endpoint.
        api_params (dict, optional): Query parameters for the initial request.
        headers (dict, optional): HTTP headers for the request.
        results_key (str | None, optional): Key in the JSON response that contains
            the data list. If ``None``, the entire response body is wrapped as a
            single record. If the key is absent, the first list value found in the
            dict is used as fallback. Defaults to ``'results'``.
        next_key (str): Key in the JSON response containing the next-page URL.
            Defaults to ``'next'``.
        timeout (int | tuple[int, int]): Request timeout ``(connect, read)`` in seconds.
            Defaults to ``(10, 30)``.
        max_retries (int): Retries on transient HTTP errors. Defaults to ``3``.
        session (requests.Session, optional): Existing session to reuse.
            If None, a new session is created and closed after use.

    Returns:
        DataFrame | None: A DataFrame with all collected data, or ``None`` if empty.

    Raises:
        requests.HTTPError: If any HTTP request returns an error status.
        requests.ConnectionError: If the connection fails after all retries.
        ValueError: If the JSON response has an unexpected structure.
    """
    all_data: list = []

    for data in _iter_fetch_pages(url_rest_api, api_params, headers, next_key, timeout, max_retries, session):
        if isinstance(data, list):
            page_data = data
        elif isinstance(data, dict):
            if results_key and results_key in data:
                page_data = data[results_key]
            elif results_key is None:
                page_data = [data]
            else:
                # Fallback: first list-typed value found in the dict
                page_data = next(
                    (v for v in data.values() if isinstance(v, list)), [data]
                )
        else:
            raise ValueError(
                f"Unexpected JSON structure: expected list or dict, got {type(data).__name__}"
            )

        all_data.extend(page_data)
        logger.debug(f"Got {len(page_data)} records (total so far: {len(all_data)})")

    return DataFrame(all_data) if all_data else None
def optimize_df(df: DataFrame | GeoDataFrame, max_perc_unique_vals: float = DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY,
                floats_as_categ: bool = False) -> DataFrame | GeoDataFrame:
    """
    Return a memory-optimized copy of the dataframe.

    Integer columns are downcast to the smallest signed integer dtype, float
    columns to the smallest float dtype, and low-cardinality columns are
    converted to ordered categories.

    Args:
        df (DataFrame | GeoDataFrame): Dataframe to optimize
        max_perc_unique_vals (float): Maximum ratio of unique values over
            non-null rows for a column to become a category, expressed
            between 0 and 1 (default 0.5 -> 50%)
        floats_as_categ (bool): If True, float columns may also be converted
            to category

    Returns:
        opt_df (DataFrame | GeoDataFrame): Optimized copy of the dataframe
    """
    opt_df = df.copy()
    df_ints = opt_df.select_dtypes(include=['int64'])
    opt_df[df_ints.columns] = df_ints.apply(pd.to_numeric, downcast='signed')
    df_floats = opt_df.select_dtypes(include='float')
    opt_df[df_floats.columns] = df_floats.apply(pd.to_numeric, downcast='float')

    # Work on a copy of the module constant: appending to it directly would
    # grow EXCLUDED_TYPES_TO_CATEGORIZE on every call (bug in the original)
    excl_types_cat = list(EXCLUDED_TYPES_TO_CATEGORIZE)
    if not floats_as_categ:
        excl_types_cat.append('float')

    for col in opt_df.select_dtypes(exclude=excl_types_cat).columns:
        try:
            unic_vals = opt_df[col].unique()
        except (pd.errors.DataError, TypeError):
            continue  # unhashable/incomparable values cannot be categorized

        num_unique_values = len(unic_vals)
        num_total_values = len(opt_df[col]) - len(opt_df.loc[opt_df[col].isnull()])
        if num_total_values > 0 and (num_unique_values / num_total_values) < max_perc_unique_vals:
            try:
                opt_df[col] = opt_df[col].astype(CategoricalDtype(ordered=True))
            except (NotImplementedError, TypeError):
                continue

    return opt_df
Retorna el pd.Dataframe optimizado segun columnas que encuentre
Arguments:
- df (Dataframe | GeoDataFrame): Dataframe a optimizar
- max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
- floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
Returns:
opt_df (Dataframe | GeoDataFrame): Dataframe optimizado
def df_filtered_by_prop(df: DataFrame | GeoDataFrame, filter_prop: dict[str, object]) -> DataFrame | GeoDataFrame | None:
    """
    Filter the dataframe with the given dictionary, where each key refers to a
    column and the value (or list of values) is applied as the filter. Keys
    that do not match an existing column are ignored. If a key starts with one
    of the signs "=, !, -, >, <" the corresponding filter operation is applied;
    "!" and "-" both negate the match (not equal / does not contain the given
    value or values). The "<" and ">" filters do not apply to text columns and
    are ignored for them. The same column can be passed with different
    operators and values to combine several filters.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to filter
        filter_prop (dict[str, object]): Filter properties

    Returns:
        DataFrame | GeoDataFrame: Filtered DataFrame
    """
    if df is None or not filter_prop:
        return df

    # Move named index levels to columns so they can be filtered too
    idx_names = [idx_col for idx_col in df.index.names if idx_col]
    if idx_names:
        df = df.reset_index()

    def _df_individual_filter(_df_ind: DataFrame, type_col_ind, column: str, value, col_operator: str = '='):
        """Apply a single column/operator/value filter and return the result."""
        # For categorical columns compare against the underlying category dtype
        type_column = type_col_ind.categories.dtype if (type_col_name := type_col_ind.name) == 'category' else type_col_ind

        if type_col_name == 'object':
            # Text columns: case-insensitive substring match; '<'/'>' are ignored
            if col_operator == '=':
                _df_ind = _df_ind[_df_ind[column].str.contains(str(value), case=False, na=False)]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind[~_df_ind[column].str.contains(str(value), case=False, na=False)]
        else:
            # Cast the raw value to the column dtype before comparing
            value = type_column.type(value)
            if col_operator == '=':
                _df_ind = _df_ind.loc[_df_ind[column] == value]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind.loc[_df_ind[column] != value]
            elif col_operator == '>':
                _df_ind = _df_ind.loc[_df_ind[column] > value]
            elif col_operator == '<':
                _df_ind = _df_ind.loc[_df_ind[column] < value]

        return _df_ind

    col_names = df.columns.values.tolist()
    for k, v in filter_prop.items():
        k_operator = "="
        if k.startswith(('-', '=', '<', '>', '!')):
            k_operator = k[0:1]
            k = k[1:]
        # Match the column name case-insensitively against columns and index names
        if k.upper() in (col_names + idx_names):
            k = k.upper()
        elif k.lower() in (col_names + idx_names):
            k = k.lower()

        if k in col_names and v is not None:
            type_col = df.dtypes.get(k)
            if isinstance(v, list):
                # done with a loop instead of isin because isin does not work for floats
                df_list = None
                for ind_val in v:
                    df_temp = _df_individual_filter(df, type_col, k, ind_val, k_operator)
                    if df_list is None:
                        df_list = df_temp
                    elif k_operator == '=':
                        df_list = pd.concat([df_list, df_temp])
                    if k_operator != '=':
                        # exclusion operators work cumulatively on the already-filtered frame
                        df = df_list = df_temp
                df = df_list
            else:
                df = _df_individual_filter(df, type_col, k, v, k_operator)

    if idx_names:
        # Restore the original index levels
        df.set_index(idx_names, inplace=True)

    return df
Filtra el dataframe amb el diccionari passat, on la clau fa referència a la columna i el valor o llistat de valors separats per comes son els que s’apliquen al filtre. Si la clau/columna no existeix es desestima. Si la clau/columna comença per alguns d’aquest signes “=, !, -, >, <” s’aplica la corresponent operació de filtre. En el cas de “!” i “–“ s’aplica la mateixa operació de negat o que no contingui el valor o valors passats. Els filtres “<“ i “>” no apliquen a camps text i es desestimen. Es poden passar la mateixa columna amb operadors i valors distints per aplicar filtres diferents
Arguments:
- df (DataFrame | GeoDataFrame): DataFrame a filtrar
- filter_prop (dict[str, object]): Propietats de filtrat
Returns:
DataFrame | GeoDataFrame: DataFrame filtrat
def rename_and_drop_columns(df: Union[DataFrame, GeoDataFrame], map_old_new_col_names: dict[str, str],
                            drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[
    DataFrame, GeoDataFrame]:
    """
    Rename and optionally remove columns from a dataframe.

    Args:
        df: DataFrame whose columns are renamed/dropped
        map_old_new_col_names: the key is the current name and the value is the new name
        drop_col: True to remove columns that are not included in the map
        strict: True to pass the whole map to ``rename`` even for keys that
            are not present as columns
        reordered: True to reorder columns following the order of the map values

    Returns: modified DataFrame

    """
    if df is None or not map_old_new_col_names:
        return df

    current_cols = df.columns.values.tolist()
    # Keep only map entries whose key is an actual column
    rename_map = {old: new for old, new in map_old_new_col_names.items() if old in current_cols}

    if drop_col:
        # Columns not mentioned in the map are removed
        unmapped_cols = [col for col in current_cols if col not in rename_map]
        df = df.drop(unmapped_cols, axis=1)

    df = df.rename(columns=map_old_new_col_names if strict else rename_map)

    if reordered:
        # Follow the order of the map values, skipping names not present
        present_cols = df.columns.tolist()
        df = df[[new for new in map_old_new_col_names.values() if new in present_cols]]

    return df
Function to rename and remove columns from a dataframe. If the drop_col parameter is True, the columns that are not in the map will be removed. If the strict parameter is True, the names that do not exist in the map as a column will be skipped.
Arguments:
- df: to remove and rename
- map_old_new_col_names: the key is the actual name and the value is the new name
- drop_col: True to remove columns that are not included in the map
- strict: False to skip names that are not included in the map
- reordered: True to reorder columns of dataframe
Returns: modified DataFrame
def set_null_and_default_values(df: DataFrame | GeoDataFrame) -> DataFrame | GeoDataFrame:
    """
    Replace every NaN value in the dataframe with None.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None

    Returns:
        DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None
    """
    # replace() returns a new frame; affected columns become object dtype
    return df.replace({np.nan: None})
Function to replace NaN values with None in a DataFrame
Arguments:
- df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None
Returns:
DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None
204def replace_values_with_null(df: Union[DataFrame | GeoDataFrame], dict_col_values: dict) -> Union[ 205 DataFrame | GeoDataFrame]: 206 """ 207 Function to replace values with None in a DataFrame 208 Args: 209 df (DataFrame | GeoDataFrame): DataFrame to replace values with None 210 dict_col_values (dict): Dictionary with the column name and the value to replace with None 211 212 Returns: 213 DataFrame | GeoDataFrame: DataFrame with values replaced with None 214 """ 215 if df is not None and not df.empty and dict_col_values: 216 for name_col, value in dict_col_values.items(): 217 df[name_col] = df[name_col].replace(value, None) 218 return df
Function to replace values with None in a DataFrame
Arguments:
- df (DataFrame | GeoDataFrame): DataFrame to replace values with None
- dict_col_values (dict): Dictionary with the column name and the value to replace with None
Returns:
DataFrame | GeoDataFrame: DataFrame with values replaced with None
def convert_to_datetime_col_df(df: DataFrame, cols: list[str],
                               set_end_day: bool = False, set_nat: bool = False) -> DataFrame:
    """
    Force convert date columns to datetime format.

    If set_end_day is False the time part stays at 00:00:00; if True, values
    at midnight are pushed to 23:59:59. (The original docstring referenced a
    nonexistent ``init_date`` parameter.)

    Args:
        df (DataFrame): DataFrame (columns are converted in place)
        cols (list[str]): Columns to convert
        set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
        set_nat (bool=False): If True set NaT to MAX_DATETIME

    Returns:
        DataFrame: DataFrame with datetime columns
    """
    if not set_end_day:
        delta_time = time.min
    else:
        delta_time = time(23, 59, 59)

    def _convert_date(value):
        # Promote plain dates to datetimes at midnight first
        if type(value) is date:
            value = datetime.combine(value, time.min)

        if set_nat and (value is NaT or value is None):
            return MAX_DATETIME
        elif value is NaT:
            return value
        elif ((isinstance(value, Timestamp) or isinstance(value, datetime))
              and set_end_day and value.time() == time.min):
            # Only midnight values are shifted to the end of the day
            return datetime.combine(value, delta_time)
        else:
            return value

    for col in cols:
        df[col] = df[col].apply(_convert_date)

    return df
Force convert date columns to datetime format. If set_end_day is False, the time is set to 00:00:00; otherwise to 23:59:59.
Arguments:
- df (DataFrame): DataFrame
- cols (list[str]): Columns to convert
- set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
- set_nat (bool=False): If True set NaT to MAX_DATETIME
Returns:
DataFrame: DataFrame with datetime columns
def df_memory_usage(df: DataFrame | GeoDataFrame) -> float:
    """
    Compute the deep memory usage of a dataframe, in megabytes.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame

    Returns:
        float: Memory usage in MB
    """
    total_bytes = df.memory_usage(deep=True).sum()
    return total_bytes / (1024 ** 2)
Return the memory usage of a DataFrame in MB
Arguments:
- df (DataFrame | GeoDataFrame): DataFrame
Returns:
float: Memory usage in MB
def extract_operator(input_string):
    """
    Extract a leading SQL/filter operator from the input string.

    Args:
        input_string (str): String possibly prefixed with an operator

    Returns:
        str | None: The operator prefix, or None when the string starts with
            a word character

    Raises:
        ValueError: If the leading symbols are not a supported operator
    """
    negation_opers = ('-', '!')  # Special operators for negation
    supported_sql_opers = ('=', '!=', '<>', '>', '<', '>=', '<=')  # SQL operators

    prefix_match = re.match(r'^(\W+)', input_string)
    if not prefix_match:
        # No non-word prefix: the string carries no operator
        return None

    symbols = prefix_match.group(1)
    if symbols in negation_opers or symbols in supported_sql_opers:
        return symbols

    raise ValueError(f"Operator '{symbols}' not supported")
Extract sql operator from input string
Arguments:
- input_string:
Returns:
def sql_from_filter_by_props(**filter_by_props) -> str:
    """
    Get SQL from filter by properties.

    Each keyword may be prefixed with an operator ('=', '!', '-', '>', '<',
    '>=', '<=', '!=', '<>'); without one, equality is assumed. Iterable
    values are rendered as IN / NOT IN lists.

    Args:
        **filter_by_props: The filter by properties (field name, optionally
            operator-prefixed, mapped to the value or values to filter by)

    Returns:
        sql (str): The SQL from filter by properties

    Raises:
        ValueError: From extract_operator for unsupported prefixes, or when
            an operator other than '=', '!=' or '-' is used with an iterable
    """
    sql_parts = []
    for k_fld_oper, value in filter_by_props.items():
        if k_operator := extract_operator(k_fld_oper):
            # Strip only the leading operator; str.replace (used originally)
            # could also remove matching characters further inside the key
            k_fld = k_fld_oper[len(k_operator):]
        else:
            k_operator = "="
            k_fld = k_fld_oper

        if isinstance(value, str):
            value = f"'{value}'"
        elif isinstance(value, Iterable):
            value = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in value])
            value = f"({value})"
            if k_operator in ('=', '!=', '-'):
                k_operator = "IN" if k_operator == "=" else "NOT IN"
            else:
                raise ValueError(f"Operator '{k_operator}' not supported for iterable values")

        sql_parts.append(f"{k_fld} {k_operator} {value}")

    sql_filter = ' AND '.join(sql_parts)

    return sql_filter
Get SQL from filter by properties
Arguments:
- **filter_by_props: The filter by properties
Returns:
sql (str): The SQL from filter by properties
def df_from_url(
        url_rest_api: str,
        api_params: dict | None = None,
        headers: dict | None = None,
        results_key: str | None = 'results',
        next_key: str = 'next',
        timeout: int | tuple[int, int] = (10, 30),
        max_retries: int = 3,
        session: requests.Session | None = None,
) -> DataFrame | None:
    """
    Fetch paginated JSON from a REST API and return a Pandas DataFrame.

    Delegates HTTP handling and pagination to :func:`_iter_fetch_pages`
    (the original docstring referenced a nonexistent ``_fetch_pages``).

    Args:
        url_rest_api (str): The base URL of the API endpoint.
        api_params (dict, optional): Query parameters for the initial request.
        headers (dict, optional): HTTP headers for the request.
        results_key (str | None, optional): Key in the JSON response that contains
            the data list. If ``None``, the entire response body is wrapped as a
            single record. If the key is absent, the first list value found in the
            dict is used as fallback. Defaults to ``'results'``.
        next_key (str): Key in the JSON response containing the next-page URL.
            Defaults to ``'next'``.
        timeout (int | tuple[int, int]): Request timeout ``(connect, read)`` in seconds.
            Defaults to ``(10, 30)``.
        max_retries (int): Retries on transient HTTP errors. Defaults to ``3``.
        session (requests.Session, optional): Existing session to reuse.
            If None, a new session is created and closed after use.

    Returns:
        DataFrame | None: A DataFrame with all collected data, or ``None`` if empty.

    Raises:
        requests.HTTPError: If any HTTP request returns an error status.
        requests.ConnectionError: If the connection fails after all retries.
        ValueError: If the JSON response has an unexpected structure.
    """
    all_data: list = []

    for data in _iter_fetch_pages(url_rest_api, api_params, headers, next_key, timeout, max_retries, session):
        if isinstance(data, list):
            page_data = data
        elif isinstance(data, dict):
            if results_key and results_key in data:
                page_data = data[results_key]
            elif results_key is None:
                # Whole body is a single record
                page_data = [data]
            else:
                # Fallback: first list-typed value found in the dict
                page_data = next(
                    (v for v in data.values() if isinstance(v, list)), [data]
                )
        else:
            raise ValueError(
                f"Unexpected JSON structure: expected list or dict, got {type(data).__name__}"
            )

        all_data.extend(page_data)
        logger.debug(f"Got {len(page_data)} records (total so far: {len(all_data)})")

    return DataFrame(all_data) if all_data else None
Fetch paginated JSON from a REST API and return a Pandas DataFrame.
Delegates HTTP handling and pagination to _iter_fetch_pages().
Arguments:
- url_rest_api (str): The base URL of the API endpoint.
- api_params (dict, optional): Query parameters for the initial request.
- headers (dict, optional): HTTP headers for the request.
- results_key (str | None, optional): Key in the JSON response that contains the data list. If None, the entire response body is wrapped as a single record. If the key is absent, the first list value found in the dict is used as fallback. Defaults to 'results'.
- next_key (str): Key in the JSON response containing the next-page URL. Defaults to 'next'.
- timeout (int | tuple[int, int]): Request timeout (connect, read) in seconds. Defaults to (10, 30).
- max_retries (int): Retries on transient HTTP errors. Defaults to 3.
- session (requests.Session, optional): Existing session to reuse. If None, a new session is created and closed after use.
Returns:
DataFrame | None: A DataFrame with all collected data, or None if empty.
Raises:
- requests.HTTPError: If any HTTP request returns an error status.
- requests.ConnectionError: If the connection fails after all retries.
- ValueError: If the JSON response has an unexpected structure.