Рефакторинг слушателя и дестребутора событий

Новый способ сохранения событий
This commit is contained in:
Mark Struchkov 2019-08-01 22:41:10 +03:00
parent 2f33c2ac2c
commit 7b7fb629fa
11 changed files with 250 additions and 31 deletions

10
pom.xml
View File

@ -47,6 +47,16 @@
<artifactId>social-core</artifactId>
<version>${social.core.ver}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jpa</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
</dependencies>

View File

@ -0,0 +1,15 @@
package org.sadtech.vkbot.core.config;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
* TODO: Добавить описание класса.
*
* @author upagge [28/07/2019]
*/
@EnableJpaRepositories(basePackages = "org.sadtech.vkbot.core.repository.jpa")
@EntityScan(basePackages = "org.sadtech.vkbot.core.domain.jpa")
public class VkBotJpaConfig {
}

View File

@ -1,39 +1,41 @@
package org.sadtech.vkbot.core.distribution;
import com.google.gson.JsonObject;
import org.apache.log4j.Logger;
import org.sadtech.social.core.service.RawEventService;
import lombok.extern.slf4j.Slf4j;
import org.sadtech.vkbot.core.service.RawEventService;
// todo [upagge] [12/07/2019]: Рефакторинг - отказаться от наследования
public class EventDistributor extends AbstractBasketSubscribe<JsonObject, JsonObject> implements Runnable {
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
private static final Logger log = Logger.getLogger(EventDistributor.class);
@Slf4j
public class EventDistributor implements Runnable {
private final RawEventService rawEventService;
private final Set<AbstractBasketSubscribe> basketSubscribes;
public EventDistributor(RawEventService rawEventService) {
public EventDistributor(RawEventService rawEventService, Set<AbstractBasketSubscribe> basketSubscribes) {
this.rawEventService = rawEventService;
this.basketSubscribes = basketSubscribes;
log.info("EventDistributor инициализирован");
}
@Override
public void run() {
while (true) {
if (rawEventService.getJsonObjects().peek() != null) {
JsonObject event = rawEventService.getJsonObjects().poll();
log.info("Добавлено новое событие");
super.update(event);
}
Optional.ofNullable(rawEventService.getJsonObjects())
.ifPresent(events -> events.forEach(this::goNextSubscribe));
}
}
@Override
protected boolean check(JsonObject object) {
return false;
}
@Override
public void processing(JsonObject object) {
private boolean goNextSubscribe(JsonObject object) {
AtomicBoolean flag = new AtomicBoolean(false);
basketSubscribes.stream()
.filter(basketSubscribe -> basketSubscribe.check(object))
.forEach(basketSubscribe -> {
basketSubscribe.update(object);
flag.set(true);
});
return flag.get();
}
}

View File

@ -0,0 +1,30 @@
package org.sadtech.vkbot.core.domain.jpa;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.sadtech.social.core.domain.BasicEntity;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;
/**
* TODO: Добавить описание класса.
*
* @author upagge [28/07/2019]
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@Table(name = "event")
@NoArgsConstructor
public class JsonObjectId extends BasicEntity {
@Column(name = "json", length = 500)
private String json;
public JsonObjectId(String json) {
this.json = json;
}
}

View File

@ -7,27 +7,18 @@ import com.vk.api.sdk.exceptions.ClientException;
import com.vk.api.sdk.exceptions.LongPollServerKeyExpiredException;
import com.vk.api.sdk.objects.callback.longpoll.responses.GetLongPollEventsResponse;
import com.vk.api.sdk.objects.groups.LongPollServer;
import org.apache.log4j.Logger;
import org.sadtech.social.core.repository.impl.EventRepositoryQueue;
import org.sadtech.social.core.service.RawEventService;
import org.sadtech.social.core.service.impl.RawEventServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.sadtech.vkbot.core.config.VkConnect;
import org.sadtech.vkbot.core.service.RawEventService;
@Slf4j
public class EventListenerVk implements Runnable {
private static final Logger log = Logger.getLogger(EventListenerVk.class);
private final VkApiClient vk;
private final GroupActor actor;
private static final Integer DEFAULT_WAIT_TIME = 25;
private final RawEventService rawEventService;
public EventListenerVk(VkConnect vkConnect) {
vk = vkConnect.getVkApiClient();
actor = vkConnect.getGroupActor();
rawEventService = new RawEventServiceImpl(new EventRepositoryQueue());
}
public EventListenerVk(VkConnect vkConnect, RawEventService rawEventService) {
vk = vkConnect.getVkApiClient();
actor = vkConnect.getGroupActor();

View File

@ -0,0 +1,20 @@
package org.sadtech.vkbot.core.repository;
import com.google.gson.JsonObject;
import java.util.Set;
/**
* Обработка событий социальной сети.
*
* @author upagge [08/07/2019]
*/
public interface RawEventRepository {
void add(JsonObject dataObject);
void cleanAll();
Set<JsonObject> getEventQueue();
}

View File

@ -0,0 +1,26 @@
//package org.sadtech.vkbot.core.repository.impl;
//
//import com.google.gson.JsonObject;
//import org.sadtech.vkbot.core.repository.RawEventRepository;
//
//import java.util.Queue;
//import java.util.concurrent.ConcurrentLinkedQueue;
//
//public class RawEventRepositoryQueue implements RawEventRepository {
//
// private final Queue<JsonObject> jsonObjects = new ConcurrentLinkedQueue<>();
//
// @Override
// public void add(JsonObject jsonObject) {
// jsonObjects.offer(jsonObject);
// }
//
// @Override
// public void cleanAll() {
// jsonObjects.clear();
// }
//
// public Set<JsonObject> getEventQueue() {
// return jsonObjects;
// }
//}

View File

@ -0,0 +1,56 @@
package org.sadtech.vkbot.core.repository.impl.jpa;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.sadtech.vkbot.core.domain.jpa.JsonObjectId;
import org.sadtech.vkbot.core.repository.RawEventRepository;
import org.sadtech.vkbot.core.repository.jpa.RawEventRepositoryJpa;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* TODO: Добавить описание класса.
*
* @author upagge [28/07/2019]
*/
public class RawEventRepositoryJpaImpl implements RawEventRepository {
private final RawEventRepositoryJpa rawEventRepositoryJpa;
private final Gson gson = new Gson();
private boolean addFlag = false;
public RawEventRepositoryJpaImpl(RawEventRepositoryJpa rawEventRepositoryJpa) {
this.rawEventRepositoryJpa = rawEventRepositoryJpa;
}
@Override
public void add(JsonObject dataObject) {
System.out.println(dataObject.toString());
rawEventRepositoryJpa.saveAndFlush(new JsonObjectId(dataObject.toString()));
addFlag = true;
}
@Override
public void cleanAll() {
rawEventRepositoryJpa.deleteAll();
}
@Override
public Set<JsonObject> getEventQueue() {
if (addFlag) {
List<JsonObjectId> allEvent = rawEventRepositoryJpa.findAll();
rawEventRepositoryJpa.deleteAll(allEvent);
addFlag = false;
return allEvent.stream().map(jsonObjectId -> convert(jsonObjectId.getJson())).collect(Collectors.toSet());
} else {
return null;
}
}
private JsonObject convert(String json) {
return gson.fromJson(json, JsonObject.class);
}
}

View File

@ -0,0 +1,15 @@
package org.sadtech.vkbot.core.repository.jpa;
import org.sadtech.vkbot.core.domain.jpa.JsonObjectId;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
/**
* TODO: Добавить описание интерфейса.
*
* @author upagge [28/07/2019]
*/
@Repository
public interface RawEventRepositoryJpa extends JpaRepository<JsonObjectId, Integer> {
}

View File

@ -0,0 +1,20 @@
package org.sadtech.vkbot.core.service;
import com.google.gson.JsonObject;
import java.util.Set;
/**
* Интерфейс для взаимодействия с событиями социальной сети.
*
* @author upagge [08/07/2019]
*/
public interface RawEventService {
void cleanAll();
void add(JsonObject jsonObject);
Set<JsonObject> getJsonObjects();
}

View File

@ -0,0 +1,34 @@
package org.sadtech.vkbot.core.service.impl;
import com.google.gson.JsonObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.sadtech.vkbot.core.repository.RawEventRepository;
import org.sadtech.vkbot.core.service.RawEventService;
import java.util.Set;
@Slf4j
@RequiredArgsConstructor
public class RawEventServiceImpl implements RawEventService {
private final RawEventRepository rawEventRepository;
@Override
public void cleanAll() {
rawEventRepository.cleanAll();
log.info("Репозиторий событий очищен");
}
@Override
public void add(JsonObject jsonObject) {
rawEventRepository.add(jsonObject);
log.info("Событие отправленно в репозиторий");
}
@Override
public Set<JsonObject> getJsonObjects() {
return rawEventRepository.getEventQueue();
}
}