Pooja Singh
Manually committing offsets in Kafka
May 20 2021 12:25 AM
Hi,
I have a list of offsets with their corresponding partitions, and I need to commit them manually.
To do so, I loop through the list, assign each partition to the consumer, and seek to the given offset.
Then I consume the message and pass the consume result to the Commit method.
Sometimes this runs smoothly, but sometimes it throws a "Local:Waiting for Coordinator" exception.
In both cases, when I consume messages afterwards, I receive the same series of messages that I had already committed, or rather tried to commit. So the commits never actually took effect :(
foreach (var item in cmdparamslist)
{
    Partition p = new Partition(Int16.Parse(item.PartitionID));
    TopicPartition tp = new TopicPartition(configuration.GetSection("KafkaSettings").GetSection("Topic").Value, p);
    Offset o = new Offset(long.Parse(item.Offset));
    TopicPartitionOffset tpo = new TopicPartitionOffset(tp, o);
    //Ltpo.Add(tpo);
    try
    {
        // Assign the partition, then seek to the stored offset
        KafkaConsumer.Assign(tpo);
        await Task.Delay(TimeSpan.FromSeconds(1));
        KafkaConsumer.Seek(tpo);
        var cr = KafkaConsumer.Consume(cts.Token);
        try
        {
            // Commit the offset of the message just consumed
            KafkaConsumer.Commit(cr);
        }
        catch (TopicPartitionOffsetException e1)
        {
            Console.WriteLine("exception " + e1);
        }
        catch (KafkaException e)
        {
            Console.WriteLine("exception " + e);
        }
    }
    catch (KafkaException e)
    {
        Console.WriteLine("exception " + e);
    }
}
KafkaConsumer.Close();
}
catch (Exception e)
{
    Console.WriteLine("exception " + e);
}
}
Consumer / Client configuration:
var conf = new ConsumerConfig
{
    GroupId = Guid.NewGuid().ToString(),
    BootstrapServers = configuration.GetSection("KafkaSettings").GetSection("RemoteServers").Value,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SaslMechanism = SaslMechanism.Gssapi,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    EnableAutoCommit = false
    //EnableAutoOffsetStore = false
};
I am using Confluent.Kafka version 1.6.2.
Could someone please help me?