Skip to main content

Commit Offset

Świetnie, przechodzimy do drugiej, równie ważnej strony medalu – konsumentów i potwierdzania przetworzenia wiadomości (commitowania offsetów). To kluczowy element gwarantujący, że wiadomości nie zostaną utracone ani przetworzone wielokrotnie w sposób niekontrolowany.

Podstawy: Co to jest "commit offset"?

W Kafce każda grupa konsumentów (group.id) śledzi, które wiadomości z danej partycji już przetworzyła. Robi to, zapisując offset – czyli numer porządkowy ostatniej przetworzonej wiadomości + 1. Ten proces nazywa się "commitowaniem offsetu".

  • Dlaczego to jest ważne? Gdy konsument ulegnie awarii i uruchomi się ponownie (lub gdy inny konsument z tej samej grupy przejmie jego partycję), zacznie czytać wiadomości od ostatniego zatwierdzonego offsetu. Dzięki temu nie przetwarza ponownie wiadomości, które już zostały pomyślnie obsłużone.

Spring for Kafka daje ogromną elastyczność w zarządzaniu tym procesem. Główne opcje sprowadzają się do trybu automatycznego lub manualnego commitowania, z wieloma wariantami po drodze.


Tryby commitowania w Spring for Kafka (właściwość ack-mode)

Konfigurujemy to głównie za pomocą właściwości spring.kafka.listener.ack-mode. Przeanalizujmy najważniejsze opcje, od najprostszej do najbardziej zaawansowanej.

1. ack-mode: BATCH (Domyślny) - Automatyczny commit

  • Jak działa: Spring Kafka automatycznie commituje offsety za Ciebie. Domyślnie enable.auto.commit w kliencie Kafki jest ustawione na false, a to Spring zarządza cyklem commitowania. Po przetworzeniu całej paczki (batcha) wiadomości pobranych z brokera, Spring commituje offset ostatniej wiadomości z tej paczki.
  • Logika:
  1. Konsument pobiera paczkę wiadomości (np. 500, zgodnie z max.poll.records).
  2. Spring przekazuje je jedna po drugiej do Twojej metody @KafkaListener.
  3. Jeśli wszystkie wiadomości w paczce zostaną przetworzone bez rzucenia wyjątku, Spring wywołuje commit.
  4. Jeśli w trakcie przetwarzania (np. przy 10. wiadomości z 500) metoda rzuci wyjątek, cała paczka nie jest commitowana. Po restarcie konsument zacznie od pierwszej wiadomości z tej nieudanej paczki.
  • Zalety:
  • Bardzo proste w użyciu – nie musisz pisać żadnego kodu do commitowania.
  • Wydajne, bo commity nie następują po każdej wiadomości.
  • Wady:
  • Ryzyko ponownego przetwarzania. Jeśli przetworzyłeś 9 wiadomości, a 10. spowodowała błąd, po restarcie przetworzysz te pierwsze 9 wiadomości ponownie. Twoja logika biznesowa musi być na to gotowa (idempotentna).
  • Kiedy używać? W większości standardowych przypadków, gdy ponowne przetworzenie wiadomości nie jest problemem (np. zapisujesz dane do bazy z kluczem UPSERT/MERGE).

Konfiguracja (application.yml):

spring:
kafka:
listener:
ack-mode: BATCH # To jest domyślne, nie trzeba tego wpisywać
consumer:
# Ważne: Spring sam zarządza commitami, więc to musi być false
enable-auto-commit: false
# Liczba wiadomości w paczce
max-poll-records: 50

2. ack-mode: RECORD - Automatyczny commit po każdej wiadomości

  • Jak działa: Spring commituje offset po pomyślnym przetworzeniu każdej pojedynczej wiadomości.
  • Logika:
  1. Konsument pobiera paczkę wiadomości.
  2. Spring bierze pierwszą wiadomość i przekazuje ją do @KafkaListener.
  3. Jeśli metoda zakończy się sukcesem, Spring natychmiast commituje offset tej jednej wiadomości.
  4. Przechodzi do następnej wiadomości.
  • Zalety:
  • Minimalizuje ponowne przetwarzanie. Jeśli 10. wiadomość zawiedzie, tylko ona zostanie przetworzona ponownie po restarcie, a nie cała paczka.
  • Wady:
  • Niższa wydajność. Częste commity to dodatkowe wywołania sieciowe do Kafki.
  • Kiedy używać? Gdy operacje są kosztowne i chcesz za wszelką cenę uniknąć ponownego ich wykonywania, a niewielki spadek wydajności jest akceptowalny.

3. ack-mode: MANUAL i ack-mode: MANUAL_IMMEDIATE - Pełna kontrola

Tutaj Ty przejmujesz stery. Spring nie robi nic automatycznie – to Ty musisz powiedzieć, kiedy offset ma zostać zatwierdzony.

  • Jak działa: Twoja metoda listenera musi przyjąć dodatkowy argument typu Acknowledgment. Wywołanie metody acknowledgment.acknowledge() jest sygnałem dla Springa, że może commitować offset.
  • Różnica między MANUAL a MANUAL_IMMEDIATE:
  • MANUAL: acknowledgment.acknowledge() tylko zaznacza wiadomość jako gotową do commitu. Rzeczywisty commit nastąpi, gdy wszystkie wiadomości z paczki zostaną przetworzone (podobnie jak w trybie BATCH, ale na Twoje żądanie).
  • MANUAL_IMMEDIATE: acknowledgment.acknowledge() powoduje natychmiastowy, synchroniczny commit offsetu. Jest to odpowiednik trybu RECORD, ale pod Twoją kontrolą.

