iam-git / WellMet (public) (License: MIT) (since 2021-08-31) (hash sha1)
WellMet is pure Python framework for spatial structural reliability analysis. Or, more specifically, for "failure probability estimation and detection of failure surfaces by adaptive sequential decomposition of the design domain".

/wellmet/shell.py (8d520b8a1efb95b841ecb98c88553ef30e37575e) (8309 bytes) (mode 100644) (type blob)

#!/usr/bin/env python
# coding: utf-8

import numpy as np
from . import sball
from scipy import stats
from .candynodes import CandyNodes
from .IS_stat import get_1DS_sample # for Shell_1DS

from collections import namedtuple

try: # try to outthink OS's memory management
    import os
    mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
except BaseException as e:
    mem_GB = 16
    mem_bytes = mem_GB * 1024**3 # hello, Windows!
    #print("ghull failed to get amount of RAM installed. %s GB is supposed."% mem_GB, repr(e))
    #print("BTW, you are using Windows, aren't you?")
    
    
    
ShellStats = namedtuple('ShellStats', (
                                        'nsim',
                                        'nvar',
                                        'nfacets',
                                        'r', 'R',
                                        'inner',
                                        'shell',
                                        'outer',
                                        'FORM_outside',
                                        'TwoFORM_outside',
                                        'orth_outside'
                                        ))

ShellEstimation = namedtuple('ShellEstimation', 
                            ('shell_budget', 'shell_inside', 
                            'shell_outside', 'inside', 'outside'))


#č pomocná třída pro integrování
#č nejsem tedy jist, jestli je to nejlepší napad - 
#č dělit Ghull na podtřídy, delegovat funkcionalitu
#č ale jinak Ghull se stavá hodně překomplikovaným.
#č nic lepšího mně nenapadá, přemyšlel jsem dlouho.
class Shell_1DS:
    """1DS stands for 1D sampling.
    1DS is, actually, an importance sampling method
    that, actually, does not calculate IS weights
    as f_density / h_density, but
    (arbitrary, nonuniformly) divides interval to subintervals.
    By using CDF transformes subintervals to 0-1 measure line.
    Then gets one node as middle point of every subinterval,
    weights therefore are just interval widths itself.
    No sampling imprecisions are introduced, 
    therefore no spring, no correction are needed. 
    """
    def __init__(self, hull, shell):
        self.shell = shell
        self.hull = hull
        self.nvar = hull.sample.nvar
        
        #č objevilo se, že scipy.stats.chi je neunosně nepřesné
        #č vzdává se jíž na poloměru 8
        self.sball = sball.Sball(self.nvar)
        
        self.integration_cutoff = np.inf
        
    
    def integrate(self, nis, callback_all=None, callback_outside=None):
        self.reset(nis)
        
        if self.hull.nsimplex == 0:
            bus = self.integration_cutoff
        else:
            bus = int(mem_bytes / self.hull.nsimplex / 8 / 10) + 1
        while self.nsampled < nis:
            
            seats = min(nis - self.nsampled, self.integration_cutoff, bus)
            try: 
                self._process_part(seats, nis, callback_all, callback_outside)
            except MemoryError as m:
                print(self.__class__.__name__ +":", "memory error, %s sedaček" % seats, repr(m))
                self.integration_cutoff = int(np.ceil(seats/2))
        
        assert nis == self.nsampled
        
        return self._get_result()
        
    
    def reset(self, nis): # clear
        
        self.nsampled = 0
        
        #č poloměry bereme ze skořapky
        #č za správné nastavení (stejně jako u MC)
        #č zodpovidá uživatel třídy
        #č třída vlastní odhad r nijak nevyuživá!
        r = self.shell.r
        R = self.shell.R
        # let's predefine 1D sequence at very beginning
        if r > np.sqrt(self.nvar - 1):
            x_sub = np.geomspace(r, R, nis+1, endpoint=True)
        else:
            x_sub = np.linspace(r, R, nis+1, endpoint=True)
        
        
        x, weights = get_1DS_sample(self.sball, x_sub)
        self.x = x
        self.weights = weights
        self.mask = np.empty(nis, dtype=bool)
        
    
    
    # bus analogy
    def _process_part(self, seats, nis, callback_all=None, callback_outside=None):
        # boarding
        left = self.nsampled
        right = self.nsampled + seats
        rs = self.x[left:right]
        
        # rand_dir: prepare ns random directions on a unit d-sphere
        rand_dir = sball.get_random_directions(seats, self.nvar) #random directions
        nodes_G = rand_dir*rs[:,None]
        nodes = self.hull.sample.f_model.new_sample(nodes_G, space='G')
        
        # who is back?
        d, i = self.hull.query(nodes)
        mask = d > 0
        assert len(mask) == seats
        
        # the most important part
        self.nsampled += len(mask) 
        self.mask[left:right] = mask
        
        if callback_all is not None:
            # -2 = 'inside' -1 = 'outside'
            candy_nodes = CandyNodes(nodes, event_id=mask-2, is_outside=mask, d=d, i=i)
            callback_all(candy_nodes)
            
        if (callback_outside is not None) and np.any(mask):
            callback_outside(nodes[mask], d[mask], i[mask])
        
        
        
    def _get_result(self):
        #č mask related to hull.is_outside()
        shell_pf = np.sum(self.weights[self.mask])
        shell_ps = np.sum(self.weights[~self.mask])
        
        return self.nsampled, shell_ps, shell_pf
        
        
        
        
