#!/usr/bin/env python
# coding: utf-8
"""
"""
import numpy as np
import collections # for defaultdict
#from scipy import optimize # it was needed for the spring solution
# The bisect helper below was taken from somewhere on the internet
# (StackOverflow, most likely). Hopefully that is not a big issue.
def bisect(f, target, low, high):
    """Find x in [low, high] with f(x) == target; f is assumed to be monotonically increasing."""
low, high = float(low), float(high)
while low != high:
mid = (low + high) / 2
f_mid = f(mid)
if f_mid == target:
return mid
elif f_mid > target:
high = mid if mid != high else low
else:
low = mid if mid != low else high
return low
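# A quick illustrative call of bisect (the cubic function and the numbers below
# are made up for the example, they are not used anywhere in this module):
#   bisect(lambda x: x**3, target=8.0, low=0.0, high=10.0)   # -> approximately 2.0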
class TrueIS:
    """
    "True" importance sampling statistics: keeps the whole sample of every
    added series, merges them into one sample and recomputes the IS weights
    against the merged (mixture) sample in get_weights().
    """
    def __init__(self, f, IS_mode='G'):
self.f_sample = f()
self.series_data = []
self.IS_mode = IS_mode
self.events = []
def add_IS_serie(self, sample, events, alpha=1):
"""
alpha is for rejection sampling, not used
"""
self.series_data.append((sample, events, alpha))
self.events += events
        if self.IS_mode == 'R':
            # if we already have the right sampling distribution - there is nothing to do
            self.f_sample.add_sample(sample)  # it gets merged in the R space
            # w like weights
            #wt.w = to_sample.pdf_R / wt.h.pdf_R
        else:  # IS_mode == 'G'
            # here we have to pull one trick
            self.f_sample.add_sample(sample.R, 'G')  # merge the R part as if it were G
            #wt.w = to_sample.pdf_G / wt.h.pdf_R  # hopefully this is correct
    def get_weights(self):
        # denominator of the multiple-IS (mixture) weights:
        # for every stored point, sum over all series of nsim_i * h_i(x)
        accu_denom_w = np.zeros(self.f_sample.nsim, dtype=float)
        if self.IS_mode == 'R':
for serie in self.series_data:
h_sample, events, alpha = serie
h = h_sample.new_sample(self.f_sample)
accu_denom_w += h.pdf_R * h_sample.nsim
return self.f_sample.pdf_R / accu_denom_w, self.events
else: # IS_mode == 'G'
for serie in self.series_data:
h_sample, events, alpha = serie
h = h_sample.new_sample(self.f_sample.G, 'R')
accu_denom_w += h.pdf_R * h_sample.nsim
return self.f_sample.pdf_G / accu_denom_w, self.events
def get_means(self):
w, events = self.get_weights()
keys = np.unique(events)
Nsim = len(w)
means = np.empty(len(keys), dtype=float)
vars = np.empty(len(keys), dtype=float)
        # watch out now
ev = np.array(events)
for key, i in zip(keys, range(len(keys))):
mask = ev==key
ev_w = w[mask]
            # following the IS formulas (almost)
key_mean = np.sum(ev_w)
key_var = (np.sum(ev_w**2)*Nsim - key_mean**2) / Nsim
            # store it
means[i] = key_mean
vars[i] = key_var
            # filter out the rest
w = w[~mask]
ev = ev[~mask]
        # sanity check
        assert len(w)==0 and len(ev)==0, "What on earth is this?!"
return means, vars
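# Note on TrueIS (interface inferred from the code above, stated here as an
# assumption rather than a guarantee): the constructor argument `f` is expected
# to be a callable returning a sample-like object exposing .add_sample(),
# .new_sample(), .pdf_R, .pdf_G, .G, .R and .nsim; every added series is merged
# into that object and the weights are recomputed over all stored points.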
class ISSI:
"""
IS statistics = weights series + events
ISSI calculates probabilities of repeated IS series
with implicit (non-zero) variances
    it seems to me this cannot make anything worse
"""
    def __init__(self, events=None):
        # `events=None` avoids the shared mutable default argument pitfall
        self.series_data = []
        self.events = events if events is not None else []
def add_IS_serie(self, weights, events, implicit_multiplicator=1):
keys = np.unique(events)
Nsim = len(weights)
        # create a dictionary for this sample series which implicitly
        # assumes mean = 0 and variance 1/Nsim^2
        # for every event that did not show up in the series
if Nsim == 1:
            # a single measurement is still a measurement :)
implicit_var = implicit_multiplicator
else:
implicit_var = implicit_multiplicator*(1-1/Nsim)/Nsim**2
serie = collections.defaultdict(lambda:(0, implicit_var))
        # watch out now
w = np.array(weights)
ev = np.array(events)
for key in keys:
if key not in self.events:
self.events.append(key)
mask = ev==key
ev_w = w[mask]
            # following the IS formulas
key_mean = np.sum(ev_w)/Nsim
key_var = (np.sum(ev_w**2)/Nsim - key_mean**2) / Nsim
            if key_var == 0:  # we do not want that
key_var = implicit_var
            # store it
serie[key] = (key_mean, key_var)
            # filter out the rest
w = w[~mask]
ev = ev[~mask]
        # sanity check
        assert len(w)==0 and len(ev)==0, "What on earth is this?!"
self.series_data.append(serie)
        # refresh the estimations right away
self.get_estimations()
self.estimations = {self.events[i] : self.values[i] for i in range(len(self.events))}
def get_means(self):
        # number of events
n_ev = len(self.events)
self.weighted_means = np.empty(n_ev, dtype=float)
self.weighted_vars = np.empty(n_ev, dtype=float)
        # compute the inverse-variance weighted means and variances:
        # mean = sum(m_i/v_i) / sum(1/v_i),  var = 1 / sum(1/v_i)
for event, key in zip(self.events, range(n_ev)):
key_accusum = 0
key_accuweight = 0
for serie in self.series_data:
key_mean, key_var = serie[event]
key_accusum += key_mean / key_var
key_accuweight += 1/key_var
self.weighted_means[key] = key_accusum / key_accuweight
self.weighted_vars[key] = 1/key_accuweight
return self.weighted_means, self.weighted_vars, np.array(self.events)
def get_estimations(self):
        # the p's. We are still talking about probabilities, right?
p, vars, __ = self.get_means()
sum_p = np.sum(p)
        if sum_p == 1:  # you never know
self.values = p
return p, np.array(self.events)
        # spring (non-linear) analogue:
        # the force F to load the system with cannot be obtained just like that,
        # so it is found by bisection such that the corrected p's sum to one
        elif sum_p > 1:  # compress
low_atF = -np.pi/2
high_atF = 0
        else:  # stretch
low_atF = 0
high_atF = np.pi/2
        # zeros would cause us trouble, mask them out
mask = p==0
p = np.ma.array(p)
        vars = np.ma.array(vars)
p.mask = mask
vars.mask = mask
F = np.tan(bisect(lambda atF: np.sum(np.exp(np.tan(atF)*vars/p)*p), 1, low_atF, high_atF))
corrected_means = p * np.exp(F * vars/p)
corrected_means.mask = False
self.values = corrected_means
return corrected_means, np.array(self.events)
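# Illustrative ISSI usage sketch (the weights and event labels below are made up):
#   issi = ISSI()
#   issi.add_IS_serie([0.40, 0.10, 0.02], ['safe', 'safe', 'failure'])
#   issi.add_IS_serie([0.30, 0.20, 0.15], ['safe', 'failure', 'failure'])
#   issi.estimations                          # {event: corrected probability estimate}
#   means, variances, events = issi.get_means()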
class ISS:
"""
IS statistics = weights series + events
ISS calculates probabilities of repeated IS series
"""
def __init__(self):
"""
defaultdict zajistí, že na každý jev nás čeka seznam, do kterého
může přidávat průměr a rozptyl ze sady
"""
self.data = collections.defaultdict(list)
def add_IS_serie(self, weights, events):
keys = np.unique(events)
Nsim = len(weights)
        # watch out now
w = np.array(weights)
ev = np.array(events)
for key in keys:
mask = ev==key
ev_w = w[mask]
            # following the IS formulas
key_mean = np.sum(ev_w)/Nsim
key_var = (np.sum(ev_w**2)/Nsim - key_mean**2) / Nsim
            # store it
self.data[key].append((key_mean, key_var))
            # filter out the rest
w = w[~mask]
ev = ev[~mask]
        # sanity check
        assert len(w)==0 and len(ev)==0, "What on earth is this?!"
def get_means(self):
weighted_means = []
weighted_vars = []
        # compute the inverse-variance weighted means and variances
for key_data in self.data.values():
key_accusum = 0
key_accuweight = 0
for key_mean, key_var in key_data:
key_accusum += key_mean / key_var
key_accuweight += 1/key_var
weighted_means.append(key_accusum / key_accuweight)
weighted_vars.append(1/key_accuweight)
return weighted_means, weighted_vars, np.array(list(self.data.keys()))
def get_estimations(self):
weighted_means, weighted_vars, __ = self.get_means()
        # the p's. We are still talking about probabilities, right?
p = np.array(weighted_means)
vars = np.array(weighted_vars)
sum_p = np.sum(p)
        if sum_p == 1:  # you never know
return p, np.array(list(self.data.keys()))
        # spring (non-linear) analogue:
        # the force F to load the system with cannot be obtained just like that,
        # so it is found by bisection such that the corrected p's sum to one
        elif sum_p > 1:  # compress
low_atF = -np.pi/2
high_atF = 0
        else:  # stretch
low_atF = 0
high_atF = np.pi/2
F = np.tan(bisect(lambda atF: np.sum(np.exp(np.tan(atF)*vars/p)*p), 1, low_atF, high_atF))
corrected_means = p * np.exp(F * vars/p)
return corrected_means, np.array(list(self.data.keys()))
        # scipy checks the brackets, which triggers an overflow warning; annoying
# sol = optimize.root_scalar(lambda atF: np.sum(np.exp(np.tan(atF)*vars/p)*p)-1, bracket=(-np.pi/2, np.pi/2) )
# F = np.tan(sol.root)
        # triangle analogue
        # if len(weighted_means) == 2:
        #     # then it is simple
        #     # it's a piece of cake
        #     # triangle analogue
        #     a, b = weighted_means
        #     # derivatives of Heron's formula
        #     da = 8*a*b**2 + 6*a - 2*b - 2
        #     db = 8*b*a**2 + 6*b - 2*a - 2
        #     # coefficient matrix of the transformed condition equation
        #     B = np.array([[1,1], [da, db]])
        #     u = 1 - np.sum(p)
        #     # the formula from the theory of measurement errors and adjustment calculus
        #     # does not work, it even gives negative probabilities
        #     #k = (1 - np.sum(weighted_means))/np.sum(weighted_vars)
        #     #corrected_means = weighted_means + weighted_vars*k
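

# Minimal self-test sketch for ISS; the weights and event labels below are made
# up and only illustrate how the class is meant to be called.
if __name__ == "__main__":
    iss = ISS()
    # two IS series of different sizes, each point's weight tagged by its event
    iss.add_IS_serie([0.20, 0.05, 0.60], ['failure', 'failure', 'safe'])
    iss.add_IS_serie([0.10, 0.70, 0.30, 0.05], ['safe', 'safe', 'failure', 'failure'])
    probabilities, events = iss.get_estimations()
    print(dict(zip(events, probabilities)))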