apb_pandas_utils

Package apb_pandas_utils

Modules to add functionality over pandas and geopandas

Requires the GDAL library (version between 3.6 and 3.9) and the Oracle Instant Client installed.

To install:

pip install apb_pandas_utils

Documentation here apb_pandas_utils

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 
  5#  Copyright (c)
  6"""
  7.. include:: ../README.md
  8"""
  9from __future__ import annotations
 10
 11import re
 12from collections.abc import Iterable
 13from datetime import datetime, time, date
 14from typing import Union
 15
 16import numpy as np
 17import pandas as pd
 18from geopandas import GeoDataFrame
 19from pandas import DataFrame, Timestamp, NaT, CategoricalDtype
 20
 21EXCLUDED_TYPES_TO_CATEGORIZE = ['datetime', 'category', 'geometry']
 22DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY = 0.5
 23MAX_DATETIME = datetime(2250, 12, 31, 23, 59, 59)
 24
 25
 26def optimize_df(df: DataFrame | GeoDataFrame, max_perc_unique_vals: float = DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY,
 27                floats_as_categ: bool = False) -> DataFrame | GeoDataFrame:
 28    """
 29    Retorna el pd.Dataframe optimizado segun columnas que encuentre
 30
 31    Args:
 32        df (Dataframe | GeoDataFrame): Dataframe a optimizar
 33        max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
 34        floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
 35
 36    Returns:
 37        opt_df (Dataframe | GeoDataFrame): Dataframe optimizado
 38    """
 39    opt_df = df.copy()
 40    df_ints = opt_df.select_dtypes(include=['int64'])
 41    opt_df[df_ints.columns] = df_ints.apply(pd.to_numeric, downcast='signed')
 42    df_floats = opt_df.select_dtypes(include='float')
 43    opt_df[df_floats.columns] = df_floats.apply(pd.to_numeric, downcast='float')
 44
 45    excl_types_cat = EXCLUDED_TYPES_TO_CATEGORIZE
 46    if not floats_as_categ:
 47        excl_types_cat.append('float')
 48
 49    for col in opt_df.select_dtypes(exclude=excl_types_cat).columns:
 50        try:
 51            unic_vals = opt_df[col].unique()
 52        except (pd.errors.DataError, TypeError):
 53            continue
 54
 55        num_unique_values = len(unic_vals)
 56        num_total_values = len(opt_df[col]) - len(opt_df.loc[opt_df[col].isnull()])
 57        if num_total_values > 0 and (num_unique_values / num_total_values) < max_perc_unique_vals:
 58            try:
 59                opt_df[col] = opt_df[col].astype(CategoricalDtype(ordered=True))
 60            except (NotImplementedError, TypeError):
 61                continue
 62
 63    return opt_df
 64
 65
