Friday, October 9, 2015

An approach for creating a high-performance Kafka consumer using Spring

In this post I share my experience writing a fast Kafka consumer with Spring Integration Kafka:

https://spring.io/blog/2015/06/19/spring-integration-kafka-1-2-is-available-with-0-8-2-support-and-performance-enhancements

Cassandra writes are extremely fast; most of the time it is your own program that is slow. Likewise, provided the topics are partitioned properly, you can consume messages from Kafka faster than you can write them into Cassandra.

My consumer worked fine with a single topic, but its performance dropped significantly once more topics were added. I used the following approach to increase throughput and reduce latency. More details about the issue are in this Stack Overflow question:

http://stackoverflow.com/questions/28412482/slow-consumer-throuput-when-using-2-consumer-configuration

1. Create a separate channel for each topic type.
2. Create a dedicated inbound-channel-adapter for each channel.
3. Create a dedicated consumer group (in its own consumer-context) for each inbound-channel-adapter.
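The post does not spell out why the separation helps, so the following is only one plausible way to picture it, written as a plain-Java analogy rather than Spring Integration code: each topic type gets its own single-threaded executor, standing in for a dedicated adapter and poller, so a slow batch for one type never waits behind another. The class `PerTopicTypeDispatch`, its methods and the `XXX`/`YYY`/`ZZZ` keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical plain-Java analogy (NOT the Spring Integration Kafka API):
 * one single-threaded executor per topic type, so work for one type is
 * never queued behind work for another.
 */
public class PerTopicTypeDispatch {
    private final Map<String, ExecutorService> executors = new LinkedHashMap<>();
    // Records "type:payload@threadName" for every handled message.
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    public PerTopicTypeDispatch(List<String> topicTypes) {
        for (String type : topicTypes) {
            // Name each worker thread after its topic type so the isolation is visible.
            executors.put(type, Executors.newSingleThreadExecutor(
                    r -> new Thread(r, "consumer-" + type)));
        }
    }

    /** Queues a message on the executor dedicated to its topic type. */
    public void dispatch(String topicType, String payload) {
        ExecutorService executor = executors.get(topicType);
        if (executor == null) {
            throw new IllegalArgumentException("Unknown topic type: " + topicType);
        }
        executor.execute(() -> handled.add(
                topicType + ":" + payload + "@" + Thread.currentThread().getName()));
    }

    /** Lets queued work finish, stops all executors and returns what was handled. */
    public List<String> shutdownAndCollect() {
        executors.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService executor : executors.values()) {
                executor.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    public static void main(String[] args) {
        PerTopicTypeDispatch dispatch = new PerTopicTypeDispatch(List.of("XXX", "YYY", "ZZZ"));
        dispatch.dispatch("XXX", "a");
        dispatch.dispatch("YYY", "b");
        dispatch.dispatch("ZZZ", "c");
        dispatch.dispatch("XXX", "d");
        dispatch.shutdownAndCollect().forEach(System.out::println);
    }
}
```

Messages for `XXX` are always handled on `consumer-XXX` in arrival order, independently of the other types. In the Spring configuration it is the separate adapters, pollers and consumer contexts that play this role, not hand-written executors.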


With this approach I was able to consume 18,000 messages/sec on a single VM with 4 CPUs and 4 GB of RAM, writing them into Cassandra on the same thread. Every topic had 6 partitions, and an equal number of streams (6) was created to consume them.

Kafka setup: 3 brokers, each running on a different machine.
Cassandra setup: 3 nodes, each running on similar hardware. The keyspace had a replication factor of 2.
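As a rough back-of-the-envelope check, assuming each consumed message becomes exactly one Cassandra row write and that data is spread evenly across the ring (neither is stated above), a replication factor of 2 means every message is written to two of the three nodes:

```latex
\begin{aligned}
\text{replica writes/s} &= RF \times \text{messages/s} = 2 \times 18000 = 36000 \\
\text{replica writes/s per node} &\approx \frac{36000}{N} = \frac{36000}{3} = 12000 \\
\text{share of data per node} &= \frac{RF}{N} = \frac{2}{3}
\end{aligned}
```

Adding Cassandra nodes (larger N) lowers the per-node load, which is why the size of the Cassandra cluster ends up bounding overall throughput.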

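To consume more, add partitions and run more consumers in parallel on different machines. Within a consumer group each partition is read by only one stream at a time, so the partition count caps the useful parallelism. Ultimately, throughput will be limited by the number of nodes in your Cassandra cluster.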
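To see why the partition count caps parallelism, the sketch below splits one topic's partitions into contiguous ranges across a group's streams, in the spirit of Kafka's range assignment strategy. It is an illustration written for this post, not Kafka's own code, and the class name `RangeAssignmentSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative range-style assignment for a single topic within one consumer
 * group (NOT Kafka's actual implementation): each partition goes to exactly
 * one stream, so streams beyond the partition count receive nothing.
 */
public class RangeAssignmentSketch {

    /** Returns, for each stream index, the partitions that stream would own. */
    public static List<List<Integer>> assign(int partitions, int streams) {
        if (partitions < 0 || streams <= 0) {
            throw new IllegalArgumentException("need partitions >= 0 and streams > 0");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perStream = partitions / streams;  // every stream gets at least this many
        int withExtra = partitions % streams;  // the first `withExtra` streams get one more
        for (int i = 0; i < streams; i++) {
            int start = perStream * i + Math.min(i, withExtra);
            int count = perStream + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    /** Number of streams that receive at least one partition. */
    public static int activeStreams(int partitions, int streams) {
        int active = 0;
        for (List<Integer> owned : assign(partitions, streams)) {
            if (!owned.isEmpty()) {
                active++;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 6));   // one partition per stream
        System.out.println(assign(6, 8));   // two streams sit idle
        System.out.println(assign(12, 6));  // more partitions: each stream gets two
    }
}
```

With 6 partitions, an 8th stream adds nothing; doubling the partitions is what lets extra consumers (for example on another machine) share the load.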


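<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/kafka
           http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!-- 1. One channel per topic type -->
    <int:channel id="inputFromKafkaXXX"/>
    <int:channel id="inputFromKafkaYYY"/>
    <int:channel id="inputFromKafkaZZZ"/>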


    <!-- 2. A dedicated inbound-channel-adapter, with its own poller, for each channel -->
    <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapterXXX" kafka-consumer-context-ref="consumerContextXXX"
            auto-startup="true" channel="inputFromKafkaXXX">
        <int:poller fixed-delay="100" time-unit="MILLISECONDS"
                    max-messages-per-poll="200" />
    </int-kafka:inbound-channel-adapter>
    <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapterYYY" kafka-consumer-context-ref="consumerContextYYY"
            auto-startup="true" channel="inputFromKafkaYYY">
        <int:poller fixed-delay="100" time-unit="MILLISECONDS"
                    max-messages-per-poll="200" />
    </int-kafka:inbound-channel-adapter>
    <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapterZZZ" kafka-consumer-context-ref="consumerContextZZZ"
            auto-startup="true" channel="inputFromKafkaZZZ">
        <int:poller fixed-delay="100" time-unit="MILLISECONDS"
                    max-messages-per-poll="200" />
    </int-kafka:inbound-channel-adapter>

    <bean id="consumerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
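                <!-- No committed offset (or offset out of range): start from the earliest available message -->
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop> <!-- 5M per partition per fetch request -->
                <prop key="auto.commit.interval.ms">1000</prop> <!-- commit consumed offsets every second -->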
            </props>
        </property>
    </bean>
    <!-- 3. One consumer context, each with its own consumer group, per topic type.
         streams="6" matches the 6 partitions of each topic. -->
    <int-kafka:consumer-context id="consumerContextXXX"
                                consumer-timeout="10" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="cass_group_xxx" max-messages="200">
                <int-kafka:topic-filter pattern="VerType-xxx.*" streams="6" exclude="false"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
    <int-kafka:consumer-context id="consumerContextYYY"
                                consumer-timeout="10" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="cass_group_yyy" max-messages="200">
                <int-kafka:topic-filter pattern="VerType-yyy.*" streams="6" exclude="false"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

    <int-kafka:consumer-context id="consumerContextZZZ"
                                consumer-timeout="10" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="cass_group_zzz" max-messages="200">
                <int-kafka:topic-filter pattern="VerType-zzz.*" streams="6" exclude="false"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

    <int-kafka:zookeeper-connect id="zookeeperConnect"
                                 zk-connect="zookeeper.ip"  zk-connection-timeout="10000"
                                 zk-session-timeout="10000" zk-sync-time="20000" />

 
</beans>
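The configuration above does not show what consumes the `inputFromKafka*` channels. A minimal, hypothetical handler is sketched below; it could be wired to a channel with an `<int:service-activator>`. It assumes the payload is a map of topic to (partition to list of messages), which is the shape I recall the Spring Integration Kafka 1.x high-level consumer adapter emitting (please verify against your version), and it writes every message on the calling thread, as described earlier. `KafkaBatchHandler` and `CassandraWriter` are hypothetical names, and the Cassandra write itself is left abstract.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical handler for one inputFromKafka* channel. Assumes the payload
 * shape topic -> (partition -> messages) and writes each message synchronously
 * on the calling (poller) thread.
 */
public class KafkaBatchHandler {

    /** Hypothetical abstraction over the Cassandra write (e.g. a prepared INSERT). */
    public interface CassandraWriter {
        void write(String topic, int partition, Object message);
    }

    private final CassandraWriter writer;

    public KafkaBatchHandler(CassandraWriter writer) {
        this.writer = writer;
    }

    /** Flattens the batch, writes every message in order and returns how many were written. */
    public int handle(Map<String, Map<Integer, List<Object>>> payload) {
        int written = 0;
        for (Map.Entry<String, Map<Integer, List<Object>>> topicEntry : payload.entrySet()) {
            for (Map.Entry<Integer, List<Object>> partitionEntry : topicEntry.getValue().entrySet()) {
                for (Object message : partitionEntry.getValue()) {
                    writer.write(topicEntry.getKey(), partitionEntry.getKey(), message);
                    written++;
                }
            }
        }
        return written;
    }

    public static void main(String[] args) {
        Map<Integer, List<Object>> partitions = new LinkedHashMap<>();
        partitions.put(0, List.<Object>of("m1", "m2"));
        partitions.put(1, List.<Object>of("m3"));
        Map<String, Map<Integer, List<Object>>> payload = new LinkedHashMap<>();
        payload.put("VerType-xxx-demo", partitions); // hypothetical topic name

        // Stand-in writer that just logs instead of talking to Cassandra.
        List<String> log = new ArrayList<>();
        KafkaBatchHandler handler = new KafkaBatchHandler(
                (topic, partition, message) -> log.add(topic + "/" + partition + ": " + message));
        int count = handler.handle(payload);
        System.out.println(count + " written: " + log);
    }
}
```

Keeping the write on the same thread, as the post does, means a slow Cassandra write naturally slows that adapter's polling instead of letting an unbounded backlog build up in memory.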
