add ppnet & upgrade backbone
yangxudong committed Nov 28, 2023
1 parent b7d9948 commit b8b9668
Showing 9 changed files with 70 additions and 55 deletions.
35 changes: 12 additions & 23 deletions easy_rec/python/layers/backbone.py
@@ -43,7 +43,6 @@ def __init__(self, config, features, input_layer, l2_reg=None):
self._l2_reg = l2_reg
self._dag = DAG()
self._name_to_blocks = {}
self.loss_dict = {}
self._name_to_layer = {}
self.reset_input_config(None)
self._block_outputs = {}
@@ -183,7 +182,7 @@ def has_block(self, name):
def block_outputs(self, name):
return self._block_outputs.get(name, None)

def block_input(self, config, block_outputs, training=None):
def block_input(self, config, block_outputs, training=None, **kwargs):
inputs = []
for input_node in config.inputs:
input_type = input_node.WhichOneof('name')
@@ -211,9 +210,7 @@ def block_input(self, config, block_outputs, training=None):
fn = eval(input_node.package_input_fn)
pkg_input = fn(pkg_input)
package.set_package_input(pkg_input)
input_feature = package(training)
if len(package.loss_dict) > 0:
self.loss_dict.update(package.loss_dict)
input_feature = package(training, **kwargs)
elif input_name in block_outputs:
input_feature = block_outputs[input_name]
else:
@@ -258,16 +255,16 @@ def call(self, is_training, **kwargs):
config = self._name_to_blocks[block]
if config.layers: # sequential layers
logging.info('call sequential %d layers' % len(config.layers))
output = self.block_input(config, block_outputs, is_training)
output = self.block_input(config, block_outputs, is_training, **kwargs)
for i, layer in enumerate(config.layers):
name_i = '%s_l%d' % (block, i)
output = self.call_layer(output, layer, name_i, is_training)
output = self.call_layer(output, layer, name_i, is_training, **kwargs)
block_outputs[block] = output
continue
# just one of layer
layer = config.WhichOneof('layer')
if layer is None: # identity layer
output = self.block_input(config, block_outputs, is_training)
output = self.block_input(config, block_outputs, is_training, **kwargs)
block_outputs[block] = output
elif layer == 'input_layer':
input_fn = self._name_to_layer[block]
@@ -277,18 +274,14 @@ def call(self, is_training, **kwargs):
input_fn.reset(input_config, is_training)
block_outputs[block] = input_fn(input_config, is_training)
else:
inputs = self.block_input(config, block_outputs, is_training)
output = self.call_layer(inputs, config, block, is_training)
inputs = self.block_input(config, block_outputs, is_training, **kwargs)
output = self.call_layer(inputs, config, block, is_training, **kwargs)
block_outputs[block] = output

outputs = []
for output in self._config.concat_blocks:
if output in block_outputs:
temp = block_outputs[output]
# if type(temp) in (tuple, list):
# outputs.extend(temp)
# else:
# outputs.append(temp)
outputs.append(temp)
else:
raise ValueError('No output `%s` of backbone to be concat' % output)
@@ -345,11 +338,10 @@ def load_keras_layer(self, layer_conf, name, reuse=None):
layer = layer_cls(*args, name=name)
return layer, customize

def call_keras_layer(self, inputs, name, training):
def call_keras_layer(self, inputs, name, training, **kwargs):
"""Call predefined Keras Layer, which can be reused."""
layer, customize = self._name_to_layer[name]
cls = layer.__class__.__name__
kwargs = {'loss_dict': self.loss_dict}
if customize:
output = layer(inputs, training=training, **kwargs)
else:
@@ -361,10 +353,10 @@ def call_keras_layer(self, inputs, name, training):
output = layer(inputs)
return output

def call_layer(self, inputs, config, name, training):
def call_layer(self, inputs, config, name, training, **kwargs):
layer_name = config.WhichOneof('layer')
if layer_name == 'keras_layer':
return self.call_keras_layer(inputs, name, training)
return self.call_keras_layer(inputs, name, training, **kwargs)
if layer_name == 'lambda':
conf = getattr(config, 'lambda')
fn = eval(conf.expression)
@@ -375,7 +367,7 @@ def call_layer(self, inputs, config, name, training):
outputs = []
for i in range(n_loop):
name_i = '%s_%d' % (name, i)
output = self.call_keras_layer(inputs, name_i, training)
output = self.call_keras_layer(inputs, name_i, training, **kwargs)
outputs.append(output)
if len(outputs) == 1:
return outputs[0]
@@ -392,7 +384,7 @@ def call_layer(self, inputs, config, name, training):
output = inputs
for i in range(conf.num_steps):
name_i = '%s_%d' % (name, i)
output_i = self.call_keras_layer(output, name_i, training)
output_i = self.call_keras_layer(output, name_i, training, **kwargs)
if fixed_input_index >= 0:
j = 0
for idx in range(len(output)):
@@ -421,7 +413,6 @@ class Backbone(object):
def __init__(self, config, features, input_layer, l2_reg=None):
self._config = config
self._l2_reg = l2_reg
self.loss_dict = {}
main_pkg = backbone_pb2.BlockPackage()
main_pkg.name = 'backbone'
main_pkg.blocks.MergeFrom(config.blocks)
@@ -432,8 +423,6 @@ def __init__(self, config, features, input_layer, l2_reg=None):

def __call__(self, is_training, **kwargs):
output = self._main_pkg(is_training, **kwargs)
if len(self._main_pkg.loss_dict) > 0:
self.loss_dict = self._main_pkg.loss_dict

if self._config.HasField('top_mlp'):
params = Parameter.make_from_pb(self._config.top_mlp)
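
Note on the backbone.py changes above: the per-package loss_dict attribute is removed, and shared dictionaries are now threaded down the call chain through **kwargs. A minimal, self-contained Python sketch of that pattern (illustrative only, not EasyRec code; the class and key names here are hypothetical):

class Package(object):

  def __call__(self, is_training, **kwargs):
    loss_dict = kwargs.get('loss_dict', {})
    # a layer deep inside the package can register an auxiliary loss
    loss_dict['aux_loss'] = 0.01
    return 'package_output'


class Backbone(object):

  def __init__(self):
    self._main_pkg = Package()

  def __call__(self, is_training, **kwargs):
    # no self.loss_dict attribute any more; results land in the caller's dicts
    return self._main_pkg(is_training, **kwargs)


loss_dict, metric_dict = {}, {}
output = Backbone()(True, loss_dict=loss_dict, metric_dict=metric_dict)
print(output, loss_dict)  # package_output {'aux_loss': 0.01}
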
2 changes: 1 addition & 1 deletion easy_rec/python/layers/keras/__init__.py
@@ -1,4 +1,3 @@
from .auxiliary_loss import AuxiliaryLoss
from .blocks import MLP
from .blocks import Gate
from .blocks import Highway
@@ -11,6 +10,7 @@
from .interaction import FM
from .interaction import Cross
from .interaction import DotInteraction
from .auxiliary_loss import AuxiliaryLoss
from .mask_net import MaskBlock
from .mask_net import MaskNet
from .multi_task import MMoE
4 changes: 2 additions & 2 deletions easy_rec/python/layers/keras/multi_task.py
@@ -33,8 +33,8 @@ def __init__(self, params, name='MMoE', reuse=None, **kwargs):
expert_params = params.expert_mlp
self._has_experts = True
self._experts = [
MLP(expert_params, 'expert_%d' % i, reuse=reuse)
for i in range(self._num_expert)
MLP(expert_params, 'expert_%d' % i, reuse=reuse)
for i in range(self._num_expert)
]
else:
self._has_experts = False
40 changes: 25 additions & 15 deletions easy_rec/python/layers/keras/ppnet.py
@@ -13,7 +13,13 @@


class GateNN(tf.keras.layers.Layer):
def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwargs):

def __init__(self,
params,
output_units=None,
name='gate_nn',
reuse=None,
**kwargs):
super(GateNN, self).__init__(name=name, **kwargs)
output_dim = output_units if output_units is not None else params.output_dim
hidden_dim = params.get_or_default('hidden_dim', output_dim)
@@ -24,15 +30,15 @@ def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwargs):

self._sub_layers = []
dense = tf.keras.layers.Dense(
units=hidden_dim,
use_bias=not do_batch_norm,
kernel_initializer=initializer,
name=name)
units=hidden_dim,
use_bias=not do_batch_norm,
kernel_initializer=initializer,
name=name)
self._sub_layers.append(dense)

if do_batch_norm:
bn = tf.keras.layers.BatchNormalization(
name='%s/bn' % name, trainable=True)
name='%s/bn' % name, trainable=True)
self._sub_layers.append(bn)

act_layer = activation_layer(activation)
@@ -45,11 +51,11 @@ def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwargs):
raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate)

dense = tf.keras.layers.Dense(
units=output_dim,
activation='sigmoid',
use_bias=not do_batch_norm,
kernel_initializer=initializer,
name=name)
units=output_dim,
activation='sigmoid',
use_bias=not do_batch_norm,
kernel_initializer=initializer,
name=name)
self._sub_layers.append(dense)
self._sub_layers.append(lambda x: x * 2)

@@ -80,6 +86,7 @@ class PPNet(tf.keras.layers.Layer):
def __init__(self, params, name='ppnet', reuse=None, **kwargs):
super(PPNet, self).__init__(name=name, **kwargs)
params.check_required('mlp')
self.full_gate_input = params.get_or_default('full_gate_input', True)
mode = params.get_or_default('mode', 'lazy')
gate_params = params.gate_params
params = params.mlp
@@ -113,7 +120,8 @@ def __init__(self, params, name='ppnet', reuse=None, **kwargs):
self.add_rich_layer(num_units, use_bn, drop_rate, activation, initializer,
use_bias, use_bn_after_act, name,
params.l2_regularizer)
self._sub_layers.append(GateNN(gate_params, num_units, 'gate_%d' % (i + 1)))
self._sub_layers.append(
GateNN(gate_params, num_units, 'gate_%d' % (i + 1)))

n = len(units) - 1
drop_rate = dropout_rate[n] if num_dropout > n else 0.0
@@ -122,7 +130,8 @@ def __init__(self, params, name='ppnet', reuse=None, **kwargs):
initializer, use_final_bias, use_bn_after_act, name,
params.l2_regularizer)
if mode == 'lazy':
self._sub_layers.append(GateNN(gate_params, units[-1], 'gate_%d' % (n + 1)))
self._sub_layers.append(
GateNN(gate_params, units[-1], 'gate_%d' % (n + 1)))

def add_rich_layer(self,
num_units,
@@ -169,7 +178,9 @@ def add_rich_layer(self,
def call(self, inputs, training=None, **kwargs):
"""Performs the forward computation of the block."""
x, gate_input = inputs
gate_input = tf.concat([tf.stop_gradient(x), gate_input], axis=-1)
if self.full_gate_input:
gate_input = tf.concat([tf.stop_gradient(x), gate_input], axis=-1)

for layer in self._sub_layers:
cls = layer.__class__.__name__
if cls == 'GateNN':
@@ -182,4 +193,3 @@ def call(self, inputs, training=None, **kwargs):
else:
x = layer(x)
return x
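
For reference, the gating arithmetic implemented by GateNN and PPNet.call above can be illustrated with a small NumPy sketch (an assumption-laden toy, not the EasyRec layer; weight shapes, feature names, and the relu hidden activation are made up for illustration):

import numpy as np

def sigmoid(z):
  return 1.0 / (1.0 + np.exp(-z))

def gate_nn(gate_input, w_hidden, w_out):
  # Dense + (assumed relu) activation, then Dense + sigmoid scaled into (0, 2),
  # mirroring the final `lambda x: x * 2` in GateNN above.
  hidden = np.maximum(gate_input.dot(w_hidden), 0.0)
  return 2.0 * sigmoid(hidden.dot(w_out))

rng = np.random.RandomState(0)
x = rng.normal(size=(4, 8))           # hidden features entering the MLP tower
gate_feats = rng.normal(size=(4, 3))  # personalized features, e.g. uid/item id embeddings

# full_gate_input=True: the gate also sees the (gradient-stopped) hidden input
gate_input = np.concatenate([x, gate_feats], axis=-1)
w_hidden = rng.normal(size=(gate_input.shape[-1], 8))
w_out = rng.normal(size=(8, 8))

gated_x = x * gate_nn(gate_input, w_hidden, w_out)  # element-wise re-weighting
print(gated_x.shape)  # (4, 8)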

14 changes: 8 additions & 6 deletions easy_rec/python/model/easy_rec_model.py
@@ -61,6 +61,7 @@ def __init__(self,
self._labels = labels
self._prediction_dict = {}
self._loss_dict = {}
self._metric_dict = {}

# add sample weight from inputs
self._sample_weight = 1.0
@@ -88,10 +89,12 @@ def backbone(self):
if self._backbone_output:
return self._backbone_output
if self._backbone_net:
self._backbone_output = self._backbone_net(self._is_training)
loss_dict = self._backbone_net.loss_dict
self._loss_dict.update(loss_dict)
return self._backbone_output
kwargs = {
'loss_dict': self._loss_dict,
'metric_dict': self._metric_dict,
constant.SAMPLE_WEIGHT: self._sample_weight
}
return self._backbone_net(self._is_training, **kwargs)
return None

@property
@@ -142,9 +145,8 @@ def build_predict_graph(self):
def build_loss_graph(self):
pass

@abstractmethod
def build_metric_graph(self, eval_config):
pass
return self._metric_dict

@abstractmethod
def get_outputs(self):
5 changes: 2 additions & 3 deletions easy_rec/python/model/multi_task_model.py
@@ -137,21 +137,20 @@ def _add_to_prediction_dict(self, output):

def build_metric_graph(self, eval_config):
"""Build metric graph for multi task model."""
metric_dict = {}
for task_tower_cfg in self._task_towers:
tower_name = task_tower_cfg.tower_name
for metric in task_tower_cfg.metrics_set:
loss_types = {task_tower_cfg.loss_type}
if len(task_tower_cfg.losses) > 0:
loss_types = {loss.loss_type for loss in task_tower_cfg.losses}
metric_dict.update(
self._metric_dict.update(
self._build_metric_impl(
metric,
loss_type=loss_types,
label_name=self._label_name_dict[tower_name],
num_class=task_tower_cfg.num_class,
suffix='_%s' % tower_name))
return metric_dict
return self._metric_dict

def build_loss_weight(self):
loss_weights = OrderedDict()
5 changes: 2 additions & 3 deletions easy_rec/python/model/rank_model.py
@@ -390,18 +390,17 @@ def _build_metric_impl(self,
return metric_dict

def build_metric_graph(self, eval_config):
metric_dict = {}
loss_types = {self._loss_type}
if len(self._losses) > 0:
loss_types = {loss.loss_type for loss in self._losses}
for metric in eval_config.metrics_set:
metric_dict.update(
self._metric_dict.update(
self._build_metric_impl(
metric,
loss_type=loss_types,
label_name=self._label_name,
num_class=self._num_class))
return metric_dict
return self._metric_dict

def _get_outputs_impl(self, loss_type, num_class=1, suffix=''):
binary_loss_set = {
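
The model-side changes in easy_rec_model.py, multi_task_model.py and rank_model.py above replace the local metric_dict built inside each build_metric_graph with the shared self._metric_dict introduced in EasyRecModel.__init__. A rough sketch of the resulting pattern (illustrative Python, not the actual classes):

class BaseModel(object):
  """Stand-in for EasyRecModel: owns the shared metric dict."""

  def __init__(self):
    self._metric_dict = {}

  def build_metric_graph(self, eval_config):
    # no longer abstract: returns whatever subclasses have accumulated
    return self._metric_dict


class ToyRankModel(BaseModel):

  def build_metric_graph(self, eval_config):
    # subclasses update the shared dict instead of building a local one
    self._metric_dict.update({'auc': 'auc_metric_op'})
    return self._metric_dict


print(ToyRankModel().build_metric_graph(eval_config=None))  # {'auc': 'auc_metric_op'}
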
18 changes: 17 additions & 1 deletion easy_rec/python/protos/layer.proto
@@ -2,6 +2,8 @@ syntax = "proto2";
package protos;

import "easy_rec/python/protos/dnn.proto";
import "easy_rec/python/protos/eval.proto";
import "easy_rec/python/protos/loss.proto";

message HighWayTower {
optional string input = 1;
@@ -85,4 +87,18 @@ message PPNet {
required GateNN gate_params = 2;
// run mode: eager, lazy
required string mode = 3 [default = 'eager'];
}
optional bool full_gate_input = 4 [default = true];
}

message LossTower {
// label for the task, default is label_fields by order
required string label_name = 2;
// metrics for the task
repeated EvalMetrics metrics_set = 3;
// num_class for multi-class classification loss
optional uint32 num_class = 4 [default = 1];
// multiple losses
repeated Loss losses = 5;
// whether to use sample weight in this tower
required bool use_sample_weight = 6 [default = false];
}
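
The new PPNet fields above (gate_params, mode, full_gate_input) would typically be set inside a backbone block's keras_layer config. A hypothetical text-format sketch with made-up values and the block/input wiring omitted (the keras_layer/class_name wrapper and the hidden_units/output_dim field names follow common EasyRec conventions but are assumptions, not part of this diff):

keras_layer {
  class_name: 'PPNet'
  ppnet {
    mlp { hidden_units: [256, 128, 64] }
    gate_params { output_dim: 32 }
    mode: 'lazy'             # 'eager' or 'lazy', see ppnet.py above
    full_gate_input: true    # new optional field: concat stop_gradient(x) into the gate input
  }
}
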
2 changes: 1 addition & 1 deletion easy_rec/python/tools/add_feature_info_to_config.py
@@ -59,7 +59,7 @@ def main(argv):
except common_io.exception.OutOfRangeException:
reader.close()
break

feature_configs = config_util.get_compatible_feature_configs(pipeline_config)
if drop_feature_names:
tmp_feature_configs = feature_configs[:]