from math import log10
import json
import numpy as np
import tacoma as tc
from _tacoma import edge_changes as ec
from _tacoma import edge_lists as el
from _tacoma import edge_lists_with_histograms as el_h
from _tacoma import edge_changes_with_histograms as ec_h
from _tacoma import edge_trajectories
from scipy.optimize import curve_fit
from scipy.stats import lognorm
from scipy.special import gamma as Gamma
from scipy.stats import weibull_min
from scipy.integrate import quad
from lmfit import minimize, Parameters
def complete_graph(N,tmax=1.0):
"""Get a single frame which consists of a complete network.
Parameters
----------
N : int
Number of nodes.
tmax : float, default : 1.0
Maximum time of the static network.
Returns
-------
:class:`_tacoma.edge_lists`
An instance of `tacoma.edge_lists` with t = [0.0], tmax = ``tmax``.
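    Examples
    --------
    A minimal sketch (illustrative):

    >>> G = complete_graph(4, tmax=2.0)
    >>> G.N
    4
    >>> len(G.edges[0])  # the single frame contains all N*(N-1)/2 = 6 edges
    6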
"""
edge_list = []
for i in range(N-1):
for j in range(i+1,N):
edge_list.append((i,j))
this = tc.edge_lists()
this.t = [0.]
this.tmax = tmax
this.edges = [edge_list]
this.N = N
return this
def convert_static_network(N,edge_list,tmax=1.0):
"""Get a single frame which consists of the static network.
Parameters
----------
N : int
Number of nodes.
edge_list : :obj:`list` of :obj:`tuple` of int
        The edges of the static graph.
    tmax : float, default : 1.0
The maximum time until the network is looped.
Returns
-------
:class:`_tacoma.edge_lists`
An instance of `tacoma.edge_lists` with t = [0.0], tmax = ``tmax``.
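    Examples
    --------
    A minimal sketch (illustrative):

    >>> static = convert_static_network(3, [(0, 1), (2, 1)], tmax=5.0)
    >>> static.N
    3
    >>> len(static.edges[0])  # both edges kept, indices reordered such that i < j
    2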
"""
new_edge_list = []
for e in edge_list:
if e[0] > e[1]:
new_edge_list.append((e[1],e[0]))
elif e[1] > e[0]:
new_edge_list.append((e[0],e[1]))
this = tc.edge_lists()
this.t = [0.]
this.tmax = tmax
this.edges = [ new_edge_list ]
this.N = N
return this
def get_logarithmic_histogram(data,
                              bins, # number of bins or an array of bin edges
return_bin_means = True,
density = True):
"""Get a logarithmic histogram.
Parameters
----------
data : array-like
The data to bin.
    bins : int or array_like
        The number of logarithmically spaced bins, or an array of bin edges.
return_bin_means : bool, default : True
return the geometric means of binning intervals,
otherwise return bin edges
density : bool, default : True
return probability density, if `False`, return counts
Returns
-------
x : numpy.ndarray
Either geometric intervals or interval edges
y : numpy.ndarray
Either probability density or counts.
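    Examples
    --------
    A minimal sketch (illustrative):

    >>> data = np.random.lognormal(mean=0.0, sigma=1.0, size=10000)
    >>> x, p = get_logarithmic_histogram(data, bins=20)
    >>> len(x) == len(p) == 20
    True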
"""
data = np.array(data)
data = data[data>0]
MIN = min(data)
MAX = max(data)
    # check whether `bins` is already an array of bin edges;
    # if indexing fails, assume it is an integer number of bins
    try:
        bins[0]
    except TypeError:
        bins = np.logspace(log10(MIN), log10(MAX), bins+1, base=10.)
y, x = np.histogram(data,
bins = bins,
density = density,
)
new_x = np.sqrt(x[1:]*x[:-1])
if return_bin_means:
return new_x, y
else:
return x, y
def group_size_histogram(group_sizes_and_durations):
"""Returns the mean number of groups of size `g`
(mean over both ensemble and time).
Parameters
==========
group_sizes_and_durations : :mod:`group_sizes_and_durations`
Result from the function :mod:`measure_group_sizes_and_durations`
Returns
=======
g : :obj:`numpy.ndarray` of float
Group sizes
N : :obj:`numpy.ndarray` of float
Mean number of groups of the corresponding size in `g`.
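    Examples
    ========
    A usage sketch (illustrative), assuming ``net`` is a temporal network:

    >>> result = tc.measure_group_sizes_and_durations(net)  # doctest: +SKIP
    >>> g, N_g = group_size_histogram(result)               # doctest: +SKIP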
"""
group_size_histogram = np.array([
(size, val)\
for size, val in enumerate(group_sizes_and_durations.aggregated_size_histogram)\
if val > 0.
],dtype=float)
x_group, y_group = group_size_histogram[:,0], group_size_histogram[:,1]
return x_group, y_group
def mean_coordination_number(group_sizes_and_durations):
"""Returns the mean coordination number (mean over both ensemble and time).
Following the definition by Zhao, Stehle, Bianconi, Barrat,
the coordination number of node `i` is equal to the size of the group
it is part of.
Parameters
==========
group_sizes_and_durations : :mod:`group_sizes_and_durations`
Result from the function :mod:`measure_group_sizes_and_durations`
Returns
=======
mean_coordination_number : float
Temporal and ensemble mean of a node's group size.
"""
m, N_m = group_size_histogram(group_sizes_and_durations)
N = len( group_sizes_and_durations.aggregated_size_histogram) - 1
P_k = m * N_m / N # degree distribution for completely connected groups
return m.dot(P_k) - 1.0
def mean_number_of_groups(group_sizes_and_durations):
"""Returns the mean number of groups (mean over both ensemble and time).
Parameters
==========
group_sizes_and_durations : :mod:`group_sizes_and_durations`
Result from the function :mod:`measure_group_sizes_and_durations`
Returns
=======
mean_number_of_groups : float
Temporal and ensemble mean of the total number of groups
a network consists of at a certain time.
"""
m, N_m = group_size_histogram(group_sizes_and_durations)
return N_m.sum()
def mean_group_size(group_sizes_and_durations):
"""Returns the mean group size (mean over both ensemble and time).
Parameters
==========
group_sizes_and_durations : :mod:`group_sizes_and_durations`
Result from the function :mod:`measure_group_sizes_and_durations`
Returns
=======
mean_group_size : float
Temporal and ensemble mean of the group size of all groups
a network consists of at a certain time.
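    Examples
    ========
    A usage sketch (illustrative), assuming ``net`` is a temporal network:

    >>> result = tc.measure_group_sizes_and_durations(net)  # doctest: +SKIP
    >>> mean_group_size(result)                             # doctest: +SKIP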
"""
N = len( group_sizes_and_durations.aggregated_size_histogram) - 1
c = mean_number_of_groups(group_sizes_and_durations)
return float(N) / c
def slow_mean_degree(temporal_network):
"""Returns the mean degree (mean over ensemble) but it takes
ages to compute. You should instead use :mod:`mean_degree`.
Parameters
==========
temporal_network : :mod:`edge_lists` or :mod:`edge_changes`
Returns
=======
    t : numpy.ndarray
        Times at which the mean degree changes.
    k : numpy.ndarray
        Mean degree of the network at the corresponding times.
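    Examples
    ========
    A usage sketch (illustrative), assuming ``net`` is a temporal network:

    >>> t, k = slow_mean_degree(net)  # doctest: +SKIP
    >>> mean_k = time_average(t, k)   # doctest: +SKIP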
"""
temporal_network = tc._get_raw_temporal_network(temporal_network)
if type(temporal_network) == tc.ec:
t = np.array([temporal_network.t0])
t = np.append(t,temporal_network.t)
m = [ len(temporal_network.edges_initial) ]
for i in range(len(temporal_network.edges_in)):
m_in = len(temporal_network.edges_in[i])
m_out = len(temporal_network.edges_out[i])
dm = m_in - m_out
m.append( m[-1] + dm )
m.append(m[-1])
t = np.append(t,temporal_network.tmax)
m = np.array(m, dtype=float)
k = 2.0*m / float(temporal_network.N)
elif type(temporal_network) == tc.el:
t = np.array(temporal_network.t)
m = [ ]
for i in range(len(temporal_network.edges)):
_m = len(temporal_network.edges[i])
m.append( _m )
m.append(m[-1])
t = np.append(t,temporal_network.tmax)
m = np.array(m, dtype=float)
k = 2.0*m / float(temporal_network.N)
else:
raise ValueError('Unknown temporal network format: ' + str(type(temporal_network)))
return t, k
def time_average(t,x,tmax=None):
r"""
Return a temporal average of an observable `x(t)`,
.. math::
\overline x = \frac{1}{t_\mathrm{max}-t_0}
\int\limits_{t_0}^{t_\mathrm{max}} dt\, x(t).
    It is assumed that `x` changes as a step function, such that
.. math::
\overline x = \frac{1}{t_\mathrm{max}-t_0}
\left( (t_\mathrm{max} - t_{N_t-1}) x(t_{N_t-1}) +
\sum_{i=1}^{N_t-1} (t_{i}-t_{i-1})x(t_{i-1}) \right).
Parameters
----------
t : numpy.ndarray
Times at which `x` changes
x : numpy.ndarray
Value of the observable at the corresponding times.
tmax : float, default : None
If this is None, the integral is computed until
time t[-1], otherwise it's evaluated until tmax.
Returns
-------
average_x : float
The temporal average of `x`.
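    Examples
    --------
    A minimal sketch (illustrative):

    >>> t = np.array([0., 1., 3.])
    >>> x = np.array([2., 4., 4.])
    >>> float(time_average(t, x))  # (1*2 + 2*4) / (3 - 0)
    3.3333333333333335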
"""
if len(t) != len(x):
raise ValueError("t and x must have the same shape")
if tmax is not None:
t = np.append(t,tmax)
x = np.append(x,x[-1])
dt = t[1:] - t[:-1]
sum_dt = dt.sum()
return dt.dot(x[:-1]) / sum_dt
def time_RMSE(t,x1,x2,tmax=None):
r"""
Get the root mean squared error (RMSE) of two observables
over the same time. Combine with :func:`tacoma.tools.sample_a_function`.
    .. math::
        \mathrm{RMSE} = \left(
        \frac{1}{t_\mathrm{max}-t_0}
        \int\limits_{t_0}^{t_\mathrm{max}} dt\,
        \left( x_1(t) - x_2(t) \right)^2
        \right)^{1/2}.
Parameters
----------
t : numpy.ndarray
times at which ``x1`` and ``x2`` change
x1 : numpy.ndarray
The first observable to compute the RMSE.
x2 : numpy.ndarray
The second observable to compute the RMSE.
tmax : float, default : None
If this is None, the integral is computed until
time t[-1], otherwise it's evaluated until tmax.
Returns
-------
RMSE : float
The root mean squared error.
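    Examples
    --------
    A minimal sketch (illustrative):

    >>> t = np.array([0., 1., 2.])
    >>> x1 = np.array([1., 1., 1.])
    >>> x2 = np.array([0., 0., 0.])
    >>> float(time_RMSE(t, x1, x2))
    1.0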
"""
if len(t) != len(x1):
raise ValueError("t and x1 must have the same shape")
if len(t) != len(x2):
raise ValueError("t and x2 must have the same shape")
if tmax is not None:
t = np.append(t,tmax)
x1 = np.append(x1,x1[-1])
x2 = np.append(x2,x2[-1])
return np.sqrt(time_average(t,(x1-x2)**2))
def bin_a_function(x,y,bins,mode='mean'):
r"""
Bin a step function :math:`y(t)` over time bins. Binning can be done using
either ``numpy.sum`` or ``numpy.mean``.
Parameters
----------
x : numpy.ndarray
domain values at which `y` changes
y : numpy.ndarray
        Values of the observable at the corresponding domain values.
    bins : numpy.ndarray
        The bin edges to aggregate into.
mode : string, default : mean
Build either a ``mean`` over the bins or a ``sum``.
Returns
-------
new_y : numpy.ndarray
binned observable
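    Examples
    --------
    A minimal sketch (illustrative):

    >>> x = np.array([0., 1., 2., 3.])
    >>> y = np.array([1., 2., 3., 4.])
    >>> binned = bin_a_function(x, y, bins=[0., 2., 4.], mode='sum')
    >>> # binned == [0, 3.0, 7.0]: a leading zero, then the sum of y over each bin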
"""
    if mode == 'mean':
        cumfunc = np.mean
    elif mode == 'sum':
        cumfunc = np.sum
    else:
        raise ValueError("`mode` must be either 'mean' or 'sum', not '" + str(mode) + "'")
indices = np.searchsorted(x,bins)
new_y = [0]
for i in range(1,len(indices)):
if indices[i-1] == indices[i]:
_y = 0
else:
_y = cumfunc(y[indices[i-1]:indices[i]])
new_y.append( _y )
return new_y
def sample_a_function(t,y,time_points,sample_width=0):
r"""
Sample an observable :math:`y` which is a step function
and changes at corresponding values :math:`t`.
Parameters
----------
t : numpy.ndarray
domain values at which `y` changes
y : numpy.ndarray
observable to sample
time_points : numpy.ndarray
time points for which to sample.
sample_width : float, default : 0.0
sample as a temporal average over [t-sample_width/2, t+sample_width/2].
Returns
-------
new_y : numpy.ndarray
values of `y` sampled at `time_points`.
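    Examples
    --------
    A minimal sketch (illustrative):

    >>> t = np.array([0., 1., 2.])
    >>> y = np.array([1., 3., 5.])
    >>> sampled = sample_a_function(t, y, np.array([0.5, 1.5, 2.5]))
    >>> # sampled == array([1., 3., 5.]): the step function's current value at each sample time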
"""
x = t
new_y = []
for bin in time_points:
if sample_width > 0:
indices = np.searchsorted(x,[bin-sample_width/2., bin+sample_width/2.])
dt = x[(indices[0]+1):indices[1]] - x[indices[0]:(indices[1]-1)]
y_ = y[(indices[0]+1):indices[1]]
val = np.dot(dt,y_)
norm = np.sum(dt)
#x_ = x[(indices[0]+1):indices[1]]
#y_ = y[indices[0]:indices[1]]
#val = np.trapz(y=y_,x=x_)
#norm = np.trapz(y=np.ones_like(x_),x=x_)
new_y.append( val/norm)
else:
indices = np.searchsorted(x,[bin])
if indices[0] == 0:
this_index = 0
elif indices[0] == len(x):
this_index = -1
else:
this_index = indices[0]-1
new_y.append(y[this_index])
return np.array(new_y)
def rescale_time(temporal_network, new_t0, new_tmax):
"""Rescale the time in this temporal network (inplace).
Parameters
==========
temporal_network : :mod:`edge_lists` or :mod:`edge_changes`
new_t0 : float
The new value of t0.
new_tmax : float
The new value of tmax.
Returns
=======
temporal_network : :mod:`edge_lists` or :mod:`edge_changes`
Same instance as input.
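    Examples
    ========
    A minimal sketch (illustrative):

    >>> G = complete_graph(3, tmax=1.0)
    >>> G = rescale_time(G, 0.0, 10.0)
    >>> G.tmax
    10.0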
"""
if hasattr(temporal_network,'t0'):
this_t0 = temporal_network.t0
temporal_network.t0 = new_t0
else:
this_t0 = temporal_network.t[0]
this_T = temporal_network.tmax - this_t0
new_T = new_tmax - new_t0
temporal_network.t = [ (t - this_t0) / this_T * new_T + new_t0 for t in temporal_network.t ]
temporal_network.tmax = new_tmax
return temporal_network
def get_edge_probability_and_rate(temporal_network):
r"""
For each edge compute the probability that it is active and the rate
with which it is activated.
Parameters
==========
temporal_network : :class:`_tacoma.edge_lists`, :class:`_tacoma.edge_changes` or :class:`_tacoma.edge_trajectories`
Returns
=======
p : numpy.ndarray
The probability to be switched on for each observed edge of the network
(the remaining un-observed edges have probability p = 0).
omega : numpy.ndarray
The rate with which the observed edges are switched on
:math:`\omega = \left(\frac{1}{\tau^+} + \frac{1}{\tau^{-}}\right)^{-1}`
(the remaining un-observed edges have rate :math:`\omega = 0`).
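    Examples
    ========
    A usage sketch (illustrative), assuming ``net`` is a temporal network;
    ``p`` and ``omega`` are per-edge arrays:

    >>> p, omega = get_edge_probability_and_rate(net)  # doctest: +SKIP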
"""
if type(temporal_network) in [ ec, el, el_h, ec_h ]:
traj = tc.get_edge_trajectories(temporal_network)
tmax = temporal_network.tmax
if type(temporal_network) in [ ec, ec_h ]:
t0 = temporal_network.t0
else:
t0 = temporal_network.t[0]
elif type(temporal_network) == edge_trajectories:
traj = temporal_network.trajectories
tmax = temporal_network.tmax
t0 = temporal_network.t0
else:
raise ValueError("Unknown type for temporal network:", type(temporal_network))
T = tmax - t0
connection_probability = np.empty(len(traj))
activity_rate = np.empty(len(traj))
for iedge, entry in enumerate(traj):
N_events = len(entry.time_pairs)
if entry.time_pairs[0][0] == t0:
N_events -= 1
activity_rate[iedge] = N_events / T
t_on = 0.0
for interval in entry.time_pairs:
t_on += interval[1] - interval[0]
connection_probability[iedge] = t_on / T
return connection_probability, activity_rate
def get_edge_life_times(temporal_network):
r"""
For each edge compute the probability that it is active and the rate
with which it is activated.
Parameters
==========
temporal_network : :class:`_tacoma.edge_lists`, :class:`_tacoma.edge_changes` or :class:`_tacoma.edge_trajectories`
Returns
=======
p : numpy.ndarray
The probability to be switched on for each observed edge of the network
(the remaining un-observed edges have probability p = 0).
omega : numpy.ndarray
The rate with which the observed edges are switched on
:math:`\omega = \left(\frac{1}{\tau^+} + \frac{1}{\tau^{-}}\right)^{-1}`
(the remaining un-observed edges have rate :math:`\omega = 0`).
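    Examples
    ========
    A usage sketch (illustrative), assuming ``net`` is a temporal network:

    >>> taus = get_edge_life_times(net)             # doctest: +SKIP
    >>> x, p = get_logarithmic_histogram(taus, 20)  # doctest: +SKIP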
"""
if type(temporal_network) in [ ec, el, el_h, ec_h ]:
traj = tc.get_edge_trajectories(temporal_network)
tmax = temporal_network.tmax
if type(temporal_network) in [ ec, ec_h ]:
t0 = temporal_network.t0
else:
t0 = temporal_network.t[0]
elif type(temporal_network) == edge_trajectories:
traj = temporal_network.trajectories
tmax = temporal_network.tmax
t0 = temporal_network.t0
else:
raise ValueError("Unknown type for temporal network:", type(temporal_network))
T = tmax - t0
life_times = []
for iedge, entry in enumerate(traj):
for pair in entry.time_pairs:
            # skip censored contacts (already active at t0 or still active at tmax)
            if pair[0] != t0 and pair[1] != tmax:
life_times.append(pair[1]-pair[0])
return np.array(life_times)
def get_reduced_time(t, intervals_to_discard):
"""
Reduce the provided time in a way that intervals
are cut out.
Parameters
----------
t : numpy.ndarray
time array to reduce
intervals_to_discard : list of tuple of float
The time intervals which have to be discarded.
Returns
-------
new_t : numpy.ndarray
        Reduced time, with the same shape as ``t``. Times falling inside a
        discarded interval are mapped to the (shifted) beginning of that
        interval; all later times are shifted back by the interval's length.
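    Examples
    --------
    A minimal sketch (illustrative), discarding the interval [2, 4):

    >>> t = np.array([0., 1., 3., 5.])
    >>> reduced = get_reduced_time(t, [(2., 4.)])
    >>> # reduced == array([0., 1., 2., 3.]): t = 3 collapses to 2, t = 5 shifts back by 2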
"""
x_ = t.copy()
offset = 0.0
for interval in intervals_to_discard:
t0, t1 = interval
x_[np.logical_and(t>=t0, t<t1)] = t0 - offset
x_[t>=t1] -= t1 - t0
offset += t1 - t0
return x_