apb_extra_utils.misc
1# coding=utf-8 2# 3# Author: Ernesto Arredondo Martinez (ernestone@gmail.com) 4# Created: 7/6/19 18:23 5# Last modified: 7/6/19 18:21 6# Copyright (c) 2019 7from __future__ import annotations 8 9import calendar 10import csv 11import datetime 12import errno 13import inspect 14import locale 15import os 16import re 17import socket 18import subprocess 19import sys 20from calendar import different_locale 21from collections import OrderedDict 22from math import isnan 23from pathlib import Path 24from tempfile import gettempdir 25from typing import Any, Generator, Tuple 26from urllib.request import build_opener 27from zipfile import ZipFile, ZIP_DEFLATED 28 29import jellyfish 30from tqdm import tqdm 31 32 33def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True): 34 """ 35 36 Args: 37 url (str): 38 extract_to (str=None): if None, extract to current directory 39 headers (list=None) 40 remove_zip (bool=True): 41 42 Returns: 43 path_zip (str) 44 """ 45 if zip_file_path := download_from_url(url, extract_to, headers): 46 extract_to = unzip(zip_file_path, extract_to, remove_zip) 47 48 return extract_to 49 50 51def unzip(zip_file_path, extract_to=None, remove_zip=False): 52 """ 53 Unzip file to extract_to directory 54 55 Args: 56 zip_file_path (str): Path to zip file 57 extract_to: (str=None): if None, extract to zip's directory 58 remove_zip: (bool=False): If True remove zip file after unzip 59 60 Returns: 61 extract_to (str) 62 """ 63 with ZipFile(zip_file_path, 'r') as zipfile: 64 if not extract_to: 65 extract_to = os.path.join( 66 os.path.dirname(zip_file_path), 67 os.path.splitext(os.path.basename(zip_file_path))[0] 68 ) 69 70 desc = f"Extracting {zip_file_path} to {extract_to}" 71 if not sys.stdout: 72 print(f'{desc}...') 73 gen_members = zipfile.infolist() 74 else: 75 gen_members = tqdm(zipfile.infolist(), desc=desc) 76 77 for member in gen_members: 78 zipfile.extract(member, extract_to) 79 if remove_zip: 80 
os.remove(zip_file_path) 81 return extract_to 82 83 84def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str: 85 """ 86 87 Args: 88 url (str): Url to download 89 extract_to (str=None): Directory to save file. Default temporary directory 90 headers (list=None) 91 92 Returns: 93 path_file (str | None) 94 """ 95 opener = build_opener() 96 if headers: 97 opener.addheaders = headers 98 99 with opener.open(url) as response: 100 content_length = response.length 101 if not extract_to: 102 extract_to = gettempdir() 103 104 if n_file := response.headers.get_filename(): 105 file_path = os.path.join(extract_to, n_file) 106 else: 107 file_path = os.path.join(extract_to, Path(response.url).name) 108 109 with open(file_path, "wb") as out_file: 110 def get_resp_data(): 111 while True: 112 data = response.read(1024) 113 if not data: 114 break 115 yield data 116 117 desc = f'Downloading to "{file_path}"' 118 if not sys.stdout: 119 print(f'{desc}...') 120 for data in get_resp_data(): 121 out_file.write(data) 122 else: 123 with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar: 124 for data in get_resp_data(): 125 out_file.write(data) 126 progress_bar.update(len(data)) 127 128 return file_path 129 130 131def caller_name(skip=2): 132 """Get a name of a caller in the format module.class.method 133 134 `skip` specifies how many levels of stack to skip while getting caller 135 name. skip=1 means "who calls me", skip=2 "who calls my caller" etc. 
136 137 An empty string is returned if skipped levels exceed stack height 138 """ 139 140 def stack_(frame): 141 framelist = [] 142 while frame: 143 framelist.append(frame) 144 frame = frame.f_back 145 return framelist 146 147 stack = stack_(sys._getframe(1)) 148 start = 0 + skip 149 if len(stack) < start + 1: 150 return '' 151 parentframe = stack[start] 152 153 name = [] 154 module = inspect.getmodule(parentframe) 155 # `modname` can be None when frame is executed directly in console 156 if module and module.__name__ != "__main__": 157 name.append(module.__name__) 158 # detect classname 159 if 'self' in parentframe.f_locals: 160 # I don't know any way to detect call from the object method 161 # XXX: there seems to be no way to detect static method call - it will 162 # be just a function call 163 name.append(parentframe.f_locals['self'].__class__.__name__) 164 codename = parentframe.f_code.co_name 165 if codename != '<module>': # top level usually 166 name.append(codename) # function or a method 167 del parentframe 168 169 return ".".join(name) 170 171 172def get_environ(): 173 """ 174 Devuelve el entorno de trabajo a partir de la environment variable DEV_ENVIRON. 175 Si no está definida por defecto devuelve 'dev' 176 177 Returns: 178 str: El nombre del entorno 'dev' o 'prod' 179 """ 180 return os.getenv("DEV_ENVIRON", "dev").lower() 181 182 183def create_dir(a_dir): 184 """ 185 Crea directorio devolviendo TRUE o FALSE según haya ido. Si ya existe devuelve TRUE 186 187 Args: 188 a_dir {str}: path del directorio a crear 189 190 Returns: 191 bool: Retorna TRUE si lo ha podido crear o ya existía y FALSE si no 192 193 """ 194 ok = False 195 if os.path.exists(a_dir): 196 ok = True 197 else: 198 try: 199 os.makedirs(a_dir) 200 ok = True 201 except OSError as exc: 202 print("ATENCIÓ!! 
- No se ha podido crear el directorio", a_dir) 203 204 return ok 205 206 207def remove_content_dir(a_dir): 208 """ 209 Borra ficheros y subdirectorios de directorio 210 211 Args: 212 a_dir {str}: path del directorio a crear 213 214 Returns: 215 num_elems_removed (int), num_elems_dir (int) 216 """ 217 num_elems_removed = 0 218 num_elems_dir = 0 219 for de in os.scandir(a_dir): 220 if de.is_dir(): 221 n_rem_subdir, n_subdir = remove_content_dir(de.path) 222 num_elems_dir += n_subdir 223 num_elems_removed += n_rem_subdir 224 try: 225 os.rmdir(de.path) 226 except: 227 pass 228 else: 229 num_elems_dir += 1 230 try: 231 os.unlink(de.path) 232 num_elems_removed += 1 233 except: 234 pass 235 236 return num_elems_removed, num_elems_dir 237 238 239# Sadly, Python fails to provide the following magic number for us. 240ERROR_INVALID_NAME = 123 241''' 242Windows-specific error code indicating an invalid pathname. 243 244See Also 245---------- 246https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx 247 Official listing of all such codes. 248''' 249 250 251def is_pathname_valid(pathname): 252 ''' 253 `True` if the passed pathname is a valid pathname for the current OS; 254 `False` otherwise. 255 ''' 256 # If this pathname is either not a string or is but is empty, this pathname 257 # is invalid. 258 try: 259 if not isinstance(pathname, str) or not pathname: 260 return False 261 262 # Strip this pathname's Windows-specific drive specifier (e.g., `C:\`) 263 # if any. Since Windows prohibits path components from containing `:` 264 # characters, failing to strip this `:`-suffixed prefix would 265 # erroneously invalidate all valid absolute Windows pathnames. 266 _, pathname = os.path.splitdrive(pathname) 267 268 # Directory guaranteed to exist. If the current OS is Windows, this is 269 # the drive to which Windows was installed (e.g., the "%HOMEDRIVE%" 270 # environment variable); else, the typical root directory. 
271 root_dirname = os.environ.get('HOMEDRIVE', 'C:') \ 272 if sys.platform == 'win32' else os.sep 273 assert os.path.isdir(root_dirname) # ...Murphy and her ironclad Law 274 275 # Append a path separator to this directory if needed. 276 root_dirname = root_dirname.rstrip(os.sep) + os.sep 277 278 # Test whether each path component split from this pathname is valid or 279 # not, ignoring non-existent and non-readable path components. 280 for pathname_part in pathname.split(os.sep): 281 try: 282 os.lstat(root_dirname + pathname_part) 283 # If an OS-specific exception is raised, its error code 284 # indicates whether this pathname is valid or not. Unless this 285 # is the case, this exception implies an ignorable kernel or 286 # filesystem complaint (e.g., path not found or inaccessible). 287 # 288 # Only the following exceptions indicate invalid pathnames: 289 # 290 # * Instances of the Windows-specific "WindowsError" class 291 # defining the "winerror" attribute whose value is 292 # "ERROR_INVALID_NAME". Under Windows, "winerror" is more 293 # fine-grained and hence useful than the generic "errno" 294 # attribute. When a too-long pathname is passed, for example, 295 # "errno" is "ENOENT" (i.e., no such file or directory) rather 296 # than "ENAMETOOLONG" (i.e., file name too long). 297 # * Instances of the cross-platform "OSError" class defining the 298 # generic "errno" attribute whose value is either: 299 # * Under most POSIX-compatible OSes, "ENAMETOOLONG". 300 # * Under some edge-case OSes (e.g., SunOS, *BSD), "ERANGE". 301 except OSError as exc: 302 if hasattr(exc, 'winerror'): 303 if exc.winerror == ERROR_INVALID_NAME: 304 return False 305 elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}: 306 return False 307 # If a "TypeError" exception was raised, it almost certainly has the 308 # error message "embedded NUL character" indicating an invalid pathname. 
309 except TypeError as exc: 310 return False 311 # If no exception was raised, all path components and hence this 312 # pathname itself are valid. (Praise be to the curmudgeonly python.) 313 else: 314 return True 315 # If any other exception was raised, this is an unrelated fatal issue 316 # (e.g., a bug). Permit this exception to unwind the call stack. 317 # 318 # Did we mention this should be shipped with Python already? 319 320 321def is_dir_writable(dirname): 322 ''' 323 `True` if the current user has sufficient permissions to create **siblings** 324 (i.e., arbitrary files in the parent directory) of the passed pathname; 325 `False` otherwise. 326 ''' 327 try: 328 a_tmp = os.path.join(dirname, "temp.tmp") 329 with open(a_tmp, 'w+b'): 330 pass 331 332 try: 333 os.remove(a_tmp) 334 except: 335 pass 336 337 return True 338 339 # While the exact type of exception raised by the above function depends on 340 # the current version of the Python interpreter, all such types subclass the 341 # following exception superclass. 342 except: 343 return False 344 345 346def is_path_exists_or_creatable(pathname): 347 ''' 348 `True` if the passed pathname is a valid pathname on the current OS _and_ 349 either currently exists or is hypothetically creatable in a cross-platform 350 manner optimized for POSIX-unfriendly filesystems; `False` otherwise. 351 352 This function is guaranteed to _never_ raise exceptions. 353 ''' 354 try: 355 # To prevent "os" module calls from raising undesirable exceptions on 356 # invalid pathnames, is_pathname_valid() is explicitly called first. 357 return is_pathname_valid(pathname) and ( 358 os.path.exists(pathname) or is_dir_writable(os.path.dirname(pathname))) 359 # Report failure on non-fatal filesystem complaints (e.g., connection 360 # timeouts, permissions issues) implying this path to be inaccessible. All 361 # other exceptions are unrelated fatal issues and should not be caught here. 
362 except OSError: 363 return False 364 365 366def get_matching_val(search_val, matching_vals): 367 """ 368 Retorna el valor que se asimila a los valores a comparar (matching_vals) respecto al valor propuesto 369 (prop_val). 370 371 Args: 372 search_val (str): Valor propuesto para comparar 373 matching_vals (list(str)): Lista de valores a comparar 374 375 Returns: 376 match_val (str), fact_jaro_winkler (float) 377 """ 378 jaro_results = jaro_winkler(search_val, matching_vals) 379 fact_jaro = next(iter(jaro_results), None) 380 381 return jaro_results.get(fact_jaro), fact_jaro 382 383 384def levenshtein_distance(search_val, matching_vals): 385 """ 386 387 Args: 388 search_val: 389 matching_vals: 390 391 Returns: 392 393 """ 394 ord_vals = OrderedDict() 395 distances = {} 396 for match_val in matching_vals: 397 fact = jellyfish.levenshtein_distance(search_val, match_val) 398 vals_fact = distances.get(fact, list()) 399 distances[fact] = vals_fact + [match_val] 400 401 for fact in sorted(distances): 402 ord_vals[fact] = distances.get(fact, []) 403 404 return ord_vals 405 406 407def jaro_winkler(search_val, matching_vals): 408 """ 409 410 Args: 411 search_val: 412 matching_vals: 413 414 Returns: 415 416 """ 417 ord_vals = OrderedDict() 418 matchings = {jellyfish.jaro_winkler_similarity(search_val, match_val): match_val 419 for match_val in matching_vals} 420 for fact in sorted(matchings, reverse=True): 421 if fact != 0: 422 ord_vals[fact] = matchings[fact] 423 424 return ord_vals 425 426 427def call_command(command_prog, *args): 428 """ 429 Llama comando shell sistema con los argumentos indicados 430 431 Returns: 432 bool: True si OK 433 434 """ 435 call_args = [command_prog] 436 call_args.extend(args) 437 ret = subprocess.check_call(call_args, shell=True) 438 439 return (ret == 0) 440 441 442def rounded_float(a_float, num_decs=9): 443 """ 444 Formatea un float con el numero de decimales especificado 445 Args: 446 a_float: 447 num_decs: 448 449 Returns: 450 str 451 """ 
452 return float(format(round(a_float, num_decs), ".{}f".format(num_decs)).rstrip('0').rstrip('.')) 453 454 455class formatted_float(float): 456 """ 457 Devuelve un float que se representa con un maximo de decimales (__num_decs__) 458 """ 459 __num_decs__ = 9 460 461 def __repr__(self): 462 return str(rounded_float(self, self.__num_decs__)) 463 464 465def as_format_floats(obj): 466 """ 467 Si encuentra un Float lo convierte a la clase 'formatted_float' para formatear su representación 468 469 Args: 470 obj: Cualquier objeto 471 472 Returns: 473 (obj, formatted_float) 474 475 """ 476 if isinstance(obj, (float, formatted_float)): 477 return formatted_float(obj) 478 elif isinstance(obj, (dict, OrderedDict)): 479 return obj.__class__((k, as_format_floats(v)) for k, v in obj.items()) 480 elif isinstance(obj, (list, tuple)): 481 return obj.__class__(as_format_floats(v) for v in obj) 482 return obj 483 484 485def nums_from_str(a_string, nan=False): 486 """ 487 Retorna lista de numeros en el texto pasado 488 489 Args: 490 a_string (str): 491 nan (bool=FAlse): por defecto no trata los NaN como numeros 492 493 Returns: 494 list 495 """ 496 l_nums = [] 497 498 for s in a_string.strip().split(): 499 try: 500 l_nums.append(int(s)) 501 except ValueError: 502 try: 503 fl = float(s) 504 if nan or not isnan(fl): 505 l_nums.append(fl) 506 except ValueError: 507 pass 508 509 return l_nums 510 511 512def first_num_from_str(a_string, nan=False): 513 """ 514 Retorna primer numero encontrado del texto pasado 515 516 Args: 517 a_string (str): 518 nan (bool=FAlse): por defecto no trata los NaN como numeros 519 520 Returns: 521 int OR float 522 """ 523 return next(iter(nums_from_str(a_string, nan=nan)), None) 524 525 526def dates_from_str(str, formats=None, seps=None, ret_extra_data=False): 527 """ 528 Retorna dict de fechas disponibles con el texto pasado segun formatos indicados 529 530 Args: 531 str (str): 532 formats (list=None): por defecto ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', 
'%d-%m-%Y', '%Y-%m-%d'] 533 seps (list=None): por defecto [None, '.', ','] 534 ret_extra_data (bool=False): si True retorna tuple con fecha + part_str_src + format utilizado 535 536 Returns: 537 list 538 """ 539 l_fechas = list() 540 541 if not formats: 542 formats = ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d'] 543 544 if not seps: 545 seps = [None, '.', ','] 546 547 str_parts = [s.strip() for sep in seps for s in str.split(sep)] 548 549 for format in formats: 550 for str_part in str_parts: 551 try: 552 val = datetime.datetime.strptime(str_part, format) 553 if ret_extra_data: 554 val = (val, str_part, format) 555 l_fechas.append(val) 556 except Exception: 557 pass 558 559 return l_fechas 560 561 562def pretty_text(txt): 563 """ 564 Coge texto y lo capitaliza y quita carácteres por espacios 565 Args: 566 txt (str): 567 568 Returns: 569 str 570 """ 571 return txt.replace("_", " ").replace("-", " ").capitalize() 572 573 574def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED): 575 """ 576 Comprime los ficheros indicados con :file_paths en un fichero zip (:zip_path) 577 578 Args: 579 zip_path: 580 file_paths (list or generator): 581 base_path (srt=None): path desde el que se mantiene la ruta relativa de los ficheros se mantendra 582 compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir 583 584 Returns: 585 zip_path (str) 586 """ 587 with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip: 588 for file_path in file_paths: 589 if base_path: 590 re_base_path = re.compile(os.path.normpath(base_path).replace(os.sep, '/'), re.IGNORECASE) 591 arch_name = re_base_path.sub('', os.path.normpath(file_path).replace(os.sep, '/')) 592 else: 593 arch_name = os.path.basename(file_path) 594 595 my_zip.write(file_path, arcname=arch_name) 596 597 return zip_path 598 599 600def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED): 601 """ 602 Comprime la 
carpeta indicada 603 604 Args: 605 dir_path (str): path directorio 606 zip_path (str=None): el path del fichero .zip a crear. Por defecto zip en el directorio padre con el mismo 607 nombre del directorio zipeado 608 relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran 609 func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar 610 compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir 611 612 Returns: 613 zip_file (str) 614 """ 615 if not zip_path: 616 zip_path = f'{dir_path}.zip' 617 618 zip_file = zip_files(zip_path, 619 iter_paths_dir(dir_path, 620 relative_dirs_sel=relative_dirs_sel, 621 func_filter_path=func_filter_path), 622 base_path=dir_path, 623 compression=compression) 624 625 return zip_file 626 627 628def zip_files_dir(dir_path, remove_files=False, *exts_files): 629 """ 630 Comprime los ficheros de una carpeta indicada. Se pueden indicar qué tipo de ficheros se quiere que comprima 631 632 Args: 633 dir_path: 634 remove_files: 635 *exts_files: extensiones de fichero SIN el punto 636 637 Returns: 638 ok (bool) 639 """ 640 exts = [".{}".format(ext.lower()) for ext in exts_files] 641 for zip_path, file_path in (("{}.zip".format(os.path.splitext(de.path)[0]), de.path) 642 for de in os.scandir(dir_path)): 643 if not exts or (os.extsep in file_path and os.path.splitext(file_path)[1].lower() in exts): 644 print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path)) 645 zip_files(zip_path, [file_path]) 646 647 if remove_files and not os.path.samefile(zip_path, file_path): 648 os.remove(file_path) 649 650 return True 651 652 653def split_ext_file(path_file): 654 """ 655 Devuelve el nombre del fichero partido entre la primera parte antes del separador "." 
y lo demás 656 Args: 657 path_file: 658 Returns: 659 base_file (str), ext_file (str) 660 """ 661 parts_file = os.path.basename(path_file).split(".") 662 base_file = parts_file[0] 663 ext_file = ".".join(parts_file[1:]) 664 665 return base_file, ext_file 666 667 668FILE_RUN_LOG = "last_run.log" 669DATE_RUN_LOG_FRMT = "%Y%m%d" 670 671 672def last_run_on_dir(dir_base): 673 """ 674 Retorna la fecha de ultima ejecucion de proceso generacion en directorio de repositorio 675 Args: 676 dir_base (str): 677 678 Returns: 679 date_last_run (datetime): Si no encuentra devuelve None 680 """ 681 log_last_run = os.path.join(dir_base, FILE_RUN_LOG) 682 dt_last_run = None 683 if os.path.exists(log_last_run): 684 with open(log_last_run) as fr: 685 dt_last_run = datetime.datetime.strptime(fr.read(), DATE_RUN_LOG_FRMT) 686 687 return dt_last_run 688 689 690def save_last_run_on_dir(dir_base, date_run=None): 691 """ 692 Graba la fecha de ultima ejecucion de proceso generacion en directorio de repositorio 693 694 Args: 695 dir_base (str): 696 date_run (datetime=None): Si no se informa cogerá la fecha de hoy 697 """ 698 log_last_run = os.path.join(dir_base, FILE_RUN_LOG) 699 if not date_run: 700 date_run = datetime.date.today() 701 with open(log_last_run, "w+") as fw: 702 fw.write(date_run.strftime(DATE_RUN_LOG_FRMT)) 703 704 705def month_name(num_month, code_alias_locale="es_cu"): 706 """ 707 Retorna numero de mes en el locale espcificado. 
Por defecto castellano 708 709 Args: 710 num_month (int): 711 code_alias_locale (str='es_es'): 712 713 Returns: 714 str 715 """ 716 with different_locale(locale.locale_alias.get(code_alias_locale)): 717 return pretty_text(calendar.month_name[num_month]) 718 719 720def file_mod_time(path_file): 721 """ 722 Return datetime from mofification stat timestamp from file 723 724 Args: 725 path_file (str): 726 727 Returns: 728 datetime 729 """ 730 f_mod_time = datetime.datetime.fromtimestamp(os.stat(path_file).st_mtime) 731 732 return f_mod_time 733 734 735def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"): 736 """ 737 Itera como dicts indexados por valores primera fila (si header=True) o si no como list 738 las filas del CSV pasado por parametro a_path_csv. 739 740 Args: 741 a_path_csv (str): 742 header (bool=True): 743 sep (str=';'): por defecto cogerá el separador que por defecto usa csv.reader 744 encoding (str="utf8"): 745 Yields: 746 list OR dict 747 """ 748 with open(a_path_csv, encoding=encoding) as a_file: 749 csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';') 750 header_row = None 751 for row in csv_rdr: 752 if header and not header_row: 753 header_row = [v.strip().lower() for v in row] 754 continue 755 756 if header_row: 757 vals_row = dict(zip(header_row, row)) 758 else: 759 vals_row = row 760 761 if vals_row: 762 yield vals_row 763 764 765def subdirs_path(path): 766 """ 767 Itera sobre los subdirectorios del path 768 Args: 769 path: 770 771 Yields: 772 nom_subdir, path_subdir 773 """ 774 with os.scandir(path) as it: 775 for entry in it: 776 if entry.is_dir(): 777 yield entry.name, entry.path 778 779 780def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False): 781 """ 782 783 Args: 784 path_dir_base: 785 relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran 786 last_level_as_list (bool=False): 787 788 Returns: 789 dict 790 """ 791 tree = {} 792 793 f_valid_dir = None 794 
valid_dirs_sel = set() 795 if relative_dirs_sel: 796 for dir_sel in relative_dirs_sel: 797 path_dir_rel = os.path.join(path_dir_base, dir_sel) 798 if os.path.exists(path_dir_rel): 799 valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower()) 800 801 def valid_dir(dir_path): 802 valid = False 803 rel_path = os.path.relpath(dir_path, path_dir_base).lower() 804 for dir_sel in valid_dirs_sel: 805 if rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel)): 806 valid = True 807 break 808 809 return valid 810 811 f_valid_dir = valid_dir 812 813 for dir_name, dir_path in subdirs_path(path_dir_base): 814 if not f_valid_dir or f_valid_dir(dir_path): 815 dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower() 816 dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel 817 if os.path.commonpath((dir_path_rel, dir_sel))] 818 tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path) 819 820 if tree: 821 if last_level_as_list and not any(tree.values()): 822 tree = [*tree.keys()] 823 824 return tree 825 826 827def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False): 828 """ 829 Retorna diccionario con el arbol de paths disponibles en el path indicado. 
830 831 Con la función F_VALID (-> bool) se podrà filtrar los paths a retornar (por defecto siempre True) 832 833 Args: 834 path_dir_base (str): 835 relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran 836 func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar 837 solo_dirs (bool=False): 838 839 Returns: 840 dict 841 """ 842 paths = dict() 843 844 valid_dirs_sel = set() 845 if relative_dirs_sel: 846 for dir_sel in relative_dirs_sel: 847 path_dir_rel = os.path.join(path_dir_base, dir_sel) 848 if os.path.exists(path_dir_rel): 849 valid_dirs_sel.add(path_dir_rel) 850 851 for dir_path, dir_names, file_names in os.walk(path_dir_base): 852 if valid_dirs_sel and not any( 853 os.path.samefile(dir_path, a_dir_sel) or is_path_child_from(dir_path, a_dir_sel) 854 for a_dir_sel in valid_dirs_sel): 855 continue 856 857 dir_path = os.path.relpath(dir_path, path_dir_base) 858 dir_name = os.path.basename(dir_path) 859 860 if func_filter_path and not func_filter_path(dir_name): 861 continue 862 863 files_selected = {fn: None for fn in file_names 864 if not func_filter_path or func_filter_path(fn)} 865 866 if files_selected: 867 subdir_paths = paths 868 # En el caso del primer nivel no se guarda name directorio 869 if dir_path != '.': 870 for d in dir_path.split(os.sep): 871 if d not in subdir_paths: 872 subdir_paths[d] = dict() 873 subdir_paths = subdir_paths[d] 874 875 if not solo_dirs: 876 subdir_paths.update(files_selected) 877 878 return paths 879 880 881def iter_tree_paths(tree_paths, path_base=None): 882 """ 883 884 Args: 885 tree_paths (dict): 886 path_base (str=None): 887 888 Yields: 889 path_file 890 """ 891 for path, sub_tree in tree_paths.items(): 892 if sub_tree and isinstance(sub_tree, dict): 893 for sub_path in iter_tree_paths(sub_tree, path): 894 yield os.path.join(path_base, sub_path) if path_base else sub_path 895 else: 896 yield os.path.join(path_base, path) if path_base else path 897 898 
899def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None): 900 """ 901 Itera el arbol de paths disponibles en el path indicado. 902 903 Con la función F_VALID (-> bool) se podrà filtrar los paths a retornar (por defecto siempre True) 904 905 Args: 906 path_dir_base (str): 907 relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran 908 func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar 909 910 Yields: 911 path (str) 912 """ 913 for path in iter_tree_paths(tree_paths(path_dir_base, relative_dirs_sel, func_filter_path), path_dir_base): 914 yield path 915 916 917def is_path_child_from(path, path_parent): 918 """ 919 Retorna si path es hijo de path_parent 920 921 Args: 922 path: 923 path_parent: 924 925 Returns: 926 bool 927 """ 928 p_path = Path(path) 929 p_path_parent = Path(path_parent) 930 931 return any(p.samefile(p_path_parent) for p in p_path.parents) 932 933 934def machine_name(): 935 """ 936 Retorna el nombre de la maquina 937 938 Returns: 939 str 940 """ 941 # TODO - Get host from docker machine when we are in a container 942 # TODO - import docker 943 # TODO - 944 # TODO - client = docker.from_env() 945 # TODO - container_info = client.containers.get(socket.gethostname()) 946 # TODO - docker_host_ip = container_info.attrs['NetworkSettings']['IPAddress'] 947 # TODO - print(docker_host_ip) 948 949 return socket.getfqdn().upper() 950 951 952def machine_apb(): 953 """ 954 Retorna el nombre de la maquina 955 956 Returns: 957 bool 958 """ 959 return socket.getfqdn().lower().endswith('.apb.es') 960 961 962def find_key_values(obj: Any, target_key: str) -> Generator[Tuple[Any, int], None, None]: 963 """ 964 Generator that recursively walks `obj` (dicts, lists, tuples, sets) 965 and yields tuples (value, level) for every occurrence of `target_key`. 966 967 Args: 968 obj (Any): The object to search through. 969 target_key (str): The key to search for. 
970 971 Yields: 972 Tuple[Any, int]: A tuple containing the value associated with `target_key` and its depth level. 973 """ 974 def _recurse(current_obj: Any, current_level: int = 0) -> Generator[Tuple[Any, int], None, None]: 975 if isinstance(current_obj, dict): 976 for k, v in current_obj.items(): 977 if k == target_key: 978 yield v, current_level 979 yield from _recurse(v, current_level + 1) 980 elif isinstance(current_obj, (list, tuple, set)): 981 for item in current_obj: 982 yield from _recurse(item, current_level + 1) 983 984 yield from _recurse(obj) 985 986 987if __name__ == '__main__': 988 import fire 989 990 fire.Fire()
def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
    """
    Download a zip archive from a URL and extract it.

    Args:
        url (str): URL of the zip file to download
        extract_to (str=None): directory passed both to `download_from_url` and to `unzip`;
            when None each of them applies its own default
        headers (list=None): forwarded unchanged to `download_from_url`
        remove_zip (bool=True): forwarded to `unzip`

    Returns:
        extract_to (str): value returned by `unzip`, or the received `extract_to` unchanged
            when `download_from_url` returns an empty value
    """
    downloaded_zip = download_from_url(url, extract_to, headers)
    if not downloaded_zip:
        return extract_to

    return unzip(downloaded_zip, extract_to, remove_zip)
Arguments:
- url (str):
- extract_to (str=None): if None, the zip is downloaded to the temporary directory and extracted into a folder next to it named after the zip file
- headers (list=None)
- remove_zip (bool=True):
Returns:
extract_to (str): directory where the zip was extracted
def unzip(zip_file_path, extract_to=None, remove_zip=False):
    """
    Extract every member of a zip file into a directory.

    Args:
        zip_file_path (str): path to the zip file
        extract_to (str=None): destination directory. If empty, a folder next to the zip named
            after it (file name without extension) is used
        remove_zip (bool=False): if True the zip file is removed after the extraction

    Returns:
        extract_to (str): directory where the members were extracted
    """
    if not extract_to:
        zip_parent, zip_name = os.path.split(zip_file_path)
        extract_to = os.path.join(zip_parent, os.path.splitext(zip_name)[0])

    progress_msg = f"Extracting {zip_file_path} to {extract_to}"
    with ZipFile(zip_file_path, 'r') as archive:
        members = archive.infolist()
        if sys.stdout:
            members = tqdm(members, desc=progress_msg)
        else:
            # Without a stdout the progress bar is skipped
            print(f'{progress_msg}...')

        for member in members:
            archive.extract(member, extract_to)

    # Delete only once the archive handle is closed
    if remove_zip:
        os.remove(zip_file_path)

    return extract_to
Unzip file to extract_to directory
Arguments:
- zip_file_path (str): Path to zip file
- extract_to (str=None): if None, extract to a folder next to the zip named after the zip file (without extension)
- remove_zip (bool=False): If True remove zip file after unzip
Returns:
extract_to (str)
def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
    """
    Download the resource at `url` into a directory and return the path of the saved file.

    The file name comes from the response headers (`get_filename()`), falling back to the last
    component of the final response URL.

    Args:
        url (str): URL to download
        extract_to (str=None): directory where the file is saved. Defaults to the temporary directory
        headers (list=None): if given, assigned as-is to the opener's `addheaders`
            (urllib expects a list of (name, value) tuples there)

    Returns:
        path_file (str): path of the downloaded file
    """
    opener = build_opener()
    if headers:
        opener.addheaders = headers

    with opener.open(url) as response:
        # Only HTTP responses expose `length`; other handlers (file://, ftp://) do not,
        # so the progress bar falls back to an unknown total.
        content_length = getattr(response, 'length', None)
        if not extract_to:
            extract_to = gettempdir()

        # The header file name is chosen by the remote server: keep only its last component so
        # it cannot point outside `extract_to` (absolute path or '..' segments).
        n_file = os.path.basename(response.headers.get_filename() or '')
        if n_file in ('', os.curdir, os.pardir):
            n_file = Path(response.url).name
        file_path = os.path.join(extract_to, n_file)

        desc = f'Downloading to "{file_path}"'
        with open(file_path, "wb") as out_file:
            if not sys.stdout:
                print(f'{desc}...')
                while data := response.read(1024):
                    out_file.write(data)
            else:
                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
                    while data := response.read(1024):
                        out_file.write(data)
                        progress_bar.update(len(data))

    return file_path
Arguments:
- url (str): Url to download
- extract_to (str=None): Directory to save file. Default temporary directory
- headers (list=None)
Returns:
path_file (str): path of the downloaded file
def caller_name(skip=2):
    """Return the dotted name (module.class.function) of a frame in the call stack.

    The stack is counted from the function that calls `caller_name` (level 0)
    upwards: `skip=1` is that function's caller, `skip=2` the caller's caller, etc.

    The module part is omitted for `__main__` or when the module cannot be
    resolved, the class part is only present when the frame has a local named
    `self`, and the function part is omitted for module-level code.

    An empty string is returned if `skip` exceeds the stack height.
    """
    # Collect the frames from the direct caller up to the outermost one
    frames = []
    cursor = sys._getframe(1)
    while cursor:
        frames.append(cursor)
        cursor = cursor.f_back

    if skip + 1 > len(frames):
        return ''
    target = frames[skip]

    parts = []
    owner_module = inspect.getmodule(target)
    # The module can be None when the frame is executed directly in a console
    if owner_module and owner_module.__name__ != "__main__":
        parts.append(owner_module.__name__)

    # A local called `self` is taken as the instance of a method call
    # (static methods cannot be told apart from plain functions)
    frame_locals = target.f_locals
    if 'self' in frame_locals:
        parts.append(frame_locals['self'].__class__.__name__)

    func_name = target.f_code.co_name
    if func_name != '<module>':  # top level usually
        parts.append(func_name)
    del target

    return ".".join(parts)
Get a name of a caller in the format module.class.method
skip specifies how many levels of stack to skip while getting caller
name. skip=1 means "who calls me", skip=2 "who calls my caller" etc.
An empty string is returned if skipped levels exceed stack height
def get_environ():
    """
    Return the working environment read from the DEV_ENVIRON environment variable.

    Falls back to 'dev' when the variable is not defined. The value is
    returned in lower case.

    Returns:
        str: The environment name (e.g. 'dev' or 'prod')
    """
    environ_name = os.environ.get("DEV_ENVIRON", "dev")
    return environ_name.lower()
Devuelve el entorno de trabajo a partir de la environment variable DEV_ENVIRON. Si no está definida por defecto devuelve 'dev'
Returns:
str: El nombre del entorno 'dev' o 'prod'
def create_dir(a_dir):
    """
    Create a directory (including missing parents) and report success.

    An already existing directory counts as success. A path that exists but is
    not a directory, or any other OS-level failure, yields False and prints a
    warning.

    Args:
        a_dir (str): path of the directory to create

    Returns:
        bool: True if the directory was created or already existed, False otherwise
    """
    try:
        # exist_ok avoids the check-then-create race and still fails when the
        # existing path is a regular file.
        os.makedirs(a_dir, exist_ok=True)
    except OSError:
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True
Crea directorio devolviendo TRUE o FALSE según haya ido. Si ya existe devuelve TRUE
Arguments:
- a_dir {str}: path del directorio a crear
Returns:
bool: Retorna TRUE si lo ha podido crear o ya existía y FALSE si no
def remove_content_dir(a_dir):
    """
    Delete the files and subdirectories found inside a directory.

    The directory itself is kept. Deletion is best-effort: entries that cannot
    be removed are skipped. Symbolic links are unlinked, never followed, so the
    contents of a linked directory are left untouched.

    Args:
        a_dir (str): path of the directory to empty

    Returns:
        num_elems_removed (int), num_elems_dir (int): number of non-directory
            entries removed and number of non-directory entries found
            (directories themselves are not counted)
    """
    num_elems_removed = 0
    num_elems_dir = 0
    with os.scandir(a_dir) as entries:
        for de in entries:
            if de.is_dir(follow_symlinks=False):
                n_rem_subdir, n_subdir = remove_content_dir(de.path)
                num_elems_dir += n_subdir
                num_elems_removed += n_rem_subdir
                try:
                    os.rmdir(de.path)
                except OSError:
                    # Best effort: the subdirectory may still hold undeletable entries.
                    pass
            else:
                num_elems_dir += 1
                try:
                    os.unlink(de.path)
                except OSError:
                    continue
                num_elems_removed += 1

    return num_elems_removed, num_elems_dir
Borra ficheros y subdirectorios de directorio
Arguments:
- a_dir {str}: path del directorio cuyo contenido se borra
Returns:
num_elems_removed (int), num_elems_dir (int)
Windows-specific error code indicating an invalid pathname.
See Also
https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx Official listing of all such codes.
def is_pathname_valid(pathname):
    """
    Return True if ``pathname`` is a valid pathname for the current OS, False otherwise.

    Each path component is probed with ``os.lstat`` against a root directory.
    Missing or inaccessible components are ignored; only errors that denote an
    invalid name make the function return False.

    Args:
        pathname (str): pathname to validate

    Returns:
        bool
    """
    try:
        # Anything that is not a non-empty string is invalid.
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip a Windows drive specifier (e.g. ``C:``): the ``:`` it contains
        # would otherwise be reported as an invalid character.
        _, pathname = os.path.splitdrive(pathname)

        # Directory used as probing root: on Windows the drive named by
        # %HOMEDRIVE% (default ``C:``), elsewhere the root directory.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # sanity check on the probing root

        # Make sure the root ends with exactly one separator.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            except OSError as exc:
                # Only these errors mean "invalid name":
                #  * on Windows, ``winerror == ERROR_INVALID_NAME`` (``errno``
                #    is not checked there);
                #  * elsewhere, ``errno`` ENAMETOOLONG or ERANGE.
                # Any other OSError (e.g. path not found) is ignored.
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character is reported as ValueError by Python 3
    # (TypeError on older interpreters); both mean the pathname is invalid.
    except (TypeError, ValueError):
        return False
    # No invalidating error: every component, hence the pathname, is valid.
    else:
        return True
True if the passed pathname is a valid pathname for the current OS;
False otherwise.
def is_dir_writable(dirname):
    """
    Return True if a file can be created inside ``dirname``, False otherwise.

    The check creates a uniquely named temporary file in the directory and
    removes it again, so existing files are never touched. An empty
    ``dirname`` is checked against the current directory.

    Args:
        dirname (str): directory to probe

    Returns:
        bool
    """
    import tempfile

    try:
        # os.fspath rejects non path-like values (e.g. None) with TypeError.
        probe_dir = os.fspath(dirname) or os.curdir
        with tempfile.TemporaryFile(dir=probe_dir):
            pass
    except (OSError, TypeError, ValueError):
        return False

    return True
True if the current user has sufficient permissions to create siblings
(i.e., arbitrary files in the parent directory) of the passed pathname;
False otherwise.
def is_path_exists_or_creatable(pathname):
    """
    Return True if ``pathname`` is valid on the current OS and either already
    exists or could be created (its parent directory is writable).

    ``OSError`` raised while checking is reported as False; other exception
    types are not handled here.

    Args:
        pathname (str): pathname to check

    Returns:
        bool
    """
    try:
        # Validate first so the ``os`` calls below are not fed invalid names.
        if not is_pathname_valid(pathname):
            return False
        if os.path.exists(pathname):
            return True
        return is_dir_writable(os.path.dirname(pathname))
    except OSError:
        # Filesystem complaints (timeouts, permissions...) mean "not accessible".
        return False
True if the passed pathname is a valid pathname on the current OS _and_
either currently exists or is hypothetically creatable in a cross-platform
manner optimized for POSIX-unfriendly filesystems; False otherwise.
This function is guaranteed to _never_ raise exceptions.
def get_matching_val(search_val, matching_vals):
    """
    Return the candidate most similar to ``search_val`` and its similarity score.

    Candidates are ranked with ``jaro_winkler``; the first entry of that
    ranking is returned.

    Args:
        search_val (str): Proposed value to compare
        matching_vals (list(str)): Candidate values to compare against

    Returns:
        match_val (str), fact_jaro_winkler (float): both None when the ranking is empty
    """
    ranking = jaro_winkler(search_val, matching_vals)
    for best_score, best_val in ranking.items():
        # Only the first (highest ranked) entry is of interest.
        return best_val, best_score

    return None, None
Retorna el valor de la lista a comparar (matching_vals) que más se asimila al valor propuesto (search_val).
Arguments:
- search_val (str): Valor propuesto para comparar
- matching_vals (list(str)): Lista de valores a comparar
Returns:
match_val (str), fact_jaro_winkler (float)
def levenshtein_distance(search_val, matching_vals):
    """
    Group the candidates by their Levenshtein distance to ``search_val``.

    Args:
        search_val (str): reference value
        matching_vals (iterable of str): candidate values

    Returns:
        OrderedDict: distance -> list of candidates at that distance, ordered
            by ascending distance
    """
    by_distance = {}
    for candidate in matching_vals:
        dist = jellyfish.levenshtein_distance(search_val, candidate)
        by_distance.setdefault(dist, []).append(candidate)

    return OrderedDict((dist, by_distance[dist]) for dist in sorted(by_distance))
Arguments:
- search_val:
- matching_vals:
Returns:
def jaro_winkler(search_val, matching_vals):
    """
    Rank the candidates by Jaro-Winkler similarity to ``search_val``.

    Candidates with similarity 0 are left out. The result is keyed by score,
    so when several candidates share a score only the last one is kept.

    Args:
        search_val (str): reference value
        matching_vals (iterable of str): candidate values

    Returns:
        OrderedDict: similarity -> candidate, ordered by descending similarity
    """
    by_score = {}
    for candidate in matching_vals:
        by_score[jellyfish.jaro_winkler_similarity(search_val, candidate)] = candidate

    ranked = OrderedDict()
    for score in sorted(by_score, reverse=True):
        if score == 0:
            continue
        ranked[score] = by_score[score]

    return ranked
Arguments:
- search_val:
- matching_vals:
Returns:
def call_command(command_prog, *args):
    """
    Run a command through the system shell with the given arguments.

    Args:
        command_prog (str): command to execute
        *args: arguments appended to the command

    Returns:
        bool: True if the command exited with status 0

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status
    """
    import shlex

    if os.name == 'nt':
        # On Windows subprocess turns the sequence into one command line.
        call_args = [command_prog, *args]
    else:
        # On POSIX, with shell=True only the first item of a sequence is the
        # command; the rest would go to the shell itself. Build a single
        # command line instead, quoting each argument.
        call_args = " ".join([str(command_prog), *(shlex.quote(str(arg)) for arg in args)])

    ret = subprocess.check_call(call_args, shell=True)

    return ret == 0
Llama comando shell sistema con los argumentos indicados
Returns:
bool: True si OK
def rounded_float(a_float, num_decs=9):
    """
    Round a number to the given number of decimals and return it as a float.

    Args:
        a_float (float): value to round
        num_decs (int=9): number of decimals to keep

    Returns:
        float
    """
    # Formatting with a fixed number of decimals and parsing back gives the
    # rounded value; no trimming of the text is needed, and trimming zeros
    # would corrupt integers when num_decs is 0 ("10" -> "1").
    return float(format(round(a_float, num_decs), f".{num_decs}f"))
Formatea un float con el numero de decimales especificado
Arguments:
- a_float:
- num_decs:
Returns:
float
class formatted_float(float):
    """
    Float whose ``repr`` shows at most ``__num_decs__`` decimals.
    """
    # Maximum number of decimals shown by __repr__.
    __num_decs__ = 9

    def __repr__(self):
        shown_value = rounded_float(self, self.__num_decs__)
        return str(shown_value)
Devuelve un float que se representa con un maximo de decimales (__num_decs__)
def as_format_floats(obj):
    """
    Convert every float found in ``obj`` into a ``formatted_float``.

    Dicts, lists and tuples are rebuilt with the same class and their values
    converted recursively; any other object is returned unchanged.

    Args:
        obj: Any object

    Returns:
        The converted object (same container types, floats as formatted_float)
    """
    if isinstance(obj, (float, formatted_float)):
        return formatted_float(obj)

    if isinstance(obj, (dict, OrderedDict)):
        rebuild = obj.__class__
        return rebuild((key, as_format_floats(value)) for key, value in obj.items())

    if isinstance(obj, (list, tuple)):
        rebuild = obj.__class__
        return rebuild(map(as_format_floats, obj))

    return obj
Si encuentra un Float lo convierte a la clase 'formatted_float' para formatear su representación
Arguments:
- obj: Cualquier objeto
Returns:
(obj, formatted_float)
def nums_from_str(a_string, nan=False):
    """
    Return the numbers found among the whitespace-separated tokens of a text.

    Each token is parsed as int first and as float otherwise; tokens that are
    neither are ignored.

    Args:
        a_string (str): text to scan
        nan (bool=False): when False, NaN values are not treated as numbers

    Returns:
        list: ints and floats in order of appearance
    """
    found = []

    for token in a_string.split():
        try:
            number = int(token)
        except ValueError:
            try:
                number = float(token)
            except ValueError:
                continue
            if isnan(number) and not nan:
                continue
        found.append(number)

    return found
Retorna lista de numeros en el texto pasado
Arguments:
- a_string (str):
- nan (bool=False): por defecto no trata los NaN como numeros
Returns:
list
def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in the given text.

    Args:
        a_string (str): text to scan
        nan (bool=False): when False, NaN values are not treated as numbers

    Returns:
        int OR float, or None when the text holds no number
    """
    numbers = nums_from_str(a_string, nan=nan)
    for number in numbers:
        return number

    return None
