package ai.platon.scent.rest.api.service.pulsar.v30x;

import ai.platon.pulsar.common.urls.UrlAware;
import ai.platon.pulsar.persist.WebPage;
import ai.platon.pulsar.rest.api.common.AbstractScrapeHyperlink;
import ai.platon.pulsar.rest.api.common.DegenerateXSQLScrapeHyperlink;
import ai.platon.pulsar.rest.api.common.ScrapeAPIUtils;
import ai.platon.pulsar.rest.api.common.ScrapeHyperlink;
import ai.platon.pulsar.rest.api.common.XSQLScrapeHyperlink;
import ai.platon.pulsar.rest.api.entities.ScrapeRequest;
import ai.platon.pulsar.rest.api.entities.ScrapeResponse;
import ai.platon.pulsar.rest.api.entities.ScrapeStatusRequest;
import ai.platon.pulsar.skeleton.session.PulsarSession;
import ai.platon.scent.rest.api.service.v1.ScrapeServiceV1;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.CoroutineDispatcher;
import kotlinx.coroutines.CoroutineName;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.SupervisorKt;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowKt;
import org.apache.commons.collections4.ListValuedMap;
import org.apache.commons.collections4.MultiMapUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* compiled from: ScrapeServiceV30x.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��h\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0017\u0018��2\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u0016\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u000b0\u00162\u0006\u0010\u0017\u001a\u00020\nH\u0016J\u0010\u0010\u0018\u001a\u00020\u000e2\u0006\u0010\u0019\u001a\u00020\u000eH\u0016J\u0010\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u001c\u001a\u00020\u001dH\u0012J\u0010\u0010\u001e\u001a\u00020\u000b2\u0006\u0010\u001c\u001a\u00020\u001dH\u0016J\u0010\u0010\u001f\u001a\u00020\u000b2\u0006\u0010\u001c\u001a\u00020 H\u0016J\u001c\u0010!\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u000b0#0\"2\u0006\u0010$\u001a\u00020\nH\u0016J\u0010\u0010%\u001a\u00020\n2\u0006\u0010\u001c\u001a\u00020\u001dH\u0016R\u0016\u0010\u0005\u001a\n \u0007*\u0004\u0018\u00010\u00060\u0006X\u0092\u0004¢\u0006\u0002\n��R\u001a\u0010\b\u001a\u000e\u0012\u0004\u0012\u00020\n\u0012\u0004\u0012\u00020\u000b0\tX\u0092\u0004¢\u0006\u0002\n��RN\u0010\f\u001aB\u0012\f\u0012\n \u0007*\u0004\u0018\u00010\u000e0\u000e\u0012\f\u0012\n \u0007*\u0004\u0018\u00010\n0\n \u0007* \u0012\f\u0012\n \u0007*\u0004\u0018\u00010\u000e0\u000e\u0012\f\u0012\n \u0007*\u0004\u0018\u00010\n0\n\u0018\u00010\r0\rX\u0092\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0092\u0004¢\u0006\u0002\n��R\u000e\u0010\u0011\u001a\u00020\u0012X\u0092\u0004¢\u0006\u0002\n��R\u0014\u0010\u0002\u001a\u00020\u0003X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0013\u0010\u0014¨\u0006&"}, d2 = {"Lai/platon/scent/rest/api/service/pulsar/v30x/ScrapeServiceV30x;", "", "session", "Lai/platon/pulsar/skeleton/session/PulsarSession;", "(Lai/platon/pulsar/skeleton/session/PulsarSession;)V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "responseCache", "Ljava/util/concurrent/ConcurrentSkipListMap;", "", "Lai/platon/pulsar/rest/api/entities/ScrapeResponse;", "responseStatusIndex", "Lorg/apache/commons/collections4/ListValuedMap;", "", "scrapingDispatcher", "Lkotlinx/coroutines/CoroutineDispatcher;", "scrapingScope", "Lkotlinx/coroutines/CoroutineScope;", "getSession", "()Lai/platon/pulsar/skeleton/session/PulsarSession;", "commandStatusFlow", "Lkotlinx/coroutines/flow/Flow;", "uuid", "count", "statusCode", "createScrapeHyperlink", "Lai/platon/pulsar/rest/api/common/ScrapeHyperlink;", "request", "Lai/platon/pulsar/rest/api/entities/ScrapeRequest;", "executeQuery", "getStatus", "Lai/platon/pulsar/rest/api/entities/ScrapeStatusRequest;", "streamEvents", "Lreactor/core/publisher/Flux;", "Lorg/springframework/http/codec/ServerSentEvent;", "id", "submitJob", "scent-rest"})
@Service
/* loaded from: input_file:ai/platon/scent/rest/api/service/pulsar/v30x/ScrapeServiceV30x.class */
public class ScrapeServiceV30x {

