AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


聊聊基于BERT模型实现多标签分类任务的实践与思考
发布日期:2024-05-03 08:44:49 浏览次数: 2107


概述

以预训练大模型为基座神经网络模型,通过模型预训练后的泛化能力与微调后的领域能力,作为NLP任务的解决方案。

在github上找了一个简单的仓库——multi_label_classification,该仓库基于BERT预训练大模型实现了多分类任务。通过对该仓库源码的分析,深入研究其逻辑原理。

代码文件的注释如下:

模型类定义

基于BERT预训练模型,结合线性层与sigmoid函数,输出多分类任务的概率分布

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertModel

# 基于BERT模型的多分类模型定义
class BertMultiLabelCls(nn.Module):
    def __init__(self, hidden_size, class_num, dropout=0.1):
        super(BertMultiLabelCls, self).__init__()
        # 定义线性层(密集层)
        # 输入的维度是hidden_size;输出的维度是多分类的个数。
        self.fc = nn.Linear(hidden_size, class_num)
        self.drop = nn.Dropout(dropout)
        # 加载预训练模型
        self.bert = BertModel.from_pretrained("bert-base-chinese")

    def forward(self, input_ids, attention_mask, token_type_ids):
        # 将输入的input_ids(文本token的ID),
        # attention_mask(表示哪些token是重要的)
        # token_type_ids(区分不同类型的token,如句子A和句子B)传递给BERT模型,获取模型的输出。
        outputs = self.bert(input_ids, attention_mask, token_type_ids)
        # 打印输出
        # print(outputs)
        # 从BERT模型的输出中选择第一个元素(通常是[CLS]标记的输出),然后通过dropout层。
        cls = self.drop(outputs[1])
        # 将经过dropout层的[CLS]标记的输出传递到全连接层self.fc,然后应用sigmoid激活函数,将输出转换为概率分布。
        out = F.sigmoid(self.fc(cls))
        # 返回最终的输出概率分布,用于多分类任务。
        return out

Sigmoid函数通常用于二分类问题的神经网络中;Softmax函数通常用于多分类问题的神经网络中。而这里用到的是Sigmoid函数,这个原因在后面也会讲到。

数据预处理

一般都会有一个数据预处理的流程,因为模型的数据需要处理成指定的JSON格式后,才能喂到模型训练。为什么要有对应的格式?因为模型的训练与微调有对应的格式要求。

譬如,alpaca格式数据集与sharegpt格式数据集等常见格式要求。因此就会要求将原始数据处理成对应格式。

# -*- coding: utf-8 -*-
import json
import torch
from torch.utils.data import Dataset
from transformers import BertTokenizer
from data_preprocess import load_json


class MultiClsDataSet(Dataset):
    def __init__(self, data_path, max_len=128, label2idx_path="./data/label2idx.json"):
        self.label2idx = load_json(label2idx_path)
        self.class_num = len(self.label2idx)
        self.tokenizer = BertTokenizer.from_pretrained("bert-base-chinese")
        self.max_len = max_len
        self.input_ids, self.token_type_ids, self.attention_mask, self.labels = self.encoder(data_path)

    def encoder(self, data_path):
        texts = []
        labels = []
        with open(data_path, encoding="utf-8"as f:
            for line in f:
                line = json.loads(line)
                texts.append(line["text"])
                tmp_label = [0] * self.class_num
                for label in line["label"]:
                    tmp_label[self.label2idx[label]] = 1
                labels.append(tmp_label)

        tokenizers = self.tokenizer(texts,
                                    padding=True,
                                    truncation=True,
                                    max_length=self.max_len,
                                    return_tensors="pt",
                                    is_split_into_words=False)
        input_ids = tokenizers["input_ids"]
        token_type_ids = tokenizers["token_type_ids"]
        attention_mask = tokenizers["attention_mask"]

        return input_ids, token_type_ids, attention_mask, \
               torch.tensor(labels, dtype=torch.float)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, item):
        return self.input_ids[item],  self.attention_mask[item], \
               self.token_type_ids[item], self.labels[item]


if __name__ == '__main__':
    dataset = MultiClsDataSet(data_path="./data/train.json")
    print(dataset.input_ids)
    print(dataset.token_type_ids)
    print(dataset.attention_mask)
    print(dataset.labels)
# -*- coding: utf-8 -*-

"""
数据预处理
"""


import json


