iam-git / WellMet (public) (License: MIT) (since 2021-08-31) (hash sha1)
WellMet is pure Python framework for spatial structural reliability analysis. Or, more specifically, for "failure probability estimation and detection of failure surfaces by adaptive sequential decomposition of the design domain".

/ghull.py (457329fe567f1c0a9950c21c7c494cccf38193cc) (20908 bytes) (mode 100644) (type blob)

#!/usr/bin/env python
# coding: utf-8

import numpy as np
from . import sball
from scipy import stats
from .candybox import CandyBox
from .IS_stat import PushAndPull # for Shell_IS
from .IS_stat import get_1DS_sample # for Shell_1DS



try: # try to outthink f****** OS's memory management
    import os
    mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
except BaseException as e:
    mem_GB = 16
    mem_bytes = mem_GB * 1024**3 # hello, Windows!
    print("ghull failed to get amount of RAM installed. %s GB is supposed."% mem_GB, repr(e))
    print("BTW, you are using Windows, aren't you?")
    
    

        

#č pomocná třída pro integrování
#č nejsem tedy jist, jestli je to nejlepší napad - 
#č dělit Ghull na podtřídy, delegovat funkcionalitu
#č ale jinak Ghull se stavá hodně překomplikovaným.
#č nic lepšího mně nenapadá, přemyšlel jsem dlouho.
class _ShellBaseIntegration:
    def __init__(self, hull, shell, non_Gaussian_reduction=0.98):
        self.shell = shell
        self.hull = hull
        self.nvar = hull.sample.nvar
        
        self.integration_cutoff = np.inf
        self.non_Gaussian_reduction = non_Gaussian_reduction
        
        # test if r>0
        self.r = 0
        central_G = np.full(hull.sample.nvar, 0, dtype=np.int8)
        self.DP = self.hull.sample.f_model.new_sample(central_G, space='G')
        
        
    @property
    def DP_is_valid(self):
        #č dalo by se kontrolovat změny obálky 
        #č a provadět kontrolu DP pouze po změně obálky.
        #č Nejsem liný, jeden bodík prostě za to doopravdy nestoji.
        mask = self.hull.is_outside(self.DP)
        return np.any(mask)
        
    #č na rozdil od rodičovské třídy
    #č vrací odhad r na základě předchozích integrací
    #č metoda je navržena tak, aby Shell_IS jú mohl zdědit. 
    def get_r(self):
        #č bojim sa, rerukce bude aplikována vždycky
        #č Bacha, metoda bude vracet nuly pro obálky v Gaussovskem prostoru!
        return self.r * self.non_Gaussian_reduction
        
        
    #č metoda je navržena tak, aby Shell_IS jú mohl zdědit. 
    def _r_check_out(self, outside_nodes):
        sum_squares = np.sum(np.square(outside_nodes.G), axis=1)
        arg = np.nanargmin(sum_squares)
        new_r = np.sqrt(sum_squares[arg])
        if (not self.DP_is_valid) or (new_r < self.r):
            self.DP = outside_nodes[arg]
            self.r = new_r
        
    
    #č metoda je navržena tak, aby Shell_IS jú mohl zdědit. 
    def integrate(self, nis, callback_all=None, callback_outside=None):
        self.reset(nis)
        
        if self.hull.nsimplex == 0:
            bus = self.integration_cutoff
        else:
            bus = int(mem_bytes / self.hull.nsimplex / 8 / 10) + 1
        while self.nsampled < nis:
            
            seats = min(nis - self.nsampled, self.integration_cutoff, bus)
            try: 
                self._process_part(seats, nis, callback_all, callback_outside)
            except MemoryError as m:
                print(self.__class__.__name__ +":", "memory error, %s sedaček" % seats, repr(m))
                self.integration_cutoff = int(np.ceil(seats/2))
        
        assert nis == self.nsampled
        
        #č finalizovat by mohl i self._get_result(),
        #č ale nebudeme michat vodku s pivem!
        self._finalize()
        
        return self._get_result()
        
    



