apb_extra_utils.misc
# coding=utf-8
#
#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
#  Created: 7/6/19 18:23
#  Last modified: 7/6/19 18:21
#  Copyright (c) 2019

import calendar
import csv
import datetime
import errno
import inspect
import locale
import os
import re
import shlex
import socket
import subprocess
import sys
from calendar import different_locale
from collections import OrderedDict
from math import isnan
from pathlib import Path
from tempfile import TemporaryFile, gettempdir
from urllib.parse import urlparse
from urllib.request import build_opener
from zipfile import ZipFile, ZIP_DEFLATED

import jellyfish
from tqdm import tqdm


def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
    """
    Download a zip archive from ``url`` and extract it.

    Args:
        url (str): Url of the zip file to download
        extract_to (str=None): Directory where the zip is saved and extracted. If None the zip is
                saved in the temporary directory and extracted next to it, in a folder named after the zip
        headers (list=None): Request headers handed to download_from_url()
        remove_zip (bool=True): If True the downloaded zip is removed after the extraction

    Returns:
        extract_to (str): Directory that holds the extracted content
    """
    if zip_file_path := download_from_url(url, extract_to, headers):
        extract_to = unzip(zip_file_path, extract_to, remove_zip)

    return extract_to


def unzip(zip_file_path, extract_to=None, remove_zip=False):
    """
    Unzip file to extract_to directory

    Args:
        zip_file_path (str): Path to zip file
        extract_to (str=None): If None, extract to a folder named after the zip (without extension)
                in the zip's directory
        remove_zip (bool=False): If True remove zip file after unzip

    Returns:
        extract_to (str)
    """
    if not extract_to:
        extract_to = os.path.join(
            os.path.dirname(zip_file_path),
            os.path.splitext(os.path.basename(zip_file_path))[0]
        )

    with ZipFile(zip_file_path, 'r') as archive:
        desc = f"Extracting {zip_file_path} to {extract_to}"
        if not sys.stdout:
            # No standard output available: skip the progress bar
            print(f'{desc}...')
            gen_members = archive.infolist()
        else:
            gen_members = tqdm(archive.infolist(), desc=desc)

        for member in gen_members:
            archive.extract(member, extract_to)

    # The archive is closed at this point, so it can be deleted on every platform
    if remove_zip:
        os.remove(zip_file_path)

    return extract_to


def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
    """
    Download the resource at ``url`` into a directory.

    The file name comes from the response's Content-Disposition header or, failing that, from
    the path of the final url (query string excluded). Only the last path component is kept so
    that a name chosen by the server cannot place the file outside ``extract_to``.

    Args:
        url (str): Url to download
        extract_to (str=None): Directory to save file (created if missing). Default temporary directory
        headers (list=None): Assigned as is to the opener's ``addheaders``
                (urllib expects ``(name, value)`` pairs there)

    Returns:
        path_file (str)
    """
    opener = build_opener()
    if headers:
        opener.addheaders = headers

    with opener.open(url) as response:
        # Only HTTP responses expose ``length``; other handlers (file://, ftp://) do not
        content_length = getattr(response, 'length', None)
        if not extract_to:
            extract_to = gettempdir()
        os.makedirs(extract_to, exist_ok=True)

        n_file = response.headers.get_filename() or urlparse(response.url).path
        n_file = os.path.basename(n_file.replace('\\', '/').rstrip('/'))
        if not n_file:
            # Url without path: fall back to the last component of the whole url
            n_file = Path(response.url).name
        file_path = os.path.join(extract_to, n_file)

        with open(file_path, "wb") as out_file:
            def get_resp_data():
                while data := response.read(64 * 1024):
                    yield data

            desc = f'Downloading to "{file_path}"'
            if not sys.stdout:
                print(f'{desc}...')
                for data in get_resp_data():
                    out_file.write(data)
            else:
                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
                    for data in get_resp_data():
                        out_file.write(data)
                        progress_bar.update(len(data))

    return file_path


def caller_name(skip=2):
    """Get a name of a caller in the format module.class.method

    `skip` is the index into the chain of frames that starts at the frame calling this
    function: 0 is that caller itself, 1 is its caller, and so on.

    An empty string is returned if skipped levels exceed stack height
    """
    frames = []
    frame = sys._getframe(1)
    while frame:
        frames.append(frame)
        frame = frame.f_back

    if skip >= len(frames):
        return ''
    parentframe = frames[skip]

    name = []
    module = inspect.getmodule(parentframe)
    # The module can be None when the frame is executed directly in console
    if module and module.__name__ != "__main__":
        name.append(module.__name__)
    # A local called 'self' is taken as the sign of a method call; static methods
    # cannot be told apart from plain functions
    if 'self' in parentframe.f_locals:
        name.append(parentframe.f_locals['self'].__class__.__name__)
    codename = parentframe.f_code.co_name
    if codename != '<module>':  # top level usually
        name.append(codename)  # function or a method
    del parentframe

    return ".".join(name)


def get_environ():
    """
    Return the working environment read from the environment variable DEV_ENVIRON.
    When it is not defined 'dev' is returned.

    Returns:
        str: The lower-cased value of DEV_ENVIRON ('dev' by default)
    """
    return os.getenv("DEV_ENVIRON", "dev").lower()


def create_dir(a_dir):
    """
    Create a directory (and missing parents) reporting success as a bool. An already existing
    path counts as success.

    Args:
        a_dir (str): path of the directory to create

    Returns:
        bool: True if it was created or already existed, False otherwise (a warning is printed)
    """
    try:
        # exist_ok avoids the check-then-create race with other processes
        os.makedirs(a_dir, exist_ok=True)
    except OSError:
        if os.path.exists(a_dir):
            # Historic behaviour: any existing path is reported as success
            return True
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True


def remove_content_dir(a_dir):
    """
    Remove the files and subdirectories found inside a directory (the directory itself is kept).

    Symbolic links are removed as links and never followed. Elements that cannot be removed
    are skipped silently.

    Args:
        a_dir (str): path of the directory to empty

    Returns:
        num_elems_removed (int), num_elems_dir (int): removed and found non-directory elements
    """
    num_elems_removed = 0
    num_elems_dir = 0
    with os.scandir(a_dir) as it:
        entries = list(it)

    for de in entries:
        if de.is_dir(follow_symlinks=False):
            n_rem_subdir, n_subdir = remove_content_dir(de.path)
            num_elems_dir += n_subdir
            num_elems_removed += n_rem_subdir
            try:
                os.rmdir(de.path)
            except OSError:
                # Best effort: the subdirectory still holds elements that could not be removed
                pass
        else:
            num_elems_dir += 1
            try:
                os.unlink(de.path)
            except OSError:
                continue
            num_elems_removed += 1

    return num_elems_removed, num_elems_dir


# Windows-specific error code indicating an invalid pathname (Python does not provide it).
#
# See Also: official listing of all such codes
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx
ERROR_INVALID_NAME = 123


