《统计学习方法》第十章,隐马尔科夫模型

▶ 隐马尔科夫模型的三个问题

● 代码

  1 import numpy as np
  2 import scipy as sp
  3 import matplotlib.pyplot as plt
  4 from matplotlib.patches import Rectangle
  5 
  6 dataSize = 200
  7 trainRatio = 0.3
  8 epsilon = 1E-10
  9 randomSeed = 109
 10 
 11 def dataSplit(dataX, dataY, part):                                                  # 将数据集分割为训练集和测试集
 12     return dataX[:part], dataY[:part], dataX[part:], dataY[part:]
 13 
 14 def normalCDF(x, μList, σList):
 15     return np.exp(-(x - μList)**2 / (2 * σList**2)) / (np.sqrt(2 * np.pi) * σList)
 16 
 17 def targetIndex(x, xList):                      # 二分查找 xList 中大于 x 的最小索引    
 18     if x < xList[0]:
 19         return 0
 20     lp = 0
 21     rp = len(xList) - 1
 22     while lp < rp - 1:
 23         mp = (lp + rp) >> 1
 24         if(x <= xList[mp]):
 25             rp = mp
 26         else:
 27             lp = mp
 28     return rp
 29 
 30 def createData(state, obs):                     # 创建数据,输入状态数、观测数,输出初始分布、状态转移分布、观测分布
 31     np.random.seed(randomSeed)
 32     π = np.random.rand(state)
 33     π /= np.sum(π)
 34     A = np.random.rand(state, state)    
 35     A = np.array( A / np.mat(np.sum(A,1)).T)    # 行归一化        
 36     B = np.random.rand(state, obs)    
 37     B = np.array(B / np.mat(np.sum(B,1)).T)    
 38 
 39     print("π = ", π)
 40     print("A = 
", A)
 41     print("B = 
", B)
 42     return π, A, B
 43 
 44 def createString(π, A, B, length, count = 1):   # 创建状态列和观测列
 45     state, obs = np.shape(B)
 46     accπ = np.nancumsum(π)
 47     accA = np.cumsum(A, 1)
 48     accB = np.cumsum(B, 1)
 49 
 50     if count == 1:                                                                  # count 为 1 时生成的 string 只有一维,否则二维
 51         stateString = np.zeros(length, dtype = int)
 52         obsString = np.zeros(length, dtype = int)
 53         stateString[0] = targetIndex(np.random.rand(), accπ)                        # 随机选择初态
 54         for t in range(1, length):                                                  # 生成状态列
 55             stateString[t] = targetIndex(np.random.rand(), accA[stateString[t-1]])  
 56         for t in range(length):                                                     # 生成观测列
 57             obsString[t] = targetIndex(np.random.rand(), accB[stateString[t]])
 58 
 59     else:
 60         stateString = np.zeros([count, length], dtype = int)
 61         obsString = np.zeros([count, length], dtype = int)
 62         stateString[:,0] = [ targetIndex(np.random.rand(), accπ) for i in range(count) ]
 63         for t in range(1, length):
 64             stateString[:,t] = [ targetIndex(np.random.rand(), accA[stateString[i,t-1]]) for i in range(count) ]
 65         for t in range(length):
 66             obsString[:,t] = [ targetIndex(np.random.rand(), accB[stateString[i,t]]) for i in range(count) ]
 67 
 68     #print("state = 
", stateString)
 69     #print("obs = 
", obsString)
 70     return stateString, obsString
 71 
 72 def pObsForwardSimplified(π, A, B, obsString):                  # 前向算法计算观测序列概率
 73     state, obs = np.shape(B)
 74     length = len(obsString)
 75 
 76     α = π * B[:,obsString[0]]
 77     for i in range(1, length):
 78         α = np.dot(α, A) * B[:,obsString[i]]                    # 等价代码 α = np.sum(α * A.T, 1) * B[:,obsString[i]]                            
 79     return np.sum(α)
 80 
 81 def pObsForward(π, A, B, obsString):                            # 前向算法计算观测序列概率,并且输出前向概率矩阵
 82     state, obs = np.shape(B)
 83     length = len(obsString)
 84 
 85     α = np.zeros([length,state])
 86     α[0] = π * B[:,obsString[0]]
 87     for i in range(1, length):
 88         α[i] = np.dot(α[i-1], A) * B[:,obsString[i]]            # α[i] = np.sum(α[i-1], A.T, 1) * B[:,obsString[i]]
 89     return np.sum(α[-1]), α
 90 
 91 def pObsBackwardSimplified(π, A, B, obsString):                 # 后向算法计算观测序列概率
 92     state, obs = np.shape(B)
 93     length = len(obsString)
 94 
 95     β = np.ones(state)
 96     for i in range(1, length)[::-1]:
 97         β = np.dot(A, β * B[:,obsString[i]])                    # β = np.sum(A * β * B[:,obsString[i]], 1)
 98     return np.sum(π * β * B[:,obsString[0]])
 99 
100 def pObsBackward(π, A, B, obsString):                           # 后向算法计算观测序列概率,并且输出后向概率矩阵
101     state, obs = np.shape(B)
102     length = len(obsString)
103 
104     β = np.zeros([length,state])
105     β[-1] = np.ones(state)
106     for i in range(1, length)[::-1]:
107         β[i-1] = np.dot(A, β[i] * B[:,obsString[i]])            # β[i-1] = np.sum(A * β[i] * B[:,obsString[i]], 1)
108     return np.sum(π * B[:,obsString[0]] * β[0]), β
109 
110 def pObsMixedSimplified(A, B, obsString, α, β):                 # 用 α 和 β 来算观测序列概率
111     return np.dot(α[0], np.dot(A, B[:,obsString[1]] * β[1]))    # np.sum(α[0] * np.sum(A * B[:,obsString[1]] * β[1], 1))
112 
113 def pObsMixed(A, B, obsString, α, β):                           # 用 α 和 β 来算观测序列概率,这 length - 1 个结果都相同
114     length = len(α)
115     return np.array([ np.dot(α[t], np.dot(A, B[:,obsString[t+1]] * β[t+1])) for t in range(length-1) ])    
116     #return np.array([ np.sum(α[t] * np.sum(A * B[:,obsString[t+1]] * β[t+1] , 1)) for t in range(length-1) ])
117 
118 def pΓ(α, β):                                                   # 求各时刻隐变量处于各状态的概率
119     t = α * β
120     return np.array( t / np.mat(np.sum(t, 1)).T )               
121 
122 def pΞ(A, B, obsString, α, β):                                  # t 时刻处于状态 i 而 t+1 时刻处于状态 j 的概率,t = 0 ~ length-2
123     state, obs = np.shape(B)
124     length = len(α)
125 
126     res = np.zeros([ length - 1, state, state ])
127     for t in range(length - 1):
128         res[t] = np.tile(α[t],[state,1]).T * A * B[:,obsString[t+1]] * β[t+1]
129     return res / pObsMixedSimplified(A, B, obsString, α, β)
130 
131 def supervisedLearn(dataState, dataObs, state, obs):            # 监督学习
132     count, length = np.shape(dataState)
133     π = np.zeros(state)
134     A = np.zeros([state, state])
135     B = np.zeros([state, obs])    
136     
137     for i in dataState[:,0]:
138         π[i] +=1
139     for i,j in zip(dataState[:,:-1].flatten(), dataState[:,1:].flatten()):
140         A[i,j] +=1
141     for i,j in zip(dataState.flatten(),dataObs.flatten()):
142         B[i,j] +=1
143 
144     π /= count                                         
145     A = np.array( A / np.mat(np.sum(A,1)).T)
146     B = np.array( B / np.mat(np.sum(B,1)).T)
147     return π, A, B
148 
149 def baumWelchLearn(dataObs, state, obs):                        # Baum - WelchLearn 学习
150     count, length = np.shape(dataObs)                           # 存在的问题:如果某个观测序列没有覆盖所有可能的观测结果,学习会产生 nan?
151     π = np.random.rand(state)                                   # 选择初始值
152     A = np.random.rand(state, state)
153     B = np.random.rand(state, obs)
154     π /= np.sum(π)
155     A = np.array( A / np.mat(np.sum(A,1)).T)
156     B = np.array( B / np.mat(np.sum(B,1)).T)
157 
158     for line in dataObs:                                        
159         α = pObsForward(π, A, B, line)[1]
160         β = pObsBackward(π, A, B, line)[1]
161         ξ = pΞ(A, B, line, α, β)
162         γ = pΓ(α, β)
163         π = γ[0]
164         A = np.array( np.sum(ξ, 0) / np.mat(np.sum(γ[:-1], 0)).T )
165         for k in range(obs):
166             B[:,k] = np.sum(γ[np.where(line==k)], 0)            
167         B = np.array( B / np.mat(np.sum(γ, 0)).T )
168     return π, A, B
169 
170 def appropritePredict(x, π, A, B):                              # 近似预测,依次计算出α,β,γ,取 γ 每行最大值所在列号
171     γ = pΓ(pObsForward(π, A, B, x)[1], pObsBackward(π, A, B, x)[1])
172     return np.argmax(γ, 1)
173 
174 def viterbiPredict(obsString, π, A, B):                         # Viterbi 预测,动态规划
175     state, obs = np.shape(B)
176     length = len(obsString)
177 
178     δ = π * B[:, obsString[0]]                                  # δ 记录以各状态为结尾的最大概率,ψ 记录该最大概率对应的上一节点号
179     ψ = np.zeros([length, state])
180     for t in range(1,length):
181         temp = δ * A.T * B[:, obsString[t]]
182         ψ[t-1] = np.argmax(temp,1)
183         δ = np.max(temp,1)
184         
185     trace = np.zeros(length, dtype=np.int)
186     trace[-1] = np.argmax(δ)
187     for i in range(length - 1)[::-1]:
188         trace[i] = ψ[i, trace[i+1]]
189     
190     return trace
191 
192 def test(state, obs, length):                                   # 单次测试
193     π, A, B = createData(state, obs)
194     allState, allObs = createString(π, A, B, length, dataSize)           
195     
196     if allObs.ndim > 1:
197         obsString = allObs[0]
198     else:
199         obsString = allObs
200     '''                                                         
201     π = np.array([0.2,0.4,0.4])                                 # 调试用数据
202     A = np.array([[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]])
203     B = np.array([[0.5,0.5],[0.4,0.6],[0.7,0.3]])    
204     obsString = [0,1,0]
205     '''    
206     pOFS = pObsForwardSimplified(π, A, B, obsString)
207     pOBS = pObsBackwardSimplified(π, A, B, obsString)
208     pOF = pObsForward(π, A, B, obsString)
209     pOB = pObsBackward(π, A, B, obsString)
210     pOMS = pObsMixedSimplified(A, B, obsString, pOF[1], pOB[1])
211     pOM  = pObsMixed(A, B, obsString, pOF[1], pOB[1])
212     print("pOFS = %f, pOBS = %f
pOF  = %f, pOB  = %f
pOMS  = %f"%(pOFS, pOBS, pOF[0], pOB[0], pOMS))
213     print("α = 
", pOF[1])
214     print("β = 
", pOB[1])
215     print("pOM = 
", pOM)
216     γ = pΓ(pOF[1], pOB[1])
217     print("γ = 
", γ)
218     ξ = pΞ(A, B, obsString, pOF[1], pOB[1])
219     print("ξ = 
", ξ)   
220 
221     print("
supervisedLearn:")
222     para = supervisedLearn(allState, allObs, state, obs)    
223     print("π = ", para[0])
224     print("A = 
", para[1])
225     print("B = 
", para[2])
226     
227     print("
baumWelchLearn:")
228     para = baumWelchLearn(allObs, state, obs)    
229     print("π = ", para[0])
230     print("A = 
", para[1])
231     print("B = 
", para[2])    
232 
233     myResult = [ appropritePredict(x, π, A, B) for x in allObs ]
234     #print(myResult)
235     errorRatio1 = np.sum( (np.array(myResult) != allState).astype(int) ) / (length * dataSize)
236     
237     myResult = [ viterbiPredict(x, π, A, B) for x in allObs ]
238     #print(myResult)
239     errorRatio2 = np.sum( (np.array(myResult) != allState).astype(int) ) / (length * dataSize)
240     
241     print("state = %d, obs = %d, length = %d
errorRatioAppropritePredict = %4f, errorRatioViterbiPredict = %4f"%(state, obs, length, errorRatio1, errorRatio2))
242 
243 if __name__ == '__main__':
244     test(4, 3, 10)

● 输出结果1,成功复现树上的样例数据

pOFS = 0.130218, pOBS = 0.130218
pOF  = 0.130218, pOB  = 0.130218
pOMS  = 0.130218
α =
 [[0.1      0.16     0.28    ]
 [0.077    0.1104   0.0606  ]
 [0.04187  0.035512 0.052836]]
β =
 [[0.2451 0.2622 0.2277]
 [0.54   0.49   0.57  ]
 [1.     1.     1.    ]]
pOM =
 [0.130218 0.130218]
γ =
 [[0.18822283 0.32216744 0.48960973]
 [0.31931069 0.41542644 0.26526287]
 [0.32153773 0.27271191 0.40575036]]
ξ =
 [[[0.1036723  0.04515505 0.03939548]
  [0.09952541 0.18062019 0.04202184]
  [0.11611298 0.1896512  0.18384555]]

 [[0.14782903 0.04730529 0.12417638]
  [0.12717136 0.16956181 0.11869327]
  [0.04653735 0.05584481 0.16288071]]]
appropritePredict: [2 1 2]
ViterbiPredict: [2 2 2]

● 输出结果2,用自己的数据来跑有问题【坑】

π =  [0.09536369 0.4423878  0.33021783 0.13203068]
A =
 [[0.38752484 0.17316366 0.39198697 0.04732453]
 [0.09835982 0.23793441 0.38519171 0.27851407]
 [0.44365676 0.12177898 0.3055296  0.12903465]
 [0.29614541 0.30691252 0.04969234 0.34724973]]
B =
 [[0.12418479 0.76243987 0.11337534]
 [0.34515219 0.28857853 0.36626928]
 [0.27430006 0.14931117 0.57638877]
 [0.22205449 0.24865406 0.52929144]]

supervisedLearn:
π =  [0.1  0.42 0.33 0.15]
A =
 [[0.38233515 0.18269812 0.38747731 0.04748941]
 [0.10843373 0.23782085 0.37139864 0.28234678]
 [0.44822934 0.12310287 0.30084317 0.12782462]
 [0.30813953 0.27151163 0.04476744 0.3755814 ]]
B =
 [[0.13203593 0.75479042 0.11317365]
 [0.34575569 0.313147   0.34109731]
 [0.27606952 0.14639037 0.57754011]
 [0.23502304 0.24193548 0.52304147]]

baumWelchLearn:
π =  [1.60211593e-01 8.26662258e-01 8.91008964e-06 1.31172391e-02]
A =
 [[0.17940016 0.25527889 0.31784884 0.24747211]
 [0.39215007 0.15576319 0.22874897 0.22333777]
 [0.32884167 0.30016482 0.15265056 0.21834295]
 [0.36968335 0.03746577 0.34497214 0.24787874]]
B =
 [[0.34244911 0.24148873 0.41606216]
 [0.29723256 0.24124855 0.46151888]
 [0.18049919 0.57465181 0.244849  ]
 [0.29483333 0.37769373 0.32747294]]
state = 4, obs = 3, length = 100
errorRatioAppropritePredict = 0.489400, errorRatioViterbiPredict = 0.692400
原文地址:https://www.cnblogs.com/cuancuancuanhao/p/11332177.html