    @NotNull
    private final PulsarSession session;
    private final Logger logger;

    @NotNull
    private final ConcurrentSkipListMap<String, ScrapeResponse> responseCache;
    private final ListValuedMap<Integer, String> responseStatusIndex;

    @NotNull
    private final CoroutineDispatcher scrapingDispatcher;

    @NotNull
    private final CoroutineScope scrapingScope;

    public ScrapeServiceV30x(@NotNull PulsarSession pulsarSession) {
        Intrinsics.checkNotNullParameter(pulsarSession, "session");
        this.session = pulsarSession;
        this.logger = LoggerFactory.getLogger(ScrapeServiceV30x.class);
        this.responseCache = new ConcurrentSkipListMap<>();
        this.responseStatusIndex = MultiMapUtils.newListValuedHashMap();
        this.scrapingDispatcher = Dispatchers.getIO().limitedParallelism(10);
        this.scrapingScope = CoroutineScopeKt.CoroutineScope(this.scrapingDispatcher.plus(SupervisorKt.SupervisorJob$default((Job) null, 1, (Object) null)).plus(new CoroutineName("scraping")));
    }

    @NotNull
    public PulsarSession getSession() {
        return this.session;
    }

    @NotNull
    public ScrapeResponse executeQuery(@NotNull ScrapeRequest scrapeRequest) {
        Intrinsics.checkNotNullParameter(scrapeRequest, "request");
        try {
            UrlAware createScrapeHyperlink = createScrapeHyperlink(scrapeRequest);
            getSession().submit(createScrapeHyperlink);
            ScrapeResponse scrapeResponse = (ScrapeResponse) createScrapeHyperlink.get(120L, TimeUnit.SECONDS);
            Intrinsics.checkNotNull(scrapeResponse);
            return scrapeResponse;
        } catch (TimeoutException e) {
            this.logger.warn("Timeout executing query: >>>" + scrapeRequest.getSql() + "<<<", e);
            return new ScrapeResponse("", 408, 408, 0, false, (List) null, (String) null, 120, (DefaultConstructorMarker) null);
        } catch (Exception e2) {
            this.logger.error("Unexpected error executing query: >>>" + scrapeRequest.getSql() + "<<<", e2);
            return new ScrapeResponse("", ScrapeServiceV1.BATCH_SIZE, 1462, 0, false, (List) null, (String) null, 120, (DefaultConstructorMarker) null);
        }
    }

    @NotNull
    public String submitJob(@NotNull ScrapeRequest scrapeRequest) {
        Intrinsics.checkNotNullParameter(scrapeRequest, "request");
        UrlAware createScrapeHyperlink = createScrapeHyperlink(scrapeRequest);
        this.responseCache.put(createScrapeHyperlink.getUuid(), createScrapeHyperlink.getResponse());
        createScrapeHyperlink.getResponse().setId(createScrapeHyperlink.getUuid());
        getSession().submit(createScrapeHyperlink);
        return createScrapeHyperlink.getUuid();
    }

    @NotNull
    public ScrapeResponse getStatus(@NotNull final ScrapeStatusRequest scrapeStatusRequest) {
        Intrinsics.checkNotNullParameter(scrapeStatusRequest, "request");
        ConcurrentSkipListMap<String, ScrapeResponse> concurrentSkipListMap = this.responseCache;
        String id = scrapeStatusRequest.getId();
        Function1<String, ScrapeResponse> function1 = new Function1<String, ScrapeResponse>() { // from class: ai.platon.scent.rest.api.service.pulsar.v30x.ScrapeServiceV30x$getStatus$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            @NotNull
            public final ScrapeResponse invoke(@NotNull String str) {
                Intrinsics.checkNotNullParameter(str, "it");
                return new ScrapeResponse(scrapeStatusRequest.getId(), 404, 404, 0, false, (List) null, (String) null, 120, (DefaultConstructorMarker) null);
            }
        };
        ScrapeResponse computeIfAbsent = concurrentSkipListMap.computeIfAbsent(id, (v1) -> {
            return getStatus$lambda$0(r2, v1);
        });
        Intrinsics.checkNotNullExpressionValue(computeIfAbsent, "computeIfAbsent(...)");
        return computeIfAbsent;
    }

