#!/usr/bin/env python
# coding: utf-8
"""
Zde leží BlackBox (tuším, bude jeden)
BlackBox pěčlivě ukladá věškerá data,
věškeré sady vzorků, průběžné odhady a tak.
Nejsem už jistý, zda BlackBox je šťastný nazev, neboť
teďkom je to spíše jen krabička pro krámy
"""
import numpy as np
from scipy import spatial
from . import plot
import pickle
from . import IS_stat
from . import sball # for adaptive censoring
from . import f_models # for adaptive censoring
from scipy import stats # for adaptive censoring
import inspect # for ._log() function
from .samplebox import SampleBox # for candidates packing
# considering to rewrite it with pandas
class GuessBox:
"""
Je tu vynalezaní kola, ale aspoň tak
Odhady můžou tvořít strašnej bordel a proto jsou
ukladany do slovníku.
Tak ale jej nemůžu furt ukladat,
proto to robim s určitým krokem
Na konci je třeba zabalit tuhle krabičku ručně!
"""
def __init__(gb, filename='', flush=100):
"""
.counter - kdy bylo poslední ukladaní
"""
gb.estimations = dict()
gb.filename = filename
gb.counter = 0
gb.flush = flush
gb.pick()
def __repr__(self):
return "%s(%r, %s)"%(self.__class__.__name__, self.filename, self.flush)
def __str__(gb):
return str(gb.estimations)
def __len__(gb):
return len(gb.estimations)
def __getitem__(gb, slice):
return gb.estimations[slice]
def guess(gb, index, nsim, estimation):
if index in gb.estimations:
gb.estimations[index][0].append(nsim)
gb.estimations[index][1].append(estimation)
else:
gb.estimations[index] = [[nsim], [estimation]]
gb.counter+= 1
if gb.filename and gb.counter > gb.flush:
gb.put()
def pick(gb):
if gb.filename:
try:
with open(gb.filename + '.pickle', 'rb') as f:
gb.estimations = pickle.load(f)
except:
# škoda, no
print("GuessBox: Já tu vaši odhady %s.pickle nevidím" % gb.filename)
else:
print('GuessBox is in air mode')
def put(gb):
if gb.filename:
try:
with open(gb.filename + '.pickle', 'wb') as f:
pickle.dump(gb.estimations, f)
gb.counter = 0
except:
# nefrčí...
print("GuessBox: Can not write %s.pickle" % gb.filename)
else:
print('GuessBox is in air mode')
class BlackBox:
"""
BlackBox pěčlivě ukladá věškerá data,
věškeré sady vzorků (well, no yet), průběžné odhady a tak.
Nejsem už jistý, zda BlackBox je šťastný nazev, neboť
teďkom je to spíše jen krabička pro krámy
.sampled_plan object
.Z = g_values
.failsi
Souřadnice primárně z prostoru modelu, ty co jsme rovnou
posilali do g_modelu!
"""
def __init__(bx, sample_box):
bx.sample_box = sample_box
# not really needed
bx.f = sample_box.sampled_plan()
# sample density (just helpful thing)
bx.h = bx.f
# don't ask me
bx.candidates = sample_box.sampled_plan()
# má bejt GuessBox součástí BlackBoxu?
try:
bx.guessbox = GuessBox(sample_box.filename, flush=20)
except:
bx.guessbox = GuessBox("", flush=20)
bx.regen()
def __repr__(bx):
return "%s(%s, %s)"%('BlackBox', repr(bx.f))
def __str__(bx):
return str('BlackBox ' + bx.sample_box)
def __len__(bx):
return bx.sample_box.nsim
def __call__(bx):
"""
Offer next sample
"""
# I do not see nothing illegal here
# LHS_like_correction do right conversion
return bx.LHS_like_correction(bx.h(1))
def __getitem__(bx, slice):
# stačí vratit sample_box
return bx.sample_box[slice]
def __getattr__(bx, attr):
# По всем вопросам обращайтесь
# на нашу горячую линию
return getattr(bx.sample_box, attr)
# just plot, green points, red points...
plot2D = plot.plot2D
plot3D = plot.plot3D
show2D = plot.show2D
show3D = plot.show3D
# přidávání vzorků musí bejt explicitní!
def add_sample(bx, input_sample):
bx._log("we have got new data:", str(input_sample))
bx.sample_box.add_sample(input_sample)
# tohle musí převest rozdělení vstupního vzorku na vlastní rozdělení skříňky
inner_sample = bx.sample_box.new_sample(input_sample)
bx.increment(inner_sample)
def increment(bx, input_sample):
for i in range(bx.nvar):
for j in range(len(input_sample)):
plan_index = np.searchsorted(bx.sorted_plan_U[i], input_sample.U[j,i])
bx.sorted_plan_U[i] = np.insert(bx.sorted_plan_U[i], plan_index, input_sample.U[j,i])
def regen(bx):
# pro LHS_like_correction
bx.sorted_plan_U = [i for i in range(bx.nvar)] # just create list
for i in range(bx.nvar):
bx.sorted_plan_U[i] = np.concatenate(([0], np.sort(bx.sampled_plan.U[:, i]), [1]))
# LHS_style correction
def LHS_like_correction(bx, input_sample):
"""
returns sample object (f_model)
"""
# what is input?
# as we need transformation anyway,
# I'll ask for conversion to f sample
# Здесь вижу железную конвертацию до f-ка,
# которая пройдёт по R координатам
# Kruci drát, tady by se nemohlo nic posrat
to_sample_node = bx.f.new_sample(input_sample)
LHS_node = np.empty(bx.nvar, dtype=float)
for i in range(bx.nvar):
if to_sample_node.U.flatten()[i] <= bx.sorted_plan_U[i][0]:
LHS_node[i] = (bx.sorted_plan_U[i][0] + bx.sorted_plan_U[i][1]) / 2
elif to_sample_node.U.flatten()[i] >= bx.sorted_plan_U[i][-1]:
LHS_node[i] = (bx.sorted_plan_U[i][-2] + bx.sorted_plan_U[i][-1]) / 2
else:
plan_index = np.searchsorted(bx.sorted_plan_U[i], to_sample_node.U.flatten()[i])
# vzdy
LHS_node[i] = (bx.sorted_plan_U[i][plan_index] + bx.sorted_plan_U[i][plan_index - 1]) / 2
return bx.f.new_sample(LHS_node, 'U')
def _log(bx, *msg, indent=0):
print(bx.__class__.__name__ + ":", *msg)
def _logi(bx, *msg, indent=1):
print("\t"*indent, inspect.currentframe().f_back.f_code.co_name + ":", *msg)
class Censoring(BlackBox):
def __init__(bx, sample_object, tri_space='Rn'):
bx.tri_space = tri_space
super().__init__(sample_object)
def __repr__(bx):
return "%s(%s, %s, %s)"%('Censoring', repr(bx.f), repr(bx.tri_space))
def __str__(bx):
return str('Censoring')
def increment(bx, input_sample):
super().increment(input_sample)
if "tri" in dir(bx):
# tri - Deloneho triangulace
# sample je jíž převeden na f (v .add_sample()), takže je to bezpěčný
bx.tri.add_points(getattr(input_sample, bx.tri_space))
# print('increment se podaril')
if len(bx.tri.coplanar): # pokud triangulace není v pořadku
#print('triangulace v pořádku není')
bx._log('triangulation is coplanar')
else:
bx._log('Triangulace (zatím?) neexistuje')
bx.regen()
def regen(bx):
super().regen()
# pokud tecek nestaci na vytvareni aspon jedneho simplexu - pokrcim rameny
try:
# tady je to OK
bx.tri = spatial.Delaunay(getattr(bx.sampled_plan, bx.tri_space), incremental=True)
if len(bx.tri.coplanar): # pokud triangulace je v pořadku
#print('triangulace v pořádku není')
bx._log('triangulation is coplanar')
else:
#print('triangulace je v pořádku')
bx._log('triangulation is OK')
except:
# kdyby neco - berem kramle
bx._log('triangulation failed')
def __call__(bx):
# je treba si uvedomit ze pravdepodobnost chytnuti muze byt min jak pf. V soucasne realizaci
try:
# očekávám, že projde kandidaty
# vodkaď jsou - netuším
to_sample, rate = bx.filter()
# když nepovedlo s kandidaty, zkusíme sami nagenerovat
while len(to_sample) == 0: # nekonečno
# tak, děcka, server má spoustu času a nikam nespejchá
# ale pokud tohle sežere věškerou paměť (ne když, ale kdy)
# dostaneš co proto!
# vomezíme pole na 10e6 (desitky mega teda vodhadově)
# by bylo možně použit chytrejší vzoreček s logaritmem
# ale snad tohle postačí
to_sample, rate = bx.filter(bx.h(int(0.9999*rate + 100)))
return bx.LHS_like_correction(to_sample)
# chcu zachytit spadnuti QHull na začatku, kdy ještě není
# dostatek teček. Je-li bx.tri fakticky existuje, tj.
# triangulace jíž existovala - je třeba nechat QHull spadnout
except AttributeError:
# kdyby neco - berem kramle
print("Triangulation doesn't exist. Censoring failed")
return bx.LHS_like_correction(bx.h(1)) # it should be OK
def filter(bx, candidates=[]):
"""
supports both sample and sample.R input
logika metody, nebo, přesněji, implementaci
je taková, že bule-li někdo-něco mimo doménu,
tak funkce je vrátí a zbytek už neřeší
"""
# co to bylo na vstupu?
# když nebyl žádný,
# projdeme vlastními kandidaty
if len(candidates)==0:
candidates = bx.candidates
# tady byl problém. Funkce byla původně navržena tak,
# aby ji nezajimalo co je na vstupu
# to ale nefunguje
# další funkce jako výstup očekavají něco s validním R-kem
candidates_to_sample_node = getattr(bx.f.new_sample(candidates), bx.tri_space)
found_simplices = bx.tri.find_simplex(candidates_to_sample_node)
# ouside of domain - it's easy
outside = candidates[found_simplices < 0]
if len(outside) > 0:
# my hodnotili svých kandidatov?
if bx.candidates == candidates:
bx.candidates = outside
return outside, len(candidates)/len(outside)
# tady já chcu vrátit první vhodný vzorek a tím končít
for candidate_id in range(len(candidates)):
# simplex_id >= 0: # inside of domain
simplex_id = found_simplices[candidate_id]
simplex = bx.tri.simplices[simplex_id]
# fp like a failure points. Number of failure points
fp = len(np.setdiff1d(simplex, bx.failure_points))
# pokud je simplex není jednobarevny..
if (fp != 0) and (fp != bx.nvar+1):
# my hodnotili svých kandidatov?
if bx.candidates == candidates:
bx.candidates = candidates[candidate_id:]
return candidates[candidate_id], candidate_id
# nepovedlo. nic
# mě nenapadá žádný lepší způsob vrátit prázdnou matici
return candidates[0:0], len(candidates)
class AdaptiveCensoring(Censoring):
def __init__(bx, sample_object, tri_space='Rn', pf_lim=(1,0)):
bx._log("instance creating")
bx.sball = sball.Sball(sample_object.nvar)
bx.pf_lim = pf_lim
bx.base_r = bx.sball.get_r(0.5)
# pro jistotu pridame
bx.simplex_index = {'failure':[], 'success':[], 'mix':[]}
# overall estimations
#bx.oiss = IS_stat.ISSI(['failure', 'success', 'out', 'mix'])
# -1 = 'out', 0=success, 1=failure, 2=mix
bx.oiss = IS_stat.ISSI([-1,0,1,2])
# current estimations
bx.ciss = IS_stat.ISSI([-1,0,1,2])
super().__init__(sample_object, tri_space)
def __repr__(bx):
return "%s(%s, %s, %s, %s)"%('AdaptiveCensoring', repr(bx.f), repr(bx.tri_space), repr(bx.pf_lim))
def __str__(bx):
return str('AdaptiveCensoring')
def regen(bx):
bx._logi(inspect.currentframe().f_back.f_code.co_name, "launched regeneration")
super().regen()
if bx.pf_lim[1] == 0:
bx.drop_r = float("inf")
else:
bx.drop_r = bx.sball.get_r(bx.pf_lim[1])
# dropneme (pro jistotu) odhady
bx.oiss = bx.ciss
# -1 = 'out', 0=success, 1=failure, 2=mix
bx.ciss = IS_stat.ISSI([-1,0,1,2])
# drop indexes
bx.simplex_index = {'failure':[], 'success':[], 'mix':[]}
def increment(bx, input_sample):
super().increment(input_sample)
# drop indexes
bx.simplex_index = {'failure':[], 'success':[], 'mix':[]}
# current estimations
try: # to čo já vidím v kódu - ISSI slovníky se pokažde generujóu znovu,
# není nutně je explicitně kopirovať
bx.guessbox.guess('TRI_overall_estimations', bx.nsim-1, bx.oiss.estimations)
bx.guessbox.guess('TRI_current_estimations', bx.nsim-1, bx.ciss.estimations)
except AttributeError:
bx.guessbox.guess('TRI_upper_pf', bx.nsim-1, 1)
# a znovu začneme počítat
# -1 = 'out', 0=success, 1=failure, 2=mix
bx.ciss = IS_stat.ISSI([-1,0,1,2])
def __call__(bx):
bx._log("we were asked for an recommendation")
# je treba si uvedomit ze pravdepodobnost chytnuti muze byt min jak pf. V soucasne realizaci
try:
# očekávám, že projde kandidaty
# odkaď jsou - netuším
to_sample, rate, simplex_id = bx.filter()
# když nepovedlo s kandidaty, zkusíme sami nagenerovat
while len(to_sample) == 0: # nekonečno
# pokusme se nastavit rate tak, abychom získali právě jedneho kandidata
try: # try uvnitř traja
p_rate = bx.oiss.estimations[-1] + bx.oiss.estimations[2]
except:
p_rate = bx.pf_lim[0] - bx.pf_lim[1]
if p_rate < 1e-5:
rate = 100000
else:
rate = int(1/p_rate) + 1
to_sample, rate, simplex_id = bx.filter(bx.get_candidates(rate))
choose = bx.LHS_like_correction(to_sample)
bx._log("finally we choose", str(choose), "of", simplex_id, "simplex")
return choose
# chcu zachytit spadnuti QHull na začatku, kdy ještě není
# dostatek teček. Je-li bx.tri fakticky existuje, tj.
# triangulace jíž existovala - je třeba nechat QHull spadnout
except AttributeError:
choose = bx.LHS_like_correction(bx.get_candidates(1))
if bx.nsim < bx.nvar + 1: # je to legální
bx._log("we have no enough points to build triangulation, so", str(choose), "is our recommendation")
return choose
elif bx.nsim < 2*bx.nvar + 3: # to je ještě budiž
bx._log("we have troubles with triangulation, so we offer random sample for now:", str(choose))
return choose
else: # no to teda ne!
raise ValueError("AdaptiveCensoring: s tou triangulací je fakt něco není v pořadku")
def filter(bx, candidates=[]):
"""
za pvré, jako vstup očekávám kandidaty od .get_candidates() funkce,
zabalené do полукустарного sample_boxu s zadaným .implicit_multiplicator
(je to drobnost pro přesnějši zpracování sad IS IS_statem).
Metoda musí souřádnicím přiřazovat jev
"success", "failure", "mix", "outside"
TATO metoda jakmile narazí na "mix" nebo "outside"
ukladá zjištěné informace do ISSI a nalezeného kandidata vrací
"""
# co to bylo na vstupu?
# když nebyl žádný,
# projdeme vlastními kandidaty
if len(candidates)==0:
candidates = bx.candidates
bx._logi("kandidaty:", candidates)
# je třeba lokálně zachovat implicit_multiplicator
# jinak se ztrací při slajsingu
# nechceš přepsat SampleBox, Alexi?
try:
implicit_multiplicator = candidates.implicit_multiplicator
except AttributeError: # kandidaty můžou bejt odkudkoliv
# s nekonečným rozptylem nebudou mít váhu "absenční" jevy
# moc to odhadům nepomůže, protože je-li kandidaty
# nemajú .implicit_multiplicator
# asi nebudou mať ani váhy IS v .g_values
implicit_multiplicator = float("inf")
bx._logi("Dobrý den, kandidaty nemajú .implicit_multiplicator")#. S pozdravem, AdaptiveCensoring")
# tady byl problém. Funkce byla původně navržena tak,
# aby ji nezajimalo co je na vstupu
# to ale nefunguje
# další funkce jako výstup očekavají něco s validním R-kem
# no tj. já zde provádím posouzení transformací z R-ka vstupních souřadnic
candidates_to_sample_node = getattr(bx.f.new_sample(candidates), bx.tri_space)
current_simplices = bx.tri.find_simplex(candidates_to_sample_node)
# tak bacha
# budeme přepísovat jevy in-place
found_simplices = np.ma.array(current_simplices.copy()) #.copy()
# nemaskované - obsahuji číslo simplexu
# maskované - číslo jevu
# -1 = 'out', 0=success, 1=failure, 2=mix
# tj. procházíme simplexy z náhodné sady vzorků,
# nahrazujeme čislo simplexu odpovidajicím mu jevem
# a skryváme ho
# pote ty "skryté", "projduté" vzorky využiváme k žískání odhadů
while len(current_simplices):# > 0:
bx._logi("current simplices", current_simplices)
# berem hned prvního kandidata
# a posuzujeme co je zač
simplex_id = current_simplices[0]
mask = found_simplices==simplex_id
if simplex_id < 0: # -1 means ouside
# berem kramle
break
elif simplex_id in bx.simplex_index['success']:
found_simplices[mask] = 0
elif simplex_id in bx.simplex_index['failure']:
found_simplices[mask] = 1
elif simplex_id in bx.simplex_index['mix']:
found_simplices[mask] = 2
# kramle
break
else: # no index information
# tady já chcu vrátit první vhodný vzorek a tím končít
# simplex_id >= 0: # inside of domain
# asi tady získavam množinu s čísly vrcholů
# kteří zakladají simplex
simplex = bx.tri.simplices[simplex_id]
# for debug
bx._logi("провал индексу", simplex_id, indent=2)
# fp like a failure points. Number of failure points
# setdiff "Return the unique values in ar1 that are not in ar2."
fp = len(np.setdiff1d(simplex, bx.failure_points))
if fp == bx.nvar+1: #
bx.simplex_index['success'].append(simplex_id)
found_simplices[mask] = 0
elif fp == 0:
bx.simplex_index['failure'].append(simplex_id)
found_simplices[mask] = 1
#print("failure simplex", simplex_id)
else:
bx.simplex_index['mix'].append(simplex_id)
found_simplices[mask] = 2
bx._logi("mixed simplex", simplex_id)
# bacha! kramle
break
# pridame do seznamu známého
found_simplices[mask] = np.ma.masked
# eště raz
cmask = current_simplices==simplex_id
# vyfiltrujeme
current_simplices = current_simplices[~cmask]
# zde je třeba перехватить ситуацию, куке одӥг но кандидат ӧвӧл
# нужно ли?
# if len(current_simplices) == 0:
# # nepovedlo. nic
# bx.candidates = candidates[0:0]
# # mě nenapadá žádný lepší způsob vrátit prázdnou matici
# return candidates[0:0], len(candidates), -2 # simple_id
# nemaskované, včetně současného kandidata (nevím proč) - ke kandidatům
# землю - крестьянам, фабрики - рабочим
# předpokladam, že kandidaty jsou se všim všudy
# s vahami (.g_value) a se svým .implicit_multiplicator'em
## zde True hodnoty roušky - to co jíž bylo skryto
bx.candidates = candidates[~np.ma.getmaskarray(found_simplices)][1:] # toho prvního prečo nechcem
try: # na zacatku je tam prazdný f_model, kterému atribut pripsat nemůžeme
bx.candidates.implicit_multiplicator = implicit_multiplicator
except:
pass
# vrátíme kandidaty, všechny-ne všechny?
# малы одӥг гинэ? уг тодӥськы чик...
selected_candidate = candidates[~np.ma.getmaskarray(found_simplices)][:1] # chcem toho prvního
# odešleme ISSI
try:
# pridame do seznamu známého
# rouška musí zůstat z cyklu
# proboha, Alexi, co to je za roušku, co se tu děje?
# jakmile v tom hlavním cyklu nalezli jsme mix nebo outside
# my hned z cyklu vylezli a je neskryli - abychom je vzali jako kandidaty
# teď je však skryváme s tou "rouškou", co musela být před opuštěním cyklu nastavena
# tak ISSI bude mít možnost odhadovat i pravděpodobnosti mix a outside
found_simplices[mask] = np.ma.masked
# zde získáme True hodnoty roušek
# ukazatel
imask = found_simplices.mask
found_simplices.mask = ~imask # invertujem, dotkne to i samotnou imask
events = found_simplices.compressed()
print(candidates)
print(candidates.g_values)
print("imask", imask)
print(candidates.g_values[~imask])
print(events)
bx.oiss.add_IS_serie(candidates.g_values[~imask], events, implicit_multiplicator)
print("global estimation", bx.oiss.estimations)
bx.ciss.add_IS_serie(candidates.g_values[~imask], events, implicit_multiplicator)
print("current estimation", bx.ciss.estimations)
except AttributeError:
bx._logi("Это вы мне прислали неваженых кандидатов?")
except UnboundLocalError: # čo tu chybu způsobuje?
# asi nebyly žádné kandidaty (třeba hned na začátku)
assert len(candidates)==0 and len(bx.candidates)==0, "AdaptiveCensoring: Что за бурда с этими кандидатама?"
return candidates, 0, -2
# rate = kolík bylo - kolik zůstalo
return selected_candidate, len(candidates) - len(bx.candidates), simplex_id
def get_candidates(bx, Nsim=int(1e4)):
# -1 = 'out', 0=success, 1=failure, 2=mix
# не мудрствуя лукаво
user_pf = np.mean(bx.pf_lim)
try:
low_pf = bx.oiss.estimations[1] # failure
upper_pf = 1 - bx.ciss.estimations[0] # sucess
self_pf = (low_pf + upper_pf)/2
except AttributeError:
self_pf = 0.5
# bereme *mean* od svého a uživatelského odhadu
# minimum nejede
sampling_r, __ = bx.sball.get_r_iteration(np.mean((self_pf, user_pf)))
# asi tam bylo sampling_r/bx.base_r, že?
# u stats.norm zadáváme směrodatnou odchylku, je to asi správné
h = f_models.UnCorD([stats.norm(0, sampling_r/bx.base_r) for i in range(bx.nvar)])
# for IS_stats
svar = (sampling_r/bx.base_r)**2 # svar like sampling_variance
# něco takovýho bych nahrubo placnul
implicit_multiplicator = svar**bx.nvar * np.exp(bx.nvar/svar - bx.nvar)
#
# jdeme na to, koťě!
#
# zgenerujeme vzorky
# nic zajimavýho
h = h(Nsim)
# dropneme priliš vzdálené kandidaty
distance_from_zero = np.sum(h.R**2, axis=1)
mask = distance_from_zero < bx.drop_r
# a teď bacha!
# tady musíme provést jeden trik
to_sample = bx.f.new_sample(h.R[mask], 'G') # R-ko smerdžíme ako G-čko
w = to_sample.pdf_G / h.pdf_R # snad je to správně
# zabalme do boxu
candidates = SampleBox(to_sample, w, 'BlackBox internal samples and weights')
candidates.implicit_multiplicator = implicit_multiplicator
# vahy máme, zbytek už nejsou naši starosti
return candidates