From b8b9668d57b75447931a20f38dc25ffeb6cd4791 Mon Sep 17 00:00:00 2001
From: "weisu.yxd"
Date: Tue, 28 Nov 2023 16:09:46 +0800
Subject: [PATCH] add ppnet & upgrade backbone

---
 easy_rec/python/layers/backbone.py         | 35 ++++++----------
 easy_rec/python/layers/keras/__init__.py   |  2 +-
 easy_rec/python/layers/keras/multi_task.py |  4 +-
 easy_rec/python/layers/keras/ppnet.py      | 40 ++++++++++++-------
 easy_rec/python/model/easy_rec_model.py    | 14 ++++---
 easy_rec/python/model/multi_task_model.py  |  5 +--
 easy_rec/python/model/rank_model.py        |  5 +--
 easy_rec/python/protos/layer.proto         | 18 ++++++++-
 .../tools/add_feature_info_to_config.py    |  2 +-
 9 files changed, 70 insertions(+), 55 deletions(-)

diff --git a/easy_rec/python/layers/backbone.py b/easy_rec/python/layers/backbone.py
index 4a510fa64..0b7de1f38 100644
--- a/easy_rec/python/layers/backbone.py
+++ b/easy_rec/python/layers/backbone.py
@@ -43,7 +43,6 @@ def __init__(self, config, features, input_layer, l2_reg=None):
     self._l2_reg = l2_reg
     self._dag = DAG()
     self._name_to_blocks = {}
-    self.loss_dict = {}
     self._name_to_layer = {}
     self.reset_input_config(None)
     self._block_outputs = {}
@@ -183,7 +182,7 @@ def has_block(self, name):
   def block_outputs(self, name):
     return self._block_outputs.get(name, None)
 
-  def block_input(self, config, block_outputs, training=None):
+  def block_input(self, config, block_outputs, training=None, **kwargs):
     inputs = []
     for input_node in config.inputs:
       input_type = input_node.WhichOneof('name')
@@ -211,9 +210,7 @@
           fn = eval(input_node.package_input_fn)
           pkg_input = fn(pkg_input)
         package.set_package_input(pkg_input)
-        input_feature = package(training)
-        if len(package.loss_dict) > 0:
-          self.loss_dict.update(package.loss_dict)
+        input_feature = package(training, **kwargs)
       elif input_name in block_outputs:
         input_feature = block_outputs[input_name]
       else:
@@ -258,16 +255,16 @@ def call(self, is_training, **kwargs):
       config = self._name_to_blocks[block]
       if config.layers:  # sequential layers
         logging.info('call sequential %d layers' % len(config.layers))
-        output = self.block_input(config, block_outputs, is_training)
+        output = self.block_input(config, block_outputs, is_training, **kwargs)
         for i, layer in enumerate(config.layers):
           name_i = '%s_l%d' % (block, i)
-          output = self.call_layer(output, layer, name_i, is_training)
+          output = self.call_layer(output, layer, name_i, is_training, **kwargs)
         block_outputs[block] = output
         continue
       # just one of layer
       layer = config.WhichOneof('layer')
       if layer is None:  # identity layer
-        output = self.block_input(config, block_outputs, is_training)
+        output = self.block_input(config, block_outputs, is_training, **kwargs)
         block_outputs[block] = output
       elif layer == 'input_layer':
         input_fn = self._name_to_layer[block]
@@ -277,18 +274,14 @@ def call(self, is_training, **kwargs):
           input_fn.reset(input_config, is_training)
         block_outputs[block] = input_fn(input_config, is_training)
       else:
-        inputs = self.block_input(config, block_outputs, is_training)
-        output = self.call_layer(inputs, config, block, is_training)
+        inputs = self.block_input(config, block_outputs, is_training, **kwargs)
+        output = self.call_layer(inputs, config, block, is_training, **kwargs)
         block_outputs[block] = output
 
     outputs = []
     for output in self._config.concat_blocks:
       if output in block_outputs:
         temp = block_outputs[output]
-        # if type(temp) in (tuple, list):
-        #   outputs.extend(temp)
-        # else:
-        #   outputs.append(temp)
         outputs.append(temp)
       else:
         raise ValueError('No output `%s` of backbone to be concat' % output)
@@ -345,11 +338,10 @@ def load_keras_layer(self, layer_conf, name, reuse=None):
       layer = layer_cls(*args, name=name)
       return layer, customize
 
-  def call_keras_layer(self, inputs, name, training):
+  def call_keras_layer(self, inputs, name, training, **kwargs):
     """Call predefined Keras Layer, which can be reused."""
     layer, customize = self._name_to_layer[name]
     cls = layer.__class__.__name__
-    kwargs = {'loss_dict': self.loss_dict}
     if customize:
       output = layer(inputs, training=training, **kwargs)
     else:
@@ -361,10 +353,10 @@ def call_layer(self, inputs, config, name, training):
       output = layer(inputs)
     return output
 
-  def call_layer(self, inputs, config, name, training):
+  def call_layer(self, inputs, config, name, training, **kwargs):
     layer_name = config.WhichOneof('layer')
     if layer_name == 'keras_layer':
-      return self.call_keras_layer(inputs, name, training)
+      return self.call_keras_layer(inputs, name, training, **kwargs)
     if layer_name == 'lambda':
       conf = getattr(config, 'lambda')
       fn = eval(conf.expression)
@@ -375,7 +367,7 @@
       outputs = []
       for i in range(n_loop):
         name_i = '%s_%d' % (name, i)
-        output = self.call_keras_layer(inputs, name_i, training)
+        output = self.call_keras_layer(inputs, name_i, training, **kwargs)
         outputs.append(output)
       if len(outputs) == 1:
         return outputs[0]
@@ -392,7 +384,7 @@ def call_layer(self, inputs, config, name, training):
       output = inputs
       for i in range(conf.num_steps):
         name_i = '%s_%d' % (name, i)
-        output_i = self.call_keras_layer(output, name_i, training)
+        output_i = self.call_keras_layer(output, name_i, training, **kwargs)
         if fixed_input_index >= 0:
           j = 0
           for idx in range(len(output)):
@@ -421,7 +413,6 @@ class Backbone(object):
   def __init__(self, config, features, input_layer, l2_reg=None):
     self._config = config
     self._l2_reg = l2_reg
-    self.loss_dict = {}
     main_pkg = backbone_pb2.BlockPackage()
     main_pkg.name = 'backbone'
     main_pkg.blocks.MergeFrom(config.blocks)
@@ -432,8 +423,6 @@ def __init__(self, config, features, input_layer, l2_reg=None):
 
   def __call__(self, is_training, **kwargs):
     output = self._main_pkg(is_training, **kwargs)
-    if len(self._main_pkg.loss_dict) > 0:
-      self.loss_dict = self._main_pkg.loss_dict
 
     if self._config.HasField('top_mlp'):
       params = Parameter.make_from_pb(self._config.top_mlp)
diff --git a/easy_rec/python/layers/keras/__init__.py b/easy_rec/python/layers/keras/__init__.py
index cbe36b5ca..3e3b9268b 100644
--- a/easy_rec/python/layers/keras/__init__.py
+++ b/easy_rec/python/layers/keras/__init__.py
@@ -1,4 +1,3 @@
-from .auxiliary_loss import AuxiliaryLoss
 from .blocks import MLP
 from .blocks import Gate
 from .blocks import Highway
@@ -11,6 +10,7 @@
 from .interaction import FM
 from .interaction import Cross
 from .interaction import DotInteraction
+from .auxiliary_loss import AuxiliaryLoss
 from .mask_net import MaskBlock
 from .mask_net import MaskNet
 from .multi_task import MMoE
diff --git a/easy_rec/python/layers/keras/multi_task.py b/easy_rec/python/layers/keras/multi_task.py
index de6120da1..ee35aa697 100644
--- a/easy_rec/python/layers/keras/multi_task.py
+++ b/easy_rec/python/layers/keras/multi_task.py
@@ -33,8 +33,8 @@ def __init__(self, params, name='MMoE', reuse=None, **kwargs):
       expert_params = params.expert_mlp
       self._has_experts = True
       self._experts = [
-        MLP(expert_params, 'expert_%d' % i, reuse=reuse)
-        for i in range(self._num_expert)
+          MLP(expert_params, 'expert_%d' % i, reuse=reuse)
+          for i in range(self._num_expert)
       ]
     else:
       self._has_experts = False
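Note: with the Backbone-owned loss_dict removed above, a customized Keras layer now
receives the model-level dicts through **kwargs (built in easy_rec_model.py further
down). A minimal sketch of a hypothetical custom layer reporting an auxiliary loss
under that contract; MyAuxLayer and 'my_aux_loss' are illustrative names, not part
of this patch:

    import tensorflow as tf

    class MyAuxLayer(tf.keras.layers.Layer):
      """Hypothetical customized layer, shown only to illustrate the contract."""

      def call(self, inputs, training=None, **kwargs):
        # 'loss_dict' is injected by EasyRecModel.backbone via call_keras_layer.
        loss_dict = kwargs.get('loss_dict', {})
        loss_dict['my_aux_loss'] = 0.01 * tf.reduce_mean(tf.square(inputs))
        return inputs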
diff --git a/easy_rec/python/layers/keras/ppnet.py b/easy_rec/python/layers/keras/ppnet.py
index 4da62380d..091e37712 100644
--- a/easy_rec/python/layers/keras/ppnet.py
+++ b/easy_rec/python/layers/keras/ppnet.py
@@ -13,7 +13,13 @@ class GateNN(tf.keras.layers.Layer):
-  def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwargs):
+
+  def __init__(self,
+               params,
+               output_units=None,
+               name='gate_nn',
+               reuse=None,
+               **kwargs):
     super(GateNN, self).__init__(name=name, **kwargs)
     output_dim = output_units if output_units is not None else params.output_dim
     hidden_dim = params.get_or_default('hidden_dim', output_dim)
@@ -24,15 +30,15 @@ def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwar
     self._sub_layers = []
 
     dense = tf.keras.layers.Dense(
-      units=hidden_dim,
-      use_bias=not do_batch_norm,
-      kernel_initializer=initializer,
-      name=name)
+        units=hidden_dim,
+        use_bias=not do_batch_norm,
+        kernel_initializer=initializer,
+        name=name)
     self._sub_layers.append(dense)
 
     if do_batch_norm:
       bn = tf.keras.layers.BatchNormalization(
-        name='%s/bn' % name, trainable=True)
+          name='%s/bn' % name, trainable=True)
       self._sub_layers.append(bn)
 
     act_layer = activation_layer(activation)
@@ -45,11 +51,11 @@ def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwar
         raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate)
 
     dense = tf.keras.layers.Dense(
-      units=output_dim,
-      activation='sigmoid',
-      use_bias=not do_batch_norm,
-      kernel_initializer=initializer,
-      name=name)
+        units=output_dim,
+        activation='sigmoid',
+        use_bias=not do_batch_norm,
+        kernel_initializer=initializer,
+        name=name)
     self._sub_layers.append(dense)
     self._sub_layers.append(lambda x: x * 2)
@@ -80,6 +86,7 @@ class PPNet(tf.keras.layers.Layer):
   def __init__(self, params, name='ppnet', reuse=None, **kwargs):
     super(PPNet, self).__init__(name=name, **kwargs)
     params.check_required('mlp')
+    self.full_gate_input = params.get_or_default('full_gate_input', True)
     mode = params.get_or_default('mode', 'lazy')
     gate_params = params.gate_params
     params = params.mlp
@@ -113,7 +120,8 @@ def __init__(self, params, name='ppnet', reuse=None, **kwargs):
         self.add_rich_layer(num_units, use_bn, drop_rate, activation,
                             initializer, use_bias, use_bn_after_act, name,
                             params.l2_regularizer)
-        self._sub_layers.append(GateNN(gate_params, num_units, 'gate_%d' % (i + 1)))
+        self._sub_layers.append(
+            GateNN(gate_params, num_units, 'gate_%d' % (i + 1)))
 
     n = len(units) - 1
     drop_rate = dropout_rate[n] if num_dropout > n else 0.0
@@ -122,7 +130,8 @@ def __init__(self, params, name='ppnet', reuse=None, **kwargs):
                         initializer, use_final_bias, use_bn_after_act, name,
                         params.l2_regularizer)
     if mode == 'lazy':
-      self._sub_layers.append(GateNN(gate_params, units[-1], 'gate_%d' % (n + 1)))
+      self._sub_layers.append(
+          GateNN(gate_params, units[-1], 'gate_%d' % (n + 1)))
 
   def add_rich_layer(self,
                      num_units,
@@ -169,7 +178,9 @@ def add_rich_layer(self,
   def call(self, inputs, training=None, **kwargs):
     """Performs the forward computation of the block."""
     x, gate_input = inputs
-    gate_input = tf.concat([tf.stop_gradient(x), gate_input], axis=-1)
+    if self.full_gate_input:
+      gate_input = tf.concat([tf.stop_gradient(x), gate_input], axis=-1)
+
     for layer in self._sub_layers:
       cls = layer.__class__.__name__
       if cls == 'GateNN':
@@ -182,4 +193,3 @@ def call(self, inputs, training=None, **kwargs):
       else:
         x = layer(x)
     return x
-
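For context, the gating math PPNet relies on, as a standalone sketch rather than
code from this patch: each GateNN ends in a sigmoid Dense followed by the
`lambda x: x * 2` above, so the main tower's hidden activations are rescaled
element-wise by factors in (0, 2), with identity scaling at logit 0:

    import tensorflow as tf

    # Standalone sketch of PPNet-style personalized gating.
    def gate_scale(gate_input, hidden_dim):
      # 2 * sigmoid(dense(gate_input)): per-unit scale in (0, 2).
      return 2.0 * tf.sigmoid(tf.keras.layers.Dense(hidden_dim)(gate_input))

    x = tf.random.normal([8, 64])           # hidden activations of the main MLP
    gate_input = tf.random.normal([8, 16])  # personalized (e.g. id) features
    x = x * gate_scale(gate_input, 64)      # gate modulates the hidden layer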
diff --git a/easy_rec/python/model/easy_rec_model.py b/easy_rec/python/model/easy_rec_model.py
index 37249949b..e19ab786b 100644
--- a/easy_rec/python/model/easy_rec_model.py
+++ b/easy_rec/python/model/easy_rec_model.py
@@ -61,6 +61,7 @@ def __init__(self,
     self._labels = labels
     self._prediction_dict = {}
     self._loss_dict = {}
+    self._metric_dict = {}
 
     # add sample weight from inputs
     self._sample_weight = 1.0
@@ -88,10 +89,12 @@ def backbone(self):
     if self._backbone_output:
       return self._backbone_output
     if self._backbone_net:
-      self._backbone_output = self._backbone_net(self._is_training)
-      loss_dict = self._backbone_net.loss_dict
-      self._loss_dict.update(loss_dict)
-      return self._backbone_output
+      kwargs = {
+          'loss_dict': self._loss_dict,
+          'metric_dict': self._metric_dict,
+          constant.SAMPLE_WEIGHT: self._sample_weight
+      }
+      return self._backbone_net(self._is_training, **kwargs)
     return None
 
   @property
@@ -142,9 +145,8 @@ def build_predict_graph(self):
   def build_loss_graph(self):
     pass
 
-  @abstractmethod
   def build_metric_graph(self, eval_config):
-    pass
+    return self._metric_dict
 
   @abstractmethod
   def get_outputs(self):
diff --git a/easy_rec/python/model/multi_task_model.py b/easy_rec/python/model/multi_task_model.py
index c683702ae..ade76d5ab 100644
--- a/easy_rec/python/model/multi_task_model.py
+++ b/easy_rec/python/model/multi_task_model.py
@@ -137,21 +137,20 @@ def _add_to_prediction_dict(self, output):
 
   def build_metric_graph(self, eval_config):
     """Build metric graph for multi task model."""
-    metric_dict = {}
     for task_tower_cfg in self._task_towers:
       tower_name = task_tower_cfg.tower_name
       for metric in task_tower_cfg.metrics_set:
         loss_types = {task_tower_cfg.loss_type}
         if len(task_tower_cfg.losses) > 0:
           loss_types = {loss.loss_type for loss in task_tower_cfg.losses}
-        metric_dict.update(
+        self._metric_dict.update(
             self._build_metric_impl(
                 metric,
                 loss_type=loss_types,
                 label_name=self._label_name_dict[tower_name],
                 num_class=task_tower_cfg.num_class,
                 suffix='_%s' % tower_name))
-    return metric_dict
+    return self._metric_dict
 
   def build_loss_weight(self):
     loss_weights = OrderedDict()
diff --git a/easy_rec/python/model/rank_model.py b/easy_rec/python/model/rank_model.py
index f8c7f10c3..79e271483 100644
--- a/easy_rec/python/model/rank_model.py
+++ b/easy_rec/python/model/rank_model.py
@@ -390,18 +390,17 @@ def _build_metric_impl(self,
     return metric_dict
 
   def build_metric_graph(self, eval_config):
-    metric_dict = {}
     loss_types = {self._loss_type}
     if len(self._losses) > 0:
       loss_types = {loss.loss_type for loss in self._losses}
     for metric in eval_config.metrics_set:
-      metric_dict.update(
+      self._metric_dict.update(
           self._build_metric_impl(
               metric,
              loss_type=loss_types,
              label_name=self._label_name,
              num_class=self._num_class))
-    return metric_dict
+    return self._metric_dict
 
   def _get_outputs_impl(self, loss_type, num_class=1, suffix=''):
     binary_loss_set = {
diff --git a/easy_rec/python/protos/layer.proto b/easy_rec/python/protos/layer.proto
index e278b9f5c..223b339f5 100644
--- a/easy_rec/python/protos/layer.proto
+++ b/easy_rec/python/protos/layer.proto
@@ -2,6 +2,8 @@ syntax = "proto2";
 package protos;
 
 import "easy_rec/python/protos/dnn.proto";
+import "easy_rec/python/protos/eval.proto";
+import "easy_rec/python/protos/loss.proto";
 
 message HighWayTower {
   optional string input = 1;
@@ -85,4 +87,18 @@ message PPNet {
   required GateNN gate_params = 2;
   // run mode: eager, lazy
   required string mode = 3 [default = 'eager'];
-}
\ No newline at end of file
+  optional bool full_gate_input = 4 [default = true];
+}
+
+message LossTower {
+  // label for the task, default is label_fields by order
+  required string label_name = 2;
+  // metrics for the task
+  repeated EvalMetrics metrics_set = 3;
+  // num_class for multi-class classification loss
+  optional uint32 num_class = 4 [default = 1];
+  // multiple losses
+  repeated Loss losses = 5;
+  // whether to use sample weight in this tower
+  required bool use_sample_weight = 6 [default = false];
+}
diff --git a/easy_rec/python/tools/add_feature_info_to_config.py b/easy_rec/python/tools/add_feature_info_to_config.py
index a2df7744a..f1b4a4cfd 100644
--- a/easy_rec/python/tools/add_feature_info_to_config.py
+++ b/easy_rec/python/tools/add_feature_info_to_config.py
@@ -59,7 +59,7 @@ def main(argv):
     except common_io.exception.OutOfRangeException:
       reader.close()
       break
-
+
   feature_configs = config_util.get_compatible_feature_configs(pipeline_config)
   if drop_feature_names:
     tmp_feature_configs = feature_configs[:]
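For reference, a minimal sketch of wiring the new PPNet layer into a backbone
config (protobuf text format). The block and feature-group names are hypothetical;
`merge_inputs_into_list` is assumed here so the two inputs reach PPNet.call as the
(x, gate_input) pair unpacked above, and the `gate_params` field shown is inferred
from the `params.output_dim` read in GateNN, whose message is not part of this diff:

    blocks {
      name: 'ppnet'
      inputs { feature_group_name: 'general' }    # x: main features
      inputs { feature_group_name: 'memorize' }   # gate_input: id/memorization features
      merge_inputs_into_list: true
      keras_layer {
        class_name: 'PPNet'
        ppnet {
          mlp { hidden_units: [256, 128] }
          gate_params { output_dim: 32 }
          mode: 'lazy'
          full_gate_input: true
        }
      }
    }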