失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 中文命名实体识别NER

中文命名实体识别NER

时间:2018-11-30 22:18:28

相关推荐

中文命名实体识别NER

命名实体识别(英语:Named Entity Recognition),简称NER,是指识别文本中具有特定意义的实体,主要包括人名、地名、机构名、专有名词等,以及时间、数量、货币、比例数值等文字。目前在NER上表现较好的模型都是基于深度学习或者是统计学习的方法的,这些方法共同的特点都是需要大量的数据来进行学习,本文使用的数据集是ACL论文中新浪财经收集的简历数据。

数据集链接:/jiesutd/LatticeLSTM

标注集采用BIOES(B表示实体开头,E表示实体结尾,I表示在实体内部,O表示非实体,S表示单个实体),句子之间用一个空行隔开。

对于命名实体识别其他方法举例

常用的模型以及涉及到的主要代码

1、隐马尔可夫模型(HMM)

隐马尔可夫模型描述由一个隐藏的马尔科夫链随机生成不可观测的状态随机序列,再由各个状态生成一个观测而产生观测随机序列的过程(李航 统计学习方法)。隐马尔可夫模型由初始状态分布,状态转移概率矩阵以及观测概率矩阵所确定。上面的定义太过学术看不懂没关系,我们只需要知道,NER本质上可以看成是一种序列标注问题(预测每个字的BIOES标记),在使用HMM解决NER这种序列标注问题的时候,我们所能观测到的是字组成的序列(观测序列),观测不到的是每个字对应的标注(状态序列)。对应的,HMM的三个要素可以解释为,初始状态分布就是每一个标注作为句子第一个字的标注的概率,状态转移概率矩阵就是由某一个标注转移到下一个标注的概率(设状态转移矩阵为,那么若前一个词的标注为,则下一个词的标注为的概率为),观测概率矩阵就是指在某个标注下,生成某个词的概率。根据HMM的三个要素,我们可以定义如下的HMM模型:

class HMM(object):def __init__(self, N, M):"""Args:N: 状态数,这里对应存在的标注的种类M: 观测数,这里对应有多少不同的字"""self.N = Nself.M = M# 状态转移概率矩阵 A[i][j]表示从i状态转移到j状态的概率self.A = torch.zeros(N, N)# 观测概率矩阵, B[i][j]表示i状态下生成j观测的概率self.B = torch.zeros(N, M)# 初始状态概率 Pi[i]表示初始时刻为状态i的概率self.Pi = torch.zeros(N)

有了模型定义,接下来的问题就是训练模型了。HMM模型的训练过程对应隐马尔可夫模型的学习问题(李航 统计学习方法),实际上就是根据训练数据根据最大似然的方法估计模型的三个要素,即上文提到的初始状态分布、状态转移概率矩阵以及观测概率矩阵。举个例子帮助理解,在估计初始状态分布的时候,假如某个标记在数据集中作为句子第一个字的标记的次数为k,句子的总数为N,那么该标记作为句子第一个字的概率可以近似估计为k/N,很简单对吧,使用这种方法,我们近似估计HMM的三个要素,代码如下(出现过的函数将用省略号代替):

class HMM(object):def __init__(self, N, M):....def train(self, word_lists, tag_lists, word2id, tag2id):"""HMM的训练,即根据训练语料对模型参数进行估计,因为我们有观测序列以及其对应的状态序列,所以我们可以使用极大似然估计的方法来估计隐马尔可夫模型的参数参数:word_lists: 列表,其中每个元素由字组成的列表,如 ['担','任','科','员']tag_lists: 列表,其中每个元素是由对应的标注组成的列表,如 ['O','O','B-TITLE', 'E-TITLE']word2id: 将字映射为IDtag2id: 字典,将标注映射为ID"""assert len(tag_lists) == len(word_lists)# 估计转移概率矩阵for tag_list in tag_lists:seq_len = len(tag_list)for i in range(seq_len - 1):current_tagid = tag2id[tag_list[i]]next_tagid = tag2id[tag_list[i+1]]self.A[current_tagid][next_tagid] += 1# 一个重要的问题:如果某元素没有出现过,该位置为0,这在后续的计算中是不允许的# 解决方法:我们将等于0的概率加上很小的数self.A[self.A == 0.] = 1e-10self.A = self.A / self.A.sum(dim=1, keepdim=True)# 估计观测概率矩阵for tag_list, word_list in zip(tag_lists, word_lists):assert len(tag_list) == len(word_list)for tag, word in zip(tag_list, word_list):tag_id = tag2id[tag]word_id = word2id[word]self.B[tag_id][word_id] += 1self.B[self.B == 0.] = 1e-10self.B = self.B / self.B.sum(dim=1, keepdim=True)# 估计初始状态概率for tag_list in tag_lists:init_tagid = tag2id[tag_list[0]]self.Pi[init_tagid] += 1self.Pi[self.Pi == 0.] = 1e-10self.Pi = self.Pi / self.Pi.sum()