Retorna primer numero encontrado del texto pasado
Arguments:
- a_string (str):
- nan (bool=False): por defecto no trata los NaN como numeros
Returns:
int OR float
def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the dates found in a text according to the given formats.

    The text is split by each separator in turn and every resulting fragment
    is tried against every format. A fragment produced by several separators
    is only tried once, so the same match is not reported repeatedly.

    Args:
        str (str): text to scan
        formats (list=None): defaults to ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
        seps (list=None): defaults to [None, '.', ','] (None splits on whitespace)
        ret_extra_data (bool=False): if True each item is a tuple
            (date, source fragment, format used)

    Returns:
        list: datetimes (or tuples), ordered by format and then by fragment
    """
    if not formats:
        formats = ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']

    if not seps:
        seps = [None, '.', ',']

    # dict.fromkeys drops repeated fragments while keeping first-seen order.
    str_parts = list(dict.fromkeys(part.strip() for sep in seps for part in str.split(sep)))

    l_fechas = []
    for date_format in formats:
        for str_part in str_parts:
            try:
                val = datetime.datetime.strptime(str_part, date_format)
            except (ValueError, TypeError):
                # The fragment does not match this format.
                continue
            l_fechas.append((val, str_part, date_format) if ret_extra_data else val)

    return l_fechas
Retorna lista de fechas disponibles en el texto pasado segun formatos indicados
Arguments:
- str (str):
- formats (list=None): por defecto ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
- seps (list=None): por defecto [None, '.', ',']
- ret_extra_data (bool=False): si True retorna tuple con fecha + part_str_src + format utilizado
Returns:
list
def pretty_text(txt):
    """
    Replace underscores and hyphens with spaces and capitalize the text.

    Args:
        txt (str): text to prettify

    Returns:
        str
    """
    separators_to_spaces = str.maketrans("_-", "  ")
    return txt.translate(separators_to_spaces).capitalize()
Coge texto y lo capitaliza y quita carácteres por espacios
Arguments:
- txt (str):
Returns:
str
def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given files into a zip file.

    Args:
        zip_path (str): path of the zip file to create
        file_paths (list or generator): paths of the files to add
        base_path (str=None): when given, files located under this path keep
            their path relative to it inside the zip (matched case-insensitively);
            otherwise only the file name is stored
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing

    Returns:
        zip_path (str)
    """
    re_base_path = None
    if base_path:
        base_norm = os.path.normpath(base_path).replace(os.sep, '/').rstrip('/')
        # The base path is literal text: escape it, anchor it at the start and
        # require it to end on a path boundary so only a real prefix is removed.
        re_base_path = re.compile('^' + re.escape(base_norm) + r'(?=/|$)', re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if re_base_path is not None:
                norm_file_path = os.path.normpath(file_path).replace(os.sep, '/')
                arch_name = re_base_path.sub('', norm_file_path, count=1)
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path
Comprime los ficheros indicados con :file_paths en un fichero zip (:zip_path)
Arguments:
- zip_path:
- file_paths (list or generator):
- base_path (str=None): path base a partir del cual se mantendrá la ruta relativa de los ficheros dentro del zip
- compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:
zip_path (str)
def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given directory into a zip file.

    Args:
        dir_path (str): directory path
        zip_path (str=None): path of the .zip file to create. Defaults to
            ``dir_path`` with '.zip' appended
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function deciding whether a path name is
            valid and should be included
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing

    Returns:
        zip_file (str)
    """
    target_zip = zip_path if zip_path else f'{dir_path}.zip'
    selected_paths = iter_paths_dir(dir_path,
                                    relative_dirs_sel=relative_dirs_sel,
                                    func_filter_path=func_filter_path)

    return zip_files(target_zip, selected_paths, base_path=dir_path, compression=compression)
Comprime la carpeta indicada
Arguments:
- dir_path (str): path directorio
- zip_path (str=None): el path del fichero .zip a crear. Por defecto zip en el directorio padre con el mismo nombre del directorio zipeado
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
- compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:
zip_file (str)
def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each file of a directory into its own zip, next to the file.

    Only regular files directly inside ``dir_path`` are processed. A file that
    already is the zip that would be produced for it (e.g. ``x.zip``) is
    skipped instead of being overwritten with itself.

    Args:
        dir_path (str): directory whose files are compressed
        remove_files (bool=False): if True the source file is removed after zipping
        *exts_files: file extensions to process, WITHOUT the dot. All files if none given

    Returns:
        ok (bool)
    """
    exts = [".{}".format(ext.lower()) for ext in exts_files]

    # Snapshot the listing first: zips are created in this same directory.
    with os.scandir(dir_path) as entries:
        file_paths = [de.path for de in entries if de.is_file()]

    for file_path in file_paths:
        base_file, ext_file = os.path.splitext(file_path)
        if exts and ext_file.lower() not in exts:
            continue

        zip_path = "{}.zip".format(base_file)
        if os.path.normcase(os.path.abspath(zip_path)) == os.path.normcase(os.path.abspath(file_path)):
            # Writing the zip would truncate the very file being compressed.
            continue

        print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path))
        zip_files(zip_path, [file_path])

        if remove_files and not os.path.samefile(zip_path, file_path):
            os.remove(file_path)

    return True
