# Source code for better_json_tools.json_walker

'''
A module that provides tools for easy access to JSON data using JSON paths.
'''
from __future__ import annotations
import json
import re
from typing import Union, Type, Optional, IO, Callable, Iterator, Any

class SKIP_LIST:
    '''
    Used as literal value for JSONSplitWalker paths.

    The class object itself (not an instance) is passed as the key of the
    ``//`` operator of :class:`JSONWalker`. When the walker points at a list,
    the result is the same as for ``walker // int`` (every list item);
    otherwise the result is a split walker that contains only the walker
    itself.
    '''

# def _remove_escape_characters(text: str) -> str:
#     '''Prints to a string, removing the escape characters'''
#     with io.StringIO() as output:
#         print(text, file=output, end='')
#         contents = output.getvalue()
#     return contents

def _tuple_to_path_str(path: tuple[Union[str, int], ...]):
    result: list[str] = []
    for k in path:
        if isinstance(k, int):
            result.append(f'[{k}]')
        elif isinstance(k, str): # pyright: ignore[reportUnnecessaryIsInstance]
            if re.fullmatch("[a-zA-Z$_]+[a-zA-Z$_0-9]*", k):
                # Mathes JS variable name (like connect like a.b.c)
                if len(result) == 0:  # First item skip the dot
                    result.append(k)
                else: # Add dot before
                    result.append(f".{k}")
            else:
                # Does not match JS variable name (like connect like
                # ["a"]["b"]["c"])
                k = json.dumps(k)  # escape special characters and add quotes
                result.append(f'[{k}]')
        else:
            raise TypeError(f"Invalid key type {type(k)}")
    return "".join(result)

class JSONPath:
    r'''
    Represents a path in a JSON file.

    The paths internally use a tuple listing the keys, but they can be
    represented or created from a string. The string representation is
    similar to the one used in JavaScript. The path objects can be used to
    access the data of :class:`JSONWalker`.

    Example:

    >>> from better_json_tools import JSONPath
    >>> path = JSONPath(("a", "$abc", 1, 2, 'with quote "'))
    >>> print(path.data)
    ('a', '$abc', 1, 2, 'with quote "')
    >>> print(path)
    a.$abc[1][2]["with quote \""]
    >>> another_path = JSONPath(str(path))
    >>> print(another_path)
    a.$abc[1][2]["with quote \""]
    >>> print(another_path.data == path.data)
    True
    '''
    # ".name" - a dot followed by a JavaScript-like variable name.
    _NAME_KEY = re.compile(r"\.([a-zA-Z$_]+[a-zA-Z$_0-9]*)")
    # '["text"]' - a JSON string literal in brackets. A backslash always
    # consumes the character after it, so an escaped quote can never be
    # mistaken for the end of the string (keys like 'x"]y' parse correctly).
    _QUOTED_KEY = re.compile(r'\[("(?:[^"\\]|\\.)*")\]')
    # "[123]" - a non-negative list index.
    _INDEX_KEY = re.compile(r"\[([0-9]+)\]")

    def __init__(self, path: Union[str, tuple[Union[str, int], ...]]):
        '''
        :param path: either the string form of the path (parsed into keys) or
            a tuple of keys which is stored as is.
        :raises: :class:`ValueError` when the string is not a valid path.
        '''
        if isinstance(path, str):
            self.data = JSONPath._from_path_str(path)
        else:
            self.data = path

    def __str__(self) -> str:
        '''Returns a string representation of the path.'''
        return _tuple_to_path_str(self.data)

    @staticmethod
    def _from_path_str(path_str: str) -> tuple[Union[str, int], ...]:
        '''
        Converts a path string to a tuple of keys.

        :param path_str: the path in the form produced by ``str(JSONPath)``,
            for example ``a.b[0]["some key"]``.
        :returns: the keys of the path; an empty tuple for an empty string.
        :raises: :class:`ValueError` when the string is not a valid path.
        '''
        if path_str == "":
            return tuple()
        path: list[Union[int, str]] = []
        curr_path = path_str
        # Add . to the start of the string so the first name is matched the
        # same way as all of the following names.
        if not curr_path.startswith("["):
            curr_path = "." + curr_path
        while curr_path != "":
            if curr_path.startswith("."):
                # Match a.b.c
                match = JSONPath._NAME_KEY.match(curr_path)
                if match is None:
                    raise ValueError(f"Invalid path: {path_str}")
                path.append(match.group(1))
            elif curr_path.startswith("["):
                if len(curr_path) < 3:  # shortest possible path is like [0]
                    raise ValueError(f"Invalid path: {path_str}")
                if curr_path[1] == '"':
                    # Match ["a"]["b"]["c"]
                    match = JSONPath._QUOTED_KEY.match(curr_path)
                    if match is None:
                        raise ValueError(f"Invalid path: {path_str}")
                    try:
                        key = json.loads(match.group(1))
                    except json.JSONDecodeError as err:
                        # The text looked like a quoted key but is not a
                        # valid JSON string (e.g. an unknown escape).
                        raise ValueError(
                            f"Invalid path: {path_str}") from err
                    path.append(key)
                else:
                    # Match [0][1][2]
                    match = JSONPath._INDEX_KEY.match(curr_path)
                    if match is None:
                        raise ValueError(f"Invalid path: {path_str}")
                    path.append(int(match.group(1)))
            else:
                raise ValueError(f"Invalid path: {path_str}")
            # Drop the part that has just been parsed.
            curr_path = curr_path[match.end():]
        return tuple(path)
