1. 前言
在上一篇《一文读懂NLP之隐马尔科夫模型(HMM)详解加python实现》中已经详细介绍了HMM模型的算法原理,这一篇主要是从零实现HMM模型。
定义HMM模型:
class HMM(object):
def __init__(self, n, m, a=None, b=None, pi=None):
# 可能的隐藏状态数
self.N = n
# 可能的观测数
self.M = m
# 状态转移概率矩阵
self.A = a
# 观测概率矩阵
self.B = b
# 初始状态概率矩阵
self.Pi = pi
# 观测序列
self.X = None
# 状态序列
self.Y = None
# 序列长度
self.T = 0
# 定义前向算法
self.alpha = None
# 定义后向算法
self.beta = None
2 概率计算问题
给定模型参数 ,和一个观测序列 ,,计算在这个模型参数 下,观测序列出现的最大概率,即 的最大值。
先定义盒子和球模型,状态集合为
,观测集合为
,
,
,
设
,
代码表示:
q = [1, 2, 3]
v = ["红", "白"]
n = len(q)
m = len(v)
x = ["红", "白", "红"]
# 建立一个字典
char_to_id = {}
id_to_char = {}
for i in v:
char_to_id[i] = len(char_to_id)
id_to_char[char_to_id[i]] = i
X = [char_to_id[i] for i in x]
a = np.array([[0.5, 0.2, 0.3],
[0.3, 0.5, 0.2],
[0.2, 0.3, 0.5]])
b = np.array([[0.5, 0.5],
[0.4, 0.6],
[0.7, 0.3]])
pi = np.array([0.2, 0.4, 0.4])
2.1 前向算法
具体的算法原理就不讲了,可以参考上一篇文章《一文读懂NLP之隐马尔科夫模型(HMM)详解加python实现》。
- 初始
初始:
根据 生成 时刻的状态 ,概率为 ,并且根据发射概率矩阵 由 生成 ,概率为 ,则:
- 递推
对 ,有:
- 终止
def forward(self, x):
"""
前向算法
计算给定模型参数和观测序列的情况下,观测序列出现的最大概率
:param x: 观测序列
:return: 观测值
"""
# 序列长度
self.T = len(x)
self.X = np.array(x)
# alpha是一个具有T行N列的矩阵
self.alpha = np.zeros((self.T, self.N))
# 初始状态
for i in range(self.N):
self.alpha[0][i] = self.Pi[i] * self.B[i][self.X[0]]
# 递推
for t in range(1, self.T):
for i in range(self.N):
probability_sum = 0
for j in range(self.N):
probability_sum += self.alpha[t - 1][j] * self.A[j][i]
self.alpha[t][i] = probability_sum * self.B[i][self.X[t]]
# 终止
return sum(self.alpha[self.T - 1])
运行:
hmm = HMM(n, m, a, b, pi)
print("forward algorithm:", hmm.forward(X))
结果:
forward algorithm: 0.130218
2.2 后向算法
给定HMM模型
,定义时刻
状态为
的条件下,从
的部分观测序列为
的概率为后向概率,记作:
- 当
时:
- 递推
- 终止
def backward(self, x):
"""
后向算法
"""
# 序列长度
self.T = len(x)
self.X = np.array(x)
# beta是一个T行N列的矩阵
self.beta = np.zeros((self.T, self.N))
# 当t=T时,置值为1
for i in range(self.N):
self.beta[self.T - 1][i] = 1
# 从t=T-1递推到t=1
for t in range(self.T - 2, -1, -1):
for i in range(self.N):
for j in range(self.N):
self.beta[t][i] += self.A[i][j] * self.B[j][self.X[t + 1]] * self.beta[t + 1][j]
# 终止
sum_probability = 0
for i in range(self.N):
sum_probability += self.Pi[i] * self.B[i][self.X[0]] * self.beta[0][i]
return sum_probability
运行:
hmm = HMM(n, m, a, b, pi)
# print("forward algorithm:", hmm.forward(X))
print("backward algorithm:", hmm.forward(X))
结果:
backward algorithm: 0.130218
3 模型训练问题
给定训练集 ,估计模型参数 ,使得在该模型下观测序列概率 最大。
3.1 监督学习–最大似然估计
初始状态概率向量的估计:
统计S个样本中,初始状态为
的频率。
其中,
是初始状态为
的样本数量,S是样本的数量。
状态转移概率矩阵的估计:
设样本中时刻t处于状态
,时刻t+1处于状态
的频数为
,那么状态转移概率矩阵的估计为:
发射概率矩阵的估计:
设样本中状态为
并观测值为
的频数
,那么状态为
观测为
的概率
的估计为:
def train(self, train_data):
"""
训练模型,使用最大似然估计
:param train_data: 训练数据,每一个样本:[观测值,隐藏状态值]
:return: 返回一个HMM模型
"""
self.T = int(len(train_data[0]) / 2)
sample_num = len(train_data)
# 初始化
self.init()
# 初始状态概率矩阵的估计
for sequence in train_data:
self.Pi[sequence[0 + self.T]] += 1
self.Pi = self.Pi / sample_num
# 状态转移矩阵的估计
a_num = np.zeros((self.N, self.N))
for sequence in train_data:
for i in range(self.T - 1):
a_num[sequence[i + self.T]][sequence[i + 1 + self.T]] += 1.0
temp = a_num.sum(axis=1).reshape((3, 1))
self.A = a_num / temp
# 发射概率矩阵的估计
b_num = np.zeros((self.N, self.M))
for sequence in train_data:
for i in range(self.T - 1):
b_num[sequence[i + self.T]][sequence[i]] += 1.0
temp = b_num.sum(axis=1).reshape((3, 1))
self.B = b_num / temp
在训练开始之前,需要创建训练数据
def generate_train_data(n, m, t, a, b, pi, nums=10000):
"""
生成训练数据
:param pi: 初始状态概率矩阵
:param b: 发射概率矩阵
:param a: 状态转移矩阵
:param n: 隐藏状态数量
:param m:观测值数量
:param t: 序列长度
:param nums: 样本数量
:return: 训练数据集
"""
train_data = []
for i in range(nums):
state_sequence = []
observation_sequence = []
# 初始状态
temp = 0
p = random.random()
for j in range(n):
temp += pi[j]
if p > temp:
continue
else:
state_sequence.append(j)
break
# 递推
for t_index in range(t):
# 生成状态
if t_index != 0:
temp = 0
p = random.random()
for state in range(n):
temp += a[state_sequence[-1]][state]
if p > temp:
continue
else:
state_sequence.append(state)
break
# 生成观测序列
temp = 0
p = random.random()
for observation in range(m):
temp += b[state_sequence[-1]][observation]
if p > temp:
continue
else:
observation_sequence.append(observation)
break
observation_sequence.extend(state_sequence)
train_data.append(observation_sequence)
return train_data
开始训练:
train_data = generate_train_data(n, m, 8, a, b, pi)
print(train_data)
hmm = HMM(n, m)
hmm.train(train_data)
print(hmm.Pi)
print(hmm.A)
print(hmm.B)
结果:
[0.1984 0.4011 0.4005]
[[0.49525581 0.20134884 0.30339535]
[0.29948182 0.50112204 0.19939614]
[0.20220083 0.30086282 0.49693635]]
[[0.50483721 0.49516279]
[0.39422253 0.60577747]
[0.70305531 0.29694469]]
可以看见,和原始的数据几乎一样。
2.2 Baum·welch算法
def baum_welch(self, x, criterion=0.001):
self.T = len(x)
self.X = x
while True:
# 为了得到alpha和beta的矩阵
_ = self.forward(self.X)
_ = self.backward(self.X)
xi = np.zeros((self.T - 1, self.N, self.N), dtype=float)
for t in range(self.T - 1):
# 笨办法
# for i in range(self.N):
# gamma[t][i] = self.calculate_gamma(t, i)
# for j in range(self.N):
# xi[t][i][j] = self.calculate_psi(t, i, j)
# for i in range(self.N):
# gamma[self.T-1][i] = self.calculate_gamma(self.T-1, i)
# numpy矩阵的办法
denominator = np.sum(np.dot(self.alpha[t, :], self.A) *
self.B[:, self.X[t + 1]] * self.beta[t + 1, :])
for i in range(self.N):
molecular = self.alpha[t, i] * self.A[i, :] * self.B[:, self.X[t+1]]*self.beta[t+1, :]
xi[t, i, :] = molecular / denominator
gamma = np.sum(xi, axis=2)
prod = (self.alpha[self.T-1, :]*self.beta[self.T-1, :])
gamma = np.vstack((gamma, prod / np.sum(prod)))
new_pi = gamma[0, :]
new_a = np.sum(xi, axis=0) / np.sum(gamma[:-1, :], axis=0).reshape(-1, 1)
new_b = np.zeros(self.B.shape, dtype=float)
for k in range(self.B.shape[1]):
mask = self.X == k
new_b[:, k] = np.sum(gamma[mask, :], axis=0) / np.sum(gamma, axis=0)
if np.max(abs(self.Pi - new_pi)) < criterion and \
np.max(abs(self.A - new_a)) < criterion and \
np.max(abs(self.B - new_b)) < criterion:
break
self.A, self.B, self.Pi = new_a, new_b, new_pi
4 序列预测问题
序列预测问题就是已知模型参数 ,给定观测序列 ,求最有可能的状态序列 ,即求 的最大值。
4.1 维特比算法
- 初始化,当
时,最优路径的备选由N个状态组成,它的前驱为空:
- 递推,当
时,每条备选的路径像贪吃蛇一样吃入一个状态,长度增加一个单位,根据状态转移概率和发射概率计算花费。找出新的局部最优路径,更新两个数组。
- 终止,找出最终时刻
数组中最大概率
,以及相应的结尾状态下表
- 回溯,根据前驱数组
回溯前驱状态,取得最优路径状态下标
。
def viterbi(self, x):
self.X = x
self.T = len(x)
self.Y = np.zeros(self.N)
# 初始化delta和xi
delta = np.zeros((self.T, self.N))
psi = np.zeros((self.T, self.N))
# 初始化,t=1时
for i in range(self.N):
delta[0][i] = self.Pi[i] * self.B[i][self.X[0]]
psi[0][i] = 0
# 递推
for t in range(1, self.T):
for i in range(self.N):
temp = 0
index = 0
for j in range(self.N):
if temp < delta[t - 1][j] * self.A[j][i]:
temp = delta[t - 1][j] * self.A[j][i]
index = j
delta[t][i] = temp * self.B[i][self.X[t]]
psi[t][i] = j
# 最终
self.Y[-1] = delta.argmax(axis=1)[-1]
p = delta[self.T - 1][int(self.Y[-1])]
# 回溯
for i in range(self.T - 1, 0, -1):
self.Y[i - 1] = psi[i][int(self.Y[i])]
return p, self.Y
运行:
# 使用维特比
hmm = HMM(n, m, a, b, pi)
p, y = hmm.viterbi(X)
print(p)
print(y)
结果:
0.014699999999999998
[2. 2. 2.]
代码详情请见我的Github,请大家多多支持点star。
转载:https://blog.csdn.net/Elenstone/article/details/104970881