def df_filtered_by_prop(df: DataFrame | GeoDataFrame, filter_prop: dict[str, object]) -> DataFrame | GeoDataFrame | None:
    """
    Filter the dataframe with the given dictionary, where each key refers to a
    column and the value (or list of values) is applied as the filter. If the
    key/column does not exist it is ignored. If the key starts with one of the
    signs "=, !, -, >, <" the corresponding filter operation is applied.
    For "!" and "-" the same negation operation is applied (not equal / does
    not contain the given value or values). The "<" and ">" filters do not
    apply to text columns and are ignored for them. The same column can be
    passed several times with different operators and values to combine
    filters.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to filter
        filter_prop (dict[str, object]): Filter properties

    Returns:
        DataFrame | GeoDataFrame: Filtered DataFrame
    """
    if df is None or not filter_prop:
        return df

    # Flatten named index levels into columns so they can be filtered too
    idx_names = [idx_col for idx_col in df.index.names if idx_col]
    if idx_names:
        df = df.reset_index()

    def _df_individual_filter(_df_ind: DataFrame, type_col_ind, column: str, value, col_operator: str = '='):
        # For categorical columns the comparison type is the categories' dtype
        type_column = type_col_ind.categories.dtype if (type_col_name := type_col_ind.name) == 'category' else type_col_ind

        if type_col_name == 'object':
            # Text columns: '=' means case-insensitive "contains", not equality
            if col_operator == '=':
                _df_ind = _df_ind[_df_ind[column].str.contains(str(value), case=False, na=False)]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind[~_df_ind[column].str.contains(str(value), case=False, na=False)]
            # '<' and '>' intentionally fall through for text columns (ignored)
        else:
            # Cast the raw filter value to the column's scalar type before comparing
            value = type_column.type(value)
            if col_operator == '=':
                _df_ind = _df_ind.loc[_df_ind[column] == value]
            elif col_operator == '-' or col_operator == '!':
                _df_ind = _df_ind.loc[_df_ind[column] != value]
            elif col_operator == '>':
                _df_ind = _df_ind.loc[_df_ind[column] > value]
            elif col_operator == '<':
                _df_ind = _df_ind.loc[_df_ind[column] < value]

        return _df_ind

    col_names = df.columns.values.tolist()
    for k, v in filter_prop.items():
        k_operator = "="
        if k.startswith(('-', '=', '<', '>', '!')):
            k_operator = k[0:1]
            k = k[1:]
        # Match the key case-insensitively against columns and index names
        if k.upper() in (col_names + idx_names):
            k = k.upper()
        elif k.lower() in (col_names + idx_names):
            k = k.lower()

        if k in col_names and v is not None:
            type_col = df.dtypes.get(k)
            if isinstance(v, list):
                # Done with a loop instead of isin because isin is unreliable for floats
                df_list = None
                for ind_val in v:
                    df_temp = _df_individual_filter(df, type_col, k, ind_val, k_operator)
                    if df_list is None:
                        df_list = df_temp
                    elif k_operator == '=':
                        # OR semantics: accumulate the rows matched by each value.
                        # NOTE(review): a row matching several values (text
                        # "contains") appears duplicated in the concat — confirm intended
                        df_list = pd.concat([df_list, df_temp])
                    if k_operator != '=':
                        # Excluding operators have AND semantics: each step
                        # filters the already-filtered frame
                        df = df_list = df_temp
                df = df_list
            else:
                df = _df_individual_filter(df, type_col, k, v, k_operator)

    if idx_names:
        df.set_index(idx_names, inplace=True)

    return df
