Skip to main content

Retry Topics - non blocking retries

Doskonale, przechodzimy do jednego z najpotężniejszych i najbardziej użytecznych mechanizmów w Spring for Kafka: Non-Blocking Retries (ponowienia nieblokujące). To nowoczesne podejście do obsługi błędów, które rozwiązuje wiele problemów klasycznego, blokującego ponawiania.

Czym są i dlaczego są lepsze niż blokujące ponowienia?

Problem z klasycznymi (blokującymi) ponowieniami: Wyobraź sobie, że w metodzie @KafkaListener masz pętlę try-catch z Thread.sleep().

  1. Odbierasz wiadomość A.
  2. Przetwarzanie kończy się błędem (np. niedostępne API).
  3. Twój kod czeka 5 sekund (Thread.sleep(5000)).
  4. Próbuje ponownie. W tym czasie wątek konsumenta jest zablokowany. Nie może on przetwarzać żadnych innych wiadomości, nawet tych, które przyszły po wiadomości A na tej samej partycji i mogłyby być przetworzone pomyślnie. To prowadzi do zatrzymania całego przetwarzania na partycji i jest bardzo nieefektywne.

Rozwiązanie: Non-Blocking Retries Mechanizm ten działa zupełnie inaczej. Zamiast blokować wątek, robi coś znacznie sprytniejszego:

  1. Odbierasz wiadomość A.
  2. Przetwarzanie kończy się błędem.
  3. Zamiast czekać, Spring Kafka przechwytuje wyjątek i wysyła wiadomość A na specjalny, dedykowany topic ponowień (retry topic), dodając do niej informację, kiedy powinna być przetworzona następnym razem.
  4. Wątek konsumenta jest natychmiast wolny i może pobrać kolejną wiadomość (np. B) z oryginalnego topicu.
  5. Osobny listener nasłuchuje na topicu ponowień. Gdy nadejdzie czas (timestamp wiadomości), pobiera wiadomość A i próbuje ją przetworzyć ponownie.

To jest "nieblokujące", ponieważ główny wątek konsumenta nigdy nie czeka.


Implementacja Twojego scenariusza krok po kroku

Scenariusz:

  1. Obsługa wiadomości CloudUploadRequest.
  2. Ponowienie tylko w przypadku SlowCloudUploadException.
  3. Maksymalnie 3 próby.
  4. Opóźnienia: 5s, 20s, 1 minuta.
  5. Po nieudanych próbach wiadomość trafia do Dead Letter Topic (DLT).
  6. Listener DLT tworzy zdarzenie CancelOperationEvent, aby anulować sagę.

Krok 1: Zdefiniowanie wyjątków i zdarzeń

// Wyjątek, który chcemy ponawiać
public class SlowCloudUploadException extends RuntimeException {
public SlowCloudUploadException(String message) {
super(message);
}
}

// Wyjątek, który NIE powinien być ponawiany (od razu do DLT)
public class InvalidFileFormatException extends RuntimeException {
public InvalidFileFormatException(String message) {
super(message);
}
}

// Model wiadomości przychodzącej
public class CloudUploadRequest {
private String fileId;
private String content;
// getters, setters, constructor...
}

// Zdarzenie anulujące sagę
public class CancelOperationEvent {
private String fileId;
private String reason;
// getters, setters, constructor...
}

Krok 2: Konfiguracja w kodzie (@RetryableTopic)

To najprostszy i najbardziej deklaratywny sposób konfiguracji. Używamy adnotacji @RetryableTopic bezpośrednio na metodzie listenera.