def is_pathname_valid(pathname):
    '''
    `True` if the passed pathname is a valid pathname for the current OS;
    `False` otherwise.
    '''
    try:
        # A pathname that is not a string, or is an empty one, is invalid.
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip this pathname's Windows-specific drive specifier (e.g., `C:\`)
        # if any. Since Windows prohibits path components from containing `:`
        # characters, failing to strip this `:`-suffixed prefix would
        # erroneously invalidate all valid absolute Windows pathnames.
        _, pathname = os.path.splitdrive(pathname)

        # Directory guaranteed to exist. If the current OS is Windows, this is
        # the drive to which Windows was installed (e.g., the "%HOMEDRIVE%"
        # environment variable); else, the typical root directory.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # ...Murphy and her ironclad Law

        # Append a path separator to this directory if needed.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        # Test whether each path component split from this pathname is valid or
        # not, ignoring non-existent and non-readable path components.
        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            # Only the following errors indicate invalid pathnames, any other one
            # is an ignorable filesystem complaint (e.g., path not found):
            #
            # * On Windows, "winerror" equal to ERROR_INVALID_NAME ("winerror" is
            #   more fine-grained there than the generic "errno").
            # * Elsewhere, "errno" equal to ENAMETOOLONG or ERANGE.
            except OSError as exc:
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character is reported as ValueError by Python 3
    # (TypeError is kept for older interpreters): the pathname is invalid.
    except (TypeError, ValueError):
        return False
    # If no exception was raised, all path components and hence this
    # pathname itself are valid.
    else:
        return True
    # Any other exception is an unrelated fatal issue and unwinds the call stack.


def is_dir_writable(dirname):
    '''
    `True` if a file can be created inside the passed directory (the current
    directory when `dirname` is empty); `False` otherwise.
    '''
    try:
        # An anonymous temporary file probes the permission without touching
        # any file that already lives in the directory
        with TemporaryFile(dir=dirname or os.curdir):
            pass
    except (OSError, TypeError, ValueError):
        return False

    return True


def is_path_exists_or_creatable(pathname):
    '''
    `True` if the passed pathname is a valid pathname on the current OS _and_
    either currently exists or its parent directory is writable; `False` otherwise.
    '''
    try:
        # To prevent "os" module calls from raising undesirable exceptions on
        # invalid pathnames, is_pathname_valid() is explicitly called first.
        return is_pathname_valid(pathname) and (
                os.path.exists(pathname) or is_dir_writable(os.path.dirname(pathname)))
    # Report failure on non-fatal filesystem complaints (e.g., connection
    # timeouts, permissions issues) implying this path to be inaccessible. All
    # other exceptions are unrelated fatal issues and should not be caught here.
    except OSError:
        return False


def get_matching_val(search_val, matching_vals):
    """
    Return the candidate most similar to the searched value according to jaro_winkler().

    Args:
        search_val (str): Value to look for
        matching_vals (list(str)): Candidate values

    Returns:
        match_val (str), fact_jaro_winkler (float): (None, None) when no candidate has a
            similarity other than 0
    """
    jaro_results = jaro_winkler(search_val, matching_vals)
    fact_jaro = next(iter(jaro_results), None)

    return jaro_results.get(fact_jaro), fact_jaro


def levenshtein_distance(search_val, matching_vals):
    """
    Group the candidates by their Levenshtein distance to the searched value.

    Args:
        search_val (str): Value to look for
        matching_vals (list(str)): Candidate values

    Returns:
        OrderedDict: {distance: [candidates]} sorted by ascending distance
    """
    distances = {}
    for match_val in matching_vals:
        fact = jellyfish.levenshtein_distance(search_val, match_val)
        distances.setdefault(fact, []).append(match_val)

    return OrderedDict((fact, distances[fact]) for fact in sorted(distances))


def jaro_winkler(search_val, matching_vals):
    """
    Index the candidates by their Jaro-Winkler similarity to the searched value.

    Candidates that share the same similarity overwrite each other (the last one wins) and
    candidates with similarity 0 are discarded.

    Args:
        search_val (str): Value to look for
        matching_vals (list(str)): Candidate values

    Returns:
        OrderedDict: {similarity: candidate} sorted by descending similarity
    """
    ord_vals = OrderedDict()
    matchings = {jellyfish.jaro_winkler_similarity(search_val, match_val): match_val
                 for match_val in matching_vals}
    for fact in sorted(matchings, reverse=True):
        if fact != 0:
            ord_vals[fact] = matchings[fact]

    return ord_vals


def call_command(command_prog, *args):
    """
    Run a command through the system shell with the given arguments.

    Args:
        command_prog (str): Command line handed to the shell as is
        *args: Extra arguments appended to the command

    Returns:
        bool: True if OK (a non-zero exit status raises subprocess.CalledProcessError)
    """
    call_args = [command_prog, *args]
    if args and sys.platform != 'win32':
        # With shell=True a POSIX shell only runs the first item of a list and receives
        # the rest as its own positional parameters, so the arguments would be lost:
        # build a single command line with every argument quoted instead
        call_args = " ".join([str(command_prog), *(shlex.quote(str(arg)) for arg in args)])
    ret = subprocess.check_call(call_args, shell=True)

    return ret == 0


def rounded_float(a_float, num_decs=9):
    """
    Round a float to the given number of decimals.

    Args:
        a_float (float):
        num_decs (int=9): Maximum number of decimals

    Returns:
        float
    """
    # Trailing zeros need no stripping for the float conversion, and stripping them
    # corrupted the integer part when num_decs was 0 ("100" -> "1")
    return float(format(round(a_float, num_decs), ".{}f".format(num_decs)))


class formatted_float(float):
    """
    Float whose repr shows at most __num_decs__ decimals
    """
    __num_decs__ = 9

    def __repr__(self):
        return str(rounded_float(self, self.__num_decs__))


def as_format_floats(obj):
    """
    Convert every float found to 'formatted_float' so its representation is shortened.
    Dicts, lists and tuples are rebuilt recursively with their own class.

    Args:
        obj: Any object

    Returns:
        (obj, formatted_float)
    """
    if isinstance(obj, (float, formatted_float)):
        return formatted_float(obj)
    elif isinstance(obj, (dict, OrderedDict)):
        return obj.__class__((k, as_format_floats(v)) for k, v in obj.items())
    elif isinstance(obj, (list, tuple)):
        return obj.__class__(as_format_floats(v) for v in obj)
    return obj


def nums_from_str(a_string, nan=False):
    """
    Return the list of numbers found among the whitespace-separated words of the text

    Args:
        a_string (str):
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        list: ints when the word parses as int, floats otherwise
    """
    l_nums = []

    for s in a_string.strip().split():
        try:
            l_nums.append(int(s))
        except ValueError:
            try:
                fl = float(s)
            except ValueError:
                continue
            if nan or not isnan(fl):
                l_nums.append(fl)

    return l_nums


def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in the text

    Args:
        a_string (str):
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        int OR float (None if the text holds no number)
    """
    return next(iter(nums_from_str(a_string, nan=nan)), None)


def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the dates that can be parsed from the text with the given formats

    Args:
        str (str):
        formats (list=None): by default ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
        seps (list=None): separators used to split the text, by default [None, '.', ',']
                (None splits on whitespace)
        ret_extra_data (bool=False): if True each item is a tuple (date, source part, format used)

    Returns:
        list
    """
    l_fechas = list()

    if not formats:
        formats = ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']

    if not seps:
        seps = [None, '.', ',']

    str_parts = [s.strip() for sep in seps for s in str.split(sep)]

    for a_format in formats:
        for str_part in str_parts:
            try:
                val = datetime.datetime.strptime(str_part, a_format)
            except ValueError:
                # The part does not match this format
                continue
            if ret_extra_data:
                val = (val, str_part, a_format)
            l_fechas.append(val)

    return l_fechas


