[转]Kafka发布消息时如何选择Partition

本文旨在了解Kafka发送消息到拥有多个Partition的Topic时如何选择Partition。或许许多人已经知道Kafka默认(当key为null)时采用Round-robin策略,也就是雨露均沾,风水轮流转,实现类是DefaultPartitioner。但我们实际应用中为保持相关消息的顺序性,就必须送到指定的Partition,方法可以有
1.指定Partition编号
2.指定key
3.自定义Partitioner-实现
还应考虑当前指定了Key或Partition编号发送消息后,后续消息key为null会选哪个Partition。最后思考一个问题,Consumer每次poll时是获得的消息列表只包含一个partition源还是可以多个partition源。
为完成本次实验,可以本地搭建一个kafka环境,参考简单搭建Apache Kafka分布式消息系统。待zookeeper和kafka正常启动后,我们用下面的明亮创建一个partition数据量为3的Topic:partition-test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partition-test

验证一下该topic的信息
topic info
Topic已准备好,接下来我们用java代码进行测试

默认选择 Partition:Round-robin

这里贴出比较完整的源代码

public class MyKafkaProducer{
    public static void main(String[] args) throws Exception{
        KafkaProducer<String,String> producer = new KafkaProducer<>(getProperties());
        for(int i = 0; i < 10; i++ ){
            ProducerRecord<String,Integer> producerRecord = new ProducerRecord<>("partition-test",i);
            RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("Sent to partition:" + metadata.partition() + ", offset:" + meatadata.offset()); 
        }
    }
    private static Properties getProperties(){
        Properties props = new Properties();
        props.put(BOOTSTRRAP_SERVER_CONFIG,"localhost:9092");
        props.put(CLIENT_ID_CONFIG,"PartitionTestProducer");
        props.put(KEY_SRIALI2ER_CLASS,StringSerializer.class=getName());
        props.put(VALUE_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class.getName());
        return props;
    }
}

这里创建ProducerRecord是用的new ProducerRecord<>("partition-test",i),它实际是通过

this(topic,null,null,null,value,null)

调用方法

public ProducerRecord(String topic,Integer partition,Long timestamp,K key,V value,Iterable<Header> header)

所以partition,timestamp,key,headers都是null,当partition和key都为null时就要采用Round-robin了,看DefaultPartitioner的注释

The default partitioning strategy:
1.If a partition is specified in the record,use it(指定了partition,则用指定的partition)
2.If no partition is specified but a key is present choose a partition based on a hash of the key(如果没有指定partition,但指定了key,则通过key算出hash值定位到partition)
3.If no partition or key is present choose a partition in a round-robin fashion(partition和key都未指定则采用round-robin)

所以producerRecord没有partition和key的情况下,发送10条消息所用的partition的结果如下

Sent to partition: 0, offset: 0
Sent to partition: 2, offset: 0
Sent to partition: 1, offset: 1
Sent to partition: 0, offset: 1
Sent to partition: 2, offset: 1
Sent to partition: 1, offset: 2
Sent to partition: 0, offset: 2
Sent to partition: 2, offset: 2
Sent to partition: 1, offset: 3
Sent to partition: 0, offset: 3

注意到这个DefaultPartitioner中的Round-robin算法,在三个partition时并不是以自然顺序0,1,2的顺序,三个partition时顺序是0,2,1这样的顺序,而每次从哪个索引号开始也是随机的,所以有时会是2,1,0或者1,0,2反正每个partition机会均等。

指定Partition编号

上面代码mian方法稍加变化

public static void main(String[] args) throws Exception{
    KafkaProducer<String,Integer> producer = new KafkaProducer<>(getProperties());

    String topic = "partition-test";
    int partitionSize = producer.partitionsFor(topic).size();
    System.out.println("Partition size:" + partitionSize);

    int partition = new Random().nextInt(partitionSize);
    for(int i = 0; i < 10 ; i++){
        ProducerRecord<String,Integer> producerRecord = new ProducerRecord<>(topic,partition,null,i);
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("Sent to partition:" + metadata.partition() + ",offset: " + metadata.offset());
    }
}

