Commit Offset
Świetnie, przechodzimy do drugiej, równie ważnej strony medalu – konsumentów i potwierdzania przetworzenia wiadomości (commitowania offsetów). To kluczowy element gwarantujący, że wiadomości nie zostaną utracone ani przetworzone wielokrotnie w sposób niekontrolowany.
Podstawy: Co to jest "commit offset"?
W Kafce każda grupa konsumentów (group.id) śledzi, które wiadomości z danej partycji już przetworzyła. Robi to, zapisując offset – czyli numer porządkowy ostatniej przetworzonej wiadomości + 1. Ten proces nazywa się "commitowaniem offsetu".
- Dlaczego to jest ważne? Gdy konsument ulegnie awarii i uruchomi się ponownie (lub gdy inny konsument z tej samej grupy przejmie jego partycję), zacznie czytać wiadomości od ostatniego zatwierdzonego offsetu. Dzięki temu nie przetwarza ponownie wiadomości, które już zostały pomyślnie obsłużone.
Spring for Kafka daje ogromną elastyczność w zarządzaniu tym procesem. Główne opcje sprowadzają się do trybu automatycznego lub manualnego commitowania, z wieloma wariantami po drodze.
Tryby commitowania w Spring for Kafka (właściwość ack-mode)
Konfigurujemy to głównie za pomocą właściwości spring.kafka.listener.ack-mode. Przeanalizujmy najważniejsze opcje, od najprostszej do najbardziej zaawansowanej.
1. ack-mode: BATCH (Domyślny) - Automatyczny commit
- Jak działa: Spring Kafka automatycznie commituje offsety za Ciebie. Domyślnie
enable.auto.commitw kliencie Kafki jest ustawione nafalse, a to Spring zarządza cyklem commitowania. Po przetworzeniu całej paczki (batcha) wiadomości pobranych z brokera, Spring commituje offset ostatniej wiadomości z tej paczki. - Logika:
- Konsument pobiera paczkę wiadomości (np. 500, zgodnie z
max.poll.records). - Spring przekazuje je jedna po drugiej do Twojej metody
@KafkaListener. - Jeśli wszystkie wiadomości w paczce zostaną przetworzone bez rzucenia wyjątku, Spring wywołuje commit.
- Jeśli w trakcie przetwarzania (np. przy 10. wiadomości z 500) metoda rzuci wyjątek, cała paczka nie jest commitowana. Po restarcie konsument zacznie od pierwszej wiadomości z tej nieudanej paczki.
- Zalety:
- Bardzo proste w użyciu – nie musisz pisać żadnego kodu do commitowania.
- Wydajne, bo commity nie następują po każdej wiadomości.
- Wady:
- Ryzyko ponownego przetwarzania. Jeśli przetworzyłeś 9 wiadomości, a 10. spowodowała błąd, po restarcie przetworzysz te pierwsze 9 wiadomości ponownie. Twoja logika biznesowa musi być na to gotowa (idempotentna).
- Kiedy używać? W większości standardowych przypadków, gdy ponowne przetworzenie wiadomości nie jest problemem (np. zapisujesz dane do bazy z kluczem
UPSERT/MERGE).
Konfiguracja (application.yml):
spring:
kafka:
listener:
ack-mode: BATCH # To jest domyślne, nie trzeba tego wpisywać
consumer:
# Ważne: Spring sam zarządza commitami, więc to musi być false
enable-auto-commit: false
# Liczba wiadomości w paczce
max-poll-records: 50
2. ack-mode: RECORD - Automatyczny commit po każdej wiadomości
- Jak działa: Spring commituje offset po pomyślnym przetworzeniu każdej pojedynczej wiadomości.
- Logika:
- Konsument pobiera paczkę wiadomości.
- Spring bierze pierwszą wiadomość i przekazuje ją do
@KafkaListener. - Jeśli metoda zakończy się sukcesem, Spring natychmiast commituje offset tej jednej wiadomości.
- Przechodzi do następnej wiadomości.
- Zalety:
- Minimalizuje ponowne przetwarzanie. Jeśli 10. wiadomość zawiedzie, tylko ona zostanie przetworzona ponownie po restarcie, a nie cała paczka.
- Wady:
- Niższa wydajność. Częste commity to dodatkowe wywołania sieciowe do Kafki.
- Kiedy używać? Gdy operacje są kosztowne i chcesz za wszelką cenę uniknąć ponownego ich wykonywania, a niewielki spadek wydajności jest akceptowalny.
3. ack-mode: MANUAL i ack-mode: MANUAL_IMMEDIATE - Pełna kontrola
Tutaj Ty przejmujesz stery. Spring nie robi nic automatycznie – to Ty musisz powiedzieć, kiedy offset ma zostać zatwierdzony.
- Jak działa: Twoja metoda listenera musi przyjąć dodatkowy argument typu
Acknowledgment. Wywołanie metodyacknowledgment.acknowledge()jest sygnałem dla Springa, że może commitować offset. - Różnica między
MANUALaMANUAL_IMMEDIATE: MANUAL:acknowledgment.acknowledge()tylko zaznacza wiadomość jako gotową do commitu. Rzeczywisty commit nastąpi, gdy wszystkie wiadomości z paczki zostaną przetworzone (podobnie jak w trybieBATCH, ale na Twoje żądanie).MANUAL_IMMEDIATE:acknowledgment.acknowledge()powoduje natychmiastowy, synchroniczny commit offsetu. Jest to odpowiednik trybuRECORD, ale pod Twoją kontrolą.
Przykład kodu (MANUAL_IMMEDIATE):
@KafkaListener(topics = "important-events", ackMode = "MANUAL_IMMEDIATE")
public void handleImportantEvent(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
System.out.println("Processing record: " + record.value());
// Skomplikowana logika biznesowa, np. zapis do bazy i wysłanie emaila
saveToDatabase(record.value());
sendEmail(record.value());
// Jeśli wszystko się udało, potwierdź
acknowledgment.acknowledge();
System.out.println("Acknowledged!");
} catch (Exception e) {
// Coś poszło nie tak, NIE potwierdzamy.
// Wiadomość zostanie dostarczona ponownie.
System.err.println("Failed to process, not acknowledging.");
// Tutaj można np. wysłać wiadomość do Dead Letter Topic (DLT)
}
}
- Zalety:
- Maksymalna elastyczność. Możesz commitować offset dopiero po pomyślnym zapisie do bazy danych, wywołaniu zewnętrznego API i wykonaniu wszystkich kroków logiki biznesowej.
- Wady:
- Więcej kodu do napisania. Ty jesteś odpowiedzialny za logikę commitowania.
- Łatwiej o pomyłkę (np. zapomnienie o wywołaniu
acknowledge(), co spowoduje nieskończone ponowne przetwarzanie tej samej wiadomości). - Kiedy używać?
- Gdy przetwarzanie wiadomości obejmuje wiele kroków (np. zapis do bazy + wywołanie API) i chcesz mieć pewność, że wszystkie się udały, zanim zatwierdzisz offset.
- Gdy chcesz zaimplementować niestandardową logikę obsługi błędów (np. ręczne wysyłanie do DLT przed commitowaniem).
4. ack-mode: TIME i ack-mode: COUNT (Rzadziej używane)
TIME: Commituje co określony interwał czasowy (konfigurowany przezspring.kafka.listener.ack-time).COUNT: Commituje co określoną liczbę wiadomości (konfigurowany przezspring.kafka.listener.ack-count).
Są to warianty trybu BATCH, dające nieco większą kontrolę nad tym, jak często wykonywane są commity.
Nasłuchiwanie na paczki wiadomości (Batch Listener)
Dla zwiększenia wydajności, zamiast przetwarzać wiadomości pojedynczo, możesz skonfigurować swój listener do pracy na całych paczkach.
Konfiguracja:
spring:
kafka:
listener:
type: batch # Kluczowa flaga!
Kod listenera:
@KafkaListener(topics = "bulk-data", containerFactory = "kafkaListenerContainerFactory")
public void handleBatch(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
System.out.println("Processing a batch of " + records.size() + " records.");
// Przetwarzaj całą listę - np. używając JDBC batch update
saveAllToDatabase(records.stream().map(ConsumerRecord::value).collect(Collectors.toList()));
// Jeśli używasz MANUAL ack-mode, potwierdź całą paczkę
acknowledgment.acknowledge();
}
- Jak działa? Spring przekazuje do Twojej metody całą listę (
List<ConsumerRecord>) wiadomości pobranych w jednympoll(). - Zalety:
- Ogromny wzrost wydajności dla operacji, które można zoptymalizować pod kątem wsadowym (np. wstawianie wielu rekordów do bazy w jednym zapytaniu).
- Wady:
- Obsługa błędów jest bardziej skomplikowana. Jeśli jedna wiadomość w paczce jest wadliwa, musisz zdecydować, co zrobić z resztą. Domyślnie, jeśli rzucisz wyjątek, cała paczka zostanie przetworzona ponownie.
Podsumowanie i rekomendacje
ack-mode | Jak działa | Zalety | Wady | Kiedy używać? |
|---|---|---|---|---|
BATCH (domyślny) | Commit po pomyślnym przetworzeniu całej paczki. | Prosty, wydajny. | Ryzyko ponownego przetwarzania całej paczki. | Domyślny wybór, jeśli logika jest idempotentna. |
RECORD | Commit po każdej pojedynczej wiadomości. | Minimalizuje ponowne przetwarzanie. | Mniejsza wydajność (więcej commitów). | Gdy operacje są kosztowne i chcesz uniknąć powtórzeń. |
MANUAL_IMMEDIATE | Ty wywołujesz acknowledge() co powoduje natychmiastowy commit. | Pełna kontrola, idealne do złożonej logiki. | Więcej kodu, łatwiej o błąd. | Gdy przetwarzanie ma wiele kroków (baza+API). |
Listener wsadowy | Otrzymujesz List<ConsumerRecord>. | Maksymalna wydajność dla operacji wsadowych. | Trudniejsza obsługa błędów. | Import danych, operacje ETL, gdzie liczy się przepustowość. |
Złota rada: Zacznij od domyślnego BATCH. Jeśli zauważysz, że ponowne przetwarzanie wiadomości powoduje problemy w Twojej logice biznesowej, przejdź na RECORD. Jeśli potrzebujesz absolutnej kontroli nad cyklem życia wiadomości (np. z powodu integracji z systemami nietransakcyjnymi), użyj MANUAL_IMMEDIATE. Listenerów wsadowych używaj świadomie, gdy naprawdę potrzebujesz maksymalnej przepustowości.