def pretty_text(txt):
    """
    Replace underscores and hyphens by spaces and capitalize the text

    Args:
        txt (str):

    Returns:
        str
    """
    return txt.replace("_", " ").replace("-", " ").capitalize()


def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the files in :file_paths into the zip file :zip_path

    Args:
        zip_path (str): zip file to create (overwritten if it exists)
        file_paths (list or generator):
        base_path (str=None): path removed (case-insensitively, first occurrence) from each file
                path to build its name inside the zip. If None only the file name is kept
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing

    Returns:
        zip_path (str)
    """
    re_base_path = None
    if base_path:
        # The base path is literal text, not a pattern: escape it and compile it only once
        re_base_path = re.compile(re.escape(os.path.normpath(base_path).replace(os.sep, '/')),
                                  re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if re_base_path:
                arch_name = re_base_path.sub('', os.path.normpath(file_path).replace(os.sep, '/'), count=1)
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path


def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress a directory

    Args:
        dir_path (str): directory path
        zip_path (str=None): path of the .zip to create. By default a zip named after the
                directory, placed in its parent directory
        relative_dirs_sel (list=None): relative paths of the only directories to process
        func_filter_path (func=None): function that tells whether a path name is valid
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing

    Returns:
        zip_file (str)
    """
    if not zip_path:
        # normpath drops a trailing separator that would place the zip inside the directory
        zip_path = f'{os.path.normpath(dir_path)}.zip'

    zip_file = zip_files(zip_path,
                         iter_paths_dir(dir_path,
                                        relative_dirs_sel=relative_dirs_sel,
                                        func_filter_path=func_filter_path),
                         base_path=dir_path,
                         compression=compression)

    return zip_file


def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each file of a directory into its own zip (same name, extension '.zip').
    The file types to compress can be restricted by extension.

    Args:
        dir_path (str):
        remove_files (bool=False): if True each file is removed once zipped
        *exts_files: file extensions WITHOUT the dot

    Returns:
        ok (bool)
    """
    exts = [".{}".format(ext.lower()) for ext in exts_files]

    # Snapshot the files first so the zips created below are never picked up by the scan
    with os.scandir(dir_path) as it:
        file_paths = [de.path for de in it if de.is_file()]

    for file_path in file_paths:
        zip_path = "{}.zip".format(os.path.splitext(file_path)[0])
        if os.path.normcase(zip_path) == os.path.normcase(file_path):
            # The file is its own target zip: creating the zip would truncate it
            continue

        if not exts or (os.extsep in file_path and os.path.splitext(file_path)[1].lower() in exts):
            print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path))
            zip_files(zip_path, [file_path])

            if remove_files and not os.path.samefile(zip_path, file_path):
                os.remove(file_path)

    return True


def split_ext_file(path_file):
    """
    Split the file name at its first "." into the leading part and everything after it

    Args:
        path_file (str):

    Returns:
        base_file (str), ext_file (str)
    """
    base_file, *ext_parts = os.path.basename(path_file).split(".")

    return base_file, ".".join(ext_parts)


FILE_RUN_LOG = "last_run.log"
DATE_RUN_LOG_FRMT = "%Y%m%d"


def last_run_on_dir(dir_base):
    """
    Return the date stored in the FILE_RUN_LOG file of a directory

    Args:
        dir_base (str):

    Returns:
        date_last_run (datetime): None when the log file does not exist
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    dt_last_run = None
    if os.path.exists(log_last_run):
        with open(log_last_run) as fr:
            # strip() tolerates a trailing newline added by an editor
            dt_last_run = datetime.datetime.strptime(fr.read().strip(), DATE_RUN_LOG_FRMT)

    return dt_last_run


def save_last_run_on_dir(dir_base, date_run=None):
    """
    Write the run date into the FILE_RUN_LOG file of a directory

    Args:
        dir_base (str):
        date_run (datetime=None): today's date when not given
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    if not date_run:
        date_run = datetime.date.today()
    with open(log_last_run, "w+") as fw:
        fw.write(date_run.strftime(DATE_RUN_LOG_FRMT))


def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the locale registered under the given alias

    Args:
        num_month (int):
        code_alias_locale (str='es_cu'): key looked up in locale.locale_alias; when the alias
                is unknown None is handed to calendar.different_locale

    Returns:
        str
    """
    with different_locale(locale.locale_alias.get(code_alias_locale)):
        return pretty_text(calendar.month_name[num_month])


def file_mod_time(path_file):
    """
    Return datetime from modification stat timestamp from file

    Args:
        path_file (str):

    Returns:
        datetime
    """
    return datetime.datetime.fromtimestamp(os.stat(path_file).st_mtime)


def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate the rows of a CSV file as dicts keyed by the (stripped, lower-cased) values of the
    first non-empty row if header=True, or as lists otherwise. Empty rows are skipped.

    Args:
        a_path_csv (str):
        header (bool=True):
        sep (str=';'): field delimiter; ';' is used when it is empty
        encoding (str="utf8"):

    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle line breaks embedded in quoted fields
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            if header_row:
                vals_row = dict(zip(header_row, row))
            else:
                vals_row = row

            if vals_row:
                yield vals_row


def subdirs_path(path):
    """
    Iterate over the direct subdirectories of a path

    Args:
        path:

    Yields:
        nom_subdir, path_subdir
    """
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_dir():
                yield entry.name, entry.path


def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Return the tree of subdirectories of a path as nested dicts keyed by directory name

    Args:
        path_dir_base:
        relative_dirs_sel (list=None): relative paths of the only directories to process
                (those that do not exist are ignored)
        last_level_as_list (bool=False): if True a level whose subdirectories have no
                subdirectories of their own is returned as a list of names

    Returns:
        dict (or list, see last_level_as_list)
    """
    tree = {}

    f_valid_dir = None
    valid_dirs_sel = set()
    if relative_dirs_sel:
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

        def valid_dir(dir_path):
            valid = False
            rel_path = os.path.relpath(dir_path, path_dir_base).lower()
            for dir_sel in valid_dirs_sel:
                if rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel)):
                    valid = True
                    break

            return valid

        f_valid_dir = valid_dir

    for dir_name, dir_path in subdirs_path(path_dir_base):
        if not f_valid_dir or f_valid_dir(dir_path):
            dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()
            dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                             if os.path.commonpath((dir_path_rel, dir_sel))]
            # The flag travels down so that it also applies to the deepest levels
            tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path, last_level_as_list)

    if tree:
        if last_level_as_list and not any(tree.values()):
            tree = [*tree.keys()]

    return tree


def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a dict with the tree of paths available under the given path: directories are
    nested dicts and files are keys with value None.

    With func_filter_path (-> bool) the returned paths can be filtered (by default all pass).
    It is applied to the directory names and to the file names; directories without any
    selected file do not appear.

    Args:
        path_dir_base (str):
        relative_dirs_sel (list=None): relative paths of the only directories to process
        func_filter_path (func=None): function that tells whether a path name is valid
        solo_dirs (bool=False): if True the files are left out of the tree

    Returns:
        dict
    """
    paths = dict()

    valid_dirs_sel = set()
    if relative_dirs_sel:
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(path_dir_rel)

    for dir_path, dir_names, file_names in os.walk(path_dir_base):
        if valid_dirs_sel and not any(
                os.path.samefile(dir_path, a_dir_sel) or is_path_child_from(dir_path, a_dir_sel)
                for a_dir_sel in valid_dirs_sel):
            continue

        dir_path = os.path.relpath(dir_path, path_dir_base)
        dir_name = os.path.basename(dir_path)

        if func_filter_path and not func_filter_path(dir_name):
            continue

        files_selected = {fn: None for fn in file_names
                          if not func_filter_path or func_filter_path(fn)}

        if files_selected:
            subdir_paths = paths
            # The files of the first level hang directly from the root dict
            if dir_path != '.':
                for d in dir_path.split(os.sep):
                    if d not in subdir_paths:
                        subdir_paths[d] = dict()
                    subdir_paths = subdir_paths[d]

            if not solo_dirs:
                subdir_paths.update(files_selected)

    return paths