通过producer.partitionsFor(topic)获得partition列表,partition编号从0开始的,随机选择一下partition,然后构建producerRecord是用new ProducerRecord<>(topic, partition, null, i),实际调用

this(topic,partition,null,key,value,null)

所以指定了partition,但key是null,看下执行效果

Partition size: 3
Sent to partition: 1, offset: 4
Sent to partition: 1, offset: 5
Sent to partition: 1, offset: 6
Sent to partition: 1, offset: 7
Sent to partition: 1, offset: 8
Sent to partition: 1, offset: 9
Sent to partition: 1, offset: 10
Sent to partition: 1, offset: 11
Sent to partition: 1, offset: 12
Sent to partition: 1, offset: 13

执行多次可以看到每次从3个parition中随机选择了一个parition。
我们也可以测试一下只要partition不为null,即使指定了key,这个key也不会参与决策使用哪一个partition,比如下面代码

for(int i = 0; i < 10; i++){
    String randomKey = RandomStringUtils.randomAllphabetic(5);
    ProducerRecord<String,Integer> producerRecord = new ProducerRcord<>(topic,1,randomKey,i);
    RecordMetadata metadata = producer.send(producerRecord).get();
    System.out.println("Sent to partition:" + metadata.partition() + ",offset:" + metadata.offset() + ",key:" + randomKey);
}

只写死了partition为1,任凭key怎么个随机法都改变不了往1号partition发布消息的事实

Sent to partition: 1, offset: 24, key: yBtcg
Sent to partition: 1, offset: 25, key: MgNbO
Sent to partition: 1, offset: 26, key: BkzIH
Sent to partition: 1, offset: 27, key: TjvPr
Sent to partition: 1, offset: 28, key: JoUhs
Sent to partition: 1, offset: 29, key: bvuUp
Sent to partition: 1, offset: 30, key: LGWeQ
Sent to partition: 1, offset: 31, key: NhFnX
Sent to partition: 1, offset: 32, key: BgZzK
Sent to partition: 1, offset: 33, key: UyZhP

指定key

在没有指定partition(null值)时,如果有key,kafka将依据key哈希输出partition编号来,下面是测试代码

String[] keys = new String[]{"yBtcg", "yBtcg", "yBtcg", "MgNbO", "BkzIH", "TjvPr"}
for (int i = 0;i < 6; i++){
    ProducerRecord<String,Integer> producerRecord = new ProducerRecord<>(topic,null,keys[i],i);
    RecordMetadata metadata = producer.send(producerRecord).get();
    System.out.println("Sent to partition:" + metadata.partition()+", offset:" +metadata.offset() + "key:" + key[i]);
}

只要partition的数目不变,上面的代码执行千百遍所选择的partition都会一样的

Sent to partition: 1, offset: 37, key: yBtcg
Sent to partition: 1, offset: 38, key: yBtcg
Sent to partition: 1, offset: 39, key: yBtcg
Sent to partition: 2, offset: 14, key: MgNbO
Sent to partition: 0, offset: 25, key: BkzIH
Sent to partition: 0, offset: 26, key: TjvPr

一般来说我们不建议使用key来算出partition编号,因为极有可能消息不能平均分布到每一个partition,除非是一个UUID。比如用户输入的不确定字符串,或是一个数字序列 123450001, 123450002, 123450003,变化部分在后端,很容易使得个别partition很繁忙,而有些闲的蛋疼,降低了topic运输数据的效率。

采用paritition编号或key来指定partition后对Round-robin的影响

我们知道kafka默认采用round-robin来选择partition,即如果partition数据为3,没有指定partition或key的情况下,partition的选择策略是0, 2, 1, 0, 2, 1, 0, 2, 1.....。我们假设下面的情形
1.未指定partition和key,发送了一条消息到了partition 0
2.然后指定partition 1发送又一条消息
3.再次不指定partition和key,发送一条纤细(这条消息应该发送到步骤1中网后下一个partition 2 还是1呢?)
代码验证

