apb_extra_utils.misc

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 7/6/19 18:23
  5#  Last modified: 7/6/19 18:21
  6#  Copyright (c) 2019
  7
  8import calendar
  9import csv
 10import datetime
 11import errno
 12import inspect
 13import locale
 14import os
 15import re
 16import subprocess
 17import sys
 18from calendar import different_locale
 19from collections import OrderedDict
 20from math import isnan
 21from pathlib import Path
 22import socket
 23from tempfile import gettempdir
 24from urllib.request import build_opener
 25from zipfile import ZipFile, ZIP_DEFLATED
 26
 27import jellyfish
 28from tqdm import tqdm
 29
 30
 31def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
 32    """
 33
 34    Args:
 35        url (str):
 36        extract_to (str=None): if None, extract to current directory
 37        headers (list=None)
 38        remove_zip (bool=True):
 39
 40    Returns:
 41        path_zip (str)
 42    """
 43    if zip_file_path := download_from_url(url, extract_to, headers):
 44        extract_to = unzip(zip_file_path, extract_to, remove_zip)
 45
 46        return extract_to
 47
 48
 49def unzip(zip_file_path, extract_to=None, remove_zip=False):
 50    """
 51    Unzip file to extract_to directory
 52
 53    Args:
 54        zip_file_path (str): Path to zip file
 55        extract_to: (str=None): if None, extract to zip's directory
 56        remove_zip: (bool=False): If True remove zip file after unzip
 57
 58    Returns:
 59        extract_to (str)
 60    """
 61    with ZipFile(zip_file_path, 'r') as zipfile:
 62        if not extract_to:
 63            extract_to = os.path.join(
 64                os.path.dirname(zip_file_path),
 65                os.path.splitext(os.path.basename(zip_file_path))[0]
 66            )
 67
 68        desc = f"Extracting {zip_file_path} to {extract_to}"
 69        if not sys.stdout:
 70            print(f'{desc}...')
 71            gen_members = zipfile.infolist()
 72        else:
 73            gen_members = tqdm(zipfile.infolist(), desc=desc)
 74
 75        for member in gen_members:
 76            zipfile.extract(member, extract_to)
 77    if remove_zip:
 78        os.remove(zip_file_path)
 79    return extract_to
 80
 81
 82def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
 83    """
 84
 85    Args:
 86        url (str): Url to download
 87        extract_to (str=None): Directory to save file. Default temporary directory
 88        headers (list=None)
 89
 90    Returns:
 91        path_file (str | None)
 92    """
 93    opener = build_opener()
 94    if headers:
 95        opener.addheaders = headers
 96
 97    with opener.open(url) as response:
 98        content_length = response.length
 99        if not extract_to:
100            extract_to = gettempdir()
101
102        if n_file := response.headers.get_filename():
103            file_path = os.path.join(extract_to, n_file)
104        else:
105            file_path = os.path.join(extract_to, Path(response.url).name)
106
107        with open(file_path, "wb") as out_file:
108            def get_resp_data():
109                while True:
110                    data = response.read(1024)
111                    if not data:
112                        break
113                    yield data
114
115            desc = f'Downloading to "{file_path}"'
116            if not sys.stdout:
117                print(f'{desc}...')
118                for data in get_resp_data():
119                    out_file.write(data)
120            else:
121                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
122                    for data in get_resp_data():
123                        out_file.write(data)
124                        progress_bar.update(len(data))
125
126            return file_path
127
128
def caller_name(skip=2):
    """Return the name of a frame on the call stack as ``module.class.method``.

    The stack is collected starting at the frame that called ``caller_name``
    (index 0) and walking outwards, so ``skip`` is the number of frames above
    that one: ``skip=0`` names the function calling ``caller_name``, ``skip=1``
    its caller, and so on.

    An empty string is returned if ``skip`` exceeds the stack height.
    """
    frames = []
    frame = sys._getframe(1)
    while frame:
        frames.append(frame)
        frame = frame.f_back

    if skip >= len(frames):
        return ''
    target = frames[skip]

    parts = []
    owner_module = inspect.getmodule(target)
    # The module can be None when the frame runs directly in a console.
    if owner_module and owner_module.__name__ != "__main__":
        parts.append(owner_module.__name__)

    # A local named `self` is taken as the sign of a method call; static methods
    # cannot be told apart from plain functions this way.
    if 'self' in target.f_locals:
        parts.append(target.f_locals['self'].__class__.__name__)

    code_name = target.f_code.co_name
    if code_name != '<module>':  # '<module>' is module top-level code
        parts.append(code_name)
    del target

    return ".".join(parts)
168
169
def get_environ():
    """
    Return the working environment name read from the DEV_ENVIRON environment variable.

    Returns:
        str: Lower-cased value of DEV_ENVIRON, or 'dev' when the variable is not set.
    """
    environ_name = os.environ.get("DEV_ENVIRON", "dev")
    return environ_name.lower()
179
180
def create_dir(a_dir):
    """
    Create a directory (including missing parents) and report whether it is available.

    Args:
        a_dir (str): Path of the directory to create.

    Returns:
        bool: True if the directory was created or already existed; False if it could
        not be created (for example the path exists but is not a directory).
    """
    try:
        # exist_ok avoids the race between checking for existence and creating.
        os.makedirs(a_dir, exist_ok=True)
    except OSError:
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True
203
204
def remove_content_dir(a_dir):
    """
    Delete the files and subdirectories found inside a directory (best effort).

    Symbolic links are removed as entries and never followed, so the content of a
    linked directory is left untouched. Entries that cannot be removed are skipped.

    Args:
        a_dir (str): Path of the directory to empty.

    Returns:
        num_elems_removed (int), num_elems_dir (int): Number of non-directory entries
        removed and number of non-directory entries found (subdirectories included
        recursively; the directories themselves are not counted).
    """
    num_elems_removed = 0
    num_elems_dir = 0

    # Snapshot the listing first: entries are deleted while we loop.
    with os.scandir(a_dir) as scan:
        entries = list(scan)

    for de in entries:
        if de.is_dir(follow_symlinks=False):
            n_rem_subdir, n_subdir = remove_content_dir(de.path)
            num_elems_dir += n_subdir
            num_elems_removed += n_rem_subdir
            try:
                os.rmdir(de.path)
            except OSError:
                # Typically not empty because some entry could not be removed.
                pass
        else:
            num_elems_dir += 1
            try:
                os.unlink(de.path)
            except OSError:
                continue
            num_elems_removed += 1

    return num_elems_removed, num_elems_dir