模型训练完毕之后,要利用训练好的模型进行解码,就是对给定的模型未见过的句子,求句子中的每个字对应的标注,针对这个解码问题,我们使用的是维特比(viterbi)算法。关于该算法的数学推导,可以查阅一下李航统计学习方法。

HMM存在两个缺陷:1)观察值之间严格独立,观测到的句子中每个字相互独立

2)状态转移过程中当前状态只与前一状态有关,没有关注到后一时刻的状态

HMM代码实现的主要模型部分如下:

import torchclass HMM(object):def __init__(self, N, M):"""Args:N: 状态数,这里对应存在的标注的种类M: 观测数,这里对应有多少不同的字"""self.N = Nself.M = M# 状态转移概率矩阵 A[i][j]表示从i状态转移到j状态的概率self.A = torch.zeros(N, N)# 观测概率矩阵, B[i][j]表示i状态下生成j观测的概率self.B = torch.zeros(N, M)# 初始状态概率 Pi[i]表示初始时刻为状态i的概率self.Pi = torch.zeros(N)def train(self, word_lists, tag_lists, word2id, tag2id):"""HMM的训练,即根据训练语料对模型参数进行估计,因为我们有观测序列以及其对应的状态序列,所以我们可以使用极大似然估计的方法来估计隐马尔可夫模型的参数参数:word_lists: 列表,其中每个元素由字组成的列表,如 ['担','任','科','员']tag_lists: 列表,其中每个元素是由对应的标注组成的列表,如 ['O','O','B-TITLE', 'E-TITLE']word2id: 将字映射为IDtag2id: 字典,将标注映射为ID"""assert len(tag_lists) == len(word_lists)# 估计转移概率矩阵for tag_list in tag_lists:seq_len = len(tag_list)for i in range(seq_len - 1):current_tagid = tag2id[tag_list[i]]next_tagid = tag2id[tag_list[i+1]]self.A[current_tagid][next_tagid] += 1# 问题:如果某元素没有出现过,该位置为0,这在后续的计算中是不允许的# 解决方法:我们将等于0的概率加上很小的数self.A[self.A == 0.] = 1e-10self.A = self.A / self.A.sum(dim=1, keepdim=True)# 估计观测概率矩阵for tag_list, word_list in zip(tag_lists, word_lists):assert len(tag_list) == len(word_list)for tag, word in zip(tag_list, word_list):tag_id = tag2id[tag]word_id = word2id[word]self.B[tag_id][word_id] += 1self.B[self.B == 0.] = 1e-10self.B = self.B / self.B.sum(dim=1, keepdim=True)# 估计初始状态概率for tag_list in tag_lists:init_tagid = tag2id[tag_list[0]]self.Pi[init_tagid] += 1self.Pi[self.Pi == 0.] = 1e-10self.Pi = self.Pi / self.Pi.sum()def test(self, word_lists, word2id, tag2id):pred_tag_lists = []for word_list in word_lists:pred_tag_list = self.decoding(word_list, word2id, tag2id)pred_tag_lists.append(pred_tag_list)return pred_tag_listsdef decoding(self, word_list, word2id, tag2id):"""使用维特比算法对给定观测序列求状态序列, 这里就是对字组成的序列,求其对应的标注。维特比算法实际是用动态规划解隐马尔可夫模型预测问题,即用动态规划求概率最大路径(最优路径)这时一条路径对应着一个状态序列"""# 问题:整条链很长的情况下,十分多的小概率相乘,最后可能造成下溢# 解决办法:采用对数概率,这样源空间中的很小概率,就被映射到对数空间的大的负数# 同时相乘操作也变成简单的相加操作A = torch.log(self.A)B = torch.log(self.B)Pi = torch.log(self.Pi)# 初始化 维比特矩阵viterbi 它的维度为[状态数, 序列长度]# 其中viterbi[i, j]表示标注序列的第j个标注为i的所有单个序列(i_1, i_2, ..i_j)出现的概率最大值seq_len = len(word_list)viterbi = torch.zeros(self.N, seq_len)# backpointer是跟viterbi一样大小的矩阵# backpointer[i, j]存储的是 标注序列的第j个标注为i时,第j-1个标注的id# 等解码的时候,我们用backpointer进行回溯,以求出最优路径backpointer = torch.zeros(self.N, seq_len).long()# self.Pi[i] 表示第一个字的标记为i的概率# Bt[word_id]表示字为word_id的时候,对应各个标记的概率# self.A.t()[tag_id]表示各个状态转移到tag_id对应的概率# 所以第一步为start_wordid = word2id.get(word_list[0], None)Bt = B.t()if start_wordid is None:# 如果字不再字典里,则假设状态的概率分布是均匀的bt = torch.log(torch.ones(self.N) / self.N)else:bt = Bt[start_wordid]viterbi[:, 0] = Pi + btbackpointer[:, 0] = -1# 递推公式:# viterbi[tag_id, step] = max(viterbi[:, step-1]* self.A.t()[tag_id] * Bt[word])# 其中word是step时刻对应的字# 由上述递推公式求后续各步for step in range(1, seq_len):wordid = word2id.get(word_list[step], None)# 处理字不在字典中的情况# bt是在t时刻字为wordid时,状态的概率分布if wordid is None:# 如果字不再字典里,则假设状态的概率分布是均匀的bt = torch.log(torch.ones(self.N) / self.N)else:bt = Bt[wordid] # 否则从观测概率矩阵中取btfor tag_id in range(len(tag2id)):max_prob, max_id = torch.max(viterbi[:, step-1] + A[:, tag_id],dim=0)viterbi[tag_id, step] = max_prob + bt[tag_id]backpointer[tag_id, step] = max_id# 终止, t=seq_len 即 viterbi[:, seq_len]中的最大概率,就是最优路径的概率best_path_prob, best_path_pointer = torch.max(viterbi[:, seq_len-1], dim=0)# 回溯,求最优路径best_path_pointer = best_path_pointer.item()best_path = [best_path_pointer]for back_step in range(seq_len-1, 0, -1):best_path_pointer = backpointer[best_path_pointer, back_step]best_path_pointer = best_path_pointer.item()best_path.append(best_path_pointer)# 将tag_id组成的序列转化为tagassert len(best_path) == len(word_list)id2tag = dict((id_, tag) for tag, id_ in tag2id.items())tag_list = [id2tag[id_] for id_ in reversed(best_path)]return tag_list

