Image Caption Generation

Introduction

This article introduces the principle and implementation of an attention-based image caption generation model.

Principle

Image caption generation takes an image as input and produces a sentence describing that image as output.

The basic idea is to run the image through a pretrained image classification model and take the output of one of its convolutional layers as the representation of the original image, also called the contexts.

For example, the output of VGG19's conv5_3 layer has shape 14*14*512, i.e. 512 feature maps of size 14*14.

This can be interpreted as dividing the original image into 14*14 = 196 blocks, with each block represented by a 512-dimensional feature vector.
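As a quick sketch with random data standing in for real conv5_3 activations, this flattening of a 14*14*512 feature map into 196 blocks can be written in NumPy:

```python
import numpy as np

# Random stand-in for one image's conv5_3 output: 14 x 14 spatial grid, 512 channels
feature_map = np.random.rand(14, 14, 512).astype(np.float32)

# Flatten the spatial grid into 196 blocks, one 512-dim feature vector per block
contexts = feature_map.reshape(14 * 14, 512)

print(contexts.shape)  # (196, 512)
```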

An LSTM then generates words step by step from these contexts, producing a textual description of the original image.

When generating each word, the model should weight the 196 blocks differently, which is exactly the attention mechanism.

Just like humans: when choosing the next word, we pay different amounts of attention to different regions of the image, and more relevant regions receive more attention, i.e. higher weights.

Taking the attention-weighted sum of the 196 512-dimensional features yields the attention-based context vector.
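A minimal NumPy sketch of this weighted sum, with random data standing in for the real features and attention scores:

```python
import numpy as np

num_block, num_filter = 196, 512
contexts = np.random.rand(num_block, num_filter).astype(np.float32)

# Unnormalized attention scores, one per block (in the model these come from a small MLP)
scores = np.random.rand(num_block).astype(np.float32)

# Softmax turns the scores into weights that sum to 1
alpha = np.exp(scores) / np.exp(scores).sum()

# Weighted sum over the 196 blocks yields a single 512-dim context vector
context = (alpha[:, np.newaxis] * contexts).sum(axis=0)

print(context.shape)  # (512,)
```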

In terms of the Seq2Seq learning framework introduced earlier, image caption generation belongs to the one-to-many case.

Data

We use the COCO2014 dataset, cocodataset.org/#download. The training set contains over 80,000 images and the validation set over 40,000, each image paired with its captions.

Each image has more than one caption: the training set has 411,593 captions in total and the validation set 201,489, about five captions per image on average.

Implementation

The implementation is based on the following project: github.com/yunjey/show…

Training

First, the training code.

Load the libraries

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.utils import shuffle
from imageio import imread
import scipy.io
import cv2
import os
import json
from tqdm import tqdm
import pickle

Load the data. Since one image may correspond to several captions, each (image id, caption) pair is treated as one example. For each image, the central square region is kept and resized; captions longer than 20 words are discarded.

batch_size = 128
maxlen = 20
image_size = 224

MEAN_VALUES = np.array([123.68, 116.779, 103.939]).reshape((1, 1, 3))

def load_data(image_dir, annotation_path):
    with open(annotation_path, 'r') as fr:
        annotation = json.load(fr)

    ids = []
    captions = []
    image_dict = {}
    for i in tqdm(range(len(annotation['annotations']))):
        item = annotation['annotations'][i]
        caption = item['caption'].strip().lower()
        caption = caption.replace('.', '').replace(',', '').replace("'", '').replace('"', '')
        caption = caption.replace('&', 'and').replace('(', '').replace(')', '').replace('-', ' ').split()
        caption = [w for w in caption if len(w) > 0]

        if len(caption) <= maxlen:
            if not item['image_id'] in image_dict:
                img = imread(image_dir + '%012d.jpg' % item['image_id'])
                h = img.shape[0]
                w = img.shape[1]
                if h > w:
                    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
                else:
                    img = img[:, w // 2 - h // 2: w // 2 + h // 2]   
                img = cv2.resize(img, (image_size, image_size))

                if len(img.shape) < 3:
                    img = np.expand_dims(img, -1)
                    img = np.concatenate([img, img, img], axis=-1)

                image_dict[item['image_id']] = img

            ids.append(item['image_id'])
            captions.append(caption)

    return ids, captions, image_dict

train_json = 'data/train/captions_train2014.json'
train_ids, train_captions, train_dict = load_data('data/train/images/COCO_train2014_', train_json)
print(len(train_ids))

Take a look at a few caption annotations

data_index = np.arange(len(train_ids))
np.random.shuffle(data_index)
N = 4
data_index = data_index[:N]
plt.figure(figsize=(12, 20))
for i in range(N):
    caption = train_captions[data_index[i]]
    img = train_dict[train_ids[data_index[i]]]
    plt.subplot(4, 1, i + 1)
    plt.imshow(img)
    plt.title(' '.join(caption))
    plt.axis('off')

Build the vocabulary, 23,728 words in total, create the mappings between words and ids, and add three special tokens

vocabulary = {}
for caption in train_captions:
    for word in caption:
        vocabulary[word] = vocabulary.get(word, 0) + 1

vocabulary = sorted(vocabulary.items(), key=lambda x:-x[1])
vocabulary = [w[0] for w in vocabulary]

word2id = {'<pad>': 0, '<start>': 1, '<end>': 2}
for i, w in enumerate(vocabulary):
    word2id[w] = i + 3
id2word = {i: w for w, i in word2id.items()}

print(len(vocabulary), vocabulary[:20])

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([vocabulary, word2id, id2word], fw)

def translate(ids):
    words = [id2word[i] for i in ids if i >= 3]
    return ' '.join(words) + '.'

Convert the captions to id sequences

def convert_captions(data):
    result = []
    for caption in data:
        vector = [word2id['<start>']]
        for word in caption:
            if word in word2id:
                vector.append(word2id[word])
        vector.append(word2id['<end>'])
        result.append(vector)

    array = np.zeros((len(data), maxlen + 2), np.int32)
    for i in tqdm(range(len(result))):
        array[i, :len(result[i])] = result[i]
    return array

train_captions = convert_captions(train_captions)
print(train_captions.shape)
print(train_captions[0])
print(translate(train_captions[0]))

Use the same imagenet-vgg-verydeep-19.mat from the image style transfer article to extract image features. Load the VGG19 model and define a function that, for a given input, returns the outputs of every VGG19 layer, reusing the network via variable_scope. The output of conv5_3 serves as the representation of the original image.

vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    with tf.variable_scope('endpoints', reuse=reuse):
        def _weights(layer, expected_layer_name):
            W = vgg_layers[0][layer][0][0][2][0][0]
            b = vgg_layers[0][layer][0][0][2][0][1]
            layer_name = vgg_layers[0][layer][0][0][0][0]
            assert layer_name == expected_layer_name
            return W, b

        def _conv2d_relu(prev_layer, layer, layer_name):
            W, b = _weights(layer, layer_name)
            W = tf.constant(W)
            b = tf.constant(np.reshape(b, (b.size)))
            return tf.nn.relu(tf.nn.conv2d(prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b)

        def _avgpool(prev_layer):
            return tf.nn.avg_pool(prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        graph = {}
        graph['conv1_1']  = _conv2d_relu(inputs, 0, 'conv1_1')
        graph['conv1_2']  = _conv2d_relu(graph['conv1_1'], 2, 'conv1_2')
        graph['avgpool1'] = _avgpool(graph['conv1_2'])
        graph['conv2_1']  = _conv2d_relu(graph['avgpool1'], 5, 'conv2_1')
        graph['conv2_2']  = _conv2d_relu(graph['conv2_1'], 7, 'conv2_2')
        graph['avgpool2'] = _avgpool(graph['conv2_2'])
        graph['conv3_1']  = _conv2d_relu(graph['avgpool2'], 10, 'conv3_1')
        graph['conv3_2']  = _conv2d_relu(graph['conv3_1'], 12, 'conv3_2')
        graph['conv3_3']  = _conv2d_relu(graph['conv3_2'], 14, 'conv3_3')
        graph['conv3_4']  = _conv2d_relu(graph['conv3_3'], 16, 'conv3_4')
        graph['avgpool3'] = _avgpool(graph['conv3_4'])
        graph['conv4_1']  = _conv2d_relu(graph['avgpool3'], 19, 'conv4_1')
        graph['conv4_2']  = _conv2d_relu(graph['conv4_1'], 21, 'conv4_2')
        graph['conv4_3']  = _conv2d_relu(graph['conv4_2'], 23, 'conv4_3')
        graph['conv4_4']  = _conv2d_relu(graph['conv4_3'], 25, 'conv4_4')
        graph['avgpool4'] = _avgpool(graph['conv4_4'])
        graph['conv5_1']  = _conv2d_relu(graph['avgpool4'], 28, 'conv5_1')
        graph['conv5_2']  = _conv2d_relu(graph['conv5_1'], 30, 'conv5_2')
        graph['conv5_3']  = _conv2d_relu(graph['conv5_2'], 32, 'conv5_3')
        graph['conv5_4']  = _conv2d_relu(graph['conv5_3'], 34, 'conv5_4')
        graph['avgpool5'] = _avgpool(graph['conv5_4'])

        return graph

X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']
print(encoded)

On top of these contexts, implement the initialization, word embedding, and feature projection parts

k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    return tf.layers.dense(inputs, units, activation, use_bias,
                           kernel_initializer=k_initializer, bias_initializer=b_initializer, name=name)

def batch_norm(inputs, name):
    return tf.contrib.layers.batch_norm(inputs, decay=0.95, center=True, scale=True, is_training=True, 
                                        updates_collections=None, scope=name)

def dropout(inputs):
    return tf.layers.dropout(inputs, rate=0.5, training=True)

num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

Y = tf.placeholder(tf.int32, [None, maxlen + 2])
Y_in = Y[:, :-1]
Y_out = Y[:, 1:]
mask = tf.to_float(tf.not_equal(Y_out, word2id['<pad>']))

with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    state = dense(context_mean, hidden_size, name='initial_state')
    memory = dense(context_mean, hidden_size, name='initial_memory')

with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, Y_in)

with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)
loss = 0
alphas = []

Generate the caption word by word. Each step consists of computing the attention weights and context, computing the selector gate, running the LSTM, computing the output logits, and accumulating the loss.

for t in range(maxlen + 1):
    with tf.variable_scope('attend'):
        h0 = dense(state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
        h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
        h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
        h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
        h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

        alpha = tf.nn.softmax(h0) # batch_size, num_block
        # contexts:                 batch_size, num_block, num_filter
        # tf.expand_dims(alpha, 2): batch_size, num_block, 1
        context = tf.reduce_sum(contexts * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter
        alphas.append(alpha)

    with tf.variable_scope('selector'):
        beta = dense(state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
        context = tf.multiply(beta, context, name='selected_context')  # batch_size, num_filter

    with tf.variable_scope('lstm'):
        h0 = tf.concat([embedded[:, t, :], context], 1) # batch_size, embedding_size + num_filter
        _, (memory, state) = lstm(inputs=h0, state=[memory, state])

    with tf.variable_scope('decode'):
        h0 = dropout(state)
        h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
        h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
        h0 += embedded[:, t, :]
        h0 = tf.nn.tanh(h0)

        h0 = dropout(h0)
        logits = dense(h0, len(word2id), activation=None, name='fc_logits')

    loss += tf.reduce_sum(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=Y_out[:, t], logits=logits) * mask[:, t])
    tf.get_variable_scope().reuse_variables()

Add an attention regularization term to the loss and define the optimizer

alphas = tf.transpose(tf.stack(alphas), (1, 0, 2)) # batch_size, maxlen + 1, num_block
alphas = tf.reduce_sum(alphas, 1) # batch_size, num_block
attention_loss = tf.reduce_sum(((maxlen + 1) / num_block - alphas) ** 2)
total_loss = (loss + attention_loss) / batch_size

with tf.variable_scope('optimizer', reuse=tf.AUTO_REUSE):
    global_step = tf.Variable(0, trainable=False)
    vars_t = [var for var in tf.trainable_variables() if not var.name.startswith('endpoints')]
    train = tf.contrib.layers.optimize_loss(total_loss, global_step, 0.001, 'Adam', clip_gradients=5.0, variables=vars_t)

Train the model, writing some tensor values to an events file so they can be inspected later with TensorBoard

sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()
OUTPUT_DIR = 'model'
if not os.path.exists(OUTPUT_DIR):
    os.mkdir(OUTPUT_DIR)

tf.summary.scalar('losses/loss', loss)
tf.summary.scalar('losses/attention_loss', attention_loss)
tf.summary.scalar('losses/total_loss', total_loss)
summary = tf.summary.merge_all()
writer = tf.summary.FileWriter(OUTPUT_DIR)

epochs = 20
for e in range(epochs):
    train_ids, train_captions = shuffle(train_ids, train_captions)
    for i in tqdm(range(len(train_ids) // batch_size)):
        X_batch = np.array([train_dict[x] for x in train_ids[i * batch_size: i * batch_size + batch_size]])
        Y_batch = train_captions[i * batch_size: i * batch_size + batch_size]

        _ = sess.run(train, feed_dict={X: X_batch, Y: Y_batch})

        if i > 0 and i % 100 == 0:
            writer.add_summary(sess.run(summary, 
                                        feed_dict={X: X_batch, Y: Y_batch}), 
                                        e * len(train_ids) // batch_size + i)
            writer.flush()

    saver.save(sess, os.path.join(OUTPUT_DIR, 'image_caption'))

Use the following command to view the training history in TensorBoard

tensorboard --logdir=model

Validation

Next is the validation code: generate a caption for each image in the validation set, then compare against the ground-truth annotations for evaluation.

When generating each word, one could greedily pick the most probable word, but that is not necessarily optimal: the currently most probable word does not guarantee that the resulting sequence has the highest overall probability.

Nor can we use the Viterbi algorithm as in Chinese word segmentation, because Viterbi requires the probability distribution over the whole sequence to be known before dynamic programming can find the most probable path.

Caption generation proceeds one word at a time, and the number of choices equals the vocabulary size, far more than the four classes in Chinese word segmentation sequence labeling, so enumerating all possible sequences is infeasible.

A common compromise is beam search, which has one parameter, the beam size. For example, with a beam size of 3:

  • When generating the first word, keep the three most probable words
  • When generating the second word, expand each of those three words into three more, giving nine sequences, and keep the three most probable
  • When generating the n-th word, expand the three sequences kept in the previous step into nine, and again keep the three most probable
  • It is like a tree: at every step each branch grows three sub-branches, then among all branches only the best three are kept and the rest are pruned
  • Repeat this process until the end token is generated or the sequence reaches the maximum length
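The steps above can be sketched as a standalone function; `step_fn` is a hypothetical stand-in for the model, mapping a partial sequence to a probability vector over the vocabulary:

```python
import numpy as np

def beam_search(step_fn, start_token, end_token, beam_size=3, maxlen=20):
    # Each beam is (sequence, cumulative probability)
    beams = [([start_token], 1.0)]
    complete = []
    for _ in range(maxlen):
        candidates = []
        for seq, score in beams:
            probs = step_fn(seq)
            # Expand this beam with its beam_size most probable next tokens
            for w in np.argsort(-probs)[:beam_size]:
                candidates.append((seq + [int(w)], score * float(probs[w])))
        # Across all expansions, keep only the beam_size best sequences
        candidates.sort(key=lambda x: -x[1])
        beams = []
        for seq, score in candidates:
            if seq[-1] == end_token:
                complete.append((seq, score))  # finished sequences leave the beam
            else:
                beams.append((seq, score))
            if len(beams) == beam_size:
                break
        if not beams:
            break
    return complete if complete else beams

# Toy model over a 4-token vocabulary; token 2 plays the role of <end>
def toy_step(seq):
    return np.array([0.1, 0.2, 0.4, 0.3])

result = beam_search(toy_step, start_token=1, end_token=2)
print(result[0][0])  # the best finished sequence ends with token 2
```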

Most of the validation code is the same as the training code.

Load the libraries

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.utils import shuffle
from imageio import imread
import scipy.io
import cv2
import os
import json
from tqdm import tqdm
import pickle

Load the data

batch_size = 128
maxlen = 20
image_size = 224

MEAN_VALUES = np.array([123.68, 116.779, 103.939]).reshape((1, 1, 3))

def load_data(image_dir, annotation_path):
    with open(annotation_path, 'r') as fr:
        annotation = json.load(fr)

    ids = []
    captions = []
    image_dict = {}
    for i in tqdm(range(len(annotation['annotations']))):
        item = annotation['annotations'][i]
        caption = item['caption'].strip().lower()
        caption = caption.replace('.', '').replace(',', '').replace("'", '').replace('"', '')
        caption = caption.replace('&', 'and').replace('(', '').replace(')', '').replace('-', ' ').split()
        caption = [w for w in caption if len(w) > 0]

        if len(caption) <= maxlen:
            if not item['image_id'] in image_dict:
                img = imread(image_dir + '%012d.jpg' % item['image_id'])
                h = img.shape[0]
                w = img.shape[1]
                if h > w:
                    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
                else:
                    img = img[:, w // 2 - h // 2: w // 2 + h // 2]   
                img = cv2.resize(img, (image_size, image_size))

                if len(img.shape) < 3:
                    img = np.expand_dims(img, -1)
                    img = np.concatenate([img, img, img], axis=-1)

                image_dict[item['image_id']] = img

            ids.append(item['image_id'])
            captions.append(caption)

    return ids, captions, image_dict

val_json = 'data/val/captions_val2014.json'
val_ids, val_captions, val_dict = load_data('data/val/images/COCO_val2014_', val_json)
print(len(val_ids))

Collect the ground-truth captions

gt = {}
for i in tqdm(range(len(val_ids))):
    val_id = val_ids[i]
    if not val_id in gt:
        gt[val_id] = []
    gt[val_id].append(' '.join(val_captions[i]) + ' .')
print(len(gt))

Load the dictionary built during training

with open('dictionary.pkl', 'rb') as fr:
    [vocabulary, word2id, id2word] = pickle.load(fr)

def translate(ids):
    words = [id2word[i] for i in ids if i >= 3]
    return ' '.join(words) + ' .'

Load the VGG19 model

vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    with tf.variable_scope('endpoints', reuse=reuse):
        def _weights(layer, expected_layer_name):
            W = vgg_layers[0][layer][0][0][2][0][0]
            b = vgg_layers[0][layer][0][0][2][0][1]
            layer_name = vgg_layers[0][layer][0][0][0][0]
            assert layer_name == expected_layer_name
            return W, b

        def _conv2d_relu(prev_layer, layer, layer_name):
            W, b = _weights(layer, layer_name)
            W = tf.constant(W)
            b = tf.constant(np.reshape(b, (b.size)))
            return tf.nn.relu(tf.nn.conv2d(prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b)

        def _avgpool(prev_layer):
            return tf.nn.avg_pool(prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        graph = {}
        graph['conv1_1']  = _conv2d_relu(inputs, 0, 'conv1_1')
        graph['conv1_2']  = _conv2d_relu(graph['conv1_1'], 2, 'conv1_2')
        graph['avgpool1'] = _avgpool(graph['conv1_2'])
        graph['conv2_1']  = _conv2d_relu(graph['avgpool1'], 5, 'conv2_1')
        graph['conv2_2']  = _conv2d_relu(graph['conv2_1'], 7, 'conv2_2')
        graph['avgpool2'] = _avgpool(graph['conv2_2'])
        graph['conv3_1']  = _conv2d_relu(graph['avgpool2'], 10, 'conv3_1')
        graph['conv3_2']  = _conv2d_relu(graph['conv3_1'], 12, 'conv3_2')
        graph['conv3_3']  = _conv2d_relu(graph['conv3_2'], 14, 'conv3_3')
        graph['conv3_4']  = _conv2d_relu(graph['conv3_3'], 16, 'conv3_4')
        graph['avgpool3'] = _avgpool(graph['conv3_4'])
        graph['conv4_1']  = _conv2d_relu(graph['avgpool3'], 19, 'conv4_1')
        graph['conv4_2']  = _conv2d_relu(graph['conv4_1'], 21, 'conv4_2')
        graph['conv4_3']  = _conv2d_relu(graph['conv4_2'], 23, 'conv4_3')
        graph['conv4_4']  = _conv2d_relu(graph['conv4_3'], 25, 'conv4_4')
        graph['avgpool4'] = _avgpool(graph['conv4_4'])
        graph['conv5_1']  = _conv2d_relu(graph['avgpool4'], 28, 'conv5_1')
        graph['conv5_2']  = _conv2d_relu(graph['conv5_1'], 30, 'conv5_2')
        graph['conv5_3']  = _conv2d_relu(graph['conv5_2'], 32, 'conv5_3')
        graph['conv5_4']  = _conv2d_relu(graph['conv5_3'], 34, 'conv5_4')
        graph['avgpool5'] = _avgpool(graph['conv5_4'])

        return graph

X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']
print(encoded)

The validation code needs a few extra placeholders: since beam search is used, the relevant values must be fed in each time a word is generated

k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    return tf.layers.dense(inputs, units, activation, use_bias,
                           kernel_initializer=k_initializer, bias_initializer=b_initializer, name=name)

def batch_norm(inputs, name):
    return tf.contrib.layers.batch_norm(inputs, decay=0.95, center=True, scale=True, is_training=False, 
                                        updates_collections=None, scope=name)

def dropout(inputs):
    return tf.layers.dropout(inputs, rate=0.5, training=False)

num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    initial_state = dense(context_mean, hidden_size, name='initial_state')
    initial_memory = dense(context_mean, hidden_size, name='initial_memory')

contexts_phr = tf.placeholder(tf.float32, [None, num_block, num_filter])
last_memory = tf.placeholder(tf.float32, [None, hidden_size])
last_state = tf.placeholder(tf.float32, [None, hidden_size])
last_word = tf.placeholder(tf.int32, [None])

with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, last_word)

with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts_phr, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)

The generation graph needs no loop; defining a single step is enough. During beam search, data will be fed in and outputs read out repeatedly.

with tf.variable_scope('attend'):
    h0 = dense(last_state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
    h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
    h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
    h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
    h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

    alpha = tf.nn.softmax(h0) # batch_size, num_block
    # contexts:                 batch_size, num_block, num_filter
    # tf.expand_dims(alpha, 2): batch_size, num_block, 1
    context = tf.reduce_sum(contexts_phr * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter

with tf.variable_scope('selector'):
    beta = dense(last_state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
    context = tf.multiply(beta, context, name='selected_context')  # batch_size, num_filter

with tf.variable_scope('lstm'):
    h0 = tf.concat([embedded, context], 1) # batch_size, embedding_size + num_filter
    _, (current_memory, current_state) = lstm(inputs=h0, state=[last_memory, last_state])

with tf.variable_scope('decode'):
    h0 = dropout(current_state)
    h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
    h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
    h0 += embedded
    h0 = tf.nn.tanh(h0)

    h0 = dropout(h0)
    logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    probs = tf.nn.softmax(logits)

Load the trained model and run beam search on each batch of data, generating the caption one word at a time

Here the beam size is set to 1 mainly to save time (validation already took 10 hours); in practice a larger beam size can be used

MODEL_DIR = 'model'
sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint(MODEL_DIR))

beam_size = 1
id2sentence = {}

val_ids = list(set(val_ids))
if len(val_ids) % batch_size != 0:
    for i in range(batch_size - len(val_ids) % batch_size):
        val_ids.append(val_ids[0])
print(len(val_ids))

for i in tqdm(range(len(val_ids) // batch_size)):
    X_batch = np.array([val_dict[x] for x in val_ids[i * batch_size: i * batch_size + batch_size]])
    contexts_, initial_memory_, initial_state_ = sess.run([contexts, initial_memory, initial_state], feed_dict={X: X_batch})

    result = []
    complete = []
    for b in range(batch_size):
        result.append([{
            'sentence': [], 
            'memory': initial_memory_[b], 
            'state': initial_state_[b],
            'score': 1.0,
            'alphas': []
        }])
        complete.append([])

    for t in range(maxlen + 1):
        cache = [[] for b in range(batch_size)]
        step = 1 if t == 0 else beam_size
        for s in range(step):
            if t == 0:
                last_word_ = np.ones([batch_size], np.int32) * word2id['<start>']
            else:
                last_word_ = np.array([result[b][s]['sentence'][-1] for b in range(batch_size)], np.int32)

            last_memory_ = np.array([result[b][s]['memory'] for b in range(batch_size)], np.float32)
            last_state_ = np.array([result[b][s]['state'] for b in range(batch_size)], np.float32)

            current_memory_, current_state_, probs_, alpha_ = sess.run(
                [current_memory, current_state, probs, alpha], feed_dict={
                    contexts_phr: contexts_, 
                    last_memory: last_memory_,
                    last_state: last_state_,
                    last_word: last_word_
                    })

            for b in range(batch_size):
                word_and_probs = [[w, p] for w, p in enumerate(probs_[b])]
                word_and_probs.sort(key=lambda x:-x[1])
                word_and_probs = word_and_probs[:beam_size + 1]

                for w, p in word_and_probs:
                    item = {
                        'sentence': result[b][s]['sentence'] + [w], 
                        'memory': current_memory_[b], 
                        'state': current_state_[b],
                        'score': result[b][s]['score'] * p,
                        'alphas': result[b][s]['alphas'] + [alpha_[b]]
                    }

                    if id2word[w] == '<end>':
                        complete[b].append(item)
                    else:
                        cache[b].append(item)

        for b in range(batch_size):
            cache[b].sort(key=lambda x:-x['score'])
            cache[b] = cache[b][:beam_size]
        result = cache.copy()

    for b in range(batch_size):
        if len(complete[b]) == 0:
            final_sentence = result[b][0]['sentence']
        else:
            final_sentence = complete[b][0]['sentence']

        val_id = val_ids[i * batch_size + b] 
        if not val_id in id2sentence:
            id2sentence[val_id] = [translate(final_sentence)]

print(len(id2sentence))

Write the generated captions to a file for later evaluation

with open('generated.txt', 'w') as fw:
    for i in id2sentence.keys():
        fw.write(str(i) + '^' + id2sentence[i][0] + '^' + '_'.join(gt[i]) + '\n')

Use the following project for evaluation: github.com/tylin/coco-…. The metrics are BLEU, ROUGE, and CIDEr.

Among these, BLEU is widely used in image captioning, machine translation, and similar tasks; it can be roughly understood as the hit rate of 1-grams, 2-grams, 3-grams, and 4-grams.
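A simplified illustration of that hit rate (modified n-gram precision with clipped counts, omitting BLEU's brevity penalty and geometric mean; the example sentences are made up):

```python
from collections import Counter

def ngram_precision(candidate, reference, n):
    # Fraction of candidate n-grams that also appear in the reference, with clipped counts
    cand = [tuple(candidate[i:i + n]) for i in range(len(candidate) - n + 1)]
    ref = Counter(tuple(reference[i:i + n]) for i in range(len(reference) - n + 1))
    hits = sum(min(c, ref[g]) for g, c in Counter(cand).items())
    return hits / max(len(cand), 1)

candidate = 'a man riding a horse on a beach'.split()
reference = 'a man is riding a horse on the beach'.split()

print(ngram_precision(candidate, reference, 1))  # 0.875 (7 of 8 unigrams hit)
print(round(ngram_precision(candidate, reference, 2), 3))  # 0.571 (4 of 7 bigrams hit)
```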

Andrew Ng's deep learning specialization also covers BLEU: mooc.study.163.com/course/2001…

from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.cider.cider import Cider

id2sentence = {}
gt = {}
with open('generated.txt', 'r') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip('\n').split('^')
        i = line[0]
        id2sentence[i] = [line[1]]
        gt[i] = line[2].split('_')

scorers = [
    (Bleu(4), ['Bleu_1', 'Bleu_2', 'Bleu_3', 'Bleu_4']),
    (Rouge(), 'ROUGE_L'),
    (Cider(), 'CIDEr')
]

for scorer, name in scorers:
    score, _ = scorer.compute_score(gt, id2sentence)
    if type(score) == list:
        for n, s in zip(name, score):
            print(n, s)
    else:
        print(name, score)

The evaluation results are as follows; a moderately larger beam size would further improve every metric

  • Bleu_1: 0.6878
  • Bleu_2: 0.4799
  • Bleu_3: 0.3347
  • Bleu_4: 0.2355
  • ROUGE_L: 0.5304
  • CIDEr: 0.7293

Usage

Finally, the following code uses the trained model locally to generate a caption for an arbitrary image.

The overall structure resembles the validation code, but since only one image is captioned, the beam search part is much simpler.

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from imageio import imread
import scipy.io
import cv2
import os
import pickle

batch_size = 1
maxlen = 20
image_size = 224

MEAN_VALUES = np.array([123.68, 116.779, 103.939]).reshape((1, 1, 3))

with open('dictionary.pkl', 'rb') as fr:
    [vocabulary, word2id, id2word] = pickle.load(fr)

def translate(ids):
    words = [id2word[i] for i in ids if i >= 3]
    return ' '.join(words) + ' .'

vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    with tf.variable_scope('endpoints', reuse=reuse):
        def _weights(layer, expected_layer_name):
            W = vgg_layers[0][layer][0][0][2][0][0]
            b = vgg_layers[0][layer][0][0][2][0][1]
            layer_name = vgg_layers[0][layer][0][0][0][0]
            assert layer_name == expected_layer_name
            return W, b

        def _conv2d_relu(prev_layer, layer, layer_name):
            W, b = _weights(layer, layer_name)
            W = tf.constant(W)
            b = tf.constant(np.reshape(b, (b.size)))
            return tf.nn.relu(tf.nn.conv2d(prev_layer, filter=W, strides=[1, 1, 1, 1], padding='SAME') + b)

        def _avgpool(prev_layer):
            return tf.nn.avg_pool(prev_layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        graph = {}
        graph['conv1_1']  = _conv2d_relu(inputs, 0, 'conv1_1')
        graph['conv1_2']  = _conv2d_relu(graph['conv1_1'], 2, 'conv1_2')
        graph['avgpool1'] = _avgpool(graph['conv1_2'])
        graph['conv2_1']  = _conv2d_relu(graph['avgpool1'], 5, 'conv2_1')
        graph['conv2_2']  = _conv2d_relu(graph['conv2_1'], 7, 'conv2_2')
        graph['avgpool2'] = _avgpool(graph['conv2_2'])
        graph['conv3_1']  = _conv2d_relu(graph['avgpool2'], 10, 'conv3_1')
        graph['conv3_2']  = _conv2d_relu(graph['conv3_1'], 12, 'conv3_2')
        graph['conv3_3']  = _conv2d_relu(graph['conv3_2'], 14, 'conv3_3')
        graph['conv3_4']  = _conv2d_relu(graph['conv3_3'], 16, 'conv3_4')
        graph['avgpool3'] = _avgpool(graph['conv3_4'])
        graph['conv4_1']  = _conv2d_relu(graph['avgpool3'], 19, 'conv4_1')
        graph['conv4_2']  = _conv2d_relu(graph['conv4_1'], 21, 'conv4_2')
        graph['conv4_3']  = _conv2d_relu(graph['conv4_2'], 23, 'conv4_3')
        graph['conv4_4']  = _conv2d_relu(graph['conv4_3'], 25, 'conv4_4')
        graph['avgpool4'] = _avgpool(graph['conv4_4'])
        graph['conv5_1']  = _conv2d_relu(graph['avgpool4'], 28, 'conv5_1')
        graph['conv5_2']  = _conv2d_relu(graph['conv5_1'], 30, 'conv5_2')
        graph['conv5_3']  = _conv2d_relu(graph['conv5_2'], 32, 'conv5_3')
        graph['conv5_4']  = _conv2d_relu(graph['conv5_3'], 34, 'conv5_4')
        graph['avgpool5'] = _avgpool(graph['conv5_4'])

        return graph

X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']

k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    return tf.layers.dense(inputs, units, activation, use_bias,
                           kernel_initializer=k_initializer, bias_initializer=b_initializer, name=name)

def batch_norm(inputs, name):
    return tf.contrib.layers.batch_norm(inputs, decay=0.95, center=True, scale=True, is_training=False, 
                                        updates_collections=None, scope=name)

def dropout(inputs):
    return tf.layers.dropout(inputs, rate=0.5, training=False)

num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    initial_state = dense(context_mean, hidden_size, name='initial_state')
    initial_memory = dense(context_mean, hidden_size, name='initial_memory')

contexts_phr = tf.placeholder(tf.float32, [None, num_block, num_filter])
last_memory = tf.placeholder(tf.float32, [None, hidden_size])
last_state = tf.placeholder(tf.float32, [None, hidden_size])
last_word = tf.placeholder(tf.int32, [None])

with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, last_word)

with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts_phr, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)

with tf.variable_scope('attend'):
    h0 = dense(last_state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
    h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
    h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
    h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
    h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

    alpha = tf.nn.softmax(h0) # batch_size, num_block
    # contexts:                 batch_size, num_block, num_filter
    # tf.expand_dims(alpha, 2): batch_size, num_block, 1
    context = tf.reduce_sum(contexts_phr * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter
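For intuition, the attention step above can be replayed in plain NumPy: softmax over the 196 block scores, then a weighted sum of the block features. This is only a shape-level sketch; `soft_attention` is a hypothetical helper with random inputs, not part of the model code.

```python
import numpy as np

def soft_attention(contexts, scores):
    # softmax over the block axis, numerically stabilized
    e = np.exp(scores - scores.max(axis=1, keepdims=True))
    alpha = e / e.sum(axis=1, keepdims=True)              # batch_size, num_block
    # broadcast alpha over the filter axis, sum over the 196 blocks
    context = (contexts * alpha[:, :, None]).sum(axis=1)  # batch_size, num_filter
    return context, alpha

contexts = np.random.rand(2, 196, 512)  # batch_size, num_block, num_filter
scores = np.random.rand(2, 196)         # unnormalized attention scores
context, alpha = soft_attention(contexts, scores)
print(context.shape, alpha.shape)       # (2, 512) (2, 196)
```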

with tf.variable_scope('selector'):
    beta = dense(last_state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
    context = tf.multiply(beta, context, name='selected_context')  # batch_size, num_filter

with tf.variable_scope('lstm'):
    h0 = tf.concat([embedded, context], 1) # batch_size, embedding_size + num_filter
    _, (current_memory, current_state) = lstm(inputs=h0, state=[last_memory, last_state])

with tf.variable_scope('decode'):
    h0 = dropout(current_state)
    h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
    h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
    h0 += embedded
    h0 = tf.nn.tanh(h0)

    h0 = dropout(h0)
    logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    probs = tf.nn.softmax(logits)

MODEL_DIR = 'model'
sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint(MODEL_DIR))

beam_size = 3
img = imread('test.png')
if img.shape[-1] == 4:
    img = img[:, :, :3] # drop the alpha channel of RGBA images
h = img.shape[0]
w = img.shape[1]
if h > w:
    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
else:
    img = img[:, w // 2 - h // 2: w // 2 + h // 2]
img = cv2.resize(img, (image_size, image_size))
X_data = np.expand_dims(img, 0)

contexts_, initial_memory_, initial_state_ = sess.run([contexts, initial_memory, initial_state], feed_dict={X: X_data})

result = [{
    'sentence': [], 
    'memory': initial_memory_[0], 
    'state': initial_state_[0],
    'score': 1.0,
    'alphas': []
}]
complete = []
for t in range(maxlen + 1):
    cache = []
    step = 1 if t == 0 else beam_size # only the <start> beam exists at the first step
    for s in range(step):
        if t == 0:
            last_word_ = np.ones([1], np.int32) * word2id['<start>'] # a single image is decoded, so the batch is 1, not batch_size
        else:
            last_word_ = np.array([result[s]['sentence'][-1]], np.int32)

        last_memory_ = np.array([result[s]['memory']], np.float32)
        last_state_ = np.array([result[s]['state']], np.float32)

        current_memory_, current_state_, probs_, alpha_ = sess.run(
            [current_memory, current_state, probs, alpha], feed_dict={
                contexts_phr: contexts_, 
                last_memory: last_memory_,
                last_state: last_state_,
                last_word: last_word_
                })

        word_and_probs = [[w, p] for w, p in enumerate(probs_[0])]
        word_and_probs.sort(key=lambda x: -x[1])
        # keep one extra candidate so the beam stays full even if <end> is among the top words
        word_and_probs = word_and_probs[:beam_size + 1]

        for w, p in word_and_probs:
            item = {
                'sentence': result[s]['sentence'] + [w], 
                'memory': current_memory_[0], 
                'state': current_state_[0],
                'score': result[s]['score'] * p,
                'alphas': result[s]['alphas'] + [alpha_[0]]
            }
            if id2word[w] == '<end>':
                complete.append(item)
            else:
                cache.append(item)

    cache.sort(key=lambda x:-x['score'])
    cache = cache[:beam_size]
    result = cache.copy()

complete.sort(key=lambda x: -x['score']) # pick the highest-scoring finished sentence
if len(complete) == 0:
    final_sentence = result[0]['sentence']
    alphas = result[0]['alphas']
else:
    final_sentence = complete[0]['sentence']
    alphas = complete[0]['alphas']
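Stripped of the TensorFlow session, the loop above is a standard beam search: multiply step probabilities into a running score, prune to the best `beam_size` partial sentences, and set finished sentences aside. A self-contained toy sketch (the transition `table` and all names are hypothetical, standing in for the LSTM step):

```python
def beam_search(step_probs, start, end, beam_size=2, maxlen=5):
    # step_probs(word) -> list of (next_word, prob) candidates
    beams = [{'sentence': [start], 'score': 1.0}]
    complete = []
    for _ in range(maxlen):
        cache = []
        for b in beams:
            for w, p in step_probs(b['sentence'][-1]):
                item = {'sentence': b['sentence'] + [w],
                        'score': b['score'] * p}  # multiply step probabilities
                (complete if w == end else cache).append(item)
        cache.sort(key=lambda x: -x['score'])  # prune to the best beam_size
        beams = cache[:beam_size]
    complete.sort(key=lambda x: -x['score'])
    best = complete[0] if complete else beams[0]
    return best['sentence']

# toy vocabulary: 0 = <start>, 3 = <end>
table = {0: [(1, 0.6), (2, 0.4)],
         1: [(3, 0.9), (2, 0.1)],
         2: [(1, 0.7), (3, 0.3)]}
print(beam_search(lambda w: table[w], start=0, end=3))  # [0, 1, 3]
```

Here the greedy path 0→1→3 also wins under beam search (score 0.6 × 0.9 = 0.54), but with beam_size > 1 a locally weaker word can still lead to the best full sentence.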

sentence = translate(final_sentence)
print(sentence)
sentence = sentence.split(' ')

img = (img - img.min()) / (img.max() - img.min())
n = int(np.ceil(np.sqrt(len(sentence))))
plt.figure(figsize=(10, 8))
for i in range(len(sentence)):
    word = sentence[i]
    a = np.reshape(alphas[i], (14, 14))
    a = cv2.resize(a, (image_size, image_size))
    a = np.expand_dims(a, -1)
    a = (a - a.min()) / (a.max() - a.min())
    combine = 0.5 * img + 0.5 * a
    plt.subplot(n, n, i + 1)
    plt.text(0, 1, word, color='black', backgroundcolor='white', fontsize=12)
    plt.imshow(combine)
    plt.axis('off')
plt.show()

The generated caption is shown below. It accurately covers the key words bride, groom, posing, and taking a photo, and the attention visualization nicely reflects which regions of the image the model focuses on while generating each word

References

Show, Attend and Tell: Neural Image Caption Generation with Visual Attention: arxiv.org/abs/1502.03…

TensorFlow Implementation of "Show, Attend and Tell": github.com/yunjey/show…
