Syncing HBase Data to Elasticsearch

This post walks through syncing data from HBase into Elasticsearch.

Approach

Each HBase row maps to one Elasticsearch document: the row key becomes the document id, and each cell becomes a field in the document source.
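For example, a row with key 1001 and cells cf:name=Tom and cf:age=8 (hypothetical values) becomes the document with id 1001 and source {"cf:name": "Tom", "cf:age": "8"}.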

Dependencies

The two key dependencies are hbase-client and elasticsearch-rest-high-level-client.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hbase-elasticsearch-java</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    </properties>

    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.6.1</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <!-- This plugin adds all dependencies to JAR file during 'package' command -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.example.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
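With the assembly plugin configured above, mvn package produces hbase-elasticsearch-java-1.0-jar-with-dependencies.jar alongside the plain jar; the fat jar bundles all dependencies and runs directly with java -jar.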

Code

The code walks the table with the HBase API's Scan object and writes each row into Elasticsearch. Everything runs in a single thread, with the Elasticsearch writes issued asynchronously.

In testing, a wide table with 50 columns synced to Elasticsearch at roughly 7k rows per second.
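Scan throughput also depends heavily on scanner caching, which the listing below leaves at its default. A minimal tweak worth trying, with values that are assumptions to tune against your row size:

Scan scan = new Scan();
// rows fetched per RPC round trip; larger values cut RPC overhead at the cost of client memory
scan.setCaching(500);
// a one-off full-table scan should not evict hot data from the region server block cache
scan.setCacheBlocks(false);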

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Syncs HBase data to Elasticsearch.
 *
 * @author 奔波儿灞
 * @since 1.0
 */
public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) {
        LOG.info("begin...");
        long start = System.currentTimeMillis();

        // HBase configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "crpprdap02,crpprdap03,crpprdap04");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        // Elasticsearch configuration
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );

        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("ebs:outer_xla_balance_plus_desc"));
            Scan scan = new Scan();
            resultScanner = table.getScanner(scan);
            // iterate over the scan results and index each row into Elasticsearch
            for (Result result : resultScanner) {
                String rowKey = Bytes.toString(result.getRow());
                Map<String, String> source = new HashMap<>();
                for (Cell cell : result.rawCells()) {
                    // copy the HBase cell into the ES document source as family:qualifier -> value
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    source.put(family + ":" + qualifier, value);
                }
                // index into Elasticsearch asynchronously
                IndexRequest request = new IndexRequest("dw", "xla_balance_plus_desc", rowKey)
                        .source(source);
                client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {

                    @Override
                    public void onResponse(IndexResponse indexResponse) {
                        // allows to retrieve information about the executed operation
                    }

                    @Override
                    public void onFailure(Exception e) {
                        LOG.error("index error", e);
                    }

                });
            }
        } catch (IOException e) {
            LOG.error("scan error", e);
        } finally {
            // close HBase resources
            if (resultScanner != null) {
                resultScanner.close();
            }
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    LOG.error("close table error", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    LOG.error("close connection error", e);
                }
            }
        }

        // close Elasticsearch resources (note: async index requests may still be in flight at this point)
        try {
            client.close();
        } catch (IOException e) {
            LOG.error("close client error", e);
        }

        long end = System.currentTimeMillis();
        LOG.info("end!");
        LOG.info("cost: {}s", (end - start) / 1000);
    }

}
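One caveat in the listing above: client.close() can be reached while asynchronous index requests are still in flight, so the tail of the scan may be silently dropped. A common fix is to route writes through the client's BulkProcessor, which batches requests and flushes them on close. A minimal sketch, assuming it replaces the indexAsync call inside App.main (batch sizes are illustrative):

import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        // hand each assembled BulkRequest to the async bulk API
        (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    LOG.error("bulk error: {}", response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOG.error("bulk error", failure);
            }
        })
        // flush every 1000 requests or 5 MB, whichever comes first (illustrative values)
        .setBulkActions(1000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .build();

// inside the scan loop, instead of client.indexAsync(...):
bulkProcessor.add(request);

// after the loop: flush outstanding requests and wait for them to finish
// (awaitClose throws InterruptedException, so handle or declare it)
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);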