I am not going to pretend to be an expert on the following, but I was recently testing a consumer reading a Kafka stream from AWS MSK (Amazon's managed Kafka service). Having consumed the whole stream, I wanted to reset the consumer group's offsets back to 0 so I could run through the events again. The information proved tricky to find and was scattered across the web, so I have compiled it in one place.
Where applicable, I have linked the sources I used at each step. The terminal commands are given for illustration and may need some tweaking for your setup and OS.
- Download Kafka
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz
- Extract it & navigate to that folder
https://kafka.apache.org/quickstart
```shell
tar -xzf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0
```
- Set up `users_jaas.conf` with your SASL/SCRAM credentials
https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
```
KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="your-username"
   password="your-password";
};
```
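If you would rather not hand-edit the JAAS file, a small shell function can generate it. This is a minimal sketch, not something from the AWS docs: `write_jaas_conf` is a hypothetical helper name, and it assumes your password contains no double quotes or backslashes, which would break the JAAS quoting.

```shell
#!/usr/bin/env bash
# Sketch only: write_jaas_conf is a hypothetical helper, not part of Kafka or AWS tooling.
# Usage: write_jaas_conf <username> <password> <output-file>
write_jaas_conf() {
  local username="$1" password="$2" out="$3"
  # Run in a subshell so the stricter umask does not leak into your shell;
  # a newly created file is then readable only by you (it holds a password).
  (
    umask 077
    cat > "$out" <<EOF
KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="${username}"
   password="${password}";
};
EOF
  )
}
```

For example, `write_jaas_conf "your-username" "your-password" "$PWD/users_jaas.conf"`. Bear in mind that typing the password on the command line may leave it in your shell history.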
- Export the KAFKA_OPTS environment variable pointing at the JAAS file
https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
```shell
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf"
```
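A slightly more defensive variant checks that the file exists and exports an absolute path, which avoids surprises if you later run the Kafka tools from a different directory. This is a sketch: `export_kafka_opts` is a hypothetical helper name, and it replaces any KAFKA_OPTS value you already had.

```shell
#!/usr/bin/env bash
# Sketch only: export_kafka_opts is a hypothetical helper name.
# Usage: export_kafka_opts <path-to-users_jaas.conf>
export_kafka_opts() {
  local jaas="$1" abs
  if [ ! -f "$jaas" ]; then
    echo "JAAS file not found: $jaas" >&2
    return 1
  fi
  # Resolve to an absolute path so it still works if the Kafka tools
  # are started from another directory.
  abs="$(cd "$(dirname "$jaas")" && pwd)/$(basename "$jaas")"
  # Note: this overwrites any KAFKA_OPTS value you already had.
  export KAFKA_OPTS="-Djava.security.auth.login.config=${abs}"
}
```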
- Copy the JVM's default CA certificates (cacerts) to a temporary truststore file
https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
https://stackoverflow.com/questions/11936685/how-to-obtain-the-location-of-cacerts-of-the-default-java-installation
The `$(/usr/libexec/java_home)` lookup is specific to macOS; on other operating systems you will need to locate your JDK's cacerts file another way (see the Stack Overflow link above).
```shell
cp $(/usr/libexec/java_home)/lib/security/cacerts /tmp/kafka.client.truststore.jks
```
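Because `/usr/libexec/java_home` only exists on macOS, here is a hedged sketch of a more portable lookup. `find_cacerts` is a hypothetical helper; it assumes either that JAVA_HOME is set, or that `java` is on your PATH and GNU `readlink -f` is available (typical on Linux). It checks both `lib/security/cacerts` (JDK 9 and later) and `jre/lib/security/cacerts` (JDK 8).

```shell
#!/usr/bin/env bash
# Sketch only: find_cacerts is a hypothetical helper name.
# Usage: find_cacerts [java-home]
# Prints the path of the JDK's default cacerts file, or fails.
find_cacerts() {
  local home="${1:-${JAVA_HOME:-}}" candidate
  if [ -z "$home" ]; then
    if [ -x /usr/libexec/java_home ]; then
      # macOS: ask the system helper, as in the command above.
      home="$(/usr/libexec/java_home)"
    elif command -v java >/dev/null 2>&1; then
      # Linux (GNU readlink): follow the java symlink chain back to the JDK,
      # then strip the trailing bin/java.
      home="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")"
    fi
  fi
  # JDK 9+ keeps cacerts under lib/security; JDK 8 under jre/lib/security.
  for candidate in "$home/lib/security/cacerts" "$home/jre/lib/security/cacerts"; do
    if [ -f "$candidate" ]; then
      printf '%s\n' "$candidate"
      return 0
    fi
  done
  echo "cacerts not found under '${home}'" >&2
  return 1
}
```

Usage would then be `cp "$(find_cacerts)" /tmp/kafka.client.truststore.jks`.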
- Set up `client_sasl.properties` with your security protocol, SASL mechanism and the truststore from the previous step.
https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
```
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
```
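If your truststore lives somewhere other than `/tmp`, the same three properties can be written with the truststore path as a parameter. A minimal sketch; `write_client_properties` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Sketch only: write_client_properties is a hypothetical helper name.
# Usage: write_client_properties <output-file> [truststore-path]
write_client_properties() {
  local out="$1"
  # Default to the truststore location used in the step above.
  local truststore="${2:-/tmp/kafka.client.truststore.jks}"
  cat > "$out" <<EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=${truststore}
EOF
}
```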
- Stop any consumers in the consumer group you are going to reset. Offsets can only be reset while the group is inactive, so if consumers are still connected the reset steps below will fail.
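To check the group really is inactive before going further, you can ask for its state with `--describe --state`. The sketch below is not from the sources above: `group_state`, `require_inactive_group` and the `KAFKA_HOME` variable are hypothetical, and the parsing assumes the Kafka 2.x column layout, where STATE is the second-to-last column (followed by #MEMBERS). If your version prints something different, adjust the awk column. The reset should only go ahead when the state is `Empty`.

```shell
#!/usr/bin/env bash
# Sketch only: group_state, require_inactive_group and KAFKA_HOME are hypothetical names.
# KAFKA_HOME is assumed to be the extracted Kafka folder (defaults to the current directory).

# Usage: group_state <broker_url> <consumer-group-id>
# Prints the group's state, e.g. Empty or Stable.
group_state() {
  local broker="$1" group="$2"
  # Skip the header and blank lines; STATE is the second-to-last column,
  # which still holds when ASSIGNMENT-STRATEGY is empty for an idle group.
  "${KAFKA_HOME:-.}/bin/kafka-consumer-groups.sh" \
    --bootstrap-server "$broker" \
    --group "$group" \
    --describe --state \
    --command-config client_sasl.properties \
    --timeout 20000 |
    awk '$1 != "GROUP" && NF > 0 { print $(NF - 1) }'
}

# Usage: require_inactive_group <broker_url> <consumer-group-id>
# Succeeds only for Empty or Dead groups; an empty result (e.g. the
# command failed) is treated as unsafe.
require_inactive_group() {
  local state
  state="$(group_state "$1" "$2")"
  case "$state" in
    Empty|Dead) return 0 ;;
    *)
      echo "Group '$2' is '$state'; stop its consumers before resetting." >&2
      return 1
      ;;
  esac
}
```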
- Describe the consumer group.
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
https://docs.cloudera.com/documentation/kafka/latest/topics/kafka_command_line.html
I have set a long timeout (`--timeout` is in milliseconds, so 20000 is 20 seconds) because I was getting timeout errors before the command could complete.
This assumes you are running the commands from the Kafka folder extracted in step 2.
Generically:
```shell
bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> --group <consumer-group-id> --describe --command-config client_sasl.properties --timeout 20000
```
For example
```shell
bin/kafka-consumer-groups.sh --bootstrap-server b-1.cluster.f3ko7p.c5.kafka.us-east-1.amazonaws.com:9096 --group consumer-group-101 --describe --command-config client_sasl.properties --timeout 20000
```
If this succeeds, you should see output similar to the following. It lets you check that the offsets are what you think they are and that authentication is set up correctly.
```
GROUP               TOPIC                       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
consumer-group-101  super-cool-topic-with-data  6          14600           14600           0    -            -     -
```
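When a topic has many partitions, eyeballing the LAG column gets tedious. A small awk sketch (the `total_lag` helper name is hypothetical) sums it across all partition rows of the `--describe` output, skipping rows where LAG is `-` because no offset has been committed. It assumes LAG is the sixth column, as in the output above.

```shell
#!/usr/bin/env bash
# Sketch only: total_lag is a hypothetical helper name.
# Usage: <describe command> | total_lag
# Sums the LAG column (6th field) over partition rows; non-numeric values such as "-" are ignored.
total_lag() {
  awk '$1 != "GROUP" && NF >= 6 && $6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}
```

For example, pipe the describe command above into it: `bin/kafka-consumer-groups.sh ... --describe ... | total_lag`.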
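- Do a `--dry-run` of the reset to check it does what you expect. `--to-earliest` moves each partition to the earliest offset still held in the log, which is 0 unless retention has already deleted older messages.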
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
Generically
```shell
bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> --group <consumer-group-id> --command-config client_sasl.properties --topic <topic-name> --reset-offsets --to-earliest --dry-run --timeout 20000
```
For example
```shell
bin/kafka-consumer-groups.sh --bootstrap-server b-1.cluster.f3ko7p.c5.kafka.us-east-1.amazonaws.com:9096 --group consumer-group-101 --command-config client_sasl.properties --topic super-cool-topic-with-data --reset-offsets --to-earliest --dry-run --timeout 20000
```
You should see something like this returned:
```
GROUP               TOPIC                       PARTITION  NEW-OFFSET
consumer-group-101  super-cool-topic-with-data  3          0
consumer-group-101  super-cool-topic-with-data  7          0
consumer-group-101  super-cool-topic-with-data  2          0
consumer-group-101  super-cool-topic-with-data  6          0
consumer-group-101  super-cool-topic-with-data  5          0
consumer-group-101  super-cool-topic-with-data  8          0
consumer-group-101  super-cool-topic-with-data  0          0
consumer-group-101  super-cool-topic-with-data  4          0
consumer-group-101  super-cool-topic-with-data  9          0
consumer-group-101  super-cool-topic-with-data  1          0
```
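The dry-run and execute commands differ only in their final flag, so one way to avoid typos is to wrap them in a function that defaults to the safe option. This is a sketch under assumptions: `reset_to_earliest` and `KAFKA_HOME` are hypothetical names, and it reuses `client_sasl.properties` and the 20 second timeout from the commands above.

```shell
#!/usr/bin/env bash
# Sketch only: reset_to_earliest and KAFKA_HOME are hypothetical names.
# KAFKA_HOME is assumed to be the extracted Kafka folder (defaults to the current directory).
# Usage: reset_to_earliest <broker_url> <group> <topic> [dry-run|execute]
reset_to_earliest() {
  local broker="$1" group="$2" topic="$3" mode="${4:-dry-run}"
  # Only allow the two modes the command supports; anything else is rejected.
  case "$mode" in
    dry-run|execute) ;;
    *) echo "mode must be dry-run or execute, got '$mode'" >&2; return 2 ;;
  esac
  "${KAFKA_HOME:-.}/bin/kafka-consumer-groups.sh" \
    --bootstrap-server "$broker" \
    --group "$group" \
    --command-config client_sasl.properties \
    --topic "$topic" \
    --reset-offsets --to-earliest \
    "--$mode" \
    --timeout 20000
}
```

You would run it once as `reset_to_earliest <broker_url> <consumer-group-id> <topic-name>` to see the dry run, then again with `execute` as the fourth argument.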
- Perform the reset
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
Note the `--execute` flag in place of `--dry-run`: this is what actually applies the new offsets.
```shell
bin/kafka-consumer-groups.sh --bootstrap-server <broker_url> --group <consumer-group-id> --command-config client_sasl.properties --topic <topic-name> --reset-offsets --to-earliest --execute --timeout 20000
```
For example
```shell
bin/kafka-consumer-groups.sh --bootstrap-server b-1.cluster.f3ko7p.c5.kafka.us-east-1.amazonaws.com:9096 --group consumer-group-101 --command-config client_sasl.properties --topic super-cool-topic-with-data --reset-offsets --to-earliest --execute --timeout 20000
```
You should see the same output as the dry run:
```
GROUP               TOPIC                       PARTITION  NEW-OFFSET
consumer-group-101  super-cool-topic-with-data  3          0
consumer-group-101  super-cool-topic-with-data  7          0
consumer-group-101  super-cool-topic-with-data  2          0
consumer-group-101  super-cool-topic-with-data  6          0
consumer-group-101  super-cool-topic-with-data  5          0
consumer-group-101  super-cool-topic-with-data  8          0
consumer-group-101  super-cool-topic-with-data  0          0
consumer-group-101  super-cool-topic-with-data  4          0
consumer-group-101  super-cool-topic-with-data  9          0
consumer-group-101  super-cool-topic-with-data  1          0
```
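If you want a safety net around `--execute`, one option is a function that always shows the dry run first and only applies the reset after you type `yes`. This is a sketch, not part of the Kafka tooling: `confirm_and_reset` and `KAFKA_HOME` are hypothetical names, and it needs bash (arrays and `read -p`).

```shell
#!/usr/bin/env bash
# Sketch only: confirm_and_reset and KAFKA_HOME are hypothetical names.
# KAFKA_HOME is assumed to be the extracted Kafka folder (defaults to the current directory).
# Usage: confirm_and_reset <broker_url> <group> <topic>
confirm_and_reset() {
  local broker="$1" group="$2" topic="$3" answer
  # Shared arguments; only the final --dry-run / --execute flag differs.
  local cmd=("${KAFKA_HOME:-.}/bin/kafka-consumer-groups.sh"
    --bootstrap-server "$broker" --group "$group"
    --command-config client_sasl.properties --topic "$topic"
    --reset-offsets --to-earliest --timeout 20000)

  # 1. Show the planned offsets without changing anything.
  "${cmd[@]}" --dry-run || return 1

  # 2. Apply the reset only if the answer is exactly "yes".
  read -r -p "Apply this reset to group '$group'? Type yes to continue: " answer
  if [ "$answer" != "yes" ]; then
    echo "Aborted; offsets unchanged." >&2
    return 1
  fi
  "${cmd[@]}" --execute
}
```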
- Restart your consumers if applicable; they will pick up from the reset offsets.