2 jc 1129 jc_1129 于 2016.04.18 19:33 提问

基于hadoop的hmm模型实现

哪位大神能帮我看一下,为什么我基于hadoop实现的hmm,输入总是报错?一开始报not a sequencefile ,然后我用mahout里面的seqdirectory把txt文件变成了sequencefile,接着又报java.lang.NullPointerException。我都要哭了,刚开始学hadoop
import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HmmParallel {
public static class HmmParallelMap extends Mapper<Object, Text, Text, ArrayWritable> {

    // Number of distinct observation symbols (sigma size).
    // NOTE(review): never assigned in the original code — read from the job
    // Configuration key "hmm.observations" below, or it stays 0 and training fails.
    private static int Observations;
    // Number of hidden states; same caveat, Configuration key "hmm.states".
    private static int States;
    // Supervised training sequences.
    // NOTE(review): these were never populated in the original code (one source
    // of the reported NullPointerException); they must be loaded before
    // trainSupervised() is called, e.g. from files in the distributed cache.
    int[] observedSequence;
    int[] hiddenSequence;
    private HmmModel hmm;

    // Reusable output stripes. Fix: constructed with an explicit value class so
    // the SequenceFile machinery can (de)serialize them; the no-arg
    // ArrayWritable cannot be read back.
    private ArrayWritable piStripe;
    private ArrayWritable emissionStripe;
    private ArrayWritable transitionStripe;
    private Writable[] pi_tmp;
    private Writable[] em_tmp;
    private Writable[] tr_tmp;
    // Observation symbols of the current input line, (re)allocated per map() call.
    int[] o;

    /**
     * Allocates all writable buffers and trains an initial supervised HMM.
     *
     * Fix: the original setup() never allocated piStripe, emissionStripe,
     * transitionStripe, pi_tmp, em_tmp or tr_tmp, so the first map() call
     * crashed with NullPointerException — the error the author reports.
     */
    public void setup(Context context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        States = conf.getInt("hmm.states", States);
        Observations = conf.getInt("hmm.observations", Observations);

        // TODO(review): observedSequence / hiddenSequence must be loaded here
        // (e.g. from side files) before training, or this call will still NPE.
        hmm = HmmTrainer.trainSupervised(States, Observations, observedSequence, hiddenSequence, 0);

        piStripe = new ArrayWritable(DoubleWritable.class);
        emissionStripe = new ArrayWritable(DoubleWritable.class);
        transitionStripe = new ArrayWritable(DoubleWritable.class);
        pi_tmp = new Writable[hmm.numStates];
        tr_tmp = new Writable[hmm.numStates];
        em_tmp = new Writable[hmm.sigmaSize];
    }

    /**
     * One Baum-Welch re-estimation step over a single observation line.
     * Emits <"initial", pi stripe>, <"emit from i", emission row i> and
     * <"transit from i", transition row i> for every state i.
     *
     * Fixes vs. original:
     * - the Mapper superclass is now parameterized, so this method actually
     *   overrides Mapper.map() (with the raw type it never ran);
     * - o is allocated per call (it was null -> NullPointerException);
     * - the freshly re-estimated local matrices are emitted instead of the
     *   stale values still stored inside the model;
     * - stripe keys carry the state index i (hiddenSequence.toString() was the
     *   array identity hash, identical for every row, so the reducer would
     *   have merged all rows of the matrix together).
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        int T = value.getLength();
        o = new int[T];
        for (int i = 0; i < T; i++) {
            o[i] = value.charAt(i);
        }

        double[] initialProbabilities = new double[hmm.numStates];
        double[][] transitionMatrix = new double[hmm.numStates][hmm.numStates];
        double[][] emissionMatrix = new double[hmm.numStates][hmm.sigmaSize];

        // Forward and backward variables under the current model.
        double[][] fwd = HmmAlgorithms.forwardAlgorithm(hmm, o, true);
        double[][] bwd = HmmAlgorithms.backwardAlgorithm(hmm, o, true);

        // Re-estimation of initial state probabilities.
        for (int i = 0; i < hmm.numStates; i++) {
            initialProbabilities[i] = gamma(i, 0, o, fwd, bwd);
        }

        // Re-estimation of transition probabilities.
        for (int i = 0; i < hmm.numStates; i++) {
            for (int j = 0; j < hmm.numStates; j++) {
                double num = 0;
                double denum = 0;
                for (int t = 0; t < T - 1; t++) {
                    num += p(t, i, j, o, fwd, bwd);
                    denum += gamma(i, t, o, fwd, bwd);
                }
                transitionMatrix[i][j] = divide(num, denum);
            }
        }

        // Re-estimation of emission probabilities.
        for (int i = 0; i < hmm.numStates; i++) {
            for (int k = 0; k < hmm.sigmaSize; k++) {
                double num = 0;
                double denum = 0;
                for (int t = 0; t < T - 1; t++) {
                    double g = gamma(i, t, o, fwd, bwd);
                    num += g * (k == o[t] ? 1 : 0);
                    denum += g;
                }
                emissionMatrix[i][k] = divide(num, denum);
            }
        }

        // Emit the re-estimated initial distribution.
        for (int i = 0; i < hmm.numStates; i++) {
            pi_tmp[i] = new DoubleWritable(initialProbabilities[i]);
        }
        piStripe.set(pi_tmp);
        context.write(new Text("initial"), piStripe);

        // Emit one emission-matrix row (stripe) per state.
        for (int i = 0; i < hmm.numStates; i++) {
            for (int j = 0; j < hmm.sigmaSize; j++) {
                em_tmp[j] = new DoubleWritable(emissionMatrix[i][j]);
            }
            emissionStripe.set(em_tmp);
            context.write(new Text("emit from " + i), emissionStripe);
        }

        // Emit one transition-matrix row (stripe) per state.
        for (int i = 0; i < hmm.numStates; i++) {
            for (int j = 0; j < hmm.numStates; j++) {
                tr_tmp[j] = new DoubleWritable(transitionMatrix[i][j]);
            }
            transitionStripe.set(tr_tmp);
            context.write(new Text("transit from " + i), transitionStripe);
        }
    }

    /**
     * Probability P(X_t = s_i, X_t+1 = s_j | O, m).
     *
     * @param t   time t
     * @param i   index of state s_i
     * @param j   index of state s_j
     * @param o   observation sequence
     * @param fwd forward variables for o
     * @param bwd backward variables for o
     * @return the (normalized) transition probability mass at time t
     */
    public double p(int t, int i, int j, int[] o, double[][] fwd, double[][] bwd) {
        double num;
        if (t == o.length - 1) {
            num = fwd[i][t] * hmm.transitionMatrix[i][j];
        } else {
            num = fwd[i][t] * hmm.transitionMatrix[i][j]
                    * hmm.emissionMatrix[j][o[t + 1]]
                    * bwd[j][t + 1];
        }

        double denom = 0;
        for (int k = 0; k < hmm.numStates; k++) {
            denom += (fwd[k][t] * bwd[k][t]);
        }
        // Normalize by the total probability mass at time t.
        return divide(num, denom);
    }

    /**
     * Computes gamma(i, t): the probability of being in state i at time t
     * given the observation sequence, normalized over all states.
     */
    public double gamma(int i, int t, int[] o, double[][] fwd, double[][] bwd) {
        double num = fwd[i][t] * bwd[i][t];
        double denom = 0;
        for (int j = 0; j < hmm.numStates; j++) {
            denom += fwd[j][t] * bwd[j][t];
        }
        return divide(num, denom);
    }

    /** Divides two doubles, with the convention 0 / 0 = 0. */
    public double divide(double n, double d) {
        if (n == 0) {
            return 0;
        }
        return n / d;
    }
}

public static class HmmParallelReduce extends Reducer<Text, ArrayWritable, Text, ArrayWritable> {

    /**
     * Reduce function of the HMM Parallel. Input <key, value> pairs are
     * <initial, pi stripe>, <emit from state, emission stripe> and
     * <transit from state, transition stripe>; this method sums the stripes
     * element-wise across all mappers and normalizes the result so each
     * output stripe sums to 1.
     *
     * Fixes vs. original:
     * - the field buffers Cf, Cf_tmp and doubleValue were never allocated,
     *   so the first call crashed with NullPointerException;
     * - the accumulator is now local and sized from the first stripe, so
     *   state no longer leaks between reduce() calls for different keys;
     * - normalization guards against a zero total (0/0 would produce NaN);
     * - the output ArrayWritable declares its value class so it can be
     *   round-tripped through a SequenceFile.
     */
    public void reduce(Text key, Iterable<ArrayWritable> values, Context context)
            throws IOException, InterruptedException {

        // Element-wise sum of all stripes for this key; sized lazily from the
        // first stripe seen (all stripes for one key have the same length).
        double[] sum = null;
        for (ArrayWritable value : values) {
            Writable[] entries = value.get();
            if (sum == null) {
                sum = new double[entries.length];
            }
            for (int i = 0; i < entries.length; i++) {
                sum[i] += Double.parseDouble(entries[i].toString());
            }
        }
        if (sum == null) {
            return; // no values for this key; nothing to emit
        }

        double z = 0;
        for (double v : sum) {
            z += v;
        }

        // Normalization (0/0 = 0 by convention, matching the mapper's divide()).
        DoubleWritable[] normalized = new DoubleWritable[sum.length];
        for (int i = 0; i < sum.length; i++) {
            normalized[i] = new DoubleWritable(z == 0 ? 0 : sum[i] / z);
        }

        ArrayWritable result = new ArrayWritable(DoubleWritable.class);
        result.set(normalized);
        context.write(key, result);
    }
}


public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {

    // args[0]: input path of the training data (must be a SequenceFile);
    // args[1]: output path of the final result;
    // args[2]: number of Baum-Welch iterations.
    if (args.length < 3) {
        System.err.println("Usage: HmmParallel <input> <output> <iterations>");
        System.exit(2);
    }
    int iterationNum = Integer.parseInt(args[2]);

    Configuration conf = new Configuration();

    // Fix: Job.getInstance(conf, name) replaces the deprecated new Job().
    Job hmmParallel = Job.getInstance(conf, "hmmParallel");
    hmmParallel.setJarByClass(HmmParallel.class);

    FileInputFormat.addInputPath(hmmParallel, new Path(args[0]));
    // Fix: with a single iteration the original left the result in "temp0"
    // instead of the requested output path.
    FileOutputFormat.setOutputPath(hmmParallel,
            iterationNum == 1 ? new Path(args[1]) : new Path("temp0"));

    hmmParallel.setMapperClass(HmmParallelMap.class);
    hmmParallel.setReducerClass(HmmParallelReduce.class);
    hmmParallel.setMapOutputKeyClass(Text.class);
    // NOTE(review): plain ArrayWritable has no no-arg-readable value class;
    // a subclass with a fixed DoubleWritable value class is the usual fix.
    hmmParallel.setMapOutputValueClass(ArrayWritable.class);
    hmmParallel.setOutputKeyClass(Text.class);
    hmmParallel.setOutputValueClass(ArrayWritable.class);
    hmmParallel.setInputFormatClass(SequenceFileInputFormat.class);
    hmmParallel.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Fix: abort the iteration chain if a job fails instead of continuing
    // with missing intermediate data.
    if (!hmmParallel.waitForCompletion(true)) {
        System.exit(1);
    }

    FileSystem fs = FileSystem.get(conf);
    for (int i = 1; i < iterationNum; i++) {
        Job loopJob = Job.getInstance(conf, "loopJob");
        loopJob.setJarByClass(HmmParallel.class);

        // Fix: plain int concatenation replaces the deprecated new Integer(i).
        FileInputFormat.addInputPath(loopJob, new Path("temp" + (i - 1)));
        if (i == iterationNum - 1) {
            FileOutputFormat.setOutputPath(loopJob, new Path(args[1]));
        } else {
            FileOutputFormat.setOutputPath(loopJob, new Path("temp" + i));
        }

        loopJob.setMapperClass(HmmParallelMap.class);
        loopJob.setReducerClass(HmmParallelReduce.class);
        loopJob.setMapOutputKeyClass(Text.class);
        loopJob.setMapOutputValueClass(ArrayWritable.class);
        loopJob.setOutputKeyClass(Text.class);
        loopJob.setOutputValueClass(ArrayWritable.class);
        loopJob.setInputFormatClass(SequenceFileInputFormat.class);
        loopJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        if (!loopJob.waitForCompletion(true)) {
            System.exit(1);
        }
        // Fix: recursive delete(path, true) replaces the deprecated
        // single-argument FileSystem.delete(path).
        fs.delete(new Path("temp" + (i - 1)), true);
    }
}

}