class Shell_MC(_ShellBaseIntegration):
        
    def reset(self, nis): # clear
        self.r_needed = (self.hull.space!='G')
        
        self.nsampled = 0
        self.nfailed = 0
        
        
    # stateless
    def rvs(self, nsampled, seats, ns): 
        "Generování vzorků (kandidátů a integračních bodů)"
        # rand_dir: prepare ns random directions on a unit d-sphere
        rand_dir = sball.get_random_directions(seats, self.nvar) #random directions
        
        # generate sampling probabilites
        left = (ns-nsampled) / ns
        right = (ns-nsampled-seats) / ns
        #č přidáme trochu zmatku.
        #č globálně jdeme r->R, localně ale R_i+ -> R_i-
        #č menší poloměry musejí jít dřív - na to zavazano nonGaussian _r
        #č převracení lokalně umožní vždycky mít alespoň jeden bodík outside,
        #č což je taky velmi vhodné vzhledem k tomu, že se _r bere z outside bodíků
        p = np.linspace(right, left, seats, endpoint=False) # probabilities for the radius
        
        # convert probabilitites into random radii
        # (distances from origin that are greater than r and less than R)
        r = self.shell.isf(p) # actually, it is the same as CDF inverse
        
        #finally a random sample from the optimal IS density:
        sample_G = rand_dir*r[:,None]
        
        return sample_G
    
    # bus analogy
    def _process_part(self, seats, nis, callback_all=None, callback_outside=None):
        # boarding
        nodes_G = self.rvs(self.nsampled, seats, nis)
        nodes = self.hull.sample.f_model.new_sample(nodes_G, space='G')
        # who is back?
        mask = self.hull.is_outside(nodes)
        if self.r_needed and np.any(mask):
            #č rvs má vzorkovat od měnšího poloměru k většímu.
            #č to znamená, že první outside bude mít nejměnší r vůbec
            self._r_check_out(nodes[mask])
            self.r_needed = False
        
        if callback_all is not None:
            # -2 = 'inside' -1 = 'outside'
            candy_nodes = CandyBox(nodes, event_id=mask-2, is_outside=mask)
            callback_all(candy_nodes)
            
        if (callback_outside is not None) and np.any(mask):
            callback_outside(nodes[mask])
        
        assert len(mask) == seats
        #č nevím proč, ale kdysi mě to vyšlo rychlejc jak obyčejný součet
        self.nfailed += len(mask[mask]) 
        self.nsampled += len(mask) 
        
        
    #č finalizovat by mohl i self._get_result(),
    #č ale nebudeme michat vodku s pivem!
    def _finalize(self):
        #č nebyl žádný outside?
        #č to přece není v pořádku!
        #č (jen to nikomu neříkejte,
        #č ale v současné implementaci třídy 
        #č podmínka dole by neměla nikdy nastat)
        if self.r_needed:
            self.r = self.shell.R * self.non_Gaussian_reduction
        
    def _get_result(self):
        
        nis = self.nsampled
        nf = self.nfailed
        ns = nis - nf
        p_shell = self.shell.p_shell
        shell_pf = nf/nis * p_shell
        shell_ps = ns/nis * p_shell
        
        stats = dict()
        stats["shell_budget"] = nis
        stats["shell_inside"] = shell_ps
        stats["shell_outside"] = shell_pf
        
        return shell_ps, shell_pf, stats




