Skip to main content

ACKS (acknowledgemenets) and Transactions

Doskonałe pytanie! To jest właśnie przejście na wyższy poziom zaawansowania w Kafce. Połączenie acks z transakcjami jest kluczowe dla zrozumienia, jak osiągnąć najwyższe gwarancje dostarczania, czyli "exactly-once" (dokładnie raz).

Wyjaśnijmy to krok po kroku.

1. Różnica w celach: Trwałość vs Atomowość

Najpierw musimy zrozumieć, że acks i transakcje rozwiązują dwa różne, choć powiązane, problemy:

  • acks dotyczy TRWAŁOŚCI (Durability) pojedynczej operacji zapisu.

  • Pytanie, na które odpowiada: "Czy moja pojedyncza wiadomość została bezpiecznie zapisana na brokerze (i jego replikach), aby przetrwać awarię?"

  • acks=all daje najlepszą gwarancję trwałości.

  • Transakcje dotyczą ATOMOWOŚCI (Atomicity) grupy operacji.

  • Pytanie, na które odpowiada: "Czy moja grupa operacji (np. odczytanie wiadomości, przetworzenie jej i wysłanie kilku nowych wiadomości) zakończyła się w całości sukcesem, czy w całości została wycofana?"

  • Dzięki transakcjom unikamy sytuacji, w której część operacji się udała, a część nie, pozostawiając system w niespójnym stanie.

Analogia do bazy danych:

  • acks=all jest jak fsync() w systemie plików – upewnia się, że dane fizycznie trafiły na dysk.
  • Transakcja jest jak BEGIN TRANSACTION ... COMMIT/ROLLBACK – grupuje wiele zapytań INSERT, UPDATE w jedną logiczną, niepodzielną całość.

2. Jak acks ma się do transakcji?

acks=all jest absolutnym i nienegocjowalnym WYMAGANIEM WSTĘPNYM dla transakcji w Kafce.

Inaczej mówiąc: Nie można mieć transakcji bez acks=all.

Dlaczego?

Wyobraź sobie, co by się stało, gdyby transakcje działały z acks=1:

  1. Aplikacja rozpoczyna transakcję.
  2. Wysyła wiadomość A na topic T1. Używa acks=1.
  3. Lider dla T1 zapisuje wiadomość A, odsyła ack i natychmiast ulega awarii, zanim zdążył ją zreplikować.
  4. Aplikacja wysyła wiadomość B na topic T2. Ta operacja się udaje.
  5. Aplikacja wykonuje commitTransaction(). Z jej perspektywy wszystko jest w porządku.

Rezultat: Transakcja została "zatwierdzona", ale wiadomość A zniknęła z systemu! Złamaliśmy podstawową zasadę atomowości – transakcja nie zakończyła się w całości sukcesem.

Dlatego, aby mechanizm transakcyjny był w ogóle wiarygodny, musi opierać się na najsilniejszej gwarancji trwałości dla każdej pojedynczej operacji wewnątrz transakcji. Producent skonfigurowany do pracy w trybie transakcyjnym jest automatycznie zmuszany do używania acks=all.

3. Idempotentność – drugi filar transakcji

Oprócz acks=all, transakcje w Kafce wymagają, aby producent był idempotentny (enable.idempotence=true).

  • Idempotentność gwarantuje, że ponowne wysłanie tej samej wiadomości (np. z powodu błędu sieciowego) nie spowoduje jej zduplikowania na brokerze.
  • Dlaczego jest to potrzebne w transakcjach? Mechanizm transakcyjny musi być w stanie bezpiecznie ponawiać operacje w razie problemów. Gdyby producent nie był idempotentny, ponowienie operacji commitTransaction() mogłoby spowodować zapisanie tych samych wiadomości po raz drugi.

Dobra wiadomość jest taka, że włączenie transakcji automatycznie włącza idempotentność.

4. Jak to działa w Spring for Kafka?

Spring pięknie abstahuje te skomplikowane mechanizmy. Twoja rola sprowadza się do kilku kluczowych kroków:

Krok 1: Konfiguracja Producenta

Musisz nadać swojemu producentowi unikalny transactional.id. To jest sygnał dla Springa i Kafki, że ten producent będzie używany w transakcjach.

Konfiguracja w application.yml:

