This commit is contained in:
Struchkov Mark 2022-08-07 12:24:55 +03:00
parent 52ab41e68e
commit 84a83cd574
5 changed files with 25 additions and 13 deletions

View File

@ -21,7 +21,7 @@ public class KafkaMessageGenerator {
public void start(@Observes StartupEvent ev) { public void start(@Observes StartupEvent ev) {
for (int i = 0; i < 20; i++) { for (int i = 0; i < 50; i++) {
final KafkaMessage kafkaMessage = new KafkaMessage(); final KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setCount(i); kafkaMessage.setCount(i);
telegramChannel.sendAndAwait(kafkaMessage); telegramChannel.sendAndAwait(kafkaMessage);

View File

@ -0,0 +1,21 @@
package dev.struchkov.example.quarkus.kafka;
import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.hibernate.reactive.mutiny.Mutiny;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
@RequiredArgsConstructor
public class EntityRepositoryImpl {
private final Mutiny.SessionFactory factory;
public Uni<EntityForDb> save(EntityForDb entityForDb) {
return factory.withTransaction(
session -> session.merge(entityForDb)
);
}
}

View File

@ -10,14 +10,14 @@ import javax.enterprise.context.ApplicationScoped;
@RequiredArgsConstructor @RequiredArgsConstructor
public class KafkaHandler { public class KafkaHandler {
private final PanacheRepositoryImpl panacheRepository; private final EntityRepositoryImpl panacheRepository;
@Incoming("test") @Incoming("test")
public Uni<Void> handle(KafkaMessage message) { public Uni<Void> handle(KafkaMessage message) {
System.out.println("Получено сообщение " + message); System.out.println("Получено сообщение " + message);
final EntityForDb entityForDb = new EntityForDb(); final EntityForDb entityForDb = new EntityForDb();
entityForDb.setCount(message.getCount()); entityForDb.setCount(message.getCount());
return panacheRepository.persistAndFlush(entityForDb).replaceWithVoid(); return panacheRepository.save(entityForDb).replaceWithVoid();
} }
} }

View File

@ -1,10 +0,0 @@
package dev.struchkov.example.quarkus.kafka;
import io.quarkus.hibernate.reactive.panache.PanacheRepository;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PanacheRepositoryImpl implements PanacheRepository<EntityForDb> {
}

View File

@ -6,6 +6,7 @@ quarkus:
jdbc: false jdbc: false
reactive: reactive:
url: postgresql://localhost:5432/quarkus_test url: postgresql://localhost:5432/quarkus_test
max-size: 20
hibernate-orm: hibernate-orm:
database: database:
generation: update generation: update