1. 下载protobuf-2.5.0解压,如果是window下,额外下载protoc-2.5.0-win32,解压,将protoc.exe放在protobuf-2.5.0下的src目录下

hbase自定义过滤器 随笔 第1张hbase自定义过滤器 随笔 第2张

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

2. 配置环境变量,添加path路径指向protobuf目录的src中

hbase自定义过滤器 随笔 第3张hbase自定义过滤器 随笔 第4张

3. 查看当前版本,在命令提示符中输入命令

hbase自定义过滤器 随笔 第5张hbase自定义过滤器 随笔 第6张

4. 创建一个空白的文本文件 命名为 CustomNumberComparator.proto  即 后缀文件类型为proto

hbase自定义过滤器 随笔 第7张hbase自定义过滤器 随笔 第8张

 

5. 用记事本打开CustomNumberComparator.proto文件输入以下内容

 

 

 
hbase自定义过滤器 随笔 第9张
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file contains protocol buffers that are used for filters

option java_package = "com.pateo.hbase.defined.comparator";//生成java代码的包名
option java_outer_classname = "MyComparatorProtos";//生成的类名
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

// This file contains protocol buffers that are used for comparators (e.g. in filters)

message CustomNumberComparator {
    required bytes value = 1;     //自定义比较器中需序列化的字段
    required string fieldType = 2;//自定义比较器中需序列化的字段
} 
hbase自定义过滤器 随笔 第10张

 

 

6. 进入命令提示符,使用命令读取CustomNumberComparator.proto的内容生成java代码,即自定义比较器的序列化类

  内容:   protoc.exe -I=C:/proto --java_out=C:/proto C:/proto/CustomNumberComparator.proto

hbase自定义过滤器 随笔 第11张hbase自定义过滤器 随笔 第12张

 

输入后会在指定的/protoc中生成一个文件夹

hbase自定义过滤器 随笔 第13张hbase自定义过滤器 随笔 第14张

得到自定义比较器的序列化类

hbase自定义过滤器 随笔 第15张hbase自定义过滤器 随笔 第16张

7. 将生成的文件夹拷贝到idea编程工具中,注意粘贴的路径为java下

hbase自定义过滤器 随笔 第17张hbase自定义过滤器 随笔 第18张

8. 新建一个自定义过滤器类CustomNumberComparator

hbase自定义过滤器 随笔 第19张hbase自定义过滤器 随笔 第20张

9. CustomNumberComparator继承ByteArrayComparable类,重写方法,代码如下

 

package com.pateo.hbase.defined.comparator;

import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.util.Locale;

/**
 * 自定义比较器:使用方法见 CompareTest
 *
 * @param : fieldType 传递数据格式的类型,支持的数据类型:double
 * @param : data 通过Bytes转换得到的字节数组 使用注意事项 : 使用的时候要注意数据类型的匹配问题
 */
public class CustomNumberComparator extends ByteArrayComparable {
    /**
     * 目前只支持 double类型
     */
    private String fieldType;
    private byte[] data;

    /**
     * Constructor
     *
     * @param value
     * @param fieldType
     */
    public CustomNumberComparator(byte[] value, String fieldType) {
        super(value);
        this.fieldType = fieldType;
        this.fieldType = "String";//只支持
        this.data = value;
    }

    @Override
    // 重写该方法
    public byte[] toByteArray() {

        MyComparatorProtos.CustomNumberComparator.Builder builder = MyComparatorProtos.CustomNumberComparator
                .newBuilder();
        builder.setValue(ByteString.copyFrom(this.data));
        builder.setFieldType(this.fieldType);
        return builder.build().toByteArray();
    }

    // 定义该方法,用于对象反序列化操作
    public static CustomNumberComparator parseFrom(final byte[] bytes)
            throws DeserializationException {
        MyComparatorProtos.CustomNumberComparator proto = null;
        try {
            proto = MyComparatorProtos.CustomNumberComparator.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new CustomNumberComparator(proto.getValue().toByteArray(),
                proto.getFieldType());
    }

    // 重写比较方法 里面就可以按照自己的意愿来实现自己的比较器
    @Override
    public int compareTo(byte[] bytes, int offset, int length) {

        if (fieldType.equalsIgnoreCase("String")) {
            String Rowkey = Bytes.toString(bytes, offset, length).toLowerCase(Locale.ROOT);//得到rowkey
            String substring1 = Rowkey.substring(1, 5);
            String substring2 = Rowkey.substring(17, 22);
            String paramValue = byteConvertObj(String.class, this.data);
            String[] split = paramValue.split(",");
            if (substring1.contains(split[0]) && substring2.contains(split[1])) {
                return 0;
            } else {
                return 1;
            }
        }
        return 1;
    }

    private <T> T byteConvertObj(Class<T> clazz, byte[] data) { String clazzName = clazz.getSimpleName(); if (clazzName.equalsIgnoreCase("Integer")) { Integer paramValue; try { paramValue = Bytes.toInt(data); } catch (IllegalArgumentException e) { paramValue = Integer.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Long")) { Long paramValue; try { paramValue = Bytes.toLong(data); } catch (IllegalArgumentException e) { paramValue = Long.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Float")) { Float paramValue; try { paramValue = Bytes.toFloat(data); } catch (IllegalArgumentException e) { paramValue = Float.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Double")) { Double paramValue; try { paramValue = Bytes.toDouble(data); } catch (IllegalArgumentException e) { paramValue = Double.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Short")) { Short paramValue; try { paramValue = Bytes.toShort(data); } catch (IllegalArgumentException e) { paramValue = Short.valueOf(Bytes.toString(data)); } return (T) paramValue; } return (T) Bytes.toString(data); } }
hbase自定义过滤器 随笔 第21张

10.核心内容为compareTo方法的内容,即为过滤的逻辑实现

