Introduction to Storm Trident

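If you don't use the Trident API and work directly with Storm's basic spout and bolt operations, you have to understand Storm's ack mechanism to make sure every message is fully processed. Storm offers three levels of message-processing guarantee: At Most Once, At Least Once, and Exactly Once (the last one is provided through Trident).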
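To make the ack mechanism a bit more concrete, here is a minimal plain-Java sketch (no Storm dependency) of the XOR trick Storm's acker uses to track a tuple tree. `AckerSketch` and its methods are hypothetical stand-ins, not Storm's API. In real Storm the tuple ids are random 64-bit values, a bolt anchors with `collector.emit(input, values)` and acks with `collector.ack(input)`, and a tree that is not completed before the message timeout is failed and replayed by the spout (At Least Once). If you skip anchoring and acking, you get At Most Once.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of Storm's acker (not Storm's real API).
// For each spout (root) tuple it keeps one 64-bit "ack val". Every tuple id is
// XORed in once when the tuple is emitted (anchored) and once when it is acked,
// so the ack val returns to 0 exactly when the whole tuple tree has been processed.
public class AckerSketch {
    private final Map<Long, Long> ackVals = new HashMap<>();

    // The spout emits a root tuple (real Storm would use a random 64-bit id).
    public void emitRoot(long rootId) {
        ackVals.put(rootId, rootId);
    }

    // A bolt emits a child tuple anchored to the root tuple's tree.
    public void emitAnchored(long rootId, long childId) {
        ackVals.merge(rootId, childId, (a, b) -> a ^ b);
    }

    // A bolt acks a tuple (root or child) belonging to this tree.
    public void ack(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // True once every emitted tuple in the tree has also been acked.
    public boolean isComplete(long rootId) {
        return ackVals.getOrDefault(rootId, -1L) == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 0x1234L, wordA = 0x00FFL, wordB = 0x0F0FL;
        acker.emitRoot(root);             // spout emits a sentence
        acker.emitAnchored(root, wordA);  // split bolt emits two words anchored to it
        acker.emitAnchored(root, wordB);
        acker.ack(root, root);            // split bolt acks the sentence
        System.out.println(acker.isComplete(root)); // false: the word tuples are still pending
        acker.ack(root, wordA);
        acker.ack(root, wordB);
        System.out.println(acker.isComplete(root)); // true: the spout's ack() would now be called
    }
}
```

The point of the XOR design is that the acker needs only a constant amount of memory per spout tuple, no matter how large the tuple tree grows.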

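Trident can be thought of as a high-level abstraction for batch processing on top of Storm: it processes the stream as a series of small batches, provides operations such as grouping, partitioning, aggregation and functions, and offers consistent, exactly-once processing semantics.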
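The exactly-once semantics rest on how Trident updates state: every batch gets a transaction id (txid), state updates are applied in txid order, and a transactional spout replays a failed batch with the same txid and the same tuples. Below is a simplified, hypothetical plain-Java model of the "transactional state" idea (store the txid next to the value and skip an update whose txid matches). `TransactionalCountSketch` is not a Trident class, and the opaque-transactional variant, which additionally stores the previous value, is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the idea behind Trident's transactional state (not Trident's API).
// Each stored value remembers the txid of the batch that last updated it. Because a
// transactional spout replays a failed batch with the same txid and the same tuples,
// an update whose txid equals the stored one has already been applied and is skipped.
public class TransactionalCountSketch {
    // Stored value: the count plus the txid of the batch that produced it.
    static final class TxValue {
        final long txid;
        final long count;

        TxValue(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, TxValue> state = new HashMap<>();

    // Apply one batch's partial counts, tagged with the batch's txid.
    public void applyBatch(long txid, Map<String, Long> batchCounts) {
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            TxValue old = state.get(e.getKey());
            if (old != null && old.txid == txid) {
                continue; // this batch was already applied: the replay is a no-op
            }
            long base = (old == null) ? 0L : old.count;
            state.put(e.getKey(), new TxValue(txid, base + e.getValue()));
        }
    }

    public long get(String key) {
        TxValue v = state.get(key);
        return v == null ? 0L : v.count;
    }

    public static void main(String[] args) {
        TransactionalCountSketch counts = new TransactionalCountSketch();
        counts.applyBatch(1, Map.of("the", 2L, "cow", 1L));
        counts.applyBatch(1, Map.of("the", 2L, "cow", 1L)); // replay of txid 1
        counts.applyBatch(2, Map.of("the", 1L));
        System.out.println(counts.get("the")); // 3
        System.out.println(counts.get("cow")); // 1
    }
}
```

Without the stored txid, the replayed batch would have pushed "the" to 5, which is exactly the double counting that plain At Least Once processing can cause.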

Trident introduction

[Storm Chinese documentation] Trident Tutorial

Trident API

[Storm Chinese documentation] Trident API Overview

Getting started with Trident

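// Assumes Storm 1.x or later (org.apache.storm packages; older releases used backtype.storm / storm.trident)
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // Test spout: emits batches of up to 3 sentences; setCycle(true) makes it start over forever
        FixedBatchSpout wordFixedBatchSpout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        wordFixedBatchSpout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        topology.newStream("wordFixedBatchSpout", wordFixedBatchSpout)
                // Split each sentence into words
                .each(new Fields("sentence"), new WordSplit(" "), new Fields("word"))
                // Group tuples by word
                .groupBy(new Fields("word"))
                // Count every word across all batches, keeping the result in in-memory state
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(6);

        runLocal(topology.build());
    }

    // throws Exception because newer Storm versions declare checked exceptions on LocalCluster
    private static void runLocal(StormTopology topology) throws Exception {
        // Run in local mode
        LocalCluster cluster = new LocalCluster();

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Recommended: caps the parallelism of every bolt/spout at 1 (so parallelismHint(6) is capped too)
        conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);

        // Submit the topology
        cluster.submitTopology("wordCount", conf, topology);

        // Wait 1 minute, then stop the topology and the cluster; increase this while debugging
        Thread.sleep(60000);

        // Kill the topology
        cluster.killTopology("wordCount");

        // Shut down the local cluster
        cluster.shutdown();
    }

    // A minimal, assumed implementation of the WordSplit function used above:
    // splits the first field on the given separator and emits one "word" tuple per token
    public static class WordSplit extends BaseFunction {
        private final String separator;

        public WordSplit(String separator) {
            this.separator = separator;
        }

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for (String word : tuple.getString(0).split(separator)) {
                collector.emit(new Values(word));
            }
        }
    }
}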
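For intuition about what this topology computes, here is a hypothetical plain-Java model (no Storm dependency) of a batch flowing through `each(WordSplit)`, `groupBy(word)` and `persistentAggregate(Count)`. `Count` behaves as a combiner aggregator (each tuple contributes 1 and partial results are summed), and each batch's totals are then added to the value already held in the state. `WordCountModel` is an illustration only: it ignores parallelism, txids and replays, and the batches in `main` are just example input.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of each(WordSplit) -> groupBy(word) -> persistentAggregate(Count).
public class WordCountModel {
    private final Map<String, Long> state = new HashMap<>(); // stands in for MemoryMapState

    public void processBatch(List<String> sentences) {
        // each(): split each sentence into words; groupBy + Count: every word contributes 1
        Map<String, Long> batchCounts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                batchCounts.merge(word, 1L, Long::sum);
            }
        }
        // persistentAggregate(): combine the batch result with the stored count
        batchCounts.forEach((word, n) -> state.merge(word, n, Long::sum));
    }

    public long count(String word) {
        return state.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        WordCountModel model = new WordCountModel();
        model.processBatch(List.of(
                "the cow jumped over the moon",
                "the man went to the store and bought some candy",
                "four score and seven years ago"));
        model.processBatch(List.of("how many apples can you eat"));
        System.out.println(model.count("the")); // 4
        System.out.println(model.count("and")); // 2
    }
}
```

Aggregating inside the batch first and touching the state once per word per batch is what lets Trident keep state writes cheap, instead of updating the store for every single tuple.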

Trident blog posts