HBase数据同步到Redis

/ Hadoop / 没有评论 / 201浏览

介绍将HBase数据同步到Redis。

思路

HBase的一行数据对应Redis的Map结构。

依赖

主要使用hbase-client和lettuce-core两个依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hand.sync</groupId>
    <artifactId>hbase-redis-java</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    </properties>

    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>aliyun maven</name>
            <!-- Use https: Maven 3.8.1+ blocks plain-http repositories by default. -->
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- HBase client API: Connection/Table/Scan used by App. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.8</version>
        </dependency>

        <!-- Lettuce Redis client: RedisClient and sync command API. -->
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Bundles all dependencies into a single runnable JAR during 'package'. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                           <mainClass>com.example.sync.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

代码

主要通过HBase api的Scan对象遍历数据,存入Redis。只用了单线程,同步写入Redis。

测试下来,50个字段的宽表同步到Redis,约1w/s的同步速度。

package com.example.sync;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Synchronizes an HBase table into Redis: each HBase row becomes one Redis
 * hash whose key is the HBase row key and whose fields are
 * {@code "family:qualifier"} → cell value.
 *
 * <p>Single-threaded, synchronous writes. Measured at roughly 10k rows/s
 * for a 50-column wide table.
 *
 * @author 奔波儿灞
 * @since 1.0
 */
public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    /** Rows fetched per scanner RPC; cuts round trips on a full-table scan. */
    private static final int SCAN_CACHING = 500;

    /**
     * Entry point.
     *
     * @param args optional overrides: {@code args[0]} = HBase table name,
     *             {@code args[1]} = Redis URI. When absent, the original
     *             hard-coded defaults are used, so existing invocations
     *             behave exactly as before.
     */
    public static void main(String[] args) {
        String tableName = args.length > 0 ? args[0] : "ebs:outer_xla_balance_plus_desc";
        String redisUri = args.length > 1 ? args[1] : "redis://crpprdtd13:15021";

        LOG.info("begin...");
        long start = System.currentTimeMillis();

        // HBase cluster configuration.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "crpprdap02,crpprdap03,crpprdap04");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        // Redis client. Closed in the finally block below so it is released
        // even when the scan loop throws an unchecked exception — previously
        // such an exception skipped close()/shutdown(), leaking connections
        // and leaving lettuce's non-daemon threads keeping the JVM alive.
        RedisClient redisClient = RedisClient.create(redisUri);
        StatefulRedisConnection<String, String> connect = redisClient.connect();
        RedisCommands<String, String> redisCommands = connect.sync();

        Scan scan = new Scan();
        scan.setCaching(SCAN_CACHING);

        // try-with-resources closes scanner, table and connection in reverse
        // order, replacing the original hand-written null-checked finally.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                String rowKey = Bytes.toString(result.getRow());
                redisCommands.hmset(rowKey, toRow(result));
            }
        } catch (IOException e) {
            LOG.error("scan error", e);
        } finally {
            // Always release Redis resources, even on failure.
            connect.close();
            redisClient.shutdown();
        }

        long end = System.currentTimeMillis();
        LOG.info("end!");
        LOG.info("cost: {}s", (end - start) / 1000);
    }

    /**
     * Converts one HBase {@link Result} into the Redis hash payload,
     * keyed by {@code "family:qualifier"}.
     *
     * @param result a scanned HBase row (never {@code null})
     * @return mutable map of field name to string value
     */
    private static Map<String, String> toRow(Result result) {
        Cell[] cells = result.rawCells();
        // Presize to avoid rehashing on wide rows.
        Map<String, String> row = new HashMap<>(cells.length * 2);
        for (Cell cell : cells) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            row.put(family + ":" + qualifier, value);
        }
        return row;
    }

}

内存占用

8717671(870w)条数据,51个字段,Redis占用内存约27GB。