
[Review] Allow lbfgs and admm with dask cupy inputs #89

Merged: 11 commits, Nov 16, 2020
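In short: with this change, lbfgs and admm accept dask arrays whose chunks are cupy arrays, and return coefficients on the same backend as the input. A minimal usage sketch mirroring the new tests (assumes a working cupy installation; to_dask_cupy_array_xy is the helper added to dask_glm/utils.py below):

import cupy
import numpy as np
import dask.array as da
from dask_glm.algorithms import lbfgs
from dask_glm.utils import make_y, to_dask_cupy_array_xy

# Build a CPU-backed dataset, then move every chunk onto the GPU.
X = da.random.random((1000, 5), chunks=(100, 5))
y = make_y(X, beta=np.random.random(5), chunks=(100,))
X, y = to_dask_cupy_array_xy(X, y, cupy)

beta = lbfgs(X, y)  # returns a cupy array when the inputs are cupy-backed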
25 changes: 17 additions & 8 deletions dask_glm/algorithms.py
@@ -9,9 +9,9 @@
import numpy as np
import dask.array as da
from scipy.optimize import fmin_l_bfgs_b
from dask.array.utils import normalize_to_array


from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client
from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client, maybe_to_cupy
from dask_glm.families import Logistic
from dask_glm.regularizers import Regularizer

@@ -225,14 +225,22 @@ def admm(X, y, regularizer='l1', lamduh=0.1, rho=1, over_relax=1,
def create_local_gradient(func):
@functools.wraps(func)
def wrapped(beta, X, y, z, u, rho):
return func(beta, X, y) + rho * (beta - z + u)
beta = maybe_to_cupy(beta, X)
z = maybe_to_cupy(z, X)
u = maybe_to_cupy(u, X)
res = func(beta, X, y) + rho * (beta - z + u)
return normalize_to_array(res)
return wrapped

def create_local_f(func):
@functools.wraps(func)
def wrapped(beta, X, y, z, u, rho):
return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
beta - z + u)
beta = maybe_to_cupy(beta, X)
z = maybe_to_cupy(z, X)
u = maybe_to_cupy(u, X)
res = func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
beta - z + u)
return normalize_to_array(res)
return wrapped

f = create_local_f(pointwise_loss)
@@ -286,7 +294,7 @@ def wrapped(beta, X, y, z, u, rho):
if primal_res < eps_pri and dual_res < eps_dual:
break

return z
return maybe_to_cupy(z, X)


def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b):
@@ -339,19 +347,20 @@ def lbfgs(X, y, regularizer=None, lamduh=1.0, max_iter=100, tol=1e-4,
beta0 = np.zeros(p)

def compute_loss_grad(beta, X, y):
beta = maybe_to_cupy(beta, X)
scatter_beta = scatter_array(
beta, dask_distributed_client) if dask_distributed_client else beta
loss_fn = pointwise_loss(scatter_beta, X, y)
gradient_fn = pointwise_gradient(scatter_beta, X, y)
loss, gradient = compute(loss_fn, gradient_fn)
return loss, gradient.copy()
return normalize_to_array(loss), normalize_to_array(gradient.copy())

with dask.config.set(fuse_ave_width=0):  # optimizations slow this down
beta, loss, info = fmin_l_bfgs_b(
compute_loss_grad, beta0, fprime=None,
args=(X, y),
iprint=(verbose > 0) - 1, pgtol=tol, maxiter=max_iter)

beta = maybe_to_cupy(beta, X)
return beta


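The pattern in both admm and lbfgs is the same: scipy.optimize.fmin_l_bfgs_b only works with numpy arrays, so the iterate it hands to the callback is promoted to cupy with maybe_to_cupy before it touches GPU-backed data, and the loss and gradient are demoted back to numpy with normalize_to_array before being returned to scipy. A standalone sketch of that round trip with a toy least-squares objective (not the library's actual loss):

import numpy as np
from scipy.optimize import fmin_l_bfgs_b
from dask.array.utils import normalize_to_array
from dask_glm.utils import maybe_to_cupy

def make_objective(X, y):
    def objective(beta):
        # beta arrives from scipy as numpy even when X and y are cupy
        # arrays, so promote it to X's array type before any arithmetic.
        beta = maybe_to_cupy(beta, X)
        resid = X.dot(beta) - y
        loss = (resid ** 2).sum()
        grad = 2 * X.T.dot(resid)
        # scipy cannot consume cupy arrays, so demote the results again.
        return normalize_to_array(loss), normalize_to_array(grad)
    return objective

# Runs as-is on numpy inputs; swap in cupy arrays to exercise the GPU path.
rng = np.random.RandomState(0)
X, y = rng.random_sample((20, 3)), rng.random_sample(20)
beta_hat, loss, info = fmin_l_bfgs_b(make_objective(X, y), np.zeros(3))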
9 changes: 7 additions & 2 deletions dask_glm/tests/test_admm.py
@@ -7,7 +7,7 @@
from dask_glm.algorithms import admm, local_update
from dask_glm.families import Logistic, Normal
from dask_glm.regularizers import L1
from dask_glm.utils import make_y
from dask_glm.utils import make_y, to_dask_cupy_array_xy


@pytest.mark.parametrize('N', [1000, 10000])
@@ -46,11 +46,16 @@ def wrapped(beta, X, y, z, u, rho):
@pytest.mark.parametrize('N', [1000, 10000])
@pytest.mark.parametrize('nchunks', [5, 10])
@pytest.mark.parametrize('p', [1, 5, 10])
def test_admm_with_large_lamduh(N, p, nchunks):
@pytest.mark.parametrize('is_cupy', [True, False])
def test_admm_with_large_lamduh(N, p, nchunks, is_cupy):
X = da.random.random((N, p), chunks=(N // nchunks, p))
beta = np.random.random(p)
y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

if is_cupy:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)

X, y = persist(X, y)
z = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=500)

33 changes: 28 additions & 5 deletions dask_glm/tests/test_algos_families.py
@@ -10,7 +10,8 @@
gradient_descent, admm)
from dask_glm.families import Logistic, Normal, Poisson
from dask_glm.regularizers import Regularizer
from dask_glm.utils import sigmoid, make_y
from dask_glm.utils import (sigmoid, make_y, maybe_to_cupy,
to_dask_cupy_array_xy)


def add_l1(f, lam):
@@ -46,8 +47,14 @@ def make_intercept_data(N, p, seed=20009):
[(100, 2, 20009),
(250, 12, 90210),
(95, 6, 70605)])
def test_methods(N, p, seed, opt):
@pytest.mark.parametrize('is_cupy', [True, False])
def test_methods(N, p, seed, opt, is_cupy):
X, y = make_intercept_data(N, p, seed=seed)

if is_cupy:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)

coefs = opt(X, y)
p = sigmoid(X.dot(coefs).compute())

@@ -64,16 +71,22 @@ def test_methods(N, p, seed, opt):
@pytest.mark.parametrize('N', [1000])
@pytest.mark.parametrize('nchunks', [1, 10])
@pytest.mark.parametrize('family', [Logistic, Normal, Poisson])
def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
@pytest.mark.parametrize('is_cupy', [True, False])
def test_basic_unreg_descent(func, kwargs, N, nchunks, family, is_cupy):
beta = np.random.normal(size=2)
M = len(beta)
X = da.random.random((N, M), chunks=(N // nchunks, M))
y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

if is_cupy:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)

X, y = persist(X, y)

result = func(X, y, family=family, **kwargs)
test_vec = np.random.normal(size=2)
test_vec = maybe_to_cupy(test_vec, X)

opt = family.pointwise_loss(result, X, y).compute()
test_val = family.pointwise_loss(test_vec, X, y).compute()
@@ -90,16 +103,22 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
@pytest.mark.parametrize('family', [Logistic, Normal, Poisson])
@pytest.mark.parametrize('lam', [0.01, 1.2, 4.05])
@pytest.mark.parametrize('reg', [r() for r in Regularizer.__subclasses__()])
def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
@pytest.mark.parametrize('is_cupy', [True, False])
def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg, is_cupy):
beta = np.random.normal(size=2)
M = len(beta)
X = da.random.random((N, M), chunks=(N // nchunks, M))
y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

if is_cupy:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)

X, y = persist(X, y)

result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
test_vec = np.random.normal(size=2)
test_vec = maybe_to_cupy(test_vec, X)

f = reg.add_reg_f(family.pointwise_loss, lam)

@@ -120,8 +139,12 @@ def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
'threading',
'multiprocessing'
])
def test_determinism(func, kwargs, scheduler):
@pytest.mark.parametrize('is_cupy', [True, False])
def test_determinism(func, kwargs, scheduler, is_cupy):
X, y = make_intercept_data(1000, 10)
if is_cupy:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)

with dask.config.set(scheduler=scheduler):
a = func(X, y, **kwargs)
43 changes: 35 additions & 8 deletions dask_glm/tests/test_estimators.py
@@ -4,6 +4,7 @@
from dask_glm.estimators import LogisticRegression, LinearRegression, PoissonRegression
from dask_glm.datasets import make_classification, make_regression, make_poisson
from dask_glm.regularizers import Regularizer
from dask_glm.utils import to_dask_cupy_array_xy


@pytest.fixture(params=[r() for r in Regularizer.__subclasses__()])
@@ -44,19 +45,33 @@ def test_pr_init(solver):


@pytest.mark.parametrize('fit_intercept', [True, False])
@pytest.mark.parametrize('is_sparse', [True, False])
def test_fit(fit_intercept, is_sparse):
@pytest.mark.parametrize('is_sparse,is_cupy', [
(True, False),
(False, False),
(False, True)])
def test_fit(fit_intercept, is_sparse, is_cupy):
X, y = make_classification(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse)

if is_cupy and not is_sparse:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)

lr = LogisticRegression(fit_intercept=fit_intercept)
lr.fit(X, y)
lr.predict(X)
lr.predict_proba(X)


@pytest.mark.parametrize('fit_intercept', [True, False])
@pytest.mark.parametrize('is_sparse', [True, False])
def test_lm(fit_intercept, is_sparse):
@pytest.mark.parametrize('is_sparse,is_cupy', [
(True, False),
(False, False),
(False, True)])
def test_lm(fit_intercept, is_sparse, is_cupy):
X, y = make_regression(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse)
if is_cupy and not is_sparse:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)
lr = LinearRegression(fit_intercept=fit_intercept)
lr.fit(X, y)
lr.predict(X)
@@ -65,10 +80,16 @@ def test_lm(fit_intercept, is_sparse):


@pytest.mark.parametrize('fit_intercept', [True, False])
@pytest.mark.parametrize('is_sparse', [True, False])
def test_big(fit_intercept, is_sparse):
@pytest.mark.parametrize('is_sparse,is_cupy', [
(True, False),
(False, False),
(False, True)])
def test_big(fit_intercept, is_sparse, is_cupy):
with dask.config.set(scheduler='synchronous'):
X, y = make_classification(is_sparse=is_sparse)
if is_cupy and not is_sparse:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)
lr = LogisticRegression(fit_intercept=fit_intercept)
lr.fit(X, y)
lr.predict(X)
@@ -78,10 +99,16 @@ def test_big(fit_intercept, is_sparse):


@pytest.mark.parametrize('fit_intercept', [True, False])
@pytest.mark.parametrize('is_sparse', [True, False])
def test_poisson_fit(fit_intercept, is_sparse):
@pytest.mark.parametrize('is_sparse,is_cupy', [
(True, False),
(False, False),
(False, True)])
def test_poisson_fit(fit_intercept, is_sparse, is_cupy):
with dask.config.set(scheduler='synchronous'):
X, y = make_poisson(is_sparse=is_sparse)
if is_cupy and not is_sparse:
cupy = pytest.importorskip('cupy')
X, y = to_dask_cupy_array_xy(X, y, cupy)
pr = PoissonRegression(fit_intercept=fit_intercept)
pr.fit(X, y)
pr.predict(X)
26 changes: 24 additions & 2 deletions dask_glm/utils.py
@@ -41,7 +41,7 @@ def sigmoid(x):

@dispatch(object)
def exp(A):
return A.exp()
return np.exp(A)


@dispatch(float)
@@ -91,7 +91,7 @@ def sign(A):

@dispatch(object)
def log1p(A):
return A.log1p()
return np.log1p(A)


@dispatch(np.ndarray)
@@ -205,3 +205,25 @@ def get_distributed_client():
return get_client()
except ValueError:
return None


def maybe_to_cupy(beta, X):
    """Convert ``beta``, a numpy array, to a cupy array
    if ``X`` is a cupy array or a cupy-backed dask array.
    """
    if "cupy" in str(type(X)) or \
            (hasattr(X, '_meta') and "cupy" in str(type(X._meta))):
        import cupy
        return cupy.asarray(beta)
    return beta


def to_dask_cupy_array(X, cupy):
    """Convert a numpy-backed dask array to a cupy-backed dask array."""
    return X.map_blocks(cupy.asarray,
                        dtype=X.dtype, meta=cupy.asarray(X._meta))


def to_dask_cupy_array_xy(X, y, cupy):
    """Convert a pair of numpy-backed dask arrays to cupy-backed dask arrays."""
    return to_dask_cupy_array(X, cupy), to_dask_cupy_array(y, cupy)
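For reference, a quick sketch of how the new helpers compose (hypothetical session; requires cupy):

import numpy as np
import cupy
import dask.array as da
from dask_glm.utils import maybe_to_cupy, to_dask_cupy_array

beta = np.zeros(3)
X_cpu = da.random.random((6, 3), chunks=(3, 3))
X_gpu = to_dask_cupy_array(X_cpu, cupy)  # dask array whose chunks are cupy

maybe_to_cupy(beta, X_cpu)  # passthrough: X_cpu is not cupy-backed
maybe_to_cupy(beta, X_gpu)  # cupy.ndarray: X_gpu._meta is a cupy array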