def load_json(data_path):
    with open(data_path, encoding="utf-8"as f:
        return json.loads(f.read())


def dump_json(project, out_path):
    with open(out_path, "w", encoding="utf-8"as f:
        json.dump(project, f, ensure_ascii=False)


def preprocess(train_data_path, label2idx_path, max_len_ratio=0.9):
    """
    :param train_data_path:
    :param label2idx_path:
    :param max_len_ratio:
    :return:
    """

    labels = []
    text_length = []
    with open(train_data_path, encoding="utf-8"as f:
        for data in f:
            data = json.loads(data)
            text_length.append(len(data["text"]))
            labels.extend(data["label"])
    labels = list(set(labels))
    label2idx = {label: idx for idx, label in enumerate(labels)}

    dump_json(label2idx, label2idx_path)

    text_length.sort()

    print("当设置max_len={}时,可覆盖{}的文本".format(text_length[int(len(text_length)*max_len_ratio)], max_len_ratio))


if __name__ == '__main__':
    preprocess("./data/train.json""./data/label2idx.json")

训练

训练的源码如下:

# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import AdamW
import numpy as np
from data_preprocess import load_json
from bert_multilabel_cls import BertMultiLabelCls
from data_helper import MultiClsDataSet
from sklearn.metrics import accuracy_score

train_path = "./data/train.json"
dev_path = "./data/dev.json"
test_path = "./data/test.json"
label2idx_path = "./data/label2idx.json"
save_model_path = "./model/multi_label_cls.pth"
label2idx = load_json(label2idx_path)
class_num = len(label2idx)
device = "cuda" if torch.cuda.is_available() else "cpu"
lr = 2e-5
batch_size = 128
max_len = 128
hidden_size = 768
epochs = 10

# 预处理数据
train_dataset = MultiClsDataSet(train_path, max_len=max_len, label2idx_path=label2idx_path)
dev_dataset = MultiClsDataSet(dev_path, max_len=max_len, label2idx_path=label2idx_path)

# 从数据集中 批量 加载数据,批大小为batch_size
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_dataloader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)

# 计算准确率
def get_acc_score(y_true_tensor, y_pred_tensor):
    y_pred_tensor = (y_pred_tensor.cpu() > 0.5).int().numpy()
    y_true_tensor = y_true_tensor.cpu().numpy()
    return accuracy_score(y_true_tensor, y_pred_tensor)

# 训练
def train():
    model = BertMultiLabelCls(hidden_size=hidden_size, class_num=class_num)
    # 启用 batch normalization 和 dropout 。
    model.train()
    model.to(device)
    # 定义优化器
    optimizer = AdamW(model.parameters(), lr=lr)
    # 定义了一个二进制交叉熵损失函数(BCELoss),用于多标签分类问题,因为它可以处理多个标签。
    criterion = nn.BCELoss()

    dev_best_acc = 0.
    
    # 按epoch训练,即训练轮数
    for epoch in range(1, epochs):
        # 启用 batch normalization 和 dropout 。
        model.train()
        # 按batch训练,即训练批次
        for i, batch in enumerate(train_dataloader):
            # 清空梯度
            optimizer.zero_grad()
            batch = [d.to(device) for d in batch]
            # 获取批数据中的标签label数据
            labels = batch[-1]
            # 执行预训练模型的forward方法
            logits = model(*batch[:3])
            # 通过二分类交叉熵损失,计算模型返回值与标签实际值的损失概率
            loss = criterion(logits, labels)
            # 反向传播
            loss.backward()
            # 梯度更新
            optimizer.step()

            # 打印数据
            if i % 100 == 0:
                acc_score = get_acc_score(labels, logits)
                print("Train epoch:{} step:{}  acc: {} loss:{} ".format(epoch, i, acc_score, loss.item()))

        # 验证集合
        dev_loss, dev_acc = dev(model, dev_dataloader, criterion)
        print("Dev epoch:{} acc:{} loss:{}".format(epoch, dev_acc, dev_loss))
        if dev_acc > dev_best_acc:
            dev_best_acc = dev_acc
            torch.save(model.state_dict(), save_model_path)

    # 测试
    test_acc = test(save_model_path, test_path)
    print("Test acc: {}".format(test_acc))

# 验证
def dev(model, dataloader, criterion):
    all_loss = []
    # 切换成评估模式
    model.eval()
    true_labels = []
    pred_labels = []
    with torch.no_grad():
        for i, batch in enumerate(dataloader):
            input_ids, attention_mask, token_type_ids, labels = [d.to(device) for d in batch]
            logits = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
            loss = criterion(logits, labels)
            all_loss.append(loss.item())
            true_labels.append(labels)
            pred_labels.append(logits)
    true_labels = torch.cat(true_labels, dim=0)
    pred_labels = torch.cat(pred_labels, dim=0)
    acc_score = get_acc_score(true_labels, pred_labels)
    return np.mean(all_loss), acc_score

# 测试
def test(model_path, test_data_path):
    test_dataset = MultiClsDataSet(test_data_path, max_len=max_len, label2idx_path=label2idx_path)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    model = BertMultiLabelCls(hidden_size=hidden_size, class_num=class_num)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    # 切换成评估模式
    model.eval()
    true_labels = []
    pred_labels = []
    with torch.no_grad():
        for i, batch in enumerate(test_dataloader):
            input_ids, attention_mask, token_type_ids, labels = [d.to(device) for d in batch]
            logits = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
            true_labels.append(labels)
            pred_labels.append(logits)
    true_labels = torch.cat(true_labels, dim=0)
    pred_labels = torch.cat(pred_labels, dim=0)
    acc_score = get_acc_score(true_labels, pred_labels)
    return acc_score


if __name__ == '__main__':
    train()

在上面的代码中,可以看到很多熟悉的参数。譬如epochsbatch_sizemax_lenhidden_size——这些参数一般在配置文件config中毕竟常见,而且都是有默认值。以chatglm3-6b的配置文件为例:

{
  "_name_or_path""THUDM/chatglm-6b",
  "architectures": [
    "ChatGLMModel"
  ],
  "bos_token_id"130004,
  "eos_token_id"130005,
  "mask_token_id"130000,
  "gmask_token_id"130001,
  "pad_token_id"3,
  "hidden_size"4096,
  "inner_hidden_size"16384,
  "layernorm_epsilon"1e-05,
  "max_sequence_length"2048,
  "model_type""chatglm",
  "num_attention_heads"32,
  "num_layers"28,
  "position_encoding_2d": true,
  "torch_dtype""float16",
  "use_cache": true,
  "vocab_size"130528
}

在每个batch的训练中,其流程是这样的:

  1. 清空梯度
  2. 获取实际的标签
  3. 通过预训练模型输出预测值
  4. 基于二分类交叉熵损失函数,计算实际值与预测值的损失
  5. 将损失反向传播
  6. 更新梯度

严格来说,多分类任务应该用对应的损失函数,譬如CrossEntropyLoss或者NLLLoss( Negative Log Likelihood Loss)。这两个损失函数都是为多分类问题设计的,CrossEntropyLoss是更为常用的选择。

不过这里用二分类交叉熵损失函数也可以,即将多分类问题转化为多个二分类问题。所以在模型的定义中,使用了Sigmode函数。

预测

推理预测的代码如下:

# -*- coding: utf-8 -*-

import torch
from data_preprocess import load_json
from bert_multilabel_cls import BertMultiLabelCls
from transformers import BertTokenizer

hidden_size = 768
class_num = 3
label2idx_path = "./data/label2idx.json"
save_model_path = "./model/multi_label_cls.pth"
label2idx = load_json(label2idx_path)
idx2label = {idx: label for label, idx in label2idx.items()}
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = BertTokenizer.from_pretrained("bert-base-chinese")
max_len = 128

model = BertMultiLabelCls(hidden_size=hidden_size, class_num=class_num)
model.load_state_dict(torch.load(save_model_path))
model.to(device)
# 切换评估模式
model.eval()

def predict(texts):
    # 加载分词器分词
    outputs = tokenizer(texts, return_tensors="pt", max_length=max_len,
                        padding=True, truncation=True)
    # 加载模型
    logits = model(outputs["input_ids"].to(device),
                   outputs["attention_mask"].to(device),
                   outputs["token_type_ids"].to(device))
    logits = logits.cpu().tolist()
    # print(logits)
    result = []
    for sample in logits:
        pred_label = []
        for idx, logit in enumerate(sample):
            if logit > 0.5:
                pred_label.append(idx2label[idx])
        result.append(pred_label)
    return result


if __name__ == '__main__':
    texts = ["中超-德尔加多扳平郭田雨绝杀 泰山2-1逆转亚泰""今日沪深两市指数整体呈现震荡调整格局"]
    result = predict(texts)
    print(result)

推理预测的对象是用户,这个流程一般就两个:分词器、模型。分词器会将输入文本处理后输出各种维度的数据,并将其作为模型的输入,最终返回分类的概率分布。

总结

从上面来看,所谓基于预训练大模型来做解决方案,总的来说就是接入大模型,利用大模型已有的泛化能力,训练/微调出一个"领域任务",使其在某个任务上更具有领域性与专业性。

归根来说,其流程就是神经网络模型建模与训练的过程:在基座大模型的基础上,再次训练了一个多分类任务的神经网络模型,以满足特定任务的需要;但模型的基本信息,都是基于基座大模型的。

如果对这里不了解的话,建议可以先去看看神经网络模型的建模与训练推理流程——基于Pytorch建模一个简单的神经网络模型,再做训练与推理;与这里面的流程做一个比对。



53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询