ACKS (acknowledgemenets) and Transactions
Doskonałe pytanie! To jest właśnie przejście na wyższy poziom zaawansowania w Kafce. Połączenie acks z transakcjami jest kluczowe dla zrozumienia, jak osiągnąć najwyższe gwarancje dostarczania, czyli "exactly-once" (dokładnie raz).
Wyjaśnijmy to krok po kroku.
1. Różnica w celach: Trwałość vs Atomowość
Najpierw musimy zrozumieć, że acks i transakcje rozwiązują dwa różne, choć powiązane, problemy:
-
acksdotyczy TRWAŁOŚCI (Durability) pojedynczej operacji zapisu. -
Pytanie, na które odpowiada: "Czy moja pojedyncza wiadomość została bezpiecznie zapisana na brokerze (i jego replikach), aby przetrwać awarię?"
-
acks=alldaje najlepszą gwarancję trwałości. -
Transakcje dotyczą ATOMOWOŚCI (Atomicity) grupy operacji.
-
Pytanie, na które odpowiada: "Czy moja grupa operacji (np. odczytanie wiadomości, przetworzenie jej i wysłanie kilku nowych wiadomości) zakończyła się w całości sukcesem, czy w całości została wycofana?"
-
Dzięki transakcjom unikamy sytuacji, w której część operacji się udała, a część nie, pozostawiając system w niespójnym stanie.
Analogia do bazy danych:
acks=alljest jakfsync()w systemie plików – upewnia się, że dane fizycznie trafiły na dysk.- Transakcja jest jak
BEGIN TRANSACTION ... COMMIT/ROLLBACK– grupuje wiele zapytańINSERT,UPDATEw jedną logiczną, niepodzielną całość.
2. Jak acks ma się do transakcji?
acks=all jest absolutnym i nienegocjowalnym WYMAGANIEM WSTĘPNYM dla transakcji w Kafce.
Inaczej mówiąc: Nie można mieć transakcji bez acks=all.
Dlaczego?
Wyobraź sobie, co by się stało, gdyby transakcje działały z acks=1:
- Aplikacja rozpoczyna transakcję.
- Wysyła wiadomość A na topic T1. Używa
acks=1. - Lider dla T1 zapisuje wiadomość A, odsyła
acki natychmiast ulega awarii, zanim zdążył ją zreplikować. - Aplikacja wysyła wiadomość B na topic T2. Ta operacja się udaje.
- Aplikacja wykonuje
commitTransaction(). Z jej perspektywy wszystko jest w porządku.
Rezultat: Transakcja została "zatwierdzona", ale wiadomość A zniknęła z systemu! Złamaliśmy podstawową zasadę atomowości – transakcja nie zakończyła się w całości sukcesem.
Dlatego, aby mechanizm transakcyjny był w ogóle wiarygodny, musi opierać się na najsilniejszej gwarancji trwałości dla każdej pojedynczej operacji wewnątrz transakcji. Producent skonfigurowany do pracy w trybie transakcyjnym jest automatycznie zmuszany do używania acks=all.
3. Idempotentność – drugi filar transakcji
Oprócz acks=all, transakcje w Kafce wymagają, aby producent był idempotentny (enable.idempotence=true).
- Idempotentność gwarantuje, że ponowne wysłanie tej samej wiadomości (np. z powodu błędu sieciowego) nie spowoduje jej zduplikowania na brokerze.
- Dlaczego jest to potrzebne w transakcjach? Mechanizm transakcyjny musi być w stanie bezpiecznie ponawiać operacje w razie problemów. Gdyby producent nie był idempotentny, ponowienie operacji
commitTransaction()mogłoby spowodować zapisanie tych samych wiadomości po raz drugi.
Dobra wiadomość jest taka, że włączenie transakcji automatycznie włącza idempotentność.
4. Jak to działa w Spring for Kafka?
Spring pięknie abstahuje te skomplikowane mechanizmy. Twoja rola sprowadza się do kilku kluczowych kroków:
Krok 1: Konfiguracja Producenta
Musisz nadać swojemu producentowi unikalny transactional.id. To jest sygnał dla Springa i Kafki, że ten producent będzie używany w transakcjach.
Konfiguracja w application.yml:
spring:
kafka:
producer:
# To jest kluczowe! Musi być unikalne dla każdej instancji producenta.
transactional-id: "my-transactional-producer-" # Często dodaje się tu jakiś sufix
# Nie musisz już ustawiać acks ani enable.idempotence.
# Ustawienie transactional-id automatycznie wymusza:
# acks: all
# enable.idempotence: true
# retries: (bardzo duża liczba)
# ... i inne bezpieczne ustawienia
consumer:
# Aby konsument czytał tylko zatwierdzone (committed) wiadomości
properties:
isolation.level: read_committed
transactional.id: To jest najważniejsza flaga. Umożliwia brokerowi zarządzanie stanem transakcji i odzyskiwanie spójności po awarii aplikacji (tzw. "fencing zombies").isolation.level: read_committed: To ustawienie dla konsumenta. Gwarantuje, że będzie on widział tylko te wiadomości, które pochodzą z pomyślnie zakończonych (zatwierdzonych) transakcji. Pominie wiadomości z transakcji, które zostały wycofane (aborted).
Krok 2: Użycie @Transactional w kodzie
Spring udostępnia adnotację @Transactional("kafkaTransactionManager"), która zarządza cyklem życia transakcji (rozpocznij, zatwierdź, wycofaj).
Najczęstszy wzorzec: "Consume-Process-Produce"
@Service
public class OrderProcessor {
private final KafkaTemplate<String, ProcessedOrder> kafkaTemplate;
public OrderProcessor(KafkaTemplate<String, ProcessedOrder> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(topics = "raw-orders", groupId = "order-processor-group")
@Transactional("kafkaTransactionManager") // Magia dzieje się tutaj!
public void processOrder(ConsumerRecord<String, RawOrder> record) {
try {
RawOrder rawOrder = record.value();
System.out.println("Processing order: " + rawOrder.getId());
// 1. Logika biznesowa
if (rawOrder.getAmount() <= 0) {
// Ta sytuacja spowoduje rzucenie wyjątku i wycofanie transakcji
throw new IllegalArgumentException("Order amount must be positive.");
}
ProcessedOrder processedOrder = new ProcessedOrder(rawOrder.getId(), "PROCESSED");
// 2. Wysyłanie wiadomości wynikowej
kafkaTemplate.send("processed-orders", processedOrder);
// Jeśli metoda zakończy się bez wyjątku, Spring automatycznie zatwierdzi transakcję (commit).
// Oznacza to, że offset dla wiadomości 'raw-order' zostanie zatwierdzony
// ORAZ wiadomość na 'processed-orders' stanie się widoczna dla konsumentów.
// Te dwie operacje są ATOMOWE.
} catch (Exception e) {
System.err.println("Failed to process order. Rolling back transaction.");
// Gdy z metody zostanie rzucony wyjątek, Spring automatycznie wycofa transakcję (abort).
// Oznacza to, że offset NIE zostanie zatwierdzony (wiadomość 'raw-order' zostanie przetworzona ponownie)
// ORAZ wiadomość na 'processed-orders' NIGDY nie stanie się widoczna.
throw e; // Rzuć dalej, aby Spring mógł obsłużyć rollback
}
}
}
Podsumowanie
| Cecha | acks=all (bez transakcji) | Transakcje w Kafce |
|---|---|---|
| Główny cel | Trwałość pojedynczej wiadomości. | Atomowość grupy operacji. |
| Gwarancje | "At-least-once". Przy włączeniu enable.idempotence gwarantuje "exactly-once" dla pojedynczej operacji zapisu na jednej partycji. | "Exactly-once" dla całego przepływu "consume-process-produce". |
| Relacja | acks=all jest warunkiem koniecznym do działania transakcji. | Transakcje opierają się na acks=all i rozszerzają jego gwarancje na wiele operacji. |
| Konfiguracja kluczowa | acks: all, enable.idempotence: true | transactional-id: <id>, isolation.level: read_committed |
| Zastosowanie | Bezpieczne wysyłanie pojedynczych, niezależnych od siebie wiadomości. | Przetwarzanie strumieniowe, gdzie odczyt i zapis muszą być spójne (np. wzorzec consume-process-produce). |
Mówiąc najprościej: acks to fundament, a transakcje to solidny budynek wzniesiony na tym fundamencie. Nie możesz budować bez fundamentów.