@Service
public class CloudUploadProcessor {

private final KafkaTemplate<String, Object> kafkaTemplate;

// Symulacja zawodnego serwisu
private int attemptCounter = 0;

public CloudUploadProcessor(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@RetryableTopic(
// Opóźnienia w milisekundach: 5s, 20s, 60s
backoff = @Backoff(delay = 5000, multiplier = 4.0, maxDelay = 60000),
// Liczba prób = 1 (pierwsza) + 2 (ponowienia) = 3
attempts = "3",
// Ponawiaj TYLKO dla tego wyjątku
include = { SlowCloudUploadException.class },
// Można też wykluczyć inne: exclude = { InvalidFileFormatException.class }
// Włącza mechanizm DLT
dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(id = "upload-processor", topics = "cloud-uploads")
public void handleUpload(CloudUploadRequest request) {
System.out.println("Attempting to upload file: " + request.getFileId());

// Symulacja błędu walidacji (nie będzie ponawiany)
if (request.getContent() == null || request.getContent().isEmpty()) {
throw new InvalidFileFormatException("File content is empty for fileId: " + request.getFileId());
}

// Symulacja niestabilnego serwisu chmurowego
attemptCounter++;
if (attemptCounter <= 3) {
System.err.println("Cloud service is slow, throwing SlowCloudUploadException. Attempt: " + attemptCounter);
throw new SlowCloudUploadException("Upload failed for fileId: " + request.getFileId());
}

// Symulacja sukcesu po kilku próbach (ten kod nie zostanie osiągnięty w naszym przykładzie)
System.out.println("File " + request.getFileId() + " uploaded successfully!");
attemptCounter = 0; // Reset for next message
}

// Listener dla Dead Letter Topic
// Domyślna nazwa DLT to <nazwa-topicu>.dlt
@DltHandler
public void handleDlt(CloudUploadRequest failedRequest,
@Header(KafkaHeaders.RECEIVED_TOPIC) String originalTopic,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String exceptionMessage) {

System.err.println("--- DLT Handler ---");
System.err.println("Received failed message from topic: " + originalTopic);
System.err.println("Failed to upload file: " + failedRequest.getFileId());
System.err.println("Reason: " + exceptionMessage);

// Tworzymy event anulujący sagę
CancelOperationEvent cancelEvent = new CancelOperationEvent(
failedRequest.getFileId(),
"Upload failed after multiple retries."
);

// Wysyłamy event na odpowiedni topic
System.out.println("Publishing CancelOperationEvent for saga compensation...");
kafkaTemplate.send("saga-cancellation-events", cancelEvent);
}
}

Analiza adnotacji @RetryableTopic

  • backoff = @Backoff(...): Definiuje strategię opóźnień.
  • delay = 5000: Pierwsze opóźnienie to 5 sekund.
  • multiplier = 4.0: Każde kolejne opóźnienie jest mnożone przez ten czynnik (5s * 4 = 20s, 20s * 4 = 80s, ale...).
  • maxDelay = 60000: ...ale opóźnienie nigdy nie przekroczy 60 sekund. Więc sekwencja to 5s, 20s, 60s. Idealnie pasuje do Twoich wymagań.
  • attempts = "3": Maksymalna liczba prób (pierwsza + ponowienia).
  • include = { SlowCloudUploadException.class }: Lista wyjątków, które mają uruchomić mechanizm ponowień. Jeśli zostanie rzucony jakikolwiek inny wyjątek (np. InvalidFileFormatException), wiadomość od razu trafi do DLT (lub spowoduje błąd, w zależności od konfiguracji).
  • dltStrategy = DltStrategy.FAIL_ON_ERROR: Ta strategia oznacza, że po wyczerpaniu prób wiadomość jest wysyłana do DLT. Inne opcje to np. ALWAYS_RETRY_ON_ERROR (wiadomość DLT jest też ponawiana).
  • @DltHandler: Magiczna adnotacja Springa. Oznacza metodę, która ma być wywołana dla wiadomości z DLT powiązanego z tym listenerem. Spring automatycznie kieruje tu wiadomości z odpowiedniego topicu DLT.

Co się dzieje pod spodem?

  1. Spring widzi adnotację @RetryableTopic i automatycznie tworzy dodatkowe topici:
  • cloud-uploads-retry-0 (lub podobna nazwa, z opóźnieniem 5s)
  • cloud-uploads-retry-1 (z opóźnieniem 20s)
  • cloud-uploads-retry-2 (z opóźnieniem 60s)
  • cloud-uploads-dlt (Dead Letter Topic)
  • Uwaga: od nowszych wersji Spring Kafka (3.0+), domyślnie tworzony jest jeden topic retry z partycjami per opóźnienie, co jest bardziej efektywne.
  1. Tworzy również odpowiednich konsumentów dla tych topiców.

Przepływ wiadomości:

  1. Wiadomość trafia na cloud-uploads.
  2. handleUpload rzuca SlowCloudUploadException.
  3. Spring wysyła wiadomość na topic cloud-uploads-retry-0.
  4. Po 5 sekundach konsument topicu retry pobiera ją i przekazuje z powrotem do handleUpload.
  5. Proces się powtarza z opóźnieniem 20s (trafia na cloud-uploads-retry-1).
  6. Proces się powtarza z opóźnieniem 60s (trafia na cloud-uploads-retry-2).
  7. handleUpload po raz trzeci rzuca wyjątek. Liczba prób została wyczerpana.
  8. Spring wysyła wiadomość na cloud-uploads-dlt.
  9. Listener oznaczony @DltHandler (handleDlt) zostaje wywołany.
  10. handleDlt publikuje CancelOperationEvent, co pozwala innym częściom systemu zareagować i wycofać zmiany w ramach sagi.

Ten mechanizm jest niezwykle potężny, bo pozwala w sposób deklaratywny i czysty zaimplementować skomplikowaną logikę obsługi błędów, która jest odporna, wydajna i nie blokuje przetwarzania.