Przykład kodu (MANUAL_IMMEDIATE):

@KafkaListener(topics = "important-events", ackMode = "MANUAL_IMMEDIATE")
public void handleImportantEvent(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
System.out.println("Processing record: " + record.value());

// Skomplikowana logika biznesowa, np. zapis do bazy i wysłanie emaila
saveToDatabase(record.value());
sendEmail(record.value());

// Jeśli wszystko się udało, potwierdź
acknowledgment.acknowledge();
System.out.println("Acknowledged!");

} catch (Exception e) {
// Coś poszło nie tak, NIE potwierdzamy.
// Wiadomość zostanie dostarczona ponownie.
System.err.println("Failed to process, not acknowledging.");
// Tutaj można np. wysłać wiadomość do Dead Letter Topic (DLT)
}
}
  • Zalety:
  • Maksymalna elastyczność. Możesz commitować offset dopiero po pomyślnym zapisie do bazy danych, wywołaniu zewnętrznego API i wykonaniu wszystkich kroków logiki biznesowej.
  • Wady:
  • Więcej kodu do napisania. Ty jesteś odpowiedzialny za logikę commitowania.
  • Łatwiej o pomyłkę (np. zapomnienie o wywołaniu acknowledge(), co spowoduje nieskończone ponowne przetwarzanie tej samej wiadomości).
  • Kiedy używać?
  • Gdy przetwarzanie wiadomości obejmuje wiele kroków (np. zapis do bazy + wywołanie API) i chcesz mieć pewność, że wszystkie się udały, zanim zatwierdzisz offset.
  • Gdy chcesz zaimplementować niestandardową logikę obsługi błędów (np. ręczne wysyłanie do DLT przed commitowaniem).

4. ack-mode: TIME i ack-mode: COUNT (Rzadziej używane)

  • TIME: Commituje co określony interwał czasowy (konfigurowany przez spring.kafka.listener.ack-time).
  • COUNT: Commituje co określoną liczbę wiadomości (konfigurowany przez spring.kafka.listener.ack-count).

Są to warianty trybu BATCH, dające nieco większą kontrolę nad tym, jak często wykonywane są commity.

Nasłuchiwanie na paczki wiadomości (Batch Listener)

Dla zwiększenia wydajności, zamiast przetwarzać wiadomości pojedynczo, możesz skonfigurować swój listener do pracy na całych paczkach.

Konfiguracja:

spring:
kafka:
listener:
type: batch # Kluczowa flaga!

Kod listenera:

@KafkaListener(topics = "bulk-data", containerFactory = "kafkaListenerContainerFactory")
public void handleBatch(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
System.out.println("Processing a batch of " + records.size() + " records.");

// Przetwarzaj całą listę - np. używając JDBC batch update
saveAllToDatabase(records.stream().map(ConsumerRecord::value).collect(Collectors.toList()));

// Jeśli używasz MANUAL ack-mode, potwierdź całą paczkę
acknowledgment.acknowledge();
}
  • Jak działa? Spring przekazuje do Twojej metody całą listę (List<ConsumerRecord>) wiadomości pobranych w jednym poll().
  • Zalety:
  • Ogromny wzrost wydajności dla operacji, które można zoptymalizować pod kątem wsadowym (np. wstawianie wielu rekordów do bazy w jednym zapytaniu).
  • Wady:
  • Obsługa błędów jest bardziej skomplikowana. Jeśli jedna wiadomość w paczce jest wadliwa, musisz zdecydować, co zrobić z resztą. Domyślnie, jeśli rzucisz wyjątek, cała paczka zostanie przetworzona ponownie.

Podsumowanie i rekomendacje

ack-modeJak działaZaletyWadyKiedy używać?
BATCH (domyślny)Commit po pomyślnym przetworzeniu całej paczki.Prosty, wydajny.Ryzyko ponownego przetwarzania całej paczki.Domyślny wybór, jeśli logika jest idempotentna.
RECORDCommit po każdej pojedynczej wiadomości.Minimalizuje ponowne przetwarzanie.Mniejsza wydajność (więcej commitów).Gdy operacje są kosztowne i chcesz uniknąć powtórzeń.
MANUAL_IMMEDIATETy wywołujesz acknowledge() co powoduje natychmiastowy commit.Pełna kontrola, idealne do złożonej logiki.Więcej kodu, łatwiej o błąd.Gdy przetwarzanie ma wiele kroków (baza+API).
Listener wsadowyOtrzymujesz List<ConsumerRecord>.Maksymalna wydajność dla operacji wsadowych.Trudniejsza obsługa błędów.Import danych, operacje ETL, gdzie liczy się przepustowość.

Złota rada: Zacznij od domyślnego BATCH. Jeśli zauważysz, że ponowne przetwarzanie wiadomości powoduje problemy w Twojej logice biznesowej, przejdź na RECORD. Jeśli potrzebujesz absolutnej kontroli nad cyklem życia wiadomości (np. z powodu integracji z systemami nietransakcyjnymi), użyj MANUAL_IMMEDIATE. Listenerów wsadowych używaj świadomie, gdy naprawdę potrzebujesz maksymalnej przepustowości.