def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate the leaf paths of a tree like the one returned by tree_paths()

    Args:
        tree_paths (dict):
        path_base (str=None): path prepended to every yielded path

    Yields:
        path_file
    """
    for path, sub_tree in tree_paths.items():
        if sub_tree and isinstance(sub_tree, dict):
            for sub_path in iter_tree_paths(sub_tree, path):
                yield os.path.join(path_base, sub_path) if path_base else sub_path
        else:
            yield os.path.join(path_base, path) if path_base else path


def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate the paths available under the given path.

    The tree is built with tree_paths() and flattened with iter_tree_paths(), so every
    yielded path starts with path_dir_base.

    Args:
        path_dir_base (str):
        relative_dirs_sel (list=None): relative paths of the only directories to process
        func_filter_path (func=None): function (-> bool) that tells whether a path name is valid

    Yields:
        path (str)
    """
    found_tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(found_tree, path_dir_base)


def is_path_child_from(path, path_parent):
    """
    Tell whether path hangs (at any depth) from path_parent

    Args:
        path:
        path_parent:

    Returns:
        bool
    """
    parent = Path(path_parent)
    for ancestor in Path(path).parents:
        if ancestor.samefile(parent):
            return True

    return False


def machine_name():
    """
    Return the fully qualified name of the machine in upper case

    Returns:
        str
    """
    # TODO - Get host from docker machine when we are in a container
    # TODO - import docker
    # TODO -
    # TODO - client = docker.from_env()
    # TODO - container_info = client.containers.get(socket.gethostname())
    # TODO - docker_host_ip = container_info.attrs['NetworkSettings']['IPAddress']
    # TODO - print(docker_host_ip)
    fqdn = socket.getfqdn()

    return fqdn.upper()


def machine_apb():
    """
    Tell whether the fully qualified name of the machine ends with '.apb.es'

    Returns:
        bool
    """
    fqdn = socket.getfqdn().lower()

    return fqdn.endswith('.apb.es')


if __name__ == '__main__':
    import fire

    fire.Fire()
def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
    """
    Download a zip archive from ``url`` and extract it.

    Args:
        url (str): Url of the zip file to download
        extract_to (str=None): Directory handed to download_from_url() and unzip() as the
                place to save and extract the zip; None leaves the choice to those functions
        headers (list=None): Request headers handed to download_from_url()
        remove_zip (bool=True): Handed to unzip(): remove the zip once extracted

    Returns:
        extract_to (str): Directory returned by unzip() (the received ``extract_to`` when
            nothing was downloaded)
    """
    zip_file_path = download_from_url(url, extract_to, headers)
    if not zip_file_path:
        return extract_to

    return unzip(zip_file_path, extract_to, remove_zip)
Arguments:
- url (str):
- extract_to (str=None): if None, the zip is saved in the temporary directory and extracted next to it, in a folder named after the zip
- headers (list=None)
- remove_zip (bool=True):
Returns:
extract_to (str): directory where the zip content was extracted
def unzip(zip_file_path, extract_to=None, remove_zip=False):
    """
    Unzip file to extract_to directory

    Args:
        zip_file_path (str): Path to zip file
        extract_to (str=None): If None, extract to a folder named after the zip (without
                extension) in the zip's directory
        remove_zip (bool=False): If True remove zip file after unzip

    Returns:
        extract_to (str)
    """
    if not extract_to:
        zip_dir_path, zip_name = os.path.split(zip_file_path)
        extract_to = os.path.join(zip_dir_path, os.path.splitext(zip_name)[0])

    with ZipFile(zip_file_path, 'r') as archive:
        desc = f"Extracting {zip_file_path} to {extract_to}"
        members = archive.infolist()
        if sys.stdout:
            # Show a progress bar while extracting
            members = tqdm(members, desc=desc)
        else:
            print(f'{desc}...')

        for member in members:
            archive.extract(member, extract_to)

    # The archive is closed at this point, so it can be deleted
    if remove_zip:
        os.remove(zip_file_path)

    return extract_to
Unzip file to extract_to directory
Arguments:
- zip_file_path (str): Path to zip file
- extract_to: (str=None): if None, extract to zip's directory
- remove_zip: (bool=False): If True remove zip file after unzip
Returns:
extract_to (str)
def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
    """
    Download the resource at ``url`` into a directory.

    The file name comes from the response's Content-Disposition header or, failing that, from
    the path of the final url (query string excluded). Only the last path component is kept so
    that a name chosen by the server cannot place the file outside ``extract_to``.

    Args:
        url (str): Url to download
        extract_to (str=None): Directory to save file (created if missing). Default temporary directory
        headers (list=None): Assigned as is to the opener's ``addheaders``
                (urllib expects ``(name, value)`` pairs there)

    Returns:
        path_file (str)
    """
    from urllib.parse import urlparse

    opener = build_opener()
    if headers:
        opener.addheaders = headers

    with opener.open(url) as response:
        # Only HTTP responses expose ``length``; other handlers (file://, ftp://) do not
        content_length = getattr(response, 'length', None)
        if not extract_to:
            extract_to = gettempdir()
        os.makedirs(extract_to, exist_ok=True)

        n_file = response.headers.get_filename() or urlparse(response.url).path
        n_file = os.path.basename(n_file.replace('\\', '/').rstrip('/'))
        if not n_file:
            # Url without path: fall back to the last component of the whole url
            n_file = Path(response.url).name
        file_path = os.path.join(extract_to, n_file)

        with open(file_path, "wb") as out_file:
            def get_resp_data():
                while data := response.read(64 * 1024):
                    yield data

            desc = f'Downloading to "{file_path}"'
            if not sys.stdout:
                print(f'{desc}...')
                for data in get_resp_data():
                    out_file.write(data)
            else:
                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
                    for data in get_resp_data():
                        out_file.write(data)
                        progress_bar.update(len(data))

    return file_path