235
236
# Python does not provide this magic number as a named constant, so it is
# defined here. It is compared against the `winerror` attribute of OSError
# in is_pathname_valid().
ERROR_INVALID_NAME = 123
'''
Windows-specific error code indicating an invalid pathname.

See Also
----------
https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx
    Official listing of all such codes.
'''
247
248
def is_pathname_valid(pathname):
    '''
    `True` if the passed pathname is a valid pathname for the current OS;
    `False` otherwise.

    Each path component is probed with `os.lstat()` under a directory that is
    guaranteed to exist; only errors that denote an invalid name make the
    pathname invalid, while "not found" or "not accessible" errors are ignored.

    Args:
        pathname: Value to check. Anything that is not a non-empty `str` is invalid.

    Returns:
        bool
    '''
    try:
        # A pathname that is not a string, or is empty, is invalid.
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip the Windows drive specifier (e.g., `C:\`) if any. Windows
        # prohibits `:` inside path components, so keeping the prefix would
        # invalidate every absolute Windows pathname.
        _, pathname = os.path.splitdrive(pathname)

        # Directory guaranteed to exist: the "%HOMEDRIVE%" drive on Windows,
        # the root directory elsewhere.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # internal sanity check

        # Append a path separator to this directory if needed.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        # Test each path component, ignoring non-existent and non-readable ones.
        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            # Only these errors indicate an invalid pathname:
            #   * on Windows, `winerror == ERROR_INVALID_NAME` (`errno` is too
            #     coarse there);
            #   * elsewhere, `errno` ENAMETOOLONG (most POSIX systems) or
            #     ERANGE (some edge-case systems such as SunOS, *BSD).
            except OSError as exc:
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character is rejected before reaching the OS: Python 3
    # raises ValueError ("embedded null byte"), older code paths TypeError.
    except (TypeError, ValueError):
        return False
    # No exception: every path component, hence the pathname, is valid.
    else:
        return True
    # Any other exception is an unrelated fatal issue and is allowed to propagate.
317
318
def is_dir_writable(dirname):
    '''
    `True` if the current user can create files inside the passed directory;
    `False` otherwise.

    The check creates (and immediately discards) a uniquely named temporary
    file, so no existing file in the directory is touched.

    Args:
        dirname: Directory to probe. An empty string means the current directory.

    Returns:
        bool
    '''
    # Imported locally so the module-level import block stays untouched.
    import tempfile

    try:
        target_dir = os.fspath(dirname) or os.curdir
        with tempfile.TemporaryFile(dir=target_dir):
            pass
    # OSError covers missing directories and permission problems; TypeError and
    # ValueError cover values that are not usable as a path at all.
    except (OSError, TypeError, ValueError):
        return False

    return True
342
343
def is_path_exists_or_creatable(pathname):
    '''
    `True` if the passed pathname is valid on the current OS _and_ either
    already exists or a file can be created in its parent directory;
    `False` otherwise.

    `OSError` raised while checking is reported as `False`.
    '''
    try:
        # Validate first so the `os` calls below are not fed invalid pathnames.
        if not is_pathname_valid(pathname):
            return False
        if os.path.exists(pathname):
            return True
        return is_dir_writable(os.path.dirname(pathname))
    # Non-fatal filesystem complaints (timeouts, permissions, ...) mean the path
    # is inaccessible; any other exception is left to propagate.
    except OSError:
        return False
362
363
def get_matching_val(search_val, matching_vals):
    """
    Return the candidate most similar to the searched value according to ``jaro_winkler``.

    Args:
        search_val (str): Value to look for.
        matching_vals (list(str)): Candidate values to compare against.

    Returns:
        match_val (str), fact_jaro_winkler (float): Best candidate and its score, or
        ``(None, None)`` when ``jaro_winkler`` returns no result.
    """
    ranked = jaro_winkler(search_val, matching_vals)
    # The first entry of the mapping returned by jaro_winkler is the best score.
    for best_score, best_val in ranked.items():
        return best_val, best_score

    return None, None
380
381
def levenshtein_distance(search_val, matching_vals):
    """
    Group candidate values by their Levenshtein distance to the searched value.

    Args:
        search_val (str): Reference value.
        matching_vals (iterable of str): Candidates to measure against ``search_val``.

    Returns:
        OrderedDict: Distance -> list of candidates at that distance, ordered by
        ascending distance.
    """
    by_distance = {}
    for candidate in matching_vals:
        distance = jellyfish.levenshtein_distance(search_val, candidate)
        by_distance.setdefault(distance, []).append(candidate)

    return OrderedDict((distance, by_distance[distance]) for distance in sorted(by_distance))
403
404
def jaro_winkler(search_val, matching_vals):
    """
    Rank candidate values by their Jaro-Winkler similarity to the searched value.

    Candidates with similarity 0 are dropped. The mapping is keyed by score, so
    when several candidates share a score only the last one is kept.

    Args:
        search_val (str): Reference value.
        matching_vals (iterable of str): Candidates to compare with ``search_val``.

    Returns:
        OrderedDict: Similarity -> candidate, ordered by descending similarity.
    """
    by_score = {}
    for candidate in matching_vals:
        by_score[jellyfish.jaro_winkler_similarity(search_val, candidate)] = candidate

    return OrderedDict(
        (score, by_score[score])
        for score in sorted(by_score, reverse=True)
        if score != 0
    )
423
424
def call_command(command_prog, *args):
    """
    Run a system shell command with the given arguments.

    On POSIX a list passed together with ``shell=True`` hands the extra items to the
    shell itself rather than to the command, so there the arguments are quoted and
    appended to the command string instead. On Windows the list form is kept.

    NOTE: the command runs through the shell; ``command_prog`` must come from a
    trusted source.

    Args:
        command_prog (str): Command (or shell snippet) to execute.
        *args: Arguments appended to the command.

    Returns:
        bool: True if the command exits with status 0.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    # Imported locally so the module-level import block stays untouched.
    import shlex

    if args and os.name != 'nt':
        command = ' '.join([command_prog, *(shlex.quote(str(arg)) for arg in args)])
    else:
        command = [command_prog, *args]

    ret = subprocess.check_call(command, shell=True)

    return ret == 0
438
439
def rounded_float(a_float, num_decs=9):
    """
    Round a number to the given number of decimals and return it as a float.

    Args:
        a_float: Number to round.
        num_decs (int=9): Number of decimals kept. Negative values round to tens,
            hundreds, ... as ``round`` does.

    Returns:
        float
    """
    # float() already ignores trailing zeros; stripping them by hand would eat the
    # significant zeros of integers when num_decs is 0 (e.g. "150" -> "15").
    fmt_decs = max(num_decs, 0)
    return float(format(round(a_float, num_decs), f".{fmt_decs}f"))
451
452
class formatted_float(float):
    """
    Float whose representation is limited to a maximum number of decimals
    (``__num_decs__``).
    """
    __num_decs__ = 9

    def __repr__(self):
        max_decimals = self.__num_decs__
        rounded = rounded_float(self, max_decimals)
        return str(rounded)
461
462
def as_format_floats(obj):
    """
    Convert every float found in an object to ``formatted_float``.

    Dicts, lists and tuples (including named tuples) are rebuilt with the same
    class and their values converted recursively; anything else is returned as is.

    Args:
        obj: Any object.

    Returns:
        The converted object (a ``formatted_float`` when ``obj`` is a float).
    """
    if isinstance(obj, float):
        return formatted_float(obj)

    if isinstance(obj, dict):
        return obj.__class__((k, as_format_floats(v)) for k, v in obj.items())

    if isinstance(obj, (list, tuple)):
        converted = (as_format_floats(v) for v in obj)
        # Named tuples take one positional argument per field, not an iterable.
        if isinstance(obj, tuple) and hasattr(obj, '_make'):
            return obj._make(converted)
        return obj.__class__(converted)

    return obj
481
482
def nums_from_str(a_string, nan=False):
    """
    Return the numbers found among the whitespace-separated tokens of a text.

    Each token is tried as ``int`` first and as ``float`` second; tokens that are
    neither are ignored.

    Args:
        a_string (str): Text to scan.
        nan (bool=False): If True, NaN values are kept; by default they are discarded.

    Returns:
        list: The ints and floats found, in order of appearance.
    """
    found = []

    for token in a_string.split():
        try:
            number = int(token)
        except ValueError:
            try:
                number = float(token)
            except ValueError:
                continue
            if isnan(number) and not nan:
                continue
        found.append(number)

    return found
508
509
def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in a text.

    Args:
        a_string (str): Text to scan.
        nan (bool=False): If True, NaN values count as numbers.

    Returns:
        int OR float, or None when ``nums_from_str`` finds no number.
    """
    numbers = nums_from_str(a_string, nan=nan)
    return numbers[0] if numbers else None
522
523
def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the dates that can be parsed from the pieces of a text.

    The text is split once per separator and every resulting piece is tried against
    every format. A piece produced by several separators is parsed once per
    occurrence, so the same date can appear more than once in the result.

    Args:
        str (str): Text to scan.
        formats (list=None): ``strptime`` formats; defaults to
            ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d'].
        seps (list=None): Separators passed to ``split``; defaults to [None, '.', ','].
        ret_extra_data (bool=False): If True each item is a tuple
            (datetime, source piece, format used).

    Returns:
        list
    """
    date_formats = formats or ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
    separators = seps or [None, '.', ',']

    pieces = []
    for separator in separators:
        pieces.extend(piece.strip() for piece in str.split(separator))

    found = []
    for date_format in date_formats:
        for piece in pieces:
            try:
                parsed = datetime.datetime.strptime(piece, date_format)
            except Exception:
                # The piece does not match this format.
                continue
            found.append((parsed, piece, date_format) if ret_extra_data else parsed)

    return found
558
559
def pretty_text(txt):
    """
    Replace underscores and hyphens with spaces and capitalize the text.

    Args:
        txt (str): Text to prettify.

    Returns:
        str
    """
    spaced = txt
    for separator in ("_", "-"):
        spaced = spaced.replace(separator, " ")

    return spaced.capitalize()
570
571
def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given files (:file_paths) into a zip file (:zip_path).

    Args:
        zip_path (str): Path of the zip file to create (overwritten if it exists).
        file_paths (list or generator): Paths of the files to add.
        base_path (str=None): Leading path removed (case-insensitively) from each file
            path to build its name inside the zip, keeping the relative structure.
            Files that are not under it keep their full path. If not given, files
            are stored by base name only.
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing.

    Returns:
        zip_path (str)
    """
    re_base_path = None
    if base_path:
        norm_base = os.path.normpath(base_path).replace(os.sep, '/')
        # The base path is literal text, not a regex: escape it, anchor it at the
        # start and require a component boundary after it. Compiled once.
        re_base_path = re.compile('^' + re.escape(norm_base) + '(?=/|$)', re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if re_base_path:
                norm_file = os.path.normpath(file_path).replace(os.sep, '/')
                arch_name = re_base_path.sub('', norm_file, count=1)
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path
596
597
def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given directory.

    Args:
        dir_path (str): Path of the directory.
        zip_path (str=None): Path of the .zip file to create. By default a zip named
            after the directory, placed in its parent directory.
        relative_dirs_sel (list=None): Relative paths of the directories to process
            (forwarded to ``iter_paths_dir``).
        func_filter_path (func=None): Function deciding whether a path name is valid
            (forwarded to ``iter_paths_dir``).
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compressing.

    Returns:
        zip_file (str)
    """
    if not zip_path:
        # Normalise first: with a trailing separator the zip would otherwise be
        # created as ".zip" inside the directory being compressed.
        zip_path = f'{os.path.normpath(dir_path)}.zip'

    selected_paths = iter_paths_dir(dir_path,
                                    relative_dirs_sel=relative_dirs_sel,
                                    func_filter_path=func_filter_path)

    return zip_files(zip_path, selected_paths, base_path=dir_path, compression=compression)
624
625
def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each file of a directory into its own zip, optionally filtered by extension.

    Every file ``name.ext`` is compressed to ``name.zip`` in the same directory.
    Subdirectories are skipped, and a file that is already its own target (an
    existing ``name.zip``) is skipped rather than overwritten with itself. Files
    sharing a base name share a target zip, so the later one replaces the earlier.

    Args:
        dir_path (str): Directory whose files are compressed.
        remove_files (bool=False): If True, delete each file after compressing it.
        *exts_files: File extensions WITHOUT the dot; if none given, all files.

    Returns:
        ok (bool)
    """
    exts = {".{}".format(ext.lower()) for ext in exts_files}

    # Snapshot the listing: zips created below must not be picked up by the scan.
    with os.scandir(dir_path) as scan:
        file_paths = [de.path for de in scan if de.is_file()]

    for file_path in file_paths:
        base_name, ext = os.path.splitext(file_path)
        if exts and ext.lower() not in exts:
            continue

        zip_path = "{}.zip".format(base_name)
        if os.path.normcase(zip_path) == os.path.normcase(file_path):
            # Writing the zip would truncate the very file being compressed.
            continue

        print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path))
        zip_files(zip_path, [file_path])

        if remove_files:
            os.remove(file_path)

    return True
649
650
def split_ext_file(path_file):
    """
    Split a file name at its first "." into the leading part and everything after it.

    Args:
        path_file (str): Path or name of the file; only the base name is used.

    Returns:
        base_file (str), ext_file (str): ``ext_file`` is empty when there is no dot.
    """
    file_name = os.path.basename(path_file)
    base_file, _, ext_file = file_name.partition(".")

    return base_file, ext_file
664
665
# Name of the file, inside a base directory, where the date of the last run is stored.
FILE_RUN_LOG = "last_run.log"
# strftime/strptime pattern used to write and read that date.
DATE_RUN_LOG_FRMT = "%Y%m%d"
668
669
def last_run_on_dir(dir_base):
    """
    Return the date of the last run recorded in a directory.

    The date is read from the ``FILE_RUN_LOG`` file inside ``dir_base``; surrounding
    whitespace (such as a trailing newline) is ignored.

    Args:
        dir_base (str): Directory holding the run log.

    Returns:
        date_last_run (datetime): None if the log file does not exist.

    Raises:
        ValueError: If the file content does not match ``DATE_RUN_LOG_FRMT``.
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    try:
        with open(log_last_run) as fr:
            raw_date = fr.read()
    except FileNotFoundError:
        return None

    return datetime.datetime.strptime(raw_date.strip(), DATE_RUN_LOG_FRMT)
686
687
def save_last_run_on_dir(dir_base, date_run=None):
    """
    Record the date of the last run in a directory.

    The date is written, formatted with ``DATE_RUN_LOG_FRMT``, to the ``FILE_RUN_LOG``
    file inside ``dir_base``, replacing any previous content.

    Args:
        dir_base (str): Directory holding the run log.
        date_run (datetime=None): Date to record; today's date when not given.
    """
    run_date = date_run if date_run else datetime.date.today()
    with open(os.path.join(dir_base, FILE_RUN_LOG), "w+") as fw:
        fw.write(run_date.strftime(DATE_RUN_LOG_FRMT))
701
702
def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the given locale.

    Args:
        num_month (int): Month number used to index ``calendar.month_name``.
        code_alias_locale (str='es_cu'): Key looked up in ``locale.locale_alias`` to
            obtain the locale applied while reading the name.

    Returns:
        str: Month name passed through ``pretty_text``.
    """
    locale_name = locale.locale_alias.get(code_alias_locale)
    with different_locale(locale_name):
        localized_name = calendar.month_name[num_month]
        return pretty_text(localized_name)
716
717
def file_mod_time(path_file):
    """
    Return the modification time of a file as a datetime.

    Args:
        path_file (str): Path of the file.

    Returns:
        datetime: Built from the file's modification timestamp (local time).
    """
    return datetime.datetime.fromtimestamp(os.path.getmtime(path_file))
731
732
def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate over the rows of a CSV file.

    With ``header=True`` the first non-empty row is taken as the header (values
    stripped and lower-cased) and each following row is yielded as a dict keyed by
    it; otherwise rows are yielded as lists. Empty rows are skipped.

    Args:
        a_path_csv (str): Path of the CSV file.
        header (bool=True): Whether the first row holds the column names.
        sep (str=';'): Field delimiter; ';' is also used when a falsy value is given.
        encoding (str="utf8"): Encoding used to read the file.

    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are preserved as written.
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            vals_row = dict(zip(header_row, row)) if header_row else row

            if vals_row:
                yield vals_row
761
762
def subdirs_path(path):
    """
    Iterate over the immediate subdirectories of a path.

    Args:
        path (str): Directory to scan.

    Yields:
        nom_subdir, path_subdir: Name and path of each subdirectory.
    """
    with os.scandir(path) as entries:
        yield from ((entry.name, entry.path) for entry in entries if entry.is_dir())
776
777
def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Build a nested mapping of the subdirectories found under a directory.

    Each immediate subdirectory name maps to the result of calling this function
    recursively on it.

    Args:
        path_dir_base: Directory whose subdirectories are listed.
        relative_dirs_sel (list=None): Relative paths (from ``path_dir_base``) of the
            directories to process. Entries that do not exist are ignored. If not
            given, every subdirectory is processed.
        last_level_as_list (bool=False): If True and none of the subdirectories has a
            non-empty subtree, a list with their names is returned instead of a dict.
            NOTE(review): the flag is not forwarded to the recursive calls, so it only
            affects the level of the initial call - confirm this is intended.

    Returns:
        dict (or list, see ``last_level_as_list``)
    """
    tree = {}

    f_valid_dir = None
    valid_dirs_sel = set()
    if relative_dirs_sel:
        # Keep only the selected directories that exist, stored as normalised,
        # lower-cased paths relative to the base directory.
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

        def valid_dir(dir_path):
            # A directory is accepted when its lower-cased relative path equals a
            # selected one or shares a non-empty common path with it.
            valid = False
            rel_path = os.path.relpath(dir_path, path_dir_base).lower()
            for dir_sel in valid_dirs_sel:
                if rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel)):
                    valid = True
                    break

            return valid

        f_valid_dir = valid_dir

    for dir_name, dir_path in subdirs_path(path_dir_base):
        if not f_valid_dir or f_valid_dir(dir_path):
            dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()
            # Re-express the selected directories that share a path with this
            # subdirectory relative to it, for the recursive call.
            dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                             if os.path.commonpath((dir_path_rel, dir_sel))]
            tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path)

    if tree:
        # Collapse to a list of names when every subtree is empty.
        if last_level_as_list and not any(tree.values()):
            tree = [*tree.keys()]

    return tree
823
824
def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a nested dict with the tree of paths available under a directory.

    Directories map to nested dicts and files map to None. Only directories that
    contain at least one accepted file are added. Files of the base directory itself
    are stored at the top level of the result.

    Args:
        path_dir_base (str): Directory to walk.
        relative_dirs_sel (list=None): Relative paths of the directories to process;
            only those directories and their descendants are considered. Entries that
            do not exist are ignored.
        func_filter_path (func=None): Function (name -> bool) applied to directory and
            file names; names for which it returns a falsy value are left out.
        solo_dirs (bool=False): If True file names are not stored, only directories.

    Returns:
        dict
    """
    selected_dirs = set()
    for rel_dir in relative_dirs_sel or ():
        candidate = os.path.join(path_dir_base, rel_dir)
        if os.path.exists(candidate):
            selected_dirs.add(candidate)

    def is_selected(walked_dir):
        # With no selection every directory is processed.
        if not selected_dirs:
            return True
        for selected in selected_dirs:
            if os.path.samefile(walked_dir, selected) or is_path_child_from(walked_dir, selected):
                return True
        return False

    def passes_filter(name):
        return not func_filter_path or func_filter_path(name)

    tree = {}
    for walked_dir, _dir_names, file_names in os.walk(path_dir_base):
        if not is_selected(walked_dir):
            continue

        rel_dir = os.path.relpath(walked_dir, path_dir_base)
        if not passes_filter(os.path.basename(rel_dir)):
            continue

        kept_files = dict.fromkeys(name for name in file_names if passes_filter(name))
        if not kept_files:
            continue

        node = tree
        # The base directory itself ('.') gets no entry of its own.
        if rel_dir != '.':
            for part in rel_dir.split(os.sep):
                node = node.setdefault(part, {})

        if not solo_dirs:
            node.update(kept_files)

    return tree
877
878
def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate over the paths described by a nested dict tree.

    Keys whose value is a non-empty dict are treated as directories and descended
    into; any other key is yielded as a path.

    Args:
        tree_paths (dict): Tree of names (name -> sub-tree or other value).
        path_base (str=None): Path prepended to every yielded path.

    Yields:
        path_file
    """
    for name, children in tree_paths.items():
        if isinstance(children, dict) and children:
            relative_paths = iter_tree_paths(children, name)
        else:
            relative_paths = (name,)

        for relative_path in relative_paths:
            yield os.path.join(path_base, relative_path) if path_base else relative_path
895
896
def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate over the paths available under a directory.

    The tree is built with ``tree_paths`` and flattened with ``iter_tree_paths``,
    prefixing every path with ``path_dir_base``.

    Args:
        path_dir_base (str): Directory to walk.
        relative_dirs_sel (list=None): Relative paths of the directories to process.
        func_filter_path (func=None): Function (name -> bool) deciding whether a path
            name is returned.

    Yields:
        path (str)
    """
    tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(tree, path_dir_base)
913
914
def is_path_child_from(path, path_parent):
    """
    Tell whether a path lies below another path.

    Each ancestor of ``path`` is compared with ``path_parent`` using ``samefile``;
    ``path`` itself is not one of its own ancestors.

    Args:
        path: Path to test.
        path_parent: Candidate ancestor.

    Returns:
        bool
    """
    candidate_parent = Path(path_parent)
    for ancestor in Path(path).parents:
        if ancestor.samefile(candidate_parent):
            return True

    return False
930
931
def machine_name():
    """
    Return the fully qualified name of the machine, in upper case.

    Returns:
        str
    """
    # TODO - Get host from docker machine when we are in a container
    # TODO - import docker
    # TODO -
    # TODO - client = docker.from_env()
    # TODO - container_info = client.containers.get(socket.gethostname())
    # TODO - docker_host_ip = container_info.attrs['NetworkSettings']['IPAddress']
    # TODO - print(docker_host_ip)

    fqdn = socket.getfqdn()
    return fqdn.upper()
948
949
def machine_apb():
    """
    Tell whether the machine's fully qualified name ends with '.apb.es'.

    Returns:
        bool
    """
    fqdn = socket.getfqdn().lower()
    return fqdn.endswith('.apb.es')
958
959
# Script entry point: hand control to python-fire.
# NOTE(review): presumably exposes this module's callables as a command-line
# interface - confirm against the `fire` documentation.
if __name__ == '__main__':
    import fire

    fire.Fire()
def download_and_unzip( url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
def download_and_unzip(url: str, extract_to: str = None, headers: list = None, remove_zip: bool = True):
    """
    Download a zip file from ``url`` and extract it.

    Args:
        url (str): url of the zip file
        extract_to (str=None): directory passed both to ``download_from_url`` and to ``unzip``;
                               each of them applies its own default when it is None
        headers (list=None): headers forwarded to ``download_from_url``
        remove_zip (bool=True): if True the downloaded zip is removed after extraction

    Returns:
        extract_to (str): directory where the zip was extracted, or None if nothing was downloaded
    """
    zip_file_path = download_from_url(url, extract_to, headers)
    if not zip_file_path:
        return None

    return unzip(zip_file_path, extract_to, remove_zip)
Arguments:
  • url (str):
  • extract_to (str=None): if None, extract to current directory
  • headers (list=None)
  • remove_zip (bool=True):
Returns:

path_zip (str)

def unzip(zip_file_path, extract_to=None, remove_zip=False):
def unzip(zip_file_path, extract_to=None, remove_zip=False):
    """
    Extract every member of a zip file into a directory.

    Args:
        zip_file_path (str): path to the zip file
        extract_to (str=None): target directory. If None, a directory named like the zip
                               (without extension) next to the zip file is used
        remove_zip (bool=False): if True the zip file is removed after extraction

    Returns:
        extract_to (str)
    """
    with ZipFile(zip_file_path, 'r') as archive:
        if not extract_to:
            zip_dir_name, zip_file_name = os.path.split(zip_file_path)
            extract_to = os.path.join(zip_dir_name, os.path.splitext(zip_file_name)[0])

        desc = f"Extracting {zip_file_path} to {extract_to}"
        members = archive.infolist()
        if sys.stdout:
            # A progress bar is only shown when there is a stdout to draw it on
            members = tqdm(members, desc=desc)
        else:
            print(f'{desc}...')

        for member in members:
            archive.extract(member, extract_to)

    if remove_zip:
        os.remove(zip_file_path)

    return extract_to

Unzip file to extract_to directory

Arguments:
  • zip_file_path (str): Path to zip file
  • extract_to: (str=None): if None, extract to zip's directory
  • remove_zip: (bool=False): If True remove zip file after unzip
Returns:

extract_to (str)

def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
 83def download_from_url(url: str, extract_to: str = None, headers: list[str] = None) -> str:
 84    """
 85
 86    Args:
 87        url (str): Url to download
 88        extract_to (str=None): Directory to save file. Default temporary directory
 89        headers (list=None)
 90
 91    Returns:
 92        path_file (str | None)
 93    """
 94    opener = build_opener()
 95    if headers:
 96        opener.addheaders = headers
 97
 98    with opener.open(url) as response:
 99        content_length = response.length
100        if not extract_to:
101            extract_to = gettempdir()
102
103        if n_file := response.headers.get_filename():
104            file_path = os.path.join(extract_to, n_file)
105        else:
106            file_path = os.path.join(extract_to, Path(response.url).name)
107
108        with open(file_path, "wb") as out_file:
109            def get_resp_data():
110                while True:
111                    data = response.read(1024)
112                    if not data:
113                        break
114                    yield data
115
116            desc = f'Downloading to "{file_path}"'
117            if not sys.stdout:
118                print(f'{desc}...')
119                for data in get_resp_data():
120                    out_file.write(data)
121            else:
122                with tqdm(desc=desc, total=content_length, unit="B", unit_scale=True) as progress_bar:
123                    for data in get_resp_data():
124                        out_file.write(data)
125                        progress_bar.update(len(data))
126
127            return file_path
Arguments:
  • url (str): Url to download
  • extract_to (str=None): Directory to save file. Default temporary directory
  • headers (list=None)
Returns:

path_file (str | None)

def caller_name(skip=2):
def caller_name(skip=2):
    """Get the name of a caller in the format module.class.method

       `skip` specifies how many levels of stack to skip while getting caller
       name. skip=1 means "who calls me", skip=2 "who calls my caller" etc.

       An empty string is returned if skipped levels exceed stack height
    """
    # Collect the chain of frames starting at whoever called this function
    frames = []
    current = sys._getframe(1)
    while current:
        frames.append(current)
        current = current.f_back

    if skip + 1 > len(frames):
        return ''
    target = frames[skip]

    parts = []
    module = inspect.getmodule(target)
    # the module can be None when the frame is executed directly in a console
    if module and module.__name__ != "__main__":
        parts.append(module.__name__)

    # A local called 'self' is taken as the sign of a method call; static methods
    # cannot be detected and show up as plain functions
    frame_locals = target.f_locals
    if 'self' in frame_locals:
        parts.append(frame_locals['self'].__class__.__name__)

    codename = target.f_code.co_name
    if codename != '<module>':  # '<module>' is the top level, not a function or method
        parts.append(codename)
    del target

    return ".".join(parts)

Get a name of a caller in the format module.class.method

skip specifies how many levels of stack to skip while getting caller name. skip=1 means "who calls me", skip=2 "who calls my caller" etc.

An empty string is returned if skipped levels exceed stack height

def get_environ():
def get_environ():
    """
    Return the working environment read from the DEV_ENVIRON environment variable.
    When the variable is not defined 'dev' is returned.

    Returns:
        str: the value of the variable in lower case (e.g. 'dev' or 'prod')
    """
    environ_name = os.environ.get("DEV_ENVIRON", "dev")

    return environ_name.lower()

Devuelve el entorno de trabajo a partir de la environment variable DEV_ENVIRON. Si no está definida por defecto devuelve 'dev'

Returns:

str: El nombre del entorno 'dev' o 'prod'

def create_dir(a_dir):
def create_dir(a_dir):
    """
    Create a directory (and its missing parents) reporting success as a boolean.

    Args:
        a_dir (str): path of the directory to create

    Returns:
        bool: True if the path already existed or could be created, False otherwise
    """
    if os.path.exists(a_dir):
        return True

    try:
        # exist_ok avoids a failure when another process creates the directory
        # between the check above and this call
        os.makedirs(a_dir, exist_ok=True)
    except OSError:
        print("ATENCIÓ!! - No se ha podido crear el directorio", a_dir)
        return False

    return True

Crea directorio devolviendo TRUE o FALSE según haya ido. Si ya existe devuelve TRUE

Arguments:
  • a_dir {str}: path del directorio a crear
Returns:

bool: Retorna TRUE si lo ha podido crear o ya existía y FALSE si no

def remove_content_dir(a_dir):
def remove_content_dir(a_dir):
    """
    Remove the files and subdirectories contained in a directory (best effort).

    Elements that cannot be removed are skipped silently. Symbolic links are removed
    as links: a link pointing to a directory is never descended into, so the content
    of its target is left untouched.

    Args:
        a_dir (str): path of the directory to empty

    Returns:
        num_elems_removed (int), num_elems_dir (int): number of non-directory elements
        removed and number of non-directory elements found
    """
    num_elems_removed = 0
    num_elems_dir = 0

    # Take a snapshot so the directory is not modified while it is being scanned
    with os.scandir(a_dir) as it:
        entries = list(it)

    for de in entries:
        if de.is_dir(follow_symlinks=False):
            n_rem_subdir, n_subdir = remove_content_dir(de.path)
            num_elems_dir += n_subdir
            num_elems_removed += n_rem_subdir
            try:
                os.rmdir(de.path)
            except OSError:
                # Not empty (some element could not be removed) or not removable
                pass
        else:
            num_elems_dir += 1
            try:
                os.unlink(de.path)
            except OSError:
                pass
            else:
                num_elems_removed += 1

    return num_elems_removed, num_elems_dir

Borra ficheros y subdirectorios de directorio

Arguments:
  • a_dir {str}: path del directorio a crear
Returns:

num_elems_removed (int), num_elems_dir (int)

ERROR_INVALID_NAME = 123

Windows-specific error code indicating an invalid pathname.

See Also

https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx Official listing of all such codes.

def is_pathname_valid(pathname):
def is_pathname_valid(pathname):
    '''
    `True` if the passed pathname is a valid pathname for the current OS;
    `False` otherwise.

    Each component of the pathname is probed with `os.lstat` under a directory
    that is expected to exist; only errors that denote an invalid name make the
    pathname invalid, any other OS complaint (e.g. not found) is ignored.
    '''
    try:
        # A pathname that is not a string, or is an empty string, is invalid.
        if not isinstance(pathname, str) or not pathname:
            return False

        # Strip this pathname's Windows-specific drive specifier (e.g., `C:\`)
        # if any. Since Windows prohibits path components from containing `:`
        # characters, failing to strip this `:`-suffixed prefix would
        # erroneously invalidate all valid absolute Windows pathnames.
        _, pathname = os.path.splitdrive(pathname)

        # Directory expected to exist. If the current OS is Windows, this is
        # the drive named by the "HOMEDRIVE" environment variable (default
        # `C:`); else, the root directory.
        root_dirname = os.environ.get('HOMEDRIVE', 'C:') \
            if sys.platform == 'win32' else os.sep
        assert os.path.isdir(root_dirname)  # ...Murphy and her ironclad Law

        # Append a path separator to this directory if needed.
        root_dirname = root_dirname.rstrip(os.sep) + os.sep

        # Test whether each path component split from this pathname is valid or
        # not, ignoring non-existent and non-readable path components.
        for pathname_part in pathname.split(os.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            # Only the following errors indicate invalid pathnames:
            #
            # * On Windows, errors whose "winerror" attribute is
            #   "ERROR_INVALID_NAME" ("winerror" is more fine-grained than
            #   the generic "errno" there).
            # * Elsewhere, errors whose "errno" is "ENAMETOOLONG" or, on some
            #   edge-case OSes, "ERANGE".
            #
            # Any other OSError is an ignorable kernel or filesystem complaint
            # (e.g., path not found or inaccessible).
            except OSError as exc:
                if hasattr(exc, 'winerror'):
                    if exc.winerror == ERROR_INVALID_NAME:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    # An embedded NUL character makes the pathname invalid. Depending on the
    # interpreter version this surfaces as "TypeError" or (Python 3) as
    # "ValueError: embedded null byte".
    except (TypeError, ValueError):
        return False
    # If no exception was raised, all path components and hence this
    # pathname itself are valid.
    else:
        return True
    # Any other exception is an unrelated fatal issue (e.g., a bug) and is
    # allowed to unwind the call stack.

True if the passed pathname is a valid pathname for the current OS; False otherwise.

def is_dir_writable(dirname):
def is_dir_writable(dirname):
    '''
    `True` if a file can be created inside the passed directory; `False`
    otherwise.

    The check creates (and immediately removes) a uniquely named temporary
    file, so files already present in the directory are never touched. An
    empty `dirname` stands for the current directory.
    '''
    import tempfile

    try:
        probe_dir = os.fspath(dirname) or os.curdir
        with tempfile.NamedTemporaryFile(dir=probe_dir):
            pass
    # OSError covers missing directories and permission problems; TypeError and
    # ValueError cover values that cannot be used as a path at all.
    except (OSError, TypeError, ValueError):
        return False

    return True

True if the current user has sufficient permissions to create siblings (i.e., arbitrary files in the parent directory) of the passed pathname; False otherwise.

def is_path_exists_or_creatable(pathname):
def is_path_exists_or_creatable(pathname):
    '''
    `True` if the passed pathname is valid according to `is_pathname_valid()`
    _and_ either exists already or its parent directory passes
    `is_dir_writable()`; `False` otherwise.

    `OSError` raised along the way is reported as `False`.
    '''
    try:
        # The pathname is validated first so the later "os" calls are not fed
        # an invalid pathname.
        if not is_pathname_valid(pathname):
            return False
        if os.path.exists(pathname):
            return True
        return is_dir_writable(os.path.dirname(pathname))
    # Non-fatal filesystem complaints (e.g., timeouts, permissions issues) mean
    # the path is inaccessible. Other exceptions are not caught here.
    except OSError:
        return False

True if the passed pathname is a valid pathname on the current OS _and_ either currently exists or is hypothetically creatable in a cross-platform manner optimized for POSIX-unfriendly filesystems; False otherwise.

This function is guaranteed to _never_ raise exceptions.

def get_matching_val(search_val, matching_vals):
def get_matching_val(search_val, matching_vals):
    """
    Return the value of ``matching_vals`` that ranks first in ``jaro_winkler``
    for ``search_val``, together with its factor.

    Args:
        search_val (str): value to look for
        matching_vals (list(str)): candidate values to compare with

    Returns:
        match_val (str), fact_jaro_winkler (float): both None when there is no match
    """
    ranked = jaro_winkler(search_val, matching_vals)

    # The first key of the ranking is the best factor
    for fact_jaro in ranked:
        return ranked.get(fact_jaro), fact_jaro

    return ranked.get(None), None

Retorna el valor de la lista de valores a comparar (matching_vals) que más se asemeja al valor propuesto (search_val).

Arguments:
  • search_val (str): Valor propuesto para comparar
  • matching_vals (list(str)): Lista de valores a comparar
Returns:

match_val (str), fact_jaro_winkler (float)

def levenshtein_distance(search_val, matching_vals):
def levenshtein_distance(search_val, matching_vals):
    """
    Group the candidate values by their Levenshtein distance to ``search_val``.

    Args:
        search_val: value to compare
        matching_vals: iterable of candidate values

    Returns:
        OrderedDict: distance -> list of candidates at that distance, in ascending distance order
    """
    grouped = {}
    for candidate in matching_vals:
        dist = jellyfish.levenshtein_distance(search_val, candidate)
        grouped.setdefault(dist, []).append(candidate)

    return OrderedDict((dist, grouped[dist]) for dist in sorted(grouped))
Arguments:
  • search_val:
  • matching_vals:

Returns:

def jaro_winkler(search_val, matching_vals):
def jaro_winkler(search_val, matching_vals):
    """
    Rank the candidate values by their Jaro-Winkler similarity to ``search_val``.

    Candidates with similarity 0 are left out. The result is keyed by similarity, so
    when several candidates share the same similarity only the last one is kept.

    Args:
        search_val: value to compare
        matching_vals: iterable of candidate values

    Returns:
        OrderedDict: similarity -> candidate, in descending similarity order
    """
    by_score = {}
    for candidate in matching_vals:
        by_score[jellyfish.jaro_winkler_similarity(search_val, candidate)] = candidate

    ranked = OrderedDict()
    for score in sorted(by_score, reverse=True):
        if score == 0:
            continue
        ranked[score] = by_score[score]

    return ranked
Arguments:
  • search_val:
  • matching_vals:

Returns:

def call_command(command_prog, *args):
def call_command(command_prog, *args):
    """
    Run a command through the system shell with the given arguments.

    ``command_prog`` is handed to the shell as is; every extra argument is passed
    to it as a single, quoted argument.

    Args:
        command_prog (str): command to run
        *args: arguments for the command

    Returns:
        bool: True if the command ended with exit code 0

    Raises:
        subprocess.CalledProcessError: if the command ends with a non zero exit code
    """
    import shlex

    if os.name == 'nt':
        # On Windows the sequence is converted to a command line by subprocess
        call_args = [command_prog]
        call_args.extend(args)
    else:
        # On POSIX, with shell=True only the first item of a sequence is the command
        # (the rest become arguments of the shell itself), so a single string is built
        call_args = ' '.join([command_prog, *(shlex.quote(str(arg)) for arg in args)])

    ret = subprocess.check_call(call_args, shell=True)

    return ret == 0

Llama comando shell sistema con los argumentos indicados

Returns:

bool: True si OK

def rounded_float(a_float, num_decs=9):
def rounded_float(a_float, num_decs=9):
    """
    Round a number to the given number of decimals and return it as a float.

    Args:
        a_float: number to round
        num_decs (int=9): number of decimals to keep

    Returns:
        float
    """
    # No text formatting is involved: stripping trailing zeros from a formatted
    # string would also remove significant zeros when num_decs is 0 (100 -> 1)
    return float(round(a_float, num_decs))

Formatea un float con el numero de decimales especificado

Arguments:
  • a_float:
  • num_decs:
Returns:

float

class formatted_float(builtins.float):
class formatted_float(float):
    """
    Float whose representation shows at most ``__num_decs__`` decimals
    """
    # Maximum number of decimals used by __repr__
    __num_decs__ = 9

    def __repr__(self):
        shown_value = rounded_float(self, self.__num_decs__)
        return str(shown_value)

Devuelve un float que se representa con un maximo de decimales (__num_decs__)

def as_format_floats(obj):
def as_format_floats(obj):
    """
    Convert every float found to 'formatted_float' so its representation is formatted.

    Dicts, lists and tuples are rebuilt with their own class and processed recursively;
    any other object is returned untouched.

    Args:
        obj: any object

    Returns:
        (obj, formatted_float)
    """
    if isinstance(obj, float):
        return formatted_float(obj)

    if isinstance(obj, dict):
        converted_items = ((key, as_format_floats(val)) for key, val in obj.items())
        return obj.__class__(converted_items)

    if isinstance(obj, (list, tuple)):
        converted_values = (as_format_floats(val) for val in obj)
        return obj.__class__(converted_values)

    return obj

Si encuentra un Float lo convierte a la clase 'formatted_float' para formatear su representación

Arguments:
  • obj: Cualquier objeto
Returns:

(obj, formatted_float)

def nums_from_str(a_string, nan=False):
def nums_from_str(a_string, nan=False):
    """
    Return the list of numbers found in the text, split on whitespace.

    Args:
        a_string (str): text to scan
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        list: ints and floats in order of appearance
    """

    def to_number(token):
        # Integers are tried first so "3" stays an int instead of becoming 3.0
        try:
            return int(token)
        except ValueError:
            pass
        try:
            return float(token)
        except ValueError:
            return None

    found = []
    for token in a_string.strip().split():
        value = to_number(token)
        if value is None:
            continue
        if isinstance(value, float) and isnan(value) and not nan:
            continue
        found.append(value)

    return found

Retorna lista de numeros en el texto pasado

Arguments:
  • a_string (str):
  • nan (bool=False): por defecto no trata los NaN como numeros
Returns:

list

def first_num_from_str(a_string, nan=False):
def first_num_from_str(a_string, nan=False):
    """
    Return the first number found in the text.

    Args:
        a_string (str): text to scan
        nan (bool=False): by default NaN values are not treated as numbers

    Returns:
        int OR float: None when the text contains no number
    """
    found = nums_from_str(a_string, nan=nan)

    return found[0] if found else None

Retorna primer numero encontrado del texto pasado

Arguments:
  • a_string (str):
  • nan (bool=False): por defecto no trata los NaN como numeros
Returns:

int OR float

def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
def dates_from_str(str, formats=None, seps=None, ret_extra_data=False):
    """
    Return the list of dates that can be parsed from the text with the given formats.

    The text is split once per separator and every resulting part is tried against every
    format, so a part produced by several separators yields one date per occurrence.

    Args:
        str (str): text to scan
        formats (list=None): by default ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
        seps (list=None): by default [None, '.', ',']
        ret_extra_data (bool=False): if True each item is a tuple (date, source part, format used)

    Returns:
        list
    """
    formats = formats or ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
    seps = seps or [None, '.', ',']

    candidates = []
    for sep in seps:
        candidates.extend(part.strip() for part in str.split(sep))

    found = []
    for date_format in formats:
        for candidate in candidates:
            try:
                parsed = datetime.datetime.strptime(candidate, date_format)
            except Exception:
                # The part does not match this format
                continue
            found.append((parsed, candidate, date_format) if ret_extra_data else parsed)

    return found

Retorna lista de fechas disponibles en el texto pasado segun los formatos indicados

Arguments:
  • str (str):
  • formats (list=None): por defecto ['%Y%m%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d']
  • seps (list=None): por defecto [None, '.', ',']
  • ret_extra_data (bool=False): si True retorna tuple con fecha + part_str_src + format utilizado
Returns:

list

def pretty_text(txt):
def pretty_text(txt):
    """
    Replace underscores and hyphens with spaces and capitalize the text.

    Args:
        txt (str): text to prettify

    Returns:
        str
    """
    cleaned = txt
    for separator in ("_", "-"):
        cleaned = cleaned.replace(separator, " ")

    return cleaned.capitalize()

Coge texto y lo capitaliza y quita carácteres por espacios

Arguments:
  • txt (str):
Returns:

str

def zip_files(zip_path, file_paths, base_path=None, compression=8):
def zip_files(zip_path, file_paths, base_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given files (:file_paths) into a zip file (:zip_path).

    Args:
        zip_path: path of the zip file to create
        file_paths (list or generator): paths of the files to add
        base_path (str=None): when given, files keep inside the zip their path relative to it
                              (the prefix is matched case-insensitively); otherwise only the
                              file name is kept
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compression

    Returns:
        zip_path (str)
    """
    re_base_path = None
    if base_path:
        norm_base = os.path.normpath(base_path).replace(os.sep, '/')
        # The base path is literal text, not a pattern: escape it, and only strip it
        # when it is a leading, complete directory prefix of the file path
        re_base_path = re.compile('^' + re.escape(norm_base) + '(?=/|$)', re.IGNORECASE)

    with ZipFile(zip_path, "w", compression=compression, allowZip64=True) as my_zip:
        for file_path in file_paths:
            if re_base_path:
                norm_file = os.path.normpath(file_path).replace(os.sep, '/')
                arch_name = re_base_path.sub('', norm_file, count=1)
            else:
                arch_name = os.path.basename(file_path)

            my_zip.write(file_path, arcname=arch_name)

    return zip_path

Comprime los ficheros indicados con :file_paths en un fichero zip (:zip_path)

Arguments:
  • zip_path:
  • file_paths (list or generator):
  • base_path (str=None): path base a partir del cual se mantiene la ruta relativa de los ficheros dentro del zip
  • compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:

zip_path (str)

def zip_dir( dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=8):
def zip_dir(dir_path, zip_path=None, relative_dirs_sel=None, func_filter_path=None, compression=ZIP_DEFLATED):
    """
    Compress the given directory.

    Args:
        dir_path (str): path of the directory
        zip_path (str=None): path of the .zip file to create. By default ``dir_path`` plus '.zip'
        relative_dirs_sel (list=None): relative paths of the directories to process
        func_filter_path (func=None): function deciding whether a path is valid to be included
        compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) to store without compression

    Returns:
        zip_file (str)
    """
    target_zip = zip_path if zip_path else f'{dir_path}.zip'
    paths_to_zip = iter_paths_dir(dir_path,
                                  relative_dirs_sel=relative_dirs_sel,
                                  func_filter_path=func_filter_path)

    return zip_files(target_zip, paths_to_zip, base_path=dir_path, compression=compression)

Comprime la carpeta indicada

Arguments:
  • dir_path (str): path directorio
  • zip_path (str=None): el path del fichero .zip a crear. Por defecto zip en el directorio padre con el mismo nombre del directorio zipeado
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
  • compression (int=ZIP_DEFLATED): 0 (ZIP_STORED) si no se quiere comprimir
Returns:

zip_file (str)

def zip_files_dir(dir_path, remove_files=False, *exts_files):
def zip_files_dir(dir_path, remove_files=False, *exts_files):
    """
    Compress each file of a directory into its own zip (same name, '.zip' extension).
    The kind of files to compress can be restricted by extension.

    Only regular files are processed. A file whose zip path would be the file itself
    (an existing '.zip') is skipped, since creating the zip would overwrite it.

    Args:
        dir_path: directory whose files are compressed
        remove_files: if True each file is removed once it has been compressed
        *exts_files: file extensions WITHOUT the dot

    Returns:
        ok (bool)
    """
    exts = {".{}".format(ext.lower()) for ext in exts_files}

    # Snapshot the listing first: the zips created below land in the same directory
    with os.scandir(dir_path) as it:
        file_paths = [de.path for de in it if de.is_file()]

    for file_path in file_paths:
        base_file, ext_file = os.path.splitext(file_path)
        if exts and ext_file.lower() not in exts:
            continue

        zip_path = "{}.zip".format(base_file)
        if os.path.normcase(zip_path) == os.path.normcase(file_path):
            continue

        print("Comprimiendo fichero '{}' en el zip '{}'".format(file_path, zip_path))
        zip_files(zip_path, [file_path])

        if remove_files:
            os.remove(file_path)

    return True

Comprime los ficheros de una carpeta indicada. Se pueden indicar qué tipo de ficheros se quiere que comprima

Arguments:
  • dir_path:
  • remove_files:
  • *exts_files: extensiones de fichero SIN el punto
Returns:

ok (bool)

def split_ext_file(path_file):
def split_ext_file(path_file):
    """
    Split the file name at its first "." into the part before it and everything after it.

    Args:
        path_file: path of the file
    Returns:
        base_file (str), ext_file (str)
    """
    file_name = os.path.basename(path_file)
    base_file, _, ext_file = file_name.partition(".")

    return base_file, ext_file

Devuelve el nombre del fichero partido entre la primera parte antes del separador "." y lo demás

Arguments:
  • path_file:
Returns:

base_file (str), ext_file (str)

FILE_RUN_LOG = 'last_run.log'
DATE_RUN_LOG_FRMT = '%Y%m%d'
def last_run_on_dir(dir_base):
def last_run_on_dir(dir_base):
    """
    Return the date of the last run stored in the run log file of a directory.

    Args:
        dir_base (str): directory holding the run log file

    Returns:
        date_last_run (datetime): None when the run log file does not exist
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    if not os.path.exists(log_last_run):
        return None

    with open(log_last_run) as fr:
        return datetime.datetime.strptime(fr.read(), DATE_RUN_LOG_FRMT)

Retorna la fecha de ultima ejecucion de proceso generacion en directorio de repositorio

Arguments:
  • dir_base (str):
Returns:

date_last_run (datetime): Si no encuentra devuelve None

def save_last_run_on_dir(dir_base, date_run=None):
def save_last_run_on_dir(dir_base, date_run=None):
    """
    Store the date of the last run in the run log file of a directory.

    Args:
        dir_base (str): directory holding the run log file
        date_run (datetime=None): when not given today's date is used
    """
    log_last_run = os.path.join(dir_base, FILE_RUN_LOG)
    date_to_save = date_run if date_run else datetime.date.today()

    with open(log_last_run, "w+") as fw:
        fw.write(date_to_save.strftime(DATE_RUN_LOG_FRMT))

Graba la fecha de ultima ejecucion de proceso generacion en directorio de repositorio

Arguments:
  • dir_base (str):
  • date_run (datetime=None): Si no se informa cogerá la fecha de hoy
def month_name(num_month, code_alias_locale='es_cu'):
def month_name(num_month, code_alias_locale="es_cu"):
    """
    Return the name of a month in the given locale.

    Args:
        num_month (int): month number used to index ``calendar.month_name``
        code_alias_locale (str='es_cu'): key looked up in ``locale.locale_alias``

    Returns:
        str
    """
    locale_name = locale.locale_alias.get(code_alias_locale)
    with different_locale(locale_name):
        # The name has to be read while the locale is active
        raw_name = calendar.month_name[num_month]
        return pretty_text(raw_name)

Retorna el nombre del mes en el locale especificado. Por defecto castellano

Arguments:
  • num_month (int):
  • code_alias_locale (str='es_cu'):
Returns:

str

def file_mod_time(path_file):
def file_mod_time(path_file):
    """
    Return the modification time of a file as a datetime.

    Args:
        path_file (str): path of the file

    Returns:
        datetime
    """
    mtime_stamp = os.path.getmtime(path_file)

    return datetime.datetime.fromtimestamp(mtime_stamp)

Return the datetime of the file's modification timestamp (st_mtime)

Arguments:
  • path_file (str):
Returns:

datetime

def rows_csv(a_path_csv, header=True, sep=';', encoding='utf8'):
def rows_csv(a_path_csv, header=True, sep=';', encoding="utf8"):
    """
    Iterate over the rows of a CSV file, as dicts keyed by the values of the first row
    (if header=True) or as lists otherwise.

    Header names are stripped and lower-cased. Empty rows are skipped.

    Args:
        a_path_csv (str): path of the CSV file
        header (bool=True): whether the first non-empty row holds the column names
        sep (str=';'): field delimiter; ';' is also used when an empty value is passed
        encoding (str="utf8"): encoding of the file
    Yields:
        list OR dict
    """
    # newline='' lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are kept as they are
    with open(a_path_csv, encoding=encoding, newline='') as a_file:
        csv_rdr = csv.reader(a_file, delimiter=sep if sep else ';')
        header_row = None
        for row in csv_rdr:
            if header and not header_row:
                header_row = [v.strip().lower() for v in row]
                continue

            vals_row = dict(zip(header_row, row)) if header_row else row

            if vals_row:
                yield vals_row

Itera como dicts indexados por valores primera fila (si header=True) o si no como list las filas del CSV pasado por parametro a_path_csv.

Arguments:
  • a_path_csv (str):
  • header (bool=True):
  • sep (str=';'): por defecto cogerá el separador que por defecto usa csv.reader
  • encoding (str="utf8"):
Yields:

list OR dict

def subdirs_path(path):
def subdirs_path(path):
    """
    Iterate over the direct subdirectories of a path.

    Args:
        path: directory to scan

    Yields:
        nom_subdir, path_subdir
    """
    with os.scandir(path) as entries:
        yield from ((entry.name, entry.path) for entry in entries if entry.is_dir())

Itera sobre los subdirectorios del path

Arguments:
  • path:
Yields:

nom_subdir, path_subdir

def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
def tree_subdirs(path_dir_base, relative_dirs_sel=None, last_level_as_list=False):
    """
    Build the tree of subdirectories found under path_dir_base.

    Each subdirectory name maps to the tree of its own subdirectories. When
    relative_dirs_sel is given, only subdirectories that equal or share a common path
    with one of the (existing) selected directories are included. Matching is done on
    lower-cased relative paths.

    Args:
        path_dir_base: root directory to scan
        relative_dirs_sel (list=None): paths, relative to path_dir_base, of the directories
            to process; entries that do not exist are ignored
        last_level_as_list (bool=False): if True, any level whose subdirectories all have
            empty subtrees is returned as a list of names instead of a dict

    Returns:
        dict (or list for a last level when last_level_as_list is True)
    """
    tree = {}

    # Normalised, lower-cased relative paths of the selected directories that exist
    valid_dirs_sel = set()
    # A non-empty selection enables filtering even if none of its entries exist
    filter_dirs = bool(relative_dirs_sel)
    if filter_dirs:
        for dir_sel in relative_dirs_sel:
            path_dir_rel = os.path.join(path_dir_base, dir_sel)
            if os.path.exists(path_dir_rel):
                valid_dirs_sel.add(os.path.normpath(os.path.relpath(path_dir_rel, path_dir_base)).lower())

    def valid_dir(dir_path):
        # Valid when it equals a selected dir or shares a non-empty common path with one
        rel_path = os.path.relpath(dir_path, path_dir_base).lower()
        return any(rel_path == dir_sel or os.path.commonpath((rel_path, dir_sel))
                   for dir_sel in valid_dirs_sel)

    for dir_name, dir_path in subdirs_path(path_dir_base):
        if filter_dirs and not valid_dir(dir_path):
            continue

        dir_path_rel = os.path.relpath(dir_path, path_dir_base).lower()
        # Re-express the related selections relative to the subdirectory being descended
        dirs_sel_path = [os.path.relpath(dir_sel, dir_path_rel) for dir_sel in valid_dirs_sel
                         if os.path.commonpath((dir_path_rel, dir_sel))]
        # Propagate last_level_as_list so the flag applies at every depth, not only at the root
        tree[dir_name] = tree_subdirs(dir_path, dirs_sel_path, last_level_as_list)

    if tree and last_level_as_list and not any(tree.values()):
        tree = [*tree.keys()]

    return tree
Arguments:
  • path_dir_base:
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • last_level_as_list (bool=False):
Returns:

dict

def tree_paths( path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
def tree_paths(path_dir_base, relative_dirs_sel=None, func_filter_path=None, solo_dirs=False):
    """
    Return a dict with the tree of paths found under path_dir_base.

    Directories are nested dicts keyed by name and files are keys whose value is None.
    Files directly under path_dir_base are stored at the top level of the result. A
    directory is only added when it holds at least one selected file.

    func_filter_path (-> bool) is applied to the name of every walked directory and of
    every file; names for which it returns a falsy value are left out. A rejected
    directory is skipped but its subdirectories are still walked.

    Args:
        path_dir_base (str): root directory to walk
        relative_dirs_sel (list=None): paths, relative to path_dir_base, of the directories
            to process; entries that do not exist are ignored
        func_filter_path (func=None): predicate deciding whether a directory or file name
            is returned
        solo_dirs (bool=False): if True the file names are not added to the tree

    Returns:
        dict
    """
    selected_dirs = set()
    for rel_dir_sel in relative_dirs_sel or ():
        candidate = os.path.join(path_dir_base, rel_dir_sel)
        if os.path.exists(candidate):
            selected_dirs.add(candidate)

    def in_selection(a_dir):
        # True when a_dir is one of the selected directories or lies below one of them
        return any(os.path.samefile(a_dir, sel) or is_path_child_from(a_dir, sel)
                   for sel in selected_dirs)

    paths = {}
    for walked_dir, _sub_dirs, file_names in os.walk(path_dir_base):
        if selected_dirs and not in_selection(walked_dir):
            continue

        rel_dir = os.path.relpath(walked_dir, path_dir_base)
        if func_filter_path and not func_filter_path(os.path.basename(rel_dir)):
            continue

        files_selected = dict.fromkeys(
            fn for fn in file_names if not func_filter_path or func_filter_path(fn))
        if not files_selected:
            continue

        level = paths
        # The first level is stored without a directory name
        if rel_dir != '.':
            for part in rel_dir.split(os.sep):
                level = level.setdefault(part, {})

        if not solo_dirs:
            level.update(files_selected)

    return paths

Retorna diccionario con el arbol de paths disponibles en el path indicado.

Con la función func_filter_path (-> bool) se podrá filtrar los paths a retornar (por defecto todos son válidos)

Arguments:
  • path_dir_base (str):
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
  • solo_dirs (bool=False):
Returns:

dict

def iter_tree_paths(tree_paths, path_base=None):
def iter_tree_paths(tree_paths, path_base=None):
    """
    Iterate over the paths described by a tree of nested dicts.

    Every key whose value is a non-empty dict is descended into; any other key (for
    example a None value or an empty dict) is yielded as a path built from its parent keys.

    Args:
        tree_paths (dict): tree of paths, nested dicts keyed by path component
        path_base (str=None): if given, it is prepended to every yielded path

    Yields:
        path_file
    """
    for name, children in tree_paths.items():
        if isinstance(children, dict) and children:
            rel_paths = iter_tree_paths(children, name)
        else:
            rel_paths = (name,)

        for rel_path in rel_paths:
            if path_base:
                yield os.path.join(path_base, rel_path)
            else:
                yield rel_path
Arguments:
  • tree_paths (dict):
  • path_base (str=None):
Yields:

path_file

def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
def iter_paths_dir(path_dir_base, relative_dirs_sel=None, func_filter_path=None):
    """
    Iterate over the paths available under path_dir_base.

    The tree is built with tree_paths and flattened with iter_tree_paths, using
    path_dir_base as the base of every yielded path.

    Args:
        path_dir_base (str): root directory to walk
        relative_dirs_sel (list=None): paths, relative to path_dir_base, of the directories
            to process
        func_filter_path (func=None): predicate (-> bool) deciding whether a path name
            is returned

    Yields:
        path (str)
    """
    tree = tree_paths(path_dir_base, relative_dirs_sel, func_filter_path)
    yield from iter_tree_paths(tree, path_dir_base)

Itera el arbol de paths disponibles en el path indicado.

Con la función func_filter_path (-> bool) se podrá filtrar los paths a retornar (por defecto todos son válidos)

Arguments:
  • path_dir_base (str):
  • relative_dirs_sel (list=None): lista de paths relativos de directorios que se trataran
  • func_filter_path (func=None): Func que validará si el nom del path és valid o no per retornar
Yields:

path (str)

def is_path_child_from(path, path_parent):
def is_path_child_from(path, path_parent):
    """
    Return whether path lies below path_parent.

    Each ancestor of path is compared against path_parent with Path.samefile; path
    itself is not among its own ancestors.

    Args:
        path: path to check
        path_parent: candidate ancestor directory

    Returns:
        bool
    """
    candidate = Path(path)
    parent = Path(path_parent)

    for ancestor in candidate.parents:
        if ancestor.samefile(parent):
            return True

    return False

Retorna si path es hijo de path_parent

Arguments:
  • path:
  • path_parent:
Returns:

bool

def machine_name():
def machine_name():
    """
    Return the upper-cased fully qualified domain name of the machine.

    Returns:
        str
    """
    # TODO - Get host from docker machine when we are in a container
    # TODO - import docker
    # TODO -
    # TODO - client = docker.from_env()
    # TODO - container_info = client.containers.get(socket.gethostname())
    # TODO - docker_host_ip = container_info.attrs['NetworkSettings']['IPAddress']
    # TODO - print(docker_host_ip)

    fqdn = socket.getfqdn()
    return fqdn.upper()

Retorna el nombre de la maquina

Returns:

str

def machine_apb():
def machine_apb():
    """
    Return whether the fully qualified domain name of the machine ends with '.apb.es'.

    The comparison is case-insensitive.

    Returns:
        bool
    """
    fqdn = socket.getfqdn().lower()
    return fqdn.endswith('.apb.es')

Retorna si el nombre (FQDN) de la máquina pertenece al dominio '.apb.es'

Returns:

bool