143
144
def rename_and_drop_columns(df: Union[DataFrame, GeoDataFrame], map_old_new_col_names: dict[str, str],
                            drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[
    DataFrame, GeoDataFrame]:
    """
    Rename and optionally remove columns from a dataframe.

    Args:
        df: DataFrame whose columns are renamed/removed
        map_old_new_col_names: keys are the current names, values the new names
        drop_col: True to remove columns that are not keys of the map
        strict: if True, rename with the full map (pandas ignores keys missing
            from the DataFrame); if False, rename only with the keys actually
            present as columns
        reordered: True to reorder the resulting columns following the order
            of the map values

    Returns:
        DataFrame | GeoDataFrame: modified DataFrame. The input is returned
        unchanged when it is None or the map is empty (the previous version
        implicitly returned None in that case, silently losing the DataFrame).
    """
    if df is None or not map_old_new_col_names:
        return df

    col_names = df.columns.values.tolist()
    col_names_to_drop = col_names.copy()
    final_map = {}
    for k, v in map_old_new_col_names.items():
        if k in col_names:
            final_map[k] = v
            col_names_to_drop.remove(k)
    if drop_col:
        df = df.drop(col_names_to_drop, axis=1)
    if strict:
        final_map = map_old_new_col_names
    df = df.rename(columns=final_map)
    if reordered:
        # Keep only map values that exist after renaming, in map order
        new_cols = list(map_old_new_col_names.values())
        act_cols = df.columns.tolist()
        reord_cols = [value for value in new_cols if value in act_cols]
        df = df[reord_cols]
    return df
181
182
def set_null_and_default_values(df: DataFrame | GeoDataFrame) -> DataFrame | GeoDataFrame:
    """
    Return a copy of the DataFrame where every NaN value is replaced by None.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame whose NaN values are replaced

    Returns:
        DataFrame | GeoDataFrame: DataFrame with NaN values replaced by None
    """
    return df.replace({np.nan: None})
194
195
def replace_values_with_null(df: Union[DataFrame | GeoDataFrame], dict_col_values: dict) -> Union[
    DataFrame | GeoDataFrame]:
    """
    Replace, column by column, the given values with None.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to update (modified in place)
        dict_col_values (dict): Maps each column name to the value that must
            be replaced with None in that column

    Returns:
        DataFrame | GeoDataFrame: The DataFrame with the values replaced
    """
    # Nothing to do for missing/empty frames or an empty mapping
    if df is None or df.empty or not dict_col_values:
        return df

    for name_col, value in dict_col_values.items():
        df[name_col] = df[name_col].replace(value, None)
    return df
211
212
def convert_to_datetime_col_df(df: DataFrame, cols: list[str],
                               set_end_day: bool = False, set_nat: bool = False) -> DataFrame:
    """
    Force the given date columns into datetime values.

    Plain ``date`` values are promoted to ``datetime`` at 00:00:00. When
    ``set_end_day`` is True, any value whose time is exactly 00:00:00 is moved
    to 23:59:59 of the same day. When ``set_nat`` is True, NaT/None values are
    replaced with MAX_DATETIME.

    Args:
        df (DataFrame): DataFrame to update (modified in place)
        cols (list[str]): Names of the columns to convert
        set_end_day (bool): If True, midnight times become 23:59:59
        set_nat (bool): If True, NaT/None values become MAX_DATETIME

    Returns:
        DataFrame: The DataFrame with the converted datetime columns
    """
    end_time = time(23, 59, 59) if set_end_day else time.min

    def _coerce(raw):
        # Exact type check on purpose: datetime is a subclass of date and
        # must not be re-combined here
        if type(raw) is date:
            raw = datetime.combine(raw, time.min)

        if set_nat and (raw is NaT or raw is None):
            return MAX_DATETIME
        if raw is NaT:
            return raw
        if isinstance(raw, (Timestamp, datetime)) and set_end_day and raw.time() == time.min:
            return datetime.combine(raw, end_time)
        return raw

    for name_col in cols:
        df[name_col] = df[name_col].apply(_coerce)

    return df
251
252
def df_memory_usage(df: DataFrame | GeoDataFrame) -> float:
    """
    Compute the deep memory footprint of a DataFrame, expressed in MB.

    Args:
        df (DataFrame | GeoDataFrame): DataFrame to measure

    Returns:
        float: Memory usage in MB
    """
    total_bytes = df.memory_usage(deep=True).sum()
    return total_bytes / (1024 * 1024)
264
265
def extract_operator(input_string):
    """
    Extract the leading SQL/filter operator from the given string.

    Args:
        input_string (str): Key possibly prefixed with an operator
            (e.g. ">=age", "-name")

    Returns:
        str | None: The leading operator, or None when the string does not
            start with non-word characters

    Raises:
        ValueError: If the leading symbols are not a supported operator
    """
    special_opers = ('-', '!')  # Special operators for negation
    sql_opers = ('=', '!=', '<>', '>', '<', '>=', '<=')  # SQL operators

    match = re.match(r'^(\W+)', input_string)
    if match:
        symbols = match.group(1)

        if symbols not in special_opers and symbols not in sql_opers:
            raise ValueError(f"Operator '{symbols}' not supported")

        return symbols

    return None


def sql_from_filter_by_props(**filter_by_props: dict) -> str:
    """
    Build an SQL filter expression (AND-joined) from keyword properties.

    Keys may be prefixed with an operator ("=", "!=", "<>", ">", "<", ">=",
    "<=", or the negation shortcuts "-"/"!"); without a prefix "=" is assumed.
    String values are single-quoted; non-string iterable values become
    IN / NOT IN lists.

    Args:
        **filter_by_props: The filter by properties (field -> value)

    Returns:
        sql (str): The SQL from filter by properties

    Raises:
        ValueError: When an operator is not supported, directly or for the
            given value type
    """
    sql_parts = []
    for k_fld_oper, value in filter_by_props.items():
        if k_operator := extract_operator(k_fld_oper):
            # Strip only the leading operator. The previous str.replace()
            # removed the same characters anywhere, corrupting field names
            # that contain them (e.g. '-my-col' became 'mycol').
            k_fld = k_fld_oper[len(k_operator):]
        else:
            k_operator = "="
            k_fld = k_fld_oper

        if isinstance(value, str):
            value = f"'{value}'"
        elif isinstance(value, Iterable):
            value = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in value])
            value = f"({value})"
            if k_operator in ('=', '!=', '-', '!'):
                k_operator = "IN" if k_operator == "=" else "NOT IN"
            else:
                raise ValueError(f"Operator '{k_operator}' not supported for iterable values")

        if k_operator in ('-', '!'):
            # The negation shortcuts are not valid SQL for scalar values
            # (previously they were emitted verbatim, e.g. 'col - 5'):
            # normalize them to the standard inequality operator.
            k_operator = '!='

        sql_parts.append(f"{k_fld} {k_operator} {value}")

    sql_filter = ' AND '.join(sql_parts)

    return sql_filter
