自定义hbase协处理器失败 10C

代码 ---
public class Util {

public static String getRegNo(String callerId , String callTime){
    //区域00-99
    int hash = (callerId + callTime.substring(0, 6)).hashCode();
    hash =(hash & Integer.MAX_VALUE) % 100;

    //hash区域号
    DecimalFormat df = new DecimalFormat();
    df.applyPattern("00");
    String regNo = df.format(hash);
    return regNo ;
}

public class CalleeLogRegionObserver extends BaseRegionObserver{

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    FileWriter fw = new FileWriter("/home/centos/kkk.txt",true);

    super.postPut(e, put, edit, durability);
    //
    String tableName0 = TableName.valueOf("ns1:calllogs").getNameAsString();
    //得到当前的TableName对象
    String tableName1 = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
    fw.write(tableName1 + "\r\n");
    if(!tableName0.equals(tableName1)){
        return  ;
    }

    //得到主叫的rowkey
    //xx , callerid , time ,  direction, calleid  ,duration
    //被叫:calleid,time,

    String rowkey = Bytes.toString(put.getRow());
    String[] arr = rowkey.split(",");
    if(arr[3].equals("1")){
        return ;
    }


    String hash = Util.getRegNo(arr[4], arr[2]);
    //hash

    String newRowKey = hash + "," + arr[4] + "," + arr[2] + ",1," + arr[1] + "," + arr[5];
    Put newPut = new Put(Bytes.toBytes(newRowKey));
    newPut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("dummy"), Bytes.toBytes("no"));
    TableName tn = TableName.valueOf("ns1:calllogs");
    Table t = e.getEnvironment().getTable(tn);
    //
    fw.write(t.getName().getNameAsString() + "\r\n");
    t.put(newPut);
    fw.close();
}

public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
    super.preGetOp(e, get, results);

}

-- 配置
<!-- 启用完全分布式 -->

hbase.cluster.distributed
true

        <!-- 指定hbase数据在hdfs上的存放路径 -->
        <property>
            <name>hbase.rootdir</name>
            <value>hdfs://mycluster/hbase</value>
        </property>
        <!-- 配置zk地址 -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>s101:2181,s102:2181,s103:2181</value>
        </property>
        <!-- zk的本地目录 -->
        <property>
            <name>hbase.zookeeper.property.dataDir</name>
            <value>/home/centos/zookeeper</value>
        </property>


        <!--协处理器 -->
          <property>
                            <name>hbase.coprocessor.region.classes</name>
            <value>com.tjx.hbasedemo.coprocessor.CalleeLogRegionObserver</value>
          </property>

我再重启hbase 的时候没有在主目录下 生成 kkk.txt 文件,也没有报错,就是调用不了协处理器器

------------------------------------------------------------插入代码
@Test
public void testPut() throws Exception{
//创建conf对象
Configuration conf = HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn = ConnectionFactory.createConnection(conf);
//通过连接查询tableName对象
TableName tname = TableName.valueOf("ns1:calllogs");
//获得table
Table table = conn.getTable(tname);

    //主叫
    String callerId = "13845456767";
    //被叫
    String calleeId = "13989898787";

    SimpleDateFormat sdf = new SimpleDateFormat();
    sdf.applyPattern("yyyyMMddHHmmss");
    //通话时间
    String callTime = sdf.format(new Date());
    //通话时长
    int duration = 100 ;
    DecimalFormat dff = new DecimalFormat();
    dff.applyPattern("00000");
    String durStr = dff.format(duration);

    // hash 区域00 -- 99
    int hash = (calleeId + callTime.substring(0,6)).hashCode();
    hash = hash & Integer.MAX_VALUE % 100;

    DecimalFormat df = new DecimalFormat();
    df.applyPattern("00");
    String regNo = df.format(hash);


    // 拼接rowkey
    String rowkey = regNo +","+callerId+","+callTime+","+"0"+","+calleeId+","+durStr ;
    byte[] rowid = Bytes.toBytes(rowkey);
    //创建put对象
    Put put = new Put(rowid);
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"),Bytes.toBytes("河北"));
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calleePos"),Bytes.toBytes("河南"));
    //执行插入
    table.put(put);
    System.out.println("over");
}

1个回答

(一)Hbase协处理器的前世今生

Hbase是仿照Google的BigTable设计的,而其协处理器也是仿照BigTable的协处理实现完成的,具体链接可
参考:http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf

(二)什么是Hbase协处理器(Coprocessors )?