    @Override
    public int compareTo(byte[] bytes, int offset, int length) {

        if (fieldType.equalsIgnoreCase("String")) {
            //HbaseValue是在Hbase上搜索到的一条数据
            String HbaseValue = Bytes.toString(bytes, offset, length).toLowerCase(Locale.ROOT);
            String substring1 = HbaseValue.substring(1, 5);
            String substring2 = HbaseValue.substring(17, 22);
            String ClientValue = byteConvertObj(String.class, this.data);//客户端传入的过滤内容
            String[] split = ClientValue.split(",");
            
            if (substring1.contains(split[0]) && substring2.contains(split[1])) {//是否需要过滤
                return 0;//选择
            } else {
                return 1;//过滤
            }
        }
        return 1;//过滤
    }

11.将这个项目打成jar架包放入hbase根目录中的lib下

hbase自定义过滤器 随笔 第22张hbase自定义过滤器 随笔 第23张

hbase自定义过滤器 随笔 第24张hbase自定义过滤器 随笔 第25张

 

hbase自定义过滤器 随笔 第26张hbase自定义过滤器 随笔 第27张

 

hbase自定义过滤器 随笔 第28张hbase自定义过滤器 随笔 第29张

 

hbase自定义过滤器 随笔 第30张hbase自定义过滤器 随笔 第31张

选中之后自动打包成jar

hbase自定义过滤器 随笔 第32张hbase自定义过滤器 随笔 第33张

将这个架包发送到hbase的lib目录中,重启hbase

12. 使用自定义类查询结果

代码如下

 

/**
 * 过滤器实现了类介绍
 * 行键过滤 RowFilter
 * 列簇名过滤 FamilyFilter
 * 列过滤 QualifierFilter
 * 值过滤 ValueFilter
 */

public static void main(String[] args) throws IOException {
    long time1 = new Date().getTime();
    //分页查询
    String page = getPage("00003", 2);//分页查询传入起始位置与返回数量
    System.out.println("下一个起始页码" + page);
    long time2 = new Date().getTime();
    long l = time2 - time1;
    System.out.println("[ "+l/1000+" s ]");
}

/**
 *
 * @param lastRowkey  起始行键
 * @param page 页码
 * @return 下一次查询的起始行键
 * @throws IOException
 */

public static String getPage(String lastRowkey, Integer page) throws IOException {
    //配置参数
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "mini1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.set("hbase.master", "mini1:6000");

    //创建查询类
    Scan scan = new Scan();
    //定义过滤器
    Filter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator("999".getBytes()));//前缀查询
    Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new CustomNumberComparator("99,034".getBytes(),"String"));//包含查询
    //过滤器组合
    FilterList filterlist = new FilterList();
    //filterlist.addFilter(filter1);
    //filterlist.addFilter(filter1);
    filterlist.addFilter(filter);

    //组合查询
    scan.setFilter(filterlist);
    //scan.setStartRow(lastRowkey.getBytes());//分页查询使用的key为下一个其实位置,需要添加0
    long time1 = new Date().getTime();
    HTable table = new HTable(conf, "table");
    long time2 = new Date().getTime();
    System.out.println("创建 HTable:"+(time2-time1)/1000.0+"s");
    ResultScanner scanner = table.getScanner(scan);//获取所有结果集
    long time3 = new Date().getTime();
    System.out.println("获取ResultScanner :"+(time3-time1)/1000.0+"s");
    String format = "  %-40s%-14s%-35s%-10s";//输出格式
    System.out.println(String.format("%-40s%-14s", "ROW", "COLUMN+CELL"));//格式化输出
    Long index = 0L;
    for (Result res : scanner) {//获取一行数据
        index++;
        for (Cell cell : res.listCells()) {//获取各个列的值
            String row = Bytes.toString(CellUtil.cloneRow(cell));//行键
            String value = "value=" + Bytes.toString(CellUtil.cloneValue(cell));//值
            String family = Bytes.toString(CellUtil.cloneFamily(cell));//列簇
            String col = Bytes.toString(CellUtil.cloneQualifier(cell));//列名
            String column = "column=" + family + ":" + col;//列簇与列
            String timestamp = "timestamp=" + cell.getTimestamp();//时间戳
            System.out.println(String.format(format, row, column, timestamp, value));//格式化输出
        }
        //lastRowkey = Bytes.toString(res.getRow()) + "0";//给下一次查询起始rowkey位置赋值
    }
    long time4 = new Date().getTime();
    System.out.println(String.format("一共 %d 条数据",index));
    System.out.println("输出耗时 :"+(time4-time3)/1000.0+"s");
    //防止程序异常,这里需要try-catch关闭连接
    scanner.close();
    table.close();

    //返回下一次查询的起始行键,用于翻页

    return lastRowkey;
}
hbase自定义过滤器 随笔 第34张

 

  hbase通过网关访问即thrift2      <hbase_home>/bin/hbase thrift2/thrift start    启动

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