apb_extra_utils.xml_xpath_parser
# coding=utf-8
#
#  Author: Ernesto Arredondo Martinez (ernestone@gmail.com)
#  Created: 7/6/19 18:23
#  Last modified: 7/6/19 18:21
#  Copyright (c) 2019

# Functions to convert a SQL query to XPATH query

from pyparsing import CaselessLiteral, Word, delimitedList, Optional, \
    Combine, Group, alphas, nums, alphanums, ParseException, Forward, oneOf, quotedString, \
    ZeroOrMore, Keyword

# Fixed module-level keywords (all matched case-insensitively)
and_ = Keyword("and", caseless=True)
or_ = Keyword("or", caseless=True)
in_ = Keyword("in", caseless=True)
not_ = Keyword("not", caseless=True)
like_ = Keyword("like", caseless=True)


def get_parts_sql_filter(sql_query_text):
    """
    Parse a SQL WHERE-style filter into a nested list of string tokens.

    Each condition (``column [not] <op> value``, ``column in (v1, v2)`` or a
    parenthesised sub-expression) becomes its own sub-list; the ``and`` /
    ``or`` connectors sit between those sub-lists.

    Args:
        sql_query_text: text of the filter to parse

    Returns:
        list: nested list of tokens (``ParseResults.asList()``)

    Raises:
        ParseException: when the text does not match the grammar. A
            diagnostic is printed before the exception is re-raised.
    """
    ident = Word(alphas, alphanums + "_$").setName("identifier")
    column_name = delimitedList(ident, ".", combine=True)
    where_expression = Forward()

    exponent_mark = CaselessLiteral("E")
    binop = oneOf("= != < > >= <= eq ne lt le gt ge like ", caseless=True)
    arith_sign = Word("+-", exact=1)
    real_num = Combine(Optional(arith_sign) + (Word(nums) + "." + Optional(Word(nums)) |
                                               ("." + Word(nums))) +
                       Optional(exponent_mark + Optional(arith_sign) + Word(nums)))
    int_num = Combine(Optional(arith_sign) + Word(nums) +
                      Optional(exponent_mark + Optional("+") + Word(nums)))

    # need to add support for alg expressions
    column_rval = real_num | int_num | quotedString | column_name
    where_condition = Group(
        (column_name + Optional(not_) + binop + column_rval) |
        (column_name + in_ + "(" + delimitedList(column_rval) + ")") |
        ("(" + where_expression + ")")
    )
    where_expression << where_condition + ZeroOrMore((and_ | or_) + where_expression)

    try:
        tokens = where_expression.parseString(sql_query_text)
    except ParseException as err:
        # Print a caret at the failing column, then propagate the real error
        # instead of failing later with AttributeError on ``None.asList()``.
        print(" " * err.loc + "^\n" + err.msg)
        print(err)
        raise

    return tokens.asList()


def parse_sql_filter_to_xpath(filtro_sql, xpath_base=""):
    """
    Convert a SQL filter into an XPath ``boolean(...)`` expression.

    Args:
        filtro_sql: SQL WHERE-style filter text
        xpath_base: XPath of the node the predicate is applied to

    Returns:
        str: ``boolean(<xpath_base>[<predicate>])``
    """
    sql_filter_parts = get_parts_sql_filter(filtro_sql)
    xpath_parts = convert_sql_parts_to_xpath_parts(sql_filter_parts)
    xpath_query = get_xpath_string_from_xpath_parts(xpath_parts)

    return "boolean(" + xpath_base + "[" + xpath_query + "])"


def get_xpath_string_from_xpath_parts(xpath_parts):
    """
    Flatten a nested list of XPath fragments into a single string.

    Args:
        xpath_parts: list whose items are strings or (nested) lists of strings

    Returns:
        str: all fragments concatenated depth-first, without separators
    """
    return "".join(
        get_xpath_string_from_xpath_parts(part) if isinstance(part, list) else part
        for part in xpath_parts
    )


def encode_for_xml(unicode_data, encoding='ascii'):
    """
    Encode unicode_data for use as XML or HTML, with characters outside
    of the encoding converted to XML numeric character references.

    Args:
        unicode_data: text to encode
        encoding: target encoding name

    Returns:
        bytes: the encoded data
    """
    try:
        return unicode_data.encode(encoding, 'xmlcharrefreplace')
    except ValueError:
        # Kept as a fallback for interpreters without the
        # 'xmlcharrefreplace' error handler: emulate it by hand.
        return _xmlcharref_encode(unicode_data, encoding)


