qq_30128711 2018-08-11 10:01 采纳率: 0%
浏览 1487
已结题

自定义hbase协处理器失败

代码 ---
public class Util {

public static String getRegNo(String callerId , String callTime){
    //区域00-99
    int hash = (callerId + callTime.substring(0, 6)).hashCode();
    hash =(hash & Integer.MAX_VALUE) % 100;

    //hash区域号
    DecimalFormat df = new DecimalFormat();
    df.applyPattern("00");
    String regNo = df.format(hash);
    return regNo ;
}

public class CalleeLogRegionObserver extends BaseRegionObserver{

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    FileWriter fw = new FileWriter("/home/centos/kkk.txt",true);

    super.postPut(e, put, edit, durability);
    //
    String tableName0 = TableName.valueOf("ns1:calllogs").getNameAsString();
    //得到当前的TableName对象
    String tableName1 = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
    fw.write(tableName1 + "\r\n");
    if(!tableName0.equals(tableName1)){
        return  ;
    }

    //得到主叫的rowkey
    //xx , callerid , time ,  direction, calleid  ,duration
    //被叫:calleid,time,

    String rowkey = Bytes.toString(put.getRow());
    String[] arr = rowkey.split(",");
    if(arr[3].equals("1")){
        return ;
    }


    String hash = Util.getRegNo(arr[4], arr[2]);
    //hash

    String newRowKey = hash + "," + arr[4] + "," + arr[2] + ",1," + arr[1] + "," + arr[5];
    Put newPut = new Put(Bytes.toBytes(newRowKey));
    newPut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("dummy"), Bytes.toBytes("no"));
    TableName tn = TableName.valueOf("ns1:calllogs");
    Table t = e.getEnvironment().getTable(tn);
    //
    fw.write(t.getName().getNameAsString() + "\r\n");
    t.put(newPut);
    fw.close();
}

public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
    super.preGetOp(e, get, results);

}

-- 配置
<!-- 启用完全分布式 -->

hbase.cluster.distributed
true

        <!-- 指定hbase数据在hdfs上的存放路径 -->
        <property>
            <name>hbase.rootdir</name>
            <value>hdfs://mycluster/hbase</value>
        </property>
        <!-- 配置zk地址 -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>s101:2181,s102:2181,s103:2181</value>
        </property>
        <!-- zk的本地目录 -->
        <property>
            <name>hbase.zookeeper.property.dataDir</name>
            <value>/home/centos/zookeeper</value>
        </property>


        <!--协处理器 -->
          <property>
                            <name>hbase.coprocessor.region.classes</name>
            <value>com.tjx.hbasedemo.coprocessor.CalleeLogRegionObserver</value>
          </property>

我再重启hbase 的时候没有在主目录下 生成 kkk.txt 文件,也没有报错,就是调用不了协处理器器

------------------------------------------------------------插入代码
@Test
public void testPut() throws Exception{
//创建conf对象
Configuration conf = HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn = ConnectionFactory.createConnection(conf);
//通过连接查询tableName对象
TableName tname = TableName.valueOf("ns1:calllogs");
//获得table
Table table = conn.getTable(tname);

    //主叫
    String callerId = "13845456767";
    //被叫
    String calleeId = "13989898787";

    SimpleDateFormat sdf = new SimpleDateFormat();
    sdf.applyPattern("yyyyMMddHHmmss");
    //通话时间
    String callTime = sdf.format(new Date());
    //通话时长
    int duration = 100 ;
    DecimalFormat dff = new DecimalFormat();
    dff.applyPattern("00000");
    String durStr = dff.format(duration);

    // hash 区域00 -- 99
    int hash = (calleeId + callTime.substring(0,6)).hashCode();
    hash = hash & Integer.MAX_VALUE % 100;

    DecimalFormat df = new DecimalFormat();
    df.applyPattern("00");
    String regNo = df.format(hash);


    // 拼接rowkey
    String rowkey = regNo +","+callerId+","+callTime+","+"0"+","+calleeId+","+durStr ;
    byte[] rowid = Bytes.toBytes(rowkey);
    //创建put对象
    Put put = new Put(rowid);
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"),Bytes.toBytes("河北"));
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calleePos"),Bytes.toBytes("河南"));
    //执行插入
    table.put(put);
    System.out.println("over");
}
  • 写回答