EXCLUDED_TYPES_TO_CATEGORIZE = ['datetime', 'category', 'geometry']
DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY = 0.5
MAX_DATETIME = datetime.datetime(2250, 12, 31, 23, 59, 59)
def optimize_df( df: pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame, max_perc_unique_vals: float = 0.5, floats_as_categ: bool = False) -> pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame:
27def optimize_df(df: DataFrame | GeoDataFrame, max_perc_unique_vals: float = DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY,
28                floats_as_categ: bool = False) -> DataFrame | GeoDataFrame:
29    """
30    Retorna el pd.Dataframe optimizado segun columnas que encuentre
31
32    Args:
33        df (Dataframe | GeoDataFrame): Dataframe a optimizar
34        max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
35        floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
36
37    Returns:
38        opt_df (Dataframe | GeoDataFrame): Dataframe optimizado
39    """
40    opt_df = df.copy()
41    df_ints = opt_df.select_dtypes(include=['int64'])
42    opt_df[df_ints.columns] = df_ints.apply(pd.to_numeric, downcast='signed')
43    df_floats = opt_df.select_dtypes(include='float')
44    opt_df[df_floats.columns] = df_floats.apply(pd.to_numeric, downcast='float')
45
46    excl_types_cat = EXCLUDED_TYPES_TO_CATEGORIZE
47    if not floats_as_categ:
48        excl_types_cat.append('float')
49
50    for col in opt_df.select_dtypes(exclude=excl_types_cat).columns:
51        try:
52            unic_vals = opt_df[col].unique()
53        except (pd.errors.DataError, TypeError):
54            continue
55
56        num_unique_values = len(unic_vals)
57        num_total_values = len(opt_df[col]) - len(opt_df.loc[opt_df[col].isnull()])
58        if num_total_values > 0 and (num_unique_values / num_total_values) < max_perc_unique_vals:
59            try:
60                opt_df[col] = opt_df[col].astype(CategoricalDtype(ordered=True))
61            except (NotImplementedError, TypeError):
62                continue
63
64    return opt_df

Retorna el pd.Dataframe optimizado segun columnas que encuentre

Arguments:
  • df (Dataframe | GeoDataFrame): Dataframe a optimizar
  • max_perc_unique_vals (float=DEFAULT_MAX_UNIQUE_VALS_COL_CATEGORY): Màxim percentatge de valors únics respecte total files per a convertir en categoria, expressat entre 0 i 1 (Default 0.5 -> 50%)
  • floats_as_categ (bool=False): Si True, els floats es converteixen a categoria
Returns:

opt_df (Dataframe | GeoDataFrame): Dataframe optimizado

def df_filtered_by_prop( df: pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame, filter_prop: dict[str, object]) -> pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame | None:
 67def df_filtered_by_prop(df: DataFrame | GeoDataFrame, filter_prop: dict[str, object]) -> DataFrame | GeoDataFrame | None:
 68    """
 69    Filtra el dataframe amb el diccionari passat, on la clau fa referència a la columna i el valor o llistat de valors
 70    separats per comes son els que s’apliquen al filtre. Si la clau/columna no existeix es desestima. Si la clau/columna
 71    comença per alguns d’aquest signes “=, !, -, >, <” s’aplica la corresponent operació de filtre.
 72    En el cas de “!” i “–“ s’aplica la mateixa operació de negat o que no contingui el valor o valors passats.
 73    Els filtres “<“ i “>” no apliquen a camps text i es desestimen. Es poden passar la mateixa columna amb operadors
 74    i valors distints per aplicar filtres diferents
 75
 76    Args:
 77        df (DataFrame | GeoDataFrame): DataFrame a filtrar
 78        filter_prop (dict[str, object]): Propietats de filtrat
 79
 80    Returns:
 81        DataFrame | GeoDataFrame: DataFrame filtrat
 82    """
 83    if df is None or not filter_prop:
 84        return df
 85
 86    idx_names = [idx_col for idx_col in df.index.names if idx_col]
 87    if idx_names:
 88        df = df.reset_index()
 89
 90    def _df_individual_filter(_df_ind: DataFrame, type_col_ind, column: str, value, col_operator: str = '='):
 91        type_column = type_col_ind.categories.dtype if (type_col_name := type_col_ind.name) == 'category' else type_col_ind
 92
 93        if type_col_name == 'object':
 94            if col_operator == '=':
 95                _df_ind = _df_ind[_df_ind[column].str.contains(str(value), case=False, na=False)]
 96            elif col_operator == '-' or col_operator == '!':
 97                _df_ind = _df_ind[~_df_ind[column].str.contains(str(value), case=False, na=False)]
 98        else:
 99            value = type_column.type(value)
100            if col_operator == '=':
101                _df_ind = _df_ind.loc[_df_ind[column] == value]
102            elif col_operator == '-' or col_operator == '!':
103                _df_ind = _df_ind.loc[_df_ind[column] != value]
104            elif col_operator == '>':
105                _df_ind = _df_ind.loc[_df_ind[column] > value]
106            elif col_operator == '<':
107                _df_ind = _df_ind.loc[_df_ind[column] < value]
108
109        return _df_ind
110
111    col_names = df.columns.values.tolist()
112    for k, v in filter_prop.items():
113        k_operator = "="
114        if k.startswith(('-', '=', '<', '>', '!')):
115            k_operator = k[0:1]
116            k = k[1:]
117        if k.upper() in (col_names + idx_names):
118            k = k.upper()
119        elif k.lower() in (col_names + idx_names):
120            k = k.lower()
121
122        if k in col_names and v is not None:
123            type_col = df.dtypes.get(k)
124            if isinstance(v, list):
125                # es fa amb un bucle i no amb isin perque no val per floats
126                df_list = None
127                for ind_val in v:
128                    df_temp = _df_individual_filter(df, type_col, k, ind_val, k_operator)
129                    if df_list is None:
130                        df_list = df_temp
131                    elif k_operator == '=':
132                        df_list = pd.concat([df_list, df_temp])
133                    if k_operator != '=':
134                        # per als operadors que exclouen s'ha de treballar sobre el df filtrat resultant
135                        df = df_list = df_temp
136                df = df_list
137            else:
138                df = _df_individual_filter(df, type_col, k, v, k_operator)
139
140    if idx_names:
141        df.set_index(idx_names, inplace=True)
142
143    return df

Filtra el dataframe amb el diccionari passat, on la clau fa referència a la columna i el valor o llistat de valors separats per comes son els que s’apliquen al filtre. Si la clau/columna no existeix es desestima. Si la clau/columna comença per alguns d’aquest signes “=, !, -, >, <” s’aplica la corresponent operació de filtre. En el cas de “!” i “–“ s’aplica la mateixa operació de negat o que no contingui el valor o valors passats. Els filtres “<“ i “>” no apliquen a camps text i es desestimen. Es poden passar la mateixa columna amb operadors i valors distints per aplicar filtres diferents

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame a filtrar
  • filter_prop (dict[str, object]): Propietats de filtrat
Returns:

DataFrame | GeoDataFrame: DataFrame filtrat