def _xmlcharref_encode(unicode_data, encoding):
    """
    Emulate the 'xmlcharrefreplace' encoding error handler.

    Args:
        unicode_data: text to encode
        encoding: target encoding name

    Returns:
        bytes: encoded data, with unencodable characters as ``&#N;``
    """
    chunks = []
    # Step through the text one character at a time in order to catch
    # unencodable characters.
    for char in unicode_data:
        try:
            chunks.append(char.encode(encoding, 'strict'))
        except UnicodeError:
            # Encode the reference as well so every chunk is bytes and the
            # final join cannot fail on mixed str/bytes items.
            chunks.append(('&#%i;' % ord(char)).encode(encoding))
    return b''.join(chunks)


def _as_keyword(sql_elem):
    """Return the lower-cased text of a string token, or None for non-strings."""
    return sql_elem.lower() if isinstance(sql_elem, str) else None


def convert_sql_parts_to_xpath_parts(sql_filter_parts):
    """
    Translate parsed SQL filter tokens into XPath fragments.

    Nested lists are converted recursively. Within one list:
    a ``not`` token wraps the whole list in ``not(`` ... ``)``;
    ``like`` becomes ``[contains(text(),`` ... ``)]``;
    ``and`` / ``or`` are padded with one space on each side;
    every other token is emitted as ``str(token)``.

    NOTE(review): other tokens (``eq``, ``in``, parentheses, list values)
    are emitted verbatim with no separators - confirm that is intended.

    Args:
        sql_filter_parts: nested list of tokens, as returned by
            get_parts_sql_filter

    Returns:
        list: nested list of XPath string fragments
    """
    xpath_parts = []
    suffix_parts = []
    # Detect negation case-insensitively, matching the skip in the loop
    # below, so a "NOT" token is never dropped without wrapping.
    if any(_as_keyword(elem) == "not" for elem in sql_filter_parts):
        xpath_parts.append("not(")
        suffix_parts.append(")")

    for sql_elem in sql_filter_parts:
        if isinstance(sql_elem, list):
            xpath_parts.append(convert_sql_parts_to_xpath_parts(sql_elem))
            continue

        keyword = _as_keyword(sql_elem)
        if keyword == "not":
            continue
        if keyword == "like":
            # Converted into the 'contains()' function; its closing part
            # must come before the ")" of an enclosing "not(".
            xpath_parts.append("[contains(text(),")
            suffix_parts.insert(0, ")]")
        elif keyword in ("and", "or"):
            xpath_parts.append(" " + sql_elem + " ")
        else:
            # Tokens are plain text: never eval() them.
            xpath_parts.append(str(sql_elem))

    xpath_parts += suffix_parts

    return xpath_parts


if __name__ == '__main__':
    import fire

    fire.Fire()
and_ =
"and"
or_ =
"or"
in_ =
"in"
not_ =
"not"
like_ =
"like"
def
get_parts_sql_filter(sql_query_text):
def get_parts_sql_filter(sql_query_text):
    """
    Parse a SQL WHERE-style filter into a nested list of string tokens.

    Each condition (``column [not] <op> value``, ``column in (v1, v2)`` or a
    parenthesised sub-expression) becomes its own sub-list; the ``and`` /
    ``or`` connectors sit between those sub-lists.

    Args:
        sql_query_text: text of the filter to parse

    Returns:
        list: nested list of tokens (``ParseResults.asList()``)

    Raises:
        ParseException: when the text does not match the grammar. A
            diagnostic is printed before the exception is re-raised.
    """
    ident = Word(alphas, alphanums + "_$").setName("identifier")
    column_name = delimitedList(ident, ".", combine=True)
    where_expression = Forward()

    exponent_mark = CaselessLiteral("E")
    binop = oneOf("= != < > >= <= eq ne lt le gt ge like ", caseless=True)
    arith_sign = Word("+-", exact=1)
    real_num = Combine(Optional(arith_sign) + (Word(nums) + "." + Optional(Word(nums)) |
                                               ("." + Word(nums))) +
                       Optional(exponent_mark + Optional(arith_sign) + Word(nums)))
    int_num = Combine(Optional(arith_sign) + Word(nums) +
                      Optional(exponent_mark + Optional("+") + Word(nums)))

    # need to add support for alg expressions
    column_rval = real_num | int_num | quotedString | column_name
    where_condition = Group(
        (column_name + Optional(not_) + binop + column_rval) |
        (column_name + in_ + "(" + delimitedList(column_rval) + ")") |
        ("(" + where_expression + ")")
    )
    where_expression << where_condition + ZeroOrMore((and_ | or_) + where_expression)

    try:
        tokens = where_expression.parseString(sql_query_text)
    except ParseException as err:
        # Print a caret at the failing column, then propagate the real error
        # instead of failing later with AttributeError on ``None.asList()``.
        print(" " * err.loc + "^\n" + err.msg)
        print(err)
        raise

    return tokens.asList()
