apb_extra_utils.xml_xpath_parser

  1#  coding=utf-8
  2#
  3#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
  4#  Created: 7/6/19 18:23
  5#  Last modified: 7/6/19 18:21
  6#  Copyright (c) 2019
  7
  8# Functions to convert a SQL query to XPATH query
  9
 10from pyparsing import CaselessLiteral, Word, delimitedList, Optional, \
 11    Combine, Group, alphas, nums, alphanums, ParseException, Forward, oneOf, quotedString, \
 12    ZeroOrMore, Keyword
 13
 14# Variables fijas globales
 15and_ = Keyword("and", caseless=True)
 16or_ = Keyword("or", caseless=True)
 17in_ = Keyword("in", caseless=True)
 18not_ = Keyword("not", caseless=True)
 19like_ = Keyword("like", caseless=True)
 20
 21
 22def get_parts_sql_filter(sql_query_text):
 23    """
 24
 25    Args:
 26        sql_query_text:
 27
 28    Returns:
 29
 30    """
 31    ident = Word(alphas, alphanums + "_$").setName("identifier")
 32    columnName = delimitedList(ident, ".", combine=True)
 33    whereExpression = Forward()
 34
 35    E = CaselessLiteral("E")
 36    binop = oneOf("= != < > >= <= eq ne lt le gt ge like ", caseless=True)
 37    arithSign = Word("+-", exact=1)
 38    realNum = Combine(Optional(arithSign) + (Word(nums) + "." + Optional(Word(nums)) |
 39                                             ("." + Word(nums))) +
 40                      Optional(E + Optional(arithSign) + Word(nums)))
 41    intNum = Combine(Optional(arithSign) + Word(nums) +
 42                     Optional(E + Optional("+") + Word(nums)))
 43
 44    columnRval = realNum | intNum | quotedString | columnName  # need to add support for alg expressions
 45    whereCondition = Group(
 46        (columnName + Optional(not_) + binop + columnRval) |
 47        (columnName + in_ + "(" + delimitedList(columnRval) + ")") |
 48        ("(" + whereExpression + ")")
 49    )
 50    whereExpression << whereCondition + ZeroOrMore((and_ | or_) + whereExpression)
 51
 52    tokens = None
 53    try:
 54        tokens = whereExpression.parseString(sql_query_text)
 55    except ParseException as err:
 56        print(" " * err.loc + "^\n" + err.msg)
 57        print(err)
 58
 59    return tokens.asList()
 60
 61
 62def parse_sql_filter_to_xpath(filtro_sql, xpath_base=""):
 63    """
 64
 65    Args:
 66        filtro_sql:
 67        xpath_base:
 68
 69    Returns:
 70
 71    """
 72    sql_filter_parts = get_parts_sql_filter(filtro_sql)
 73
 74    xpath_parts = convert_sql_parts_to_xpath_parts(sql_filter_parts)
 75
 76    xpath_query = get_xpath_string_from_xpath_parts(xpath_parts)
 77
 78    xpath_str = "boolean(" + xpath_base + "[" + xpath_query + "])"
 79
 80    return xpath_str
 81
 82
 83def get_xpath_string_from_xpath_parts(xpath_parts):
 84    """
 85
 86    Args:
 87        xpath_parts:
 88
 89    Returns:
 90
 91    """
 92    xpath_str = ""
 93    for xpath_part in xpath_parts:
 94        if type(xpath_part) is list:
 95            xpath_str += get_xpath_string_from_xpath_parts(xpath_part)
 96        else:
 97            xpath_str += xpath_part
 98
 99    return xpath_str
100
101
def encode_for_xml(unicode_data, encoding='ascii'):
    """
    Encode text for XML/HTML output, replacing every character the target
    encoding cannot represent with an XML numeric character reference.

    Args:
        unicode_data: text to encode
        encoding: name of the target encoding ('ascii' by default)

    Returns:
        The encoded data
    """
    try:
        encoded = unicode_data.encode(encoding, 'xmlcharrefreplace')
    except ValueError:
        # Fallback kept for interpreters without the 'xmlcharrefreplace'
        # error handler (pre-2.3 Python): emulate it character by character.
        encoded = _xmlcharref_encode(unicode_data, encoding)

    return encoded
121
122
123def _xmlcharref_encode(unicode_data, encoding):
124    """
125    Emulate Python 2.3's 'xmlcharrefreplace' encoding error handler.
126
127    Args:
128        unicode_data:
129        encoding:
130
131    Returns:
132    """
133    chars = []
134    # Step through the unicode_data string one character at a time in
135    # order to catch unencodable characters:
136    for char in unicode_data:
137        try:
138            chars.append(char.encode(encoding, 'strict'))
139        except UnicodeError:
140            chars.append('&#%i;' % ord(char))
141    return ''.join(chars)
142
143
def convert_sql_parts_to_xpath_parts(sql_filter_parts):
    """
    Translate the parsed tokens of a SQL filter into XPath fragments.

    Nested lists are converted recursively and stay nested in the result.
    A ``not`` token wraps the whole group in ``not(...)``, ``like`` becomes a
    ``[contains(text(), ...)]`` predicate and ``and`` / ``or`` are padded
    with spaces. Every other token is emitted unchanged.

    Args:
        sql_filter_parts: nested list of tokens, as returned by
            get_parts_sql_filter

    Returns:
        list: nested list of XPath fragments (strings and sub-lists)
    """
    xpath_parts = []
    sufix_xpath_parts = []
    if "not" in sql_filter_parts:
        xpath_parts.append("not(")
        sufix_xpath_parts.append(")")

    for sql_elem in sql_filter_parts:
        if isinstance(sql_elem, list):
            xpath_parts.append(convert_sql_parts_to_xpath_parts(sql_elem))
        elif sql_elem == not_:
            # Already emitted as the "not(" ... ")" wrapper of the group
            continue
        elif sql_elem == like_:
            # Converted into the 'contains()' function; its closing part goes
            # before any pending ")" of a "not(" wrapper
            xpath_parts.append("[contains(text(),")
            sufix_xpath_parts.insert(0, ")]")
        elif sql_elem == and_ or sql_elem == or_:
            xpath_parts.append(" " + sql_elem + " ")
        elif isinstance(sql_elem, bytes):
            xpath_parts.append(sql_elem.decode("utf-8"))
        else:
            # Tokens are used as plain text. The previous eval() of the token
            # added nothing (its result was discarded) and evaluated input
            # text as Python code.
            # NOTE(review): tokens are emitted without separators, so word
            # operators (eq, ne, ...) and "in (...)" lists are concatenated
            # verbatim -- confirm whether those forms need a translation.
            xpath_parts.append(str(sql_elem))

    xpath_parts += sufix_xpath_parts

    return xpath_parts
183
184
if __name__ == '__main__':
    # Command-line entry point: hand control over to python-fire
    import fire

    fire.Fire()
# NOTE(review): these look like the rendered string values of the keyword
# constants that the listing above builds with pyparsing `Keyword`;
# presumably documentation output rather than live code -- confirm before
# relying on them.
and_ = "and"
or_ = "or"
in_ = "in"
not_ = "not"
like_ = "like"
def get_parts_sql_filter(sql_query_text):
    """
    Parse a SQL WHERE-style filter into a nested list of tokens.

    The grammar accepts conditions of the form ``column [not] <op> value``,
    ``column in (v1, v2, ...)`` and parenthesised sub-expressions, chained
    with ``and`` / ``or``. Every condition is grouped in its own sub-list.

    Args:
        sql_query_text: text of the filter to parse

    Returns:
        list: nested list with the parsed tokens

    Raises:
        ParseException: if the text does not match the grammar (a diagnostic
            is printed before the exception is propagated)
    """
    ident = Word(alphas, alphanums + "_$").setName("identifier")
    columnName = delimitedList(ident, ".", combine=True)
    whereExpression = Forward()

    E = CaselessLiteral("E")
    binop = oneOf("= != < > >= <= eq ne lt le gt ge like ", caseless=True)
    arithSign = Word("+-", exact=1)
    realNum = Combine(Optional(arithSign) + (Word(nums) + "." + Optional(Word(nums)) |
                                             ("." + Word(nums))) +
                      Optional(E + Optional(arithSign) + Word(nums)))
    intNum = Combine(Optional(arithSign) + Word(nums) +
                     Optional(E + Optional("+") + Word(nums)))

    columnRval = realNum | intNum | quotedString | columnName  # need to add support for alg expressions
    whereCondition = Group(
        (columnName + Optional(not_) + binop + columnRval) |
        (columnName + in_ + "(" + delimitedList(columnRval) + ")") |
        ("(" + whereExpression + ")")
    )
    # The expression is recursive: a condition optionally followed by
    # and/or and another expression
    whereExpression << whereCondition + ZeroOrMore((and_ | or_) + whereExpression)

    # NOTE(review): parseAll is not requested, so text left over after a valid
    # expression is presumably ignored rather than rejected -- confirm intended.
    try:
        tokens = whereExpression.parseString(sql_query_text)
    except ParseException as err:
        print(" " * err.loc + "^\n" + err.msg)
        print(err)
        # Propagate the real parse error instead of failing later with an
        # unrelated AttributeError on a None result
        raise

    return tokens.asList()
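Arguments:
  • sql_query_text: text of the SQL WHERE-style filter to parse.

Returns: nested list of parsed tokens, one sub-list per condition.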

def parse_sql_filter_to_xpath(filtro_sql, xpath_base=""):
    """
    Build an XPath boolean test from a SQL-style filter.

    Args:
        filtro_sql: SQL filter text, parsed with get_parts_sql_filter
        xpath_base: XPath expression the generated predicate is appended to

    Returns:
        str: text of the form ``boolean(<xpath_base>[<predicate>])``
    """
    predicate = get_xpath_string_from_xpath_parts(
        convert_sql_parts_to_xpath_parts(
            get_parts_sql_filter(filtro_sql)
        )
    )

    return "".join(("boolean(", xpath_base, "[", predicate, "])"))
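Arguments:
  • filtro_sql: text of the SQL filter to convert.
  • xpath_base: XPath expression the generated predicate is appended to (empty by default).

Returns: string of the form boolean(<xpath_base>[<predicate>]).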

def get_xpath_string_from_xpath_parts(xpath_parts):
    """
    Flatten a nested list of XPath fragments into one string.

    Args:
        xpath_parts: list whose items are strings or (recursively) lists of
            strings

    Returns:
        str: all fragments concatenated in order, without separators
    """
    pieces = [
        get_xpath_string_from_xpath_parts(part) if type(part) is list else part
        for part in xpath_parts
    ]
    return "".join(pieces)
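Arguments:
  • xpath_parts: nested list of XPath fragments (strings and sub-lists).

Returns: single string with all fragments concatenated in order.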

def encode_for_xml(unicode_data, encoding='ascii'):
    """
    Encode text for XML/HTML output, replacing every character the target
    encoding cannot represent with an XML numeric character reference.

    Args:
        unicode_data: text to encode
        encoding: name of the target encoding ('ascii' by default)

    Returns:
        The encoded data
    """
    try:
        encoded = unicode_data.encode(encoding, 'xmlcharrefreplace')
    except ValueError:
        # Fallback kept for interpreters without the 'xmlcharrefreplace'
        # error handler (pre-2.3 Python): emulate it character by character.
        encoded = _xmlcharref_encode(unicode_data, encoding)

    return encoded

Encode unicode_data for use as XML or HTML, with characters outside of the encoding converted to XML numeric character references.

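Arguments:
  • unicode_data: text to encode.
  • encoding: name of the target encoding ('ascii' by default).

Returns: the encoded data, with unencodable characters replaced by XML numeric character references.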

def convert_sql_parts_to_xpath_parts(sql_filter_parts):
    """
    Translate the parsed tokens of a SQL filter into XPath fragments.

    Nested lists are converted recursively and stay nested in the result.
    A ``not`` token wraps the whole group in ``not(...)``, ``like`` becomes a
    ``[contains(text(), ...)]`` predicate and ``and`` / ``or`` are padded
    with spaces. Every other token is emitted unchanged.

    Args:
        sql_filter_parts: nested list of tokens, as returned by
            get_parts_sql_filter

    Returns:
        list: nested list of XPath fragments (strings and sub-lists)
    """
    xpath_parts = []
    sufix_xpath_parts = []
    if "not" in sql_filter_parts:
        xpath_parts.append("not(")
        sufix_xpath_parts.append(")")

    for sql_elem in sql_filter_parts:
        if isinstance(sql_elem, list):
            xpath_parts.append(convert_sql_parts_to_xpath_parts(sql_elem))
        elif sql_elem == not_:
            # Already emitted as the "not(" ... ")" wrapper of the group
            continue
        elif sql_elem == like_:
            # Converted into the 'contains()' function; its closing part goes
            # before any pending ")" of a "not(" wrapper
            xpath_parts.append("[contains(text(),")
            sufix_xpath_parts.insert(0, ")]")
        elif sql_elem == and_ or sql_elem == or_:
            xpath_parts.append(" " + sql_elem + " ")
        elif isinstance(sql_elem, bytes):
            xpath_parts.append(sql_elem.decode("utf-8"))
        else:
            # Tokens are used as plain text. The previous eval() of the token
            # added nothing (its result was discarded) and evaluated input
            # text as Python code.
            # NOTE(review): tokens are emitted without separators, so word
            # operators (eq, ne, ...) and "in (...)" lists are concatenated
            # verbatim -- confirm whether those forms need a translation.
            xpath_parts.append(str(sql_elem))

    xpath_parts += sufix_xpath_parts

    return xpath_parts
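Arguments:
  • sql_filter_parts: nested list of tokens produced by get_parts_sql_filter.

Returns: nested list of XPath fragments (strings and sub-lists).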