Comprime los ficheros de una carpeta indicada. Se pueden indicar qué tipo de ficheros se quiere que comprima
Arguments:
- dir_path:
- remove_files:
- *exts_files: extensiones de fichero SIN el punto
Returns:
ok (bool)
def split_ext_file(path_file):
    """
    Split a file name at its first "." into base name and the rest.

    Args:
        path_file (str): path or name of the file

    Returns:
        base_file (str), ext_file (str): ``ext_file`` holds everything after
            the first dot (e.g. "tar.gz"), or "" when there is no dot
    """
    file_name = os.path.basename(path_file)
    base_file, _dot, ext_file = file_name.partition(".")

    return base_file, ext_file
Devuelve el nombre del fichero partido entre la primera parte antes del separador "." y lo demás
Arguments:
- path_file:
Returns:
base_file (str), ext_file (str)
def last_run_on_dir(dir_base):
    """
    Return the last-run date stored in the run log file of a directory.

    Args:
        dir_base (str): directory holding the run log file

    Returns:
        date_last_run (datetime): None when the log file does not exist
    """
    log_path = os.path.join(dir_base, FILE_RUN_LOG)
    if not os.path.exists(log_path):
        return None

    with open(log_path) as fr:
        return datetime.datetime.strptime(fr.read(), DATE_RUN_LOG_FRMT)
