Commit
Merge pull request #89 from daxiongshu/allow_lbfgs_with_cupy
[Review] Allow lbfgs and admm with dask cupy inputs
TomAugspurger authored Nov 16, 2020
2 parents 255ee57 + 3a4a262 commit 1daf4c5
Showing 5 changed files with 111 additions and 25 deletions.
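In short, this PR teaches dask-glm's lbfgs and admm solvers (and the estimators built on them) to accept dask arrays whose blocks are cupy (GPU) arrays. A minimal sketch of the workflow this enables, assuming cupy and a GPU are available (the data construction here is illustrative, not taken from the PR):

    import cupy
    import numpy as np
    import dask.array as da
    from dask_glm.algorithms import lbfgs
    from dask_glm.utils import make_y, to_dask_cupy_array_xy

    # Start with a NumPy-backed dask array, then move each block to the GPU.
    X = da.random.random((1000, 5), chunks=(100, 5))
    y = make_y(X, beta=np.random.random(5), chunks=(100,))
    X, y = to_dask_cupy_array_xy(X, y, cupy)

    # After this change, lbfgs (and admm) accept cupy-backed dask inputs
    # and return a cupy array of coefficients.
    beta = lbfgs(X, y)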
dask_glm/algorithms.py (25 changes: 17 additions & 8 deletions)

@@ -9,9 +9,9 @@
 import numpy as np
 import dask.array as da
 from scipy.optimize import fmin_l_bfgs_b
+from dask.array.utils import normalize_to_array


-from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client
+from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client, maybe_to_cupy
 from dask_glm.families import Logistic
 from dask_glm.regularizers import Regularizer

@@ -225,14 +225,22 @@ def admm(X, y, regularizer='l1', lamduh=0.1, rho=1, over_relax=1,
     def create_local_gradient(func):
         @functools.wraps(func)
         def wrapped(beta, X, y, z, u, rho):
-            return func(beta, X, y) + rho * (beta - z + u)
+            beta = maybe_to_cupy(beta, X)
+            z = maybe_to_cupy(z, X)
+            u = maybe_to_cupy(u, X)
+            res = func(beta, X, y) + rho * (beta - z + u)
+            return normalize_to_array(res)
         return wrapped

     def create_local_f(func):
         @functools.wraps(func)
         def wrapped(beta, X, y, z, u, rho):
-            return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
-                                                         beta - z + u)
+            beta = maybe_to_cupy(beta, X)
+            z = maybe_to_cupy(z, X)
+            u = maybe_to_cupy(u, X)
+            res = func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
+                                                        beta - z + u)
+            return normalize_to_array(res)
         return wrapped

     f = create_local_f(pointwise_loss)
@@ -286,7 +294,7 @@ def wrapped(beta, X, y, z, u, rho):
         if primal_res < eps_pri and dual_res < eps_dual:
             break

-    return z
+    return maybe_to_cupy(z, X)


 def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b):
@@ -339,19 +347,20 @@ def lbfgs(X, y, regularizer=None, lamduh=1.0, max_iter=100, tol=1e-4,
     beta0 = np.zeros(p)

     def compute_loss_grad(beta, X, y):
+        beta = maybe_to_cupy(beta, X)
         scatter_beta = scatter_array(
             beta, dask_distributed_client) if dask_distributed_client else beta
         loss_fn = pointwise_loss(scatter_beta, X, y)
         gradient_fn = pointwise_gradient(scatter_beta, X, y)
         loss, gradient = compute(loss_fn, gradient_fn)
-        return loss, gradient.copy()
+        return normalize_to_array(loss), normalize_to_array(gradient.copy())

     with dask.config.set(fuse_ave_width=0):  # optimizations slows this down
         beta, loss, info = fmin_l_bfgs_b(
             compute_loss_grad, beta0, fprime=None,
             args=(X, y),
             iprint=(verbose > 0) - 1, pgtol=tol, maxiter=max_iter)

+    beta = maybe_to_cupy(beta, X)
     return beta

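The pattern above exists because scipy.optimize.fmin_l_bfgs_b runs on the CPU: it hands the objective a NumPy beta and expects NumPy results back. The diff therefore converts beta to cupy on the way in (maybe_to_cupy) and normalizes results back to NumPy on the way out (normalize_to_array). A standalone sketch of that round trip for an in-memory least-squares objective (illustrative only; the PR's version also handles dask arrays and distributed scatter):

    import numpy as np
    from scipy.optimize import fmin_l_bfgs_b
    from dask.array.utils import normalize_to_array
    from dask_glm.utils import maybe_to_cupy

    def loss_grad(beta, X, y):
        # SciPy always passes beta as NumPy; move it to the device
        # when X is cupy-backed (a no-op for NumPy inputs).
        beta = maybe_to_cupy(beta, X)
        resid = X.dot(beta) - y
        loss = (resid ** 2).sum() / 2
        grad = X.T.dot(resid)
        # Hand NumPy back to SciPy, which cannot consume cupy arrays.
        return normalize_to_array(loss), normalize_to_array(grad)

    # beta, loss, info = fmin_l_bfgs_b(loss_grad, np.zeros(X.shape[1]),
    #                                  args=(X, y))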
dask_glm/tests/test_admm.py (9 changes: 7 additions & 2 deletions)

@@ -7,7 +7,7 @@
 from dask_glm.algorithms import admm, local_update
 from dask_glm.families import Logistic, Normal
 from dask_glm.regularizers import L1
-from dask_glm.utils import make_y
+from dask_glm.utils import make_y, to_dask_cupy_array_xy


 @pytest.mark.parametrize('N', [1000, 10000])
@@ -46,11 +46,16 @@ def wrapped(beta, X, y, z, u, rho):
 @pytest.mark.parametrize('N', [1000, 10000])
 @pytest.mark.parametrize('nchunks', [5, 10])
 @pytest.mark.parametrize('p', [1, 5, 10])
-def test_admm_with_large_lamduh(N, p, nchunks):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_admm_with_large_lamduh(N, p, nchunks, is_cupy):
     X = da.random.random((N, p), chunks=(N // nchunks, p))
     beta = np.random.random(p)
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)
+
     X, y = persist(X, y)
     z = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=500)

dask_glm/tests/test_algos_families.py (33 changes: 28 additions & 5 deletions)

@@ -10,7 +10,8 @@
                                      gradient_descent, admm)
 from dask_glm.families import Logistic, Normal, Poisson
 from dask_glm.regularizers import Regularizer
-from dask_glm.utils import sigmoid, make_y
+from dask_glm.utils import (sigmoid, make_y, maybe_to_cupy,
+                            to_dask_cupy_array_xy)


 def add_l1(f, lam):
@@ -46,8 +47,14 @@ def make_intercept_data(N, p, seed=20009):
                          [(100, 2, 20009),
                           (250, 12, 90210),
                           (95, 6, 70605)])
-def test_methods(N, p, seed, opt):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_methods(N, p, seed, opt, is_cupy):
     X, y = make_intercept_data(N, p, seed=seed)
+
+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)
+
     coefs = opt(X, y)
     p = sigmoid(X.dot(coefs).compute())

@@ -64,16 +71,22 @@ def test_methods(N, p, seed, opt):
 @pytest.mark.parametrize('N', [1000])
 @pytest.mark.parametrize('nchunks', [1, 10])
 @pytest.mark.parametrize('family', [Logistic, Normal, Poisson])
-def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_basic_unreg_descent(func, kwargs, N, nchunks, family, is_cupy):
     beta = np.random.normal(size=2)
     M = len(beta)
     X = da.random.random((N, M), chunks=(N // nchunks, M))
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)
+
     X, y = persist(X, y)

     result = func(X, y, family=family, **kwargs)
     test_vec = np.random.normal(size=2)
+    test_vec = maybe_to_cupy(test_vec, X)

     opt = family.pointwise_loss(result, X, y).compute()
     test_val = family.pointwise_loss(test_vec, X, y).compute()
@@ -90,16 +103,22 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
 @pytest.mark.parametrize('family', [Logistic, Normal, Poisson])
 @pytest.mark.parametrize('lam', [0.01, 1.2, 4.05])
 @pytest.mark.parametrize('reg', [r() for r in Regularizer.__subclasses__()])
-def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg, is_cupy):
     beta = np.random.normal(size=2)
     M = len(beta)
     X = da.random.random((N, M), chunks=(N // nchunks, M))
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)
+
     X, y = persist(X, y)

     result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
     test_vec = np.random.normal(size=2)
+    test_vec = maybe_to_cupy(test_vec, X)

     f = reg.add_reg_f(family.pointwise_loss, lam)

@@ -120,8 +139,12 @@ def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
     'threading',
     'multiprocessing'
 ])
-def test_determinism(func, kwargs, scheduler):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_determinism(func, kwargs, scheduler, is_cupy):
     X, y = make_intercept_data(1000, 10)
+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)

     with dask.config.set(scheduler=scheduler):
         a = func(X, y, **kwargs)
dask_glm/tests/test_estimators.py (43 changes: 35 additions & 8 deletions)

@@ -4,6 +4,7 @@
 from dask_glm.estimators import LogisticRegression, LinearRegression, PoissonRegression
 from dask_glm.datasets import make_classification, make_regression, make_poisson
 from dask_glm.regularizers import Regularizer
+from dask_glm.utils import to_dask_cupy_array_xy


 @pytest.fixture(params=[r() for r in Regularizer.__subclasses__()])
@@ -44,19 +45,33 @@ def test_pr_init(solver):


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_fit(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_fit(fit_intercept, is_sparse, is_cupy):
     X, y = make_classification(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse)

+    if is_cupy and not is_sparse:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)
+
     lr = LogisticRegression(fit_intercept=fit_intercept)
     lr.fit(X, y)
     lr.predict(X)
     lr.predict_proba(X)


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_lm(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_lm(fit_intercept, is_sparse, is_cupy):
     X, y = make_regression(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse)
+    if is_cupy and not is_sparse:
+        cupy = pytest.importorskip('cupy')
+        X, y = to_dask_cupy_array_xy(X, y, cupy)
     lr = LinearRegression(fit_intercept=fit_intercept)
     lr.fit(X, y)
     lr.predict(X)
@@ -65,10 +80,16 @@ def test_lm(fit_intercept, is_sparse):


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_big(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_big(fit_intercept, is_sparse, is_cupy):
     with dask.config.set(scheduler='synchronous'):
         X, y = make_classification(is_sparse=is_sparse)
+        if is_cupy and not is_sparse:
+            cupy = pytest.importorskip('cupy')
+            X, y = to_dask_cupy_array_xy(X, y, cupy)
         lr = LogisticRegression(fit_intercept=fit_intercept)
         lr.fit(X, y)
         lr.predict(X)
@@ -78,10 +99,16 @@ def test_big(fit_intercept, is_sparse):


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_poisson_fit(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_poisson_fit(fit_intercept, is_sparse, is_cupy):
     with dask.config.set(scheduler='synchronous'):
         X, y = make_poisson(is_sparse=is_sparse)
+        if is_cupy and not is_sparse:
+            cupy = pytest.importorskip('cupy')
+            X, y = to_dask_cupy_array_xy(X, y, cupy)
         pr = PoissonRegression(fit_intercept=fit_intercept)
         pr.fit(X, y)
         pr.predict(X)
dask_glm/utils.py (26 changes: 24 additions & 2 deletions)

@@ -41,7 +41,7 @@ def sigmoid(x):

 @dispatch(object)
 def exp(A):
-    return A.exp()
+    return np.exp(A)


 @dispatch(float)
@@ -91,7 +91,7 @@ def sign(A):

 @dispatch(object)
 def log1p(A):
-    return A.log1p()
+    return np.log1p(A)


 @dispatch(np.ndarray)
@@ -205,3 +205,25 @@ def get_distributed_client():
         return get_client()
     except ValueError:
         return None
+
+
+def maybe_to_cupy(beta, X):
+    """ convert beta, a numpy array, to a cupy array
+    if X is a cupy array or dask cupy array
+    """
+    if "cupy" in str(type(X)) or \
+            hasattr(X, '_meta') and 'cupy' in str(type(X._meta)):
+        import cupy
+        return cupy.asarray(beta)
+    return beta
+
+
+def to_dask_cupy_array(X, cupy):
+    """ convert a dask numpy array to a dask cupy array
+    """
+    return X.map_blocks(lambda x: cupy.asarray(x),
+                        dtype=X.dtype, meta=cupy.asarray(X._meta))
+
+
+def to_dask_cupy_array_xy(X, y, cupy):
+    return to_dask_cupy_array(X, cupy), to_dask_cupy_array(y, cupy)
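For a feel of how these helpers dispatch, a quick sketch (assuming cupy is installed; not part of the commit):

    import cupy
    import numpy as np
    import dask.array as da
    from dask_glm.utils import maybe_to_cupy, to_dask_cupy_array

    beta = np.zeros(3)
    X_cpu = da.random.random((6, 3), chunks=(3, 3))   # NumPy-backed blocks
    X_gpu = to_dask_cupy_array(X_cpu, cupy)           # cupy-backed blocks

    type(maybe_to_cupy(beta, X_cpu))   # numpy.ndarray: left untouched
    type(maybe_to_cupy(beta, X_gpu))   # cupy.ndarray: detected via X._meta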
