Kafka Best Practices


Kafka: a distributed streaming platform

Core Concepts

(Figure: the core Kafka APIs)

Topics and Logs

A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

For each topic, the Kafka cluster maintains a partitioned log that looks like this:

(Figure: anatomy of a partitioned log)

It is worth reading the introduction on the official site.

Summary

The following is a summary of Kafka:

Quick Start

Download

Download and extract the archive:

> tar -xzf kafka_2.11-1.1.0.tgz
> cd kafka_2.11-1.1.0

Start the Server

Kafka requires ZooKeeper. For testing you can use the ZooKeeper instance bundled with Kafka; start it now:

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Then start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Create a Topic

Create a topic named test with a replication factor of 1 and a single partition:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Now list the topics:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
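
To sanity-check the setup end to end, the console producer and consumer that ship with Kafka can be used (same quickstart layout assumed; lines typed into the producer should show up in the consumer):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message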

Other

For producer, consumer, and cluster usage and configuration, the official documentation covers it.

Java API

The main options are the official Kafka client and the widely used spring-kafka.

The following introduces the official Kafka client.

Single Producer and Consumer

The example uses one producer and one consumer; of course, you would not run it this way in production.

The code:

Producer

The producer sends messages from a single thread:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Message producer
 *
 * @author xuan
 * @since 1.0.0
 */
public class MessageProducerTest {

    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.104.131.255:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner class
        props.put("partitioner.class", "com.example.kafka.producer.MyPartition");

        // Producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
        }

        // Close
        producer.close();
    }

}

The custom partitioner is wired in via props.put("partitioner.class", "com.example.kafka.producer.MyPartition");. Alternatively, a partition can be specified explicitly at send time.
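
For illustration, a send that targets a partition explicitly could replace the send call in the loop above (partition 0 is just an example value):

// Target partition 0 of "test" directly; the configured partitioner is bypassed
producer.send(new ProducerRecord<>("test", 0, Integer.toString(i), Integer.toString(i)));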

Custom Partitioner

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author xuan
 * @since 1.0.0
 */
public class MyPartition implements Partitioner {

    private static final Logger LOG = LoggerFactory.getLogger(MyPartition.class);

    public MyPartition() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int partitionNum;
        try {
            // Numeric string keys map directly to a partition number
            partitionNum = Integer.parseInt((String) key);
        } catch (Exception e) {
            // Fall back to the key's hash code for non-numeric keys
            partitionNum = key.hashCode();
        }
        LOG.debug("the message sendTo topic:" + topic + " and the partitionNum:" + partitionNum);
        // Guard against negative values before mapping into [0, numPartitions)
        return Math.abs(partitionNum % numPartitions);
    }

    @Override
    public void close() {
    }

}

Consumer

Auto Commit

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message consumer with auto commit
 *
 * @author xuan
 * @since 1.0.0
 */
public class MessageConsumerAutoCommitTest {

    private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerAutoCommitTest.class);

    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.104.131.255:9092");
        // Consumer group
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("test"));

        // Poll for messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }

}

The key configuration:

props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

Manual Commit

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message consumer with manual commit
 *
 * @author xuan
 * @since 1.0.0
 */
public class MessageConsumerTest {

    private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerTest.class);

    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.104.131.255:9092");
        // Consumer group
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("test"));

        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize);
        // Poll for messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    LOG.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    buffer.add(record);
                }

                if (buffer.size() >= minBatchSize) {
                    // insertIntoDb(buffer);
                    // Manual commit
                    consumer.commitSync();
                    // Clear the buffer
                    buffer.clear();
                }
            }
        } finally {
            consumer.close();
        }
    }

}
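
For finer-grained control, an explicit offset can be committed per partition once that partition's records are processed. The following sketch follows the pattern in the official KafkaConsumer docs and would replace the buffer/commit logic above; it additionally needs org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata, and java.util.Collections:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            // process the record ...
        }
        // Commit the offset of the next message to read for this partition
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}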

Multiple Producers and Consumers

Producer

Since the producer is thread-safe, the recommended approach is a single producer instance shared by multiple threads. This is usually more efficient than maintaining one KafkaProducer instance per thread.
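
A minimal sketch of that pattern, reusing the props from the producer example above (thread counts and topic are illustrative; it also needs java.util.concurrent.ExecutorService, Executors, and TimeUnit):

// One KafkaProducer shared by all sending threads; the client itself is thread-safe
Producer<String, String> producer = new KafkaProducer<>(props);
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int t = 0; t < 4; t++) {
    final int threadId = t;
    pool.execute(() -> {
        for (int i = 0; i < 25; i++) {
            producer.send(new ProducerRecord<>("test", "thread-" + threadId, Integer.toString(i)));
        }
    });
}
pool.shutdown();
try {
    pool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
// Close the producer only after all sending threads are done
producer.close();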

Consumer

KafkaConsumer, by contrast, is not thread-safe, so multi-threaded consumption is usually implemented in one of two ways:

1. One consumer per thread: each thread owns its own KafkaConsumer instance and processes records inline.
2. Decouple consumption from processing: consumer threads only poll, handing records off to a separate pool of worker threads.

Pros and cons:

Both models have their trade-offs. Personally I think the second is better: do not put heavy processing logic inside the consumer code. Many of the problems Kafka consumer users run into, such as rebalance timeouts, coordinator re-elections, and lost heartbeats, stem from exactly that.
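
For contrast, a bare-bones sketch of the first model (names are illustrative, imports as in the examples in this post): each thread owns its consumer and processes records inline, so slow processing directly delays the next poll.

class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    ConsumerLoop(Properties props, Collection<String> topics) {
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
    }

    @Override
    public void run() {
        try {
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    // Heavy processing here delays poll() and risks a rebalance
                }
            }
        } finally {
            consumer.close();
        }
    }

}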

The code (the implementation below uses multiple consumer instances plus a shared task thread pool):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multi-threaded message consumer.
 *
 * Approach: multiple consumer instances, with one task thread pool processing the records.
 *
 * @author xuan
 * @since 1.0.0
 */
public class MessageConsumerMultiThreadTest {

    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.104.131.255:9092");
        // Consumer group
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer group
        ConsumerGroup<String, String> consumerGroup = new ConsumerGroup<>(props, 5);
        // Subscribe
        consumerGroup.subscribe(Collections.singletonList("test"));
        // Start
        consumerGroup.start();

        // For a graceful stop: simulate shutting the pools down after 30s
        try {
            Thread.sleep(30000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumerGroup.shutdown();
    }

}

/**
 * Consumer group: multiple consumer instances sharing one task thread pool
 *
 * @param <K>
 * @param <V>
 */
class ConsumerGroup<K, V> {

    /**
     * Configuration
     */
    private final Properties props;

    /**
     * Number of consumers
     */
    private final int num;

    /**
     * Thread pool running the consumers
     */
    private ThreadPoolExecutor executor;

    /**
     * Thread pool processing the records
     */
    private ThreadPoolExecutor handlerExecutor;

    /**
     * Wrapped consumer instances
     */
    private List<ConsumerRunner<K, V>> consumerRunners;

    public ConsumerGroup(Properties props, int num) {
        this.props = props;
        this.num = num;
        // Thread pool setup; tune this to the actual workload
        executor = new ThreadPoolExecutor(num, num,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        handlerExecutor = new ThreadPoolExecutor(num, num,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        init();
    }
    }

    /**
     * Initialize the consumers
     */
    private void init() {
        String commit = props.getProperty("enable.auto.commit", "false");
        boolean autoCommit = Boolean.parseBoolean(commit);
        consumerRunners = new ArrayList<>(num);
        for (int i = 0; i < num; i++) {
            KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props);
            ConsumerRunner<K, V> consumerRunner = new ConsumerRunner<>(consumer, handlerExecutor, autoCommit);
            consumerRunners.add(consumerRunner);
        }
    }

    /**
     * Subscribe to topics
     *
     * @param topics the topics
     */
    public void subscribe(Collection<String> topics) {
        consumerRunners.forEach(consumerRunner -> consumerRunner.subscribe(topics));
    }

    /**
     * Start
     */
    public void start() {
        consumerRunners.forEach(executor::execute);
    }

    /**
     * Shutdown
     */
    public void shutdown() {
        // Shut down the consumers first
        consumerRunners.forEach(ConsumerRunner::shutdown);
        // Then the thread pools, consumer pool first
        executor.shutdown();
        handlerExecutor.shutdown();
    }

}

/**
 * Wraps a consumer instance: polls records and hands them off to the task thread pool
 *
 * @param <K>
 * @param <V>
 */
class ConsumerRunner<K, V> implements Runnable {

    /**
     * Closed flag
     */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * The consumer instance
     */
    private final KafkaConsumer<K, V> consumer;

    /**
     * Task-processing thread pool
     */
    private final ThreadPoolExecutor handlerExecutor;

    /**
     * Auto commit flag
     */
    private final boolean autoCommit;

    public ConsumerRunner(KafkaConsumer<K, V> consumer, ThreadPoolExecutor handlerExecutor, boolean autoCommit) {
        this.consumer = consumer;
        this.handlerExecutor = handlerExecutor;
        this.autoCommit = autoCommit;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ConsumerRecords<K, V> records = consumer.poll(1000);
                for (ConsumerRecord<K, V> record : records) {
                    // Hand off to the task thread pool
                    handlerExecutor.execute(new ConsumerHandlerRunner<>(record));
                }
                if (!autoCommit) {
                    consumer.commitSync();
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Subscribe to topics
     *
     * @param topics the topics
     */
    public void subscribe(Collection<String> topics) {
        consumer.subscribe(topics);
    }

    /**
     * Shutdown hook which can be called from a separate thread
     */
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

}

/**
 * Processing task: where the actual business logic lives
 *
 * @param <K>
 * @param <V>
 */
class ConsumerHandlerRunner<K, V> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerHandlerRunner.class);

    private final ConsumerRecord<K, V> record;

    public ConsumerHandlerRunner(ConsumerRecord<K, V> record) {
        this.record = record;
    }

    @Override
    public void run() {
        // Business logic goes here
        LOG.info("record: {}", record);
    }

}

The example above commits batch by batch, but the records may not actually be fully processed yet; they can still be queued in the task thread pool while the consumer keeps polling, which may back up the task pool's queue. At that point the consumer thread should be made to wait; the program above does not handle these details.

For production, spring-kafka is still the recommendation: it smooths over the pain points around commits and multi-threaded consumption.
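
As a rough sketch of what that looks like (assuming spring-kafka is on the classpath and brokers/group id come from application.properties; the class and method names are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TestTopicListener {

    private static final Logger LOG = LoggerFactory.getLogger(TestTopicListener.class);

    // The listener container handles polling, threading, and offset commits
    @KafkaListener(topics = "test")
    public void onMessage(ConsumerRecord<String, String> record) {
        LOG.info("record: {}", record);
    }

}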

spring-kafka Source Analysis

The core logic lives in org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#processCommits.

When the consumer polls, it fetches up to 500 records by default:

(Screenshot: the poll call)

It consumes the 500 records one at a time; each consumed record is added to the acks queue, which is then traversed and merged into offsets, and finally the offsets are committed.

(Screenshot: processCommits)

While debugging, I spent so long stepping through the source that the session timed out; the offset commit for those 500 records therefore failed and a rebalance occurred.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1180)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1049)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:625)
	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.lang.Thread.run(Thread.java:745)

That is the point at which the rebalance happens.

Below is an analysis from Stack Overflow:

Suggested values:

Note: heartbeat.interval.ms was added in 0.10.1; it sends heartbeats between polls.

So the lesson is to poll quickly; otherwise it is easy to hit the timeout and run into rebalance problems. If your application genuinely cannot get through 500 records within the allowed session time, you can lower max.poll.records (available after 0.9), or tune max.poll.interval.ms and session.timeout.ms (see the official docs for details). The gap between two polls must stay below session.timeout.ms; in other words, the time spent processing one poll's worth of data has to be less than session.timeout.ms (with clients before 0.10.1; from 0.10.1 on, max.poll.interval.ms bounds the poll interval instead, while background heartbeats keep the session alive).
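
With the raw client, those knobs would be set roughly like this (the values are placeholders to show the direction of each adjustment, not recommendations):

props.put("max.poll.records", 200);        // fewer records per poll, so each batch finishes sooner
props.put("max.poll.interval.ms", 300000); // upper bound on the time between polls (0.10.1+)
props.put("session.timeout.ms", 10000);    // how long before a silent consumer is considered dead
props.put("heartbeat.interval.ms", 3000);  // background heartbeats between polls (0.10.1+)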

For example, my Spring Boot Kafka configuration looks like this:

#============== kafka ===================
# Kafka broker address(es); multiple can be listed
spring.kafka.bootstrap-servers=47.104.131.255:9092

# =============== producer ===============

spring.kafka.producer.retries=0
# Producer batch size in bytes
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# Serializers for message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# =============== consumer ===============
# Default consumer group id
spring.kafka.consumer.group-id=test

# spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
# spring.kafka.consumer.auto-commit-interval=100

# Maximum number of records returned per poll
spring.kafka.consumer.max-poll-records=200

# Deserializers for message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

This line caps the number of records per poll: spring.kafka.consumer.max-poll-records=200

(Screenshot: max-poll-records)

In practice, tune these parameters according to how long your business processing actually takes.

For Spring Boot + Kafka integration code, see: spring-boot-kafka

Monitoring

kafka-offset-monitor

(Screenshot: KafkaOffsetMonitor)

This visual management tool shows partition information per topic, how far each consumer group has read, and statistics such as consumption rate.
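
Offsets and lag can also be checked from the command line with the tool bundled with Kafka (group id as in the examples above):

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test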

Important Configuration

Kafka consumer configuration explained