2、条件随机场

上面讲的HMM模型中存在两个假设,一是输出观察值之间严格独立,二是状态转移过程中当前状态只与前一状态有关。也就是说,在命名实体识别的场景下,HMM认为观测到的句子中的每个字都是相互独立的,而且当前时刻的标注只与前一时刻的标注相关。但实际上,命名实体识别往往需要更多的特征,比如词性,词的上下文等等,同时当前时刻的标注应该与前一时刻以及后一时刻的标注都相关联。由于这两个假设的存在,显然HMM模型在解决命名实体识别的问题上是存在缺陷的。

而条件随机场就没有这种问题,它通过引入自定义的特征函数,不仅可以表达观测之间的依赖,还可表示当前观测与前后多个状态之间的复杂依赖,可以有效克服HMM模型面临的问题。条件随机场数学公式不在此讲述了。其解码也是采用维特比算法。

from sklearn_crfsuite import CRF # CRF的具体实现太过复杂,这里我们借助一个外部的库def word2features(sent, i):"""抽取单个字的特征"""word = sent[i]prev_word = "<s>" if i == 0 else sent[i-1]next_word = "</s>" if i == (len(sent)-1) else sent[i+1]# 因为每个词相邻的词会影响这个词的标记# 所以我们使用:# 前一个词,当前词,后一个词,# 前一个词+当前词, 当前词+后一个词# 作为特征features = {'w': word,'w-1': prev_word,'w+1': next_word,'w-1:w': prev_word+word,'w:w+1': word+next_word,'bias': 1}return featuresdef sent2features(sent):"""抽取序列特征"""return [word2features(sent, i) for i in range(len(sent))]class CRFModel(object):def __init__(self,algorithm='lbfgs',c1=0.1,c2=0.1,max_iterations=100,all_possible_transitions=False):self.model = CRF(algorithm=algorithm,c1=c1,c2=c2,max_iterations=max_iterations,all_possible_transitions=all_possible_transitions)def train(self, sentences, tag_lists):"""训练模型"""features = [sent2features(s) for s in sentences]self.model.fit(features, tag_lists)def test(self, sentences):"""解码,对给定句子预测其标注"""features = [sent2features(s) for s in sentences]pred_tag_lists = self.model.predict(features)return pred_tag_lists