class Shell_IS(Shell_MC):
    def reset(self, nis): # clear
        self.r_needed = (self.hull.space!='G')
        
        self.nsampled = 0
        self.pp = PushAndPull()
        
        
    # stateless
    def rvs(self, nsampled, seats, ns): 
        "Generování vzorků (kandidátů a integračních bodů)"
        # rand_dir: prepare ns random directions on a unit d-sphere
        rand_dir = sball.get_random_directions(seats, self.nvar) #random directions
        
        #č poloměry bereme ze skořapky
        #č za správné nastavení (stejně jako u MC)
        #č zodpovidá uživatel třídy
        #č třída vlastní odhad r nijak nevyuživá!
        r = self.shell.r
        R = self.shell.R
        delta_r = R - r
        left = nsampled / ns * delta_r + r
        right = (nsampled + seats) / ns * delta_r + r
        #č přidáme trochu zmatku.
        #č globálně jdeme r->R, localně ale R_i+ -> R_i-
        #č menší poloměry musejí jít dřív - na to zavazano nonGaussian _r
        #č převracení lokalně umožní vždycky mít alespoň jeden bodík outside,
        #č což je taky velmi vhodné vzhledem k tomu, že se _r bere z outside bodíků
        rs = np.linspace(right, left, seats, endpoint=False)
        
        #finally a random sample from the optimal IS density:
        sample_G = rand_dir*rs[:,None]
        
        #č a jsme doma. Platba za špatný design.
        #č Potřebujeme hustotu 1D rozdělení, implementovanou v Radial
        #č Shell se síce dědí od Radial, ale implementuje .pdf() jako sdruženou 
        #č hustotu v nD Gaussovskem prostoru
        #
        #č vahy jsou definovány jako podil původního rozdělení k vzorkovácímu
        #č původní - Chi rozdělení. nemá zde být žádný p_shell
        f_pdf = stats.chi.pdf(rs, self.nvar)
        #č vzorkovácí. 
        #h_pdf = 1/delta_r
        weights = f_pdf * delta_r
        
        return sample_G, weights
    
    # bus analogy
    def _process_part(self, seats, nis, callback_all=None, callback_outside=None):
        # boarding
        nodes_G, weights = self.rvs(self.nsampled, seats, nis)
        nodes = self.hull.sample.f_model.new_sample(nodes_G, space='G')
        # who is back?
        mask = self.hull.is_outside(nodes)
        if self.r_needed and np.any(mask):
            #č rvs má vzorkovat od měnšího poloměru k většímu.
            #č to znamená, že první outside bude mít nejměnší r vůbec
            self._r_check_out(nodes[mask])
            self.r_needed = False
        
        assert len(mask) == seats
        self.pp.add(weights, mask)
        self.nsampled += len(mask) 
        
        if callback_all is not None:
            # -2 = 'inside' -1 = 'outside'
            candy_nodes = CandyBox(nodes, event_id=mask-2, is_outside=mask, weights=weights)
            callback_all(candy_nodes)
            
        if (callback_outside is not None) and np.any(mask):
            callback_outside(nodes[mask])
        
        
    def _get_result(self):
        
        nis = self.nsampled
        #č nejdřív true, pak false. Posilali jsme is_outside
        mean_pf, mean_ps = self.pp.mean
        var_pf, var_ps = self.pp.var
        shell_pf, shell_ps = self.pp.correct_means(p_overall=self.shell.p_shell)
        
        stats = dict()
        stats["shell_budget"] = nis
        stats["shell_inside_measured"] = mean_ps
        stats["shell_outside_measured"] = mean_pf
        stats["shell_inside_var"] = var_ps
        stats["shell_outside_var"] = var_pf
        stats["shell_inside"] = shell_ps
        stats["shell_outside"] = shell_pf
        
        return shell_ps, shell_pf, stats



#č slovní zásoba došla mně už před pár rokama
class Shell_1DS(_ShellBaseIntegration):
    """1DS stands for 1D sampling.
    1DS is, actually, an importance sampling method
    that, actually, does not calculate IS weights
    as f_density / h_density, but
    (arbitrary, nonuniformly) divides interval to subintervals.
    By using CDF transformes subintervals to 0-1 measure line.
    Then gets one node as middle point of every subinterval,
    weights therefore are just interval widths itself.
    No sampling imprecisions are introduced, 
    therefore no spring, no correction are needed. 
    """
    def reset(self, nis): # clear
        self.r_needed = (self.hull.space!='G')
        
        self.nsampled = 0
        
        #č poloměry bereme ze skořapky
        #č za správné nastavení (stejně jako u MC)
        #č zodpovidá uživatel třídy
        #č třída vlastní odhad r nijak nevyuživá!
        r = self.shell.r
        R = self.shell.R
        # let's predefine 1D sequence at very beginning
        x_sub = np.linspace(r, R, nis+1, endpoint=True)
        #č objevilo se, že scipy.stats.chi je neunosně nepřesné
        #č vzdává se jíž na poloměru 8
        s_ball = sball.Sball(self.nvar)
        x, weights = get_1DS_sample(s_ball, x_sub)
        self.x = x
        self.weights = weights
        self.mask = np.empty(nis, dtype=np.bool)
        
    
    
    # bus analogy
    def _process_part(self, seats, nis, callback_all=None, callback_outside=None):
        # boarding
        left = self.nsampled
        right = self.nsampled + seats
        rs = self.x[left:right]
        
        # rand_dir: prepare ns random directions on a unit d-sphere
        rand_dir = sball.get_random_directions(seats, self.nvar) #random directions
        nodes_G = rand_dir*rs[:,None]
        nodes = self.hull.sample.f_model.new_sample(nodes_G, space='G')
        
        # who is back?
        mask = self.hull.is_outside(nodes)
        assert len(mask) == seats
        
        if self.r_needed and np.any(mask):
            #č má se vzorkovat od měnšího poloměru k většímu.
            #č to znamená, že první outside bude mít nejměnší r vůbec
            self._r_check_out(nodes[mask])
            self.r_needed = False
        
        # the most important part
        self.nsampled += len(mask) 
        self.mask[left:right] = mask
        
        if callback_all is not None:
            # -2 = 'inside' -1 = 'outside'
            candy_nodes = CandyBox(nodes, event_id=mask-2, is_outside=mask)
            callback_all(candy_nodes)
            
        if (callback_outside is not None) and np.any(mask):
            callback_outside(nodes[mask])
        
        
    def _finalize(self):
        #č nebyl žádný outside?
        #č to přece není v pořádku!
        if self.r_needed:
            self.r = self.shell.R * self.non_Gaussian_reduction
        
        
    def _get_result(self):
        #č mask related to hull.is_outside()
        shell_pf = np.sum(self.weights[self.mask])
        shell_ps = np.sum(self.weights[~self.mask])
        
        stats = dict()
        stats["shell_budget"] = self.nsampled
        stats["shell_inside"] = shell_ps
        stats["shell_outside"] = shell_pf
        
        return shell_ps, shell_pf, stats
        
        
        
        
#č mým úkolem při návrhu této třidy je pořádně všecko zkomplikovat.
#č Dostávám za to peníze. 
#č Takže. Udělám 3 druhů estimátorů
# convex_hull_estimation  -2: inside, -1: outside
# shell_estimation  -22: inner, -3: shell, -11: outer
# ghull_estimation  -22: inner, -21: shell inside, -12: shell outside, -11: outer 
class Ghull:
    def __init__(self, hull, calculate_orth=True, calculate_2FORM=True, \
                        Integrator=Shell_1DS, non_Gaussian_reduction=0.9):
        self.hull = hull
        self.shell = sball.Shell(hull.sample.nvar)
        self.outside_dist = sball.Shell(hull.sample.nvar)
        self.sample = hull.sample
        
        #č vlastně nevím, ale nemusejí byť úplně zadarmo...
        self.calculate_orth = calculate_orth
        self.calculate_2FORM = calculate_2FORM
        
        self.set_integrator(Integrator, non_Gaussian_reduction)
        
    def set_integrator(self, Integrator, non_Gaussian_reduction=0.9):
        self.gint = Integrator(self.hull, self.shell, non_Gaussian_reduction)
        
            
    def boom(self, ns, use_MC=False):
        if use_MC:
            self.outside_dist.set_bounds(self.get_R())
            nodes_G = self.outside_dist.rvs(ns)
        else:
            # rand_dir: prepare ns random directions on a unit d-sphere
            rand_dir = sball.get_random_directions(ns, self.sample.nvar) #random directions
            
            #č deme od vnější .get_R() kružnici směrem ven
            r = self.get_R()
            
            # maximum radius, where norm.pdf() wasn't zero
            # -38.575500173381374935388521407730877399444580
            # don't ask me what the magic python use to distinguish
            # digits after double precision
            max_R_ever = 37
            if r < max_R_ever:
                R = max_R_ever
            else:
                R = r + 10
            r = np.linspace(self.get_R(), max_R_ever, ns, endpoint=True) 
            nodes_G = rand_dir*r[:,None]
        
        nodes = self.sample.f_model.new_sample(nodes_G, space='G')
        return nodes
        
    def get_orth_outside(self):
        if self.hull.space == 'G':
            return self.hull.get_orth_outside()
        else: 
            ###č bude to horší jak s-ball, ale budiž.
            ##x = np.full(self.sample.nvar, self.get_R())
            ##return calculate_brick_complement_probability(-x, x)
            #č zůstal calculate_brick_complement_probability
            #č v convex_hull modulu,
            #č ale. na orth odhady celkem spolehám,
            #č nechť se vrací aspon d-ball odhad
            #
            #č shell normálně musí se aktualizovat
            #č nechcu to tady řešit
            return self.shell.pf 
            
    def get_2FORM_outside(self):
        if self.hull.space == 'G':
            return self.hull.get_2FORM_outside()
        else: 
            #č nebudem do toho michat nic dalšího. 
            #č Když 2FORM, tak 2FORM!
            return 2 * stats.norm.sf(self.get_R())
            
    def get_FORM_outside(self):
        if self.hull.space == 'G':
            return stats.norm.sf(self.get_r())
        else:
            return stats.norm.sf(self.get_R())
            
    def get_R(self):
        sum_squared = np.sum(np.square(self.sample.G), axis=1)
        #index = np.argmax(sum_squared)
        return np.sqrt(np.nanmax(sum_squared))
        
    def get_r(self):
        "calculates minimal signed distance to planes. Can therefore be negative"
        #return -np.nanmax(self.hull.b)
        if self.hull.space == 'G':
            return self.hull.get_r()
        else:
            #č get_design_points() nemusí být.
            #č QHull může nemít dostatek teček
            try:
                hull_points = self.hull.get_design_points()
                sum_squared = np.sum(np.square(hull_points.G), axis=1)
                hull_r = np.sqrt(np.nanmin(sum_squared))
            except BaseException as e:
                msg = "cannot get hull points from the convex hull"
                print(self.__class__.__name__ +":", msg, repr(e))
                hull_r = np.inf
                
            
            # ask our experimentation team
            gint_r = self.gint.get_r()
            
            #č podle mně, nemůže se stat, že by metoda vratila:
            #č 1. nenůlový poloměr a že by dokonce i centralní bod byl venku.
            #č 2. poloměr větší jak R
            #č Odpovědnost za to kladu na gint.
            return min(hull_r, gint_r)
     
    def get_shell_estimation(self):
        shell = self.shell
        r = self.get_r()
        R = self.get_R()
        
        if r<0:
            shell.set_bounds(0, R)
        else:
            shell.set_bounds(r, R)
        
        # shell_estimation  -22: inner, -3: shell, -11: outer
        shell_estimation = {-22:shell.ps, -3: shell.p_shell, -11: shell.pf}
        global_stats = {"nsim":self.sample.nsim, "ndim":self.sample.nvar, \
                        "nfacets": self.hull.nsimplex, "r":r, "R":R, \
                    "inner":shell.ps, "shell":shell.p_shell, "outer":shell.pf}
        global_stats['FORM_outside'] = self.get_FORM_outside()
        if self.calculate_2FORM:
            global_stats['2FORM_outside'] = self.get_2FORM_outside()
        if self.calculate_orth:
            global_stats['orth_outside'] = self.get_orth_outside()
        return shell_estimation, global_stats
        
    #č nie    
    ##č pro mně je důležité, aby bylo možné rychle přepinat mezi metodama,
    ##č aby bylo možné výsledky jednoduše porovnavat.
    ##č Proto explicitně posílám priznak, jakou metodu použit.
    def integrate(self, nis, callback_all=None, callback_outside=None):
        #č no to teda disajn třidy je doopravdy hroznej
        # .get_shell_estimation() will calculate radiuses and will update shell
        shell_estimation, global_stats = self.get_shell_estimation()
        # shell_estimation  -22: inner, -3: shell, -11: outer
        shell_estimation.pop(-3)
        # ghull_estimation  -22: inner, -21: shell inside, -12: shell outside, -11: outer 
        ghull_estimation = shell_estimation; del(shell_estimation)
        
        
        shell_ps, shell_pf, stats = self.gint.integrate(nis, callback_all, callback_outside)
        ghull_estimation[-21] = shell_ps
        ghull_estimation[-12] = shell_pf
        global_stats.update(stats)
        
        # convex_hull_estimation  -2: inside, -1: outside
        inside = shell_ps + self.shell.ps
        outside = shell_pf + self.shell.pf
        convex_hull_estimation = {-2: inside, -1: outside}
        global_stats["inside"] = inside
        global_stats["outside"] = outside
        
        return ghull_estimation, convex_hull_estimation, global_stats
        
        


