Skip to main content

Spring Data integration with PostgreSQL Queue

Oczywiście! Użycie PostgreSQL jako kolejki zadań to bardzo popularny i solidny wzorzec, zwłaszcza gdy nie chcemy wprowadzać do naszej architektury dodatkowego, dedykowanego systemu kolejkowego (jak RabbitMQ czy Kafka). Spring Data JPA świetnie się do tego nadaje.

Poniżej znajdziesz szczegółowe wyjaśnienie wraz z przykładami kodu.

Jak zaimplementować kolejkę w PostgreSQL przy użyciu Spring JPA?

Kluczem do wydajnej i bezpiecznej implementacji kolejki w PostgreSQL jest polecenie SELECT ... FOR UPDATE SKIP LOCKED.

  • FOR UPDATE: Blokuje odczytany wiersz na czas trwania bieżącej transakcji. Inne transakcje nie mogą go modyfikować ani usunąć, dopóki nasza transakcja się не zakończy (przez commit lub rollback).
  • SKIP LOCKED: To jest "magia" współbieżności. Jeśli SELECT natrafi na wiersz już zablokowany przez inną transakcję (innego "konsumenta" kolejki), nie będzie na niego czekać. Zamiast tego po prostu go pominie i spróbuje znaleźć następny wolny wiersz. Dzięki temu wielu konsumentów może pracować na tej samej tabeli bez wzajemnego blokowania się.

Oto implementacja krok po kroku:

Krok 1: Tabela w bazie i encja JPA

Najpierw potrzebujemy tabeli, która będzie przechowywać nasze zadania.

SQL (dla przykładu):

CREATE TYPE job_status AS ENUM ('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED');

CREATE TABLE queue_jobs (
id BIGSERIAL PRIMARY KEY,
payload TEXT NOT NULL,
status job_status NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
retries INT NOT NULL DEFAULT 0
);

-- Indeks na statusie i dacie utworzenia przyspieszy wyszukiwanie zadań
CREATE INDEX idx_queue_jobs_pending ON queue_jobs (status, created_at) WHERE status = 'PENDING';

Encja JPA (QueueJob.java):

import jakarta.persistence.*;
import java.time.Instant;

@Entity
@Table(name = "queue_jobs")
public class QueueJob {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(nullable = false, columnDefinition = "TEXT")
private String payload; // np. JSON z danymi do przetworzenia

@Enumerated(EnumType.STRING)
@Column(nullable = false, columnDefinition = "job_status")
private Status status;

@Column(name = "created_at", nullable = false, updatable = false)
private Instant createdAt = Instant.now();

@Column(name = "updated_at", nullable = false)
private Instant updatedAt = Instant.now();

@Column(nullable = false)
private int retries = 0;

public enum Status {
PENDING, PROCESSING, COMPLETED, FAILED
}

// Gettery, Settery, konstruktory...
}

Krok 2: Repozytorium JPA z natywnym zapytaniem

Ponieważ FOR UPDATE SKIP LOCKED jest specyficzne dla PostgreSQL i nie jest częścią standardu JPQL, musimy użyć zapytania natywnego.

QueueJobRepository.java:

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository
public interface QueueJobRepository extends JpaRepository<QueueJob, Long> {

// To zapytanie jest sercem naszego mechanizmu kolejkowego.
// 1. Wybiera zadania o statusie PENDING.
// 2. Sortuje je, by brać najstarsze najpierw (FIFO).
// 3. LIMIT 1 - bierze tylko jedno zadanie.
// 4. FOR UPDATE SKIP LOCKED - blokuje ten jeden wiersz do końca transakcji i pomija te,
// które są już zablokowane przez innych konsumentów.
@Query(value = "SELECT * FROM queue_jobs WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED", nativeQuery = true)
Optional<QueueJob> findNextPendingJob();
}

Krok 3: Producent (dodawanie zadań do kolejki)

