apb_extra_utils.misc

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 7/6/19 18:23
  5#  Last modified: 7/6/19 18:21
  6#  Copyright (c) 2019
  7from __future__ import annotations
  8
  9import calendar
 10import csv
 11import datetime
 12import errno
 13import inspect
 14import locale
 15import os
 16import re
 17import socket
 18import subprocess
 19import sys
 20from calendar import different_locale
 21from collections import OrderedDict
 22from math import isnan
 23from pathlib import Path
 24from tempfile import gettempdir
 25from typing import Any, Generator, Tuple
 26from urllib.request import build_opener
 27from zipfile import ZipFile, ZIP_DEFLATED
 28
 29import jellyfish
 30from tqdm import tqdm
 31
 32
 33def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
 34    """
 35
 36    Args:
 37        url (str):
 38        extract_to (str=None): if None, extract to current directory
 39        headers (list=None)
 40        remove_zip (bool=True):
 41
 42    Returns:
 43        path_zip (str)
 44    """
 45    if zip_file_path := download_from_url(url, extract_to, headers):
 46        extract_to = unzip(zip_file_path, extract_to, remove_zip)
 47
 48        return extract_to
 49
 50
 51def unzip(zip_file_path, extract_to=None, remove_zip=False):
 52    """
 53    Unzip file to extract_to directory
 54
 55    Args:
 56        zip_file_path (str): Path to zip file
 57        extract_to: (str=None): if None, extract to zip's directory
 58        remove_zip: (bool=False): If True remove zip file after unzip
 59
 60    Returns:
 61        extract_to (str)
 62    """
 63    with ZipFile(zip_file_path, 'r') as zipfile:
 64        if not extract_to:
 65            extract_to = os.path.join(
 66                os.path.dirname(zip_file_path),
 67                os.path.splitext(os.path.basename(zip_file_path))[0]
 68            )
 69
 70        desc = f"Extracting {zip_file_path} to {extract_to}"
 71        if not sys.stdout:
 72            print(f'{desc}...')
 73            gen_members = zipfile.infolist()
 74        else:
 75            gen_members = tqdm(zipfile.infolist(), desc=desc)
 76
 77        for member in gen_members:
 78            zipfile.extract(member, extract_to)
 79    if remove_zip:
 80        os.remove(zip_file_path)
 81    return extract_to
 82
 83
 84def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
 85    """
 86
 87    Args:
 88        url (str): Url to download
 89        extract_to (str=None): Directory to save file. Default temporary directory
 90        headers (list=None)
 91
 92    Returns:
 93        path_file (str | None)
 94    """
 95    opener = build_opener()
 96    if headers:
 97        opener.addheaders = headers
 98
 99    with opener.open(url) as response:
100        content_length = response.length
101        if not extract_to:
102            extract_to = gettempdir()
103
104        if n_file := response.headers.get_filename():
105            file_path = os.path.join(extract_to, n_file)
106        else:
107            file_path = os.path.join(extract_to, Path(response.url).name)
108
109        with open(file_path, "wb") as out_file:
110            def get_resp_data():
111                while True:
112                    data = response.read(1024)
113                    if not data:
114                        break
115                    yield data
116
117            desc = f'Downloading to "{file_path}"'
118            if not sys.stdout:
119                print(f'{desc}...')
120                for data in get_resp_data():
121                    out_file.write(data)
122            else:
123                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
124                    for data in get_resp_data():
125                        out_file.write(data)
126                        progress_bar.update(len(data))
127
128            return file_path
129
130
def caller_name(skip=2):
    """Return the name of a caller formatted as module.class.method

       `skip` is the number of stack levels to skip, counted from the function
       that invokes `caller_name`: skip=1 means "who calls me", skip=2 means
       "who calls my caller", and so on.

       An empty string is returned if skipped levels exceed stack height
    """
    # Collect the chain of frames, starting at the frame that called us
    frames = []
    current = sys._getframe(1)
    while current:
        frames.append(current)
        current = current.f_back

    if skip + 1 > len(frames):
        return ''
    target = frames[skip]

    parts = []
    owner_module = inspect.getmodule(target)
    # The module can be None when the frame runs directly in a console
    if owner_module and owner_module.__name__ != "__main__":
        parts.append(owner_module.__name__)

    # A local named `self` is taken as the sign of a method call; static
    # methods cannot be told apart from plain functions this way
    if 'self' in target.f_locals:
        parts.append(target.f_locals['self'].__class__.__name__)

    function_name = target.f_code.co_name
    if function_name != '<module>':  # module-level code has no function name
        parts.append(function_name)
    del target

    return ".".join(parts)
170
171
def get_environ():
    """
    Return the working environment read from the DEV_ENVIRON environment variable.
    When the variable is not defined, 'dev' is returned.

    Returns:
        str: the lower-cased environment name (e.g. 'dev' or 'prod')
    """
    environ_name = os.environ.get("DEV_ENVIRON", "dev")
    return environ_name.lower()
181
182
def create_dir(a_dir):
    """
    Create a directory (including missing parents) and report whether it is available.

    Args:
        a_dir (str): path of the directory to create

    Returns:
        bool: True if the path already existed or the directory was created,
              False if it could not be created (a warning is printed)
    """
    if os.path.exists(a_dir):
        return True

    try:
        # exist_ok=True: another process creating the directory between the
        # check above and this call is not a failure
        os.makedirs(a_dir, exist_ok=True)
    except OSError:
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True
205
206
def remove_content_dir(a_dir):
    """
    Remove the files and subdirectories found inside a directory (best effort).

    Entries that cannot be removed are skipped silently. Symbolic links are
    removed as links and never followed, so nothing outside `a_dir` is deleted.

    Args:
        a_dir (str): path of the directory to empty

    Returns:
        num_elems_removed (int), num_elems_dir (int): number of non-directory
        entries removed and number of non-directory entries found (recursively)
    """
    num_elems_removed = 0
    num_elems_dir = 0

    # Snapshot the listing first: entries are deleted while we walk it
    with os.scandir(a_dir) as it:
        entries = list(it)

    for de in entries:
        if de.is_dir(follow_symlinks=False):
            n_rem_subdir, n_subdir = remove_content_dir(de.path)
            num_elems_dir += n_subdir
            num_elems_removed += n_rem_subdir
            try:
                os.rmdir(de.path)
            except OSError:
                # Still holds something that could not be removed
                pass
        else:
            num_elems_dir += 1
            try:
                os.unlink(de.path)
            except OSError:
                continue
            num_elems_removed += 1

    return num_elems_removed, num_elems_dir
