[Review] Allow lbfgs and admm with dask cupy inputs #89

Merged: 11 commits, Nov 16, 2020
25 changes: 17 additions & 8 deletions dask_glm/algorithms.py
@@ -9,9 +9,9 @@
 import numpy as np
 import dask.array as da
 from scipy.optimize import fmin_l_bfgs_b
+from dask.array.utils import normalize_to_array


-from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client
+from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client, maybe_to_cupy
 from dask_glm.families import Logistic
 from dask_glm.regularizers import Regularizer

@@ -225,14 +225,22 @@ def admm(X, y, regularizer='l1', lamduh=0.1, rho=1, over_relax=1,
     def create_local_gradient(func):
         @functools.wraps(func)
         def wrapped(beta, X, y, z, u, rho):
-            return func(beta, X, y) + rho * (beta - z + u)
+            beta = maybe_to_cupy(beta, X)
+            z = maybe_to_cupy(z, X)
+            u = maybe_to_cupy(u, X)
+            res = func(beta, X, y) + rho * (beta - z + u)
+            return normalize_to_array(res)
         return wrapped

     def create_local_f(func):
         @functools.wraps(func)
         def wrapped(beta, X, y, z, u, rho):
-            return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
-                                                         beta - z + u)
+            beta = maybe_to_cupy(beta, X)
+            z = maybe_to_cupy(z, X)
+            u = maybe_to_cupy(u, X)
+            res = func(beta, X, y) + (rho / 2) * np.dot(beta - z + u,
+                                                        beta - z + u)
+            return normalize_to_array(res)
         return wrapped

     f = create_local_f(pointwise_loss)
@@ -286,7 +294,7 @@ def wrapped(beta, X, y, z, u, rho):
         if primal_res < eps_pri and dual_res < eps_dual:
             break

-    return z
+    return maybe_to_cupy(z, X)


 def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b):
@@ -339,19 +347,20 @@ def lbfgs(X, y, regularizer=None, lamduh=1.0, max_iter=100, tol=1e-4,
     beta0 = np.zeros(p)

     def compute_loss_grad(beta, X, y):
+        beta = maybe_to_cupy(beta, X)
         scatter_beta = scatter_array(
             beta, dask_distributed_client) if dask_distributed_client else beta
         loss_fn = pointwise_loss(scatter_beta, X, y)
         gradient_fn = pointwise_gradient(scatter_beta, X, y)
         loss, gradient = compute(loss_fn, gradient_fn)
-        return loss, gradient.copy()
+        return normalize_to_array(loss), normalize_to_array(gradient.copy())

     with dask.config.set(fuse_ave_width=0):  # optimizations slows this down
         beta, loss, info = fmin_l_bfgs_b(
             compute_loss_grad, beta0, fprime=None,
             args=(X, y),
             iprint=(verbose > 0) - 1, pgtol=tol, maxiter=max_iter)

+    beta = maybe_to_cupy(beta, X)
     return beta


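A note on the algorithms.py changes above: scipy's fmin_l_bfgs_b produces and consumes host numpy arrays, while the blocks of X may live on the GPU. That is why beta is promoted with maybe_to_cupy on the way into each local loss/gradient call, and the loss and gradient are brought back to numpy (normalize_to_array) on the way out. A minimal sketch of that round-trip, independent of dask-glm (the to_numpy helper is illustrative, not part of the PR):

    import numpy as np
    from scipy.optimize import fmin_l_bfgs_b

    try:
        import cupy as xp          # device arrays if cupy is available
    except ImportError:
        xp = np                    # the sketch still runs CPU-only

    X = xp.asarray(np.random.random((100, 5)))
    y = xp.asarray(np.random.random(100))

    def to_numpy(a):
        # cupy arrays expose .get() to copy back to the host; numpy passes through
        return a.get() if hasattr(a, 'get') else np.asarray(a)

    def loss_grad(beta_host):
        beta = xp.asarray(beta_host)     # scipy hands us numpy; move to device
        r = X.dot(beta) - y
        loss = float((r ** 2).sum())     # a plain float is safe to return to scipy
        grad = to_numpy(2 * X.T.dot(r))  # the gradient must come back as numpy
        return loss, grad

    beta, loss, info = fmin_l_bfgs_b(loss_grad, np.zeros(5))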
9 changes: 8 additions & 1 deletion dask_glm/tests/test_admm.py
@@ -46,11 +46,18 @@ def wrapped(beta, X, y, z, u, rho):
 @pytest.mark.parametrize('N', [1000, 10000])
 @pytest.mark.parametrize('nchunks', [5, 10])
 @pytest.mark.parametrize('p', [1, 5, 10])
-def test_admm_with_large_lamduh(N, p, nchunks):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_admm_with_large_lamduh(N, p, nchunks, is_cupy):
     X = da.random.random((N, p), chunks=(N // nchunks, p))
     beta = np.random.random(p)
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member:

This looks very convoluted; the best way to generate X is:

    cupy = pytest.importorskip('cupy')
    xp = cupy if is_cupy else np
    rs = da.random.RandomState(RandomState=xp.random.RandomState)
    X = rs.random((N, p), chunks=(N // nchunks, p))

It seems that make_y could also be changed to something such as:

    def make_y(X, beta=np.array([1.5, -3]), chunks=2,
               rs=da.random.RandomState(RandomState=np.random.RandomState)):
        n, p = X.shape
        z0 = X.dot(beta)
        y = rs.random(z0.shape, chunks=z0.chunks) < sigmoid(z0)
        return y

And then you'd call it with something like:

    y = make_y(X, beta=xp.array(beta), chunks=(N // nchunks,), rs=rs)

NOTE: I didn't actually test the code above, so it may need some fixing.
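One caveat with the snippet above: calling pytest.importorskip('cupy') unconditionally would also skip the numpy variants on machines without cupy. A guarded version (a sketch under the same untested caveat; the sample values stand in for the test parameters):

    import numpy as np
    import pytest
    import dask.array as da

    N, p, nchunks = 1000, 5, 10
    is_cupy = True                         # stands in for the pytest parameter

    xp = np
    if is_cupy:
        xp = pytest.importorskip('cupy')   # skips only the GPU variants

    rs = da.random.RandomState(RandomState=xp.random.RandomState)
    X = rs.random_sample((N, p), chunks=(N // nchunks, p))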

Member:

You seem to be using this in 10 or so different tests; it's probably a good idea to write a small util to avoid copy-pasting it everywhere.

Contributor Author:

Thank you for the review! I notice that X and y are generated in different ways in the current tests. The following utils are used:

  • X, y = make_intercept_data(N, p, seed=seed)
  • X = da.random.random and y = make_y(...)
  • X, y = make_classification(...)
  • X, y = make_regression(...)
  • X, y = make_poisson(...)

I would like cupy inputs to be created using the same functions. Should I go into each of the utils above and add an is_cupy branch? What I'm doing currently is using a single util function to convert the dask arrays to cupy after they are created.

Member:

I see your point; maybe this is indeed too much work for very little benefit. That said, I don't necessarily oppose leaving things as they are now; I'll defer to @mrocklin on whether he thinks it's acceptable. The only real downside I see is that people may eventually refer to this code and think it's a good way to handle such cases; for tests this is OK, but for real-world applications it is not.

Contributor Author:

Thank you. I wrapped the repeated code in a util function to make it look nicer.
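A sketch of what such a test helper can look like (the name to_dask_cupy is illustrative; the PR's actual helper may differ):

    import pytest

    def to_dask_cupy(arr):
        # Swap each numpy block for a cupy block and set the meta so that
        # downstream code sees a cupy-backed dask array.
        cupy = pytest.importorskip('cupy')
        return arr.map_blocks(cupy.asarray, dtype=arr.dtype,
                              meta=cupy.asarray(arr._meta))

    # usage inside a test:
    #     if is_cupy:
    #         X, y = to_dask_cupy(X), to_dask_cupy(y)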

     X, y = persist(X, y)
     z = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=500)

41 changes: 36 additions & 5 deletions dask_glm/tests/test_algos_families.py
@@ -10,7 +10,7 @@
                                      gradient_descent, admm)
 from dask_glm.families import Logistic, Normal, Poisson
 from dask_glm.regularizers import Regularizer
-from dask_glm.utils import sigmoid, make_y
+from dask_glm.utils import sigmoid, make_y, maybe_to_cupy


 def add_l1(f, lam):
@@ -46,8 +46,15 @@ def make_intercept_data(N, p, seed=20009):
                          [(100, 2, 20009),
                           (250, 12, 90210),
                           (95, 6, 70605)])
-def test_methods(N, p, seed, opt):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_methods(N, p, seed, opt, is_cupy):
     X, y = make_intercept_data(N, p, seed=seed)
+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Same as comment above.

     coefs = opt(X, y)
     p = sigmoid(X.dot(coefs).compute())

@@ -64,16 +71,25 @@ def test_methods(N, p, seed, opt):
 @pytest.mark.parametrize('N', [1000])
 @pytest.mark.parametrize('nchunks', [1, 10])
 @pytest.mark.parametrize('family', [Logistic, Normal, Poisson])
-def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_basic_unreg_descent(func, kwargs, N, nchunks, family, is_cupy):
     beta = np.random.normal(size=2)
     M = len(beta)
     X = da.random.random((N, M), chunks=(N // nchunks, M))
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.


     X, y = persist(X, y)

     result = func(X, y, family=family, **kwargs)
     test_vec = np.random.normal(size=2)
+    test_vec = maybe_to_cupy(test_vec, X)

     opt = family.pointwise_loss(result, X, y).compute()
     test_val = family.pointwise_loss(test_vec, X, y).compute()
@@ -90,16 +106,24 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family):
 @pytest.mark.parametrize('family', [Logistic, Normal, Poisson])
 @pytest.mark.parametrize('lam', [0.01, 1.2, 4.05])
 @pytest.mark.parametrize('reg', [r() for r in Regularizer.__subclasses__()])
-def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg, is_cupy):
     beta = np.random.normal(size=2)
     M = len(beta)
     X = da.random.random((N, M), chunks=(N // nchunks, M))
     y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))
+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.


     X, y = persist(X, y)

     result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
     test_vec = np.random.normal(size=2)
+    test_vec = maybe_to_cupy(test_vec, X)

     f = reg.add_reg_f(family.pointwise_loss, lam)

@@ -120,8 +144,15 @@ def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):
     'threading',
     'multiprocessing'
 ])
-def test_determinism(func, kwargs, scheduler):
+@pytest.mark.parametrize('is_cupy', [True, False])
+def test_determinism(func, kwargs, scheduler, is_cupy):
     X, y = make_intercept_data(1000, 10)
+    if is_cupy:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.


     with dask.config.set(scheduler=scheduler):
         a = func(X, y, **kwargs)
54 changes: 46 additions & 8 deletions dask_glm/tests/test_estimators.py
@@ -44,19 +44,39 @@ def test_pr_init(solver):


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_fit(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_fit(fit_intercept, is_sparse, is_cupy):
     X, y = make_classification(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse)

+    if is_cupy and not is_sparse:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.


     lr = LogisticRegression(fit_intercept=fit_intercept)
     lr.fit(X, y)
     lr.predict(X)
     lr.predict_proba(X)


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_lm(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_lm(fit_intercept, is_sparse, is_cupy):
     X, y = make_regression(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse)
+    if is_cupy and not is_sparse:
+        cupy = pytest.importorskip('cupy')
+        X = X.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=X.dtype, meta=cupy.asarray(X._meta))
+        y = y.map_blocks(lambda x: cupy.asarray(x),
+                         dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.

     lr = LinearRegression(fit_intercept=fit_intercept)
     lr.fit(X, y)
     lr.predict(X)
@@ -65,10 +85,19 @@ def test_lm(fit_intercept, is_sparse):


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_big(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_big(fit_intercept, is_sparse, is_cupy):
     with dask.config.set(scheduler='synchronous'):
         X, y = make_classification(is_sparse=is_sparse)
+        if is_cupy and not is_sparse:
+            cupy = pytest.importorskip('cupy')
+            X = X.map_blocks(lambda x: cupy.asarray(x),
+                             dtype=X.dtype, meta=cupy.asarray(X._meta))
+            y = y.map_blocks(lambda x: cupy.asarray(x),
+                             dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.

         lr = LogisticRegression(fit_intercept=fit_intercept)
         lr.fit(X, y)
         lr.predict(X)
@@ -78,10 +107,19 @@ def test_big(fit_intercept, is_sparse):


 @pytest.mark.parametrize('fit_intercept', [True, False])
-@pytest.mark.parametrize('is_sparse', [True, False])
-def test_poisson_fit(fit_intercept, is_sparse):
+@pytest.mark.parametrize('is_sparse,is_cupy', [
+    (True, False),
+    (False, False),
+    (False, True)])
+def test_poisson_fit(fit_intercept, is_sparse, is_cupy):
     with dask.config.set(scheduler='synchronous'):
         X, y = make_poisson(is_sparse=is_sparse)
+        if is_cupy and not is_sparse:
+            cupy = pytest.importorskip('cupy')
+            X = X.map_blocks(lambda x: cupy.asarray(x),
+                             dtype=X.dtype, meta=cupy.asarray(X._meta))
+            y = y.map_blocks(lambda x: cupy.asarray(x),
+                             dtype=y.dtype, meta=cupy.asarray(y._meta))
Member: Ditto.

         pr = PoissonRegression(fit_intercept=fit_intercept)
         pr.fit(X, y)
         pr.predict(X)
15 changes: 13 additions & 2 deletions dask_glm/utils.py
@@ -41,7 +41,7 @@ def sigmoid(x):

 @dispatch(object)
 def exp(A):
-    return A.exp()
+    return np.exp(A)


 @dispatch(float)
@@ -91,7 +91,7 @@ def sign(A):

 @dispatch(object)
 def log1p(A):
-    return A.log1p()
+    return np.log1p(A)


 @dispatch(np.ndarray)
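The exp and log1p changes above work because numpy ufuncs defer to cupy inputs through the __array_ufunc__ protocol, while cupy.ndarray (like numpy.ndarray) has no .exp() or .log1p() methods. A quick sanity check, assuming cupy is installed:

    import numpy as np
    import cupy

    a = cupy.asarray(np.linspace(0.0, 1.0, 4))
    out = np.exp(a)                    # the numpy ufunc dispatches to cupy
    assert isinstance(out, cupy.ndarray)
    assert np.allclose(cupy.asnumpy(out), np.exp(np.linspace(0.0, 1.0, 4)))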
@@ -205,3 +205,14 @@ def get_distributed_client():
         return get_client()
     except ValueError:
         return None
+
+
+def maybe_to_cupy(beta, X):
+    """Convert beta, a numpy array, to a cupy array
+    if X is a cupy array or a cupy-backed dask array.
+    """
+    if "cupy" in str(type(X)) or \
+            (hasattr(X, '_meta') and 'cupy' in str(type(X._meta))):
+        import cupy
+        return cupy.asarray(beta)
+    return beta
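To illustrate the intended behavior of maybe_to_cupy (a sketch assuming cupy is installed; note that the dask case is detected through ._meta):

    import numpy as np
    import dask.array as da
    import cupy
    from dask_glm.utils import maybe_to_cupy

    beta = np.zeros(3)

    X_cpu = da.random.random((6, 3), chunks=3)        # numpy-backed dask array
    assert isinstance(maybe_to_cupy(beta, X_cpu), np.ndarray)    # unchanged

    X_gpu = X_cpu.map_blocks(cupy.asarray)            # cupy-backed dask array
    assert isinstance(maybe_to_cupy(beta, X_gpu), cupy.ndarray)  # promoted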