# Type definitions

# Any value that can be stored in a JSON document.
JSON = Union[dict[str, Any], list[Any], str, float, int, bool, None]
# A single step of a path: a dict key or a list index.
JSON_KEY = Union[str, int]
# Everything accepted by the `/` operator.
JSON_PATH_KEY = Union[JSON_KEY, JSONPath]
# Everything accepted by the `//` operator.
JSON_SPLIT_KEY = Union[str, Type[int], Type[str], None, Type[SKIP_LIST]]
# The data held by a walker: JSON or the error raised while accessing it.
JSON_WALKER_DATA = Union[JSON, Exception]
class JSONWalker:
    '''
    A class that represents a path in the JSON file for easy access to its
    values.

    A walker holds the data found at its path, or the exception that was
    raised while trying to access it, together with a reference to the
    walker it was created from and the key used to create it.
    '''
    def __init__(
            self, data: JSON_WALKER_DATA, *,
            parent: Optional[JSONWalker] = None,
            parent_key: Optional[JSON_KEY] = None):
        '''
        :param data: the JSON data (or an exception) this walker points to.
        :param parent: the walker this walker was created from, None for
            the root.
        :param parent_key: the key used to access this walker from its
            parent, None for the root.
        :raises: :class:`ValueError` when the data is not JSON.
        '''
        if not isinstance(
                data, (Exception, dict, list, str, float, int, bool,
                       type(None))):
            raise ValueError('Input data is not JSON.')
        self._data: JSON_WALKER_DATA = data
        self._parent = parent
        self._parent_key = parent_key

    @property
    def parent(self) -> JSONWalker:
        '''
        The parent of this json walker (the walker that created this walker).

        :raises: :class:`KeyError` when this :class:`JSONWalker` is a root
            object.
        '''
        if self._parent is None:
            raise KeyError("You can't get parent of the root object.")
        return self._parent

    @property
    def parent_key(self) -> JSON_KEY:
        '''
        The key used to access this walker from its parent.

        :raises: :class:`KeyError` when this :class:`JSONWalker` is a root
            object.
        '''
        if self._parent_key is None:
            raise KeyError("You can't get parent key of the root object.")
        return self._parent_key

    @staticmethod
    def loads(json_text: Union[str, bytes], **kwargs: Any) -> JSONWalker:
        '''
        Creates json walker using `json.loads()` function. Passes all
        arguments to `json.loads` and tries to create the walker based on
        the result.
        '''
        data = json.loads(json_text, **kwargs)
        return JSONWalker(data)

    @staticmethod
    def load(json_file: IO[Any], **kwargs: Any) -> JSONWalker:
        '''
        Creates json walker using `json.load()` function. Passes all
        arguments to `json.load` and tries to create the walker based on
        the result.
        '''
        data = json.load(json_file, **kwargs)
        return JSONWalker(data)

    @property
    def data(self) -> JSON_WALKER_DATA:
        '''
        The data from JSON that this walker points to.

        Setting this property also writes the value into the data of the
        parent walker (under the parent key) when the walker has a parent.
        '''
        return self._data

    @data.setter
    def data(self, value: JSON):
        if self._parent is not None:
            self.parent.data[  # type: ignore
                self.parent_key  # type: ignore
            ] = value
        self._data = value

    def create_path(
            self, data: JSON, *,
            exists_ok: bool = True,
            can_break_data_structure: bool = True,
            can_create_empty_list_items: bool = True,
            empty_list_item_factory: Optional[Callable[[], JSON]] = None):
        '''
        Creates path to the part of JSON file pointed by this walker.

        :param data: the data to put at the end of the path.
        :param exists_ok: if False, the ValueError will be raised if the path
            to this item already exists. If True and the path exists, the
            function returns without changing anything.
        :param can_break_data_structure: if True then the function will be
            able to replace certain existing paths with dicts or lists.
            Example - if path "a"/"b"/"c" points at integer, creating path
            "a"/"b"/"c"/"d" will replace this integer with a dict in order to
            make "d" a valid key. Setting this to false would cause a
            KeyError in this situation.
        :param can_create_empty_list_items: enables filling up the lists in
            JSON with values produced by the empty_list_item_factory in order
            to match the item index specified in the path. Example - if you
            specify a path to create "a"/5/"c" but the list at "a" path only
            has 2 items, then the function will create additional items so
            the 5th index can be valid.
        :param empty_list_item_factory: a function used to create items for
            lists in order to make indices specified in the path valid (see
            can_create_empty_list_items function parameter). If this value is
            left as None then the lists will be filled with null values.
        :raises: :class:`ValueError` when the path exists and exists_ok is
            False. :class:`KeyError` when the path can't be created with the
            given settings or contains a negative index.
        '''
        if self.exists:
            if exists_ok:
                return
            raise ValueError("Path already exists")
        if empty_list_item_factory is None:
            empty_list_item_factory = lambda: None
        curr_item = self.root
        path = self.path
        for key in path:
            if isinstance(key, str):
                # key is a string, data must be a dict
                if not isinstance(curr_item.data, dict):
                    if not can_break_data_structure:
                        raise KeyError(key)
                    curr_item.data = {}
                if key not in curr_item.data:
                    # From now on only new data is created, so nothing
                    # that existed before can be broken.
                    can_break_data_structure = True
                curr_item = curr_item / key
            elif isinstance(key, int):  # pyright: ignore[reportUnnecessaryIsInstance]
                # key is an int, data must be a list
                if key < 0:
                    raise KeyError(key)
                if not isinstance(curr_item.data, list):
                    if not can_break_data_structure:
                        raise KeyError(key)
                    curr_item.data = []
                if len(curr_item.data)-1 < key:
                    if not can_create_empty_list_items:
                        raise KeyError(key)
                    # Extend the list so that `key` becomes a valid index.
                    curr_item.data += [
                        empty_list_item_factory()
                        for _ in range(1+key-len(curr_item.data))
                    ]
                    can_break_data_structure = True  # Creating new data
                curr_item = curr_item / key
            else:
                raise KeyError(key)
        # Reattach this walker to the freshly created chain of walkers and
        # write the data through it.
        self._parent = curr_item.parent
        self._parent_key = curr_item.parent_key
        self.data = data

    @property
    def exists(self) -> bool:
        '''
        Returns true if path to this item already exists.

        This function checks the entire path to this item starting from
        root so even if the object is detached from the root somewhere in
        the middle of the path, the function will still return correct value.
        '''
        node: Any = self.root.data
        try:
            for key in self.path:
                node = node[key]
        except Exception:  # pylint: disable=broad-except
            # Any failed lookup means that the path is not there. The same
            # exceptions are treated as "missing" by the `/` operator.
            return False
        return True

    @property
    def root(self) -> JSONWalker:
        '''
        The root object of this JSON file.
        '''
        root = self
        try:
            while True:
                root = root.parent
        except KeyError:
            # The root has no parent - the walk up the tree ends here.
            pass
        return root

    @property
    def path(self) -> tuple[JSON_KEY, ...]:
        '''
        Full JSON path up to this point starting from the root of the JSON
        file in form of a tuple of keys.
        '''
        result: list[JSON_KEY] = []
        parent = self
        try:
            while True:
                result.append(parent.parent_key)
                parent = parent.parent
        except KeyError:
            # The root has no parent key - the walk up the tree ends here.
            pass
        return tuple(reversed(result))

    @property
    def path_str(self) -> str:
        '''
        Full JSON path up to this point starting from the root of the JSON
        file in form of a string.
        '''
        return _tuple_to_path_str(self.path)

    def __truediv__(self, key: JSON_PATH_KEY) -> JSONWalker:
        '''
        The `/` operator creates descendant path in the JSON file.

        When the key is a :class:`JSONPath`, all of its keys are applied one
        after another. If the data can't be accessed with the key, the
        returned walker holds the raised exception as its data.
        '''
        if isinstance(key, JSONPath):
            walker = self
            for k in key.data:
                walker = walker / k
            return walker
        try:
            return JSONWalker(
                self.data[key],  # type: ignore
                parent=self, parent_key=key)
        except Exception as e:  # pylint: disable=broad-except
            return JSONWalker(e, parent=self, parent_key=key)

    def __floordiv__(self, key: JSON_SPLIT_KEY) -> JSONSplitWalker:
        '''
        The `//` operator creates JSONSplitWalker object with multiple
        alternative paths that matched provided key.

        Accepted keys: None (any dict or list item), the `int` type (any
        list item), the `str` type (any dict item), a string (regular
        expression that must fully match the dict key) and
        :class:`SKIP_LIST` (any list item if the data is a list, otherwise
        this walker itself). If the data doesn't accept the given type of
        key, the result is an empty split walker.

        :raises:
            :class:`TypeError` - invalid input data type

            :class:`re.error` - invalid regular expression.
        '''
        # pylint: disable=too-many-return-statements
        # ANYTHING
        if key is None:
            if isinstance(self.data, dict):
                return JSONSplitWalker([
                    JSONWalker(v, parent=self, parent_key=k)
                    for k, v in self.data.items()
                ])
            if isinstance(self.data, list):
                return JSONSplitWalker([
                    JSONWalker(v, parent=self, parent_key=i)
                    for i, v in enumerate(self.data)
                ])
        # ANY LIST ITEM
        elif key is int:
            if isinstance(self.data, list):
                return JSONSplitWalker([
                    JSONWalker(v, parent=self, parent_key=i)
                    for i, v in enumerate(self.data)
                ])
        # ANY DICT ITEM
        elif key is str:
            if isinstance(self.data, dict):
                return JSONSplitWalker([
                    JSONWalker(v, parent=self, parent_key=k)
                    for k, v in self.data.items()
                ])
        # REGEX DICT ITEM
        elif isinstance(key, str):
            if isinstance(self.data, dict):
                result: list[JSONWalker] = []
                for k, v in self.data.items():
                    if re.fullmatch(key, k):
                        result.append(JSONWalker(
                            v, parent=self, parent_key=k))
                return JSONSplitWalker(result)
        # IF it's a list use the int key ELSE return split walker with self
        elif key is SKIP_LIST:
            if isinstance(self.data, list):
                return self // int
            return JSONSplitWalker([self])
        else:  # INVALID KEY TYPE
            raise TypeError(
                'Key must be a regular expression or one of the values: '
                'str, int, or None')
        # DATA DOESN'T ACCEPT THIS TYPE OF KEY
        return JSONSplitWalker([])

    def __add__(
            self, other: Union[JSONSplitWalker, JSONWalker]
    ) -> JSONSplitWalker:
        '''
        The `+` operator adds json walkers creating a split walker with more
        values. Walkers that hold an exception as their data are left out.
        '''
        if isinstance(other, JSONWalker):
            data = [self, other]
        else:
            data = other.data + [self]
        return JSONSplitWalker(
            [i for i in data if not isinstance(i.data, Exception)])