Mode Type Size Ref File
100644 blob 28117 0907e38499eeca10471c7d104d4b4db30b8b7084 IS_stat.py
100644 blob 6 0916b75b752887809bac2330f3de246c42c245cd __init__.py
100644 blob 72 458b7e2ca46acd9ec0d2caf3cc4d72e515bb73dc __main__.py
100644 blob 73368 3d245b8568158ac63c80fa0847631776a140db0f blackbox.py
100644 blob 11243 10c424c2ce5e8cdd0da97a5aba74c54d1ca71e0d candybox.py
100644 blob 29927 066a2d10ea1d21daa6feb79fa067e87941299ec4 convex_hull.py
100644 blob 102798 059ae717e71c651975673420cd8230fbef171e5e dicebox.py
100644 blob 36930 a775d1114bc205bbd1da0a10879297283cca0d4c estimation.py
100644 blob 34394 3f0ab9294a9352a071de18553aa687c2a9e6917a f_models.py
100644 blob 31142 3e14ac49d16a724bb43ab266e8bea23114e47958 g_models.py
100644 blob 20908 457329fe567f1c0a9950c21c7c494cccf38193cc ghull.py
100644 blob 2718 5d721d117448dbb96c554ea8f0e4651ffe9ac457 gp_plot.py
100644 blob 29393 96162a5d181b8307507ba2f44bafe984aa939163 lukiskon.py
100644 blob 2004 6ea8dc8f50a656c48f786d5a00bd6398276c9741 misc.py
040000 tree - e5999d6694937f9afb4db06b5f23c14b5a5894c6 mplot
100644 blob 1462 437b0d372b6544c74fea0d2c480bb9fd218e1854 plot.py
100644 blob 2807 1feb1d43e90e027f35bbd0a6730ab18501cef63a plotly_plot.py
040000 tree - 82bc1cfaa8d3caab910e6a4e165867257e2db4eb qt_gui
100644 blob 8566 5c8f8cc2a34798a0f25cb9bf50b5da8e86becf64 reader.py
100644 blob 4284 a0e0b4e593204ff6254f23a67652804db07800a6 samplebox.py
100644 blob 6558 df0e88ea13c95cd1463a8ba1391e27766b95c3a5 sball.py
100644 blob 6739 0b6f1878277910356c460674c04d35abd80acf13 schemes.py
100644 blob 76 11b2fde4aa744a1bc9fa1b419bdfd29a25c4d3e8 shapeshare.py
100644 blob 54074 ba978868adb487385157afa5b3420f9ad90e4f46 simplex.py
100644 blob 13090 2b9681eed730ecfadc6c61b234d2fb19db95d87d spring.py
100644 blob 10953 da8a8aaa8cac328ec0d1320e83cb802b562864e2 stm_df.py
040000 tree - 7e3ae622bfea167d7e9d2b967255cb3c1e8f2ad6 testcases
100644 blob 2465 d829bff1dd721bdb8bbbed9a53db73efac471dac welford.py
100644 blob 24252 9b916e40ae5bb85d1fa9be902063b017c9958ca1 whitebox.py
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/iam-git/WellMet

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/iam-git/WellMet

Clone this repository using git:
git clone git://git.rocketgit.com/user/iam-git/WellMet

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main