Сделал обработку старых пайплайнов с использованием ForkJoinPool

This commit is contained in:
Struchkov Mark 2022-12-20 20:21:40 +03:00
parent 18672cf341
commit 0980b05925
Signed by: upagge
GPG Key ID: D3018BE7BA428CA6
4 changed files with 44 additions and 37 deletions

View File

@ -24,6 +24,8 @@ public interface PipelineService {
Pipeline update(@NonNull Pipeline pipeline); Pipeline update(@NonNull Pipeline pipeline);
List<Pipeline> updateAll(@NonNull List<Pipeline> pipelines);
List<Pipeline> getAllByStatuses(@NonNull Set<PipelineStatus> statuses); List<Pipeline> getAllByStatuses(@NonNull Set<PipelineStatus> statuses);
Page<Pipeline> getAll(@NonNull PipelineFilter filter, @NonNull Pageable pagination); Page<Pipeline> getAll(@NonNull PipelineFilter filter, @NonNull Pageable pagination);
@ -31,5 +33,4 @@ public interface PipelineService {
ExistContainer<Pipeline, Long> existsById(@NonNull Set<Long> pipelineIds); ExistContainer<Pipeline, Long> existsById(@NonNull Set<Long> pipelineIds);
void deleteAllById(Set<Long> pipelineIds); void deleteAllById(Set<Long> pipelineIds);
} }

View File

