Ускорил обработку новых пайплайнов с помощью ForkJoinPool

This commit is contained in:
Struchkov Mark 2022-12-16 23:26:45 +03:00
parent aa102acf8d
commit 77d9e6aba4
13 changed files with 317 additions and 94 deletions

View File

@ -8,6 +8,7 @@ import lombok.NonNull;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.util.List;
import java.util.Set;
/**
@ -19,6 +20,8 @@ public interface PipelineService {
Pipeline create(@NonNull Pipeline pipeline);
List<Pipeline> createAll(@NonNull List<Pipeline> newPipelines);
Pipeline update(@NonNull Pipeline pipeline);
Page<Pipeline> getAllByStatuses(@NonNull Set<PipelineStatus> statuses, @NonNull Pageable pagination);

View File

@ -0,0 +1,16 @@
package dev.struchkov.bot.gitlab.core.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ForkJoinPool;
@Configuration
public class CoreConfig {
@Bean("parserPool")
public ForkJoinPool parserPool() {
return new ForkJoinPool(4);
}
}

View File

@ -39,6 +39,8 @@ public class PipelineJsonConverter implements Converter<PipelineJson, Pipeline>
pipeline.setWebUrl(source.getWebUrl());
pipeline.setStatus(convertStatus(source.getStatus()));
pipeline.setPerson(convertPerson.convert(source.getUser()));
pipeline.setProjectId(source.getProjectId());
return pipeline;
}

View File

@ -55,6 +55,14 @@ public class PipelineServiceImpl implements PipelineService {
return newPipeline;
}
@Override
@Transactional
public List<Pipeline> createAll(@NonNull List<Pipeline> newPipelines) {
return newPipelines.stream()
.map(this::create)
.collect(Collectors.toList());
}
private void notifyNewPipeline(Pipeline pipeline, String oldStatus) {
if (isNeedNotifyNewPipeline(pipeline)) {
notifyService.send(

View File

@ -9,27 +9,23 @@ import dev.struchkov.bot.gitlab.context.service.MergeRequestsService;
import dev.struchkov.bot.gitlab.context.service.ProjectService;
import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty;
import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetMergeRequestFromGitlab;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetMergeRequestTask;
import dev.struchkov.bot.gitlab.core.utils.PoolUtils;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.CommitJson;
import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.convert.ConversionService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
@ -41,7 +37,6 @@ import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j
@Service
@RequiredArgsConstructor
public class MergeRequestParser {
private static final Set<MergeRequestState> OLD_STATUSES = Set.of(
@ -54,16 +49,22 @@ public class MergeRequestParser {
private final ConversionService conversionService;
private final PersonProperty personProperty;
private ForkJoinPool forkJoinPool;
private final ForkJoinPool forkJoinPool;
@PreDestroy
public void preDestroy() {
forkJoinPool.shutdown();
}
@PostConstruct
public void postConstruct() {
forkJoinPool = new ForkJoinPool(4);
public MergeRequestParser(
GitlabProperty gitlabProperty,
MergeRequestsService mergeRequestsService,
ProjectService projectService,
ConversionService conversionService,
PersonProperty personProperty,
@Qualifier("parserPool") ForkJoinPool forkJoinPool
) {
this.gitlabProperty = gitlabProperty;
this.mergeRequestsService = mergeRequestsService;
this.projectService = projectService;
this.conversionService = conversionService;
this.personProperty = personProperty;
this.forkJoinPool = forkJoinPool;
}
public void parsingOldMergeRequest() {
@ -129,30 +130,11 @@ public class MergeRequestParser {
*/
private List<MergeRequestJson> getMergeRequests(Set<Long> projectIds) {
final List<ForkJoinTask<List<MergeRequestJson>>> tasks = projectIds.stream()
.map(projectId -> new GetMergeRequestFromGitlab(projectId, gitlabProperty.getUrlPullRequestOpen(), personProperty.getToken()))
.map(projectId -> new GetMergeRequestTask(projectId, gitlabProperty.getUrlPullRequestOpen(), personProperty.getToken()))
.map(forkJoinPool::submit)
.collect(Collectors.toList());
final List<MergeRequestJson> mergeRequestJsons = new ArrayList<>();
Iterator<ForkJoinTask<List<MergeRequestJson>>> iterator = tasks.iterator();
while (!tasks.isEmpty()) {
while (iterator.hasNext()) {
final ForkJoinTask<List<MergeRequestJson>> task = iterator.next();
if (task.isDone()) {
final List<MergeRequestJson> jsons;
try {
jsons = task.get();
mergeRequestJsons.addAll(jsons);
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
iterator.remove();
}
}
iterator = tasks.iterator();
}
return mergeRequestJsons;
return PoolUtils.pullTaskResults(tasks);
}
private static void personMapping(List<MergeRequest> newMergeRequests) {

View File

@ -7,11 +7,14 @@ import dev.struchkov.bot.gitlab.context.service.PipelineService;
import dev.struchkov.bot.gitlab.context.service.ProjectService;
import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty;
import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetPipelineShortTask;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetPipelineTask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.PipelineJson;
import dev.struchkov.bot.gitlab.sdk.domain.PipelineShortJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
@ -20,7 +23,10 @@ import org.springframework.stereotype.Service;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import static dev.struchkov.bot.gitlab.context.domain.PipelineStatus.CREATED;
@ -29,6 +35,8 @@ import static dev.struchkov.bot.gitlab.context.domain.PipelineStatus.PENDING;
import static dev.struchkov.bot.gitlab.context.domain.PipelineStatus.PREPARING;
import static dev.struchkov.bot.gitlab.context.domain.PipelineStatus.RUNNING;
import static dev.struchkov.bot.gitlab.context.domain.PipelineStatus.WAITING_FOR_RESOURCE;
import static dev.struchkov.bot.gitlab.core.utils.PoolUtils.pullTaskResult;
import static dev.struchkov.bot.gitlab.core.utils.PoolUtils.pullTaskResults;
import static dev.struchkov.haiti.context.exception.ConvertException.convertException;
import static dev.struchkov.haiti.utils.Checker.checkNotEmpty;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@ -40,82 +48,94 @@ import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PipelineParser {
public static final Integer COUNT = 100;
private static final Set<PipelineStatus> oldStatus = Set.of(
CREATED, WAITING_FOR_RESOURCE, PREPARING, PENDING, RUNNING, MANUAL
);
private final PipelineService pipelineService;
private final ProjectService projectService;
private final GitlabProperty gitlabProperty;
private final PersonProperty personProperty;
private final ConversionService conversionService;
private final ForkJoinPool forkJoinPool;
private LocalDateTime lastUpdate = LocalDateTime.now();
public PipelineParser(
PipelineService pipelineService,
ProjectService projectService,
GitlabProperty gitlabProperty,
PersonProperty personProperty,
ConversionService conversionService,
@Qualifier("parserPool") ForkJoinPool forkJoinPool
) {
this.pipelineService = pipelineService;
this.projectService = projectService;
this.gitlabProperty = gitlabProperty;
this.personProperty = personProperty;
this.conversionService = conversionService;
this.forkJoinPool = forkJoinPool;
}
public void scanNewPipeline() {
log.debug("Старт обработки новых папйплайнов");
int page = 0;
log.debug("Старт обработки новых пайплайнов");
final Set<Long> projectIds = projectService.getAllIds();
for (Long projectId : projectIds) {
processingProject(projectId);
}
final Map<Long, Long> pipelineProjectMap = getPipelineShortJsons(projectIds).stream()
.collect(Collectors.toMap(PipelineShortJson::getId, PipelineShortJson::getProjectId));
log.debug("Конец обработки новых папйплайнов");
}
private void processingProject(Long projectId) {
int page = 1;
LocalDateTime newLastUpdate = LocalDateTime.now();
List<PipelineJson> pipelineJsons = getPipelineJsons(projectId, page, lastUpdate);
while (checkNotEmpty(pipelineJsons)) {
final Set<Long> jsonIds = pipelineJsons.stream()
.map(PipelineJson::getId)
.collect(Collectors.toSet());
final ExistContainer<Pipeline, Long> existContainer = pipelineService.existsById(jsonIds);
if (!pipelineProjectMap.isEmpty()) {
final ExistContainer<Pipeline, Long> existContainer = pipelineService.existsById(pipelineProjectMap.keySet());
if (!existContainer.isAllFound()) {
final Set<Long> idsNotFound = existContainer.getIdNoFound();
for (Long newId : idsNotFound) {
final Pipeline newPipeline = HttpParse.request(
MessageFormat.format(gitlabProperty.getUrlPipeline(), projectId, newId)
)
.header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, personProperty.getToken())
.execute(PipelineJson.class)
.map(json -> {
final Pipeline pipeline = conversionService.convert(json, Pipeline.class);
pipeline.setProjectId(projectId);
return pipeline;
})
.orElseThrow(convertException("Ошибка обновления Pipelines"));
pipelineService.create(newPipeline);
}
final List<ForkJoinTask<PipelineJson>> tasks = idsNotFound.stream()
.map(pipelineId -> new GetPipelineTask(
gitlabProperty.getUrlPipeline(),
pipelineProjectMap.get(pipelineId),
pipelineId,
personProperty.getToken()
))
.map(forkJoinPool::submit)
.collect(Collectors.toList());
final List<PipelineJson> pipelineJsons = pullTaskResult(tasks);
if (checkNotEmpty(pipelineJsons)) {
final List<Pipeline> newPipelines = pipelineJsons.stream()
.map(json -> conversionService.convert(json, Pipeline.class))
.collect(Collectors.toList());
pipelineService.createAll(newPipelines);
}
}
pipelineJsons = getPipelineJsons(projectId, ++page, lastUpdate);
}
lastUpdate = newLastUpdate;
log.debug("Конец обработки новых пайплайнов");
}
private List<PipelineJson> getPipelineJsons(Long projectId, int page, LocalDateTime afterUpdate) {
return HttpParse.request(MessageFormat.format(gitlabProperty.getUrlPipelines(), projectId, page))
.header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, personProperty.getToken())
.getParameter("updated_after", afterUpdate.minusHours(12L).toString())
.executeList(PipelineJson.class);
private List<PipelineShortJson> getPipelineShortJsons(Set<Long> projectIds) {
LocalDateTime newLastUpdate = LocalDateTime.now();
final List<ForkJoinTask<List<PipelineShortJson>>> tasks = projectIds.stream()
.map(projectId -> new GetPipelineShortTask(
gitlabProperty.getUrlPipelines(),
projectId,
lastUpdate,
personProperty.getToken()
))
.map(forkJoinPool::submit)
.collect(Collectors.toList());
final List<PipelineShortJson> pipelineJsons = pullTaskResults(tasks);
lastUpdate = newLastUpdate;
return pipelineJsons;
}
public void scanOldPipeline() {
log.debug("Старт обработки старых папйплайнов");
int page = 0;

View File

@ -16,30 +16,29 @@ import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
public class GetMergeRequestFromGitlab extends RecursiveTask<List<MergeRequestJson>> {
public class GetMergeRequestTask extends RecursiveTask<List<MergeRequestJson>> {
private static final int PAGE_COUNT = 100;
private final long projectId;
private int pageNumber = 0;
private final String urlMrOpen;
private final String personToken;
private final String gitlabToken;
@Override
protected List<MergeRequestJson> compute() {
final List<MergeRequestJson> mergeRequestJsons = getMergeRequestJsons(urlMrOpen, projectId, pageNumber, personToken);
final List<MergeRequestJson> mergeRequestJsons = getMergeRequestJsons();
if (mergeRequestJsons.size() == PAGE_COUNT) {
final GetMergeRequestFromGitlab newTask = new GetMergeRequestFromGitlab(projectId, pageNumber + 1, urlMrOpen, personToken);
final GetMergeRequestTask newTask = new GetMergeRequestTask(projectId, pageNumber + 1, urlMrOpen, gitlabToken);
newTask.fork();
final List<MergeRequestJson> result = newTask.join();
mergeRequestJsons.addAll(result);
mergeRequestJsons.addAll(newTask.join());
}
return mergeRequestJsons;
}
private List<MergeRequestJson> getMergeRequestJsons(String url, Long projectId, int page, String personToken) {
final List<MergeRequestJson> jsons = HttpParse.request(MessageFormat.format(url, projectId, page, PAGE_COUNT))
.header(StringUtils.H_PRIVATE_TOKEN, personToken)
private List<MergeRequestJson> getMergeRequestJsons() {
final List<MergeRequestJson> jsons = HttpParse.request(MessageFormat.format(urlMrOpen, projectId, pageNumber, PAGE_COUNT))
.header(StringUtils.H_PRIVATE_TOKEN, gitlabToken)
.header(ACCEPT)
.executeList(MergeRequestJson.class);
log.trace("Получено {} шт потенциально новых MR для проекта id:'{}' ", jsons.size(), projectId);

View File

@ -0,0 +1,51 @@
package dev.struchkov.bot.gitlab.core.service.parser.forktask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.PipelineShortJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
public class GetPipelineShortTask extends RecursiveTask<List<PipelineShortJson>> {
private static final int PAGE_COUNT = 100;
private final String urlPipelines;
private final long projectId;
private int pageNumber = 0;
private final LocalDateTime lastUpdate;
private final String gitlabToken;
@Override
protected List<PipelineShortJson> compute() {
final List<PipelineShortJson> jsons = getPipelineJsons();
if (jsons.size() == PAGE_COUNT) {
final GetPipelineShortTask newTask = new GetPipelineShortTask(urlPipelines, projectId, pageNumber + 1, lastUpdate, gitlabToken);
newTask.fork();
jsons.addAll(newTask.join());
}
jsons.forEach(pipelineJson -> pipelineJson.setProjectId(projectId));
return jsons;
}
private List<PipelineShortJson> getPipelineJsons() {
final List<PipelineShortJson> jsons = HttpParse.request(MessageFormat.format(urlPipelines, projectId, pageNumber, PAGE_COUNT))
.header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, gitlabToken)
.getParameter("updated_after", lastUpdate.minusHours(12L).toString())
.executeList(PipelineShortJson.class);
log.trace("Получено {} шт потенциально новых пайплайнов для проекта id:'{}' ", jsons.size(), projectId);
return jsons;
}
}

View File

@ -0,0 +1,37 @@
package dev.struchkov.bot.gitlab.core.service.parser.forktask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.PipelineJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.util.concurrent.RecursiveTask;
import static dev.struchkov.haiti.context.exception.ConvertException.convertException;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j
@RequiredArgsConstructor
public class GetPipelineTask extends RecursiveTask<PipelineJson> {
private final String urlPipeline;
private final long projectId;
private final long pipelineId;
private final String gitlabToken;
@Override
protected PipelineJson compute() {
final PipelineJson pipelineJson = HttpParse.request(
MessageFormat.format(urlPipeline, projectId, pipelineId)
)
.header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, gitlabToken)
.execute(PipelineJson.class)
.orElseThrow(convertException("Ошибка обновления Pipelines"));
pipelineJson.setProjectId(projectId);
return pipelineJson;
}
}

View File

@ -0,0 +1,60 @@
package dev.struchkov.bot.gitlab.core.utils;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinTask;
@Slf4j
@UtilityClass
public class PoolUtils {
public static <T> List<T> pullTaskResults(List<ForkJoinTask<List<T>>> tasks) {
final List<T> results = new ArrayList<>();
Iterator<ForkJoinTask<List<T>>> iterator = tasks.iterator();
while (!tasks.isEmpty()) {
while (iterator.hasNext()) {
final ForkJoinTask<List<T>> task = iterator.next();
if (task.isDone()) {
final List<T> jsons;
try {
jsons = task.get();
results.addAll(jsons);
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
iterator.remove();
}
}
iterator = tasks.iterator();
}
return results;
}
public static <T> List<T> pullTaskResult(List<ForkJoinTask<T>> tasks) {
final List<T> results = new ArrayList<>();
Iterator<ForkJoinTask<T>> iterator = tasks.iterator();
while (!tasks.isEmpty()) {
while (iterator.hasNext()) {
final ForkJoinTask<T> task = iterator.next();
if (task.isDone()) {
try {
results.add(task.get());
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
iterator.remove();
}
}
iterator = tasks.iterator();
}
return results;
}
}

View File

@ -46,7 +46,7 @@ gitlab-bot:
users-url: ${GITLAB_URL}/api/v4/users
url-note: "{0}#note_{1,number,#}"
url-note-api: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/notes/{2,number,#}"
url-pipelines: "${GITLAB_URL}/api/v4/projects/{0,number,#}/pipelines?&page={1,number,#}&per_page=100"
url-pipelines: "${GITLAB_URL}/api/v4/projects/{0,number,#}/pipelines?&page={1,number,#}&per_page={2,number,#}"
url-pipeline: "${GITLAB_URL}/api/v4/projects/{0,number,#}/pipelines/{1,number,#}"
url-commit: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/commits?&page=1&per_page=1"
url-new-note: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions/{2}/notes?body={3}"

View File

@ -36,4 +36,8 @@ public class PipelineJson {
@JsonProperty("web_url")
private String webUrl;
// Поля ниже не отдаются гитлабом, а заполняются вручную
private Long projectId;
}

View File

@ -0,0 +1,41 @@
package dev.struchkov.bot.gitlab.sdk.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author upagge 17.01.2021
*/
@Data
public class PipelineShortJson {
private Long id;
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("created_at")
private LocalDateTime created;
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("updated_at")
private LocalDateTime updated;
private PipelineStatusJson status;
private String ref;
@JsonProperty("web_url")
private String webUrl;
// Поля ниже не отдаются гитлабом, а заполняются вручную
private Long projectId;
}