1条回答 默认 最新

  • 倔强の孩纸 2018-08-11 11:33
    关注

    (一)Hbase协处理器的前世今生

    Hbase是仿照Google的BigTable设计的,而其协处理器也是仿照BigTable的协处理实现完成的,具体链接可
    参考:http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf

    (二)什么是Hbase协处理器(Coprocessors )?

    Hbase的协处理器在Hbase中属于高级的应用功能,它可以让开发者自定义的代码在服务器端执行,来完成特定的一些功能。

    (三)为什么要用协处理器?

    Hbase是一款高效的基于KV的NOSQL数据库,它有非常多的优点,但是也有不少缺点,hbase的设计全在rowkey上,所有能够高效的查询全是基于rowkey的,除了rowkey的设计之外,我们可能还有
    一些其他的功能,如
    (1)访问权限控制
    (2)引用完整性,基于外键检验数据,
    (3)给hbase设计二级索引,从而提高基于列过滤时的查询性能,
    (4)像监控MySQL的binlog一样,监控hbase的wal预写log
    (5)服务端自定义实现一些聚合函数的功能
    (6).......
    这样额外的功能,使用hbase的协处理来处理是非常方便的

    (四)Hbase中协处理器的分类

    在Hbase里面有两类Coprocessors :
    1,基于Observer的Coprocessors ,类似于关系型数据库的触发器,可用来实现上面提到的功能中的1,2,3,4功能
    常用的Observer:
    RegionServerObserver 能够切面监测rowkey的数据的访问与删除
    BaseMasterAndRegionObserver 能够切面监测hbase表的创建,删除,sheml修改
    BaseWALObserver 能够切面监测hbase的wal的log写入

    2,基于Endpoint的Coprocessors ,类似于关系型数据库的存储过程,可用来实现上面提到的功能中的5功能
    在hbase0.96之后,采用probuff序列化通信的RPC数据,使用endpoint,需要同过protoc生成相关的service接口的java类
    然后继承自己的生成的Service类并且实现hbase的Coprocessor, CoprocessorService接口,从而重写其中的业务方法构建一个
    自定义的Endpoint的Coprocessors

    协处理器的编程概念与Spring AOP的理念很相似,它也像MapReduce的数据运算方式,与本地local的数据产生计算,而不是远程读取数据再计算,
    通过local计算的方式与RegionServer绑定,从而能提升数据计算的效率。

    (5)协处理器的安装使用

    Java代码

    hbase.coprocessor.region.classes for RegionObservers and Endpoints.

    hbase.coprocessor.wal.classes for WALObservers.

    hbase.coprocessor.master.classes for MasterObservers.

    在hbase官网文档中,介绍了两种使用方式:

    静态方式(系统级),使用配置文件:

    1,编写协处理器,并打成一个jar包,加入hbase/lib目录下,或者在hbase-env.sh里面配置相对应的jar,以及依赖的jar的路径
    2,加入静态的配置,在hbase-site.xml里配置主类

    Xml代码



    hbase.coprocessor.region.classes

    org.myname.hbase.coprocessor.endpoint.SumEndPoint

    3,把依赖的jar分发到每一个regionserver上,然后重启hbase,
    协处理生效,是系统级的协处理器

    动态方式(表级别),使用Hbase shell:
    1,编写协处理器,打成一个jar包,上传至HDFS,将依赖的jar拷贝到hbase的lib下,配置hbase-env.sh指定依赖jar的
    2,建立表:

    Java代码

    create 'c', NAME=>'cf'

    3,禁用表

    Java代码

    disable 'c'

    4,指定协处理器的jar

    Java代码

    alter 'c', METHOD => 'table_att', 'coprocessor'=>'hdfs:///user/hbase_solr/hbase-increment-index.jar|com.hbase.easy.index.HbaseSolrIndexCoprocesser|1001|'

    5,激活表

    Java代码

    enable 'c'

    6,删除协处理jar
    如果有多个协处理器,按照$1 $2 $n删除指定的jar配置

    Java代码
    alter 'c',METHOD => 'table_att_unset',NAME =>'coprocessor$1'

    最后说一下,hbase的官方文档指出动态级别的协处理器,可以做到不重启hbase,更新协处理,做法就是
    禁用表,卸载协处理器,重新指定协处理器, 激活表,即可,但实际测试发现
    动态加载无效,是hbase的一个bug,看这个链接:

    https://issues.apache.org/jira/browse/HBASE-8445

    因为协处理器,已经被JVM加载,即使删除jar也不能重新load的jar,因为cache里面的hdfs的jar路径,没有变化,所以动态更新无效
    ,除非重启JVM,那样就意味着,需要重启RegionServer,
    里面的小伙伴们指出了两种办法,使协处理器加载生效:
    (1)滚动重启regionserver,避免停掉所有的节点
    (2)改变协处理器的jar的类名字或者hdfs加载路径,以方便有新的ClassLoad去加载它

    但总体来看,第2种方法,比较安全,第一种风险太大,一般情况下没有人会随便滚动重启线上的服务器的,这只在hbase升级的时候使用

    评论

报告相同问题?

悬赏问题

  • ¥15 改算法,照着压缩包里边,参考其他代码封装的格式 写到main函数里
  • ¥15 用windows做服务的同志有吗
  • ¥60 求一个简单的网页(标签-安全|关键词-上传)
  • ¥35 lstm时间序列共享单车预测,loss值优化,参数优化算法
  • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 一直显示正在等待HID—ISP