Word Count with Spark Streaming and Kafka


This post shows how to integrate Spark Streaming with Kafka and count words from a topic.

Dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
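The ${spark.version} placeholder is not defined in the snippet above; a minimal sketch of the missing pom pieces (the 2.4.0 version and the provided-scope spark-streaming dependency are assumptions, match them to your cluster and Scala build):

<properties>
    <spark.version>2.4.0</spark.version>
</properties>

<!-- Spark Streaming itself; provided scope because spark-submit supplies it at runtime -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>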

Program

The application subscribes to the topic test-kafka and computes word counts every 10 seconds.

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

/**
 * Reads a stream from Kafka and counts the words every 10 seconds.
 *
 * bash:
 * bin/spark-submit --class "com.example.spark.streaming.KafkaWordCountStreamingApp" spark-example-1.0-SNAPSHOT.jar
 *
 * @author xuan
 * @since 1.0.0
 */
public class KafkaWordCountStreamingApp {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaWordCountStreamingApp.class);

    public static void main(String[] args) throws InterruptedException {
        // Create a StreamingContext
        SparkConf conf = new SparkConf().setAppName(KafkaWordCountStreamingApp.class.getSimpleName());
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "47.104.131.255:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", KafkaWordCountStreamingApp.class.getName());
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Collections.singletonList("test-kafka");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        stream.map(ConsumerRecord::value)
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b)
                .map(tuple -> tuple._1 + ": " + tuple._2)
                // Persist the results; here we simply log them
                .foreachRDD(rdd -> {
                    LOG.info("=============================");
                    rdd.foreachPartition(partitionRecords -> partitionRecords.forEachRemaining(data -> LOG.info("data: {}", data)));
                    LOG.info("=============================");
                });

        ssc.start();

        ssc.awaitTermination();
    }

}
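The SparkConf above does not set a master, so the app relies on spark-submit to supply one. For a quick test straight from the IDE you could set a local master instead (a hypothetical variant, not part of the submitted job):

        // Local-mode variant for quick testing only; local[2] gives Spark two threads
        SparkConf conf = new SparkConf()
                .setAppName("KafkaWordCountStreamingApp")
                .setMaster("local[2]");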

Run

Package the project and submit it to Spark:

$SPARK_HOME> bin/spark-submit --class "com.example.spark.streaming.KafkaWordCountStreamingApp" spark-example-1.0-SNAPSHOT.jar

Note: if the dependencies (such as spark-streaming-kafka-0-10 and the Kafka clients) are not packaged into spark-example-1.0-SNAPSHOT.jar, you need to download the jars yourself and place them in the $SPARK_HOME/jars directory.
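Alternatively, spark-submit can pull the Kafka integration jars from Maven Central via --packages (the 2.4.0 version below is an assumption; use the version matching your Spark and Scala build):

$SPARK_HOME> bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 --class "com.example.spark.streaming.KafkaWordCountStreamingApp" spark-example-1.0-SNAPSHOT.jar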

Producing data to the topic

Here is a Java class that produces words to the topic test-kafka.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MessageProducer {

    public static void main(String[] args) {
        // Producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.104.131.255:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send
        producer.send(new ProducerRecord<>("test-kafka", "hello", "hello"));
        producer.send(new ProducerRecord<>("test-kafka", "hello", "hello"));
        producer.send(new ProducerRecord<>("test-kafka", "world", "world"));
        producer.send(new ProducerRecord<>("test-kafka", "world", "world"));
        producer.send(new ProducerRecord<>("test-kafka", "spring", "spring"));
        producer.send(new ProducerRecord<>("test-kafka", "spring", "spring"));

        // Close
        producer.close();
    }

}
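If you just want to type a few test words by hand, the console producer shipped with Kafka works too (run it from the Kafka installation directory; older releases use --broker-list, newer ones use --bootstrap-server):

bin/kafka-console-producer.sh --broker-list 47.104.131.255:9092 --topic test-kafka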

Results

If the six messages sent by the producer above land in the same 10-second batch, the logs print each word with a count of 2, for example hello: 2, world: 2, spring: 2.

Storing Offsets in Kafka

The example above never commits offsets back to Kafka (enable.auto.commit is false). Following the official documentation, we modify it as follows:

stream.foreachRDD(rdd -> {
    // offsets
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // calculation
    rdd.map(ConsumerRecord::value)
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((a, b) -> a + b)
            .map(tuple -> tuple._1 + ": " + tuple._2)
            .foreachPartition(results -> results.forEachRemaining(data -> LOG.info("data: {}", data)));

    // Persist Offsets in Kafka
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
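commitAsync is thread-safe, but it should be called only after the batch's outputs have completed, as it is here. Committing offsets to Kafka is not transactional with your own output, so the semantics are still at-least-once and the downstream output should be idempotent.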

For reference, the official documentation's example of committing offsets back to Kafka:

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

Of course, there are other options as well: the official integration guide also covers checkpointing and storing offsets in your own data store, or you can simply not manage offsets at all. A rough sketch of the your-own-data-store pattern follows.
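A hedged sketch of that pattern, modeled on the shape of the official example (saveResultsAndOffsets is a hypothetical helper; the transactional details depend entirely on your data store):

stream.foreachRDD(rdd -> {
    // Offset ranges consumed in this batch, one entry per Kafka partition
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    rdd.foreachPartition(records -> {
        // For an unshuffled direct stream, the RDD partition index matches the Kafka partition index
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        // Hypothetical helper: persist this partition's results together with o.untilOffset()
        // in a single transaction to get exactly-once output
        saveResultsAndOffsets(records, o.topic(), o.partition(), o.untilOffset());
    });
});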

You can also refer to related blog posts for more details.