Spring Kafka seek

I have a Kafka consumer that, after being resumed, picks up from the last committed offset and consumes sequentially from there, even after several calls to seekToEnd().

Logs

//before pause
2021-10-09 15:44:18,569 [fx-kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024115]   
2021-10-09 15:44:28,573 [fx-kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024116]   

//paused

//called seekToEnd before resume

//after resume
//called seekToEnd several times before and after resume
2021-10-09 15:45:13,603 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024117]   
2021-10-09 15:45:53,610 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024118]  
2021-10-09 15:46:03,612 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024119]   
2021-10-09 15:46:13,613 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024120] 

It turned out the cause was in my own subclass of AbstractConsumerSeekAware.

At consumer startup, I do a manual seek in onPartitionsAssigned. That has worked well so far.

However, now that the consumer needs to be paused and resumed, it turned out that my overriding method, which does the manual seek, never stores the seek callback in the parent class's callbacks field.

So when seekToEnd() was invoked, it was really iterating over an empty map.
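To make the failure mode concrete, here is a minimal, dependency-free sketch of it. It is a simplified model, not the spring-kafka source: SeekCallback and SeekAwareModel are hypothetical stand-ins for ConsumerSeekCallback and AbstractConsumerSeekAware, the topic name is made up, and seekToEnd() returns a count here only so the effect is visible (the real method returns void). The fix it illustrates, under those assumptions, is to call super.onPartitionsAssigned(...) from the override so the parent can record the callback.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for spring-kafka's ConsumerSeekCallback, reduced to one method.
interface SeekCallback {
    void seekToEnd(String topic, int partition);
}

// Simplified model of AbstractConsumerSeekAware (an assumption, not the library source).
class SeekAwareModel {
    static final String TOPIC = "harvest"; // hypothetical topic name

    // Partition -> callback, filled in only when partitions are assigned.
    private final Map<Integer, SeekCallback> callbacks = new LinkedHashMap<>();

    public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
        for (Integer partition : assignments.keySet()) {
            callbacks.put(partition, callback);
        }
    }

    // Asks every known partition to seek to the end and returns how many were reached.
    public int seekToEnd() {
        callbacks.forEach((partition, cb) -> cb.seekToEnd(TOPIC, partition));
        return callbacks.size();
    }
}

// The bug: the override does its startup seek but never calls super,
// so the parent's callbacks map stays empty.
class BrokenConsumer extends SeekAwareModel {
    @Override
    public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
        assignments.keySet().forEach(p -> callback.seekToEnd(TOPIC, p));
    }
}

// The fix: let the parent record the callback first, then do the startup seek.
class FixedConsumer extends SeekAwareModel {
    @Override
    public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        assignments.keySet().forEach(p -> callback.seekToEnd(TOPIC, p));
    }
}

class SeekDemo {
    public static void main(String[] args) {
        List<String> seeks = new ArrayList<>();
        SeekCallback recorder = (topic, partition) -> seeks.add(topic + "-" + partition);
        Map<Integer, Long> assigned = Map.of(17, 38012024116L);

        BrokenConsumer broken = new BrokenConsumer();
        broken.onPartitionsAssigned(assigned, recorder);
        System.out.println("broken: partitions reached = " + broken.seekToEnd());

        FixedConsumer fixed = new FixedConsumer();
        fixed.onPartitionsAssigned(assigned, recorder);
        System.out.println("fixed: partitions reached = " + fixed.seekToEnd());
    }
}
```

In the real subclass, the equivalent change would be a call to super.onPartitionsAssigned(assignments, callback) at the top of the overriding method, keeping the manual startup seek after it.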
