Source code for gluonnlp.model.transformer

# coding: utf-8

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-many-lines
"""Encoder and decoder usded in sequence-to-sequence learning."""
__all__ = ['TransformerEncoder', 'PositionwiseFFN', 'TransformerEncoderCell',
           'transformer_en_de_512']

import os

import math
import numpy as np
import mxnet as mx
from mxnet import cpu, gluon
from mxnet.gluon import nn
from mxnet.gluon.block import HybridBlock
from mxnet.gluon.model_zoo import model_store
from .seq2seq_encoder_decoder import Seq2SeqEncoder, Seq2SeqDecoder, _get_attention_cell
from .block import GELU
from .translation import NMTModel
from .utils import _load_vocab, _load_pretrained_params


###############################################################################
#                               BASE ENCODER  BLOCKS                          #
###############################################################################

def _position_encoding_init(max_length, dim):
    """ Init the sinusoid position encoding table """
    position_enc = np.arange(max_length).reshape((-1, 1)) \
                   / (np.power(10000, (2. / dim) * np.arange(dim).reshape((1, -1))))
    # Apply sin to even columns and cos to odd columns.
    position_enc[:, 0::2] = np.sin(position_enc[:, 0::2])  # dim 2i
    position_enc[:, 1::2] = np.cos(position_enc[:, 1::2])  # dim 2i+1
    return position_enc
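
# A minimal shape check for the table above (illustrative only):
#
#     pos_enc = _position_encoding_init(max_length=4, dim=6)
#     pos_enc.shape        # (4, 6): one row per position, one column per dimension
#     pos_enc[0, 0::2]     # sin(0) = 0.0 in the even columns of position 0
#     pos_enc[0, 1::2]     # cos(0) = 1.0 in the odd columns of position 0
#
# Each column oscillates at its own frequency across positions, so every position
# receives a distinct, smoothly varying code of width `dim`.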

def _get_layer_norm(use_bert, units):
    from .bert import BERTLayerNorm
    layer_norm = BERTLayerNorm if use_bert else nn.LayerNorm
    return layer_norm(in_channels=units)

class BasePositionwiseFFN(HybridBlock):
    """Base Structure of the Positionwise Feed-Forward Neural Network.

    Parameters
    ----------
    units : int
        Number of units for the output
    hidden_size : int
        Number of units in the hidden layer of position-wise feed-forward networks
    dropout : float
    use_residual : bool
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    activation : str, default 'relu'
        Activation function
    use_bert_layer_norm : bool, default False.
        Whether to use the BERT-style layer norm implemented in TensorFlow, where
        epsilon is added inside the square root. Set to True for pre-trained BERT model.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.

    Inputs:
        - **inputs** : input sequence of shape (batch_size, length, C_in).

    Outputs:
        - **outputs** : output encoding of shape (batch_size, length, C_out).
    """
    def __init__(self, units=512, hidden_size=2048, dropout=0.0, use_residual=True,
                 weight_initializer=None, bias_initializer='zeros', activation='relu',
                 use_bert_layer_norm=False, prefix=None, params=None):
        super(BasePositionwiseFFN, self).__init__(prefix=prefix, params=params)
        self._hidden_size = hidden_size
        self._units = units
        self._use_residual = use_residual
        with self.name_scope():
            self.ffn_1 = nn.Dense(units=hidden_size, flatten=False,
                                  weight_initializer=weight_initializer,
                                  bias_initializer=bias_initializer,
                                  prefix='ffn_1_')
            self.activation = self._get_activation(activation) if activation else None
            self.ffn_2 = nn.Dense(units=units, flatten=False,
                                  weight_initializer=weight_initializer,
                                  bias_initializer=bias_initializer,
                                  prefix='ffn_2_')
            self.dropout_layer = nn.Dropout(dropout)
            self.layer_norm = _get_layer_norm(use_bert_layer_norm, units)

    def _get_activation(self, act):
        """ Get activation block based on the name. """
        if isinstance(act, str):
            if act.lower() == 'gelu':
                return GELU()
            else:
                return gluon.nn.Activation(act)
        assert isinstance(act, gluon.Block)
        return act

    def hybrid_forward(self, F, inputs):  # pylint: disable=arguments-differ
        # pylint: disable=unused-argument
        """Position-wise encoding of the inputs.

        Parameters
        ----------
        inputs : Symbol or NDArray
            Input sequence. Shape (batch_size, length, C_in)

        Returns
        -------
        outputs : Symbol or NDArray
            Shape (batch_size, length, C_out)
        """
        outputs = self.ffn_1(inputs)
        if self.activation:
            outputs = self.activation(outputs)
        outputs = self.ffn_2(outputs)
        outputs = self.dropout_layer(outputs)
        if self._use_residual:
            outputs = outputs + inputs
        outputs = self.layer_norm(outputs)
        return outputs
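
# Quick forward-pass sketch for the block above (hypothetical shapes):
#
#     ffn = BasePositionwiseFFN(units=512, hidden_size=2048, dropout=0.1)
#     ffn.initialize()
#     x = mx.nd.random.normal(shape=(2, 10, 512))   # (batch_size, length, C_in)
#     y = ffn(x)                                    # (2, 10, 512): Dense -> activation ->
#                                                   # Dense -> dropout -> residual -> layer norm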


class BaseTransformerEncoderCell(HybridBlock):
    """Base Structure of the Transformer Encoder Cell.

    Parameters
    ----------
    attention_cell : AttentionCell or str, default 'multi_head'
        Arguments of the attention cell.
        Can be 'multi_head', 'scaled_luong', 'scaled_dot', 'dot', 'cosine', 'normed_mlp', 'mlp'
    units : int
        Number of units for the output
    hidden_size : int
        number of units in the hidden layer of position-wise feed-forward networks
    num_heads : int
        Number of heads in multi-head attention
    scaled : bool
        Whether to scale the softmax input by the sqrt of the input dimension
        in multi-head attention
    dropout : float
        Dropout probability for the attention cell, positionwise ffn and
        the attention cell output
    use_residual : bool
    output_attention: bool
        Whether to output the attention weights
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    use_bert_layer_norm : bool, default False.
        Whether to use BERTLayerNorm or LayerNorm. Set to True for pre-trained BERT model.
    use_bert_ffn : bool, default False.
        Whether to use BERTPositionwiseFFN. Set to True for pre-trained BERT model.
    attention_use_bias : bool
        Apply bias term to the linear projections of key, value, query in the attention cell.
        Default is False.
    attention_proj_use_bias : bool
        Apply bias term to the linear projection of the output of attention cell. Default is False.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.
    """
    def __init__(self, attention_cell='multi_head', units=128,
                 hidden_size=512, num_heads=4, scaled=True,
                 dropout=0.0, use_residual=True, output_attention=False,
                 weight_initializer=None, bias_initializer='zeros',
                 attention_use_bias=False, attention_proj_use_bias=False,
                 use_bert_layer_norm=False, use_bert_ffn=False, prefix=None, params=None):
        super(BaseTransformerEncoderCell, self).__init__(prefix=prefix, params=params)
        self._units = units
        self._num_heads = num_heads
        self._dropout = dropout
        self._use_residual = use_residual
        self._output_attention = output_attention
        with self.name_scope():
            self.dropout_layer = nn.Dropout(dropout)
            self.attention_cell = _get_attention_cell(attention_cell,
                                                      units=units,
                                                      num_heads=num_heads,
                                                      scaled=scaled,
                                                      dropout=dropout,
                                                      use_bias=attention_use_bias)
            self.proj = nn.Dense(units=units, flatten=False,
                                 use_bias=attention_proj_use_bias,
                                 weight_initializer=weight_initializer,
                                 bias_initializer=bias_initializer,
                                 prefix='proj_')
            self.ffn = self._get_positionwise_ffn(use_bert_ffn, units, hidden_size, dropout,
                                                  use_residual, weight_initializer,
                                                  bias_initializer)
            self.layer_norm = _get_layer_norm(use_bert_layer_norm, units)

    def _get_positionwise_ffn(self, use_bert, units, hidden_size, dropout, use_residual,
                              weight_initializer, bias_initializer):
        from .bert import BERTPositionwiseFFN
        positionwise_ffn = BERTPositionwiseFFN if use_bert else PositionwiseFFN
        return positionwise_ffn(units=units, hidden_size=hidden_size, dropout=dropout,
                                use_residual=use_residual, weight_initializer=weight_initializer,
                                bias_initializer=bias_initializer)


    def hybrid_forward(self, F, inputs, mask=None):  # pylint: disable=arguments-differ
        # pylint: disable=unused-argument
        """Transformer Encoder Attention Cell.

        Parameters
        ----------
        inputs : Symbol or NDArray
            Input sequence. Shape (batch_size, length, C_in)
        mask : Symbol or NDArray or None
            Mask for inputs. Shape (batch_size, length, length)

        Returns
        -------
        encoder_cell_outputs: list
            Outputs of the encoder cell. Contains:

            - outputs of the transformer encoder cell. Shape (batch_size, length, C_out)
            - additional_outputs of the transformer encoder cell
        """
        outputs, attention_weights =\
            self.attention_cell(inputs, inputs, inputs, mask)
        outputs = self.proj(outputs)
        outputs = self.dropout_layer(outputs)
        if self._use_residual:
            outputs = outputs + inputs
        outputs = self.layer_norm(outputs)
        outputs = self.ffn(outputs)
        additional_outputs = []
        if self._output_attention:
            additional_outputs.append(attention_weights)
        return outputs, additional_outputs
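
# Self-attention sketch for a single encoder cell (illustrative values, using the
# TransformerEncoderCell subclass defined below):
#
#     cell = TransformerEncoderCell(units=128, hidden_size=512, num_heads=4)
#     cell.initialize()
#     x = mx.nd.random.normal(shape=(2, 7, 128))    # (batch_size, length, units)
#     mask = mx.nd.ones((2, 7, 7))                  # every position may attend everywhere
#     out, attn = cell(x, mask)                     # out: (2, 7, 128); attn is [] unless
#                                                   # output_attention=True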


class BaseTransformerEncoder(HybridBlock, Seq2SeqEncoder):
    """Base Structure of the Transformer Encoder.

    Parameters
    ----------
    attention_cell : AttentionCell or str, default 'multi_head'
        Arguments of the attention cell.
        Can be 'multi_head', 'scaled_luong', 'scaled_dot', 'dot', 'cosine', 'normed_mlp', 'mlp'
    num_layers : int
        Number of attention layers.
    units : int
        Number of units for the output.
    hidden_size : int
        number of units in the hidden layer of position-wise feed-forward networks
    max_length : int
        Maximum length of the input sequence
    num_heads : int
        Number of heads in multi-head attention
    scaled : bool
        Whether to scale the softmax input by the sqrt of the input dimension
        in multi-head attention
    dropout : float
        Dropout probability of the attention probabilities.
    use_residual : bool
    output_attention: bool
        Whether to output the attention weights
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    positional_weight: str, default 'sinusoidal'
        Type of positional embedding. Can be 'sinusoidal', 'learned'.
        If set to 'sinusoidal', the embedding is initialized as sinusoidal values and kept constant.
    use_bert_encoder : bool, default False
        Whether to use BERTEncoderCell and BERTLayerNorm. Set to True for pre-trained BERT model
    use_layer_norm_before_dropout: bool, default False
        Before passing embeddings to attention cells, whether to perform `layernorm -> dropout` or
        `dropout -> layernorm`. Set to True for pre-trained BERT models.
    scale_embed : bool, default True
        Scale the input embeddings by sqrt(embed_size). Set to False for pre-trained BERT models.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.
    """
    def __init__(self, attention_cell='multi_head', num_layers=2,
                 units=512, hidden_size=2048, max_length=50,
                 num_heads=4, scaled=True, dropout=0.0,
                 use_residual=True, output_attention=False,
                 weight_initializer=None, bias_initializer='zeros',
                 positional_weight='sinusoidal', use_bert_encoder=False,
                 use_layer_norm_before_dropout=True, scale_embed=True,
                 prefix=None, params=None):
        super(BaseTransformerEncoder, self).__init__(prefix=prefix, params=params)
        assert units % num_heads == 0,\
            'In TransformerEncoder, the number of units should be divisible ' \
            'by the number of heads. Received units={}, num_heads={}' \
            .format(units, num_heads)
        self._num_layers = num_layers
        self._max_length = max_length
        self._num_heads = num_heads
        self._units = units
        self._hidden_size = hidden_size
        self._output_attention = output_attention
        self._dropout = dropout
        self._use_residual = use_residual
        self._scaled = scaled
        self._use_layer_norm_before_dropout = use_layer_norm_before_dropout
        self._scale_embed = scale_embed
        with self.name_scope():
            self.dropout_layer = nn.Dropout(dropout)
            self.layer_norm = _get_layer_norm(use_bert_encoder, units)
            self.position_weight = self._get_positional(positional_weight, max_length, units,
                                                        weight_initializer)
            self.transformer_cells = nn.HybridSequential()
            for i in range(num_layers):
                cell = self._get_encoder_cell(use_bert_encoder, units, hidden_size, num_heads,
                                              attention_cell, weight_initializer, bias_initializer,
                                              dropout, use_residual, scaled, output_attention, i)
                self.transformer_cells.add(cell)

    def _get_positional(self, weight_type, max_length, units, initializer):
        if weight_type == 'sinusoidal':
            encoding = _position_encoding_init(max_length, units)
            position_weight = self.params.get_constant('const', encoding)
        elif weight_type == 'learned':
            position_weight = self.params.get('position_weight', shape=(max_length, units),
                                              init=initializer)
        else:
            raise ValueError('Unexpected value for argument positional_weight: %s' % weight_type)
        return position_weight

    def _get_encoder_cell(self, use_bert, units, hidden_size, num_heads, attention_cell,
                          weight_initializer, bias_initializer, dropout, use_residual,
                          scaled, output_attention, i):
        from .bert import BERTEncoderCell
        cell = BERTEncoderCell if use_bert else TransformerEncoderCell
        return cell(units=units, hidden_size=hidden_size,
                    num_heads=num_heads, attention_cell=attention_cell,
                    weight_initializer=weight_initializer,
                    bias_initializer=bias_initializer,
                    dropout=dropout, use_residual=use_residual,
                    scaled=scaled, output_attention=output_attention,
                    prefix='transformer%d_'%i)


    def __call__(self, inputs, states=None, valid_length=None): #pylint: disable=arguments-differ
        """Encoder the inputs given the states and valid sequence length.

        Parameters
        ----------
        inputs : NDArray
            Input sequence. Shape (batch_size, length, C_in)
        states : list of NDArrays or None
            Initial states. The list of initial states and masks
        valid_length : NDArray or None
            Valid lengths of each sequence. This is usually used when part of sequence has
            been padded. Shape (batch_size,)

        Returns
        -------
        encoder_outputs: list
            Outputs of the encoder. Contains:

            - outputs of the transformer encoder. Shape (batch_size, length, C_out)
            - additional_outputs of all the transformer encoder cells
        """
        return super(BaseTransformerEncoder, self).__call__(inputs, states, valid_length)

    def forward(self, inputs, states=None, valid_length=None, steps=None): # pylint: disable=arguments-differ
        """

        Parameters
        ----------
        inputs : NDArray, Shape(batch_size, length, C_in)
        states : list of NDArray
        valid_length : NDArray
        steps : NDArray
            Stores value [0, 1, ..., length - 1].
            It is used for lookup in positional encoding matrix

        Returns
        -------
        outputs : NDArray
            The output of the encoder. Shape is (batch_size, length, C_out)
        additional_outputs : list
            Either an empty list, or a list containing the attention weights in this step.
            The attention weights will have shape (batch_size, length, length) or
            (batch_size, num_heads, length, length)

        """
        length = inputs.shape[1]
        if valid_length is not None:
            mask = mx.nd.broadcast_lesser(
                mx.nd.arange(length, ctx=valid_length.context).reshape((1, -1)),
                valid_length.reshape((-1, 1)))
            mask = mx.nd.broadcast_axes(mx.nd.expand_dims(mask, axis=1), axis=1, size=length)
            if states is None:
                states = [mask]
            else:
                states.append(mask)
        if self._scale_embed:
            inputs = inputs * math.sqrt(inputs.shape[-1])
        steps = mx.nd.arange(length, ctx=inputs.context)
        if states is None:
            states = [steps]
        else:
            states.append(steps)
        if valid_length is not None:
            step_output, additional_outputs =\
                super(BaseTransformerEncoder, self).forward(inputs, states, valid_length)
        else:
            step_output, additional_outputs =\
                super(BaseTransformerEncoder, self).forward(inputs, states)
        return step_output, additional_outputs
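
    # Sketch of the padding mask built in `forward` above (illustrative values,
    # assuming length=4 and valid_length=[2, 4]):
    #
    #     valid_length = mx.nd.array([2, 4])
    #     mask = mx.nd.broadcast_lesser(mx.nd.arange(4).reshape((1, -1)),
    #                                   valid_length.reshape((-1, 1)))
    #     # mask -> [[1, 1, 0, 0],
    #     #          [1, 1, 1, 1]]
    #
    # `broadcast_axes` then repeats each row `length` times, producing the
    # (batch_size, length, length) mask consumed by the attention cells.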

    def hybrid_forward(self, F, inputs, states=None, valid_length=None, position_weight=None):
        # pylint: disable=arguments-differ
        """

        Parameters
        ----------
        inputs : NDArray or Symbol, Shape(batch_size, length, C_in)
        states : list of NDArray or Symbol
        valid_length : NDArray or Symbol
        position_weight : NDArray or Symbol

        Returns
        -------
        outputs : NDArray or Symbol
            The output of the encoder. Shape is (batch_size, length, C_out)
        additional_outputs : list
            Either an empty list, or a list containing the attention weights in this step.
            The attention weights will have shape (batch_size, length, length) or
            (batch_size, num_heads, length, length)

        """
        if states is not None:
            steps = states[-1]
            # Positional Encoding
            positional_embed = F.Embedding(steps, position_weight, self._max_length, self._units)
            inputs = F.broadcast_add(inputs, F.expand_dims(positional_embed, axis=0))
        if self._use_layer_norm_before_dropout:
            inputs = self.layer_norm(inputs)
            inputs = self.dropout_layer(inputs)
        else:
            inputs = self.dropout_layer(inputs)
            inputs = self.layer_norm(inputs)
        outputs = inputs
        if valid_length is not None:
            mask = states[-2]
        else:
            mask = None
        additional_outputs = []
        for cell in self.transformer_cells:
            outputs, attention_weights = cell(inputs, mask)
            inputs = outputs
            if self._output_attention:
                additional_outputs.append(attention_weights)
        if valid_length is not None:
            outputs = F.SequenceMask(outputs, sequence_length=valid_length,
                                     use_sequence_length=True, axis=1)
        return outputs, additional_outputs
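
# End-to-end usage sketch for the encoder (illustrative values, using the
# TransformerEncoder subclass defined below):
#
#     encoder = TransformerEncoder(num_layers=2, units=512, hidden_size=2048,
#                                  num_heads=8, max_length=50)
#     encoder.initialize()
#     x = mx.nd.random.normal(shape=(2, 10, 512))      # embedded source tokens
#     valid_length = mx.nd.array([7, 10])              # unpadded length of each sample
#     outputs, additional = encoder(x, valid_length=valid_length)
#     # outputs: (2, 10, 512); positions beyond valid_length are zeroed by SequenceMask.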

###############################################################################
#                                ENCODER                                      #
###############################################################################

class PositionwiseFFN(BasePositionwiseFFN):
    """Structure of the Positionwise Feed-Forward Neural Network for Transformer.

    Computes the positionwise encoding of the inputs.

    Parameters
    ----------
    units : int
        Number of units for the output
    hidden_size : int
        Number of units in the hidden layer of position-wise feed-forward networks
    dropout : float
        Dropout probability for the output
    use_residual : bool
        Add residual connection between the input and the output
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.

    Inputs:
        - **inputs** : input sequence of shape (batch_size, length, C_in).

    Outputs:
        - **outputs** : output encoding of shape (batch_size, length, C_out).
    """
    def __init__(self, units=512, hidden_size=2048, dropout=0.0, use_residual=True,
                 weight_initializer=None, bias_initializer='zeros', prefix=None, params=None):
        super(PositionwiseFFN, self).__init__(units=units, hidden_size=hidden_size,
                                              dropout=dropout, use_residual=use_residual,
                                              weight_initializer=weight_initializer,
                                              bias_initializer=bias_initializer,
                                              prefix=prefix, params=params,
                                              # extra configurations for transformer
                                              activation='relu',
                                              use_bert_layer_norm=False)

class TransformerEncoderCell(BaseTransformerEncoderCell):
    """Structure of the Transformer Encoder Cell.

    Parameters
    ----------
    attention_cell : AttentionCell or str, default 'multi_head'
        Arguments of the attention cell.
        Can be 'multi_head', 'scaled_luong', 'scaled_dot', 'dot', 'cosine', 'normed_mlp', 'mlp'
    units : int
        Number of units for the output
    hidden_size : int
        number of units in the hidden layer of position-wise feed-forward networks
    num_heads : int
        Number of heads in multi-head attention
    scaled : bool
        Whether to scale the softmax input by the sqrt of the input dimension
        in multi-head attention
    dropout : float
    use_residual : bool
    output_attention: bool
        Whether to output the attention weights
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.

    Inputs:
        - **inputs** : input sequence. Shape (batch_size, length, C_in)
        - **mask** : mask for inputs. Shape (batch_size, length, length)

    Outputs:
        - **outputs**: output tensor of the transformer encoder cell.
          Shape (batch_size, length, C_out)
        - **additional_outputs**: the additional outputs of the transformer encoder cell.
    """
    def __init__(self, attention_cell='multi_head', units=128,
                 hidden_size=512, num_heads=4, scaled=True,
                 dropout=0.0, use_residual=True, output_attention=False,
                 weight_initializer=None, bias_initializer='zeros',
                 prefix=None, params=None):
        super(TransformerEncoderCell, self).__init__(attention_cell=attention_cell,
                                                     units=units, hidden_size=hidden_size,
                                                     num_heads=num_heads, scaled=scaled,
                                                     dropout=dropout, use_residual=use_residual,
                                                     output_attention=output_attention,
                                                     weight_initializer=weight_initializer,
                                                     bias_initializer=bias_initializer,
                                                     prefix=prefix, params=params,
                                                     # extra configurations for transformer
                                                     attention_use_bias=False,
                                                     attention_proj_use_bias=False,
                                                     use_bert_layer_norm=False,
                                                     use_bert_ffn=False)

class TransformerEncoder(BaseTransformerEncoder):
    """Structure of the Transformer Encoder.

    Parameters
    ----------
    attention_cell : AttentionCell or str, default 'multi_head'
        Arguments of the attention cell.
        Can be 'multi_head', 'scaled_luong', 'scaled_dot', 'dot', 'cosine', 'normed_mlp', 'mlp'
    num_layers : int
        Number of attention layers.
    units : int
        Number of units for the output.
    hidden_size : int
        number of units in the hidden layer of position-wise feed-forward networks
    max_length : int
        Maximum length of the input sequence
    num_heads : int
        Number of heads in multi-head attention
    scaled : bool
        Whether to scale the softmax input by the sqrt of the input dimension
        in multi-head attention
    dropout : float
        Dropout probability of the attention probabilities.
    use_residual : bool
    output_attention: bool
        Whether to output the attention weights
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.

    Inputs:
        - **inputs** : input sequence of shape (batch_size, length, C_in)
        - **states** : list of tensors for initial states and masks.
        - **valid_length** : valid lengths of each sequence. Usually used when part of sequence
          has been padded. Shape is (batch_size, )

    Outputs:
        - **outputs** : the output of the encoder. Shape is (batch_size, length, C_out)
        - **additional_outputs** : list of tensors.
          Either an empty list, or a list containing the attention weights in this step.
          The attention weights will have shape (batch_size, length, mem_length) or
          (batch_size, num_heads, length, mem_length)
    """
    def __init__(self, attention_cell='multi_head', num_layers=2,
                 units=512, hidden_size=2048, max_length=50,
                 num_heads=4, scaled=True, dropout=0.0,
                 use_residual=True, output_attention=False,
                 weight_initializer=None, bias_initializer='zeros',
                 prefix=None, params=None):
        super(TransformerEncoder, self).__init__(attention_cell=attention_cell,
                                                 num_layers=num_layers, units=units,
                                                 hidden_size=hidden_size, max_length=max_length,
                                                 num_heads=num_heads, scaled=scaled,
                                                 dropout=dropout, use_residual=use_residual,
                                                 output_attention=output_attention,
                                                 weight_initializer=weight_initializer,
                                                 bias_initializer=bias_initializer,
                                                 prefix=prefix, params=params,
                                                 # extra configurations for transformer
                                                 positional_weight='sinusoidal',
                                                 use_bert_encoder=False,
                                                 use_layer_norm_before_dropout=True,
                                                 scale_embed=True)

###############################################################################
#                                DECODER                                      #
###############################################################################

class TransformerDecoderCell(HybridBlock):
    """Structure of the Transformer Decoder Cell.

    Parameters
    ----------
    attention_cell : AttentionCell or str, default 'multi_head'
        Arguments of the attention cell.
        Can be 'multi_head', 'scaled_luong', 'scaled_dot', 'dot', 'cosine', 'normed_mlp', 'mlp'
    units : int
        Number of units for the output
    hidden_size : int
        number of units in the hidden layer of position-wise feed-forward networks
    num_heads : int
        Number of heads in multi-head attention
    scaled : bool
        Whether to scale the softmax input by the sqrt of the input dimension
        in multi-head attention
    dropout : float
    use_residual : bool
    output_attention: bool
        Whether to output the attention weights
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.
    """
    def __init__(self, attention_cell='multi_head', units=128,
                 hidden_size=512, num_heads=4, scaled=True,
                 dropout=0.0, use_residual=True, output_attention=False,
                 weight_initializer=None, bias_initializer='zeros',
                 prefix=None, params=None):
        super(TransformerDecoderCell, self).__init__(prefix=prefix, params=params)
        self._units = units
        self._num_heads = num_heads
        self._dropout = dropout
        self._use_residual = use_residual
        self._output_attention = output_attention
        self._scaled = scaled
        with self.name_scope():
            self.dropout_layer = nn.Dropout(dropout)
            self.attention_cell_in = _get_attention_cell(attention_cell,
                                                         units=units,
                                                         num_heads=num_heads,
                                                         scaled=scaled,
                                                         dropout=dropout)
            self.attention_cell_inter = _get_attention_cell(attention_cell,
                                                            units=units,
                                                            num_heads=num_heads,
                                                            scaled=scaled,
                                                            dropout=dropout)
            self.proj_in = nn.Dense(units=units, flatten=False, use_bias=False,
                                    weight_initializer=weight_initializer,
                                    bias_initializer=bias_initializer,
                                    prefix='proj_in_')
            self.proj_inter = nn.Dense(units=units, flatten=False, use_bias=False,
                                       weight_initializer=weight_initializer,
                                       bias_initializer=bias_initializer,
                                       prefix='proj_inter_')
            self.ffn = PositionwiseFFN(hidden_size=hidden_size,
                                       units=units,
                                       use_residual=use_residual,
                                       dropout=dropout,
                                       weight_initializer=weight_initializer,
                                       bias_initializer=bias_initializer)
            self.layer_norm_in = nn.LayerNorm()
            self.layer_norm_inter = nn.LayerNorm()

    def hybrid_forward(self, F, inputs, mem_value, mask=None, mem_mask=None):  # pylint: disable=unused-argument
        # pylint: disable=arguments-differ
        """Transformer Decoder Attention Cell.

        Parameters
        ----------
        inputs : Symbol or NDArray
            Input sequence. Shape (batch_size, length, C_in)
        mem_value : Symbol or NDArrays
            Memory value, i.e. output of the encoder. Shape (batch_size, mem_length, C_in)
        mask : Symbol or NDArray or None
            Mask for inputs. Shape (batch_size, length, length)
        mem_mask : Symbol or NDArray or None
            Mask for mem_value. Shape (batch_size, length, mem_length)

        Returns
        -------
        decoder_cell_outputs: list
            Outputs of the decoder cell. Contains:

            - outputs of the transformer decoder cell. Shape (batch_size, length, C_out)
            - additional_outputs of the transformer decoder cell
        """
        outputs, attention_in_outputs =\
            self.attention_cell_in(inputs, inputs, inputs, mask)
        outputs = self.proj_in(outputs)
        outputs = self.dropout_layer(outputs)
        if self._use_residual:
            outputs = outputs + inputs
        outputs = self.layer_norm_in(outputs)
        inputs = outputs
        outputs, attention_inter_outputs = \
            self.attention_cell_inter(inputs, mem_value, mem_value, mem_mask)
        outputs = self.proj_inter(outputs)
        outputs = self.dropout_layer(outputs)
        if self._use_residual:
            outputs = outputs + inputs
        outputs = self.layer_norm_inter(outputs)
        outputs = self.ffn(outputs)
        additional_outputs = []
        if self._output_attention:
            additional_outputs.append(attention_in_outputs)
            additional_outputs.append(attention_inter_outputs)
        return outputs, additional_outputs


class TransformerDecoder(HybridBlock, Seq2SeqDecoder):
    """Structure of the Transformer Decoder.

    Parameters
    ----------
    attention_cell : AttentionCell or str, default 'multi_head'
        Arguments of the attention cell.
        Can be 'multi_head', 'scaled_luong', 'scaled_dot', 'dot', 'cosine', 'normed_mlp', 'mlp'
    num_layers : int
    units : int
    hidden_size : int
        number of units in the hidden layer of position-wise feed-forward networks
    max_length : int
        Maximum length of the input sequence. This is used for constructing position encoding
    num_heads : int
        Number of heads in multi-head attention
    scaled : bool
        Whether to scale the softmax input by the sqrt of the input dimension
        in multi-head attention
    dropout : float
    use_residual : bool
    output_attention: bool
        Whether to output the attention weights
    weight_initializer : str or Initializer
        Initializer for the input weights matrix, used for the linear
        transformation of the inputs.
    bias_initializer : str or Initializer
        Initializer for the bias vector.
    scale_embed : bool, default True
        Scale the input embeddings by sqrt(embed_size).
    prefix : str, default None
        Prefix for name of `Block`s
        (and name of weight if params is `None`).
    params : Parameter or None
        Container for weight sharing between cells.
        Created if `None`.
    """
    def __init__(self, attention_cell='multi_head', num_layers=2,
                 units=128, hidden_size=2048, max_length=50,
                 num_heads=4, scaled=True, dropout=0.0,
                 use_residual=True, output_attention=False,
                 weight_initializer=None, bias_initializer='zeros',
                 scale_embed=True, prefix=None, params=None):
        super(TransformerDecoder, self).__init__(prefix=prefix, params=params)
        assert units % num_heads == 0, \
            'In TransformerDecoder, the number of units should be divisible by the ' \
            'number of heads. Received units={}, num_heads={}'.format(units, num_heads)
        self._num_layers = num_layers
        self._units = units
        self._hidden_size = hidden_size
        self._num_states = num_heads
        self._max_length = max_length
        self._dropout = dropout
        self._use_residual = use_residual
        self._output_attention = output_attention
        self._scaled = scaled
        self._scale_embed = scale_embed
        with self.name_scope():
            self.dropout_layer = nn.Dropout(dropout)
            self.layer_norm = nn.LayerNorm()
            encoding = _position_encoding_init(max_length, units)
            self.position_weight = self.params.get_constant('const', encoding)
            self.transformer_cells = nn.HybridSequential()
            for i in range(num_layers):
                self.transformer_cells.add(
                    TransformerDecoderCell(
                        units=units,
                        hidden_size=hidden_size,
                        num_heads=num_heads,
                        attention_cell=attention_cell,
                        weight_initializer=weight_initializer,
                        bias_initializer=bias_initializer,
                        dropout=dropout,
                        scaled=scaled,
                        use_residual=use_residual,
                        output_attention=output_attention,
                        prefix='transformer%d_' % i))

    def init_state_from_encoder(self, encoder_outputs, encoder_valid_length=None):
        """Initialize the state from the encoder outputs.

        Parameters
        ----------
        encoder_outputs : list
        encoder_valid_length : NDArray or None

        Returns
        -------
        decoder_states : list
            The decoder states, includes:

            - mem_value : NDArray
            - mem_masks : NDArray, optional
        """
        mem_value = encoder_outputs
        decoder_states = [mem_value]
        mem_length = mem_value.shape[1]
        if encoder_valid_length is not None:
            mem_masks = mx.nd.broadcast_lesser(
                mx.nd.arange(mem_length, ctx=encoder_valid_length.context).reshape((1, -1)),
                encoder_valid_length.reshape((-1, 1)))
            decoder_states.append(mem_masks)
        self._encoder_valid_length = encoder_valid_length
        return decoder_states

    def decode_seq(self, inputs, states, valid_length=None):
        """Decode the decoder inputs. This function is only used for training.

        Parameters
        ----------
        inputs : NDArray, Shape (batch_size, length, C_in)
        states : list of NDArrays or None
            Initial states. The list of decoder states
        valid_length : NDArray or None
            Valid lengths of each sequence. This is usually used when part of sequence has
            been padded. Shape (batch_size,)

        Returns
        -------
        output : NDArray, Shape (batch_size, length, C_out)
        states : list
            The decoder states, includes:

            - mem_value : NDArray
            - mem_masks : NDArray, optional
        additional_outputs : list of list
            Either an empty list, or a list containing the attention weights in this step.
            The attention weights will have shape (batch_size, length, mem_length) or
            (batch_size, num_heads, length, mem_length)
        """
        batch_size = inputs.shape[0]
        length = inputs.shape[1]
        length_array = mx.nd.arange(length, ctx=inputs.context)
        mask = mx.nd.broadcast_lesser_equal(
            length_array.reshape((1, -1)),
            length_array.reshape((-1, 1)))
        if valid_length is not None:
            batch_mask = mx.nd.broadcast_lesser(
                mx.nd.arange(length, ctx=valid_length.context).reshape((1, -1)),
                valid_length.reshape((-1, 1)))
            mask = mx.nd.broadcast_mul(mx.nd.expand_dims(batch_mask, -1),
                                       mx.nd.expand_dims(mask, 0))
        else:
            mask = mx.nd.broadcast_axes(mx.nd.expand_dims(mask, axis=0),
                                        axis=0, size=batch_size)
        states = [None] + states
        output, states, additional_outputs = self.forward(inputs, states, mask)
        states = states[1:]
        if valid_length is not None:
            output = mx.nd.SequenceMask(output,
                                        sequence_length=valid_length,
                                        use_sequence_length=True,
                                        axis=1)
        return output, states, additional_outputs

    def __call__(self, step_input, states):  # pylint: disable=arguments-differ
        """One-step-ahead decoding of the Transformer decoder.

        Parameters
        ----------
        step_input : NDArray
        states : list of NDArray

        Returns
        -------
        step_output : NDArray
            The output of the decoder.
            In the train mode, Shape is (batch_size, length, C_out)
            In the test mode, Shape is (batch_size, C_out)
        new_states: list
            Includes

            - last_embeds : NDArray or None
                It is only given during testing
            - mem_value : NDArray
            - mem_masks : NDArray, optional
        step_additional_outputs : list of list
            Either an empty list, or a list containing the attention weights in this step.
            The attention weights will have shape (batch_size, length, mem_length) or
            (batch_size, num_heads, length, mem_length)
        """
        return super(TransformerDecoder, self).__call__(step_input, states)

    def forward(self, step_input, states, mask=None):  # pylint: disable=arguments-differ, missing-docstring
        input_shape = step_input.shape
        mem_mask = None
        # If it is in testing, transform input tensor to a tensor with shape NTC
        # Otherwise remove the None in states.
        if len(input_shape) == 2:
            if self._encoder_valid_length is not None:
                has_last_embeds = len(states) == 3
            else:
                has_last_embeds = len(states) == 2
            if has_last_embeds:
                last_embeds = states[0]
                step_input = mx.nd.concat(last_embeds,
                                          mx.nd.expand_dims(step_input, axis=1),
                                          dim=1)
                states = states[1:]
            else:
                step_input = mx.nd.expand_dims(step_input, axis=1)
        elif states[0] is None:
            states = states[1:]
        has_mem_mask = (len(states) == 2)
        if has_mem_mask:
            _, mem_mask = states
            augmented_mem_mask = mx.nd.expand_dims(mem_mask, axis=1)\
                .broadcast_axes(axis=1, size=step_input.shape[1])
            states[-1] = augmented_mem_mask
        if mask is None:
            length_array = mx.nd.arange(step_input.shape[1], ctx=step_input.context)
            mask = mx.nd.broadcast_lesser_equal(
                length_array.reshape((1, -1)),
                length_array.reshape((-1, 1)))
            mask = mx.nd.broadcast_axes(mx.nd.expand_dims(mask, axis=0),
                                        axis=0, size=step_input.shape[0])
        steps = mx.nd.arange(step_input.shape[1], ctx=step_input.context)
        states.append(steps)
        if self._scale_embed:
            scaled_step_input = step_input * math.sqrt(step_input.shape[-1])
        else:
            scaled_step_input = step_input
        # pylint: disable=too-many-function-args
        step_output, step_additional_outputs = \
            super(TransformerDecoder, self).forward(scaled_step_input, states, mask)
        states = states[:-1]
        if has_mem_mask:
            states[-1] = mem_mask
        new_states = [step_input] + states
        # If it is in testing, only output the last one
        if len(input_shape) == 2:
            step_output = step_output[:, -1, :]
        return step_output, new_states, step_additional_outputs

    def hybrid_forward(self, F, step_input, states, mask=None, position_weight=None):
        # pylint: disable=arguments-differ
        """
        Parameters
        ----------
        step_input : NDArray or Symbol, Shape (batch_size, length, C_in)
        states : list of NDArray or Symbol
        mask : NDArray or Symbol
        position_weight : NDArray or Symbol

        Returns
        -------
        step_output : NDArray or Symbol
            The output of the decoder. Shape is (batch_size, length, C_out)
        step_additional_outputs : list
            Either an empty list, or a list containing the attention weights in this step.
            The attention weights will have shape (batch_size, length, mem_length) or
            (batch_size, num_heads, length, mem_length)
        """
        has_mem_mask = (len(states) == 3)
        if has_mem_mask:
            mem_value, mem_mask, steps = states
        else:
            mem_value, steps = states
            mem_mask = None
        # Positional Encoding
        step_input = F.broadcast_add(step_input,
                                     F.expand_dims(F.Embedding(steps,
                                                               position_weight,
                                                               self._max_length,
                                                               self._units),
                                                   axis=0))
        step_input = self.dropout_layer(step_input)
        step_input = self.layer_norm(step_input)
        inputs = step_input
        outputs = inputs
        step_additional_outputs = []
        attention_weights_l = []
        for cell in self.transformer_cells:
            outputs, attention_weights = cell(inputs, mem_value, mask, mem_mask)
            if self._output_attention:
                attention_weights_l.append(attention_weights)
            inputs = outputs
        if self._output_attention:
            step_additional_outputs.extend(attention_weights_l)
        return outputs, step_additional_outputs


###############################################################################
#                                MODEL API                                    #
###############################################################################

model_store._model_sha1.update(
    {name: checksum for checksum, name in [
        ('e25287c5a924b7025e08d626f02626d5fa3af2d1', 'transformer_en_de_512_WMT2014'),
    ]})


def get_transformer_encoder_decoder(num_layers=2,
                                    num_heads=8, scaled=True,
                                    units=512, hidden_size=2048, dropout=0.0, use_residual=True,
                                    max_src_length=50, max_tgt_length=50,
                                    weight_initializer=None, bias_initializer='zeros',
                                    prefix='transformer_', params=None):
    """Build a pair of parallel Transformer encoder/decoder.

    Parameters
    ----------
    num_layers : int
    num_heads : int
    scaled : bool
    units : int
    hidden_size : int
    dropout : float
    use_residual : bool
    max_src_length : int
    max_tgt_length : int
    weight_initializer : mx.init.Initializer or None
    bias_initializer : mx.init.Initializer or None
    prefix : str, default 'transformer_'
        Prefix for name of `Block`s.
    params : Parameter or None
        Container for weight sharing between layers.
        Created if `None`.

    Returns
    -------
    encoder : TransformerEncoder
    decoder : TransformerDecoder
    """
    encoder = TransformerEncoder(num_layers=num_layers,
                                 num_heads=num_heads,
                                 max_length=max_src_length,
                                 units=units,
                                 hidden_size=hidden_size,
                                 dropout=dropout,
                                 scaled=scaled,
                                 use_residual=use_residual,
                                 weight_initializer=weight_initializer,
                                 bias_initializer=bias_initializer,
                                 prefix=prefix + 'enc_', params=params)
    decoder = TransformerDecoder(num_layers=num_layers,
                                 num_heads=num_heads,
                                 max_length=max_tgt_length,
                                 units=units,
                                 hidden_size=hidden_size,
                                 dropout=dropout,
                                 scaled=scaled,
                                 use_residual=use_residual,
                                 weight_initializer=weight_initializer,
                                 bias_initializer=bias_initializer,
                                 prefix=prefix + 'dec_', params=params)
    return encoder, decoder


def _get_transformer_model(model_cls, model_name, dataset_name, src_vocab, tgt_vocab,
                           encoder, decoder, share_embed, embed_size, tie_weights,
                           embed_initializer, pretrained, ctx, root, **kwargs):
    src_vocab = _load_vocab(dataset_name + '_src', src_vocab, root)
    tgt_vocab = _load_vocab(dataset_name + '_tgt', tgt_vocab, root)
    kwargs['encoder'] = encoder
    kwargs['decoder'] = decoder
    kwargs['src_vocab'] = src_vocab
    kwargs['tgt_vocab'] = tgt_vocab
    kwargs['share_embed'] = share_embed
    kwargs['embed_size'] = embed_size
    kwargs['tie_weights'] = tie_weights
    kwargs['embed_initializer'] = embed_initializer
    # XXX the existing model is trained with prefix 'transformer_'
    net = model_cls(prefix='transformer_', **kwargs)
    if pretrained:
        _load_pretrained_params(net, model_name, dataset_name, root, ctx)
    return net, src_vocab, tgt_vocab
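
# Wiring sketch for the encoder/decoder pair (hypothetical hyperparameters):
#
#     encoder, decoder = get_transformer_encoder_decoder(num_layers=6, num_heads=8,
#                                                        units=512, hidden_size=2048)
#     # Both blocks share the 'transformer_' prefix; they are meant to be passed to
#     # NMTModel together with source/target vocabularies and an embedding size.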

def transformer_en_de_512(dataset_name=None, src_vocab=None, tgt_vocab=None, pretrained=False,
                          ctx=cpu(), root=os.path.join('~', '.mxnet', 'models'), **kwargs):
    r"""Transformer pretrained model.

    Embedding size is 512, and hidden layer size is 2048.

    Parameters
    ----------
    dataset_name : str or None, default None
    src_vocab : gluonnlp.Vocab or None, default None
    tgt_vocab : gluonnlp.Vocab or None, default None
    pretrained : bool, default False
        Whether to load the pretrained weights for model.
    ctx : Context, default CPU
        The context in which to load the pretrained weights.
    root : str, default '~/.mxnet/models'
        Location for keeping the model parameters.

    Returns
    -------
    gluon.Block, gluonnlp.Vocab, gluonnlp.Vocab
    """
    predefined_args = {'num_units': 512,
                       'hidden_size': 2048,
                       'dropout': 0.1,
                       'epsilon': 0.1,
                       'num_layers': 6,
                       'num_heads': 8,
                       'scaled': True,
                       'share_embed': True,
                       'embed_size': 512,
                       'tie_weights': True,
                       'embed_initializer': None}
    mutable_args = frozenset(['num_units', 'hidden_size', 'dropout', 'epsilon', 'num_layers',
                              'num_heads', 'scaled'])
    assert all((k not in kwargs or k in mutable_args) for k in predefined_args), \
        'Cannot override predefined model settings.'
    predefined_args.update(kwargs)
    encoder, decoder = get_transformer_encoder_decoder(units=predefined_args['num_units'],
                                                       hidden_size=predefined_args['hidden_size'],
                                                       dropout=predefined_args['dropout'],
                                                       num_layers=predefined_args['num_layers'],
                                                       num_heads=predefined_args['num_heads'],
                                                       max_src_length=530,
                                                       max_tgt_length=549,
                                                       scaled=predefined_args['scaled'])
    return _get_transformer_model(NMTModel, 'transformer_en_de_512', dataset_name,
                                  src_vocab, tgt_vocab, encoder, decoder,
                                  predefined_args['share_embed'], predefined_args['embed_size'],
                                  predefined_args['tie_weights'],
                                  predefined_args['embed_initializer'], pretrained, ctx, root)
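
# Typical call (the dataset name matches the checksum registered above; pretrained=True
# downloads the WMT2014 weights):
#
#     model, src_vocab, tgt_vocab = transformer_en_de_512(dataset_name='WMT2014',
#                                                         pretrained=True, ctx=mx.cpu())
#     # `model` is an NMTModel built from get_transformer_encoder_decoder with
#     # num_units=512 and hidden_size=2048.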