Arguments:
- url (str): Url to download
- extract_to (str=None): Directory to save file. Default temporary directory
- headers (list=None)
Returns:
path_file (str): path of the downloaded file
def caller_name(skip=2):
    """Get a name of a caller in the format module.class.method

    `skip` is the index into the chain of frames that starts at the frame calling this
    function: 0 is that caller itself, 1 is its caller, and so on.

    An empty string is returned if skipped levels exceed stack height
    """
    # Collect the chain of frames, innermost (our caller) first
    frames = []
    frame = sys._getframe(1)
    while frame:
        frames.append(frame)
        frame = frame.f_back

    if skip >= len(frames):
        return ''
    target = frames[skip]

    parts = []
    module = inspect.getmodule(target)
    # The module can be None when the frame is executed directly in console
    if module and module.__name__ != "__main__":
        parts.append(module.__name__)

    # A local called 'self' is taken as the sign of a method call; static methods
    # cannot be told apart from plain functions
    if 'self' in target.f_locals:
        parts.append(target.f_locals['self'].__class__.__name__)

    code_name = target.f_code.co_name
    if code_name != '<module>':  # top level usually
        parts.append(code_name)  # function or a method
    del target

    return ".".join(parts)
Get a name of a caller in the format module.class.method
`skip` specifies how many levels of stack to skip while getting the caller name. skip=1 means "who calls me", skip=2 "who calls my caller" etc.
An empty string is returned if skipped levels exceed stack height
def get_environ():
    """
    Return the working environment read from the environment variable DEV_ENVIRON.
    When it is not defined 'dev' is returned.

    Returns:
        str: The lower-cased value of DEV_ENVIRON ('dev' by default)
    """
    environ_name = os.environ.get("DEV_ENVIRON", "dev")

    return environ_name.lower()
Devuelve el entorno de trabajo a partir de la environment variable DEV_ENVIRON. Si no está definida por defecto devuelve 'dev'
Returns:
str: El nombre del entorno 'dev' o 'prod'
def create_dir(a_dir):
    """
    Create a directory (including missing parents) and report whether it is available.

    Args:
        a_dir (str): path of the directory to create

    Returns:
        bool: True when the path already existed or was created, False when
            creation failed with an OSError (a warning is printed in that case)
    """
    if os.path.exists(a_dir):
        return True

    try:
        os.makedirs(a_dir)
    except OSError:
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True
Crea directorio devolviendo TRUE o FALSE según haya ido. Si ya existe devuelve TRUE
Arguments:
- a_dir {str}: path del directorio a crear
Returns:
bool: Retorna TRUE si lo ha podido crear o ya existía y FALSE si no
def remove_content_dir(a_dir):
    """
    Delete the files and subdirectories found inside a directory (best effort).

    Symbolic links are removed as links and never traversed, so the content of
    a linked directory outside ``a_dir`` is left untouched. Entries that
    cannot be deleted are skipped silently.

    Args:
        a_dir (str): path of the directory to empty

    Returns:
        num_elems_removed (int), num_elems_dir (int): number of non-directory
            entries deleted and number of non-directory entries found
            (directories themselves are not counted)
    """
    num_elems_removed = 0
    num_elems_dir = 0
    with os.scandir(a_dir) as entries:
        for de in entries:
            # follow_symlinks=False: a link to a directory must be unlinked,
            # not recursed into.
            if de.is_dir(follow_symlinks=False):
                n_rem_subdir, n_subdir = remove_content_dir(de.path)
                num_elems_dir += n_subdir
                num_elems_removed += n_rem_subdir
                try:
                    os.rmdir(de.path)
                except OSError:
                    # Directory not empty or not removable: keep going.
                    pass
            else:
                num_elems_dir += 1
                try:
                    os.unlink(de.path)
                except OSError:
                    continue
                num_elems_removed += 1

    return num_elems_removed, num_elems_dir
