Storm HDFS File Rotation Policies


This post covers the two default file rotation policies used when Storm writes files to HDFS, as well as how to implement a custom one.

Rotation Policies

Example

The file rotation policy is specified via withRotationPolicy:

return new HdfsBolt()
    .withConfigKey("hdfs.config")
    .withFsUrl(FS_URL)
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(recordFormat)
    // rotation policy
    .withRotationPolicy(rotationPolicy)
    // partitioning
    .withPartitioner(new PathPartitioner("timestamp"))
    .withSyncPolicy(syncPolicy)
    .addRotationAction(new MoveFileRotationAction(FILE_SYSTEM_PATH, FILE_SYSTEM_PATH_S));
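
The snippet above references a few setup objects (fileNameFormat, recordFormat, syncPolicy) without showing them; PathPartitioner and MoveFileRotationAction appear to be project-specific classes. For completeness, here is a minimal sketch of how the standard storm-hdfs helpers can provide the missing pieces (path, delimiter and sync count are illustrative values):

import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// file names like componentId-taskId-rotationNum-timestamp.txt under /storm/tmp/
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/storm/tmp/")
        .withExtension(".txt");

// one line per tuple, fields separated by "|"
RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// hsync to HDFS every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);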

The rotationPolicy passed in must implement the FileRotationPolicy interface. Of the built-in implementations, the two covered here are:

File Size Rotation

For example, once 128 MB has been written, roll over to a new file, and so on.

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128, FileSizeRotationPolicy.Units.MB);

...

return new HdfsBolt()
    // rotation policy
    .withRotationPolicy(rotationPolicy)
    ...;

Timed Rotation

For example, roll over to a new file every 10 minutes, and so on.

FileRotationPolicy rotationPolicy = new TimedRotationPolicy(10, TimedRotationPolicy.TimeUnit.MINUTES);

...

return new HdfsBolt()
    // rotation policy
    .withRotationPolicy(rotationPolicy)
    ...;

Custom Rotation Policy

On our project we started with file size rotation, but found that some files never filled up. We then switched to timed rotation, only to discover that outside peak hours the data volume was so small that it produced far too many small files.

So the question became: can we have both? Rotate primarily by file size, but if the file has not filled up within a certain amount of time, roll over to a new file anyway.

That covers scenarios like: rotate the file at 120 MB, and if it never fills up, rotate it after one hour regardless.

Approach

The simplest way to implement this is to start from the FileRotationPolicy interface and write our own policy. Let's look at FileSizeRotationPolicy first: internally it keeps accumulating the number of bytes written, and its mark method returns true once the limit is reached.
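
For reference, in the storm-hdfs version quoted here the interface boils down to three methods (a rough sketch, consistent with the implementations shown below):

import java.io.Serializable;

import org.apache.storm.tuple.Tuple;

public interface FileRotationPolicy extends Serializable {

    /**
     * Called for every tuple the bolt writes; return true to trigger a rotation.
     */
    boolean mark(Tuple tuple, long offset);

    /**
     * Called after a rotation so the policy can reset its internal state.
     */
    void reset();

    /**
     * Returns a fresh copy of the policy, so each writer gets its own instance.
     */
    FileRotationPolicy copy();
}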

/**
 * File rotation policy that will rotate files when a certain
 * file size is reached.
 *
 * For example:
 * <pre>
 *     // rotate when files reach 5MB
 *     FileSizeRotationPolicy policy =
 *          new FileSizeRotationPolicy(5.0, Units.MB);
 * </pre>
 *
 */
public class FileSizeRotationPolicy implements FileRotationPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class);

    public static enum Units {

        KB((long)Math.pow(2, 10)),
        MB((long)Math.pow(2, 20)),
        GB((long)Math.pow(2, 30)),
        TB((long)Math.pow(2, 40));

        private long byteCount;

        private Units(long byteCount){
            this.byteCount = byteCount;
        }

        public long getByteCount(){
            return byteCount;
        }
    }

    private long maxBytes;

    private long lastOffset = 0;
    private long currentBytesWritten = 0;

    public FileSizeRotationPolicy(float count, Units units){
        this.maxBytes = (long)(count * units.getByteCount());
    }

    protected FileSizeRotationPolicy(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    public boolean mark(Tuple tuple, long offset) {
        long diff = offset - this.lastOffset;
        this.currentBytesWritten += diff;
        this.lastOffset = offset;
        return this.currentBytesWritten >= this.maxBytes;
    }

    @Override
    public void reset() {
        this.currentBytesWritten = 0;
        this.lastOffset = 0;
    }

    @Override
    public FileRotationPolicy copy() {
        return new FileSizeRotationPolicy(this.maxBytes);
    }
}

So we can build on it and add a time check: if the size limit has not been reached but the elapsed time has exceeded the configured interval, mark returns true as well.

FileSizeAndTimedRotationPolicy

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rotates files by size and by time:
 * even if the size limit is not reached, the file is rotated once the time interval elapses.
 *
 * @author 奔波儿灞
 * @since 1.0
 */
public class FileSizeAndTimedRotationPolicy implements FileRotationPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(FileSizeAndTimedRotationPolicy.class);

    public static enum SizeUnit {

        KB((long)Math.pow(2, 10)),
        MB((long)Math.pow(2, 20)),
        GB((long)Math.pow(2, 30)),
        TB((long)Math.pow(2, 40));

        private long byteCount;

        private SizeUnit(long byteCount){
            this.byteCount = byteCount;
        }

        public long getByteCount(){
            return byteCount;
        }
    }

    public static enum TimeUnit {

        SECONDS((long)1000),
        MINUTES((long)1000*60),
        HOURS((long)1000*60*60),
        DAYS((long)1000*60*60*24);

        private long milliSeconds;

        private TimeUnit(long milliSeconds){
            this.milliSeconds = milliSeconds;
        }

        public long getMilliSeconds(){
            return milliSeconds;
        }
    }

    /**
     * Maximum number of bytes before the file is rotated
     */
    private final long maxBytes;

    private long lastOffset = 0;
    private long currentBytesWritten = 0;

    /**
     * Rotation time interval in milliseconds
     */
    private final long interval;

    /**
     * Start timestamp of the current file
     */
    private long startTimestamp = System.currentTimeMillis();

    public FileSizeAndTimedRotationPolicy(float count, FileSizeAndTimedRotationPolicy.SizeUnit sizeUnit,
                                          long interval, FileSizeAndTimedRotationPolicy.TimeUnit timeUnit){
        this.maxBytes = (long)(count * sizeUnit.getByteCount());
        this.interval = interval * timeUnit.getMilliSeconds();
    }

    protected FileSizeAndTimedRotationPolicy(long maxBytes, long interval) {
        this.maxBytes = maxBytes;
        this.interval = interval;
    }

    @Override
    public boolean mark(Tuple tuple, long offset) {
        long diff = offset - this.lastOffset;
        this.currentBytesWritten += diff;
        this.lastOffset = offset;
        // has the size limit been reached?
        boolean needsSizeRotation = this.currentBytesWritten >= this.maxBytes;
        // size limit not reached, so check the time
        if (!needsSizeRotation) {
            // has the time interval elapsed?
            return System.currentTimeMillis() - startTimestamp >= interval;
        }
        return true;
    }

    @Override
    public void reset() {
        this.currentBytesWritten = 0;
        this.lastOffset = 0;
        // reset the start timestamp
        this.startTimestamp = System.currentTimeMillis();
    }

    @Override
    public FileRotationPolicy copy() {
        return new FileSizeAndTimedRotationPolicy(this.maxBytes, interval);
    }
}
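
With that in place, the 120 MB / 1 hour scenario described earlier is just a matter of handing the new policy to the bolt (a sketch reusing the setup objects from the first example):

FileRotationPolicy rotationPolicy = new FileSizeAndTimedRotationPolicy(
        120, FileSizeAndTimedRotationPolicy.SizeUnit.MB,
        1, FileSizeAndTimedRotationPolicy.TimeUnit.HOURS);

return new HdfsBolt()
    .withConfigKey("hdfs.config")
    .withFsUrl(FS_URL)
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(recordFormat)
    // rotate at 120 MB, or after 1 hour if the size limit is never reached
    .withRotationPolicy(rotationPolicy)
    .withPartitioner(new PathPartitioner("timestamp"))
    .withSyncPolicy(syncPolicy)
    .addRotationAction(new MoveFileRotationAction(FILE_SYSTEM_PATH, FILE_SYSTEM_PATH_S));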

This approach is fairly simple and does not require much extra machinery. A timer-trigger approach would of course also work, and that is exactly how TimedRotationPolicy is handled: AbstractHdfsBolt checks whether the policy is a TimedRotationPolicy, and if so starts a Timer to drive the rotation:

private void startTimedRotationPolicy() {
    long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
    this.rotationTimer = new Timer(true);
    TimerTask task = new TimerTask() {
        @Override
        public void run() {
            // fix: Storm HDFS bolt throws ClosedChannelException when Time rotation policy is used
            // STORM-2993
            // https://issues.apache.org/jira/browse/STORM-2993
            // https://github.com/apache/storm/pull/2592/files/9bb2a509776e2ec655a368a5d0e604141986ac62
            synchronized (writeLock) {
                for (final AbstractHDFSWriter writer : writers.values()) {
                    try {
                        rotateOutputFile(writer);
                    } catch (IOException e) {
                        LOG.warn("IOException during scheduled file rotation.", e);
                    }
                }
                writers.clear();
            }
        }
    };
    this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
}
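
One behavioral difference is worth keeping in mind: FileSizeAndTimedRotationPolicy is not a TimedRotationPolicy, so AbstractHdfsBolt will not start this Timer for it. The time check only runs inside mark(), i.e. when a tuple is actually written. A rough illustration with made-up numbers (tuple stands for whatever tuple was just written; this policy never inspects it):

FileSizeAndTimedRotationPolicy policy = new FileSizeAndTimedRotationPolicy(
        120, FileSizeAndTimedRotationPolicy.SizeUnit.MB,
        1, FileSizeAndTimedRotationPolicy.TimeUnit.HOURS);

// 10 MB written so far: neither the size limit nor the time interval is reached
policy.mark(tuple, 10L * 1024 * 1024);    // -> false

// the next tuple arrives more than an hour later: the time interval has elapsed
policy.mark(tuple, 11L * 1024 * 1024);    // -> true, the bolt rotates the file

// if no tuples arrive at all, mark() is never called, so the rotation only
// happens once the next tuple shows up

If a completely idle stream must still be rotated on time, the Timer-based approach above is the one to extend.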