Consuming Kafka with Storm and Writing to HDFS


This post covers the scenario of consuming data from Kafka with Storm and writing it to HDFS.

Notes

Storm 1.1.1, as shipped with HDP 2.6.3.

Consuming Kafka Data

First, consuming data from Kafka.

Dependencies

Use storm-kafka-client here, rather than the older storm-kafka module.

<!-- Storm core; already provided on the cluster -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Storm Kafka integration -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>${storm.version}</version>
</dependency>
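
The ${storm.version} property is not defined in the snippet above; it is assumed to match the Storm version on the cluster, for example for the HDP 2.6.3 / Storm 1.1.1 setup described earlier:

<properties>
    <!-- assumption: keep this aligned with the Storm version shipped on the cluster (1.1.1 per the note above) -->
    <storm.version>1.1.1</storm.version>
</properties>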

Consuming Kafka

Topology

Use a KafkaSpout to receive data from Kafka:

public class MyKafkaTopology {

    /**
     * kafka bootstrap servers
     */
    private static final String BOOTSTRAP_SERVERS = "crpprdap01:6667,crpprdap02:6667,crpprdap03:6667";

    /**
     * kafka topic
     */
    private static final String TOPIC = "order";

    /**
     * kafka partition num
     */
    private static final int PARTITION_NUM = 3;

    public static void main(String[] args) throws Exception {
        new MyKafkaTopology().run();
    }

    public void run() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout<>(getKafkaSpoutConfig()), PARTITION_NUM);
        builder.setBolt("KafkaBolt", new MyKafkaBolt(), PARTITION_NUM).shuffleGrouping("KafkaSpout");
        StormSubmitter.submitTopology("MyKafkaTopology", getConf(), builder.createTopology());
    }

    private KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                // Use a new consumer group on each restart so the topology consumes from the beginning
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myKafkaTopologyTestGroup_" + System.nanoTime())
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2_000)
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                // translate each ConsumerRecord into a tuple with a single "value" field
                .setRecordTranslator(record -> new Values(record.value()), new Fields("value"))
                // retry policy for failed tuples
                .setRetry(newRetryService())
                // commit offsets every 3 seconds
                // .setOffsetCommitPeriodMs(3_000)
                // if no offset has been committed, consume from the earliest offset; otherwise resume from the committed offset
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                // allow at most 10,000 uncommitted offsets
                // .setMaxUncommittedOffsets(10_000)
                .build();
    }

    private KafkaSpoutRetryService newRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(
                new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
    }

    private Config getConf() {
        Config conf = new Config();
        conf.setDebug(false);
        return conf;
    }
}
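
For quick local testing, the same topology can also be submitted to an in-process LocalCluster instead of StormSubmitter. A minimal sketch (the sleep duration is arbitrary):

    public void runLocally() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout<>(getKafkaSpoutConfig()), PARTITION_NUM);
        builder.setBolt("KafkaBolt", new MyKafkaBolt(), PARTITION_NUM).shuffleGrouping("KafkaSpout");

        // in-process cluster for local testing only, not for production
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyKafkaTopology", getConf(), builder.createTopology());
        Thread.sleep(60_000);
        cluster.killTopology("MyKafkaTopology");
        cluster.shutdown();
    }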

Bolt

Here the bolt simply logs each value:

/**
 * BaseBasicBolt handles acking automatically: ack is called when execute() returns normally.
 *
 * @author 奔波儿灞
 * @since 1.0
 */
public class MyKafkaBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(MyKafkaBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String value = tuple.getStringByField("value");
        LOG.info("tuple => {}", value);
        // nothing meaningful to emit downstream; emit an empty tuple matching the empty field declaration
        collector.emit(new Values());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields());
    }
}
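
For comparison, the same bolt written against BaseRichBolt would have to ack explicitly. A minimal sketch (the class name is illustrative):

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

// Sketch: the same logging bolt based on BaseRichBolt, where acking is manual.
public class MyRichKafkaBolt extends BaseRichBolt {

    private static final Logger LOG = LoggerFactory.getLogger(MyRichKafkaBolt.class);

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        LOG.info("tuple => {}", tuple.getStringByField("value"));
        // unlike BaseBasicBolt, the tuple must be acked explicitly
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, no output fields
    }
}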

Writing Data to HDFS

Dependencies

<!-- Storm HDFS integration -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>${storm.version}</version>
</dependency>
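
One thing to watch: storm-hdfs pulls in Hadoop client libraries transitively, which may clash with the logging already on the Storm worker classpath. A commonly used exclusion is sketched below; treat it as an assumption and adjust to what your own dependency tree shows.

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>${storm.version}</version>
    <exclusions>
        <!-- Hadoop drags in a log4j SLF4J binding that can conflict with Storm's own logging -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>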

Writing to HDFS

Topology

The flow is KafkaSpout -> CmccFieldBolt -> HdfsBolt:

public class KafkaToHdfsTopology {

    /**
     * kafka bootstrap servers
     */
    private static final String BOOTSTRAP_SERVERS = "crpprdap01:6667,crpprdap02:6667,crpprdap03:6667";

    /**
     * kafka topic
     */
    private static final String TOPIC = "cmcc";

    /**
     * kafka partition num
     */
    private static final int PARTITION_NUM = 3;

    /**
     * HDFS output path
     */
    private static final String FILE_SYSTEM_PATH = "/test/storm-programming/kafkaToHdfsTopology/";

    /**
     * file extension
     */
    private static final String FILE_EXTENSION = ".txt";

    /**
     * hdfs url
     */
    private static final String FS_URL = "hdfs://crpprdap01:8020";

    /**
     * field delimiter
     */
    private static final String FIELD_DELIMITER = "\000";

    public static void main(String[] args) throws Exception {
        new KafkaToHdfsTopology().run();
    }

    public void run() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout<>(getKafkaSpoutConfig()), PARTITION_NUM);
        builder.setBolt("extractFieldBolt", new CmccFieldBolt(), 16).localOrShuffleGrouping("kafkaSpout");
        builder.setBolt("hdfsBolt", getHdfsBolt(), 16).localOrShuffleGrouping("extractFieldBolt");
        StormSubmitter.submitTopology("KafkaToHdfsTopology", getConf(), builder.createTopology());
    }

    private KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                // Use a new consumer group on each restart so the topology consumes from the beginning
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myKafkaTopologyTestGroup_" + System.nanoTime())
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                // translate each ConsumerRecord into a tuple with a single "value" field
                .setRecordTranslator(record -> new Values(record.value()), new Fields("value"))
                // retry policy for failed tuples
                .setRetry(newRetryService())
                // commit offsets every 3 seconds
                // .setOffsetCommitPeriodMs(3_000)
                // if no offset has been committed, consume from the earliest offset; otherwise resume from the committed offset
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                // allow at most 10,000 uncommitted offsets
                // .setMaxUncommittedOffsets(10_000)
                .build();
    }

    private KafkaSpoutRetryService newRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(
                new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
    }

    private HdfsBolt getHdfsBolt() {
        // sync the file system every 200 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(200);

        // rotate to a new file every 128 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128, FileSizeRotationPolicy.Units.MB);

        // file name format
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath(FILE_SYSTEM_PATH)
                .withExtension(FILE_EXTENSION);

        // field delimiter
        RecordFormat recordFormat = new DelimitedRecordFormat()
                .withFieldDelimiter(FIELD_DELIMITER);

        return new HdfsBolt()
                .withConfigKey("hdfs.config")
                .withFsUrl(FS_URL)
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }

    private Config getConf() {
        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(16);
        conf.setNumAckers(4);
        return conf;
    }

}
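
A note on withConfigKey("hdfs.config"): the bolt only picks up settings under that key if a map is actually registered in the topology Config. It copies those entries into the Hadoop Configuration, which is where cluster-specific HDFS settings go, for example the keytab and principal on a Kerberized cluster. A sketch with illustrative values:

// Sketch: pass HDFS-specific settings to the HdfsBolt through the configured key.
// The keytab path and principal are illustrative; on an unsecured cluster this map can simply be omitted.
Map<String, Object> hdfsConf = new HashMap<>();
hdfsConf.put("hdfs.keytab.file", "/etc/security/keytabs/storm.keytab");
hdfsConf.put("hdfs.kerberos.principal", "storm@EXAMPLE.COM");

Config conf = new Config();
conf.put("hdfs.config", hdfsConf);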

Bolt

This bolt extracts the fields from the JSON payload:

public class CmccFieldBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        JSONObject cmcc = JSONObject.parseObject(tuple.getStringByField("value"));
        collector.emit(new Values(
                cmcc.get("AE_HEADER_ID"),
                cmcc.get("APPLICATION_ID"),
                cmcc.get("LEDGER_ID"),
                cmcc.get("ENTITY_ID"),
                cmcc.get("EVENT_ID"),
                cmcc.get("EVENT_TYPE_CODE"),
                cmcc.get("ACCOUNTING_DATE"),
                cmcc.get("GL_TRANSFER_STATUS_CODE"),
                cmcc.get("GL_TRANSFER_DATE"),
                cmcc.get("JE_CATEGORY_NAME"),
                cmcc.get("ACCOUNTING_ENTRY_STATUS_CODE"),
                cmcc.get("ACCOUNTING_ENTRY_TYPE_CODE"),
                cmcc.get("AMB_CONTEXT_CODE"),
                cmcc.get("PRODUCT_RULE_TYPE_CODE"),
                cmcc.get("PRODUCT_RULE_CODE"),
                cmcc.get("PRODUCT_RULE_VERSION"),
                cmcc.get("DESCRIPTION"),
                cmcc.get("DOC_SEQUENCE_ID"),
                cmcc.get("DOC_SEQUENCE_VALUE"),
                cmcc.get("ACCOUNTING_BATCH_ID"),
                cmcc.get("COMPLETION_ACCT_SEQ_VERSION_ID"),
                cmcc.get("CLOSE_ACCT_SEQ_VERSION_ID"),
                cmcc.get("COMPLETION_ACCT_SEQ_VALUE"),
                cmcc.get("CLOSE_ACCT_SEQ_VALUE"),
                cmcc.get("BUDGET_VERSION_ID"),
                cmcc.get("FUNDS_STATUS_CODE"),
                cmcc.get("ENCUMBRANCE_TYPE_ID"),
                cmcc.get("BALANCE_TYPE_CODE"),
                cmcc.get("REFERENCE_DATE"),
                cmcc.get("COMPLETED_DATE"),
                cmcc.get("PERIOD_NAME"),
                cmcc.get("PACKET_ID"),
                cmcc.get("COMPLETION_ACCT_SEQ_ASSIGN_ID"),
                cmcc.get("CLOSE_ACCT_SEQ_ASSIGN_ID"),
                cmcc.get("DOC_CATEGORY_CODE"),
                cmcc.get("ATTRIBUTE_CATEGORY"),
                cmcc.get("ATTRIBUTE1"),
                cmcc.get("ATTRIBUTE2"),
                cmcc.get("ATTRIBUTE3"),
                cmcc.get("ATTRIBUTE4"),
                cmcc.get("ATTRIBUTE5"),
                cmcc.get("ATTRIBUTE6"),
                cmcc.get("ATTRIBUTE7"),
                cmcc.get("ATTRIBUTE8"),
                cmcc.get("ATTRIBUTE9"),
                cmcc.get("ATTRIBUTE10"),
                cmcc.get("ATTRIBUTE11"),
                cmcc.get("ATTRIBUTE12"),
                cmcc.get("ATTRIBUTE13"),
                cmcc.get("ATTRIBUTE14"),
                cmcc.get("ATTRIBUTE15"),
                cmcc.get("GROUP_ID"),
                cmcc.get("DOC_SEQUENCE_VERSION_ID"),
                cmcc.get("DOC_SEQUENCE_ASSIGN_ID"),
                cmcc.get("CREATION_DATE"),
                cmcc.get("CREATED_BY"),
                cmcc.get("LAST_UPDATE_DATE"),
                cmcc.get("LAST_UPDATED_BY"),
                cmcc.get("LAST_UPDATE_LOGIN"),
                cmcc.get("PROGRAM_UPDATE_DATE"),
                cmcc.get("PROGRAM_APPLICATION_ID"),
                cmcc.get("PROGRAM_ID"),
                cmcc.get("REQUEST_ID"),
                cmcc.get("UPG_BATCH_ID"),
                cmcc.get("UPG_SOURCE_APPLICATION_ID"),
                cmcc.get("UPG_VALID_FLAG"),
                cmcc.get("ZERO_AMOUNT_FLAG"),
                cmcc.get("PARENT_AE_HEADER_ID"),
                cmcc.get("PARENT_AE_LINE_NUM"),
                cmcc.get("ACCRUAL_REVERSAL_FLAG"),
                cmcc.get("MERGE_EVENT_ID"),
                cmcc.get("NEED_BAL_FLAG")
        ));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(
                "AE_HEADER_ID",
                "APPLICATION_ID",
                "LEDGER_ID",
                "ENTITY_ID",
                "EVENT_ID",
                "EVENT_TYPE_CODE",
                "ACCOUNTING_DATE",
                "GL_TRANSFER_STATUS_CODE",
                "GL_TRANSFER_DATE",
                "JE_CATEGORY_NAME",
                "ACCOUNTING_ENTRY_STATUS_CODE",
                "ACCOUNTING_ENTRY_TYPE_CODE",
                "AMB_CONTEXT_CODE",
                "PRODUCT_RULE_TYPE_CODE",
                "PRODUCT_RULE_CODE",
                "PRODUCT_RULE_VERSION",
                "DESCRIPTION",
                "DOC_SEQUENCE_ID",
                "DOC_SEQUENCE_VALUE",
                "ACCOUNTING_BATCH_ID",
                "COMPLETION_ACCT_SEQ_VERSION_ID",
                "CLOSE_ACCT_SEQ_VERSION_ID",
                "COMPLETION_ACCT_SEQ_VALUE",
                "CLOSE_ACCT_SEQ_VALUE",
                "BUDGET_VERSION_ID",
                "FUNDS_STATUS_CODE",
                "ENCUMBRANCE_TYPE_ID",
                "BALANCE_TYPE_CODE",
                "REFERENCE_DATE",
                "COMPLETED_DATE",
                "PERIOD_NAME",
                "PACKET_ID",
                "COMPLETION_ACCT_SEQ_ASSIGN_ID",
                "CLOSE_ACCT_SEQ_ASSIGN_ID",
                "DOC_CATEGORY_CODE",
                "ATTRIBUTE_CATEGORY",
                "ATTRIBUTE1",
                "ATTRIBUTE2",
                "ATTRIBUTE3",
                "ATTRIBUTE4",
                "ATTRIBUTE5",
                "ATTRIBUTE6",
                "ATTRIBUTE7",
                "ATTRIBUTE8",
                "ATTRIBUTE9",
                "ATTRIBUTE10",
                "ATTRIBUTE11",
                "ATTRIBUTE12",
                "ATTRIBUTE13",
                "ATTRIBUTE14",
                "ATTRIBUTE15",
                "GROUP_ID",
                "DOC_SEQUENCE_VERSION_ID",
                "DOC_SEQUENCE_ASSIGN_ID",
                "CREATION_DATE",
                "CREATED_BY",
                "LAST_UPDATE_DATE",
                "LAST_UPDATED_BY",
                "LAST_UPDATE_LOGIN",
                "PROGRAM_UPDATE_DATE",
                "PROGRAM_APPLICATION_ID",
                "PROGRAM_ID",
                "REQUEST_ID",
                "UPG_BATCH_ID",
                "UPG_SOURCE_APPLICATION_ID",
                "UPG_VALID_FLAG",
                "ZERO_AMOUNT_FLAG",
                "PARENT_AE_HEADER_ID",
                "PARENT_AE_LINE_NUM",
                "ACCRUAL_REVERSAL_FLAG",
                "MERGE_EVENT_ID",
                "NEED_BAL_FLAG"
        ));
    }

}

Limitations

This is enough to land the data in HDFS, but it has shortcomings: the output cannot be compressed, and it cannot be partitioned by a field (for example, written into different directories based on a time field).
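
On the compression side, storm-hdfs also ships a SequenceFileBolt that writes compressed SequenceFiles. A minimal sketch; the key/value field names are illustrative, and DefaultSequenceFormat expects a long key field and a String value field on the tuple:

// Sketch: compressed output using SequenceFileBolt instead of HdfsBolt.
// DefaultSequenceFormat maps a long tuple field to the key and a String field to the value.
SequenceFileBolt sequenceFileBolt = new SequenceFileBolt()
        .withFsUrl(FS_URL)
        .withFileNameFormat(new DefaultFileNameFormat()
                .withPath(FILE_SYSTEM_PATH)
                .withExtension(".seq"))
        .withSequenceFormat(new DefaultSequenceFormat("timestamp", "value"))
        .withCompressionType(SequenceFile.CompressionType.RECORD)
        .withCompressionCodec("deflate")
        .withRotationPolicy(new FileSizeRotationPolicy(128, FileSizeRotationPolicy.Units.MB))
        .withSyncPolicy(new CountSyncPolicy(200));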

Custom Paths

This section shows how to partition by a time field: for example, using the time field (LAST_UPDATE_DATE, formatted as yyyy/MM/dd HH:mm:ss) to write files under /prefix/yyyy/MM/dd/xxxx.

Looking at the AbstractHdfsBolt source, a Partitioner can be used to derive the output path from a tuple field; the default implementation is NullPartitioner, which does no partitioning.

Just write a custom PathPartitioner that implements the getPartitionPath() method:

public class PathPartitioner implements Partitioner {

    /**
     * name of the tuple field used for time-based partitioning
     */
    private final String partField;

    public PathPartitioner(String partField) {
        this.partField = partField;
    }

    /**
     * date-extraction pattern, e.g. 2018/1/27 17:47:31 => 2018/1/27
     */
    private static final Pattern PATTERN = Pattern.compile("(\\d+/\\d+/\\d+)");

    @Override
    public String getPartitionPath(Tuple tuple) {
        String value = tuple.getStringByField(partField);
        return "/" + getPart(value);
    }

    private String getPart(String date) {
        // e.g. date = "2018/1/27 17:47:31"
        Matcher matcher = PATTERN.matcher(date);
        if (matcher.find()) {
            return matcher.group(1);
        }
        // fallback directory for records whose date cannot be parsed
        return "no";
    }
}

Finally, wire the partitioner into the HdfsBolt:

new HdfsBolt()
    .withConfigKey("hdfs.config")
    .withFsUrl(FS_URL)
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(recordFormat)
    .withRotationPolicy(rotationPolicy)
    // partition by the time field
    .withPartitioner(new PathPartitioner("LAST_UPDATE_DATE"))
    .withSyncPolicy(syncPolicy);

The full topology:

public class KafkaToHdfsPartitionerTopology {

    /**
     * kafka bootstrap servers
     */
    private static final String BOOTSTRAP_SERVERS = "crpprdap01:6667,crpprdap02:6667,crpprdap03:6667";

    /**
     * kafka topic
     */
    private static final String TOPIC = "cmcc";

    /**
     * kafka partition num
     */
    private static final int PARTITION_NUM = 3;

    /**
     * HDFS output path
     */
    private static final String FILE_SYSTEM_PATH = "/test/storm-programming/kafkaToHdfsPartitionerTopology/";

    /**
     * file extension
     */
    private static final String FILE_EXTENSION = ".txt";

    /**
     * hdfs url
     */
    private static final String FS_URL = "hdfs://crpprdap01:8020";

    /**
     * field delimiter
     */
    private static final String FIELD_DELIMITER = "\000";

    public static void main(String[] args) throws Exception {
        new KafkaToHdfsPartitionerTopology().run();
    }

    public void run() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout<>(getKafkaSpoutConfig()), PARTITION_NUM);
        builder.setBolt("extractFieldBolt", new CmccFieldBolt(), 16).localOrShuffleGrouping("kafkaSpout");
        builder.setBolt("hdfsBolt", getHdfsBolt(), 16).localOrShuffleGrouping("extractFieldBolt");
        StormSubmitter.submitTopology("KafkaToHdfsPartitionerTopology", getConf(), builder.createTopology());
    }

    private KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                // Use a new consumer group on each restart so the topology consumes from the beginning
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myKafkaTopologyTestGroup_" + System.nanoTime())
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                // translate each ConsumerRecord into a tuple with a single "value" field
                .setRecordTranslator(record -> new Values(record.value()), new Fields("value"))
                // retry policy for failed tuples
                .setRetry(newRetryService())
                // commit offsets every 3 seconds
                // .setOffsetCommitPeriodMs(3_000)
                // if no offset has been committed, consume from the earliest offset; otherwise resume from the committed offset
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                // allow at most 10,000 uncommitted offsets
                // .setMaxUncommittedOffsets(10_000)
                .build();
    }

    private KafkaSpoutRetryService newRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(
                new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
    }

    private HdfsBolt getHdfsBolt() {
        // sync the file system every 200 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(200);

        // rotate to a new file every 128 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128, FileSizeRotationPolicy.Units.MB);

        // file name format
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath(FILE_SYSTEM_PATH)
                .withExtension(FILE_EXTENSION);

        // field delimiter
        RecordFormat recordFormat = new DelimitedRecordFormat()
                .withFieldDelimiter(FIELD_DELIMITER);

        return new HdfsBolt()
                .withConfigKey("hdfs.config")
                .withFsUrl(FS_URL)
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                // partition by the time field
                .withPartitioner(new PathPartitioner("LAST_UPDATE_DATE"))
                .withSyncPolicy(syncPolicy);
    }

    private Config getConf() {
        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(16);
        conf.setNumAckers(4);
        return conf;
    }

}

How does HdfsBolt actually implement partitioned writes?

Looking at AbstractHdfsBolt, it has a writers field: a map that holds one AbstractHDFSWriter per output path.

protected Map<String, AbstractHDFSWriter> writers;

A follow-up question: how are these AbstractHDFSWriter instances closed cleanly? Left alone, writers would keep growing over time.

Reading on, the concrete type of writers is WritersMap, which extends LinkedHashMap: once the map grows beyond maxWriters, the eldest writer entry is evicted so its resources can be reclaimed.

static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
    final long maxWriters;

    public WritersMap(long maxWriters) {
        super((int)maxWriters, 0.75f, true);
        this.maxWriters = maxWriters;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
        return this.size() > this.maxWriters;
    }
}
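
The eviction relies on LinkedHashMap's access-order mode (the true passed to the super constructor) together with removeEldestEntry, which turns the map into a small LRU cache. A standalone sketch of the same mechanism:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: the LRU behaviour WritersMap relies on, shown in isolation.
public class LruDemo {
    public static void main(String[] args) {
        final int maxEntries = 2;
        // accessOrder = true: iteration order is least-recently-accessed first
        Map<String, String> lru = new LinkedHashMap<String, String>(maxEntries, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
        lru.put("/2018/1/25", "writer-1");
        lru.put("/2018/1/26", "writer-2");
        lru.get("/2018/1/25");             // touch: /2018/1/26 is now the eldest
        lru.put("/2018/1/27", "writer-3"); // exceeds maxEntries, evicts /2018/1/26
        System.out.println(lru.keySet());  // [/2018/1/25, /2018/1/27]
    }
}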

Reading a bit further, maxWriters comes from maxOpenFiles, which defaults to 50:

private static final Integer DEFAULT_MAX_OPEN_FILES = 50;
protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
// in prepare():
writers = new WritersMap(this.maxOpenFiles);

One thing to note: if a bolt instance ends up writing to many distinct partition paths, maxOpenFiles should be raised accordingly, otherwise a writer that is still in use may be evicted from writers and closed prematurely.
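
Assuming the storm-hdfs version in use exposes withMaxOpenFiles on the bolt builder (it accompanies the partitioner support in the 1.x line), the limit can be raised when configuring the bolt; the value 200 below is illustrative:

// Sketch: raise the open-writer limit when many partition paths are active at once.
return new HdfsBolt()
        .withConfigKey("hdfs.config")
        .withFsUrl(FS_URL)
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withPartitioner(new PathPartitioner("LAST_UPDATE_DATE"))
        // assumed available: overrides DEFAULT_MAX_OPEN_FILES (50)
        .withMaxOpenFiles(200)
        .withSyncPolicy(syncPolicy);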