2个回答

jc_1129
jc_1129   2016.04.18 19:34

这是并行化hmm模型的代码……

CSDNXIAOD
CSDNXIAOD   2016.04.18 19:42

隐马尔科夫模型(HMM)及其实现
----------------------biu~biu~biu~~~在下问答机器人小D,这是我依靠自己的聪明才智给出的答案,如果不正确,你来咬我啊!

Csdn user default icon
上传中...
上传图片
插入图片
准确详细的回答,更有利于被提问者采纳,从而获得C币。复制、灌水、广告等回答会被删除,是时候展现真正的技术了!
其他相关推荐
分词:基于HMM的中文分词模型实现
一、前言 本文主要是实现了一个纯粹的HMM中文分词模型,关于中文分词可以参考:中文分词。分词的基本思想与该文基本一致,请确保已经了解分词的基本知识。 二、实战 1、语料源 语料来源于Bakeoff 2005的主页,这里选用了icwb2-data.rar语料,大概介绍如下: *  /icwb2-data.rar/training/msr_training.txt    用以训练HMM,其中
中文分词的python实现-基于HMM算法
隐马尔科夫模型(HMM)模型介绍HMM模型是由一个“五元组”组成: StatusSet: 状态值集合 ObservedSet: 观察值集合 TransProbMatrix: 转移概率矩阵 EmitProbMatrix: 发射概率矩阵 InitStatus: 初始状态分布
HMM模型在中文分词中的应用
模型介绍 第一次听说HMM模型是从李开复的博文论文中听说的: 李开复1988年的博士论文发表了第一个基于隐马尔科夫模型(HMM)的语音识别系统Sphinx,被《商业周刊》评为1988年美国最重要的科技发明。 出处请见KaifuLeeHMM 乍一听似乎很玄妙,但是其实很简单。下面是相关参数介绍,也是第一眼觉得很抽象,但是慢慢看下去随着具体含义的解释就渐渐清晰。 HMM(Hidden
python实现hmm
用python实现的Hmm,有详细的代码注解,对Hmm感兴趣的同学值得下载学习
python实现的基于hmm模型的词性标注系统
python实现的基于hmm模型的词性标注系统任务定义实现一个词性标注系统,输入分好词的单词序列,输出一个词性标注后的结果序使用的语料库为人民日报98年公开语料库,一共约18000行语料。在用户交互模式下,所有语料库均用作训练。在文件读写模式下,前3000行语句用来做测试,后面的语句用来做训练。方法描述隐马尔科夫模型理解 隐马尔科夫模型是结构最简单的动态贝叶斯网络。描述由一个隐藏的马尔科夫链随机生成
隐马尔可夫模型(HMM)的MATLAB实现——Viterbi算法
维特比算法实际是用动态规划求解隐马尔可夫模型解码问题,即用动态规划求概率最大路径(最优路径)。代码中有示例,来自李航《统计学习方法》 function [Delta,Psi,P,I] = Viterbi(A,B,Pi,O) % 函数功能:利用维特比算法找到观测序列O的最优路径 % % 参考文献:李航《统计学习方法》 % % 思路: % 1,初始化 % delta_1(i) = Pi_i *
使用HMM隐式马尔科夫链实现基于拼音的文本纠错
HMM实现基于拼音的文本纠错 文章将从以下4个小节进行描述: 1.问题描述 2.思路 3.源码 4.应用 1.问题描述 对于歌曲的语音搜索实现方案之一如下。 使用百度语音进行用户语音识别,返回的字符串调用歌词搜索,而歌词搜索使用的是分词的方式进行的索引建立,因此字符串识别的准确率直接影响最后返回歌曲的正确性。 而百度语音是基于非特定样本
使用隐马尔科夫模型(HMM)进行语音识别
         在实验室待了一段时间了,在实验室的报告及小组会议中,深刻的体会到了HMM算法 ,SVM等的重要性.         这几天翻译了一篇使用隐马尔科夫模型(HMM)进行语音识别的论文的部分.           这篇论文是通过google的学术搜索搜到的,通过在互联网上的查找,发现这篇论文引用率很高。(排在google的第一位),并且没发现对其的汉语翻译。或许这篇是对该文的第
HMM 隐马尔可夫模型 代码实现
#encoding:utf-8 import sys import pickle from copy import deepcopyis_train = FalseDEFAULT_PROB = 0.000000000001 MIN_PROB = -1 * float('inf')train_path = "train.in" test_path = "test.in" output_path = "
基于隐马尔可夫模型的有监督词性标注
代码下载:基于隐马尔可夫模型的有监督词性标注 词性标注(Part-of-Speech tagging 或 POS tagging)是指对于句子中的每个词都指派一个合适的词性,也就是要确定每个词是名词、动词、形容词或其他词性的过程,又称词类标注或者简称标注。词性标注是自然语言处理中的一项基础任务,在语音识别、信息检索及自然语言处理的许多领域都发挥着重要的作用。        词性标注本质上是