spring:
kafka:
producer:
# To jest kluczowe! Musi być unikalne dla każdej instancji producenta.
transactional-id: "my-transactional-producer-" # Często dodaje się tu jakiś sufix
# Nie musisz już ustawiać acks ani enable.idempotence.
# Ustawienie transactional-id automatycznie wymusza:
# acks: all
# enable.idempotence: true
# retries: (bardzo duża liczba)
# ... i inne bezpieczne ustawienia

consumer:
# Aby konsument czytał tylko zatwierdzone (committed) wiadomości
properties:
isolation.level: read_committed
  • transactional.id: To jest najważniejsza flaga. Umożliwia brokerowi zarządzanie stanem transakcji i odzyskiwanie spójności po awarii aplikacji (tzw. "fencing zombies").
  • isolation.level: read_committed: To ustawienie dla konsumenta. Gwarantuje, że będzie on widział tylko te wiadomości, które pochodzą z pomyślnie zakończonych (zatwierdzonych) transakcji. Pominie wiadomości z transakcji, które zostały wycofane (aborted).

Krok 2: Użycie @Transactional w kodzie

Spring udostępnia adnotację @Transactional("kafkaTransactionManager"), która zarządza cyklem życia transakcji (rozpocznij, zatwierdź, wycofaj).

Najczęstszy wzorzec: "Consume-Process-Produce"

@Service
public class OrderProcessor {

private final KafkaTemplate<String, ProcessedOrder> kafkaTemplate;

public OrderProcessor(KafkaTemplate<String, ProcessedOrder> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@KafkaListener(topics = "raw-orders", groupId = "order-processor-group")
@Transactional("kafkaTransactionManager") // Magia dzieje się tutaj!
public void processOrder(ConsumerRecord<String, RawOrder> record) {
try {
RawOrder rawOrder = record.value();
System.out.println("Processing order: " + rawOrder.getId());

// 1. Logika biznesowa
if (rawOrder.getAmount() <= 0) {
// Ta sytuacja spowoduje rzucenie wyjątku i wycofanie transakcji
throw new IllegalArgumentException("Order amount must be positive.");
}

ProcessedOrder processedOrder = new ProcessedOrder(rawOrder.getId(), "PROCESSED");

// 2. Wysyłanie wiadomości wynikowej
kafkaTemplate.send("processed-orders", processedOrder);

// Jeśli metoda zakończy się bez wyjątku, Spring automatycznie zatwierdzi transakcję (commit).
// Oznacza to, że offset dla wiadomości 'raw-order' zostanie zatwierdzony
// ORAZ wiadomość na 'processed-orders' stanie się widoczna dla konsumentów.
// Te dwie operacje są ATOMOWE.

} catch (Exception e) {
System.err.println("Failed to process order. Rolling back transaction.");
// Gdy z metody zostanie rzucony wyjątek, Spring automatycznie wycofa transakcję (abort).
// Oznacza to, że offset NIE zostanie zatwierdzony (wiadomość 'raw-order' zostanie przetworzona ponownie)
// ORAZ wiadomość na 'processed-orders' NIGDY nie stanie się widoczna.
throw e; // Rzuć dalej, aby Spring mógł obsłużyć rollback
}
}
}

Podsumowanie

Cechaacks=all (bez transakcji)Transakcje w Kafce
Główny celTrwałość pojedynczej wiadomości.Atomowość grupy operacji.
Gwarancje"At-least-once". Przy włączeniu enable.idempotence gwarantuje "exactly-once" dla pojedynczej operacji zapisu na jednej partycji."Exactly-once" dla całego przepływu "consume-process-produce".
Relacjaacks=all jest warunkiem koniecznym do działania transakcji.Transakcje opierają się na acks=all i rozszerzają jego gwarancje na wiele operacji.
Konfiguracja kluczowaacks: all, enable.idempotence: truetransactional-id: <id>, isolation.level: read_committed
ZastosowanieBezpieczne wysyłanie pojedynczych, niezależnych od siebie wiadomości.Przetwarzanie strumieniowe, gdzie odczyt i zapis muszą być spójne (np. wzorzec consume-process-produce).

Mówiąc najprościej: acks to fundament, a transakcje to solidny budynek wzniesiony na tym fundamencie. Nie możesz budować bez fundamentów.