3、Bi_LSTM_CRF

简单的LSTM的优点是能够通过双向的设置学习到观测序列(输入的字)之间的依赖,在训练过程中,LSTM能够根据目标(比如识别实体)自动提取观测序列的特征,但是缺点是无法学习到状态序列(输出的标注)之间的关系,要知道,在命名实体识别任务中,标注之间是有一定的关系的,比如B类标注(表示某实体的开头)后面不会再接一个B类标注,所以LSTM在解决NER这类序列标注任务时,虽然可以省去很繁杂的特征工程,但是也存在无法学习到标注上下文的缺点。相反,CRF的优点就是能对隐含状态建模,学习状态序列的特点,但它的缺点是需要手动提取序列特征。所以一般的做法是,在LSTM后面再加一层CRF,以获得两者的优点。

下面是给Bi-LSTM加一层CRF的代码实现:

from itertools import zip_longestfrom copy import deepcopyimport torchimport torch.nn as nnimport torch.optim as optimfrom .util import tensorized, sort_by_lengths, cal_loss, cal_lstm_crf_lossfrom .config import TrainingConfig, LSTMConfigfrom .bilstm import BiLSTMclass BILSTM_Model(object):def __init__(self, vocab_size, out_size, crf=True):"""功能:对LSTM的模型进行训练与测试参数:vocab_size:词典大小out_size:标注种类crf选择是否添加CRF层"""self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 加载模型参数self.emb_size = LSTMConfig.emb_sizeself.hidden_size = LSTMConfig.hidden_sizeself.crf = crf# 根据是否添加crf初始化不同的模型 选择不一样的损失计算函数if not crf:self.model = BiLSTM(vocab_size, self.emb_size,self.hidden_size, out_size).to(self.device)self.cal_loss_func = cal_losselse:self.model = BiLSTM_CRF(vocab_size, self.emb_size,self.hidden_size, out_size).to(self.device)self.cal_loss_func = cal_lstm_crf_loss# 加载训练参数:self.epoches = TrainingConfig.epochesself.print_step = TrainingConfig.print_stepself.lr = TrainingConfig.lrself.batch_size = TrainingConfig.batch_size# 初始化优化器self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)# 初始化其他指标self.step = 0self._best_val_loss = 1e18self.best_model = Nonedef train(self, word_lists, tag_lists,dev_word_lists, dev_tag_lists,word2id, tag2id):# 对数据集按照长度进行排序word_lists, tag_lists, _ = sort_by_lengths(word_lists, tag_lists)dev_word_lists, dev_tag_lists, _ = sort_by_lengths(dev_word_lists, dev_tag_lists)B = self.batch_sizefor e in range(1, self.epoches+1):self.step = 0losses = 0.for ind in range(0, len(word_lists), B):batch_sents = word_lists[ind:ind+B]batch_tags = tag_lists[ind:ind+B]losses += self.train_step(batch_sents,batch_tags, word2id, tag2id)if self.step % TrainingConfig.print_step == 0:total_step = (len(word_lists) // B + 1)print("Epoch {}, step/total_step: {}/{} {:.2f}% Loss:{:.4f}".format(e, self.step, total_step,100. * self.step / total_step,losses / self.print_step))losses = 0.# 每轮结束测试在验证集上的性能,保存最好的一个val_loss = self.validate(dev_word_lists, dev_tag_lists, word2id, tag2id)print("Epoch {}, Val Loss:{:.4f}".format(e, val_loss))def train_step(self, batch_sents, batch_tags, word2id, tag2id):self.model.train()self.step += 1# 准备数据tensorized_sents, lengths = tensorized(batch_sents, word2id)tensorized_sents = tensorized_sents.to(self.device)targets, lengths = tensorized(batch_tags, tag2id)targets = targets.to(self.device)# forwardscores = self.model(tensorized_sents, lengths)# 计算损失 更新参数self.optimizer.zero_grad()loss = self.cal_loss_func(scores, targets, tag2id).to(self.device)loss.backward()self.optimizer.step()return loss.item()def validate(self, dev_word_lists, dev_tag_lists, word2id, tag2id):self.model.eval()with torch.no_grad():val_losses = 0.val_step = 0for ind in range(0, len(dev_word_lists), self.batch_size):val_step += 1# 准备batch数据batch_sents = dev_word_lists[ind:ind+self.batch_size]batch_tags = dev_tag_lists[ind:ind+self.batch_size]tensorized_sents, lengths = tensorized(batch_sents, word2id)tensorized_sents = tensorized_sents.to(self.device)targets, lengths = tensorized(batch_tags, tag2id)targets = targets.to(self.device)# forwardscores = self.model(tensorized_sents, lengths)# 计算损失loss = self.cal_loss_func(scores, targets, tag2id).to(self.device)val_losses += loss.item()val_loss = val_losses / val_stepif val_loss < self._best_val_loss:print("保存模型...")self.best_model = deepcopy(self.model)self._best_val_loss = val_lossreturn val_lossdef test(self, word_lists, tag_lists, word2id, tag2id):"""返回最佳模型在测试集上的预测结果"""# 准备数据word_lists, tag_lists, indices = sort_by_lengths(word_lists, tag_lists)tensorized_sents, lengths = tensorized(word_lists, word2id)tensorized_sents = tensorized_sents.to(self.device)self.best_model.eval()with torch.no_grad():batch_tagids = self.best_model.test(tensorized_sents, lengths, tag2id)# 将id转化为标注pred_tag_lists = []id2tag = dict((id_, tag) for tag, id_ in tag2id.items())for i, ids in enumerate(batch_tagids):tag_list = []if self.crf:for j in range(lengths[i] - 1): # crf解码过程中,end被舍弃tag_list.append(id2tag[ids[j].item()])else:for j in range(lengths[i]):tag_list.append(id2tag[ids[j].item()])pred_tag_lists.append(tag_list)# indices存有根据长度排序后的索引映射的信息# 比如若indices = [1, 2, 0] 则说明原先索引为1的元素映射到的新的索引是0,# 索引为2的元素映射到新的索引是1...# 下面根据indices将pred_tag_lists和tag_lists转化为原来的顺序ind_maps = sorted(list(enumerate(indices)), key=lambda e: e[1])indices, _ = list(zip(*ind_maps))pred_tag_lists = [pred_tag_lists[i] for i in indices]tag_lists = [tag_lists[i] for i in indices]return pred_tag_lists, tag_listsclass BiLSTM_CRF(nn.Module):def __init__(self, vocab_size, emb_size, hidden_size, out_size):"""初始化参数:vocab_size:字典的大小emb_size:词向量的维数hidden_size:隐向量的维数out_size:标注的种类"""super(BiLSTM_CRF, self).__init__()self.bilstm = BiLSTM(vocab_size, emb_size, hidden_size, out_size)# CRF实际上就是多学习一个转移矩阵 [out_size, out_size] 初始化为均匀分布self.transition = nn.Parameter(torch.ones(out_size, out_size) * 1/out_size)# self.transition.data.zero_()def forward(self, sents_tensor, lengths):# [B, L, out_size]emission = self.bilstm(sents_tensor, lengths)# 计算CRF scores, 这个scores大小为[B, L, out_size, out_size]# 也就是每个字对应一个 [out_size, out_size]的矩阵# 这个矩阵第i行第j列的元素的含义是:上一时刻tag为i,这一时刻tag为j的分数batch_size, max_len, out_size = emission.size()crf_scores = emission.unsqueeze(2).expand(-1, -1, out_size, -1) + self.transition.unsqueeze(0)return crf_scoresdef test(self, test_sents_tensor, lengths, tag2id):"""使用维特比算法进行解码"""start_id = tag2id['<start>']end_id = tag2id['<end>']pad = tag2id['<pad>']tagset_size = len(tag2id)crf_scores = self.forward(test_sents_tensor, lengths)device = crf_scores.device# B:batch_size, L:max_len, T:target set sizeB, L, T, _ = crf_scores.size()# viterbi[i, j, k]表示第i个句子,第j个字对应第k个标记的最大分数viterbi = torch.zeros(B, L, T).to(device)# backpointer[i, j, k]表示第i个句子,第j个字对应第k个标记时前一个标记的id,用于回溯backpointer = (torch.zeros(B, L, T).long() * end_id).to(device)lengths = torch.LongTensor(lengths).to(device)# 向前递推for step in range(L):batch_size_t = (lengths > step).sum().item()if step == 0:# 第一个字它的前一个标记只能是start_idviterbi[:batch_size_t, step,:] = crf_scores[: batch_size_t, step, start_id, :]backpointer[: batch_size_t, step, :] = start_idelse:max_scores, prev_tags = torch.max(viterbi[:batch_size_t, step-1, :].unsqueeze(2) +crf_scores[:batch_size_t, step, :, :],# [B, T, T]dim=1)viterbi[:batch_size_t, step, :] = max_scoresbackpointer[:batch_size_t, step, :] = prev_tags# 在回溯的时候我们只需要用到backpointer矩阵backpointer = backpointer.view(B, -1) # [B, L * T]tagids = [] # 存放结果tags_t = Nonefor step in range(L-1, 0, -1):batch_size_t = (lengths > step).sum().item()if step == L-1:index = torch.ones(batch_size_t).long() * (step * tagset_size)index = index.to(device)index += end_idelse:prev_batch_size_t = len(tags_t)new_in_batch = torch.LongTensor([end_id] * (batch_size_t - prev_batch_size_t)).to(device)offset = torch.cat([tags_t, new_in_batch],dim=0) # 这个offset实际上就是前一时刻的index = torch.ones(batch_size_t).long() * (step * tagset_size)index = index.to(device)index += offset.long()try:tags_t = backpointer[:batch_size_t].gather(dim=1, index=index.unsqueeze(1).long())except RuntimeError:import pdbpdb.set_trace()tags_t = tags_t.squeeze(1)tagids.append(tags_t.tolist())# tagids:[L-1](L-1是因为扣去了end_token),大小的liebiao# 其中列表内的元素是该batch在该时刻的标记# 下面修正其顺序,并将维度转换为 [B, L]tagids = list(zip_longest(*reversed(tagids), fillvalue=pad))tagids = torch.Tensor(tagids).long()# 返回解码的结果return tagids

注:关于维特比算法推荐看链接,讲解的通俗易懂如何通俗讲解维特比算法

其他学习连接:

Advanced: Making Dynamic Decisions and the Bi-LSTM CRF — PyTorch Tutorials 1.11.0+cu102 documentation

Bi-LSTM-CRF for Sequence Labeling - 知乎

/jiesutd/LatticeLSTM

如果觉得《中文命名实体识别NER》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。