1. Configure hbase-site.xml to point to HDFS

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://bigdata-senior01.home.com:9000/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <!-- ZooKeeper stores its data on the local filesystem, so this must be a local
         directory rather than an HDFS URI; the path below is only an example. -->
    <value>/home/hadoop/hbase/zookeeper</value>
  </property>
  <property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
    <description>
      Controls whether HBase will check for stream capabilities (hflush/hsync).

      Disable this if you intend to run on LocalFileSystem, denoted by a rootdir
      with the 'file://' scheme, but be mindful of the NOTE below.

      WARNING: Setting this to false blinds you to potential data loss and
      inconsistent system state in the event of process and/or node failures. If
      HBase is complaining of an inability to use hsync or hflush it's most
      likely not a false positive.
    </description>
  </property>
</configuration>
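
To make sure a client actually picks this configuration up, a quick connectivity check can be run before going any further. The class below is only a sketch (the name CheckHBaseConfig is made up for illustration); it assumes hbase-site.xml is on the client classpath and both HDFS and HBase are running.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CheckHBaseConfig {
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() loads hbase-site.xml from the classpath
        Configuration conf = HBaseConfiguration.create();
        System.out.println("hbase.rootdir = " + conf.get("hbase.rootdir"));
        // Listing tables proves that ZooKeeper and the HBase master are reachable
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            System.out.println("tables visible: " + admin.listTableNames().length);
        }
    }
}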

2. Dependencies

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.4</version>
        </dependency>

3. Mapper

import java.io.IOException;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: plain text lines; output key: the row key as bytes, output value: an HBase Mutation (here a Put)
public class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Mutation> {
    // Counter for the number of lines processed
    public enum Counters {
        LINES
    }

    private byte[] family = null;
    private byte[] qualifier = null;

    /**
     * Called once at the beginning of the task.
     *
     * @param context
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the family:qualifier spec from the job configuration; it is supplied on the command line and stored there by the driver
        String column = context.getConfiguration().get("conf.column");
        ColParser parser = new ColParser();
        parser.parse(column);
        if(!parser.isValid()) throw new IOException("family or qualifier error");
        family = parser.getFamily();
        qualifier = parser.getQualifier();
    }

    /**
     * Called once for each key/value pair in the input split. Most applications
     * should override this, but the default is the identity function.
     *
     * @param key
     * @param value
     * @param context
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            String line = value.toString();
            // Use the MD5 hash of the whole line as the row key; adjust the key design to your own needs
            byte[] rowKey = DigestUtils.md5(line);
            Put put = new Put(rowKey);
            put.addColumn(this.family,this.qualifier,Bytes.toBytes(line));
            context.write(new ImmutableBytesWritable(rowKey),put);
            context.getCounter(Counters.LINES).increment(1);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    
    class ColParser {
        private byte[] family;
        private byte[] qualifier;
        private boolean valid;

        public byte[] getFamily() {
            return family;
        }

        public byte[] getQualifier() {
            return qualifier;
        }

        public boolean isValid() {
            return valid;
        }

        public void parse(String value) {
            try {
                String[] sValue = value.split(":");
                if (sValue == null || sValue.length < 2 || sValue[0].isEmpty() || sValue[1].isEmpty()) {
                    valid = false;
                    return;
                }

                family = Bytes.toBytes(sValue[0]);
                qualifier = Bytes.toBytes(sValue[1]);
                valid = true;
            } catch (Exception e) {
                valid = false;
            }
        }


    }
}
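
The row key scheme used above (the MD5 digest of the whole line) can be tried out on its own. This is a minimal sketch, assuming the same commons-codec and hbase-client libraries as the job; the class name RowKeyDemo and the sample line are invented for illustration.

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyDemo {
    public static void main(String[] args) {
        String line = "{\"id\":1,\"msg\":\"hello\"}";     // sample input line
        byte[] rowKey = DigestUtils.md5(line);            // 16-byte MD5 digest, same call as in the mapper
        System.out.println(Bytes.toStringBinary(rowKey)); // prints the key the way the HBase shell shows it
        // Identical input lines produce identical row keys, so duplicate lines collapse into a single row.
    }
}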

4. Main (job driver)

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class ImportFromFile {
//    private static String HDFSUri = "hdfs://bigdata-senior01.home.com:9000";
    public static final String NAME = "ImportFromFile";

    private static CommandLine parseArgs(String[] args) throws ParseException{
        Options options = new Options();

        Option option = new Option("t", "table", true, "table to import into (must not be empty)");
        option.setArgName("table-name");
        option.setRequired(true);
        options.addOption(option);

        option = new Option("c", "column", true, "column family and qualifier to write to (must not be empty)");
        option.setArgName("family:qualifier");
        option.setRequired(true);
        options.addOption(option);

        option = new Option("i", "input", true, "input file or directory in HDFS");
        option.setArgName("path-in-HDFS");
        option.setRequired(true);
        options.addOption(option);

        options.addOption("d","debug",false,"switch on DEBUG log level");
        CommandLineParser parser = new PosixParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options,args);
        }catch (Exception e){
            System.err.println("ERROR: " + e.getMessage() + "\n");
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(NAME + " ", options, true);
            System.exit(-1);
        }
        if (cmd.hasOption("d")) {
            Logger log = Logger.getLogger("mapreduce");
            log.setLevel(Level.DEBUG);
        }

        return cmd;
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();

        String[] runArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        CommandLine cmd = parseArgs(runArgs);
        if (cmd.hasOption("d")) conf.set("conf.debug", "true");

        String table = cmd.getOptionValue("t");
        String input = cmd.getOptionValue("i");
        String column = cmd.getOptionValue("c");
        // Store the column spec in the job configuration so the mapper can read it back in setup()
        conf.set("conf.column", column);

        Job job = Job.getInstance(conf,"Import from file " + input +" into table " + table);
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        job.setNumReduceTasks(0); // map-only job, no reduce phase needed

        FileInputFormat.addInputPath(job,new Path(input));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}
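
As an aside, TableMapReduceUtil from the hbase-mapreduce dependency can do the same output wiring in a single call and, by default, also adds the HBase client jars to the job's classpath. The helper below is only an alternative sketch (the class and method names are made up); the original driver configures TableOutputFormat by hand as shown above.

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class JobWiringSketch {
    // 'job' and 'table' correspond to the variables in the driver above
    static void wireTableOutput(Job job, String table) throws Exception {
        // Sets TableOutputFormat as the output format, stores the table name under
        // TableOutputFormat.OUTPUT_TABLE, sets the output key/value classes and
        // ships the HBase dependency jars with the job.
        TableMapReduceUtil.initTableReducerJob(table, null, job);
        job.setNumReduceTasks(0); // still a map-only import
    }
}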

 

5. Run

First create the target table in the HBase shell:
create 'importTable','data'
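
If you prefer to create the table from code instead of the shell, the Admin API can do the same thing. A sketch (the class name CreateImportTable is illustrative), using the same table and column family names as the shell command above:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateImportTable {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("importTable");
            if (!admin.tableExists(name)) {
                // a single column family 'data', matching the shell command above
                admin.createTable(TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("data"))
                        .build());
            }
        }
    }
}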

Upload the input file to HDFS (the -i path below), then submit the job from a client node:
hadoop jar ImportFromFile.jar -t importTable -i /input/test-data.txt -c data:json 
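
When the job finishes, the LINES counter defined in the mapper shows how many lines were written. A quick scan can confirm the rows landed in the table; the class below is a hypothetical verification snippet, assuming the data:json column used above.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyImport {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("importTable"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int shown = 0;
            for (Result result : scanner) {
                byte[] value = result.getValue(Bytes.toBytes("data"), Bytes.toBytes("json"));
                System.out.println(Bytes.toStringBinary(result.getRow()) + " => " + Bytes.toString(value));
                if (++shown >= 5) break; // only show the first few rows
            }
        }
    }
}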

 