RecordMetadata metadata = producer.send(new ProducerRecord<>(topic,100)).get();
System.out.println("Sent to partition:" + metadata.partition() + ",offset:" + metadata.offset());

metadata = producer.send(new ProducerRecord<>(topic,1,null,200)).get();
System.out.println("sent to partion:" + metadata.partition() + ", offset:"+metadata.offset());

metadata = producer.send(new ProducerRecord<>(topic,300)).get();
System.out.println("Sent to partiton:"+ metadata.partition() + ",offset:" + metadata.offset());

第一次执行

Sent to partition: 2, offset: 55
Sent to partition: 1, offset: 99
Sent to partition: 1, offset: 100

第二次执行

Sent to partition: 0, offset: 71
Sent to partition: 1, offset: 101
Sent to partition: 2, offset: 56

所以指定了partition或key时,不会改变DefaultPartitioner固有顺序,这未指定partition或key时,消息总是按照0,2,1的循环来。
因此,如果有需求同一批次的许多消息要放在同一个partition,随机选取的partition需要是恰好是round-robin的下一个partition,那么下次不指定partition或key的消息就会使用同一个partition。更有甚至两次随机选到了同一个parition。那两批次的消息全部会挤到同一个partition中去。
为一批消息指定partition的另外一种实现方式是,不采用随机选择partition,而是可以选发送批次中的第一条,然后再回调中使用RecordMetadata.partition()来发批次中的其他消息,这个partition便是round-robin返回的,如此下回不指定partition或key时不会用到相同的partition,最后的办法是如果一批次中有5条消息,选择了round-robin的下一个parition后,round-robin应该跳到该parition4次,这样更能让消息均匀分布到每一个parition中去。

如何实现自己的parititioner

自定义的partitioner需要实现org.apache.kafka.clients.producer.Partitioner接口(含三个实现方法)。下面是一个粗陋的自定义实现

public class SimplePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer accountId = (Integer) key;
        return cluster.partitionCountForTopic(topic) % accountId;
    }

    //实现另两方法
}

在创建 Kafka Producer 时需设置 partitioner.class 属性

props.put(PARTITIONER_CLASS_CONFIG, "cc.unmi.SimplePartitioner");

然后创建 ProducerRecord 就用用 AccountId 作为 Key

producer.send(new ProducerRecord<>(topic, 12345, 200))

SimplePartitioner 中把 AccountId 对 Partition 数目进行求模获得 Partition 编号,这样就能保证相同 AccountId 的消息总是往同一个 Partition 中发送。
最后一个问题,Consumer poll 时是只从一个 Partition 中抓若干条记录,还是同时从多个 Partition 中抓一批消息,这个看看
1.org.apache.kafka.clients.consumer.KafkaConsumer
2.org.apache.kafka.clients.consumer.internals.Fetcher
两个类的源代码就知道 Kafka 的 Java 客户端轮询消息时会逐个轮询每一个 Topic, 并组装在一个 ConsumerRecords<K, V> 对象中。


2017-07-08:以上关于 Consumer 如何从 Partition 中拉消息的描述不太准确。这与 Topic 有多少个 Partition, 以及相同 Group ID 中有多少个 Consumer 有关, 在相同的 Topic 和 Group ID 范围内,一个 Partition 中会被一个 Consumer 消费,这就是说当 Consumer 比 Partition 少,某些 Consumer 要消费多个 Partition; 如果 Consumer 比 Partition 多,那么就出现多出来的 Consumer 无所事事。这会产生一个 Consumer 数目增减时的 Rebalance 的过程。

原文链接: https://yanbin.blog/how-kafka-select-partition/

  • 浏览:200
  • 评论:0

发表新的回复