Переписал логику обработки новых ПР. Ускорил используя форк-джойн-пул

This commit is contained in:
Struchkov Mark 2022-12-16 21:49:40 +03:00
parent 20b520eda1
commit aa102acf8d
4 changed files with 115 additions and 35 deletions

View File

@ -9,6 +9,7 @@ import dev.struchkov.bot.gitlab.context.service.MergeRequestsService;
import dev.struchkov.bot.gitlab.context.service.ProjectService; import dev.struchkov.bot.gitlab.context.service.ProjectService;
import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty; import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty;
import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty; import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetMergeRequestFromGitlab;
import dev.struchkov.bot.gitlab.core.utils.StringUtils; import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.CommitJson; import dev.struchkov.bot.gitlab.sdk.domain.CommitJson;
import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson; import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson;
@ -18,12 +19,19 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.ConversionService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -36,7 +44,6 @@ import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@RequiredArgsConstructor @RequiredArgsConstructor
public class MergeRequestParser { public class MergeRequestParser {
public static final Integer COUNT = 100;
private static final Set<MergeRequestState> OLD_STATUSES = Set.of( private static final Set<MergeRequestState> OLD_STATUSES = Set.of(
MergeRequestState.MERGED, MergeRequestState.OPENED, MergeRequestState.CLOSED MergeRequestState.MERGED, MergeRequestState.OPENED, MergeRequestState.CLOSED
); );
@ -47,6 +54,18 @@ public class MergeRequestParser {
private final ConversionService conversionService; private final ConversionService conversionService;
private final PersonProperty personProperty; private final PersonProperty personProperty;
private ForkJoinPool forkJoinPool;
@PreDestroy
public void preDestroy() {
forkJoinPool.shutdown();
}
@PostConstruct
public void postConstruct() {
forkJoinPool = new ForkJoinPool(4);
}
public void parsingOldMergeRequest() { public void parsingOldMergeRequest() {
log.debug("Старт обработки старых MR"); log.debug("Старт обработки старых MR");
final Set<IdAndStatusPr> existIds = mergeRequestsService.getAllId(OLD_STATUSES); final Set<IdAndStatusPr> existIds = mergeRequestsService.getAllId(OLD_STATUSES);
@ -73,19 +92,9 @@ public class MergeRequestParser {
log.debug("Старт обработки новых MR"); log.debug("Старт обработки новых MR");
final Set<Long> projectIds = projectService.getAllIds(); final Set<Long> projectIds = projectService.getAllIds();
for (Long projectId : projectIds) { final List<MergeRequestJson> mergeRequestJsons = getMergeRequests(projectIds);
projectProcessing(projectId);
}
log.debug("Конец обработки новых MR");
}
private void projectProcessing(Long projectId) {
int page = 1;
List<MergeRequestJson> mergeRequestJsons = getMergeRequestJsons(projectId, page);
while (checkNotEmpty(mergeRequestJsons)) {
if (checkNotEmpty(mergeRequestJsons)) {
final Set<Long> jsonIds = mergeRequestJsons.stream() final Set<Long> jsonIds = mergeRequestJsons.stream()
.map(MergeRequestJson::getId) .map(MergeRequestJson::getId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
@ -107,9 +116,43 @@ public class MergeRequestParser {
log.trace("Пачка новых MR обработана и отправлена на сохранение. Количество: {} шт.", newMergeRequests.size()); log.trace("Пачка новых MR обработана и отправлена на сохранение. Количество: {} шт.", newMergeRequests.size());
mergeRequestsService.createAll(newMergeRequests); mergeRequestsService.createAll(newMergeRequests);
} }
mergeRequestJsons = getMergeRequestJsons(projectId, page++);
} }
log.debug("Конец обработки новых MR");
}
/**
* Позволяет получить MR для переданных идентификаторов проектов.
*
* @param projectIds идентификаторы проектов
* @return полученные у GitLab MergeRequests
*/
private List<MergeRequestJson> getMergeRequests(Set<Long> projectIds) {
final List<ForkJoinTask<List<MergeRequestJson>>> tasks = projectIds.stream()
.map(projectId -> new GetMergeRequestFromGitlab(projectId, gitlabProperty.getUrlPullRequestOpen(), personProperty.getToken()))
.map(forkJoinPool::submit)
.collect(Collectors.toList());
final List<MergeRequestJson> mergeRequestJsons = new ArrayList<>();
Iterator<ForkJoinTask<List<MergeRequestJson>>> iterator = tasks.iterator();
while (!tasks.isEmpty()) {
while (iterator.hasNext()) {
final ForkJoinTask<List<MergeRequestJson>> task = iterator.next();
if (task.isDone()) {
final List<MergeRequestJson> jsons;
try {
jsons = task.get();
mergeRequestJsons.addAll(jsons);
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
iterator.remove();
}
}
iterator = tasks.iterator();
}
return mergeRequestJsons;
} }
private static void personMapping(List<MergeRequest> newMergeRequests) { private static void personMapping(List<MergeRequest> newMergeRequests) {
@ -150,15 +193,6 @@ public class MergeRequestParser {
} }
} }
private List<MergeRequestJson> getMergeRequestJsons(Long projectId, int page) {
final List<MergeRequestJson> jsons = HttpParse.request(MessageFormat.format(gitlabProperty.getUrlPullRequestOpen(), projectId, page))
.header(StringUtils.H_PRIVATE_TOKEN, personProperty.getToken())
.header(ACCEPT)
.executeList(MergeRequestJson.class);
log.trace("Получено {} шт потенциально новых MR для проекта id:'{}' ", jsons.size(), projectId);
return jsons;
}
private Optional<MergeRequestJson> getMergeRequest(IdAndStatusPr existId) { private Optional<MergeRequestJson> getMergeRequest(IdAndStatusPr existId) {
final String mrUrl = MessageFormat.format(gitlabProperty.getUrlPullRequest(), existId.getProjectId(), existId.getTwoId()); final String mrUrl = MessageFormat.format(gitlabProperty.getUrlPullRequest(), existId.getProjectId(), existId.getTwoId());
return HttpParse.request(mrUrl) return HttpParse.request(mrUrl)

View File

@ -0,0 +1,49 @@
package dev.struchkov.bot.gitlab.core.service.parser.forktask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
public class GetMergeRequestFromGitlab extends RecursiveTask<List<MergeRequestJson>> {
private static final int PAGE_COUNT = 100;
private final long projectId;
private int pageNumber = 0;
private final String urlMrOpen;
private final String personToken;
@Override
protected List<MergeRequestJson> compute() {
final List<MergeRequestJson> mergeRequestJsons = getMergeRequestJsons(urlMrOpen, projectId, pageNumber, personToken);
if (mergeRequestJsons.size() == PAGE_COUNT) {
final GetMergeRequestFromGitlab newTask = new GetMergeRequestFromGitlab(projectId, pageNumber + 1, urlMrOpen, personToken);
newTask.fork();
final List<MergeRequestJson> result = newTask.join();
mergeRequestJsons.addAll(result);
}
return mergeRequestJsons;
}
private List<MergeRequestJson> getMergeRequestJsons(String url, Long projectId, int page, String personToken) {
final List<MergeRequestJson> jsons = HttpParse.request(MessageFormat.format(url, projectId, page, PAGE_COUNT))
.header(StringUtils.H_PRIVATE_TOKEN, personToken)
.header(ACCEPT)
.executeList(MergeRequestJson.class);
log.trace("Получено {} шт потенциально новых MR для проекта id:'{}' ", jsons.size(), projectId);
return jsons;
}
}

View File

@ -27,12 +27,12 @@ public class SchedulerService {
log.debug("Запуск процесса обновления данных"); log.debug("Запуск процесса обновления данных");
mergeRequestParser.parsingOldMergeRequest(); mergeRequestParser.parsingOldMergeRequest();
mergeRequestParser.parsingNewMergeRequest(); mergeRequestParser.parsingNewMergeRequest();
// pipelineParser.scanOldPipeline(); pipelineParser.scanOldPipeline();
// pipelineParser.scanNewPipeline(); pipelineParser.scanNewPipeline();
// discussionParser.scanOldDiscussions(); discussionParser.scanOldDiscussions();
// discussionParser.scanNewDiscussion(); discussionParser.scanNewDiscussion();
// cleanService.cleanOldPipelines(); cleanService.cleanOldPipelines();
// cleanService.cleanOldMergedRequests(); cleanService.cleanOldMergedRequests();
log.debug("Конец процесса обновления данных"); log.debug("Конец процесса обновления данных");
} }

View File

@ -37,7 +37,7 @@ gitlab-bot:
base-url: ${GITLAB_URL} base-url: ${GITLAB_URL}
replaceUrl: ${GITLAB_REPLACE_URL} replaceUrl: ${GITLAB_REPLACE_URL}
url-project: ${GITLAB_URL}/api/v4/projects?page={0, number, integer}&per_page=100 url-project: ${GITLAB_URL}/api/v4/projects?page={0, number, integer}&per_page=100
url-pull-request-open: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests?state=opened&page={1, number, integer}&per_page=100" url-pull-request-open: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests?state=opened&page={1, number, integer}&per_page={2, number, integer}"
url-pull-request-close: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests?state=closed&page={1, number, integer}&per_page=100" url-pull-request-close: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests?state=closed&page={1, number, integer}&per_page=100"
url-pull-request-comment: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/notes?&page={2,number,#}&per_page=100" url-pull-request-comment: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/notes?&page={2,number,#}&per_page=100"
url-pull-request: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}" url-pull-request: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}"
@ -51,7 +51,4 @@ gitlab-bot:
url-commit: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/commits?&page=1&per_page=1" url-commit: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/commits?&page=1&per_page=1"
url-new-note: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions/{2}/notes?body={3}" url-new-note: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions/{2}/notes?body={3}"
url-discussion: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions?&page={2,number,#}&per_page=100" url-discussion: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions?&page={2,number,#}&per_page=100"
url-one-discussion: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions/{2}" url-one-discussion: "${GITLAB_URL}/api/v4/projects/{0,number,#}/merge_requests/{1,number,#}/discussions/{2}"
logging:
level:
"dev.struchkov": trace