237
238
# Sadly, Python fails to provide the following magic number for us.
# It is compared against `OSError.winerror` in is_pathname_valid().
ERROR_INVALID_NAME = 123
'''
Windows-specific error code indicating an invalid pathname.

See Also
----------
https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx
    Official listing of all such codes.
'''
249
250
def is_pathname_valid(pathname):
    '''
    `True` if the passed pathname is a valid pathname for the current OS;
    `False` otherwise.

    Every path component is probed with `os.lstat()` below a directory that is
    known to exist; only errors that denote an invalid name make the pathname
    invalid, while "not found" / "not accessible" errors are ignored.
    '''
    try:
        # A pathname that is not a string, or is empty, is invalid.
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip the Windows drive specifier (e.g., `C:\`) if any: path
        # components may not contain `:`, so keeping it would invalidate
        # every absolute Windows pathname.
        _, pathname = os.path.splitdrive(pathname)

        # Directory guaranteed to exist: the "%HOMEDRIVE%" drive on Windows,
        # the root directory elsewhere.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # ...Murphy and her ironclad Law

        # Append a path separator to this directory if needed.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            # Only these errors indicate an invalid pathname:
            # * on Windows, `winerror == ERROR_INVALID_NAME` (`errno` is too
            #   coarse there: a too-long name reports ENOENT);
            # * elsewhere, `errno` ENAMETOOLONG (most POSIX systems) or
            #   ERANGE (some edge-case OSes, e.g. SunOS, *BSD).
            except OSError as exc:
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character is reported as ValueError ("embedded null
    # byte") by Python 3 and as TypeError by older interpreters; both mean the
    # pathname is invalid.
    except (TypeError, ValueError):
        return False
    # No exception: all path components, and hence the pathname, are valid.
    else:
        return True
    # Any other exception is an unrelated fatal issue and is left to propagate.
319
320
def is_dir_writable(dirname):
    '''
    `True` if a file can be created inside the passed directory; `False`
    otherwise (missing directory, insufficient permissions, invalid argument).

    The check creates a uniquely named temporary file in the directory and
    deletes it again, so files already present there are never touched. An
    empty `dirname` refers to the current directory.
    '''
    import tempfile

    try:
        # os.fspath() rejects non-path values such as None with a TypeError
        target_dir = os.fspath(dirname) or os.curdir
        with tempfile.NamedTemporaryFile(dir=target_dir):
            pass
    except (OSError, TypeError, ValueError):
        return False

    return True
344
345
def is_path_exists_or_creatable(pathname):
    '''
    `True` if the passed pathname is valid on the current OS _and_ either
    already exists or its parent directory is writable; `False` otherwise.

    `OSError` raised while checking is reported as `False`.
    '''
    try:
        # Validate first so the `os` calls below are not fed an invalid pathname
        if not is_pathname_valid(pathname):
            return False
        if os.path.exists(pathname):
            return True
        return is_dir_writable(os.path.dirname(pathname))
    # Non-fatal filesystem complaints (timeouts, permission issues, ...) mean
    # the path is inaccessible; any other exception is left to propagate.
    except OSError:
        return False
364
365
def get_matching_val(search_val, matching_vals):
    """
    Return the candidate most similar to `search_val` together with its
    Jaro-Winkler similarity, as ranked by `jaro_winkler`.

    Args:
        search_val (str): value to look for
        matching_vals (list(str)): candidate values to compare against

    Returns:
        match_val (str), fact_jaro_winkler (float): both None when `jaro_winkler`
        returns no entries
    """
    ranked = jaro_winkler(search_val, matching_vals)

    # The first key of the ranking is the highest similarity
    best_factor = None
    for best_factor in ranked:
        break

    return ranked.get(best_factor), best_factor
382
383
def levenshtein_distance(search_val, matching_vals):
    """
    Group the candidates by their Levenshtein distance to `search_val`.

    Args:
        search_val: value to compare
        matching_vals: iterable of candidate values

    Returns:
        OrderedDict: distance -> list of candidates at that distance, ordered by
        ascending distance
    """
    by_distance = {}
    for candidate in matching_vals:
        distance = jellyfish.levenshtein_distance(search_val, candidate)
        by_distance.setdefault(distance, []).append(candidate)

    return OrderedDict((distance, by_distance[distance]) for distance in sorted(by_distance))
405
406
def jaro_winkler(search_val, matching_vals):
    """
    Rank the candidates by their Jaro-Winkler similarity to `search_val`.

    Candidates with similarity 0 are left out. When several candidates share the
    same similarity only the last one iterated is kept, because the similarity is
    the dictionary key.

    Args:
        search_val: value to compare
        matching_vals: iterable of candidate values

    Returns:
        OrderedDict: similarity -> candidate, ordered by descending similarity
    """
    by_similarity = {}
    for candidate in matching_vals:
        by_similarity[jellyfish.jaro_winkler_similarity(search_val, candidate)] = candidate

    return OrderedDict(
        (similarity, by_similarity[similarity])
        for similarity in sorted(by_similarity, reverse=True)
        if similarity != 0
    )
425
426
def call_command(command_prog, *args):
    """
    Run a command through the system shell with the given arguments.

    Args:
        command_prog (str): program (or shell command) to run
        *args: arguments appended to the command; on POSIX each one is
            shell-quoted so it reaches the program as a single argument

    Returns:
        bool: True if the command exits with status 0

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status
    """
    import shlex

    if os.name == 'nt':
        # On Windows the argument list is converted to a command line for us
        call_args = [command_prog, *args]
    else:
        # On POSIX, a list combined with shell=True runs only its first item
        # and hands the remaining items to the shell itself, so the arguments
        # would be dropped. Build one command string instead.
        call_args = " ".join([os.fspath(command_prog), *(shlex.quote(str(arg)) for arg in args)])

    # NOTE(review): the command goes through the shell; do not pass untrusted
    # text as command_prog.
    ret = subprocess.check_call(call_args, shell=True)

    return ret == 0
440
441
def rounded_float(a_float, num_decs=9):
    """
    Round a number to the given number of decimals.

    Args:
        a_float: number to round
        num_decs (int=9): number of decimals to keep

    Returns:
        float
    """
    # The fixed-point text is parsed back as is: stripping trailing zeros is
    # unnecessary for float() and corrupts values without a decimal point
    # (e.g. "100" with num_decs=0).
    return float(format(round(a_float, num_decs), f".{num_decs}f"))
453
454
class formatted_float(float):
    """
    Float whose representation shows at most `__num_decs__` decimals
    """
    # Maximum number of decimals used by __repr__
    __num_decs__ = 9

    def __repr__(self):
        shown = rounded_float(self, self.__num_decs__)
        return f"{shown}"
463
464
def as_format_floats(obj):
    """
    Convert every float found in `obj` to `formatted_float` so that its
    representation is formatted. Dicts, lists and tuples are rebuilt (with the
    same class) and processed recursively; anything else is returned unchanged.

    Args:
        obj: any object

    Returns:
        (obj, formatted_float)
    """
    # formatted_float is a float subclass, so one check covers both
    if isinstance(obj, float):
        return formatted_float(obj)

    # OrderedDict is a dict subclass
    if isinstance(obj, dict):
        converted_items = ((key, as_format_floats(value)) for key, value in obj.items())
        return obj.__class__(converted_items)

    if isinstance(obj, (list, tuple)):
        return obj.__class__(as_format_floats(item) for item in obj)

    return obj
483
484
def nums_from_str(a_string, nan=False):
    """
    Return the numbers found among the whitespace-separated tokens of a text.

    Args:
        a_string (str): text to scan
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        list: ints and floats in order of appearance
    """
    def _to_number(token):
        # Integers take precedence over floats
        try:
            return int(token)
        except ValueError:
            pass
        try:
            number = float(token)
        except ValueError:
            return None
        return number if nan or not isnan(number) else None

    candidates = (_to_number(token) for token in a_string.strip().split())

    return [number for number in candidates if number is not None]
510
511
def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in a text.

    Args:
        a_string (str): text to scan
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        int OR float, or None when the text holds no number
    """
    numbers = nums_from_str(a_string, nan=nan)
    return numbers[0] if numbers else None
524
525
def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the dates found in a text according to the given formats.

    The text is split with every separator in `seps`; each distinct resulting
    part is parsed with every format. Results are ordered by format, then by
    part.

    Args:
        str (str): text to scan
        formats (list=None): default ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
        seps (list=None): default [None, '.', ','] (None splits on whitespace)
        ret_extra_data (bool=False): if True each item is a tuple
            (datetime, source part, format used)

    Returns:
        list
    """
    if not formats:
        formats = ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']

    if not seps:
        seps = [None, '.', ',']

    # The same part usually comes out of several separators; keep each one once
    # (first-seen order) so a date is not reported multiple times.
    str_parts = list(dict.fromkeys(part.strip() for sep in seps for part in str.split(sep)))

    l_fechas = []
    for date_format in formats:
        for str_part in str_parts:
            try:
                val = datetime.datetime.strptime(str_part, date_format)
            except (ValueError, TypeError):
                # The part does not match this format
                continue
            l_fechas.append((val, str_part, date_format) if ret_extra_data else val)

    return l_fechas
560
561
def pretty_text(txt):
    """
    Replace underscores and hyphens with spaces and capitalize the text
    (first character upper-cased, the rest lower-cased).

    Args:
        txt (str): text to prettify

    Returns:
        str
    """
    separators_to_spaces = str.maketrans("_-", "  ")
    return txt.translate(separators_to_spaces).capitalize()
572
573
def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given files (:file_paths) into a zip file (:zip_path).

    Args:
        zip_path: path of the zip file to create
        file_paths (list or generator): paths of the files to add
        base_path (str=None): when given, files located under this path keep their
            path relative to it inside the zip (compared case-insensitively);
            otherwise only the file name is kept
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) for no compression

    Returns:
        zip_path (str)
    """
    re_base_path = None
    if base_path:
        norm_base = os.path.normpath(base_path).replace(os.sep, '/')
        # Match the base path literally (it is a path, not a pattern), only as a
        # leading prefix, and only when it ends on a path-component boundary.
        re_base_path = re.compile(re.escape(norm_base) + r'(?=/|$)', re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if re_base_path:
                norm_file = os.path.normpath(file_path).replace(os.sep, '/')
                found = re_base_path.match(norm_file)
                arch_name = norm_file[found.end():] if found else norm_file
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path
598
599
def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress a directory into a zip file.

    Args:
        dir_path (str): path of the directory
        zip_path (str=None): path of the .zip file to create. By default
                            `<dir_path>.zip`
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function that decides whether a path name is
                            valid and must be included
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) for no compression

    Returns:
        zip_file (str)
    """
    target_zip = zip_path if zip_path else f'{dir_path}.zip'
    paths_to_zip = iter_paths_dir(dir_path,
                                  relative_dirs_sel=relative_dirs_sel,
                                  func_filter_path=func_filter_path)

    return zip_files(target_zip, paths_to_zip, base_path=dir_path, compression=compression)
626
627
def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each file of a directory into its own zip (same name, `.zip`
    extension). The file types to compress can be restricted by extension.

    Subdirectories are ignored, and files that already are the zip they would
    be written to (`*.zip`) are skipped instead of being overwritten.

    Args:
        dir_path: directory whose files are compressed
        remove_files: if True each source file is removed after being zipped
        *exts_files: file extensions WITHOUT the dot; all files when omitted

    Returns:
        ok (bool)
    """
    exts = {".{}".format(ext.lower()) for ext in exts_files}

    # Snapshot the listing first: the zips created below land in this directory
    with os.scandir(dir_path) as it:
        file_paths = [de.path for de in it if de.is_file()]

    for file_path in file_paths:
        base_name, ext = os.path.splitext(file_path)
        if exts and ext.lower() not in exts:
            continue

        zip_path = "{}.zip".format(base_name)
        if os.path.normcase(zip_path) == os.path.normcase(file_path):
            # Writing the zip would truncate the very file being compressed
            continue

        print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path))
        zip_files(zip_path, [file_path])

        if remove_files:
            os.remove(file_path)

    return True
651
652
def split_ext_file(path_file):
    """
    Split a file name at its first "." into the part before it and everything
    after it.

    Args:
        path_file: path or name of the file
    Returns:
        base_file (str), ext_file (str)
    """
    file_name = os.path.basename(path_file)
    base_file, _, ext_file = file_name.partition(".")

    return base_file, ext_file
666
667
# Name of the file, inside a directory, that stores the date of the last run
FILE_RUN_LOG = "last_run.log"
# strptime/strftime format of the date stored in FILE_RUN_LOG
DATE_RUN_LOG_FRMT = "%Y%m%d"
670
671
def last_run_on_dir(dir_base):
    """
    Return the date of the last run stored in the run-log file of a directory.

    Args:
        dir_base (str): directory that holds the run-log file

    Returns:
        date_last_run (datetime): None when the run-log file does not exist

    Raises:
        ValueError: if the file content does not match the run-log date format
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    dt_last_run = None
    if os.path.exists(log_last_run):
        with open(log_last_run) as fr:
            # Tolerate surrounding whitespace (e.g. a trailing newline added by
            # an editor), which strptime would otherwise reject
            dt_last_run = datetime.datetime.strptime(fr.read().strip(), DATE_RUN_LOG_FRMT)

    return dt_last_run
688
689
def save_last_run_on_dir(dir_base, date_run=None):
    """
    Store the date of the last run in the run-log file of a directory.

    Args:
        dir_base (str): directory that holds the run-log file
        date_run (datetime=None): date to store; today's date when not given
    """
    run_date = date_run if date_run else datetime.date.today()
    with open(os.path.join(dir_base, FILE_RUN_LOG), "w+") as fw:
        fw.write(run_date.strftime(DATE_RUN_LOG_FRMT))
703
704
def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the given locale, passed through `pretty_text`.

    Args:
        num_month (int): index into `calendar.month_name`
        code_alias_locale (str='es_cu'): key looked up in `locale.locale_alias`

    Returns:
        str
    """
    locale_name = locale.locale_alias.get(code_alias_locale)
    with different_locale(locale_name):
        localized_name = calendar.month_name[num_month]
        return pretty_text(localized_name)
718
719
def file_mod_time(path_file):
    """
    Return the modification time of a file as a (local, naive) datetime.

    Args:
        path_file (str): path of the file

    Returns:
        datetime
    """
    modified_timestamp = os.path.getmtime(path_file)
    return datetime.datetime.fromtimestamp(modified_timestamp)
733
734
def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate over the rows of a CSV file, as dicts keyed by the values of the first
    non-empty row (if header=True) or as lists otherwise. Empty rows are skipped.

    Header names are stripped and lower-cased.

    Args:
        a_path_csv (str): path of the CSV file
        header (bool=True): whether the first non-empty row holds the column names
        sep (str=';'): field delimiter; ';' is used when falsy
        encoding (str="utf8"): encoding of the file
    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle line endings itself, so line breaks
    # embedded in quoted fields are preserved as written
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            vals_row = dict(zip(header_row, row)) if header_row else row

            if vals_row:
                yield vals_row
763
764
def subdirs_path(path):
    """
    Iterate over the immediate subdirectories of a path.

    Args:
        path: directory to scan

    Yields:
        nom_subdir, path_subdir
    """
    with os.scandir(path) as entries:
        yield from ((entry.name, entry.path) for entry in entries if entry.is_dir())
778
779
def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Build the tree of subdirectories found under a directory.

    Args:
        path_dir_base: directory whose subdirectories are explored
        relative_dirs_sel (list=None): relative paths of the directories to process; only those
            that exist under `path_dir_base` are taken into account
        last_level_as_list (bool=False): if True and none of the subdirectories found has
            subdirectories of its own, a list with their names is returned instead of a dict

    Returns:
        dict: subdirectory name -> tree of that subdirectory (a list in the case described for
        `last_level_as_list`)
    """
    tree = {}

    f_valid_dir = None
    valid_dirs_sel = set()
    if relative_dirs_sel:
        # Keep the existing selected directories as normalized, lower-cased relative paths
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

        def valid_dir(dir_path):
            # A directory is accepted when it equals a selected path or shares a
            # non-empty common path with one
            valid = False
            rel_path = os.path.relpath(dir_path, path_dir_base).lower()
            for dir_sel in valid_dirs_sel:
                if rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel)):
                    valid = True
                    break

            return valid

        f_valid_dir = valid_dir

    for dir_name, dir_path in subdirs_path(path_dir_base):
        if not f_valid_dir or f_valid_dir(dir_path):
            dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()
            # Re-express the selected paths relative to this subdirectory for the recursive call
            dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                             if os.path.commonpath((dir_path_rel, dir_sel))]
            # NOTE(review): last_level_as_list is not forwarded here, so nested levels are
            # always dicts - confirm this is intended
            tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path)

    if tree:
        # All subtrees empty: optionally collapse this level into a list of names
        if last_level_as_list and not any(tree.values()):
            tree = [*tree.keys()]

    return tree
825
826
def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a dictionary with the tree of paths available under a directory.

    Directories are nested dicts and files are keys whose value is None. Only
    directories holding at least one selected file are added to the tree; files of
    the base directory are stored at the top level.

    Args:
        path_dir_base (str): directory to walk
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function (-> bool) applied to directory and file
            names to decide whether they are included (by default everything is)
        solo_dirs (bool=False): if True the file names are not stored in the tree

    Returns:
        dict
    """
    selected_dirs = {
        candidate
        for candidate in (os.path.join(path_dir_base, rel_dir) for rel_dir in relative_dirs_sel or ())
        if os.path.exists(candidate)
    }

    def _in_selection(walked_dir):
        # The directory is a selected one or lies below a selected one
        for selected in selected_dirs:
            if os.path.samefile(walked_dir, selected) or is_path_child_from(walked_dir, selected):
                return True
        return False

    def _accepted(name):
        return not func_filter_path or func_filter_path(name)

    tree = {}
    for walked_dir, _sub_dirs, file_names in os.walk(path_dir_base):
        if selected_dirs and not _in_selection(walked_dir):
            continue

        rel_dir = os.path.relpath(walked_dir, path_dir_base)
        if func_filter_path and not func_filter_path(os.path.basename(rel_dir)):
            continue

        chosen_files = {file_name: None for file_name in file_names if _accepted(file_name)}
        if not chosen_files:
            continue

        node = tree
        # The base directory itself does not get a level of its own
        if rel_dir != '.':
            for part in rel_dir.split(os.sep):
                node = node.setdefault(part, dict())

        if not solo_dirs:
            node.update(chosen_files)

    return tree
879
880
def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate over the leaf paths of a tree of paths.

    An entry whose value is a non-empty dict is a branch and is descended into;
    any other entry is yielded as a path.

    Args:
        tree_paths (dict): name -> sub-tree (dict) or any other value for leaves
        path_base (str=None): path prepended to every yielded path

    Yields:
        path_file
    """
    def _with_base(relative_path):
        return os.path.join(path_base, relative_path) if path_base else relative_path

    for name, children in tree_paths.items():
        if isinstance(children, dict) and children:
            for nested_path in iter_tree_paths(children, name):
                yield _with_base(nested_path)
        else:
            yield _with_base(name)
897
898
def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate over the paths available under a directory, as selected by `tree_paths`.

    Args:
        path_dir_base (str): directory to walk
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function (-> bool) that decides whether a path
            name is included (by default everything is)

    Yields:
        path (str)
    """
    selected_tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(selected_tree, path_dir_base)
915
916
def is_path_child_from(path, path_parent):
    """
    Tell whether `path` lies below `path_parent`, i.e. whether any ancestor of
    `path` is the same file as `path_parent`.

    Args:
        path: path to check
        path_parent: candidate ancestor directory

    Returns:
        bool
    """
    reference = Path(path_parent)
    for ancestor in Path(path).parents:
        if ancestor.samefile(reference):
            return True

    return False
932
933
def machine_name():
    """
    Return the upper-cased fully qualified domain name of the machine.

    Returns:
        str
    """
    # TODO - Get host from docker machine when we are in a container
    # TODO - import docker
    # TODO -
    # TODO - client = docker.from_env()
    # TODO - container_info = client.containers.get(socket.gethostname())
    # TODO - docker_host_ip = container_info.attrs['NetworkSettings']['IPAddress']
    # TODO - print(docker_host_ip)

    fqdn = socket.getfqdn()
    return fqdn.upper()
950
951
def machine_apb():
    """
    Tell whether the fully qualified domain name of the machine ends with '.apb.es'.

    Returns:
        bool
    """
    fqdn = socket.getfqdn().lower()
    return fqdn.endswith('.apb.es')
960
961
def find_key_values(obj: Any, target_key: str) -> Generator[Tuple[Any, int], None, None]:
    """
    Walk `obj` recursively (dicts, lists, tuples, sets) and yield a tuple
    (value, level) for every dict entry whose key equals `target_key`.

    The level is 0 for entries of `obj` itself and grows by one for every
    container entered.

    Args:
        obj (Any): The object to search through.
        target_key (str): The key to search for.

    Yields:
        Tuple[Any, int]: the value stored under `target_key` and its depth level.
    """
    def _walk(node: Any, depth: int) -> Generator[Tuple[Any, int], None, None]:
        if isinstance(node, dict):
            for key, value in node.items():
                if key == target_key:
                    yield value, depth
                # Matching values are searched as well
                yield from _walk(value, depth + 1)
            return

        if isinstance(node, (list, tuple, set)):
            for element in node:
                yield from _walk(element, depth + 1)

    yield from _walk(obj, 0)
985
986
# Script entry point: when the module is run directly, control is handed to
# `fire.Fire()` (imported here so `fire` is only needed in that case).
if __name__ == '__main__':
    import fire

    fire.Fire()
def download_and_unzip( url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
    """
    Download a zip file from `url` and extract it.

    Args:
        url (str): url of the zip file
        extract_to (str=None): directory handed both to download_from_url() (where the zip is saved)
            and to unzip() (where it is extracted); each one applies its own default when None
        headers (list=None): headers handed to download_from_url()
        remove_zip (bool=True): if True the downloaded zip is removed after extraction

    Returns:
        str | None: the directory returned by unzip(), or None when the download returns no path
    """
    zip_file_path = download_from_url(url, extract_to, headers)
    if not zip_file_path:
        return None

    return unzip(zip_file_path, extract_to, remove_zip)
Arguments:
  • url (str):
  • extract_to (str=None): if None, extract to current directory
  • headers (list=None)
  • remove_zip (bool=True):
Returns:

path_zip (str)

def unzip(zip_file_path, extract_to=None, remove_zip=False):
def unzip(zip_file_path, extract_to=None, remove_zip=False):
    """
    Extract every member of a zip file into a directory.

    Args:
        zip_file_path (str): path to the zip file
        extract_to (str=None): destination directory. If not set, a directory named like the zip
            (without its extension) placed beside the zip is used
        remove_zip (bool=False): if True the zip file is removed once extracted

    Returns:
        extract_to (str): the directory the members were extracted to
    """
    with ZipFile(zip_file_path, 'r') as archive:
        if not extract_to:
            zip_dir_name, zip_name = os.path.split(zip_file_path)
            extract_to = os.path.join(zip_dir_name, os.path.splitext(zip_name)[0])

        desc = f"Extracting {zip_file_path} to {extract_to}"
        if sys.stdout:
            # Progress bar over the members of the archive
            members = tqdm(archive.infolist(), desc=desc)
        else:
            # No stdout available: go through the members without a progress bar
            print(f'{desc}...')
            members = archive.infolist()

        for member in members:
            archive.extract(member, extract_to)

    if remove_zip:
        os.remove(zip_file_path)

    return extract_to

Unzip file to extract_to directory

Arguments:
  • zip_file_path (str): Path to zip file
  • extract_to: (str=None): if None, extract to zip's directory
  • remove_zip: (bool=False): If True remove zip file after unzip
Returns:

extract_to (str)

def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
 85def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
 86    """
 87
 88    Args:
 89        url (str): Url to download
 90        extract_to (str=None): Directory to save file. Default temporary directory
 91        headers (list=None)
 92
 93    Returns:
 94        path_file (str | None)
 95    """
 96    opener = build_opener()
 97    if headers:
 98        opener.addheaders = headers
 99
100    with opener.open(url) as response:
101        content_length = response.length
102        if not extract_to:
103            extract_to = gettempdir()
104
105        if n_file := response.headers.get_filename():
106            file_path = os.path.join(extract_to, n_file)
107        else:
108            file_path = os.path.join(extract_to, Path(response.url).name)
109
110        with open(file_path, "wb") as out_file:
111            def get_resp_data():
112                while True:
113                    data = response.read(1024)
114                    if not data:
115                        break
116                    yield data
117
118            desc = f'Downloading to "{file_path}"'
119            if not sys.stdout:
120                print(f'{desc}...')
121                for data in get_resp_data():
122                    out_file.write(data)
123            else:
124                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
125                    for data in get_resp_data():
126                        out_file.write(data)
127                        progress_bar.update(len(data))
128
129            return file_path
Arguments:
  • url (str): Url to download
  • extract_to (str=None): Directory to save file. Default temporary directory
  • headers (list=None)
Returns:

path_file (str | None)

def caller_name(skip=2):
def caller_name(skip=2):
    """Get the name of a caller in the format module.class.method

       `skip` is the index of the frame to describe, counting from the frame that called this
       function: in the words of the original author, skip=1 means "who calls me",
       skip=2 "who calls my caller" etc.

       An empty string is returned if skipped levels exceed stack height
    """
    # Collect the chain of frames, starting at the one that called this function
    frames = []
    frame = sys._getframe(1)
    while frame:
        frames.append(frame)
        frame = frame.f_back

    if skip >= len(frames):
        return ''
    target = frames[skip]

    parts = []
    module = inspect.getmodule(target)
    # `module` can be None when the frame is executed directly in console
    if module and module.__name__ != "__main__":
        parts.append(module.__name__)

    # Class detection relies on the conventional 'self' local; a static method call
    # cannot be told apart from a plain function call
    if 'self' in target.f_locals:
        parts.append(target.f_locals['self'].__class__.__name__)

    code_name = target.f_code.co_name
    if code_name != '<module>':  # '<module>' is the usual name of top level code
        parts.append(code_name)  # function or a method
    del target

    return ".".join(parts)

Get a name of a caller in the format module.class.method

skip specifies how many levels of stack to skip while getting caller name. skip=1 means "who calls me", skip=2 "who calls my caller" etc.

An empty string is returned if skipped levels exceed stack height

def get_environ():
def get_environ():
    """
    Return the working environment read from the DEV_ENVIRON environment variable.
    When the variable is not defined, 'dev' is returned.

    Returns:
        str: the lower-cased name of the environment, 'dev' or 'prod'
    """
    environ_name = os.environ.get("DEV_ENVIRON", "dev")

    return environ_name.lower()

Devuelve el entorno de trabajo a partir de la environment variable DEV_ENVIRON. Si no está definida por defecto devuelve 'dev'

Returns:

str: El nombre del entorno 'dev' o 'prod'

def create_dir(a_dir):
def create_dir(a_dir):
    """
    Create a directory (and the missing intermediate ones), reporting how it went.

    Args:
        a_dir {str}: path of the directory to create

    Returns:
        bool: True if the path already existed or could be created, False otherwise

    """
    if os.path.exists(a_dir):
        return True

    try:
        os.makedirs(a_dir)
    except OSError:
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True

Crea directorio devolviendo TRUE o FALSE según haya ido. Si ya existe devuelve TRUE

Arguments:
  • a_dir {str}: path del directorio a crear
Returns:

bool: Retorna TRUE si lo ha podido crear o ya existía y FALSE si no

def remove_content_dir(a_dir):
def remove_content_dir(a_dir):
    """
    Remove the files and subdirectories found inside a directory (the directory itself is kept).

    Removal is best effort: entries that cannot be removed are skipped. Only non-directory entries
    are counted. Symbolic links are removed as plain entries and never followed, so the content of
    a linked directory is left untouched.

    Args:
        a_dir {str}: path of the directory to empty

    Returns:
        num_elems_removed (int), num_elems_dir (int): entries removed and entries found
    """
    num_elems_removed = 0
    num_elems_dir = 0
    # The context manager closes the directory handle even if something unexpected is raised
    with os.scandir(a_dir) as entries:
        for de in entries:
            if de.is_dir(follow_symlinks=False):
                n_rem_subdir, n_subdir = remove_content_dir(de.path)
                num_elems_dir += n_subdir
                num_elems_removed += n_rem_subdir
                try:
                    os.rmdir(de.path)
                except OSError:
                    # Typically the subdirectory still holds entries that could not be removed
                    pass
            else:
                num_elems_dir += 1
                try:
                    os.unlink(de.path)
                except OSError:
                    continue
                num_elems_removed += 1

    return num_elems_removed, num_elems_dir

Borra ficheros y subdirectorios de directorio

Arguments:
  • a_dir {str}: path del directorio a crear
Returns:

num_elems_removed (int), num_elems_dir (int)

# Windows-specific error code (winerror) indicating an invalid pathname
ERROR_INVALID_NAME = 123

Windows-specific error code indicating an invalid pathname.

See Also

https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx Official listing of all such codes.

def is_pathname_valid(pathname):
def is_pathname_valid(pathname):
    '''
    `True` if the passed pathname is a valid pathname for the current OS;
    `False` otherwise.

    Non-string and empty values are reported as invalid. Path components that do not exist or
    cannot be read are ignored: only errors that denote a malformed name make the result `False`.
    '''
    # If this pathname is either not a string or is but is empty, this pathname
    # is invalid.
    try:
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip this pathname's Windows-specific drive specifier (e.g., `C:\`)
        # if any. Since Windows prohibits path components from containing `:`
        # characters, failing to strip this `:`-suffixed prefix would
        # erroneously invalidate all valid absolute Windows pathnames.
        _, pathname = os.path.splitdrive(pathname)

        # Directory guaranteed to exist. If the current OS is Windows, this is
        # the drive to which Windows was installed (e.g., the "%HOMEDRIVE%"
        # environment variable); else, the typical root directory.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # ...Murphy and her ironclad Law

        # Append a path separator to this directory if needed.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        # Test whether each path component split from this pathname is valid or
        # not, ignoring non-existent and non-readable path components.
        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            # If an OS-specific exception is raised, its error code
            # indicates whether this pathname is valid or not. Unless this
            # is the case, this exception implies an ignorable kernel or
            # filesystem complaint (e.g., path not found or inaccessible).
            #
            # Only the following exceptions indicate invalid pathnames:
            #
            # * Instances of the Windows-specific "WindowsError" class
            #   defining the "winerror" attribute whose value is
            #   "ERROR_INVALID_NAME". Under Windows, "winerror" is more
            #   fine-grained and hence useful than the generic "errno"
            #   attribute. When a too-long pathname is passed, for example,
            #   "errno" is "ENOENT" (i.e., no such file or directory) rather
            #   than "ENAMETOOLONG" (i.e., file name too long).
            # * Instances of the cross-platform "OSError" class defining the
            #   generic "errno" attribute whose value is either:
            #   * Under most POSIX-compatible OSes, "ENAMETOOLONG".
            #   * Under some edge-case OSes (e.g., SunOS, *BSD), "ERANGE".
            except OSError as exc:
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character makes os.lstat() raise "ValueError" on Python 3
    # ("embedded null byte"); "TypeError" is kept for the interpreters that
    # report it that way. Both denote an invalid pathname.
    except (TypeError, ValueError):
        return False
    # If no exception was raised, all path components and hence this
    # pathname itself are valid. (Praise be to the curmudgeonly python.)
    else:
        return True
    # If any other exception was raised, this is an unrelated fatal issue
    # (e.g., a bug). Permit this exception to unwind the call stack.
    #
    # Did we mention this should be shipped with Python already?

True if the passed pathname is a valid pathname for the current OS; False otherwise.

def is_dir_writable(dirname):
def is_dir_writable(dirname):
    '''
    `True` if the current user has sufficient permissions to create files
    inside the passed directory; `False` otherwise.

    An empty `dirname` stands for the current directory. The check creates a
    uniquely named temporary file, so no existing file of the directory is
    overwritten or removed by it. This function does not raise on invalid
    input: it reports `False`.
    '''
    # Local import: the module only imports `gettempdir` from tempfile.
    import tempfile

    try:
        # os.fspath() rejects values that are not path-like (e.g. None) with a
        # "TypeError", which is reported as "not writable" below.
        target_dir = os.fspath(dirname) or os.curdir
        with tempfile.TemporaryFile(dir=target_dir):
            pass
    # "OSError" covers missing directories and permission problems;
    # "ValueError" covers malformed names such as embedded NUL characters.
    except (OSError, TypeError, ValueError):
        return False

    return True

True if the current user has sufficient permissions to create siblings (i.e., arbitrary files in the parent directory) of the passed pathname; False otherwise.

def is_path_exists_or_creatable(pathname):
def is_path_exists_or_creatable(pathname):
    '''
    `True` if the passed pathname is valid on the current OS _and_ either
    already exists or its parent directory accepts new files; `False`
    otherwise.

    Filesystem complaints raised as `OSError` are reported as `False`.
    '''
    try:
        # is_pathname_valid() goes first so that the "os" calls below are not
        # fed with pathnames they would reject.
        if not is_pathname_valid(pathname):
            return False

        if os.path.exists(pathname):
            return True

        return is_dir_writable(os.path.dirname(pathname))
    # Non-fatal filesystem complaints (e.g., connection timeouts, permissions
    # issues) mean the path is inaccessible. Other exceptions are unrelated
    # issues and are not caught here.
    except OSError:
        return False

True if the passed pathname is a valid pathname on the current OS _and_ either currently exists or is hypothetically creatable in a cross-platform manner optimized for POSIX-unfriendly filesystems; False otherwise.

This function is guaranteed to _never_ raise exceptions.

def get_matching_val(search_val, matching_vals):
def get_matching_val(search_val, matching_vals):
    """
    Return the candidate of `matching_vals` that is most similar to `search_val`, together with
    its similarity factor, as ranked by jaro_winkler().

    Args:
        search_val (str): value to look for
        matching_vals (list(str)): candidate values to compare with

    Returns:
        match_val (str), fact_jaro_winkler (float): both None when there is no ranked candidate
    """
    ranked = jaro_winkler(search_val, matching_vals)
    # The first entry of the ranking is the best one
    for fact_jaro, match_val in ranked.items():
        return match_val, fact_jaro

    return None, None

Retorna el valor, de entre los valores a comparar (matching_vals), que más se asemeja al valor propuesto (search_val).

Arguments:
  • search_val (str): Valor propuesto para comparar
  • matching_vals (list(str)): Lista de valores a comparar
Returns:

match_val (str), fact_jaro_winkler (float)

def levenshtein_distance(search_val, matching_vals):
def levenshtein_distance(search_val, matching_vals):
    """
    Group the candidates of `matching_vals` by their Levenshtein distance to `search_val`.

    Args:
        search_val: value to compare against
        matching_vals: iterable of candidate values

    Returns:
        OrderedDict: distance -> list of candidates at that distance, in ascending order of distance
    """
    by_distance = {}
    for candidate in matching_vals:
        distance = jellyfish.levenshtein_distance(search_val, candidate)
        by_distance.setdefault(distance, []).append(candidate)

    return OrderedDict((distance, by_distance[distance]) for distance in sorted(by_distance))
Arguments:
  • search_val:
  • matching_vals:

Returns:

def jaro_winkler(search_val, matching_vals):
def jaro_winkler(search_val, matching_vals):
    """
    Rank the candidates of `matching_vals` by their Jaro-Winkler similarity to `search_val`.

    Candidates with a similarity of 0 are left out. When several candidates get exactly the same
    similarity only the last one of them is kept.

    Args:
        search_val: value to compare against
        matching_vals: iterable of candidate values

    Returns:
        OrderedDict: similarity -> candidate, in descending order of similarity
    """
    by_similarity = {}
    for candidate in matching_vals:
        by_similarity[jellyfish.jaro_winkler_similarity(search_val, candidate)] = candidate

    return OrderedDict(
        (similarity, by_similarity[similarity])
        for similarity in sorted(by_similarity, reverse=True)
        if similarity != 0
    )
Arguments:
  • search_val:
  • matching_vals:

Returns:

def call_command(command_prog, *args):
def call_command(command_prog, *args):
    """
    Run a command through the system shell with the given arguments.

    Args:
        command_prog (str): command (it may already be a whole shell command line)
        *args: extra arguments appended to the command

    Returns:
        bool: True if the command ended with exit code 0

    Raises:
        subprocess.CalledProcessError: if the command ends with a non-zero exit code
    """
    if args and os.name == 'posix':
        # Local import: shlex is not among the imports of the module
        import shlex

        # On POSIX, with shell=True only the first item of a sequence is the command; the other
        # items become arguments of the shell itself and would never reach the command. Build a
        # single command line instead, quoting each extra argument.
        command = " ".join([os.fsdecode(command_prog), *(shlex.quote(str(arg)) for arg in args)])
    else:
        command = [command_prog, *args]

    ret = subprocess.check_call(command, shell=True)

    return ret == 0

Llama comando shell sistema con los argumentos indicados

Returns:

bool: True si OK

def rounded_float(a_float, num_decs=9):
def rounded_float(a_float, num_decs=9):
    """
    Round a float to the given number of decimals.

    Args:
        a_float: number to round
        num_decs: number of decimals to keep. A negative value rounds to tens, hundreds...

    Returns:
        float
    """
    # The format precision cannot be negative; with a negative num_decs, round() has already
    # removed every decimal
    precision = max(num_decs, 0)

    # Trailing zeros are not stripped from the text: float() ignores them, and stripping them from a
    # text without decimal point (num_decs=0) would drop significant digits ("100" -> "1")
    return float(format(round(a_float, num_decs), f".{precision}f"))

Formatea un float con el numero de decimales especificado

Arguments:
  • a_float:
  • num_decs:
Returns:

float

class formatted_float(builtins.float):
class formatted_float(float):
    """
    Float whose representation shows at most `__num_decs__` decimals
    """
    # Maximum number of decimals used by __repr__
    __num_decs__ = 9

    def __repr__(self):
        shown = rounded_float(self, self.__num_decs__)

        return str(shown)

Devuelve un float que se representa con un maximo de decimales (__num_decs__)

def as_format_floats(obj):
def as_format_floats(obj):
    """
    Convert every float found to the 'formatted_float' class, which formats its representation.

    Dicts, lists and tuples are rebuilt with their own class after converting their values; any
    other object is returned as is.

    Args:
        obj: any object

    Returns:
        (obj, formatted_float)

    """
    if isinstance(obj, float):
        return formatted_float(obj)

    same_class = obj.__class__
    if isinstance(obj, dict):
        return same_class((key, as_format_floats(val)) for key, val in obj.items())

    if isinstance(obj, (list, tuple)):
        return same_class(as_format_floats(item) for item in obj)

    return obj

Si encuentra un Float lo convierte a la clase 'formatted_float' para formatear su representación

Arguments:
  • obj: Cualquier objeto
Returns:

(obj, formatted_float)

def nums_from_str(a_string, nan=False):
def nums_from_str(a_string, nan=False):
    """
    Return the list of numbers found in the given text.

    The text is split on whitespace; each piece is read as an int or, failing that, as a float.

    Args:
        a_string (str): text to scan
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        list
    """
    numbers = []

    for token in a_string.split():
        try:
            number = int(token)
        except ValueError:
            try:
                number = float(token)
            except ValueError:
                continue
            if not nan and isnan(number):
                continue
        numbers.append(number)

    return numbers

Retorna lista de numeros en el texto pasado

Arguments:
  • a_string (str):
  • nan (bool=False): por defecto no trata los NaN como numeros
Returns:

list

def first_num_from_str(a_string, nan=False):
def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in the given text.

    Args:
        a_string (str): text to scan
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        int OR float: None when the text holds no number
    """
    numbers = nums_from_str(a_string, nan=nan)

    return numbers[0] if numbers else None

Retorna primer numero encontrado del texto pasado

Arguments:
  • a_string (str):
  • nan (bool=False): por defecto no trata los NaN como numeros
Returns:

int OR float

def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the list of dates found in the given text according to the given formats.

    The text is split once per separator and every resulting piece is tried against every format,
    so a piece produced by several separators is reported once per separator.

    Args:
        str (str): text to scan
        formats (list=None): by default ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
        seps (list=None): by default [None, '.', ',']
        ret_extra_data (bool=False): if True each item is a tuple (date, piece of text, format used)

    Returns:
        list
    """
    formats = formats or ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
    seps = seps or [None, '.', ',']

    pieces = []
    for sep in seps:
        pieces.extend(chunk.strip() for chunk in str.split(sep))

    found = []
    for fmt in formats:
        for piece in pieces:
            try:
                parsed = datetime.datetime.strptime(piece, fmt)
            except Exception:
                # The piece does not follow this format
                continue
            found.append((parsed, piece, fmt) if ret_extra_data else parsed)

    return found

Retorna lista de fechas encontradas en el texto pasado segun los formatos indicados

Arguments:
  • str (str):
  • formats (list=None): por defecto ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
  • seps (list=None): por defecto [None, '.', ',']
  • ret_extra_data (bool=False): si True retorna tuple con fecha + part_str_src + format utilizado
Returns:

list

def pretty_text(txt):
def pretty_text(txt):
    """
    Replace underscores and hyphens by spaces and capitalize the text.

    Args:
        txt (str):

    Returns:
        str
    """
    to_spaces = str.maketrans("_-", "  ")

    return txt.translate(to_spaces).capitalize()

Coge texto y lo capitaliza y quita carácteres por espacios

Arguments:
  • txt (str):
Returns:

str

def zip_files(zip_path, file_paths, base_path=None, compression=8):
def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the files given with :file_paths into a zip file (:zip_path)

    Args:
        zip_path: path of the zip file to create
        file_paths (list or generator): paths of the files to add
        base_path (str=None): path removed (ignoring case) from the path of each file to build its
            name inside the zip, so the relative route of the files is kept. If not set, every file
            is stored with just its base name
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) for no compression

    Returns:
        zip_path (str)
    """
    re_base_path = None
    if base_path:
        # Compiled once for all the files. The path is escaped so that characters with a meaning in
        # regular expressions ('.', '+', '(', ...) are matched literally
        norm_base_path = os.path.normpath(base_path).replace(os.sep, '/')
        re_base_path = re.compile(re.escape(norm_base_path), re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if re_base_path:
                arch_name = re_base_path.sub('', os.path.normpath(file_path).replace(os.sep, '/'))
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path

Comprime los ficheros indicados con :file_paths en un fichero zip (:zip_path)

Arguments:
  • zip_path:
  • file_paths (list or generator):
  • base_path (str=None): path a partir del cual se mantendrá la ruta relativa de los ficheros
  • compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:

zip_path (str)

def zip_dir( dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=8):
def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given directory

    Args:
        dir_path (str): path of the directory
        zip_path (str=None): path of the .zip file to create. By default '<dir_path>.zip'
        relative_dirs_sel (list=None): relative paths of the directories to process, handed to
                            iter_paths_dir()
        func_filter_path (func=None): function used to filter the paths, handed to iter_paths_dir()
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) for no compression

    Returns:
        zip_file (str)
    """
    target_zip = zip_path or f'{dir_path}.zip'
    paths_to_zip = iter_paths_dir(dir_path,
                                  relative_dirs_sel=relative_dirs_sel,
                                  func_filter_path=func_filter_path)

    return zip_files(target_zip, paths_to_zip, base_path=dir_path, compression=compression)

Comprime la carpeta indicada

Arguments:
  • dir_path (str): path directorio
  • zip_path (str=None): el path del fichero .zip a crear. Por defecto zip en el directorio padre con el mismo nombre del directorio zipeado
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
  • compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:

zip_file (str)

def zip_files_dir(dir_path, remove_files=False, *exts_files):
def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each entry of the given directory into its own zip ('<name without extension>.zip'
    beside it). The kind of files to compress can be restricted by extension.

    Args:
        dir_path: path of the directory
        remove_files: if True each file is removed once compressed
        *exts_files: file extensions WITHOUT the dot. If none is given every entry is compressed

    Returns:
        ok (bool)
    """
    exts = {".{}".format(ext.lower()) for ext in exts_files}

    # Take a snapshot of the directory before creating anything in it: with a lazy iteration the
    # zips written by this very loop could show up as new entries to compress
    with os.scandir(dir_path) as entries:
        file_paths = [de.path for de in entries]

    for file_path in file_paths:
        path_no_ext, ext = os.path.splitext(file_path)
        if exts and ext.lower() not in exts:
            continue

        zip_path = "{}.zip".format(path_no_ext)
        if os.path.normcase(zip_path) == os.path.normcase(file_path):
            # The entry is its own target: opening the zip for writing would truncate the very
            # file that has to be read
            continue

        print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path))
        zip_files(zip_path, [file_path])

        if remove_files and not os.path.samefile(zip_path, file_path):
            os.remove(file_path)

    return True

Comprime los ficheros de una carpeta indicada. Se pueden indicar qué tipo de ficheros se quiere que comprima

Arguments:
  • dir_path:
  • remove_files:
  • *exts_files: extensiones de fichero SIN el punto
Returns:

ok (bool)

def split_ext_file(path_file):
def split_ext_file(path_file):
    """
    Split the name of the file at its first "." separator.

    Args:
        path_file: path of the file
    Returns:
        base_file (str), ext_file (str): the text before the first "." and everything after it
    """
    base_file, _, ext_file = os.path.basename(path_file).partition(".")

    return base_file, ext_file

Devuelve el nombre del fichero partido entre la primera parte antes del separador "." y lo demás

Arguments:
  • path_file:
Returns:

base_file (str), ext_file (str)

# Name of the file, inside a directory, that stores the date of the last run
FILE_RUN_LOG = 'last_run.log'
# strftime/strptime pattern used to write and read that date
DATE_RUN_LOG_FRMT = '%Y%m%d'
def last_run_on_dir(dir_base):
def last_run_on_dir(dir_base):
    """
    Return the date of the last run stored in the log file (FILE_RUN_LOG) of a directory.

    Args:
        dir_base (str): directory holding the log file

    Returns:
        date_last_run (datetime): None if the log file does not exist

    Raises:
        ValueError: if the content of the log file does not follow DATE_RUN_LOG_FRMT
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    if not os.path.exists(log_last_run):
        return None

    with open(log_last_run) as fr:
        # strptime() rejects surrounding whitespace, e.g. the trailing newline that an editor
        # adds when the file is touched by hand
        return datetime.datetime.strptime(fr.read().strip(), DATE_RUN_LOG_FRMT)

Retorna la fecha de ultima ejecucion de proceso generacion en directorio de repositorio

Arguments:
  • dir_base (str):
Returns:

date_last_run (datetime): Si no encuentra devuelve None

def save_last_run_on_dir(dir_base, date_run=None):
def save_last_run_on_dir(dir_base, date_run=None):
    """
    Store the date of the last run in the log file (FILE_RUN_LOG) of a directory.

    Args:
        dir_base (str): directory where the log file is written
        date_run (datetime=None): today's date is used if not set
    """
    with open(os.path.join(dir_base, FILE_RUN_LOG), "w+") as fw:
        stamp = date_run if date_run else datetime.date.today()
        fw.write(stamp.strftime(DATE_RUN_LOG_FRMT))

Graba la fecha de ultima ejecucion de proceso generacion en directorio de repositorio

Arguments:
  • dir_base (str):
  • date_run (datetime=None): Si no se informa cogerá la fecha de hoy
def month_name(num_month, code_alias_locale='es_cu'):
def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the given locale, formatted with pretty_text().

    Args:
        num_month (int): number of the month, used as index of calendar.month_name
        code_alias_locale (str='es_cu'): key of locale.locale_alias that selects the locale

    Returns:
        str
    """
    locale_name = locale.locale_alias.get(code_alias_locale)
    with different_locale(locale_name):
        # The name has to be read while the locale is active
        localized_name = calendar.month_name[num_month]

        return pretty_text(localized_name)

Retorna el nombre del mes en el locale especificado. Por defecto castellano

Arguments:
  • num_month (int):
  • code_alias_locale (str='es_cu'):
Returns:

str

def file_mod_time(path_file):
def file_mod_time(path_file):
    """
    Return the modification time of a file as a datetime

    Args:
        path_file (str): path of the file

    Returns:
        datetime
    """
    mod_timestamp = os.path.getmtime(path_file)

    return datetime.datetime.fromtimestamp(mod_timestamp)

Return datetime from the modification stat timestamp of the file

Arguments:
  • path_file (str):
Returns:

datetime

def rows_csv(a_path_csv, header=True, sep=';', encoding='utf8'):
def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate over the rows of the CSV file given with a_path_csv, as dicts keyed by the values of
    the first row (if header=True) or as lists otherwise. Empty rows are skipped.

    Args:
        a_path_csv (str): path of the CSV file
        header (bool=True): if True the first row holds the names of the columns; they are
            stripped and lower-cased to be used as keys
        sep (str=';'): delimiter of the columns. An empty value falls back to ';'
        encoding (str="utf8"): encoding of the file
    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle the line breaks itself, as its documentation asks, so
    # the ones embedded in quoted fields reach the caller unchanged
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            vals_row = dict(zip(header_row, row)) if header_row else row

            if vals_row:
                yield vals_row

Itera como dicts indexados por valores primera fila (si header=True) o si no como list las filas del CSV pasado por parametro a_path_csv.

Arguments:
  • a_path_csv (str):
  • header (bool=True):
  • sep (str=';'): separador de columnas; si se pasa vacío se usa ';'
  • encoding (str="utf8"):
Yields:

list OR dict

def subdirs_path(path):
def subdirs_path(path):
    """
    Iterate over the direct subdirectories of a path

    Args:
        path: path of the directory to scan

    Yields:
        nom_subdir, path_subdir
    """
    with os.scandir(path) as entries:
        yield from ((entry.name, entry.path) for entry in entries if entry.is_dir())

Itera sobre los subdirectorios del path

Arguments:
  • path:
Yields:

nom_subdir, path_subdir

def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Build the tree of subdirectories found below ``path_dir_base``.

    Each subdirectory name maps to the tree of its own subdirectories (an empty dict when
    it has none).

    Args:
        path_dir_base: Directory whose subdirectories are walked recursively.
        relative_dirs_sel (list=None): Paths relative to ``path_dir_base``. When given, only
            subdirectories equal to, or sharing a common path with, one of the existing
            selected paths are included. Comparison is done on lower-cased relative paths.
        last_level_as_list (bool=False): If True, any level whose subdirectories have no
            subdirectories of their own is returned as a list of names instead of a dict.

    Returns:
        dict (or list for the last levels when ``last_level_as_list`` is True)
    """
    # Keep the original semantics: a non-empty selection activates filtering even when
    # none of the selected paths exists (in which case nothing matches).
    filter_dirs = bool(relative_dirs_sel)

    valid_dirs_sel = set()
    if relative_dirs_sel:
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

    tree = {}
    for dir_name, dir_path in subdirs_path(path_dir_base):
        dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()

        if filter_dirs and not any(
                dir_path_rel == dir_sel or os.path.commonpath((dir_path_rel, dir_sel))
                for dir_sel in valid_dirs_sel):
            continue

        # Re-express the matching selections relative to this subdirectory for the next level.
        dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                         if os.path.commonpath((dir_path_rel, dir_sel))]

        # Propagate last_level_as_list: previously the flag was dropped here, so it only
        # ever affected the top level of the tree.
        tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path, last_level_as_list)

    # A level is "last" when none of its subdirectories has content of its own.
    if tree and last_level_as_list and not any(tree.values()):
        tree = [*tree.keys()]

    return tree
Arguments:
  • path_dir_base:
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • last_level_as_list (bool=False):
Returns:

dict

def tree_paths( path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a nested dict with the tree of paths found under ``path_dir_base``.

    Directories are dict nodes keyed by name; files are keys whose value is None. A
    directory is added only when it contains at least one accepted file (its ancestors
    are created as needed). Files directly in ``path_dir_base`` are stored at the top
    level of the returned dict.

    Args:
        path_dir_base (str): Directory that is walked.
        relative_dirs_sel (list=None): Paths relative to ``path_dir_base``; when any of them
            exists, only those directories and their descendants are processed.
        func_filter_path (func=None): Callable (name -> bool) applied to directory names and
            file names; names for which it returns a falsy value are discarded.
        solo_dirs (bool=False): If True, file names are not stored, only directory nodes.

    Returns:
        dict
    """
    selected_roots = {
        candidate
        for candidate in (os.path.join(path_dir_base, rel) for rel in relative_dirs_sel or ())
        if os.path.exists(candidate)
    }

    def _accepted(name):
        return not func_filter_path or func_filter_path(name)

    tree = {}
    for walked_dir, _subdir_names, file_names in os.walk(path_dir_base):
        if selected_roots:
            in_selection = False
            for root in selected_roots:
                if os.path.samefile(walked_dir, root) or is_path_child_from(walked_dir, root):
                    in_selection = True
                    break
            if not in_selection:
                continue

        rel_dir = os.path.relpath(walked_dir, path_dir_base)
        if not _accepted(os.path.basename(rel_dir)):
            continue

        kept_files = dict.fromkeys(fn for fn in file_names if _accepted(fn))
        if not kept_files:
            continue

        node = tree
        # The base directory itself ('.') gets no node of its own.
        if rel_dir != '.':
            for part in rel_dir.split(os.sep):
                node = node.setdefault(part, {})

        if not solo_dirs:
            node.update(kept_files)

    return tree

Retorna diccionario con el arbol de paths disponibles en el path indicado.

Con la función func_filter_path (-> bool) se podrán filtrar los paths a retornar (por defecto no se filtra ninguno)

Arguments:
  • path_dir_base (str):
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
  • solo_dirs (bool=False):
Returns:

dict

def iter_tree_paths(tree_paths, path_base=None):
def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate over the leaf paths of a nested dict tree of paths.

    Non-empty dict values are treated as directories and descended into; any other value
    (including an empty dict) makes its key a leaf.

    Args:
        tree_paths (dict): Nested dict mapping names to sub-trees or leaf values.
        path_base (str=None): Prefix joined in front of every yielded path when truthy.

    Yields:
        path_file
    """
    def _anchor(rel_path):
        return os.path.join(path_base, rel_path) if path_base else rel_path

    for name, children in tree_paths.items():
        if isinstance(children, dict) and children:
            yield from map(_anchor, iter_tree_paths(children, name))
        else:
            yield _anchor(name)
Arguments:
  • tree_paths (dict):
  • path_base (str=None):
Yields:

path_file

def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate over the paths available under ``path_dir_base``.

    Builds the tree with ``tree_paths`` and yields each of its paths, prefixed with
    ``path_dir_base``, through ``iter_tree_paths``.

    Args:
        path_dir_base (str): Directory that is walked.
        relative_dirs_sel (list=None): Relative paths of the directories to process.
        func_filter_path (func=None): Callable (name -> bool) deciding whether a path name
            is kept.

    Yields:
        path (str)
    """
    found_tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(found_tree, path_dir_base)

Itera el arbol de paths disponibles en el path indicado.

Con la función func_filter_path (-> bool) se podrán filtrar los paths a retornar (por defecto no se filtra ninguno)

Arguments:
  • path_dir_base (str):
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
Yields:

path (str)

def is_path_child_from(path, path_parent):
def is_path_child_from(path, path_parent):
    """
    Tell whether ``path`` lies below ``path_parent``.

    Every ancestor of ``path`` is compared with ``path_parent`` via ``Path.samefile``;
    ``path`` itself is not considered its own child.

    Args:
        path: Path to check.
        path_parent: Candidate ancestor directory.

    Returns:
        bool
    """
    ancestors = Path(path).parents
    candidate_parent = Path(path_parent)

    for ancestor in ancestors:
        if ancestor.samefile(candidate_parent):
            return True

    return False

Retorna si path es hijo de path_parent

Arguments:
  • path:
  • path_parent:
Returns:

bool

def machine_name():
def machine_name():
    """
    Return the fully qualified domain name of this machine in upper case.

    Returns:
        str
    """
    # TODO - Get host from docker machine when we are in a container
    # TODO - (idea: docker.from_env() -> containers.get(socket.gethostname())
    # TODO -  -> attrs['NetworkSettings']['IPAddress'])
    fqdn = socket.getfqdn()

    return fqdn.upper()

Retorna el nombre de la maquina

Returns:

str

def machine_apb():
def machine_apb():
    """
    Tell whether this machine's fully qualified domain name ends with '.apb.es'.

    The comparison is case-insensitive.

    Returns:
        bool
    """
    fqdn_lower = socket.getfqdn().lower()

    return fqdn_lower.endswith('.apb.es')

Retorna si el nombre completo (FQDN) de la maquina termina en '.apb.es'

Returns:

bool

def find_key_values(obj: Any, target_key: str) -> Generator[Tuple[Any, int], None, None]:
def find_key_values(obj: Any, target_key: str) -> Generator[Tuple[Any, int], None, None]:
    """
    Recursively search `obj` for every occurrence of `target_key`.

    Dicts, lists, tuples and sets are descended into; any other value is ignored. The
    top-level object is at level 0 and each container nesting adds one level. A matching
    value is yielded before its own contents are searched.

    Args:
        obj (Any): The object to search through.
        target_key (str): The dict key to look for.

    Yields:
        Tuple[Any, int]: The value stored under `target_key` and the depth level of the
            dict that holds it.
    """
    def _walk(node: Any, depth: int) -> Generator[Tuple[Any, int], None, None]:
        deeper = depth + 1

        if isinstance(node, dict):
            for key, value in node.items():
                if key == target_key:
                    yield value, depth
                yield from _walk(value, deeper)
            return

        if isinstance(node, (list, tuple, set)):
            for element in node:
                yield from _walk(element, deeper)

    yield from _walk(obj, 0)

Generator that recursively walks obj (dicts, lists, tuples, sets) and yields tuples (value, level) for every occurrence of target_key.

Arguments:
  • obj (Any): The object to search through.
  • target_key (str): The key to search for.
Yields:

Tuple[Any, int]: A tuple containing the value associated with target_key and its depth level.