Arguments:
- sql_query_text:
Returns:
def
parse_sql_filter_to_xpath(filtro_sql, xpath_base=''):
def parse_sql_filter_to_xpath(filtro_sql, xpath_base=""):
    """
    Convert a SQL filter into an XPath ``boolean(...)`` expression.

    The filter is parsed into tokens, the tokens are translated into XPath
    fragments, and the fragments are flattened into the predicate applied
    to ``xpath_base``.

    Args:
        filtro_sql: SQL WHERE-style filter text
        xpath_base: XPath of the node the predicate is applied to

    Returns:
        str: ``boolean(<xpath_base>[<predicate>])``
    """
    predicate = get_xpath_string_from_xpath_parts(
        convert_sql_parts_to_xpath_parts(
            get_parts_sql_filter(filtro_sql)
        )
    )

    return "".join(("boolean(", xpath_base, "[", predicate, "])"))
Arguments:
- filtro_sql:
- xpath_base:
Returns:
def
get_xpath_string_from_xpath_parts(xpath_parts):
def get_xpath_string_from_xpath_parts(xpath_parts):
    """
    Flatten a nested list of XPath fragments into a single string.

    Args:
        xpath_parts: list whose items are strings or (nested) lists of strings

    Returns:
        str: all fragments concatenated depth-first, without separators
    """
    # isinstance() also accepts list subclasses, and a single join avoids
    # rebuilding the string on every fragment.
    return "".join(
        get_xpath_string_from_xpath_parts(part) if isinstance(part, list) else part
        for part in xpath_parts
    )
Arguments:
- xpath_parts:
Returns:
def
encode_for_xml(unicode_data, encoding='ascii'):
def encode_for_xml(unicode_data, encoding='ascii'):
    """
    Encode text for XML/HTML output, turning characters the target encoding
    cannot represent into XML numeric character references.

    Args:
        unicode_data: text to encode
        encoding: target encoding name

    Returns:
        The encoded data, as produced by ``unicode_data.encode``.
    """
    try:
        encoded = unicode_data.encode(encoding, 'xmlcharrefreplace')
    except ValueError:
        # Fallback for interpreters lacking the 'xmlcharrefreplace' error
        # handler: the helper emulates it character by character.
        encoded = _xmlcharref_encode(unicode_data, encoding)
    return encoded
Encode unicode_data for use as XML or HTML, with characters outside of the encoding converted to XML numeric character references.
Arguments:
- unicode_data:
- encoding:
Returns:
def
convert_sql_parts_to_xpath_parts(sql_filter_parts):
def _as_keyword(sql_elem):
    """Return the lower-cased text of a string token, or None for non-strings."""
    return sql_elem.lower() if isinstance(sql_elem, str) else None


def convert_sql_parts_to_xpath_parts(sql_filter_parts):
    """
    Translate parsed SQL filter tokens into XPath fragments.

    Nested lists are converted recursively. Within one list:
    a ``not`` token wraps the whole list in ``not(`` ... ``)``;
    ``like`` becomes ``[contains(text(),`` ... ``)]``;
    ``and`` / ``or`` are padded with one space on each side;
    every other token is emitted as ``str(token)``.

    NOTE(review): other tokens (``eq``, ``in``, parentheses, list values)
    are emitted verbatim with no separators - confirm that is intended.

    Args:
        sql_filter_parts: nested list of tokens, as returned by
            get_parts_sql_filter

    Returns:
        list: nested list of XPath string fragments
    """
    xpath_parts = []
    suffix_parts = []
    # Detect negation case-insensitively, matching the skip in the loop
    # below, so a "NOT" token is never dropped without wrapping.
    if any(_as_keyword(elem) == "not" for elem in sql_filter_parts):
        xpath_parts.append("not(")
        suffix_parts.append(")")

    for sql_elem in sql_filter_parts:
        if isinstance(sql_elem, list):
            xpath_parts.append(convert_sql_parts_to_xpath_parts(sql_elem))
            continue

        keyword = _as_keyword(sql_elem)
        if keyword == "not":
            continue
        if keyword == "like":
            # Converted into the 'contains()' function; its closing part
            # must come before the ")" of an enclosing "not(".
            xpath_parts.append("[contains(text(),")
            suffix_parts.insert(0, ")]")
        elif keyword in ("and", "or"):
            xpath_parts.append(" " + sql_elem + " ")
        else:
            # Tokens are plain text: never eval() them. The old
            # ``str(sql_elem, "utf-8")`` branch always raised, so its
            # effective result was already ``str(sql_elem)``.
            xpath_parts.append(str(sql_elem))

    xpath_parts += suffix_parts

    return xpath_parts
Arguments:
- sql_filter_parts:
Returns: