Source code for tacoma.drawing

# -*- coding: utf-8 -*-
"""
This module contains functions for drawing temporal networks
in different ways. It depends on three packages not being installed
during installation with ``pip``, which are

- matplotlib
- networkx
- python-louvain

If you want to use this module, please install the dependencies
listed above.
"""
from __future__ import print_function

try:
    import matplotlib.pyplot as pl
    from matplotlib.collections import LineCollection
except ImportError as e:
    print("\033[1m tacoma does not install `matplotlib` as a dependency. Please install it manually. \033[0m")
    raise e

try:
    import networkx as nx
except ImportError as e:
    print("\033[1m tacoma does not install `networkx` as a dependency. Please install it manually. \033[0m")
    raise e

try:
    import community
except ImportError as e:
    print("\033[1m tacoma does not install `python-louvain`, neccessary for `community`,  as a dependency. Please install it manually. \033[0m")
    raise e

import numpy as np

from scipy.optimize import curve_fit

import tacoma as tc

_layout_function = 'graphviz'


def _draw_edge_lists(L):
    """This draws a force-directed layout for each snapshot of a temporal network given in :mod:`_tacoma.edge_lists` format and hence should be used with caution."""
    from rocsNWL.drawing import draw
    from rocsNWL.drawing import get_pos

    G = nx.Graph()
    G.add_nodes_from(range(L.N))
    G.add_edges_from(L.edges[0])

    pos, _ = get_pos(G, layout_function=_layout_function)

    fig, ax = pl.subplots(1, len(L.edges), figsize=(len(L.edges)*3, 4))
    ax = ax.flatten()

    draw(G, pos=pos, ax=ax[0], layout_function=_layout_function)

    for i in range(1, L.N):
        G = nx.Graph()
        G.add_nodes_from(range(L.N))
        G.add_edges_from(L.edges[i])

        draw(G, pos=pos, ax=ax[i], layout_function=_layout_function)