Hbase的协处理器在Hbase中属于高级的应用功能,它可以让开发者自定义的代码在服务器端执行,来完成特定的一些功能。

(三)为什么要用协处理器?

Hbase是一款高效的基于KV的NOSQL数据库,它有非常多的优点,但是也有不少缺点,hbase的设计全在rowkey上,所有能够高效的查询全是基于rowkey的,除了rowkey的设计之外,我们可能还有
一些其他的功能,如
(1)访问权限控制
(2)引用完整性,基于外键检验数据,
(3)给hbase设计二级索引,从而提高基于列过滤时的查询性能,
(4)像监控MySQL的binlog一样,监控hbase的wal预写log
(5)服务端自定义实现一些聚合函数的功能
(6).......
这样额外的功能,使用hbase的协处理来处理是非常方便的

(四)Hbase中协处理器的分类

在Hbase里面有两类Coprocessors :
1,基于Observer的Coprocessors ,类似于关系型数据库的触发器,可用来实现上面提到的功能中的1,2,3,4功能
常用的Observer:
RegionServerObserver 能够切面监测rowkey的数据的访问与删除
BaseMasterAndRegionObserver 能够切面监测hbase表的创建,删除,sheml修改
BaseWALObserver 能够切面监测hbase的wal的log写入

2,基于Endpoint的Coprocessors ,类似于关系型数据库的存储过程,可用来实现上面提到的功能中的5功能
在hbase0.96之后,采用probuff序列化通信的RPC数据,使用endpoint,需要同过protoc生成相关的service接口的java类
然后继承自己的生成的Service类并且实现hbase的Coprocessor, CoprocessorService接口,从而重写其中的业务方法构建一个
自定义的Endpoint的Coprocessors

协处理器的编程概念与Spring AOP的理念很相似,它也像MapReduce的数据运算方式,与本地local的数据产生计算,而不是远程读取数据再计算,
通过local计算的方式与RegionServer绑定,从而能提升数据计算的效率。

(5)协处理器的安装使用

Java代码

hbase.coprocessor.region.classes for RegionObservers and Endpoints.

hbase.coprocessor.wal.classes for WALObservers.

hbase.coprocessor.master.classes for MasterObservers.

在hbase官网文档中,介绍了两种使用方式:

静态方式(系统级),使用配置文件:

1,编写协处理器,并打成一个jar包,加入hbase/lib目录下,或者在hbase-env.sh里面配置相对应的jar,以及依赖的jar的路径
2,加入静态的配置,在hbase-site.xml里配置主类

Xml代码



hbase.coprocessor.region.classes

org.myname.hbase.coprocessor.endpoint.SumEndPoint

3,把依赖的jar分发到每一个regionserver上,然后重启hbase,
协处理生效,是系统级的协处理器

动态方式(表级别),使用Hbase shell:
1,编写协处理器,打成一个jar包,上传至HDFS,将依赖的jar拷贝到hbase的lib下,配置hbase-env.sh指定依赖jar的
2,建立表:

Java代码

create 'c', NAME=>'cf'

3,禁用表

Java代码

disable 'c'

4,指定协处理器的jar

Java代码

alter 'c', METHOD => 'table_att', 'coprocessor'=>'hdfs:///user/hbase_solr/hbase-increment-index.jar|com.hbase.easy.index.HbaseSolrIndexCoprocesser|1001|'

5,激活表

Java代码

enable 'c'

6,删除协处理jar
如果有多个协处理器,按照$1 $2 $n删除指定的jar配置

Java代码
alter 'c',METHOD => 'table_att_unset',NAME =>'coprocessor$1'

最后说一下,hbase的官方文档指出动态级别的协处理器,可以做到不重启hbase,更新协处理,做法就是
禁用表,卸载协处理器,重新指定协处理器, 激活表,即可,但实际测试发现
动态加载无效,是hbase的一个bug,看这个链接:

https://issues.apache.org/jira/browse/HBASE-8445

因为协处理器,已经被JVM加载,即使删除jar也不能重新load的jar,因为cache里面的hdfs的jar路径,没有变化,所以动态更新无效
,除非重启JVM,那样就意味着,需要重启RegionServer,
里面的小伙伴们指出了两种办法,使协处理器加载生效:
(1)滚动重启regionserver,避免停掉所有的节点
(2)改变协处理器的jar的类名字或者hdfs加载路径,以方便有新的ClassLoad去加载它

但总体来看,第2种方法,比较安全,第一种风险太大,一般情况下没有人会随便滚动重启线上的服务器的,这只在hbase升级的时候使用

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问
相关内容推荐