/botly/db.py (afef9d710524530ad5d7137a9eeeb28794b61a69) (6640 bytes) (mode 100644) (type blob)

#!/usr/bin/env python
"""Database class that loads the XML db, shared with daugther db classes."""

from lxml import etree


class Db:
    
    dbxml = False
    dbpath = ""
    dbmodified = False

    def __init__(self):
        assert False, "No instance of this class should be created."

    @classmethod
    def load(cls, filePath):
        """Loads the database XML file at given path."""
        cls.dbxml = etree.parse(filePath)
        cls.dbpath = filePath

    @classmethod
    def save(cls, filePath=''):
        """Saves the database on the filesystem."""
        if not cls.dbmodified:
            return False
        if not len(filePath):
            filePath = cls.dbpath
        cls.dbxml.write(filePath, encoding="utf-8", xml_declaration=True)
        cls.dbmodified = False
        return True

    @classmethod
    def was_modified(cls):
        """Returns whether or not the DB was modified an require a save."""
        return cls.dbmodified

    def get_node(self, path):
        """Returns the node at the given absolute path"""
        return Db.dbxml.xpath(path)[0]
    
    def is_loaded(self):
        """Returns whether or not the database was loaded."""
        return Db.dbxml != False

    def store_value(self, valueName, value, parentXpath = ''):
        """Stores a single value in the database."""
        self._pre_check()

        # If we provided a parent path, we use it, otherwise, use root
        if len(parentXpath):
            parent = self.root.xpath(parentXpath)
            if len(parent):
                parent = parent[0]
            else:
                return None
        else:
            parent = self.root

        # If value exists in this parent, remove it. Then we create it
        elem = parent.find(valueName)
        if elem is not None:
            parent.remove(elem)
        elem = etree.Element(valueName)
        elem.text = value
        parent.append(elem)

        self._set_modified(True)
        return True

    def query_value(self, valueName, parentPath = None):
        """Loads a single value from the given parent."""
        self._pre_check()
        
        # Define parent: Either the root elem or given parent path
        parent = None
        if parentPath is None:
            parent = self.root
        else:
            parent = self.root.xpath(parentXpath)
            if len(parent):
                parent = parent[0]
            else:
                return None
            
        # Now query the value amoung the parent' subelements
        if parent is not None:
            valueElem = parent.find(valueName)
            if valueElem is not None and valueElem.text is not None:
                return valueElem.text
        return None

    def store_object(self, parentXpath, obj):
        """Store a single object in the db."""
        self._pre_check()

        parent = self.root.xpath(parentXpath)[0]
        if parent is not None:
            # Is the object already in the db? If so, remove it
            for child in parent.iterchildren():
                if child.get('Id') == obj['Id']:
                    parent.remove(child)
                    break
            # Now save the object
            parent.append(self._obj_to_xml(obj))
            self._set_modified(True)
            return True
        return False

    def query_object(self, parentXpath, objectId):
        """Loads a single object identified by its Id. None if not found."""
        self._pre_check()
        
        parent = self.root.xpath(parentXpath)[0]
        if parent is not None:
            for child1 in parent.iterchildren():
                if child1.get('Id') != objectId:
                    continue
                return self._load_object_values(child1)
        return None

    def query_object_list(self, parentXpath):
        """Loads a list of objects, represented by a dict with its values"""
        self._pre_check()

        parent = self.root.xpath(parentXpath)[0]
        if parent is not None:
            objects = []
            for child1 in parent.iterchildren():
                objects.append(self._load_object_values(child1))
            return objects
        return None

    def _set_modified(self, modified):
        Db.dbmodified = modified

    def _obj_to_xml(self, obj):
        """Create an xml element out of an object (dictionary)"""
        elem = etree.Element(obj['Type'])
        for key, value in obj.items():
            if key == 'Id':
                elem.set(key, value)
            elif key == 'Type':
                # We already used that value to name the element's tag
                continue
            elif isinstance(value, list):
                # We found a list so loop through every value and create a 
                # child list elem. Its name is based on its parent's
                childElem = etree.Element(key)
                for lvalue in value:
                    listElem = etree.Element(key[:-1])
                    listElem.text = lvalue
                    childElem.append(listElem)
                elem.append(childElem)
            else:
                childElem = etree.Element(key)
                childElem.text = value
                elem.append(childElem)
        return elem

    def _load_object_values(self, xmlobj):
        """Loads the given xml node in a dictionary, considered as an object.
        
        Objects are suspected to have an Id specified as an attribute on their
        root xml element. It will be loaded in the dictionary as 'Id'.

        Elements containing no children are considered as simple values.
        They are loaded in the dict using their tagname and their value.
        
        Elements that contains children are considered a list of value. A list
        is created in the dict under that element's tag name containing the
        value of its children. Those children's tag name is ignored."""

        obj = {}
        obj['Id'] = xmlobj.get('Id')
        obj['Type'] = xmlobj.tag
        for child in xmlobj.iterchildren():
            if len(child) == 0:
                # We have no children, this node is value
                obj[child.tag] = child.text
            else:
                # We have children. We consider this node as a list
                obj[child.tag] = []
                for child2 in child.iterchildren():
                    if child2.text is not None:
                        obj[child.tag].append(child2.text)
        return obj

    def _pre_check(self): 
        assert self.is_loaded(), "The DB is not loaded."
        assert self.root is not None, "Root node not defined."



Mode Type Size Ref File
100644 blob 10175 e454a52586f29b8ce8a6799163eac1f875e9ac01 LICENSE
100644 blob 912 43cc367ee8608a8760e6665ac29c411d58dc8b99 README.md
040000 tree - 66dcfaf2ba5cd5742f36e01d7785daf5113c2107 botly
100644 blob 172 3f148ab99d152565c46e51fd45e51030417f7faa example.py
040000 tree - e05b34e00f419ee6a20bf1bb6a0603727b1b4399 examplebot
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/detche/Botly

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/detche/Botly

Clone this repository using git:
git clone git://git.rocketgit.com/user/detche/Botly

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main