Resetting an MSK Kafka Consumer Offset

Posted on Jan 19, 2021

I am not going to pretend to be an expert on any of this, but I was recently testing a consumer reading a Kafka stream from AWS MSK, Amazon's hosted Kafka service. I had consumed the whole stream and wanted to reset the offset back to 0 so I could run through the events again. The information proved tricky to find and was scattered across the web, so I have compiled it into one place.

Where applicable, I have included links at each step to where I found the information. The terminal commands are given for illustration and may need some tweaking for your setup and OS.

  1. Download Kafka

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz

  2. Extract it and navigate to that folder

https://kafka.apache.org/quickstart

tar -xzf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0
  3. Set up users_jaas.conf

https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html

KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="your-username"
   password="your-password";
};
  4. Export the KAFKA_OPTS environment variable

https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html

export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf
  5. Copy the certs to a tmp file

https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
https://stackoverflow.com/questions/11936685/how-to-obtain-the-location-of-cacerts-of-the-default-java-installation

The $(/usr/libexec/java_home) lookup works on macOS but may not work on other operating systems.

cp $(/usr/libexec/java_home)/lib/security/cacerts /tmp/kafka.client.truststore.jks
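
On other systems the cacerts file may sit somewhere else under the Java home. As a rough sketch (find_cacerts is a hypothetical helper name, not part of Kafka or the JDK), the function below checks the two layouts I am aware of: lib/security on newer JDKs and jre/lib/security on Java 8 JDKs. It assumes you already know your Java home, for example from JAVA_HOME.

```shell
# Hypothetical helper: print the path of the cacerts file under a given Java home.
# Newer JDKs keep it in lib/security; Java 8 JDKs keep it in jre/lib/security.
find_cacerts() {
  java_home="$1"
  for candidate in \
    "$java_home/lib/security/cacerts" \
    "$java_home/jre/lib/security/cacerts"; do
    if [ -f "$candidate" ]; then
      printf '%s\n' "$candidate"
      return 0
    fi
  done
  echo "no cacerts file found under $java_home" >&2
  return 1
}

# Example usage, assuming JAVA_HOME is set in your shell:
#   cp "$(find_cacerts "$JAVA_HOME")" /tmp/kafka.client.truststore.jks
```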
  6. Set up client_sasl.properties with your protocol and mechanism.

https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
  7. Stop any consumers in the consumer group that you are going to reset. If consumers are still connected, the next steps will fail.

  8. Describe the consumer group.

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
https://docs.cloudera.com/documentation/kafka/latest/topics/kafka_command_line.html

I set a long timeout because I was getting timeout errors before the command could complete.

This presumes you are running from the Kafka folder from step 2 and that client_sasl.properties is in that folder.

Generically

bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> --group <consumer-group-id> --describe --command-config client_sasl.properties --timeout 20000

For example

bin/kafka-consumer-groups.sh --bootstrap-server b-1.cluster.f3ko7p.c5.kafka.us-east-1.amazonaws.com:9096 --group consumer-group-101 --describe --command-config client_sasl.properties --timeout 20000

If this succeeds, you should see output similar to this. This allows you to check that the offset is what you think it is and that you have set up the auth correctly.

GROUP                   TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-group-101      super-cool-topic-with-data      6          14600           14600           0               -               -               -
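
When a group spans many partitions, scanning the LAG column by eye gets tedious. As a rough sketch, the describe output can be piped through awk to total the lag. total_lag is a hypothetical helper name, and it assumes LAG is the sixth whitespace-separated column, as in the output above, and that group and topic names contain no spaces.

```shell
# Hypothetical helper: sum the LAG column (assumed to be column 6) of
# `kafka-consumer-groups.sh --describe` output read from stdin.
# Header rows, blank lines, and rows whose LAG is "-" are skipped.
total_lag() {
  awk '$1 != "GROUP" && $6 ~ /^[0-9]+$/ { total += $6 } END { print total + 0 }'
}

# Example usage, with the same placeholders as the describe command above:
#   bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> \
#     --group <consumer-group-id> --describe \
#     --command-config client_sasl.properties --timeout 20000 | total_lag
```

A total of 0 while the consumers are stopped suggests the group had caught up with the topic before the reset.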
  9. Dry-run the reset to check it is doing what you expect

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

Generically

bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> --group <consumer-group-id> --command-config client_sasl.properties --topic <topic-name> --reset-offsets --to-earliest --dry-run --timeout 20000

For example

bin/kafka-consumer-groups.sh --bootstrap-server b-1.cluster.f3ko7p.c5.kafka.us-east-1.amazonaws.com:9096 --group consumer-group-101 --command-config client_sasl.properties --topic super-cool-topic-with-data --reset-offsets --to-earliest --dry-run --timeout 20000

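You should see something like this returned. NEW-OFFSET is the earliest offset still retained on each partition, so it will be greater than 0 if older records have already been removed by retention.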

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
consumer-group-101             super-cool-topic-with-data     3          0
consumer-group-101             super-cool-topic-with-data     7          0
consumer-group-101             super-cool-topic-with-data     2          0
consumer-group-101             super-cool-topic-with-data     6          0
consumer-group-101             super-cool-topic-with-data     5          0
consumer-group-101             super-cool-topic-with-data     8          0
consumer-group-101             super-cool-topic-with-data     0          0
consumer-group-101             super-cool-topic-with-data     4          0
consumer-group-101             super-cool-topic-with-data     9          0
consumer-group-101             super-cool-topic-with-data     1          0
  10. Perform the reset

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

Notice the --execute in the command, in place of --dry-run. This means the reset is actually applied.

bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> --group <consumer-group-id> --command-config client_sasl.properties --topic <topic-name> --reset-offsets --to-earliest --execute --timeout 20000

For example

bin/kafka-consumer-groups.sh --bootstrap-server b-1.cluster.f3ko7p.c5.kafka.us-east-1.amazonaws.com:9096 --group consumer-group-101 --command-config client_sasl.properties --topic super-cool-topic-with-data --reset-offsets --to-earliest --execute --timeout 20000

You should see the same output as in the dry run:

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
consumer-group-101             super-cool-topic-with-data     3          0
consumer-group-101             super-cool-topic-with-data     7          0
consumer-group-101             super-cool-topic-with-data     2          0
consumer-group-101             super-cool-topic-with-data     6          0
consumer-group-101             super-cool-topic-with-data     5          0
consumer-group-101             super-cool-topic-with-data     8          0
consumer-group-101             super-cool-topic-with-data     0          0
consumer-group-101             super-cool-topic-with-data     4          0
consumer-group-101             super-cool-topic-with-data     9          0
consumer-group-101             super-cool-topic-with-data     1          0
  11. Restart the consumer if applicable
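
The dry-run and execute commands in the steps above differ only by one flag, so it is easy for the two to drift apart when editing them by hand. As a sketch only, the wrapper below builds the shared arguments once and takes the mode as its single argument. reset_offsets and the variables BROKER_URL, GROUP_ID, TOPIC and KAFKA_CONSUMER_GROUPS are hypothetical names of my own, and the wrapper assumes you run it from the Kafka folder with client_sasl.properties in place.

```shell
# Hypothetical wrapper around the reset command from the steps above.
# Required variables: BROKER_URL, GROUP_ID, TOPIC.
# Optional variable: KAFKA_CONSUMER_GROUPS, to point at a different script path.
reset_offsets() {
  mode="$1"
  # Only the two modes used in this post are accepted.
  case "$mode" in
    --dry-run|--execute) ;;
    *) echo "usage: reset_offsets --dry-run|--execute" >&2; return 2 ;;
  esac
  "${KAFKA_CONSUMER_GROUPS:-bin/kafka-consumer-groups.sh}" \
    --bootstrap-server "${BROKER_URL:?set BROKER_URL}" \
    --group "${GROUP_ID:?set GROUP_ID}" \
    --command-config client_sasl.properties \
    --topic "${TOPIC:?set TOPIC}" \
    --reset-offsets --to-earliest "$mode" \
    --timeout 20000
}

# Example usage:
#   BROKER_URL=<broker_url> GROUP_ID=<consumer-group-id> TOPIC=<topic-name>
#   reset_offsets --dry-run   # inspect the NEW-OFFSET column first
#   reset_offsets --execute   # then apply the reset
```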