Storm使用RotationAction

/ Storm / 没有评论 / 58浏览

File Rotation Actions   Both the HDFS bolt and Trident State implementation allow you to register any number of RotationActions.   What RotationActions do is provide a hook to allow you to perform some action right after a file is rotated. For example, moving a file to a different location or renaming it.

大概意思就是,写完hdfs文件之后会调用这个注册的方法,可以做个善后工作,比如移动个文件夹啥的。

使用

new HdfsBolt()
    .withConfigKey("hdfs.config")
    .withFsUrl(FS_URL)
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(recordFormat)
    .withRotationPolicy(rotationPolicy)
    .withSyncPolicy(syncPolicy)
    .addRotationAction(new MoveFileAction().toDestination(FILE_SYSTEM_PATH_S))

官方默认的MoveFileAction,会在滚动一个文件后,移动文件到新的目录。

但是,如果父级目录不存在,则会报错。

MoveFileRotationAction

类似官方的MoveFileAction,加了创建父级目录。

package com.example.storm.kafka.hdfs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.MoveFileAction;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * 移动文件
 *
 * @author 奔波儿灞
 * @since 1.0
 */
public class MoveFileRotationAction implements RotationAction {

    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private final String fsUrl;

    /**
     * 原始文件父级目录
     */
    private final String source;

    /**
     * 目标文件父级目录
     */
    private final String destination;

    public MoveFileRotationAction(String fsUrl, String source, String destination) {
        this.fsUrl = fsUrl;
        this.source = source;
        this.destination = destination;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        // 原始文件路径 - 原始文件父级目录 = 后面的子路径
        String child = filePath.toString().substring(fsUrl.length() + source.length());
        // 目标文件父级目录 + 后面的子路径
        Path destPath = new Path(destination, child);
        LOG.info("Create if not exist desPath {}", destPath);
        // 创建父级目录
        fileSystem.mkdirs(destPath.getParent());
        LOG.info("Moving file {} to {}", filePath, destPath);
        // 移动文件
        fileSystem.rename(filePath, destPath);
    }
}