Skip to content

Commit

Permalink
Fix test, start refactor of flow-based methods
Browse files Browse the repository at this point in the history
  • Loading branch information
pablormier committed Oct 1, 2024
1 parent d2f0ddb commit 8aaceaa
Show file tree
Hide file tree
Showing 15 changed files with 741 additions and 47 deletions.
52 changes: 52 additions & 0 deletions corneto/_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,58 @@ def extract_subgraph(
g.add_edge(s, t, **attr)
return g

# TODO: Validate and replace the extract_subgraph with this one
def _extract_subgraph_keep_order(
self, vertices: Optional[Iterable] = None, edges: Optional[Iterable[int]] = None
):
# Create a new graph
g = Graph()
g._graph_attr = deepcopy(self._graph_attr)

def append_unique(lst, items):
for item in items:
if item not in lst:
lst.append(item)

# Initialize lists to preserve order
if vertices is not None:
vertices = list(vertices)
else:
vertices = []
if edges is not None:
edges = list(edges)
# Collect vertices from edges
for i in edges:
s, t = self.get_edge(i)
v_set = s.union(t)
append_unique(vertices, v_set)
else:
edges = []
# If edges are not specified but vertices are, include edges induced by the vertices
if vertices:
# Collect edges induced by the set of vertices
for i in range(self.ne):
s, t = self.get_edge(i)
v_set = s.union(t)
if all(v in vertices for v in v_set):
edges.append(i)

# Copy vertices
for v in vertices:
g.add_vertex(v)
if v in self._vertex_attr:
v_attr = self.get_attr_vertex(v)
if len(v_attr) > 0:
g._vertex_attr[v] = deepcopy(v_attr)

# Copy edges
for i in edges:
s, t = self.get_edge(i)
attr = deepcopy(self.get_attr_edge(i))
g.add_edge(s, t, **attr)

return g

def reverse(self) -> "Graph":
"""Create a new graph and reverse the direction of all edges in the graph."""
G = self.copy()
Expand Down
2 changes: 1 addition & 1 deletion corneto/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def _get_info() -> Dict[str, Dict]:
cv = f"v{__version__} (up to date)"
else:
if latest:
cv = f"v{__version__} (latest: v{latest})"
cv = f"v{__version__} (latest stable: v{latest})"
else:
cv = f"v{__version__}"
info["corneto_version"] = {
Expand Down
54 changes: 48 additions & 6 deletions corneto/backend/_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import numbers
from copy import copy as shallow_copy
from numbers import Number
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

Expand Down Expand Up @@ -490,6 +491,22 @@ def get_symbol(self, name) -> CSymbol:
def get_symbols(self, *args) -> List[CSymbol]:
return [self.get_symbol(n) for n in args]

def add_suffix(self, suffix: str, inplace: bool = False) -> "ProblemDef":
o = self
if not inplace:
o = self.copy()
obs = set()
for e in self._constraints + self._objectives:
s = getattr(e, "_proxy_symbols", {})
for x in s:
if hasattr(x, "_name"):
if x not in obs:
x._name = x._name + suffix
obs.add(x)
expr = {k + suffix: v for k, v in self._expressions.items()}
o._expressions = expr
return o

@property
def constraints(self) -> List[CExpression]:
return self._constraints
Expand All @@ -507,7 +524,7 @@ def direction(self) -> Direction:
return self._direction

def copy(self) -> "ProblemDef":
raise NotImplementedError()
return shallow_copy(self)

def _add(self, other: Any, inplace: bool = False):
if isinstance(other, ProblemDef):
Expand Down Expand Up @@ -896,8 +913,8 @@ def Acyclic(
indicator_negative_var_name: Optional[str] = None,
acyclic_var_name: str = VAR_DAG,
max_parents: Optional[Union[int, Dict[Any, int]]] = None,
vertex_lb_dist: Optional[np.ndarray] = None,
vertex_ub_dist: Optional[np.ndarray] = None,
vertex_lb_dist: Optional[List[Dict[Any, int]]] = None,
vertex_ub_dist: Optional[List[Dict[Any, int]]] = None,
) -> ProblemDef:
"""Create Acyclicity Constraint.
Expand Down Expand Up @@ -925,8 +942,6 @@ def Acyclic(
The maximum number of parents per node. If an integer is provided, the maximum
number of parents is the same for all nodes. If a dictionary is provided, the
maximum number of parents can be different for each node. Default is None.
vertex_lb_dist : Optional[np.ndarray], optional
The lower bound distribution of the vertices. Default is None.
Returns:
-------
Expand Down Expand Up @@ -985,7 +1000,6 @@ def Acyclic(
if In is not None:
In_i_order = In[:, i_order]

# These constraints are not compatible with hyperedges
if Ip is not None:
# Get edges s->t that can have a positive flow
# check if Ip has ub field
Expand Down Expand Up @@ -1314,6 +1328,34 @@ def linear_and(
And = self.Variable(varname, Z.shape, 0, 1, vartype=VarType.BINARY)
return self.Problem([And <= Z_norm, And >= Z - N + 1])

def linear_xor(
self,
x: CExpression,
axis: Optional[int] = None,
varname="xor",
ignore_type=False,
) -> ProblemDef:
# Check if the variable is binary, otherwise throw an error
if hasattr(x, "_vartype") and x._vartype != VarType.BINARY and not ignore_type:
raise ValueError(f"Variable x has type {x._vartype} instead of BINARY")
else:
for s in x._proxy_symbols:
if s._vartype != VarType.BINARY:
# Show warning only
LOGGER.warn(
f"Variable {s.name} has type {s._vartype}, expression is assumed to be binary"
)
break
# Sum the binary variables along the specified axis
Z = x.sum(axis=axis)
# Create a new binary variable to represent the XOR result
Xor = self.Variable(varname, Z.shape, 0, 1, vartype=VarType.BINARY)
# Introduce an integer variable to model the floor division
K = self.Variable(varname + "_k", Z.shape, 0, None, vartype=VarType.INTEGER)
# Add the constraint that Z - 2*K - Xor == 0
constraints = [Z - 2 * K - Xor == 0]
return self.Problem(constraints)

def vstack(self, arg_list: Iterable[CExpression]) -> CExpression:
v = None
for a in arg_list:
Expand Down
9 changes: 8 additions & 1 deletion corneto/backend/_picos_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ def _infer_shape(c: Any):
if hasattr(c, "_proxy_symbols"):
for s in c._proxy_symbols:
# If there is some original symbol used in the expression
# with ndim 1, we assume the original shape was 1D.
# with ndim 1, and the current shape contains a dimension
# with a 1, then we assume the original shape was 1D.
s_shape = _get_shape(s)
if len(s_shape) == 1:
if shape[0] == 1:
return (shape[1],)
return (shape[0],)
return shape

Expand All @@ -52,6 +55,10 @@ class PicosExpression(CExpression):
def __init__(self, expr: Any, symbols: Optional[Set["CSymbol"]] = None) -> None:
super().__init__(expr, symbols)

@property
def shape(self) -> Tuple[int, ...]:
return _infer_shape(self)

def _create_proxy_expr(
self, expr: Any, symbols: Optional[Set["CSymbol"]] = None
) -> "PicosExpression":
Expand Down
135 changes: 134 additions & 1 deletion corneto/methods/carnival.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import corneto as cn
from corneto._graph import BaseGraph, Graph
from corneto._settings import LOGGER
from corneto._settings import LOGGER, sparsify
from corneto.methods.signal._util import (
get_incidence_matrices_of_edges,
get_interactions,
Expand Down Expand Up @@ -483,6 +483,136 @@ def bfs_search(
return selected_edges, paths, stats


def create_flow_carnival_v4(
G,
exp_list,
lambd=0.2,
exclusive_vertex_values=True,
upper_bound_flow=1000,
penalty_on="signal", # or "flow"
slack_reg=False,
backend=cn.DEFAULT_BACKEND,
):
At, Ah = get_incidence_matrices_of_edges(G)
interaction = get_interactions(G)

# Create a single unique flow. The flow selects the
# subset of the PKN that will contain all signal propagations.

# NOTE: increased UB flow since we dont have indicator, fractional positive flows <1
# will block signal in this case. To verify if this is a problem. The UB corresponds
# to the value of the Big-M in the constraints.
P = backend.Flow(G, ub=upper_bound_flow)
if penalty_on == "flow":
P += cn.Indicator()

Eact = backend.Variable(
"edge_activates", (G.num_edges, len(exp_list)), vartype=cn.VarType.BINARY
)
Einh = backend.Variable(
"edge_inhibits", (G.num_edges, len(exp_list)), vartype=cn.VarType.BINARY
)

# Edge cannot activate and inhibit at the same time
P += Eact + Einh <= 1

# The value of a vertex is the difference of the positive and negative incoming edges
Va = At @ Eact
Vi = At @ Einh
V = Va - Vi

if exclusive_vertex_values:
# otherwise a vertex can be both active and inactive through diff. paths
# NOTE: Seems to increase the gap of the relaxated problem. Slower than
# the formulations using indicator variables.
P += Va + Vi <= 1
P.register("vertex_value", V)
P.register("vertex_inhibited", Vi)
P.register("vertex_activated", Va)
P.register("edge_value", Eact - Einh)
P.register("edge_has_signal", Eact + Einh)

# Add acyclic constraints on the edge_has_signal (signal)
P = backend.Acyclic(G, P, indicator_positive_var_name="edge_has_signal")

edges_with_head = np.flatnonzero(np.sum(np.abs(Ah), axis=0) > 0)

# Extend flows (M,) to (M, N) where N is the num of experiments
F = P.expr.flow.reshape((Eact.shape[0], 1)) @ np.ones((1, len(exp_list)))
# Fi = P.expr._flow_i.reshape((Eact.shape[0], 1)) @ np.ones((1, len(exp_list)))

# If no flow, no signal (signal cannot circulate in a non-selected subgraph)
P += Eact + Einh <= F

Int = sparsify(
np.reshape(interaction, (interaction.shape[0], 1)) @ np.ones((1, len(exp_list)))
)

# Sum incoming signals for edges with head (for all samples)
# An edge can only be active in two situations:
# - the head vertex is active and the edge activatory
# - or the head vertex is inactive and the edge inhibitory
sum_upstream_act = (Ah.T @ Va)[edges_with_head, :].multiply(
Int[edges_with_head, :] > 0
)
sum_upstream_inh = (Ah.T @ Vi)[edges_with_head, :].multiply(
Int[edges_with_head, :] < 0
)
P += Eact[edges_with_head, :] <= sum_upstream_act + sum_upstream_inh
# The opposite for inhibition
sum_upstream_act = (Ah.T @ Va)[edges_with_head, :].multiply(
Int[edges_with_head, :] < 0
)
sum_upstream_inh = (Ah.T @ Vi)[edges_with_head, :].multiply(
Int[edges_with_head, :] > 0
)
P += Einh[edges_with_head, :] <= sum_upstream_act + sum_upstream_inh

vertex_indexes = np.array(
[G.V.index(v) for exp in exp_list for v in exp_list[exp]["input"]]
)
perturbation_values = np.array(
[val for exp in exp_list for val in exp_list[exp]["input"].values()]
)

# Set the perturbations to the given values
P += V[vertex_indexes, :] == perturbation_values[:, None]
for i, exp in enumerate(exp_list):
# measuremenents:
m_nodes = list(exp_list[exp]["output"].keys())
m_values = np.array(list(exp_list[exp]["output"].values()))
m_nodes_positions = [G.V.index(key) for key in m_nodes]

# Count the negative and the positive parts of the measurements
# Compute the error and add it to the objective function
error = (1 - V[m_nodes_positions, i].multiply(np.sign(m_values))).multiply(
abs(m_values)
)
P.add_objectives(sum(error))

if lambd > 0:
obj = None
if penalty_on == "signal":
P += cn.opt.linear_or(Eact + Einh, axis=1, varname="Y")
obj = sum(P.expr.Y)
elif penalty_on == "flow":
obj = sum(P.expr._flow_i)
else:
raise ValueError(
f"Invalid penalty_on={penalty_on} not valid. Only signal or flow are supported."
)
if slack_reg:
s = backend.Variable(lb=0, ub=G.num_edges)
max_edges = G.num_edges - s
P += obj <= max_edges
P.add_objectives(max_edges, weights=lambd)
else:
P.add_objectives(obj, weights=lambd)
else:
P.add_objectives(0)
return P


def create_flow_carnival_v3(
G,
exp_list,
Expand Down Expand Up @@ -579,6 +709,9 @@ def create_flow_carnival_v3(
<= Z[m_nodes_positions, iexp]
)

# Compute the error and add it to the objective function
# error = abs(m_values) * (1 - np.sign(m_values) * V[m_nodes_positions, iexp])

P.add_objectives(sum(Z[m_nodes_positions, iexp].multiply(abs(m_values))))
if lambd > 0:
P += cn.opt.linear_or(
Expand Down
7 changes: 7 additions & 0 deletions corneto/methods/signal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import warnings

warnings.warn(
"The 'signal' package is deprecated and will be removed in a future release.",
DeprecationWarning,
stacklevel=2,
)
Loading

0 comments on commit 8aaceaa

Please sign in to comment.