    @NotNull
    public Flux<ServerSentEvent<ScrapeResponse>> streamEvents(@NotNull final String str) {
        Intrinsics.checkNotNullParameter(str, "id");
        Function1<FluxSink<ScrapeResponse>, Unit> function1 = new Function1<FluxSink<ScrapeResponse>, Unit>() { // from class: ai.platon.scent.rest.api.service.pulsar.v30x.ScrapeServiceV30x$streamEvents$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            public final void invoke(FluxSink<ScrapeResponse> fluxSink) {
                CoroutineScope coroutineScope;
                Flow flow = FlowKt.catch(FlowKt.onEach(ScrapeServiceV30x.this.commandStatusFlow(str), new ScrapeServiceV30x$streamEvents$1$job$1(fluxSink, null)), new ScrapeServiceV30x$streamEvents$1$job$2(ScrapeServiceV30x.this, fluxSink, null));
                coroutineScope = ScrapeServiceV30x.this.scrapingScope;
                Job launchIn = FlowKt.launchIn(flow, coroutineScope);
                fluxSink.onDispose(() -> {
                    invoke$lambda$0(r1);
                });
            }

            private static final void invoke$lambda$0(Job job) {
                Intrinsics.checkNotNullParameter(job, "$job");
                Job.DefaultImpls.cancel$default(job, (CancellationException) null, 1, (Object) null);
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((FluxSink<ScrapeResponse>) obj);
                return Unit.INSTANCE;
            }
        };
        Flux create = Flux.create((v1) -> {
            streamEvents$lambda$1(r0, v1);
        });
        ScrapeServiceV30x$streamEvents$2 scrapeServiceV30x$streamEvents$2 = new Function1<ScrapeResponse, ServerSentEvent<ScrapeResponse>>() { // from class: ai.platon.scent.rest.api.service.pulsar.v30x.ScrapeServiceV30x$streamEvents$2
            public final ServerSentEvent<ScrapeResponse> invoke(ScrapeResponse scrapeResponse) {
                ServerSentEvent.Builder builder = ServerSentEvent.builder(scrapeResponse);
                String id = scrapeResponse.getId();
                Intrinsics.checkNotNull(id);
                return builder.id(id).event(scrapeResponse.getEvent()).build();
            }
        };
        Flux<ServerSentEvent<ScrapeResponse>> map = create.map((v1) -> {
            return streamEvents$lambda$2(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(map, "map(...)");
        return map;
    }

    @NotNull
    public Flow<ScrapeResponse> commandStatusFlow(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "uuid");
        return FlowKt.flow(new ScrapeServiceV30x$commandStatusFlow$1(this, str, null));
    }

    public int count(int i) {
        if (i == 0) {
            return this.responseCache.size();
        }
        List list = this.responseStatusIndex.get(Integer.valueOf(i));
        if (list != null) {
            return list.size();
        }
        return 0;
    }

    private ScrapeHyperlink createScrapeHyperlink(ScrapeRequest scrapeRequest) throws IllegalStateException {
        String sql = scrapeRequest.getSql();
        final AbstractScrapeHyperlink xSQLScrapeHyperlink = ScrapeAPIUtils.INSTANCE.isScrapeUDF(sql) ? new XSQLScrapeHyperlink(scrapeRequest, ScrapeAPIUtils.INSTANCE.normalize(sql), getSession(), (String) null, 8, (DefaultConstructorMarker) null) : new DegenerateXSQLScrapeHyperlink(scrapeRequest, getSession(), (String) null, 4, (DefaultConstructorMarker) null);
        xSQLScrapeHyperlink.getEventHandlers().getCrawlEventHandlers().getOnLoaded().addLast(new Function2<UrlAware, WebPage, Object>() { // from class: ai.platon.scent.rest.api.service.pulsar.v30x.ScrapeServiceV30x$createScrapeHyperlink$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(2);
            }

            @Nullable
            public final Object invoke(@NotNull UrlAware urlAware, @Nullable WebPage webPage) {
                ConcurrentSkipListMap concurrentSkipListMap;
                ListValuedMap listValuedMap;
                Intrinsics.checkNotNullParameter(urlAware, "url");
                concurrentSkipListMap = ScrapeServiceV30x.this.responseCache;
                concurrentSkipListMap.put(xSQLScrapeHyperlink.getUuid(), xSQLScrapeHyperlink.getResponse());
                listValuedMap = ScrapeServiceV30x.this.responseStatusIndex;
                listValuedMap.get(Integer.valueOf(xSQLScrapeHyperlink.getResponse().getStatusCode())).add(xSQLScrapeHyperlink.getUuid());
                return null;
            }
        });
        return (ScrapeHyperlink) xSQLScrapeHyperlink;
    }

    private static final ScrapeResponse getStatus$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (ScrapeResponse) function1.invoke(obj);
    }

    private static final void streamEvents$lambda$1(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(obj);
    }

    private static final ServerSentEvent streamEvents$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (ServerSentEvent) function1.invoke(obj);
    }
}