Retorna la fecha de ultima ejecucion de proceso generacion en directorio de repositorio
Arguments:
- dir_base (str):
Returns:
date_last_run (datetime): Si no encuentra devuelve None
def save_last_run_on_dir(dir_base, date_run=None):
    """
    Write the last-run date into the run log file of a directory.

    Args:
        dir_base (str): directory holding the run log file
        date_run (datetime=None): date to store. Defaults to today's date
    """
    log_path = os.path.join(dir_base, FILE_RUN_LOG)
    stamp = date_run if date_run else datetime.date.today()

    with open(log_path, "w+") as fw:
        fw.write(stamp.strftime(DATE_RUN_LOG_FRMT))
Graba la fecha de ultima ejecucion de proceso generacion en directorio de repositorio
Arguments:
- dir_base (str):
- date_run (datetime=None): Si no se informa cogerá la fecha de hoy
def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the given locale, prettified.

    Args:
        num_month (int): month number used to index ``calendar.month_name``
        code_alias_locale (str='es_cu'): key looked up in ``locale.locale_alias``

    Returns:
        str
    """
    locale_name = locale.locale_alias.get(code_alias_locale)

    # calendar.month_name is resolved lazily, so it must be read while the
    # temporary locale is active.
    with different_locale(locale_name):
        localized_name = calendar.month_name[num_month]
        return pretty_text(localized_name)
Retorna el nombre del mes en el locale especificado. Por defecto castellano
Arguments:
- num_month (int):
- code_alias_locale (str='es_cu'):
Returns:
str
def file_mod_time(path_file):
    """
    Return the modification time of a file as a datetime.

    Args:
        path_file (str): path of the file

    Returns:
        datetime: local-time datetime built from the file's modification timestamp
    """
    mtime_seconds = os.path.getmtime(path_file)
    return datetime.datetime.fromtimestamp(mtime_seconds)
Return datetime from the modification timestamp (stat) of a file
Arguments:
- path_file (str):
Returns:
datetime
def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate over the rows of a CSV file.

    With ``header=True`` the first non-empty row is taken as header (values
    stripped and lower-cased) and each following row is yielded as a dict keyed
    by it; otherwise rows are yielded as lists. Empty rows are skipped.

    Args:
        a_path_csv (str): path of the CSV file
        header (bool=True): whether the first row holds the column names
        sep (str=';'): column delimiter; ';' is used when empty
        encoding (str="utf8"): encoding of the file

    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are preserved.
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            vals_row = dict(zip(header_row, row)) if header_row else row

            if vals_row:
                yield vals_row
Itera como dicts indexados por valores primera fila (si header=True) o si no como list las filas del CSV pasado por parametro a_path_csv.
Arguments:
- a_path_csv (str):
- header (bool=True):
- sep (str=';'): separador de columnas; si se pasa vacío se usa ';'
- encoding (str="utf8"):
Yields:
list OR dict
def subdirs_path(path):
    """
    Iterate over the immediate subdirectories of a path.

    Args:
        path (str): directory to scan

    Yields:
        nom_subdir, path_subdir: name and path of each subdirectory
    """
    with os.scandir(path) as entries:
        yield from ((entry.name, entry.path) for entry in entries if entry.is_dir())
Itera sobre los subdirectorios del path
Arguments:
- path:
Yields:
nom_subdir, path_subdir
def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Build a nested dict describing the subdirectory tree under a base directory.

    Each key is a subdirectory name and each value is the tree of that
    subdirectory, built by a recursive call.

    Args:
        path_dir_base (str): directory whose subdirectories are listed
        relative_dirs_sel (list=None): relative paths of the directories to
            process; entries that do not exist under ``path_dir_base`` are ignored
        last_level_as_list (bool=False): if True and none of the subdirectories
            found has children, the result is the list of their names

    Returns:
        dict (or list, see ``last_level_as_list``)
    """
    tree = {}

    f_valid_dir = None
    # Lower-cased, normalized relative paths of the selections that exist.
    valid_dirs_sel = set()
    if relative_dirs_sel:
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

        def valid_dir(dir_path):
            # A directory is accepted when its relative path equals a selection
            # or shares a non-empty common path with one.
            valid = False
            rel_path = os.path.relpath(dir_path, path_dir_base).lower()
            for dir_sel in valid_dirs_sel:
                if rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel)):
                    valid = True
                    break

            return valid

        # The filter is only installed when a selection was requested.
        f_valid_dir = valid_dir

    for dir_name, dir_path in subdirs_path(path_dir_base):
        if not f_valid_dir or f_valid_dir(dir_path):
            dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()
            # Re-express the matching selections relative to this subdirectory
            # so they can be passed down to the recursive call.
            dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                             if os.path.commonpath((dir_path_rel, dir_sel))]
            # NOTE(review): last_level_as_list is not forwarded, so the list
            # conversion below only applies to the level of the initial call
            # -- confirm whether that is intended.
            tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path)

    if tree:
        # All children are empty trees: optionally collapse to a list of names.
        if last_level_as_list and not any(tree.values()):
            tree = [*tree.keys()]

    return tree
