问题
- 计算两个文本之间的相似度,相似返回1,不相似返回0
- 从n个候选文本中选取出与当前文本最相似的文本
解决方案
问题1
- 问题:计算两个文本之是否相似,相似返回1,不相似返回0
- 数据集:shibing624/sts-sohu20212021搜狐校园文本匹配算法大赛数据集,数据来源https://www.biendata.xyz/competition/sohu_2021/data/,由于计算资源有限,只选择其中短短文本匹配作为样例。
- 模型:哈工大的一个中文BERT,下载地址hfl/chinese-macbert-base
- 代码实现:
展开查看问题1代码
1. 数据预处理datas = load_dataset("shibing624/sts-sohu2021",'dda')
tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")
def process_fun(examples):
tokenized_examples=tokenizer(
examples['sentence1'],examples['sentence2'],
padding=True,max_length=64,return_tensors='pt')
tokenized_examples["labels"] = [label for label in examples["label"]]
return tokenized_examples
data_tokenizer = datas.map(process_fun,batched=True,remove_columns=datas["train"].column_names)
- 加载模型
注意指定num_labels来说明这是一个二分类问题
model = AutoModelForSequenceClassification.from_pretrained("hfl/chinese-macbert-base", num_labels=2)
- 创建评估函数
import evaluate
acc_metric = evaluate.load("accuracy")
f1_metirc = evaluate.load("f1")
def eval_metric(eval_predict):
predictions, labels = eval_predict
predictions = predictions.argmax(axis=-1)
acc = acc_metric.compute(predictions=predictions, references=labels)
f1 = f1_metirc.compute(predictions=predictions, references=labels,average='macro')
acc.update(f1)
return acc
- 设置训练参数
train_args = TrainingArguments(
output_dir="./similarity_model", # 输出文件夹
per_device_train_batch_size=32, # 训练时的batch_size
per_device_eval_batch_size=32, # 验证时的batch_size
logging_steps=10, # log 打印的频率
evaluation_strategy="epoch", # 评估策略
save_strategy="epoch", # 保存策略
save_total_limit=3, # 最大保存数
learning_rate=2e-5, # 学习率
weight_decay=0.01, # weight_decay
metric_for_best_model="f1", # 设定评估指标
load_best_model_at_end=True) # 训练完成后加载最优模型
- 定义训练器
trainer = Trainer(model=model,
args=train_args,
train_dataset=data_tokenizer["train"],
eval_dataset=data_tokenizer["test"],
data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
compute_metrics=eval_metric)
- 训练
trainer.train()
- 评估
eval_result = trainer.evaluate(data_tokenizer["test"])
eval_result
结果如下:
{'eval_loss': 0.36066102981567383,
'eval_accuracy': 0.837,
'eval_f1': 0.8010678871090771,
'eval_runtime': 9.8404,
'eval_samples_per_second': 101.622,
'eval_steps_per_second': 3.252,
'epoch': 3.0}
- 推理
from transformers import pipeline, TextClassificationPipeline
model.config.id2label = {0: "不相似", 1: "相似"}
pipe = pipeline('text-classification', model=model, tokenizer=tokenizer,device=0)
result = pipe({"text": "我喜欢北京", "text_pair": "北京是个好地方"}, function_to_apply="none")
result
结果如下:
{'label': '相似', 'score': 0.049160078167915344}
问题2
- 问题:从n个候选文本中选取出与当前文本最相似的文本
- 思路:大体思路与问题1 相似,但是如果直接用问题1的办法,则需要两两对比,时间复杂度是O(n^2),效率太低了。所以解决思路为训练一个计算两个文本相似度分数的编码模型,然后将所有的候选文本利用这个编码模型进行编码存起来,然后就可以利用向量数据库进行查询,参考基于向量匹配的检索式问答实战
- 数据集:shibing624/sts-sohu20212021搜狐校园文本匹配算法大赛数据集,数据来源https://www.biendata.xyz/competition/sohu_2021/data/,由于计算资源有限,只选择其中短短文本匹配作为样例。
- 模型:哈工大的一个中文BERT,下载地址hfl/chinese-macbert-base
- 代码实现:
展开查看问题2代码
1. 预处理数据datas = load_dataset("shibing624/sts-sohu2021",'dda')
tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")
# 定义数据处理函数
def process_fun(examples):
sentences = []
labels = []
for sen1, sen2, label in zip(examples["sentence1"], examples["sentence2"], examples["label"]):
sentences.append(sen1)
sentences.append(sen2)
labels.append(1 if int(label) == 1 else -1)
# input_ids, attention_mask, token_type_ids
tokenized_examples = tokenizer(sentences, max_length=128, truncation=True, padding="max_length")
tokenized_examples = {k: [v[i: i + 2] for i in range(0, len(v), 2)] for k, v in tokenized_examples.items()}
tokenized_examples["labels"] = labels
return tokenized_examples
data_tokenizer = datas.map(process_fun,batched=True,
remove_columns=datas["train"].column_names)
- 自定义模型
from transformers import BertForSequenceClassification,BertModel
# 导入余弦函数
from torch.nn import CosineSimilarity,CosineEmbeddingLoss
from typing import Optional
import torch
class SentenceEncoderModel(BertForSequenceClassification):
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.config = config
self.bert = BertModel(config)
# Initialize weights and apply final processing
self.post_init()
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
head_mask: Optional[torch.Tensor] = None,
inputs_embeds: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
):
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# 获取sentenceA 和 sentenceB的输入
senA_input_ids, senB_input_ids = input_ids[:, 0], input_ids[:, 1]
senA_attention_mask, senB_attention_mask = attention_mask[:, 0], attention_mask[:, 1]
senA_token_type_ids, senB_token_type_ids = token_type_ids[:, 0], token_type_ids[:, 1]
# 分别获取sentenceA 和 sentenceB的向量表示
senA_outputs = self.bert(
senA_input_ids,
attention_mask=senA_attention_mask,
token_type_ids=senA_token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
senA_pooled_output = senA_outputs[1] # [batch, hidden]
senB_outputs = self.bert(
senB_input_ids,
attention_mask=senB_attention_mask,
token_type_ids=senB_token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
senB_pooled_output = senB_outputs[1] # [batch, hidden]
# 计算相似度
cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output) # [batch, ]
# 计算loss
loss = None
if labels is not None:
loss_fct = CosineEmbeddingLoss(0.3)
loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)
output = (cos,)
return ((loss,) + output) if loss is not None else output
- 加载模型
model = SentenceEncoderModel.from_pretrained("/data1/model/chinese-macbert-base", num_labels=2)
- 创建评估函数
import evaluate
acc_metric = evaluate.load("accuracy")
f1_metirc = evaluate.load("f1")
def eval_metric(eval_predict):
predictions, labels = eval_predict
# 这里需要一个置信度,代表概率大于0.7的我们就认为i相似
predictions = [int(p > 0.7) for p in predictions]
labels = [int(l > 0) for l in labels]
# predictions = predictions.argmax(axis=-1)
acc = acc_metric.compute(predictions=predictions, references=labels)
f1 = f1_metirc.compute(predictions=predictions, references=labels)
acc.update(f1)
return acc
- 设置训练参数
train_args = TrainingArguments(
output_dir="./encoder_model",
per_device_train_batch_size=32,
per_device_eval_batch_size=32,
logging_steps=10,
evaluation_strategy="epoch",
save_strategy="epoch",
learning_rate=2e-5,
metric_for_best_model="f1")
- 定义训练器
trainer = Trainer(model=model,
args=train_args,
train_dataset=data_tokenizer["train"],
eval_dataset=data_tokenizer["test"],
data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
compute_metrics=eval_metric)
- 训练
trainer.train()
- 评估
eval_result = trainer.evaluate(data_tokenizer["test"])
eval_result
结果
{'eval_loss': 0.16230031847953796,
'eval_accuracy': 0.824,
'eval_f1': 0.6408163265306122,
'eval_runtime': 7.8649,
'eval_samples_per_second': 127.148,
'eval_steps_per_second': 4.069,
'epoch': 3.0}
- 推理
# 由于是自定义模型,这里就要自己写推理方法了,
#就是利用模型对输入的数据进行编码,然后手动计算相似度
text1="我喜欢北京"
text2="今天天气怎么样"
inputs = tokenizer([text1, text2], max_length=128, truncation=True, return_tensors="pt", padding=True)
inputs = {k: v.to('cuda:0') for k, v in inputs.items()}
# 利用这个编码模型,编码两个句子后,计算这两个句子是否相似
output = model.bert(**inputs)
logits=output[1] # 2*768
cos = CosineSimilarity()(logits[None, 0, :], logits[None,1, :]).squeeze().cpu().item()
print(cos) #0.2078535109758377
print('相似' if cos>0.7 else '不相似') #不相似
评论区