#!/usr/bin/env python
# coding: utf-8
import numpy as np
from . import IS_stat
from . import sball
from . import f_models
from scipy import spatial
from scipy import stats
from .candybox import CandyBox
#č napadlo mě zababáchnuť třidu, která by se sama starala o všem co se tyče
#č vnější domény. Nešlo mě totíž to udělat jednou funkcí, bylo by velmi
#č špatné z hlediska zodpovednosti kódu. Tak to všecko zabalíme to třidy
#č a odebereme z už beztak přetíženého blackboxu část komplexity
# keywords: ISSI, estimation, outside, ConvexHull, Sball, IS kolem středních hodnot
class Shull: # issi_estimate_outside
def __init__(sx, f, model_space, sampling_space='G', powerset_correction=True, incremental=True):
#č tím powerset_corretion je myšlena úplná soustava jevů,
#č tj. vyrovnaní s použitím míry vnější i vnitřní
#č powerset_correction=True přídá -2 (inside) jev do ISSI
#č zde f-ko musí taky obsahovat vzorky!
sx.f = f
sx.model_space = model_space
sx.sampling_space = sampling_space
sampled_plan_model = getattr(f, model_space)
#č žádná kontrola chyb - nechť to spadné, když má spadnout!
sx.convex_hull = spatial.ConvexHull(sampled_plan_model, incremental=incremental)
# current outside probability estimation
sx.p_out = 0.5 # implicit value
sx.sball = sball.Sball(f.nvar)
sx.base_r = sx.sball.get_r(0.5) # we want in average 50/50 ratio
# -1 = 'outside'
# -2 = 'inside'
#č založme ISSI
sx.powerset_correction = powerset_correction
#č potřebuji pro korektnost mít před integrací zadané jevy
if powerset_correction:
sx.oiss = IS_stat.ISSI([-1, -2])
else:
sx.oiss = IS_stat.ISSI([-1])
def increment(sx, input_sample):
#č sample by měl byt jíž převeden na f (v .add_sample()),
#č zodpovidá za to volajicí kód!
sx.convex_hull.add_points(getattr(input_sample, sx.model_space))
# require
def integrate(sx, nis):
# getting rid of the old estimations
sx.oiss.delete_event_data(-1)
sx.oiss.events.append(-1) #č už tak trošku sahám do vnitřku cizí třidy
if sx.powerset_correction:
sx.oiss.delete_event_data(-2)
sx.oiss.events.append(-2) #č a záse
#č posunutí středu trošiňku porušuje předpoklady, za kterých
#č Sball volí rozdělení, ale přečo se mi stavá,
#č že ve více dimenzích Shull na začatku prostě nemůže
#č trefit ten blbej... v podstatě simplex
#č Těžiště, přesnějí, prostě nějaký střed můžeme najít dvěma způsoby:
#č 1. mean(vertices.model_space).sampling_space
#č 2. mean(vertices.sampling_space)
#č myslím si, že ten první (a bez váh) je stabilnější
#č (víme, jak střed vrcholů v nějakém prostoru může ani netrefit do
#č simplexu v původním prostoru)
# vertices_model = sx.convex_hull.points[sx.convex_hull.vertices]
# barycenter_model = np.mean(vertices_model, axis=0)
# if sx.model_space == sx.sampling_space:
# sx.barycenter_sampling = barycenter_model
# else:
# barycenter = sx.f.new_sample(barycenter_model, sx.model_space)
# sx.barycenter_sampling = np.squeeze(getattr(barycenter, sx.sampling_space))
#č rozhodl jsem, že shull musí odhadovat outside, ne motat se centrem Brna.
#č předpokladám, že uživatel zadal buď G, nebo aspoň Rn prostor
sx.barycenter_sampling = np.full(sx.f.nvar, 0, dtype=np.int8)
# first step
nodes = sx._sample_sball(nis)
mask = nodes.is_outside
cut_off_out = int(nis/3)
cut_off_in = int(nis/3)
#č robím cyklus dokud nesberu dostatečně teček.
#č To je fakt nejrobustnější řešení, co mě napadá
# while (number_of_out_nodes or number_of_nodes_inside is too_small)
while (len(mask[mask]) < cut_off_out): # or (len(mask[~mask]) < cut_off_in):
print(sx.__class__.__name__ + ":", "cut off! Outsides: %s, insides: %s, p_out=%s"\
% (len(mask[mask]), len(mask[~mask]), sx.p_out))
#č je třeba jenom sehnat dostatečně bodíků a utikat
nodes.add_sample(sx._sample_sball(nis))
mask = nodes.is_outside
#č když neprovadíme výrovnání, tak ten vnitršek nachren nepotřebujem
if (len(mask[~mask]) < cut_off_in) and sx.powerset_correction:
print(sx.__class__.__name__ + ":", \
"cut off inside (%s of %s needed)! Do simplex-based integration of convex hull"\
% (len(mask[~mask]), cut_off_in))
nodes.add_sample(sx._sample_simplex(nis))
return nodes
def _sample_simplex(sx, nis):
#č f-ko sice musí odkazovat na aktuální f_model
#č ale na druhou stranu normálně ._integrate_simplex()
#č potřebujeme pouze jednou - hned při vytvaření
vertices = sx.f[sx.convex_hull.vertices]
nvar = vertices.nvar
# IS_like uses .new_sample method, so vertices can not be a SampleBox object
#
#č IS_like těžiště počítá v sampling_space, ale asi mu to až tak nevadí
#
# already divided by nsim in variance formule
# divide by /(nvar+1)/(nvar+2) from simplex inertia tensor solution
# multiply by simplex_volume, but it looks like it shouldn't be here
# for simplex: d = nvar+2
#č sice to má nazev h_plan, ale nese rozdělení a hustoty v f-ku
nodes = IS_stat.IS_like(vertices, sampling_space=sx.sampling_space, nis=nis, d=nvar+2)
#č indikatorová funkce
sx.is_outside(nodes)
# for IS_stats
#č zkoušel jsem zadavat celou serii - zhoršuje to odhady
#č nemůžeme důvěrovat tomu, jak ten slepej simplex vidí svět
weights = nodes.w[~nodes.is_outside]
sx.oiss.add_single_event_data(weights, event=-2, nis=nis)
# add_single_event_data() do not calculate estimations itself
sx.oiss.get_estimations()
return nodes
def _sample_sball(sx, nis):
nvar = sx.f.nvar
sampling_r, sx.p_out = sx.sball.get_r_iteration(sx.p_out)
#sampling_r = sx.sball.get_r(sx.p_out)
#č asi tam bylo sampling_r/base_r, že?
std_ball = sampling_r/sx.base_r
#č chcu std=1, když p_out -> 1
#č a std=sball_solušn, když p_out -> 0
#č surovější nevymyslíš!
std = std_ball + sx.p_out
#č u stats.norm zadáváme směrodatnou odchylku, je to asi správné
h = f_models.UnCorD([stats.norm(sx.barycenter_sampling[i], std) for i in range(nvar)])
nodes = IS_stat.IS(sx.f, h, space_from_h='R', space_to_f=sx.sampling_space, Nsim=nis)
outside_measure = sx._apply_nodes(nodes)
#č pro přiště
sx.p_out = (sx.p_out + outside_measure) / 2
return nodes
def _apply_nodes(sx, nodes):
#č indikatorová funkce
sx.is_outside(nodes)
# for IS_stats
if sx.powerset_correction:
#č získáme výrovnaný odhad - je to skoro zdarma
#svar = (sampling_r/sx.base_r)**2 # svar like sampling_variance
#č kdysi snažil jsem něco odvést, moc se mi to nepovedlo
#č je to jen tak, jeden z pokusu, hrubej nastřel
#im = svar**nvar * np.exp(nvar/svar - nvar)
#č radší ne. IM špatně zachycuje nizkou důvěru k tomu, co nemá vlastní tečky
sx.oiss.add_IS_serie(nodes.w, nodes.event_id, implicit_multiplicator=np.inf)
outside_measure = sx.oiss.estimations[-1]
else:
weights = nodes.w[nodes.is_outside]
#č IM všecko pokazí, jakmile začnu přídávát další jevy
sx.oiss.add_single_event_data(weights, event=-1, nis=nis)
# add_single_event_data() do not calculate estimations itself
weighted_mean, __, events = sx.oiss.get_means()
# outside probability
#č nevyrovnané!
outside_measure = weighted_mean[events==-1][0]
return outside_measure
def is_outside(sx, nodes):
node_coordinates = getattr(nodes, sx.model_space)
mask = is_outside(sx.convex_hull, node_coordinates)
# -1 = 'outside'
# -2 = 'inside'
nodes.event_id = mask - 2
nodes.is_outside = mask
#č tato metoda je vlastně pro MinEnergyCensoredSampling
#č ale zde se taky může hodit
def get_events(sb, simplices): #simplices = bx.tri.simplices
"""
Metoda musí simplexům přiřazovat jev
0=success, 1=failure, 2=mix
"""
in_failure = np.isin(simplices, sb.failure_points)
has_failure = in_failure.any(axis=1)
all_failure = in_failure.all(axis=1)
return np.int8(np.where(has_failure, np.where(all_failure, 1, 2), 0))
# local sample_box function
#č toto lokální řešení teoreticky mohlo být lepší protože ve chvili,
#č kdy potřebujeme provzorkovat simplex - stejně potrebujem i slajs,
#č testy ale ukazují, že běží to jen nepatrně rychleji
def get_simplex_event(simplex_sb, weighting_space=None):
"""
Function requires sample_box object, that contains vertex points
and (optionally) weighting_space to calculate weighted failure ratio
Returns event, event_id and both failure and weighted failure ratio
Values of event_id and event are as follows:
-1 = 'outside', 0=success, 1=failure, 2=mix
Please note, simplex cannot be "outside", therefore function will never return -1
"""
#č žádný intersect, žádný setdiff
#č tohle je nejrychlejší!
failsi = simplex_sb.failsi
# fp like a failure points. Number of failure points
fp = len(failsi[failsi]) # the fastest solution
# -1 = 'outside', 0=success, 1=failure, 2=mix
#č simplex -1 mít nemůže
if fp == 0:
event = 'success'
event_id = 0
fr = 0 # failure ratio
wfr = 0 # weighted failure ratio
elif fp == len(simplex_sb):
event = 'failure'
event_id = 1
fr = 1 # failure ratio
wfr = 1 # weighted failure ratio
else: #č simplex -1 mít nemůže
event = 'mix'
event_id = 2
fr = fp / len(simplex_sb) # failure ratio
# weighted failure ratio
if weighting_space is None:
wfr = fr
else:
pdf = simplex_sb.pdf(weighting_space)
# same as np.average(failsi, weights=pdf), but faster
wfr = np.sum(pdf[failsi]) / np.sum(pdf)
return event, event_id, fr, wfr
# global sample_box function
#č tím globálným sample_box'em my šetříme čas na poměrně drahých slajséch
def get_indices_event(sample_box, indices, weighting_space=None):
"""
Function takes global sample_box, indices of vertex points
and (optionally) weighting_space to calculate weighted failure ratio
Returns event, event_id and, as a bonus, both failure and weighted failure ratio
Values of event_id and event are as follows:
-1 = 'outside', 0=success, 1=failure, 2=mix
Please note, simplex cannot be "outside", therefore function will never return -1
"""
#č žádný intersect, žádný setdiff
#č tohle je nejrychlejší!
indices_failsi = sample_box.failsi[indices]
# fp like a failure points. Number of failure points
fp = len(indices_failsi[indices_failsi]) # the fastest solution
# -1 = 'outside', 0=success, 1=failure, 2=mix
#č simplex -1 mít nemůže
if fp == 0:
event = 'success'
event_id = 0
fr = 0 # failure ratio
wfr = 0 # weighted failure ratio
elif fp == len(indices):
event = 'failure'
event_id = 1
fr = 1 # failure ratio
wfr = 1 # weighted failure ratio
else: #č simplex -1 mít nemůže
event = 'mix'
event_id = 2
fr = fp / len(indices) # failure ratio
# weighted failure ratio
if weighting_space is None:
wfr = fr
else:
indices_pdf = sample_box.pdf(weighting_space)[indices]
# same as np.average(failsi, weights=pdf), but faster
wfr = np.sum(indices_pdf[indices_failsi]) / np.sum(indices_pdf)
return event, event_id, fr, wfr
def get_TRI_estimation(siss, simplex_events):
siss.get_estimations()
simplices = np.array(tuple(siss.estimations.keys()))
probabilities = np.array(tuple(siss.estimations.values()))
estimation = dict()
estimation[-1] = np.sum(probabilities[simplices == -1])
#čs jevy aj klidně in-place (nerobím kopiju)
events = simplices[simplices != -1]
probabilities = probabilities[simplices != -1]
#č zhruba - get_events() vrací pole s odpovidajícími čísly jevů pro každý simplex, počineje od nuly
#č tím slajsingem my jakoby vybirame ke každemu nalezenemu simplexovi ten správnej mu odpovídajicí jev
events = simplex_events[events]
for i in range(3): #čs kvůli 0,1,2 robiť cyklus?
estimation[i] = np.sum(probabilities[events == i])
return estimation
def sample_simplex(vertices, model_space, sampling_space, nis):
nvar = vertices.nvar
# IS_like uses .new_sample method, so vertices can not be a SampleBox object
#
# already divided by nsim in variance formule
# divide by /(nvar+1)/(nvar+2) from simplex inertia tensor solution
# multiply by simplex_volume, but it looks like it shouldn't be here
# for simplex: d = nvar+2
#č sice to má nazev h_plan, ale nese rozdělení a hustoty v f-ku
h_plan = IS_stat.IS_like(vertices, sampling_space=sampling_space, nis=nis, d=nvar+2)
h_plan_model = getattr(h_plan, model_space)
vertices_model = getattr(vertices, model_space)
#č budeme pokažde sestavovat ConvexHull z jedného simplexu
#č a rešit jen zda naši bodíky "inside or outside"
#č (s narustajícím nsim tohle brzy se stavá rychlejším než bežný dotaz)
convex_hull = spatial.ConvexHull(vertices_model)
h_plan.is_outside = is_outside(convex_hull, h_plan_model)
mask = ~h_plan.is_outside
#č součet tady nemusí sa (na konci) rovnat jedne
#č opravdu dělíme nis'em, jako v normálním IS
#č nikoliv počtem příjatých bodíků,
#č protože průměrná vaha je o hodně mene významná metrika
simplex_measure = np.sum(h_plan.w[mask]) / nis
#č v kódu scipy vidím, že objem máme zadarmo,
#č přesneji říct, máme v ceně
#č ConvexHull třida dycky nechá QHull přepočíst
#č objem a plochu při káždé změně
#simplex_volume = convex_hull.volume
#return h_plan, simplex_measure, simplex_volume
return h_plan, convex_hull, simplex_measure
def is_outside(convex_hull, node_coordinates):
x = node_coordinates
#E [normal, offset] forming the hyperplane equation of the facet (see Qhull documentation for more)
A = convex_hull.equations[:,:-1]
b = convex_hull.equations[:,-1]
# N=nsim
NxN = A @ x.T + np.atleast_2d(b).T
mask = np.any(NxN > 0, axis=0)
return mask
def get_sub_simplex(convex_hull):
"""
returns coordinates of sub-simplex,
truly yours, C.O.
"""
#č zatím bez váh
#simplices -- ndarray of ints, shape (nfacet, ndim)
return np.mean(convex_hull.points[[convex_hull.simplices]], axis=1)
def get_COBYLA_constraints(convex_hull):
constraints = []
#E [normal, offset] forming the hyperplane equation of the facet (see Qhull documentation for more)
A = convex_hull.equations[:,:-1]
b = convex_hull.equations[:,-1]
def fungen(A_i, b_i):
def fun(x):
constrain = -(A_i @ x + b_i)
#print(x, A_i, b_i, constrain)
return constrain
return fun
for i in range(len(b)):
# taken from COBYLA sources:
# "the constraint functions should become
# nonnegative eventually, at least to the precision of RHOEND"
constraints.append({'type':'ineq', 'fun':fungen(A[i], b[i])})
return constraints
#
# DEPRECATED
#
# unbelivable: I was unable to find a package to calculate the second moments of an simplex
def points_inertia_tensor(vertices, masses=1):
"""
coordinates of vertices
"""
nsim, nvar = np.shape(vertices)
inertia_tensor = np.empty((nvar, nvar))
for i in range(nvar):
for j in range(i + 1):
if i==j:
inertia_tensor[i,j] = np.sum( masses * (np.sum(np.square(vertices), axis=1) - np.square(vertices[:, i])))
else:
inertia_tensor[i,j] = inertia_tensor[j,i] = - np.sum(masses * vertices[:, i]*vertices[:, j])
return inertia_tensor
def simplex_volume(vertices):
"""
coordinates of vertices
"""
nsim, nvar = np.shape(vertices)
return abs(np.linalg.det(vertices[1:] - vertices[0])) / np.math.factorial(nvar)
def simplex_barycenter_inertia_tensor(vertices, masses=1):
"""
Returns the inertia matrix relative to the center of mass
coordinates of vertices
"""
nsim, nvar = np.shape(vertices)
return points_inertia_tensor(vertices, masses) /(nvar+1)/(nvar+2) * simplex_volume(vertices)
def simplex_inertia_tensor(vertices, masses=1):
"""
coordinates of vertices
"""
nsim, nvar = np.shape(vertices)
masses = masses * np.append(np.full(nsim, 1/(nvar+1)/(nvar+2)), (nvar+1)/(nvar+2))
barycenter = np.mean(vertices, axis=0)
# barycenter beztak sepri4te ke každemu řádku tensoru
#_tensor = points_inertia_tensor(vertices, masses)/(nvar+1) + np.diag(np.square(barycenter))#*(nvar+1))
_tensor = points_inertia_tensor(np.vstack((vertices, barycenter)), masses=masses)
#return _tensor / (nvar+2) * simplex_volume(vertices)
return _tensor * simplex_volume(vertices)
# don't ask me what is it
def simplex_covariance_matrix(vertices, weights=1):
"""
coordinates of vertices
"""
nsim, nvar = np.shape(vertices)
# normalizace
weights = weights / np.mean(weights)
barycenter = np.mean((vertices.T*weights).T, axis=0)
vertices = vertices - barycenter
covariance_matrix = np.empty((nvar, nvar))
for i in range(nvar):
for j in range(i + 1):
if i==j:
covariance_matrix[i,j] = np.sum(weights * np.square(vertices[:, i]))
else:
covariance_matrix[i,j] = covariance_matrix[j,i] = np.sum(weights * vertices[:, i]*vertices[:, j])
# divide by nsim from variance formule, do we need it?
# divide by /(nvar+1)/(nvar+2) from simplex inertia tensor solution
# same as simplex_volume, but it looks like it shouldn't be here
return covariance_matrix /(nvar+1)/(nvar+2) #* simplex_volume(vertices)