#!/usr/bin/env python
# coding: utf-8
"""
"""
import numpy as np
import collections # for defaultdict
#from scipy import optimize # it was needed for spring solution
from .candybox import CandyBox
from scipy import stats # for sample_like()
from . import f_models # for sample_like()
# bisect function is taken from somewhere in internet.
# https://www.quora.com/How-does-one-correctly-compare-two-floats-in-Python-to-test-if-they-are-equal
# Hope it is not big issue.
def bisect(f, target, low, high):
low, high = float(low), float(high)
while low != high:
mid = (low + high) / 2
f_mid = f(mid)
if f_mid == target:
return mid
elif f_mid > target:
high = mid if mid != high else low
else:
low = mid if mid != low else high
return low
# нам позарез нужен ещё один, свой собственный словник
# ten, na rozdil od defaultdict'a, neuklada chybejicí složky do slovníku
class DefaultDict(dict):
def __init__(self, default_value=None):
self.default_value = default_value
def __missing__(self, key):
return self.default_value
#
# deme na to, koťě!
#
# IS, (n-2)-krátá realizace, n>>2, n→∞
def IS(f, h_model, space_from_h='R', space_to_f='G', Nsim=int(1e4)):
"""
space_from_h
space_to_f
matching - v jakých souřadnicích robím merge a jaká PDF použiváme?
"""
# zgenerujeme vzorky
# nic zajimavýho
h = h_model(Nsim)
# tady musíme provést jeden trik
# totež jako v IS_like - ve výsledku dycky dostaneme f_model
to_sample = f.new_sample(getattr(h, space_from_h), space_to_f) # R-ko smerdžíme ako G-čko
w = to_sample.pdf(space_to_f) / h.pdf(space_from_h) # snad je to správně
# vahy máme
# zabalme do boxu
# zbytek už nejsou naši starosti
return CandyBox(to_sample, w=w)
# for simplex: d = nvar+2
# for cell: d = base_r**2
def IS_like(f_plan, sampling_space='G', weights=None, nis=1000, d=1):
"""
takes sample and returns sampling plan with the same cov (in case d=1)
covariance matrix we'll divide by d
"""
plan = getattr(f_plan, sampling_space)
S_bc = np.cov(plan, rowvar=False, bias=True, aweights=weights)
barycenter = np.average(plan, axis=0, weights=weights)
# matika
w, v = np.linalg.eig(S_bc)
# use IS sampling density with center equal to the simplex's barycenter
# set the minimum distance as the standard deviation of IS densisty
# u stats.norm zadáváme směrodatnou odchylku, to je asi správné
sigmas = np.sqrt(w/d)
h_i = [stats.norm(0, sigma) for sigma in sigmas]
# rozdělení ve vlastním prostoru
# select nis = 100 points from IS density
h_L = f_models.UnCorD(h_i)(nis)
# здесь уже так легко не отделаемся. Трансформовать кароно.
h_plan_bc = (v @ h_L.R.T).T
h_plan_sing = h_plan_bc + barycenter
# sice to má nazev h_plan, ale nese rozdělení a hustoty v f-ku
#
# jelikož výtvaříme nový vzorek z "čístých" souřadnic
# máme zaručeno, že h_plan vždy bude f_modelem (podle současného kódu CandyBox)
h_plan = f_plan.new_sample(h_plan_sing, sampling_space)
w = h_plan.pdf(sampling_space) / h_L.pdf('R') # snad je to správně
# vahy máme
# zabalme do boxu
# zbytek už nejsou naši starosti
return CandyBox(h_plan, w=w)
# for simplex: d = nvar+2
# for cell: d = base_r**2
#def sample_like(plan, weights=None, nis=1000, d=1):
# """
# takes coordinates and returns coordinates of the same cov sampling plan
# covariance matrix we'll divide by d
# """
#
#
# S_bc = np.cov(plan, rowvar=False, bias=True, aweights=weights)
# barycenter = np.average(plan, axis=0, weights=weights)
#
#
# # matika
# w, v = np.linalg.eig(S_bc)
#
# # use IS sampling density with center equal to the simplex's barycenter
# # set the minimum distance as the standard deviation of IS densisty
# # u stats.norm zadáváme směrodatnou odchylku, to je asi správné
# sigmas = np.sqrt(w/d)
# h_i = [stats.norm(0, sigma) for sigma in sigmas]
# # rozdělení ve vlastním prostoru
# # select nis = 100 points from IS density
# h_L = f_models.UnCorD(h_i)(nis)
#
# # здесь уже так легко не отделаемся. Трансформовать кароно.
# h_plan_bc = (v @ h_L.R.T).T
# h_plan_sing = h_plan_bc + barycenter
#
#
# return h_plan_sing
class TrueIS:
def __init__(self, f, IS_mode='G'):
self.f_sample = f()
self.series_data = []
self.IS_mode = IS_mode
self.events = []
def add_IS_serie(self, sample, events, alpha=1):
"""
alpha is for rejection sampling, not used
"""
self.series_data.append((sample, events, alpha))
self.events += events
if self.IS_mode == 'R':
# jestli máme to právé vzorkovácí rozdělení - tak nemáme čo robiť
self.f_sample.add_sample(sample) # smerdží se to po R
# w like weights
#wt.w = to_sample.pdf_R / wt.h.pdf_R
else: #IS_mode == 'G':
# tady musíme provést jeden trik
self.f_sample.add_sample(sample.R, 'G') # R-ko smerdžíme ako G-čko
#wt.w = to_sample.pdf_G / wt.h.pdf_R # snad je to správně
def get_weights(self):
accu_denom_w = np.zeros(self.f_sample.nsim, dtype=float)
if self.IS_mode == 'R':
for serie in self.series_data:
h_sample, events, alpha = serie
h = h_sample.new_sample(self.f_sample)
accu_denom_w += h.pdf_R * h_sample.nsim
return self.f_sample.pdf_R / accu_denom_w, self.events
else: # IS_mode == 'G'
for serie in self.series_data:
h_sample, events, alpha = serie
h = h_sample.new_sample(self.f_sample.G, 'R')
accu_denom_w += h.pdf_R * h_sample.nsim
return self.f_sample.pdf_G / accu_denom_w, self.events
def get_means(self):
w, events = self.get_weights()
keys = np.unique(events)
Nsim = len(w)
means = np.empty(len(keys), dtype=float)
vars = np.empty(len(keys), dtype=float)
# tak bacha
ev = np.array(events)
for key, i in zip(keys, range(len(keys))):
mask = ev==key
ev_w = w[mask]
# podle IS vzorečků (skoro)
key_mean = np.sum(ev_w)
key_var = (np.sum(ev_w**2)*Nsim - key_mean**2) / Nsim
# uložime
means[i] = key_mean
vars[i] = key_var
# vyfiltrujeme zbytek
w = w[~mask]
ev = ev[~mask]
# kontrola
assert len(w)==0 and len(ev)==0, "Что за хренотень?!"
return means, vars
class ISSI:
"""
IS statistics = weights series + events
ISSI calculates probabilities of repeated IS series
with implicit (non-zero) variances
zda se mi, že tím nelze nic zhoršit
"""
#č mám pocit, že je tu někde bug pythonu
#č ten prazdnej list [] přežije cokoliv
def __init__(self, events=None):
"""
"""
self.series_data = []
if events is None:
self.events = []
else:
self.events = events
def add_IS_serie(self, weights, events, implicit_multiplicator=1):
keys = np.unique(events)
Nsim = len(weights)
# vytvoříme slovník pro sadu vzorků, který implicitně
# bude předpokládát průměr=0 a rozptyl 1/Nsim^2
# pro všecko, co se v sadě neobjevilo
if Nsim == 1:
# jedno meření je taky měření)
implicit_var = implicit_multiplicator
else:
implicit_var = implicit_multiplicator*(1-1/Nsim)/Nsim**2
serie = DefaultDict((0, implicit_var))
# tak bacha
w = np.array(weights)
ev = np.array(events)
for key in keys:
if key not in self.events:
self.events.append(key)
mask = ev==key
ev_w = w[mask]
# podle IS vzorečků
key_mean = np.sum(ev_w)/Nsim
key_var = (np.sum(ev_w**2)/Nsim - key_mean**2) / Nsim
if key_var == 0: # to nechcem
key_var = implicit_var
# uložime
serie[key] = (key_mean, key_var)
# vyfiltrujeme zbytek
w = w[~mask]
ev = ev[~mask]
# kontrola
assert len(w)==0 and len(ev)==0, "Что за хренотень?!"
self.series_data.append(serie)
# ачиз значение утёз
self.get_estimations()
def add_single_event_data(self, weights, event, nis=None, implicit_multiplicator=np.inf):
if nis is None:
Nsim = len(weights)
else:
Nsim = nis
# kontrola předpokladů
assert len(weights) <= Nsim, "jseš jistej, že máš spravnej počet měření?"
# vytvoříme slovník pro sadu vzorků, který implicitně
# bude předpokládát průměr=0 a rozptyl 1/Nsim^2
# pro všecko, co se v sadě neobjevilo
if Nsim == 1:
# jedno meření je taky měření)
implicit_var = implicit_multiplicator
else:
implicit_var = implicit_multiplicator*(1-1/Nsim)/Nsim**2
serie = DefaultDict((0, implicit_var))
w = np.array(weights)
if event not in self.events:
self.events.append(event)
# podle IS vzorečků
key_mean = np.sum(w)/Nsim
key_var = (np.sum(w**2)/Nsim - key_mean**2) / Nsim
if key_var == 0: # to nechcem
key_var = implicit_var
# uložime
serie[event] = (key_mean, key_var)
self.series_data.append(serie)
#č nebudeme hned počítat odhady, nechť to dělá volající kód sam
# ачиз значение утёз
#self.get_estimations()
#self.estimations = {self.events[i] : self.values[i] for i in range(len(self.events))}
def get_means(self):
# počet jevů
# number of events
n_ev = len(self.events)
self.weighted_means = np.empty(n_ev, dtype=float)
self.weighted_vars = np.empty(n_ev, dtype=float)
# spočteme važené průměry a rozptyly
for event, key in zip(self.events, range(n_ev)):
key_accusum = 0
key_accuweight = 0
for serie in self.series_data:
key_mean, key_var = serie[event]
key_accusum += key_mean / key_var
key_accuweight += 1/key_var
if key_accuweight == 0:
#č já vidím, že .get_estimations nuly skryje
self.weighted_means[key] = 0
self.weighted_vars[key] = np.inf
else:
self.weighted_means[key] = key_accusum / key_accuweight
self.weighted_vars[key] = 1/key_accuweight
return self.weighted_means, self.weighted_vars, np.array(self.events)
def get_estimations(self):
# p-čka. We are still talking about probabilities, right?
p, vars, __ = self.get_means()
sum_p = np.sum(p)
if sum_p == 1: # а вдруг?
self.values = p
#♥оӵ ачиз значение утёз
self.estimations = {self.events[i] : self.values[i] for i in range(len(self.events))}
return p, np.array(self.events)
# spring (non-linear) analogue
# silu, kterou zatížíme system jen tak neseženeme
elif sum_p > 1: # stlačit
low_atF = -np.pi/2
high_atF = 0
else: # roztahnout
low_atF = 0
high_atF = np.pi/2
# nuly nám udělaj problém
mask = p==0
p = np.ma.array(p)
vars = np.ma.array(p)
p.mask = mask
vars.mask = mask
F = np.tan(bisect(lambda atF: np.sum(np.exp(np.tan(atF)*vars/p)*p), 1, low_atF, high_atF))
corrected_means = p * np.exp(F * vars/p)
corrected_means.mask = False
self.values = corrected_means
#♥ ачиз значение утёз
self.estimations = {self.events[i] : self.values[i] for i in range(len(self.events))}
return corrected_means, np.array(self.events)
def delete_event_data(self, event):
"""
pomocná funkce třeba pro případ změny (zpřesnění) nějakých jevů na straně modelu
function doesn't recalculate estimations after that!
"""
# prozatím nebudu kontrolovat, zda ten jev tam vůbec je
self.events.remove(event)
for serie in self.series_data:
# vymažem jev
if event in serie:
serie.pop(event)
#č krucinál, tady narazím už ne natolik triviálné chyby
for i in range(len(self.series_data)-1, -1, -1): # reverse
serie = self.series_data[i]
#č zkontrolujem, zda v serii se ještě něco zůstalo
if len(serie) == 0:
#č jako doufám, že tam tato serie bude jedina
self.series_data.remove(serie)
class ISS:
"""
IS statistics = weights series + events
ISS calculates probabilities of repeated IS series
"""
def __init__(self):
"""
defaultdict zajistí, že na každý jev nás čeka seznam, do kterého
může přidávat průměr a rozptyl ze sady
"""
self.data = collections.defaultdict(list)
def add_IS_serie(self, weights, events):
keys = np.unique(events)
Nsim = len(weights)
# tak bacha
w = np.array(weights)
ev = np.array(events)
for key in keys:
mask = ev==key
ev_w = w[mask]
# podle IS vzorečků
key_mean = np.sum(ev_w)/Nsim
key_var = (np.sum(ev_w**2)/Nsim - key_mean**2) / Nsim
# uložime
self.data[key].append((key_mean, key_var))
# vyfiltrujeme zbytek
w = w[~mask]
ev = ev[~mask]
# kontrola
assert len(w)==0 and len(ev)==0, "Что за хренотень?!"
def get_means(self):
# počet jevů
# number of events
weighted_means = []
weighted_vars = []
# spočteme važené průměry a rozptyly
for key_data in self.data.values():
key_accusum = 0
key_accuweight = 0
for key_mean, key_var in key_data:
key_accusum += key_mean / key_var
key_accuweight += 1/key_var
weighted_means.append(key_accusum / key_accuweight)
weighted_vars.append(1/key_accuweight)
return weighted_means, weighted_vars, np.array(list(self.data.keys()))
def get_estimations(self):
weighted_means, weighted_vars, __ = self.get_means()
# p-čka. We are still talking about probabilities, right?
p = np.array(weighted_means)
vars = np.array(weighted_vars)
sum_p = np.sum(p)
if sum_p == 1: # а вдруг?
return p, np.array(list(self.data.keys()))
# spring (non-linear) analogue
# silu, kterou zatížíme system jen tak neseženeme
elif sum_p > 1: # stlačit
low_atF = -np.pi/2
high_atF = 0
else: # roztahnout
low_atF = 0
high_atF = np.pi/2
F = np.tan(bisect(lambda atF: np.sum(np.exp(np.tan(atF)*vars/p)*p), 1, low_atF, high_atF))
corrected_means = p * np.exp(F * vars/p)
return corrected_means, np.array(list(self.data.keys()))
# scipy проверяет брэкеты, чем вызывает overflow warning, раздражает
# sol = optimize.root_scalar(lambda atF: np.sum(np.exp(np.tan(atF)*vars/p)*p)-1, bracket=(-np.pi/2, np.pi/2) )
# F = np.tan(sol.root)
# triangle analogue
# if len(weighted_means) == 2:
# # tak je to jednoduchý
# # it's piece of cake
# # triangle analogue
# a, b = *weighted_means
# # derivations of Heron's formula
# da = 8*a*b**2 + 6*a - 2*b - 2
# db = 8*b*a**2 + 6*b - 2*a - 2
# # matice koeficientů přetvořené podmínkové rovnice
# B = np.array([[1,1], [da, db]])
# u = 1 - np.sum(p)
# # vzoreček z teorii chyb měření a vyrovnávacího počtu nefunguje
# # dává aj záporné pravděpodobnosti
# #k = (1 - np.sum(weighted_means))/np.sum(weighted_vars)
# #corrected_means = weighted_means + weighted_vars*k