Borra ficheros y subdirectorios de directorio
Arguments:
- a_dir {str}: path del directorio cuyo contenido se borrará
Returns:
num_elems_removed (int), num_elems_dir (int)
Windows-specific error code indicating an invalid pathname.
See Also
https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx Official listing of all such codes.
def is_pathname_valid(pathname):
    """
    Tell whether the passed pathname is syntactically valid for the current OS.

    Each path component is probed with ``os.lstat`` under a directory that is
    expected to exist; only errors that mean "invalid name" make the pathname
    invalid. "Not found" or "not accessible" errors are ignored.

    Args:
        pathname (str): pathname to check

    Returns:
        bool: True if the pathname is valid; False if it is not a non-empty
            string, contains an embedded NUL character, or has a component the
            OS rejects as invalid or too long
    """
    try:
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip the Windows drive specifier, if any: ':' is forbidden inside
        # path components and would invalidate every absolute Windows path.
        _, pathname = os.path.splitdrive(pathname)

        # Directory expected to exist: the Windows home drive, or the root
        # directory elsewhere.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # sanity check of the probe directory

        # Append a path separator to this directory if needed.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            except OSError as exc:
                # Windows: "winerror" is more precise than "errno" (a too-long
                # name is reported there as ENOENT).
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                # POSIX: ENAMETOOLONG, or ERANGE on some systems.
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character raises ValueError on Python 3 ("embedded null
    # byte"); TypeError is kept for interpreters that report it that way.
    except (TypeError, ValueError):
        return False
    else:
        # No component was rejected, so the pathname is valid.
        return True
True
if the passed pathname is a valid pathname for the current OS;
False
otherwise.
def is_dir_writable(dirname):
    """
    Tell whether the current user can create files inside the passed directory.

    The check creates a uniquely named temporary file in the directory, so a
    file that already exists there is never overwritten or deleted.

    Args:
        dirname (str): directory to probe; an empty string means the current
            working directory

    Returns:
        bool: True if a file could be created in ``dirname``; False otherwise
            (missing directory, insufficient permissions, invalid argument)
    """
    # Local import: the module itself only imports `gettempdir` from tempfile.
    import tempfile

    try:
        probe_dir = os.fspath(dirname) or os.curdir
        with tempfile.TemporaryFile(dir=probe_dir):
            pass
    except (OSError, TypeError, ValueError):
        return False

    return True
True
if the current user has sufficient permissions to create siblings
(i.e., arbitrary files in the parent directory) of the passed pathname;
False
otherwise.
def is_path_exists_or_creatable(pathname):
    """
    Tell whether the pathname is valid and either exists or could be created.

    The pathname is validated with is_pathname_valid() first, so that the
    later "os" calls are not handed an invalid name. It then counts as
    creatable when its parent directory passes is_dir_writable().

    Args:
        pathname (str): pathname to check

    Returns:
        bool: True if the pathname is valid and exists or is creatable; False
            otherwise, including when one of the checks raises an OSError
    """
    try:
        if not is_pathname_valid(pathname):
            return False
        if os.path.exists(pathname):
            return True
        return is_dir_writable(os.path.dirname(pathname))
    except OSError:
        # Filesystem complaints (timeouts, permissions...) mean "not accessible".
        return False
True
if the passed pathname is a valid pathname on the current OS _and_
either currently exists or is hypothetically creatable in a cross-platform
manner optimized for POSIX-unfriendly filesystems; False
otherwise.
This function is guaranteed to _never_ raise exceptions.
def get_matching_val(search_val, matching_vals):
    """
    Return the candidate most similar to the searched value, with its similarity.

    The ranking comes from jaro_winkler(); its first entry is the result.

    Args:
        search_val (str): value to look for
        matching_vals (list(str)): candidate values to compare against

    Returns:
        match_val (str), fact_jaro_winkler (float): (None, None) when the
            ranking is empty
    """
    ranking = jaro_winkler(search_val, matching_vals)
    for factor, candidate in ranking.items():
        # Only the first entry of the ranking matters.
        return candidate, factor

    return None, None
Retorna el valor que se asimila a los valores a comparar (matching_vals) respecto al valor propuesto (prop_val).
Arguments:
- search_val (str): Valor propuesto para comparar
- matching_vals (list(str)): Lista de valores a comparar
Returns:
match_val (str), fact_jaro_winkler (float)
def levenshtein_distance(search_val, matching_vals):
    """
    Group the candidate values by their Levenshtein distance to the searched value.

    Args:
        search_val (str): value to look for
        matching_vals (iterable(str)): candidate values to compare against

    Returns:
        OrderedDict: distance -> list of candidates at that distance, ordered
            from the smallest distance to the largest
    """
    grouped = {}
    for candidate in matching_vals:
        distance = jellyfish.levenshtein_distance(search_val, candidate)
        grouped.setdefault(distance, []).append(candidate)

    return OrderedDict((distance, grouped[distance]) for distance in sorted(grouped))
Arguments:
- search_val:
- matching_vals:
Returns:
def jaro_winkler(search_val, matching_vals):
    """
    Rank the candidate values by Jaro-Winkler similarity to the searched value.

    Candidates with similarity 0 are left out. When several candidates share
    the same similarity only the last one processed is kept, because the
    similarity is used as the key.

    Args:
        search_val (str): value to look for
        matching_vals (iterable(str)): candidate values to compare against

    Returns:
        OrderedDict: similarity -> candidate, ordered from most to least similar
    """
    by_score = {}
    for candidate in matching_vals:
        by_score[jellyfish.jaro_winkler_similarity(search_val, candidate)] = candidate

    ranking = OrderedDict()
    for score in sorted(by_score, reverse=True):
        if score == 0:
            continue
        ranking[score] = by_score[score]

    return ranking
Arguments:
- search_val:
- matching_vals:
Returns:
def call_command(command_prog, *args):
    """
    Run a system shell command with the given arguments.

    ``command_prog`` is handed to the shell as written (it may contain shell
    syntax); every extra argument is passed to it as one literal argument.

    Args:
        command_prog (str): command to run
        *args: arguments appended to the command

    Returns:
        bool: True when the command exits with status 0

    Raises:
        subprocess.CalledProcessError: when the command exits with a non-zero status
    """
    if os.name == 'nt':
        # On Windows subprocess turns the sequence into a proper command line.
        command = [command_prog, *args]
    else:
        # On POSIX, a sequence combined with shell=True hands the extra items
        # to the shell itself instead of the command, so build one command
        # string with each argument quoted.
        import shlex

        command = " ".join([os.fspath(command_prog), *(shlex.quote(str(arg)) for arg in args)])

    # NOTE: shell=True is kept for compatibility; never pass untrusted text as command_prog.
    ret = subprocess.check_call(command, shell=True)

    return ret == 0
Llama comando shell sistema con los argumentos indicados
Returns:
bool: True si OK
def rounded_float(a_float, num_decs=9):
    """
    Round a number to at most the given number of decimals.

    Args:
        a_float (float): value to round
        num_decs (int=9): maximum number of decimals to keep

    Returns:
        float: the rounded value
    """
    # float() already ignores trailing zeros, so the text is not stripped:
    # stripping zeros corrupted integers when num_decs is 0 ("10" -> "1").
    return float(format(round(a_float, num_decs), f".{num_decs}f"))
Formatea un float con el numero de decimales especificado
Arguments:
- a_float:
- num_decs:
Returns:
float
class formatted_float(float):
    """
    Float whose repr() shows at most ``__num_decs__`` decimals.
    """
    # Maximum number of decimals shown by repr().
    __num_decs__ = 9

    def __repr__(self):
        shown = rounded_float(self, self.__num_decs__)
        return str(shown)
Devuelve un float que se representa con un maximo de decimales (__num_decs__)
def as_format_floats(obj):
    """
    Convert every float found in the object to 'formatted_float'.

    Dicts, lists and tuples are rebuilt with the same class and their values
    converted recursively; any other object is returned unchanged.

    Args:
        obj: any object

    Returns:
        (obj, formatted_float): the converted object
    """
    if isinstance(obj, (float, formatted_float)):
        return formatted_float(obj)

    if isinstance(obj, (dict, OrderedDict)):
        converted_items = ((key, as_format_floats(val)) for key, val in obj.items())
        return obj.__class__(converted_items)

    if isinstance(obj, (list, tuple)):
        return obj.__class__(as_format_floats(item) for item in obj)

    return obj
Si encuentra un Float lo convierte a la clase 'formatted_float' para formatear su representación
Arguments:
- obj: Cualquier objeto
Returns:
(obj, formatted_float)
def nums_from_str(a_string, nan=False):
    """
    Return the numbers found among the whitespace-separated tokens of a text.

    Each token is tried as an int first and as a float second; tokens that
    are neither are ignored.

    Args:
        a_string (str): text to scan
        nan (bool=False): when True, NaN tokens are returned as numbers too

    Returns:
        list: ints and floats in order of appearance
    """
    found = []

    for token in a_string.strip().split():
        try:
            found.append(int(token))
            continue
        except ValueError:
            pass

        try:
            as_float = float(token)
        except ValueError:
            continue

        if nan or not isnan(as_float):
            found.append(as_float)

    return found
Retorna lista de numeros en el texto pasado
Arguments:
- a_string (str):
- nan (bool=False): por defecto no trata los NaN como numeros
Returns:
list
def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in the text.

    Args:
        a_string (str): text to scan
        nan (bool=False): when True, NaN tokens count as numbers too

    Returns:
        int OR float: None when the text holds no number
    """
    numbers = nums_from_str(a_string, nan=nan)
    return numbers[0] if numbers else None
Retorna primer numero encontrado del texto pasado
Arguments:
- a_string (str):
- nan (bool=False): por defecto no trata los NaN como numeros
Returns:
int OR float
def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the dates that can be parsed from the text with the given formats.

    The text is split once per separator, and every stripped fragment is
    tried against every format. A fragment produced by several separators is
    therefore reported once per occurrence.

    Args:
        str (str): text to scan
        formats (list=None): defaults to ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
        seps (list=None): defaults to [None, '.', ','] (None splits on whitespace)
        ret_extra_data (bool=False): when True each item is a tuple
            (date, source fragment, format used)

    Returns:
        list: datetimes (or tuples), grouped by format in the order given
    """
    if not formats:
        formats = ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']

    if not seps:
        seps = [None, '.', ',']

    fragments = []
    for sep in seps:
        fragments.extend(piece.strip() for piece in str.split(sep))

    dates = []
    for date_format in formats:
        for fragment in fragments:
            try:
                parsed = datetime.datetime.strptime(fragment, date_format)
            except Exception:
                # Fragment does not match this format.
                continue
            dates.append((parsed, fragment, date_format) if ret_extra_data else parsed)

    return dates
Retorna lista de fechas disponibles en el texto pasado segun los formatos indicados
Arguments:
- str (str):
- formats (list=None): por defecto ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
- seps (list=None): por defecto [None, '.', ',']
- ret_extra_data (bool=False): si True retorna tuple con fecha + part_str_src + format utilizado
Returns:
list
def pretty_text(txt):
    """
    Replace underscores and hyphens with spaces and capitalize the text.

    Args:
        txt (str): text to prettify

    Returns:
        str: first character upper-cased, the rest lower-cased
    """
    for separator in ("_", "-"):
        txt = txt.replace(separator, " ")

    return txt.capitalize()
Coge texto y lo capitaliza y quita carácteres por espacios
Arguments:
- txt (str):
Returns:
str
def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given files into a zip file.

    Args:
        zip_path (str): path of the zip file to create (overwritten if it exists)
        file_paths (list or generator): paths of the files to add
        base_path (str=None): when given, files located under this directory
            keep their path relative to it inside the zip (compared ignoring
            case); without it only the file name is kept
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing

    Returns:
        zip_path (str)
    """
    base_pattern = None
    if base_path:
        base_norm = os.path.normpath(base_path).replace(os.sep, '/').rstrip('/')
        # The base is matched literally (escaped), only as a leading prefix
        # and only up to a path boundary, so special characters or a repeated
        # directory name deeper in the path cannot corrupt the archive name.
        base_pattern = re.compile(re.escape(base_norm) + r'(?=/|$)', re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if base_pattern:
                arch_name = os.path.normpath(file_path).replace(os.sep, '/')
                prefix = base_pattern.match(arch_name)
                if prefix:
                    arch_name = arch_name[prefix.end():]
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path
Comprime los ficheros indicados con :file_paths en un fichero zip (:zip_path)
Arguments:
- zip_path:
- file_paths (list or generator):
- base_path (str=None): path base a partir del cual se mantiene la ruta relativa de los ficheros dentro del zip
- compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:
zip_path (str)
def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given directory into a zip file.

    Args:
        dir_path (str): path of the directory
        zip_path (str=None): path of the .zip file to create. Defaults to
            ``dir_path`` with '.zip' appended
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function that decides whether a path name is included
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing

    Returns:
        zip_file (str)
    """
    target_zip = zip_path if zip_path else f'{dir_path}.zip'
    selected_paths = iter_paths_dir(dir_path,
                                    relative_dirs_sel=relative_dirs_sel,
                                    func_filter_path=func_filter_path)

    return zip_files(target_zip, selected_paths, base_path=dir_path, compression=compression)
Comprime la carpeta indicada
Arguments:
- dir_path (str): path directorio
- zip_path (str=None): el path del fichero .zip a crear. Por defecto zip en el directorio padre con el mismo nombre del directorio zipeado
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
- compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:
zip_file (str)
def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each file of a directory into its own zip, placed next to it.

    Every file "name.ext" is compressed into "name.zip". Subdirectories are
    ignored, and a file that is already its own target zip is skipped so it
    is never overwritten with itself.

    NOTE(review): two files sharing the same stem (e.g. "a.txt" and "a.csv")
    map to the same zip path; confirm how callers expect that to be handled.

    Args:
        dir_path (str): directory whose files are compressed
        remove_files (bool=False): when True each file is deleted after being zipped
        *exts_files: file extensions to process, WITHOUT the dot; all files when omitted

    Returns:
        ok (bool): always True
    """
    exts = {f".{ext.lower()}" for ext in exts_files}

    # Snapshot the listing before creating zips: scanning lazily while adding
    # files to the same directory could yield the freshly created zips.
    with os.scandir(dir_path) as entries:
        file_paths = [de.path for de in entries if de.is_file()]

    for file_path in file_paths:
        zip_path = f"{os.path.splitext(file_path)[0]}.zip"
        if os.path.normcase(zip_path) == os.path.normcase(file_path):
            continue
        if exts and os.path.splitext(file_path)[1].lower() not in exts:
            continue

        print(f"Comprimiendo fichero '{file_path}' en el zip '{zip_path}'")
        zip_files(zip_path, [file_path])

        if remove_files:
            os.remove(file_path)

    return True
Comprime los ficheros de una carpeta indicada. Se pueden indicar qué tipo de ficheros se quiere que comprima
Arguments:
- dir_path:
- remove_files:
- *exts_files: extensiones de fichero SIN el punto
Returns:
ok (bool)
def split_ext_file(path_file):
    """
    Split a file name at its first "." into base name and the rest.

    Args:
        path_file (str): path or name of the file

    Returns:
        base_file (str), ext_file (str): ext_file is empty when the name has no "."
    """
    file_name = os.path.basename(path_file)
    base_file, _, ext_file = file_name.partition(".")

    return base_file, ext_file
Devuelve el nombre del fichero partido entre la primera parte antes del separador "." y lo demás
Arguments:
- path_file:
Returns:
base_file (str), ext_file (str)
def last_run_on_dir(dir_base):
    """
    Return the date of the last run recorded in the directory's run log.

    Args:
        dir_base (str): directory holding the run log file (FILE_RUN_LOG)

    Returns:
        date_last_run (datetime): None when the log file does not exist
    """
    log_path = os.path.join(dir_base, FILE_RUN_LOG)
    if not os.path.exists(log_path):
        return None

    with open(log_path) as log_file:
        stamp = log_file.read()

    return datetime.datetime.strptime(stamp, DATE_RUN_LOG_FRMT)
Retorna la fecha de ultima ejecucion de proceso generacion en directorio de repositorio
Arguments:
- dir_base (str):
Returns:
date_last_run (datetime): Si no encuentra devuelve None
def save_last_run_on_dir(dir_base, date_run=None):
    """
    Write the date of the last run to the directory's run log.

    Args:
        dir_base (str): directory holding the run log file (FILE_RUN_LOG)
        date_run (datetime=None): date to record; today's date when not given
    """
    log_path = os.path.join(dir_base, FILE_RUN_LOG)
    stamp_source = date_run if date_run else datetime.date.today()

    with open(log_path, "w+") as log_file:
        log_file.write(stamp_source.strftime(DATE_RUN_LOG_FRMT))
Graba la fecha de ultima ejecucion de proceso generacion en directorio de repositorio
Arguments:
- dir_base (str):
- date_run (datetime=None): Si no se informa cogerá la fecha de hoy
def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the given locale.

    Args:
        num_month (int): month number, used as index into calendar.month_name
        code_alias_locale (str='es_cu'): key looked up in locale.locale_alias

    Returns:
        str: the month name passed through pretty_text()
    """
    locale_name = locale.locale_alias.get(code_alias_locale)
    with different_locale(locale_name):
        # The name must be read while the temporary locale is active.
        localized = calendar.month_name[num_month]
        return pretty_text(localized)
Retorna el nombre del mes en el locale especificado. Por defecto castellano
Arguments:
- num_month (int):
- code_alias_locale (str='es_es'):
Returns:
str
def file_mod_time(path_file):
    """
    Return the last modification time of a file as a datetime.

    Args:
        path_file (str): path of the file

    Returns:
        datetime: local-time datetime built from the file's modification timestamp
    """
    return datetime.datetime.fromtimestamp(os.path.getmtime(path_file))
Return the datetime of the file's last modification, taken from its stat timestamp
Arguments:
- path_file (str):
Returns:
datetime
def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate over the rows of a CSV file.

    With ``header=True`` the first non-empty row supplies the keys (stripped
    and lower-cased) and every later row is yielded as a dict; otherwise rows
    are yielded as lists. Empty rows are skipped.

    Args:
        a_path_csv (str): path of the CSV file
        header (bool=True): whether the first row holds the column names
        sep (str=';'): field delimiter; ';' is used when a falsy value is passed
        encoding (str="utf8"): encoding used to read the file

    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are kept exactly as written.
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep or ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            vals_row = dict(zip(header_row, row)) if header_row else row

            if vals_row:
                yield vals_row
Itera como dicts indexados por valores primera fila (si header=True) o si no como list las filas del CSV pasado por parametro a_path_csv.
Arguments:
- a_path_csv (str):
- header (bool=True):
- sep (str=';'): por defecto cogerá el separador que por defecto usa csv.reader
- encoding (str="utf8"):
Yields:
list OR dict
def subdirs_path(path):
    """
    Iterate over the direct subdirectories of a path.

    Args:
        path (str): directory to scan

    Yields:
        nom_subdir, path_subdir: name and path of each subdirectory
    """
    with os.scandir(path) as entries:
        yield from ((entry.name, entry.path) for entry in entries if entry.is_dir())
Itera sobre los subdirectorios del path
Arguments:
- path:
Yields:
nom_subdir, path_subdir
def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Build the tree of subdirectories found under a base directory.

    Args:
        path_dir_base (str): base directory whose subdirectories are explored
        relative_dirs_sel (list=None): relative paths (from the base) of the
            directories to process; selections that do not exist are ignored
        last_level_as_list (bool=False): when True and none of the found
            subdirectories has children, their names are returned as a list

    Returns:
        dict: subdirectory name -> tree of that subdirectory (or a list, see
            ``last_level_as_list``)
    """
    tree = {}

    f_valid_dir = None
    valid_dirs_sel = set()
    if relative_dirs_sel:
        # Keep only the selections that exist, as normalized lower-cased
        # paths relative to the base directory.
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

        def valid_dir(dir_path):
            # A directory is accepted when it equals a selection or shares a
            # non-empty common path with one (comparison in lower case).
            valid = False
            rel_path = os.path.relpath(dir_path, path_dir_base).lower()
            for dir_sel in valid_dirs_sel:
                if rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel)):
                    valid = True
                    break

            return valid

        f_valid_dir = valid_dir

    for dir_name, dir_path in subdirs_path(path_dir_base):
        if not f_valid_dir or f_valid_dir(dir_path):
            dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()
            # Re-express the related selections relative to this subdirectory
            # before recursing into it.
            dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                             if os.path.commonpath((dir_path_rel, dir_sel))]
            # NOTE(review): last_level_as_list is not forwarded to the
            # recursive call, so it only affects the top level - confirm intent.
            tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path)

    if tree:
        # All children empty: collapse the dict into the list of names.
        if last_level_as_list and not any(tree.values()):
            tree = [*tree.keys()]

    return tree
Arguments:
- path_dir_base:
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- last_level_as_list (bool=False):
Returns:
dict
def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a dict with the tree of paths available under the given directory.

    Directories map to nested dicts and files map to None. A directory only
    appears when it, or one of its descendants, holds at least one selected
    file. Files of the base directory are stored at the first level.

    Args:
        path_dir_base (str): base directory to walk
        relative_dirs_sel (list=None): relative paths of the directories to
            process; selections that do not exist are ignored
        func_filter_path (func=None): function (name -> bool) applied to each
            directory name and file name to decide whether it is included;
            the base directory itself is passed as '.'
        solo_dirs (bool=False): when True only the directories are stored, not the files

    Returns:
        dict
    """
    paths = dict()

    selected_roots = set()
    for dir_sel in relative_dirs_sel or ():
        candidate = os.path.join(path_dir_base, dir_sel)
        if os.path.exists(candidate):
            selected_roots.add(candidate)

    for walked_dir, _subdirs, file_names in os.walk(path_dir_base):
        if selected_roots:
            in_selection = False
            for root in selected_roots:
                if os.path.samefile(walked_dir, root) or is_path_child_from(walked_dir, root):
                    in_selection = True
                    break
            if not in_selection:
                continue

        rel_dir = os.path.relpath(walked_dir, path_dir_base)

        if func_filter_path and not func_filter_path(os.path.basename(rel_dir)):
            continue

        if func_filter_path:
            chosen_files = {fn: None for fn in file_names if func_filter_path(fn)}
        else:
            chosen_files = dict.fromkeys(file_names)

        if not chosen_files:
            continue

        # Walk down (creating as needed) to the node of this directory; the
        # base directory itself has no node of its own.
        node = paths
        if rel_dir != '.':
            for part in rel_dir.split(os.sep):
                node = node.setdefault(part, dict())

        if not solo_dirs:
            node.update(chosen_files)

    return paths
Retorna diccionario con el arbol de paths disponibles en el path indicado.
Con la función F_VALID (-> bool) se podrà filtrar los paths a retornar (por defecto siempre True)
Arguments:
- path_dir_base (str):
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
- solo_dirs (bool=False):
Returns:
dict
def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate over the leaf paths of a tree of paths.

    Entries whose value is a non-empty dict are descended into; any other
    entry (None, empty dict...) is yielded as a path.

    Args:
        tree_paths (dict): name -> subtree dict (or any other value for a leaf)
        path_base (str=None): path prepended to every yielded path

    Yields:
        path_file
    """
    def with_base(rel_path):
        return os.path.join(path_base, rel_path) if path_base else rel_path

    for name, children in tree_paths.items():
        if children and isinstance(children, dict):
            yield from map(with_base, iter_tree_paths(children, name))
        else:
            yield with_base(name)
Arguments:
- tree_paths (dict):
- path_base (str=None):
Yields:
path_file
def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate over the paths available under the given directory.

    The tree is built with tree_paths() and flattened with iter_tree_paths(),
    prefixing every path with ``path_dir_base``.

    Args:
        path_dir_base (str): base directory to walk
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function (name -> bool) that decides
            whether a path name is included

    Yields:
        path (str)
    """
    tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(tree, path_dir_base)
Itera el arbol de paths disponibles en el path indicado.
Con la función F_VALID (-> bool) se podrà filtrar los paths a retornar (por defecto siempre True)
Arguments:
- path_dir_base (str):
- relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
- func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
Yields:
path (str)
def is_path_child_from(path, path_parent):
    """
    Tell whether ``path`` is located somewhere below ``path_parent``.

    Every ancestor of ``path`` is compared with ``path_parent`` using
    ``Path.samefile``; ``path`` itself is not compared.

    Args:
        path (str): candidate child path
        path_parent (str): candidate ancestor path

    Returns:
        bool
    """
    child = Path(path)
    parent = Path(path_parent)

    for ancestor in child.parents:
        if ancestor.samefile(parent):
            return True

    return False
Retorna si path es hijo de path_parent
Arguments:
- path:
- path_parent:
Returns:
bool
def machine_name():
    """
    Return the name of the machine.

    Returns:
        str: fully qualified domain name reported by the socket module, in upper case
    """
    # TODO - When running inside a container, get the host from the docker
    # TODO - machine instead (docker.from_env(), look up the container by
    # TODO - socket.gethostname() and read NetworkSettings.IPAddress)
    fqdn = socket.getfqdn()

    return fqdn.upper()
Retorna el nombre de la maquina
Returns:
str
def machine_apb():
    """
    Tell whether the machine name belongs to the '.apb.es' domain.

    Returns:
        bool: True when the lower-cased fully qualified domain name ends with '.apb.es'
    """
    fqdn = socket.getfqdn().lower()

    return fqdn.endswith('.apb.es')
Retorna si el nombre de la maquina pertenece al dominio '.apb.es'
Returns:
bool