class GaussianAnnulus:
    def __init__(self, hull):
        assert hull.space == 'G'
        
        self.hull = hull
        self.shell = sball.Shell(hull.sample.nvar)
        self.outside_dist = sball.Shell(hull.sample.nvar)
        self.sample = hull.sample
        
        self.gint = Shell_1DS(self.hull, self.shell)
        
            
    def boom(self, ns, use_MC=False):
        if use_MC:
            self.outside_dist.set_bounds(self.get_R())
            nodes_G = self.outside_dist.rvs(ns)
        else:
            # rand_dir: prepare ns random directions on a unit d-sphere
            rand_dir = sball.get_random_directions(ns, self.sample.nvar) #random directions
            
            #č deme od vnější .get_R() kružnici směrem ven
            r = self.get_R()
            
            # maximum radius, where norm.pdf() wasn't zero
            # -38.575500173381374935388521407730877399444580
            # don't ask me what the magic python use to distinguish
            # digits after double precision
            max_R_ever = 37
            if r < max_R_ever:
                R = max_R_ever
            else:
                R = r + 10
            r = np.linspace(self.get_R(), max_R_ever, ns, endpoint=True) 
            nodes_G = rand_dir*r[:,None]
        
        nodes = self.sample.f_model.new_sample(nodes_G, space='G')
        return nodes
        
            
    def get_R(self):
        sum_squared = np.sum(np.square(self.sample.G), axis=1)
        #index = np.argmax(sum_squared)
        return np.sqrt(np.nanmax(sum_squared))
        
    def setup_shell(self):
        shell = self.shell
        r = self.hull.get_r()
        R = self.get_R()
        
        if r<0:
            shell.set_bounds(0, R)
        else:
            shell.set_bounds(r, R)
        
        return r, R
    
    def get_shell_estimation(self):
        r, R = self.setup_shell()
        
        return ShellStats(
                            self.sample.nsim,
                            self.sample.nvar,
                            self.hull.nsimplex,
                            r, R,
                            self.shell.ps,
                            self.shell.p_shell,
                            self.shell.pf,
                            stats.norm.sf(r),
                            self.hull.get_2FORM_outside(),
                            self.hull.get_orth_outside()
                        )
        
    def integrate(self, nis, callback_all=None, callback_outside=None):
        self.setup_shell()
        
        nsampled, shell_ps, shell_pf = self.gint.integrate(nis, callback_all, callback_outside)
        
        inside = shell_ps + self.shell.ps
        outside = shell_pf + self.shell.pf
        
        return ShellEstimation(nsampled, shell_ps, shell_pf, inside, outside)
        
        


Mode Type Size Ref File
100644 blob 26 aed04ad7c97da717e759111aa8dd7cd48768647f .gitignore
100644 blob 1093 263306d87c51114b1320be2ee3277ea0bff99b1f LICENSE
100644 blob 5165 c9a2ecc2110771d29b800aee6152fd3a3d239e80 README.md
100644 blob 2084 6cd17c9e68ac9734b1881157c553856bd2e034de cli_example.py
100644 blob 1257 52ad8257fd62a3dc12f8d08eaf73a7cfb5d392b8 gui_example.py
100644 blob 81 fed528d4a7a148fd0bf0b0198a6461f8c91b87e9 pyproject.toml
100644 blob 795 7f9286ab2094e7dddfb6e1c5e49396fa7d79e67c setup.cfg
100644 blob 54 ee2a480d94ead7579fdddabda39a672e31b90ced setup.py
040000 tree - c71333f92448098d44613a5de568eb3f3788abbe wellmet
Hints:
Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://rocketgit.com/user/iam-git/WellMet

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@ssh.rocketgit.com/user/iam-git/WellMet

Clone this repository using git:
git clone git://git.rocketgit.com/user/iam-git/WellMet

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a merge request:
... clone the repository ...
... make some changes and some commits ...
git push origin main