def rename_and_drop_columns( df: Union[pandas.core.frame.DataFrame, geopandas.geodataframe.GeoDataFrame], map_old_new_col_names: dict[str, str], drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[pandas.core.frame.DataFrame, geopandas.geodataframe.GeoDataFrame]:
146def rename_and_drop_columns(df: Union[DataFrame, GeoDataFrame], map_old_new_col_names: dict[str, str],
147                            drop_col: bool = True, strict: bool = False, reordered: bool = False) -> Union[
148    DataFrame, GeoDataFrame]:
149    """
150    Function to rename and remove columns from a dataframe. If the drop_col parameter is True,
151    the columns that are not in the map will be removed. If the strict parameter is True,
152    the names that do not exist in the map as a column will be skipped.
153    Args:
154        df: to remove and rename
155        map_old_new_col_names: the key is the actual name and the value is the new name
156        drop_col: True to remove columns that are not included in the map
157        strict: False to skip names that are not included in the map
158        reordered: True to reorder columns of dataframe
159
160    Returns: modified DataFrame
161
162    """
163    if df is not None and map_old_new_col_names:
164        col_names = df.columns.values.tolist()
165        col_names_to_drop = col_names.copy()
166        final_map = {}
167        for k, v in map_old_new_col_names.items():
168            if k in col_names:
169                final_map[k] = v
170                col_names_to_drop.remove(k)
171        if drop_col:
172            df = df.drop(col_names_to_drop, axis=1)
173        if strict:
174            final_map = map_old_new_col_names
175        df = df.rename(columns=final_map)
176        if reordered:
177            new_cols = list(map_old_new_col_names.values())
178            act_cols = df.columns.tolist()
179            reord_cols = [value for value in new_cols if value in act_cols]
180            df = df[reord_cols]
181        return df

Function to rename and remove columns from a dataframe. If the drop_col parameter is True, the columns that are not in the map will be removed. If the strict parameter is True, the names that do not exist in the map as a column will be skipped.

Arguments:
  • df: to remove and rename
  • map_old_new_col_names: the key is the actual name and the value is the new name
  • drop_col: True to remove columns that are not included in the map
  • strict: False to skip names that are not included in the map
  • reordered: True to reorder columns of dataframe

Returns: modified DataFrame

def set_null_and_default_values( df: pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame) -> pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame:
184def set_null_and_default_values(df: DataFrame | GeoDataFrame) -> DataFrame | GeoDataFrame:
185    """
186    Function to replace NaN values with None in a DataFrame
187    Args:
188        df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None
189
190    Returns:
191        DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None
192    """
193    df = df.replace({np.nan: None})
194    return df

Function to replace NaN values with None in a DataFrame

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame to replace NaN values with None
Returns:

DataFrame | GeoDataFrame: DataFrame with NaN values replaced with None

def replace_values_with_null( df: Union[pandas.core.frame.DataFrame, geopandas.geodataframe.GeoDataFrame], dict_col_values: dict) -> Union[pandas.core.frame.DataFrame, geopandas.geodataframe.GeoDataFrame]:
197def replace_values_with_null(df: Union[DataFrame | GeoDataFrame], dict_col_values: dict) -> Union[
198    DataFrame | GeoDataFrame]:
199    """
200    Function to replace values with None in a DataFrame
201    Args:
202        df (DataFrame | GeoDataFrame): DataFrame to replace values with None
203        dict_col_values (dict): Dictionary with the column name and the value to replace with None
204
205    Returns:
206        DataFrame | GeoDataFrame: DataFrame with values replaced with None
207    """
208    if df is not None and not df.empty and dict_col_values:
209        for name_col, value in dict_col_values.items():
210            df[name_col] = df[name_col].replace(value, None)
211    return df

Function to replace values with None in a DataFrame

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame to replace values with None
  • dict_col_values (dict): Dictionary with the column name and the value to replace with None
Returns:

DataFrame | GeoDataFrame: DataFrame with values replaced with None

def convert_to_datetime_col_df( df: pandas.core.frame.DataFrame, cols: list[str], set_end_day: bool = False, set_nat: bool = False) -> pandas.core.frame.DataFrame:
214def convert_to_datetime_col_df(df: DataFrame, cols: list[str],
215                               set_end_day: bool = False, set_nat: bool = False) -> DataFrame:
216    """
217    Force convert date columns to datetime format.
218    If init_date is True, the time is set to 00:00:00 if not to 23:59:59
219
220    Args:
221        df (DataFrame): DataFrame
222        cols (list[str]): Columns to convert
223        set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
224        set_nat (bool=False): If True set NaT to MAX_DATETIME
225
226    Returns:
227        DataFrame: DataFrame with datetime columns
228    """
229    if not set_end_day:
230        delta_time = time.min
231    else:
232        delta_time = time(23, 59, 59)
233
234    def _convert_date(value):
235        if type(value) is date:
236            value = datetime.combine(value, time.min)
237
238        if set_nat and (value is NaT or value is None):
239            return MAX_DATETIME
240        elif value is NaT:
241            return value
242        elif ((isinstance(value, Timestamp) or isinstance(value, datetime))
243              and set_end_day and value.time() == time.min):
244            return datetime.combine(value, delta_time)
245        else:
246            return value
247
248    for col in cols:
249        df[col] = df[col].apply(_convert_date)
250
251    return df

Force convert date columns to datetime format. If init_date is True, the time is set to 00:00:00 if not to 23:59:59

Arguments:
  • df (DataFrame): DataFrame
  • cols (list[str]): Columns to convert
  • set_end_day (bool=False): If False the time is set to 00:00:00 if not to 23:59:59
  • set_nat (bool=False): If True set NaT to MAX_DATETIME
Returns:

DataFrame: DataFrame with datetime columns

def df_memory_usage( df: pandas.core.frame.DataFrame | geopandas.geodataframe.GeoDataFrame) -> float:
254def df_memory_usage(df: DataFrame | GeoDataFrame) -> float:
255    """
256    Return the memory usage of a DataFrame in MB
257
258    Args:
259        df (DataFrame | GeoDataFrame): DataFrame
260
261    Returns:
262        float: Memory usage in MB
263    """
264    return df.memory_usage(deep=True).sum() / 1024 ** 2

Return the memory usage of a DataFrame in MB

Arguments:
  • df (DataFrame | GeoDataFrame): DataFrame
Returns:

float: Memory usage in MB

def extract_operator(input_string):
267def extract_operator(input_string):
268    """
269    Extract sql operator from input string
270
271    Args:
272        input_string:
273
274    Returns:
275
276    """
277    special_opers = ('-', '!') # Special operators for negation
278    sql_opers = ('=', '!=', '<>', '>', '<', '>=', '<=') # SQL operators
279
280    match = re.match(r'^(\W+)', input_string)
281    if match:
282        symbols = match.group(1)
283
284        if symbols not in special_opers and symbols not in sql_opers:
285            raise ValueError(f"Operator '{symbols}' not supported")
286
287        return symbols
288
289    return None

Extract sql operator from input string

Arguments:
  • input_string:

Returns:

def sql_from_filter_by_props(**filter_by_props: dict) -> str:
292def sql_from_filter_by_props(**filter_by_props: dict) -> str:
293    """
294    Get SQL from filter by properties
295
296    Args:
297        **filter_by_props: The filter by properties
298
299    Returns:
300        sql (str): The SQL from filter by properties
301    """
302    sql_parts = []
303    for k_fld_oper, value in filter_by_props.items():
304        if k_operator := extract_operator(k_fld_oper):
305            k_fld = k_fld_oper.replace(k_operator, '')
306        else:
307            k_operator = "="
308            k_fld = k_fld_oper
309
310        if isinstance(value, str):
311            value = f"'{value}'"
312        elif isinstance(value, Iterable):
313            value = ', '.join([f"'{v}'" if isinstance(v, str) else str(v) for v in value])
314            value = f"({value})"
315            if k_operator in ('=', '!=', '-'):
316                k_operator = "IN" if k_operator == "=" else "NOT IN"
317            else:
318                raise ValueError(f"Operator '{k_operator}' not supported for iterable values")
319
320        sql_parts.append(f"{k_fld} {k_operator} {value}")
321
322    sql_filter = ' AND '.join(sql_parts)
323
324    return sql_filter

Get SQL from filter by properties

Arguments:
  • **filter_by_props: The filter by properties
Returns:

sql (str): The SQL from filter by properties