@ -92,6 +92,13 @@ public class PipelineServiceImpl implements PipelineService {
return oldPipeline; return oldPipeline;
} }
@Override
public List<Pipeline> updateAll(@NonNull List<Pipeline> pipelines) {
return pipelines.stream()
.map(this::update)
.collect(Collectors.toList());
}
private boolean isNeedNotifyNewPipeline(@NonNull Pipeline pipeline) { private boolean isNeedNotifyNewPipeline(@NonNull Pipeline pipeline) {
final Person personPipelineCreator = pipeline.getPerson(); final Person personPipelineCreator = pipeline.getPerson();
return notificationStatus.contains(pipeline.getStatus()) // Пайплайн имеет статус необходимый для уведомления return notificationStatus.contains(pipeline.getStatus()) // Пайплайн имеет статус необходимый для уведомления

View File

@ -9,16 +9,13 @@ import dev.struchkov.bot.gitlab.core.config.properties.GitlabProperty;
import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty; import dev.struchkov.bot.gitlab.core.config.properties.PersonProperty;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetPipelineShortTask; import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetPipelineShortTask;
import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetPipelineTask; import dev.struchkov.bot.gitlab.core.service.parser.forktask.GetPipelineTask;
import dev.struchkov.bot.gitlab.core.utils.StringUtils;
import dev.struchkov.bot.gitlab.sdk.domain.PipelineJson; import dev.struchkov.bot.gitlab.sdk.domain.PipelineJson;
import dev.struchkov.bot.gitlab.sdk.domain.PipelineShortJson; import dev.struchkov.bot.gitlab.sdk.domain.PipelineShortJson;
import dev.struchkov.haiti.utils.network.HttpParse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.ConversionService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.text.MessageFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -38,7 +35,6 @@ import static dev.struchkov.haiti.utils.Checker.checkFalse;
import static dev.struchkov.haiti.utils.Checker.checkNotEmpty; import static dev.struchkov.haiti.utils.Checker.checkNotEmpty;
import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResult; import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResult;
import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResults; import static dev.struchkov.haiti.utils.concurrent.ForkJoinUtils.pullTaskResults;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
/** /**
* Парсер пайплайнов. * Парсер пайплайнов.
@ -91,12 +87,9 @@ public class PipelineParser {
if (checkFalse(existContainer.isAllFound())) { if (checkFalse(existContainer.isAllFound())) {
final Set<Long> idsNotFound = existContainer.getIdNoFound(); final Set<Long> idsNotFound = existContainer.getIdNoFound();
final List<PipelineJson> pipelineJsons = getNewPipelineJson(pipelineProjectMap, idsNotFound); final List<Pipeline> newPipelines = getNewPipelines(pipelineProjectMap, idsNotFound);
if (checkNotEmpty(pipelineJsons)) { if (checkNotEmpty(newPipelines)) {
final List<Pipeline> newPipelines = pipelineJsons.stream()
.map(json -> conversionService.convert(json, Pipeline.class))
.collect(Collectors.toList());
pipelineService.createAll(newPipelines); pipelineService.createAll(newPipelines);
} }
} }
@ -106,8 +99,8 @@ public class PipelineParser {
log.debug("Конец обработки новых пайплайнов"); log.debug("Конец обработки новых пайплайнов");
} }
private List<PipelineJson> getNewPipelineJson(Map<Long, Long> pipelineProjectMap, Set<Long> idsNotFound) { private List<Pipeline> getNewPipelines(Map<Long, Long> pipelineProjectMap, Set<Long> idsNotFound) {
final List<ForkJoinTask<PipelineJson>> tasks = idsNotFound.stream() final List<ForkJoinTask<Optional<PipelineJson>>> tasks = idsNotFound.stream()
.map(pipelineId -> new GetPipelineTask( .map(pipelineId -> new GetPipelineTask(
gitlabProperty.getUrlPipeline(), gitlabProperty.getUrlPipeline(),
pipelineProjectMap.get(pipelineId), pipelineProjectMap.get(pipelineId),
@ -117,8 +110,10 @@ public class PipelineParser {
.map(forkJoinPool::submit) .map(forkJoinPool::submit)
.collect(Collectors.toList()); .collect(Collectors.toList());
final List<PipelineJson> pipelineJsons = pullTaskResult(tasks); return pullTaskResult(tasks).stream()
return pipelineJsons; .flatMap(Optional::stream)
.map(json -> conversionService.convert(json, Pipeline.class))
.collect(Collectors.toList());
} }
private List<PipelineShortJson> getPipelineShortJsons(Set<Long> projectIds) { private List<PipelineShortJson> getPipelineShortJsons(Set<Long> projectIds) {
@ -144,20 +139,25 @@ public class PipelineParser {
log.debug("Старт обработки старых пайплайнов"); log.debug("Старт обработки старых пайплайнов");
final List<Pipeline> pipelines = pipelineService.getAllByStatuses(oldStatus); final List<Pipeline> pipelines = pipelineService.getAllByStatuses(oldStatus);
for (Pipeline pipeline : pipelines) { final List<ForkJoinTask<Optional<PipelineJson>>> tasks = pipelines.stream()
final Optional<Pipeline> optNewPipeline = HttpParse.request( .map(
MessageFormat.format(gitlabProperty.getUrlPipeline(), pipeline.getProjectId(), pipeline.getId()) pipeline -> new GetPipelineTask(
) gitlabProperty.getUrlPipeline(),
.header(ACCEPT) pipeline.getProjectId(),
.header(StringUtils.H_PRIVATE_TOKEN, personProperty.getToken()) pipeline.getId(),
.execute(PipelineJson.class) personProperty.getToken()
.map(json -> conversionService.convert(json, Pipeline.class)); )
if (optNewPipeline.isPresent()) { )
final Pipeline newPipeline = optNewPipeline.get(); .map(forkJoinPool::submit)
pipelineService.update(newPipeline); .collect(Collectors.toList());
} else {
log.error("Ошибка обновления пайплайна. ProjectId:{}, PipelineId:{}", pipeline.getProjectId(), pipeline.getId()); final List<Pipeline> newPipelines = pullTaskResult(tasks).stream()
} .flatMap(Optional::stream)
.map(json -> conversionService.convert(json, Pipeline.class))
.collect(Collectors.toList());
if (checkNotEmpty(newPipelines)) {
pipelineService.updateAll(pipelines);
} }
log.debug("Конец обработки старых пайплайнов"); log.debug("Конец обработки старых пайплайнов");

View File

@ -8,14 +8,14 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Optional;
import java.util.concurrent.RecursiveTask; import java.util.concurrent.RecursiveTask;
import static dev.struchkov.haiti.context.exception.ConvertException.convertException;
import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT; import static dev.struchkov.haiti.utils.network.HttpParse.ACCEPT;
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class GetPipelineTask extends RecursiveTask<PipelineJson> { public class GetPipelineTask extends RecursiveTask<Optional<PipelineJson>> {
private final String urlPipeline; private final String urlPipeline;
private final long projectId; private final long projectId;
@ -24,17 +24,16 @@ public class GetPipelineTask extends RecursiveTask<PipelineJson> {
@Override @Override
@SneakyThrows @SneakyThrows
protected PipelineJson compute() { protected Optional<PipelineJson> compute() {
Thread.sleep(100); Thread.sleep(100);
final PipelineJson pipelineJson = HttpParse.request( return HttpParse.request(MessageFormat.format(urlPipeline, projectId, pipelineId))
MessageFormat.format(urlPipeline, projectId, pipelineId)
)
.header(ACCEPT) .header(ACCEPT)
.header(StringUtils.H_PRIVATE_TOKEN, gitlabToken) .header(StringUtils.H_PRIVATE_TOKEN, gitlabToken)
.execute(PipelineJson.class) .execute(PipelineJson.class)
.orElseThrow(convertException("Ошибка обновления Pipelines")); .map(json -> {
pipelineJson.setProjectId(projectId); json.setProjectId(projectId);
return pipelineJson; return json;
});
} }
} }