Arguments:
- path_dir_base:
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- last_level_as_list (bool=False):
Returns:
dict
def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a nested dict with the tree of paths found under a directory.

    Directory names map to nested dicts and file names map to None. Only
    directories holding at least one accepted file are added to the tree.

    Args:
        path_dir_base (str): directory to walk
        relative_dirs_sel (list=None): relative paths of the directories to
            process; entries that do not exist are ignored
        func_filter_path (func=None): function (name -> bool) deciding whether
            a directory or file name is accepted. All names accepted by default
        solo_dirs (bool=False): if True file names are left out of the tree

    Returns:
        dict
    """
    selected_dirs = set()
    for rel_dir in relative_dirs_sel or ():
        candidate = os.path.join(path_dir_base, rel_dir)
        if os.path.exists(candidate):
            selected_dirs.add(candidate)

    def _accepted(name):
        return not func_filter_path or func_filter_path(name)

    result = {}
    for walked_dir, _subdir_names, file_names in os.walk(path_dir_base):
        if selected_dirs:
            # Keep only the selected directories and whatever hangs below them.
            in_selection = False
            for selected in selected_dirs:
                if os.path.samefile(walked_dir, selected) or is_path_child_from(walked_dir, selected):
                    in_selection = True
                    break
            if not in_selection:
                continue

        rel_dir = os.path.relpath(walked_dir, path_dir_base)
        if not _accepted(os.path.basename(rel_dir)):
            continue

        kept_files = dict.fromkeys(name for name in file_names if _accepted(name))
        if not kept_files:
            continue

        node = result
        # Files of the base directory itself hang directly from the root dict.
        if rel_dir != '.':
            for part in rel_dir.split(os.sep):
                node = node.setdefault(part, {})

        if not solo_dirs:
            node.update(kept_files)

    return result
Retorna diccionario con el arbol de paths disponibles en el path indicado.
Con la función func_filter_path (-> bool) se podrán filtrar los paths a retornar (por defecto se retornan todos)
Arguments:
- path_dir_base (str):
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
- solo_dirs (bool=False):
Returns:
dict
def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate over the leaf paths of a tree of paths.

    Entries whose value is a non-empty dict are descended into; any other
    entry (None, empty dict...) is yielded as a path itself.

    Args:
        tree_paths (dict): nested dict of names
        path_base (str=None): path prefixed to every yielded path

    Yields:
        path_file
    """
    for name, children in tree_paths.items():
        if isinstance(children, dict) and children:
            relative_paths = iter_tree_paths(children, name)
        else:
            relative_paths = (name,)

        for relative_path in relative_paths:
            yield os.path.join(path_base, relative_path) if path_base else relative_path
