代码 ---
public class Util {
public static String getRegNo(String callerId , String callTime){
//区域00-99
int hash = (callerId + callTime.substring(0, 6)).hashCode();
hash =(hash & Integer.MAX_VALUE) % 100;
//hash区域号
DecimalFormat df = new DecimalFormat();
df.applyPattern("00");
String regNo = df.format(hash);
return regNo ;
}
public class CalleeLogRegionObserver extends BaseRegionObserver{
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
FileWriter fw = new FileWriter("/home/centos/kkk.txt",true);
super.postPut(e, put, edit, durability);
//
String tableName0 = TableName.valueOf("ns1:calllogs").getNameAsString();
//得到当前的TableName对象
String tableName1 = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
fw.write(tableName1 + "\r\n");
if(!tableName0.equals(tableName1)){
return ;
}
//得到主叫的rowkey
//xx , callerid , time , direction, calleid ,duration
//被叫:calleid,time,
String rowkey = Bytes.toString(put.getRow());
String[] arr = rowkey.split(",");
if(arr[3].equals("1")){
return ;
}
String hash = Util.getRegNo(arr[4], arr[2]);
//hash
String newRowKey = hash + "," + arr[4] + "," + arr[2] + ",1," + arr[1] + "," + arr[5];
Put newPut = new Put(Bytes.toBytes(newRowKey));
newPut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("dummy"), Bytes.toBytes("no"));
TableName tn = TableName.valueOf("ns1:calllogs");
Table t = e.getEnvironment().getTable(tn);
//
fw.write(t.getName().getNameAsString() + "\r\n");
t.put(newPut);
fw.close();
}
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
super.preGetOp(e, get, results);
}
-- 配置
<!-- 启用完全分布式 -->
hbase.cluster.distributed
true
<!-- 指定hbase数据在hdfs上的存放路径 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster/hbase</value>
</property>
<!-- 配置zk地址 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>s101:2181,s102:2181,s103:2181</value>
</property>
<!-- zk的本地目录 -->
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/centos/zookeeper</value>
</property>
<!--协处理器 -->
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.tjx.hbasedemo.coprocessor.CalleeLogRegionObserver</value>
</property>
我再重启hbase 的时候没有在主目录下 生成 kkk.txt 文件,也没有报错,就是调用不了协处理器器
------------------------------------------------------------插入代码
@Test
public void testPut() throws Exception{
//创建conf对象
Configuration conf = HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn = ConnectionFactory.createConnection(conf);
//通过连接查询tableName对象
TableName tname = TableName.valueOf("ns1:calllogs");
//获得table
Table table = conn.getTable(tname);
//主叫
String callerId = "13845456767";
//被叫
String calleeId = "13989898787";
SimpleDateFormat sdf = new SimpleDateFormat();
sdf.applyPattern("yyyyMMddHHmmss");
//通话时间
String callTime = sdf.format(new Date());
//通话时长
int duration = 100 ;
DecimalFormat dff = new DecimalFormat();
dff.applyPattern("00000");
String durStr = dff.format(duration);
// hash 区域00 -- 99
int hash = (calleeId + callTime.substring(0,6)).hashCode();
hash = hash & Integer.MAX_VALUE % 100;
DecimalFormat df = new DecimalFormat();
df.applyPattern("00");
String regNo = df.format(hash);
// 拼接rowkey
String rowkey = regNo +","+callerId+","+callTime+","+"0"+","+calleeId+","+durStr ;
byte[] rowid = Bytes.toBytes(rowkey);
//创建put对象
Put put = new Put(rowid);
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"),Bytes.toBytes("河北"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calleePos"),Bytes.toBytes("河南"));
//执行插入
table.put(put);
System.out.println("over");
}