class JSONSplitWalker:
    '''
    Multiple walker objects grouped together. This class can be used to
    browse JSON file contents from multiple JSON paths at once.
    '''
    def __init__(self, data: list[JSONWalker]) -> None:
        self._data: list[JSONWalker] = data

    @property
    def data(self) -> list[JSONWalker]:
        '''
        The list of the :class:`JSONWalker` objects contained in this object.
        '''
        return self._data

    def __truediv__(self, key: JSON_PATH_KEY) -> JSONSplitWalker:
        '''
        Applies `/` operator to all of the :class:`JSONWalker` objects in
        this split walker. Results that hold an exception as their data are
        dropped.
        '''
        descendants = [walker / key for walker in self.data]
        return JSONSplitWalker([
            found for found in descendants
            if not isinstance(found.data, Exception)
        ])

    def __floordiv__(self, key: JSON_SPLIT_KEY) -> JSONSplitWalker:
        '''
        Applies `//` operator to all of the :class:`JSONWalker` objects in
        this split walker and groups all of the resulting walkers together
        in one object.
        '''
        return JSONSplitWalker([
            found
            for walker in self.data
            for found in (walker // key).data
        ])

    def __add__(
            self, other: Union[JSONSplitWalker, JSONWalker]
    ) -> JSONSplitWalker:
        '''
        The `+` operator adds json walkers creating a split walker with more
        values. Walkers that hold an exception as their data are left out.
        '''
        extra = [other] if isinstance(other, JSONWalker) else other.data
        combined = self.data + extra
        return JSONSplitWalker([
            walker for walker in combined
            if not isinstance(walker.data, Exception)
        ])

    def __iter__(self) -> Iterator[JSONWalker]:
        '''
        Yield every walker contained in this object.
        '''
        yield from self.data

    def __len__(self) -> int:
        '''
        Return the number of walkers contained in this object.
        '''
        return len(self.data)