[docs]def fit_function(x, alpha, scale, fac, intervals_to_discard_for_fit): r""" A fit function for the number of uniquely observed edges over time, following the assumption that edge activity rates follow a gamma distribution. .. math:: f(x) = \frac{\lambda^\alpha}{\Gamma(\lambda)} x^{\alpha-1}\\exp(-\lambda x) The fit function is .. math:: y(x) = \phi\times \left[ 1 - \left(\frac{\lambda}{\lambda+x}\right)^\alpha\right] Parameters ---------- x : numpy.ndarray Data on the x-axis, typically time alpha : float exponent in gamma distribution, has to be alpha > 0 scale : float scale :math:`\\lambda` in gamma distribution, has to be scale > 0 fac : float prefactor, typically :math:`\\phi=N(N-1)/2`. intervals_to_discard_for_fit : list of tuple of float a list of time intervals which have to be discarded for the fit Returns ------- y : numpy.ndarray value of the function """ x_ = x.copy() offset = 0.0 for interval in intervals_to_discard_for_fit: t0, t1 = interval x_[np.logical_and(x >= t0, x < t1)] = t0 - offset x_[x >= t1] -= t1 - t0 offset += t1 - t0 return fac * (1.0 - (scale/(scale+x_))**(alpha))
[docs]def draw_edges(traj, time_normalization_factor=1., time_unit=None, ax=None, fit=False, edge_order=None, color=None, alpha=0.5, linewidth=1.0, intervals_to_discard_for_fit=[], fit_color='k', return_fit_params=False, ): """ Draw edges according to an edge activity plot. Parameters ---------- traj : list of :class:`_tacoma.edge_trajectory_entry` The result of :func:`tacoma.api.get_edge_trajectories`. time_normalization_factor, float, default : 1.0 Rescale time by this factor. time_unit : str, default : None Unit of time to put on the axis. ax : matplotlib.Axes, default : None Axis to draw on, will create new one if none provided. fit : bool, default : False Fit a curve to the number of uniquely observed edges. edge_order : list of int, default : None Reorder the edges according to this list before drawing. color : a matplotlib color, default : None Color in which to draw the edges in alpha : float, default : 0.5 Line opacity of edges linewidth : float, default : 1.0 Line width of edges intervals_to_discard_for_fit : list of tuple of float a list of time intervals which have to be discarded for the fit fit_color : a matplotlib color, default : 'k' color in which to draw the fit in return_fit_params : bool, default : False Switch this on if you want to obtain the fit parameters. Returns ------- fig : matplotlib.Figure If an axes was provided, this is `None`. ax : matplotlib.Axes The axes the plot was drawn on. popt : tuple of float Fit parameters, will only be returned if return_fit_params is `True`. """ if ax is None: fig, ax = pl.subplots(1, 1) else: fig = None if color is None: color = 'k' lines = [] max_i = len(traj) all_t_max = [] all_t_min = [] max_node = [] for i, entry in enumerate(traj): all_t_max.append(entry.time_pairs[-1][-1] * time_normalization_factor) all_t_min.append(entry.time_pairs[0][0] * time_normalization_factor) max_node.extend(entry.edge) for t_ in entry.time_pairs: t_ = np.array(t_) * time_normalization_factor if edge_order is not None: y = edge_order[i] else: y = i lines.append([t_, [y, y]]) # if intervals_to_discard_for_fit is not None: # fit_x = [] # # for t_ in all_t_min: # # else: fit_x = np.array(all_t_min) fit_y = np.arange(len(traj), dtype=float) lines = [list(zip(x, y)) for x, y in lines] colors = [color for _ in range(len(lines))] ax.add_collection(LineCollection(lines, colors=colors, alpha=alpha, linewidth=linewidth)) t0 = min(all_t_min) ax.set_ylim(-1, max_i) ax.set_xlim(t0, max(all_t_max)) xlabel = 'time' if time_unit is not None: xlabel += ' ['+time_unit+']' ylabel = 'edge id' ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) if fit: N = max(max_node) + 1 fac = N*(N-1)/2. def fit_f(x, alpha, scale): return fit_function(x, alpha, scale, fac, intervals_to_discard_for_fit) #popt, pcov = curve_fit(fit, fit_x, fit_y,[1./fac,fac,10.0],maxfev=10000) #popt, pcov = curve_fit(fit, fit_x, fit_y,[2,fac,10.0],maxfev=10000) popt, pcov = curve_fit(fit_f, fit_x, fit_y, [0.5, 10.0], maxfev=10000) #print (popt) ax.plot(fit_x, fit(fit_x, *popt), 'r') #log_y = np.log(fit_y) - 1. #log_x = np.log(fit_x) - 1. if not return_fit_params: return fig, ax else: return fig, ax, popt
[docs]def edge_activity_plot(temporal_network, time_normalization_factor=1., time_unit=None, ax=None, fit=False, edge_order=None, color=None, alpha=0.5, linewidth=1, intervals_to_discard_for_fit=[], fit_color=None, return_fit_params=False, ): """ Draw an edge activity plot for the given temporal network. This is a wrapper for :func:`tacoma.drawing.draw_edges`. Parameters ---------- temporal_network : :class:`_tacoma.edge_lists` or :class:`_tacoma.edge_changes`. A temporal network. time_normalization_factor, float, default : 1.0 Rescale time by this factor. time_unit : str, default : None Unit of time to put on the axis. ax : matplotlib.Axes, default : None Axis to draw an, will create new one if none provided. fit : bool, default : False Fit a curve to the number of uniquely observed edges. edge_order : list of int, default : None Reorder the edges according to this list before drawing. color : a matplotlib color, default : None Color in which to draw the edges in alpha : float, default : 0.5 Line opacity of edges linewidth : float, default : 1.0 Line width of edges intervals_to_discard_for_fit : list of tuple of float a list of time intervals which have to be discarded for the fit fit_color : a matplotlib color, default : 'k' color in which to draw the fit in return_fit_params : bool, default : False Switch this on if you want to obtain the fit parameters. Returns ------- fig : matplotlib.Figure If an axes was provided, this is `None`. ax : matplotlib.Axes The axes the plot was drawn on. popt : tuple of float Fit parameters, will only be returned if `return_fit_params` is `True`. """ traj = tc.get_edge_trajectories(temporal_network) return draw_edges( traj, time_normalization_factor=time_normalization_factor, time_unit=time_unit, ax=ax, fit=fit, edge_order=edge_order, color=color, alpha=alpha, linewidth=linewidth, intervals_to_discard_for_fit=intervals_to_discard_for_fit, fit_color=fit_color, return_fit_params=return_fit_params, )
[docs]def get_edge_order(edge_traj, edge_sim, threshold=0.): """ Create an edge order by performing a Louvain-clustering on the thresholded edge similarity graph. Parameters ---------- edge_traj : list of :class:`_tacoma.edge_trajectory_entry` Edge trajectories, first result of :func:`tacoma.api.get_edge_trajectories`, or entry ``trajectories`` of :class`_tacoma.edge_trajectories`. edge_sim : dict where key is a tuple of int and value is a float Edge similarities, tuple of int denoting the pair of edges, similarity is in dimension of time. 2nd result of :func:`tacoma.api.get_edge_trajectories`, or entry ``edge_similarities`` of :class`_tacoma.edge_trajectories`. threshold : float Ignore similarities below this threshold (minimum time spent together, where spent together refers to edges connected to the same node at the same time). Returns ------- edge_order : list of int Edge indices ordered in clusters. """ # get nx graph G = get_edge_graph(edge_traj, edge_sim, threshold=0.) # find best partition using Louvain clustering partition = community.best_partition(G) N_comm = max([v for v in partition.values()]) + 1 comm = [[] for i in range(N_comm)] for k, v in partition.items(): comm[v].append(k) order = [] for module in comm: order.extend(module) order = np.argsort(order) return order
[docs]def get_edge_graph(edge_traj, edge_sim, threshold=0.): """ Construct a thresholded edge similarity graph. Parameters ---------- edge_traj : list of :class:`_tacoma.edge_trajectory_entry` Edge trajectories, first result of :func:`tacoma.api.get_edge_trajectories`, or entry ``trajectories`` of :class`_tacoma.edge_trajectories`. edge_sim : dict where key is a tuple of int and value is a float Edge similarities, tuple of int denoting the pair of edges, similarity is in dimension of time. 2nd result of :func:`tacoma.api.get_edge_trajectories`, or entry ``edge_similarities`` of :class`_tacoma.edge_trajectories`. threshold : float Ignore similarities below this threshold (minimum time spent together, where spent together refers to edges connected to the same node at the same time). Returns ------- G : nx.Graph An undirected, unweighted graph where nodes are edges in the temporal network and edges mean their similarity is above the threshold. """ N_edges = len(edge_traj) G = nx.Graph() G.add_nodes_from(range(N_edges)) G.add_edges_from([(u, v) for u, v, val in edge_sim if val > threshold]) return G
if __name__ == "__main__": import time import _tacoma L = tc.edge_lists() L.N = 3 L.t = [0.0, 1.0, 2.0] L.tmax = 3.0 L.edges = [ [ (0, 1) ], [ (1, 2), (0, 2) ], [ (0, 1) ], ] L = _tacoma.dynamic_RGG(100, 100, mean_link_duration=10) #F = tc.flockwork_P_varying_rates([],100,[0.5],100,[(0.0,1.0)],tmax=100) F = L FBIN = tc.bin(F, dt=1) # draw_rows(FBIN) start = time.time() traj, similarities = tc.get_edge_trajectories( FBIN, return_edge_similarities=True) end = time.time() print(similarities) print("needed ", end-start, "seconds") draw_edges(traj, fit=True) start = time.time() result = tc.get_edge_trajectories(F) end = time.time() print("needed ", end-start, "seconds") draw_edges(traj) # draw_edge_lists(L) pl.show()