To prosty serwis, który zapisuje nowe zadania w bazie.

JobProducerService.java:

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class JobProducerService {

private final QueueJobRepository repository;

public JobProducerService(QueueJobRepository repository) {
this.repository = repository;
}

// Prosta metoda do dodawania zadań
@Transactional
public void enqueueJob(String payload) {
QueueJob newJob = new QueueJob();
newJob.setPayload(payload);
newJob.setStatus(QueueJob.Status.PENDING);
repository.save(newJob);
}
}

Krok 4: Konsument (przetwarzanie zadań z kolejki)

To najważniejszy element. Będzie okresowo sprawdzał, czy są nowe zadania do przetworzenia. Użyjemy do tego adnotacji @Scheduled.

JobConsumerService.java:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.Optional;

@Service
public class JobConsumerService {

private static final Logger log = LoggerFactory.getLogger(JobConsumerService.class);
private final QueueJobRepository repository;
// Wstrzyknij serwis, który faktycznie wykonuje logikę biznesową
private final BusinessLogicService businessLogicService;

public JobConsumerService(QueueJobRepository repository, BusinessLogicService businessLogicService) {
this.repository = repository;
this.businessLogicService = businessLogicService;
}

// Uruchamia się co 5 sekund.
@Scheduled(fixedDelay = 5000)
@Transactional // Ta adnotacja jest KRYTYCZNA!
public void processNextJob() {
Optional<QueueJob> jobOptional = repository.findNextPendingJob();

if (jobOptional.isEmpty()) {
// Brak zadań do przetworzenia, kończymy.
return;
}

QueueJob job = jobOptional.get();
log.info("Processing job ID: {}", job.getId());

// Oznaczamy zadanie jako przetwarzane.
// Dzięki transakcji, ta zmiana będzie widoczna dopiero po commicie.
// Blokada `FOR UPDATE` nadal obowiązuje.
job.setStatus(QueueJob.Status.PROCESSING);
job.setUpdatedAt(Instant.now());
repository.save(job); // Zapisujemy, aby zmiana statusu była częścią transakcji

try {
// Tutaj wykonujemy faktyczną logikę biznesową
businessLogicService.process(job.getPayload());

// Sukces: oznaczamy zadanie jako zakończone
job.setStatus(QueueJob.Status.COMPLETED);
log.info("Job ID: {} completed successfully.", job.getId());

} catch (Exception e) {
// Błąd: oznaczamy jako FAILED
log.error("Job ID: {} failed. Error: {}", job.getId(), e.getMessage());
job.setStatus(QueueJob.Status.FAILED);
job.setRetries(job.getRetries() + 1);
} finally {
// Zapisujemy ostateczny stan (COMPLETED lub FAILED)
job.setUpdatedAt(Instant.now());
repository.save(job);
}
// Koniec metody -> transakcja jest zatwierdzana (commit).
// Blokada na wierszu jest zwalniana.
}
}

Pamiętaj, aby włączyć obsługę @Scheduled w głównej klasie aplikacji przez dodanie adnotacji @EnableScheduling.


Czy kolejka usuwa elementy z bazy po przetworzeniu?

To zależy wyłącznie od Twojej implementacji. Masz trzy główne podejścia:

  1. Aktualizacja statusu (zalecane): Tak jak w przykładzie powyżej. Zmieniasz status na COMPLETED lub FAILED.
  • Zalety: Zachowujesz pełną historię wykonania zadań, co jest bezcenne do audytu, debugowania i analizy. Możesz łatwo zaimplementować mechanizm ponawiania (retries).
  • Wady: Tabela z zadaniami rośnie w nieskończoność. Wymaga to strategii archiwizacji lub czyszczenia starych zadań (np. usuwania zadań starszych niż 30 dni).
  1. Usuwanie elementu: Po pomyślnym przetworzeniu zadania, zamiast aktualizować status, po prostu je usuwasz: