Добавил использование ForkJoinPool для обновления старых MR

This commit is contained in:
Struchkov Mark 2022-12-20 19:55:17 +03:00
parent 0342a80b10
commit 422de73854
Signed by: upagge
GPG Key ID: D3018BE7BA428CA6
3 changed files with 64 additions and 19 deletions

View File

@ -9,7 +9,8 @@ import dev.struchkov.bot.gitlab.context.service.MergeRequestsService;
import dev.struchkov.bot.gitlab.context.service.ProjectService; import dev.struchkov.bot.gitlab.context.service.ProjectService;
import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty; import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty;
import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty; import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetMergeRequestTask; import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetAllMergeRequestForProjectTask;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetSingleMergeRequestTask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils; import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.CommitJson; import dev.struchkov.bot.gitlab.sdk.domain.CommitJson;
import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson; import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson;
@ -32,6 +33,7 @@ import java.util.stream.Stream;
import static dev.struchkov.haiti.utils.Checker.checkNotEmpty; import static dev.struchkov.haiti.utils.Checker.checkNotEmpty;
import static dev.struchkov.haiti.utils.Checker.checkNotNull; import static dev.struchkov.haiti.utils.Checker.checkNotNull;
import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResult;
import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResults; import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResults;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT; import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@ -71,10 +73,7 @@ public class MergeRequestParser {
log.debug("Старт обработки старых MR"); log.debug("Старт обработки старых MR");
final Set<IdAndStatusPr> existIds = mergeRequestsService.getAllId(OLD_STATUSES); final Set<IdAndStatusPr> existIds = mergeRequestsService.getAllId(OLD_STATUSES);
final List<MergeRequest> mergeRequests = existIds.stream() final List<MergeRequest> newMergeRequests = getOldMergeRequests(existIds).stream()
.map(this::getMergeRequest)
.filter(Optional::isPresent)
.map(Optional::get)
.map(mergeRequestJson -> { .map(mergeRequestJson -> {
final MergeRequest newMergeRequest = conversionService.convert(mergeRequestJson, MergeRequest.class); final MergeRequest newMergeRequest = conversionService.convert(mergeRequestJson, MergeRequest.class);
parsingCommits(newMergeRequest); parsingCommits(newMergeRequest);
@ -82,13 +81,30 @@ public class MergeRequestParser {
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
if (checkNotEmpty(mergeRequests)) { if (checkNotEmpty(newMergeRequests)) {
personMapping(mergeRequests); personMapping(newMergeRequests);
mergeRequestsService.updateAll(mergeRequests); mergeRequestsService.updateAll(newMergeRequests);
} }
log.debug("Конец обработки старых MR"); log.debug("Конец обработки старых MR");
} }
private List<MergeRequestJson> getOldMergeRequests(Set<IdAndStatusPr> existIds) {
final List<ForkJoinTask<Optional<MergeRequestJson>>> tasks = existIds.stream()
.map(
existId -> new GetSingleMergeRequestTask(
gitlabProperty.getUrlMergeRequest(),
existId.getProjectId(),
existId.getTwoId(),
personProperty.getToken()
)
).map(forkJoinPool::submit)
.collect(Collectors.toList());
return pullTaskResult(tasks).stream()
.flatMap(Optional::stream)
.collect(Collectors.toList());
}
public void parsingNewMergeRequest() { public void parsingNewMergeRequest() {
log.debug("Старт обработки новых MR"); log.debug("Старт обработки новых MR");
final Set<Long> projectIds = projectService.getAllIds(); final Set<Long> projectIds = projectService.getAllIds();
@ -130,7 +146,7 @@ public class MergeRequestParser {
*/ */
private List<MergeRequestJson> getMergeRequests(Set<Long> projectIds) { private List<MergeRequestJson> getMergeRequests(Set<Long> projectIds) {
final List<ForkJoinTask<List<MergeRequestJson>>> tasks = projectIds.stream() final List<ForkJoinTask<List<MergeRequestJson>>> tasks = projectIds.stream()
.map(projectId -> new GetMergeRequestTask(projectId, gitlabProperty.getUrlMergeRequestOpen(), personProperty.getToken())) .map(projectId -> new GetAllMergeRequestForProjectTask(projectId, gitlabProperty.getUrlMergeRequestOpen(), personProperty.getToken()))
.map(forkJoinPool::submit) .map(forkJoinPool::submit)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -175,12 +191,4 @@ public class MergeRequestParser {
} }
} }
private Optional<MergeRequestJson> getMergeRequest(IdAndStatusPr existId) {
final String mrUrl = MessageFormat.format(gitlabProperty.getUrlMergeRequest(), existId.getProjectId(), existId.getTwoId());
return HttpParse.request(mrUrl)
.header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, personProperty.getToken())
.execute(MergeRequestJson.class);
}
} }

View File

@ -17,7 +17,7 @@ import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
@RequiredArgsConstructor @RequiredArgsConstructor
public class GetMergeRequestTask extends RecursiveTask<List<MergeRequestJson>> { public class GetAllMergeRequestForProjectTask extends RecursiveTask<List<MergeRequestJson>> {
private static final int PAGE_COUNT = 100; private static final int PAGE_COUNT = 100;
@ -32,7 +32,7 @@ public class GetMergeRequestTask extends RecursiveTask<List<MergeRequestJson>> {
Thread.sleep(100); Thread.sleep(100);
final List<MergeRequestJson> mergeRequestJsons = getMergeRequestJsons(); final List<MergeRequestJson> mergeRequestJsons = getMergeRequestJsons();
if (mergeRequestJsons.size() == PAGE_COUNT) { if (mergeRequestJsons.size() == PAGE_COUNT) {
final GetMergeRequestTask newTask = new GetMergeRequestTask(projectId, pageNumber + 1, urlMrOpen, gitlabToken); final GetAllMergeRequestForProjectTask newTask = new GetAllMergeRequestForProjectTask(projectId, pageNumber + 1, urlMrOpen, gitlabToken);
newTask.fork(); newTask.fork();
mergeRequestJsons.addAll(newTask.join()); mergeRequestJsons.addAll(newTask.join());
} }

View File

@ -0,0 +1,37 @@
package dev.struchkov.bot.gitlab.core.service.parser.forktask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.MergeRequestJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.util.Optional;
import java.util.concurrent.RecursiveTask;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j
@RequiredArgsConstructor
public class GetSingleMergeRequestTask extends RecursiveTask<Optional<MergeRequestJson>> {
private final String urlMr;
private final long projectId;
private final long mrTwoId;
private final String gitlabToken;
@Override
@SneakyThrows
protected Optional<MergeRequestJson> compute() {
Thread.sleep(100);
final String mrUrl = MessageFormat.format(urlMr, projectId, mrTwoId);
return HttpParse.request(mrUrl)
.header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, gitlabToken)
.execute(MergeRequestJson.class);
}
}