Arguments:
- tree_paths (dict):
- path_base (str=None):
Yields:
path_file
def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate over the paths available under the given directory.

    Args:
        path_dir_base (str): directory to walk
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function (name -> bool) deciding whether
            a path name is accepted. All names accepted by default

    Yields:
        path (str)
    """
    found_tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(found_tree, path_dir_base)
Itera el arbol de paths disponibles en el path indicado.
Con la función func_filter_path (-> bool) se podrán filtrar los paths a retornar (por defecto se retornan todos)
Arguments:
- path_dir_base (str):
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
Yields:
path (str)
def is_path_child_from(path, path_parent):
    """
    Tell whether ``path`` is located below ``path_parent``.

    Every ancestor of ``path`` is compared with ``path_parent`` as files on
    disk; ``path`` itself is not considered its own child.

    Args:
        path (str): path to test
        path_parent (str): candidate parent directory

    Returns:
        bool
    """
    parent = Path(path_parent)
    for ancestor in Path(path).parents:
        if ancestor.samefile(parent):
            return True

    return False
Retorna si path es hijo de path_parent
Arguments:
- path:
- path_parent:
Returns:
bool
def machine_name():
    """
    Return the fully qualified name of the machine, in upper case.

    Returns:
        str
    """
    # TODO - When running inside a container, get the host from the docker
    # TODO - machine instead (e.g. docker.from_env(), then
    # TODO - containers.get(socket.gethostname()) and read
    # TODO - attrs['NetworkSettings']['IPAddress']).
    fqdn = socket.getfqdn()
    return fqdn.upper()
Retorna el nombre de la maquina
Returns:
str
def machine_apb():
    """
    Tell whether the machine's fully qualified name ends with '.apb.es'.

    The comparison is case-insensitive.

    Returns:
        bool
    """
    fqdn = socket.getfqdn().lower()
    return fqdn.endswith('.apb.es')
Retorna si el nombre completo (FQDN) de la maquina termina en '.apb.es'
Returns:
bool
def find_key_values(obj: Any, target_key: str) -> Generator[Tuple[Any, int], None, None]:
    """
    Recursively walk ``obj`` (dicts, lists, tuples, sets) and yield a tuple
    (value, level) for every dict entry whose key equals ``target_key``.

    The level is 0 for keys of ``obj`` itself and grows by one for each
    container entered. A matching value is also searched in turn.

    Args:
        obj (Any): The object to search through.
        target_key (str): The key to search for.

    Yields:
        Tuple[Any, int]: The value associated with ``target_key`` and its depth level.
    """
    def _walk(node: Any, depth: int) -> Generator[Tuple[Any, int], None, None]:
        if isinstance(node, dict):
            for key, value in node.items():
                if key == target_key:
                    yield value, depth
                yield from _walk(value, depth + 1)
            return

        if isinstance(node, (list, tuple, set)):
            for element in node:
                yield from _walk(element, depth + 1)

    yield from _walk(obj, 0)
Generator that recursively walks obj (dicts, lists, tuples, sets)
and yields tuples (value, level) for every occurrence of target_key.
Arguments:
- obj (Any): The object to search through.
- target_key (str): The key to search for.
Yields:
Tuple[Any, int]: A tuple containing the value associated with
target_key and its depth level.