package ai.platon.scent.parse.html;

import ai.platon.pulsar.common.AppContext;
import ai.platon.pulsar.common.DateTimes;
import ai.platon.pulsar.common.ExceptionsKt;
import ai.platon.pulsar.common.IllegalApplicationStateException;
import ai.platon.pulsar.common.LogsKt;
import ai.platon.pulsar.common.config.ImmutableConfig;
import ai.platon.pulsar.common.persist.ext.WebPageExKt;
import ai.platon.pulsar.crawl.common.GlobalCacheFactory;
import ai.platon.pulsar.crawl.parse.ParseFilter;
import ai.platon.pulsar.crawl.parse.ParseResult;
import ai.platon.pulsar.dom.FeaturedDocument;
import ai.platon.pulsar.persist.WebPage;
import ai.platon.pulsar.persist.model.PageModel;
import ai.platon.scent.ScentSession;
import ai.platon.scent.common.ScentStatusTracker;
import ai.platon.scent.parse.html.AbstractSQLExtractorParseFilter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.jvm.internal.StringCompanionObject;
import kotlin.text.StringsKt;
import kotlinx.coroutines.BuildersKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* compiled from: AbstractSinkAwareSQLExtractorParseFilter.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"�� \u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n\u0002\b\u0004\n\u0002\u0010\b\n\u0002\b\u0004\n\u0002\u0010\t\n��\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\b&\u0018�� L2\u00020\u0001:\u0001LB%\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t¢\u0006\u0002\u0010\nJ\b\u00102\u001a\u000203H\u0002J\b\u00104\u001a\u000203H\u0016J\b\u00105\u001a\u000203H\u0002J\u0010\u00106\u001a\u0002032\u0006\u00107\u001a\u000208H&J\u001a\u00109\u001a\u0004\u0018\u00010:2\u0006\u0010;\u001a\u00020<2\u0006\u0010=\u001a\u00020>H\u0014J\u0010\u0010?\u001a\u0002032\u0006\u0010;\u001a\u00020<H\u0002J$\u0010@\u001a\u0004\u0018\u00010:2\u0006\u0010A\u001a\u00020B2\u0006\u0010C\u001a\u00020D2\b\u0010E\u001a\u0004\u0018\u00010:H\u0016J \u0010F\u001a\u0002032\u0006\u0010A\u001a\u00020B2\u0006\u0010C\u001a\u00020D2\u0006\u0010G\u001a\u00020HH\u0016J\b\u0010I\u001a\u00020!H\u0016J\b\u0010J\u001a\u00020'H\u0016J\b\u0010K\u001a\u000203H\u0016R\u0014\u0010\u000b\u001a\u00020\fX\u0084\u0004¢\u0006\b\n��\u001a\u0004\b\r\u0010\u000eR\u001c\u0010\u000f\u001a\n \u0011*\u0004\u0018\u00010\u00100\u0010X\u0084\u0004¢\u0006\b\n��\u001a\u0004\b\u0012\u0010\u0013R\"\u0010\u0014\u001a\n 
\u0011*\u0004\u0018\u00010\u00150\u0015X\u0084\u000e¢\u0006\u000e\n��\u001a\u0004\b\u0016\u0010\u0017\"\u0004\b\u0018\u0010\u0019R\"\u0010\u001a\u001a\n \u0011*\u0004\u0018\u00010\u001b0\u001bX\u0084\u000e¢\u0006\u000e\n��\u001a\u0004\b\u001c\u0010\u001d\"\u0004\b\u001e\u0010\u001fR\u0014\u0010 \u001a\u00020!8VX\u0096\u0004¢\u0006\u0006\u001a\u0004\b\"\u0010#R\u000e\u0010$\u001a\u00020%X\u0082\u0004¢\u0006\u0002\n��R\u0012\u0010&\u001a\u00020'X¦\u0004¢\u0006\u0006\u001a\u0004\b(\u0010)R\u000e\u0010*\u001a\u00020'X\u0082D¢\u0006\u0002\n��R\u0014\u0010+\u001a\u00020,X\u0094D¢\u0006\b\n��\u001a\u0004\b-\u0010.R\u000e\u0010/\u001a\u00020%X\u0082\u0004¢\u0006\u0002\n��R\u000e\u00100\u001a\u000201X\u0082\u000e¢\u0006\u0002\n��¨\u0006M"}, d2 = {"Lai/platon/scent/parse/html/AbstractSinkAwareSQLExtractorParseFilter;", "Lai/platon/scent/parse/html/AbstractSQLExtractorParseFilter;", "session", "Lai/platon/scent/ScentSession;", "scentStatusTracker", "Lai/platon/scent/common/ScentStatusTracker;", "globalCacheFactory", "Lai/platon/pulsar/crawl/common/GlobalCacheFactory;", "conf", "Lai/platon/pulsar/common/config/ImmutableConfig;", "(Lai/platon/scent/ScentSession;Lai/platon/scent/common/ScentStatusTracker;Lai/platon/pulsar/crawl/common/GlobalCacheFactory;Lai/platon/pulsar/common/config/ImmutableConfig;)V", "closed", "Ljava/util/concurrent/atomic/AtomicBoolean;", "getClosed", "()Ljava/util/concurrent/atomic/AtomicBoolean;", "commitExector", "Ljava/util/concurrent/ExecutorService;", "kotlin.jvm.PlatformType", "getCommitExector", "()Ljava/util/concurrent/ExecutorService;", "commitLastTime", "Ljava/time/Instant;", "getCommitLastTime", "()Ljava/time/Instant;", "setCommitLastTime", "(Ljava/time/Instant;)V", "commitMinInterval", "Ljava/time/Duration;", "getCommitMinInterval", "()Ljava/time/Duration;", "setCommitMinInterval", "(Ljava/time/Duration;)V", "hasSink", "", "getHasSink", "()Z", "logger", "Lorg/slf4j/Logger;", "sinkCollection", "", "getSinkCollection", "()Ljava/lang/String;", 
"sms", "syncBatchSize", "", "getSyncBatchSize", "()I", "taskLogger", "totalMillis", "", "checkState", "", "close", "commitWithTimeout", "doCommit", "pendingResult", "Lai/platon/scent/parse/html/SinkAwarePendingResult;", "extractWithConnection", "Ljava/sql/ResultSet;", "task", "Lai/platon/scent/parse/html/AbstractSQLExtractorParseFilter$Task;", "conn", "Ljava/sql/Connection;", "logExtractComplete", "onAfterExtract", "page", "Lai/platon/pulsar/persist/WebPage;", "document", "Lai/platon/pulsar/dom/FeaturedDocument;", "rs", "onAfterFilter", "parseResult", "Lai/platon/pulsar/crawl/parse/ParseResult;", "shouldCommit", "toString", "tryCommit", "Companion", "scent-parse"})
@SourceDebugExtension({"SMAP\nAbstractSinkAwareSQLExtractorParseFilter.kt\nKotlin\n*S Kotlin\n*F\n+ 1 AbstractSinkAwareSQLExtractorParseFilter.kt\nai/platon/scent/parse/html/AbstractSinkAwareSQLExtractorParseFilter\n+ 2 fake.kt\nkotlin/jvm/internal/FakeKt\n*L\n1#1,168:1\n1#2:169\n*E\n"})
/* loaded from: input_file:ai/platon/scent/parse/html/AbstractSinkAwareSQLExtractorParseFilter.class */
public abstract class AbstractSinkAwareSQLExtractorParseFilter extends AbstractSQLExtractorParseFilter {
    /*
     * An SQL-extractor parse filter that can forward its extraction results to a sink.
     *
     * When getSinkCollection() is not blank, every non-null ResultSet produced by onAfterExtract is
     * registered in the process-wide pendingResultManager (a static shared by all instances), keyed
     * by sink collection and filter name. After each filtered page a commit attempt is queued on a
     * dedicated single-thread executor; commits are throttled by commitMinInterval. close() waits for
     * queued attempts, performs one final unthrottled attempt, and then closes the connections.
     */

    /** Maximum number of seconds close() waits for already-queued background commits. */
    private static final long CLOSE_AWAIT_SECONDS = 60L;

    @NotNull
    private final Logger logger;

    /** Logger used for the per-task extraction summary line (created with the ".Task" suffix). */
    @NotNull
    private final Logger taskLogger;
    private final int syncBatchSize;
    /** Extraction time in milliseconds, accumulated over every task handled by this instance. */
    private long totalMillis;

    @NotNull
    private final String sms;

    /** Flipped once by close(); prevents double close and commit submissions after close. */
    @NotNull
    private final AtomicBoolean closed;
    /** Single-thread executor, so background commit attempts never run concurrently. */
    private final ExecutorService commitExector;
    /** Minimum time between two commit attempts (30 seconds by default). */
    private Duration commitMinInterval;
    /** Time of the latest commit attempt; initialized to the construction time. */
    private Instant commitLastTime;

    @NotNull
    public static final Companion Companion = new Companion(null);

    /** Pending sink results, shared by every instance of every subclass in this JVM. */
    @NotNull
    private static final SinkAwareResultManager pendingResultManager = new SinkAwareResultManager();

    /* compiled from: AbstractSinkAwareSQLExtractorParseFilter.kt */
    @Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0011\u0010\u0003\u001a\u00020\u0004¢\u0006\b\n��\u001a\u0004\b\u0005\u0010\u0006¨\u0006\u0007"}, d2 = {"Lai/platon/scent/parse/html/AbstractSinkAwareSQLExtractorParseFilter$Companion;", "", "()V", "pendingResultManager", "Lai/platon/scent/parse/html/SinkAwareResultManager;", "getPendingResultManager", "()Lai/platon/scent/parse/html/SinkAwareResultManager;", "scent-parse"})
    /* loaded from: input_file:ai/platon/scent/parse/html/AbstractSinkAwareSQLExtractorParseFilter$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        /** Returns the process-wide manager holding results that are waiting to be committed. */
        @NotNull
        public final SinkAwareResultManager getPendingResultManager() {
            return AbstractSinkAwareSQLExtractorParseFilter.pendingResultManager;
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Creates the filter with a 60-record sync batch size and a 30-second minimum commit interval.
     *
     * <p>Note: in the Kotlin source the null checks ran before the super call; Java requires the
     * super call first.
     */
    public AbstractSinkAwareSQLExtractorParseFilter(@NotNull ScentSession scentSession, @NotNull ScentStatusTracker scentStatusTracker, @NotNull GlobalCacheFactory globalCacheFactory, @NotNull ImmutableConfig immutableConfig) {
        super(scentSession, scentStatusTracker, globalCacheFactory, immutableConfig);
        Intrinsics.checkNotNullParameter(scentSession, "session");
        Intrinsics.checkNotNullParameter(scentStatusTracker, "scentStatusTracker");
        Intrinsics.checkNotNullParameter(globalCacheFactory, "globalCacheFactory");
        Intrinsics.checkNotNullParameter(immutableConfig, "conf");
        this.logger = LogsKt.getLogger(Reflection.getOrCreateKotlinClass(AbstractSinkAwareSQLExtractorParseFilter.class));
        this.taskLogger = LogsKt.getLogger(Reflection.getOrCreateKotlinClass(AbstractSinkAwareSQLExtractorParseFilter.class), ".Task");
        this.syncBatchSize = 60;
        this.sms = "._.";
        this.closed = new AtomicBoolean();
        this.commitExector = Executors.newSingleThreadExecutor();
        this.commitMinInterval = Duration.ofSeconds(30L);
        this.commitLastTime = Instant.now();
    }

    /** Name of the sink collection results are written to; blank means "no sink". */
    @NotNull
    public abstract String getSinkCollection();

    /** Returns {@code true} when {@link #getSinkCollection()} is not blank. */
    public boolean getHasSink() {
        return !StringsKt.isBlank(getSinkCollection());
    }

    protected int getSyncBatchSize() {
        return this.syncBatchSize;
    }

    @NotNull
    public final AtomicBoolean getClosed() {
        return this.closed;
    }

    protected final ExecutorService getCommitExector() {
        return this.commitExector;
    }

    protected final Duration getCommitMinInterval() {
        return this.commitMinInterval;
    }

    protected final void setCommitMinInterval(Duration duration) {
        this.commitMinInterval = duration;
    }

    protected final Instant getCommitLastTime() {
        return this.commitLastTime;
    }

    protected final void setCommitLastTime(Instant instant) {
        this.commitLastTime = instant;
    }

    /**
     * Runs the parent's post-filter hook, then queues a commit attempt on the commit executor.
     *
     * <p>Nothing is queued once the filter is closed, because close() performs the final commit
     * itself. Submission failures are logged, not thrown.
     */
    @Override // ai.platon.scent.parse.html.AbstractSQLExtractorParseFilter
    public void onAfterFilter(@NotNull WebPage webPage, @NotNull FeaturedDocument featuredDocument, @NotNull ParseResult parseResult) {
        Intrinsics.checkNotNullParameter(webPage, "page");
        Intrinsics.checkNotNullParameter(featuredDocument, "document");
        Intrinsics.checkNotNullParameter(parseResult, "parseResult");
        super.onAfterFilter(webPage, featuredDocument, parseResult);
        if (this.closed.get()) {
            return;
        }
        try {
            // tryCommit() logs its own failures before rethrowing them, so the returned Future
            // does not need to be inspected.
            this.commitExector.submit(this::tryCommit);
        } catch (Throwable th) {
            // Typically a RejectedExecutionException when close() shut the executor down concurrently.
            LogsKt.warnInterruptible(this, th, ExceptionsKt.stringify$default(th, (String) null, (String) null, 3, (Object) null), new Object[0]);
        }
    }

    /**
     * Runs the parent's extraction hook and, if a sink is configured and the result is non-null,
     * registers the result as pending for this sink/filter with the page's deadline.
     *
     * @return the ResultSet returned by the parent, unchanged
     */
    @Override // ai.platon.scent.parse.html.AbstractSQLExtractorParseFilter
    @Nullable
    public ResultSet onAfterExtract(@NotNull WebPage webPage, @NotNull FeaturedDocument featuredDocument, @Nullable ResultSet resultSet) {
        Intrinsics.checkNotNullParameter(webPage, "page");
        Intrinsics.checkNotNullParameter(featuredDocument, "document");
        ResultSet onAfterExtract = super.onAfterExtract(webPage, featuredDocument, resultSet);
        if (getHasSink() && onAfterExtract != null) {
            Instant deadline = WebPageExKt.getOptions(webPage).getDeadline();
            Intrinsics.checkNotNullExpressionValue(deadline, "<get-deadline>(...)");
            pendingResultManager.add(getSinkCollection(), getName(), onAfterExtract, deadline);
        }
        return onAfterExtract;
    }

    /**
     * Delegates extraction to the parent, accumulates the task's elapsed time and logs a summary.
     */
    @Nullable
    @Override // ai.platon.scent.parse.html.AbstractSQLExtractorParseFilter
    public ResultSet extractWithConnection(@NotNull AbstractSQLExtractorParseFilter.Task task, @NotNull Connection connection) {
        Intrinsics.checkNotNullParameter(task, "task");
        Intrinsics.checkNotNullParameter(connection, "conn");
        ResultSet extractWithConnection = super.extractWithConnection(task, connection);
        this.totalMillis += task.getMillis();
        logExtractComplete(task);
        return extractWithConnection;
    }

    /**
     * Returns {@code true} when a sink is configured and at least {@code commitMinInterval} has
     * elapsed since {@code commitLastTime}.
     */
    public boolean shouldCommit() {
        if (!getHasSink()) {
            return false;
        }
        Instant instant = this.commitLastTime;
        Intrinsics.checkNotNull(instant);
        return DateTimes.elapsedTime$default(DateTimes.INSTANCE, instant, (Instant) null, 2, (Object) null).compareTo(this.commitMinInterval) >= 0;
    }

    /**
     * Commits pending results if {@link #shouldCommit()} allows it.
     *
     * <p>{@code commitLastTime} is updated before committing, so a failed commit is not retried
     * until the interval elapses again. Failures are logged and rethrown.
     */
    public void tryCommit() {
        if (!shouldCommit()) {
            return;
        }
        this.commitLastTime = Instant.now();
        try {
            commitWithTimeout();
        } catch (Exception e) {
            this.logger.warn("Failed to commit to jdbc sink", e);
            throw e;
        }
    }

    /** Writes one pending result to the sink; implemented by subclasses. */
    public abstract void doCommit(@NotNull SinkAwarePendingResult sinkAwarePendingResult);

    /**
     * Runs the compiled coroutine {@code commitWithTimeout$1} in a blocking scope with a 45-second
     * timeout. NOTE(review): the coroutine body is not visible here; presumably it drains pending
     * results through {@link #doCommit} — confirm.
     */
    private final void commitWithTimeout() {
        BuildersKt.runBlocking$default((CoroutineContext) null, new AbstractSinkAwareSQLExtractorParseFilter$commitWithTimeout$1(Duration.ofSeconds(45L), this, null), 1, (Object) null);
    }

    /**
     * Closes the filter exactly once.
     *
     * <p>It stops the commit executor and waits for already-queued attempts. It then performs a
     * final commit attempt that is not throttled by {@code commitMinInterval}. Connections are
     * closed even if that commit fails; the failure is still propagated.
     */
    @Override // ai.platon.scent.parse.html.AbstractSQLExtractorParseFilter
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            awaitQueuedCommits();
            // Backdate the last commit so shouldCommit() does not throttle this final flush;
            // otherwise results registered within the last commitMinInterval would be skipped.
            // getHasSink() and subclass overrides of shouldCommit()/tryCommit() still apply.
            this.commitLastTime = Instant.EPOCH;
            tryCommit();
        } finally {
            closeConnections();
        }
    }

    /** Shuts the commit executor down and waits (bounded) for queued commit attempts to finish. */
    private void awaitQueuedCommits() {
        this.commitExector.shutdown();
        try {
            if (!this.commitExector.awaitTermination(CLOSE_AWAIT_SECONDS, TimeUnit.SECONDS)) {
                this.logger.warn("Background commits did not finish within {}s, interrupting them", CLOSE_AWAIT_SECONDS);
                this.commitExector.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.commitExector.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @NotNull
    public String toString() {
        int id = getId();
        ParseFilter parent = getParent();
        return "id: " + id + ", pid: " + (parent != null ? parent.getId() : 0) + ", pattern: " + getUrlFilter() + ", sql: " + getSqlName() + ", children: " + getChildren().size();
    }

    /**
     * Logs a one-line summary of a finished extraction task to {@code taskLogger}.
     *
     * <p>Child filters (those with a parent) that matched no records are not logged.
     */
    private final void logExtractComplete(AbstractSQLExtractorParseFilter.Task task) {
        // NOTE(review): gated on `logger` but emitted through `taskLogger`; confirm this is intended.
        if (!this.logger.isInfoEnabled()) {
            return;
        }
        if (getParent() != null && task.getNumFitRecords() == 0) {
            return;
        }

        SinkAwarePendingResult pending = pendingResultManager.get(getSinkCollection(), getName());

        String modelFields = "";
        PageModel pageModel = task.getPage().getPageModel();
        if (pageModel != null) {
            modelFields = String.format("| %d/%d/%d model fields ",
                    pageModel.getNumNonBlankFields(), pageModel.getNumNonNullFields(), pageModel.getNumFields());
        }

        // Guard against an empty meter: dividing by zero would print Infinity/NaN.
        long relevantTasks = getMeterRelevantTasks().getCount();
        double millisPerPage = relevantTasks > 0 ? (1.0d * this.totalMillis) / relevantTasks : 0.0d;

        // The URL is a format ARGUMENT, never part of the pattern: percent-encoded URLs
        // (e.g. "%E4%B8%AD") would otherwise be parsed as format specifiers and make format() throw.
        String pattern = "%3d. Parsed in %,dms/%4.2fs %4.2fms/p | %d/%d/%d fields in %d/%d records %s| pending %d/%d results | %s | %s -> %s | %s";
        String message = String.format(pattern,
                task.getPage().getId(),
                task.getMillis(),
                this.totalMillis / 1000.0d,
                millisPerPage,
                task.getNumNonBlankFields(),
                task.getNumNonNullFields(),
                task.getNumFields(),
                task.getNumFitRecords(),
                task.getNumRecords(),
                modelFields,
                pending.getSize(),
                getMeterResults().getCount(),
                WebPageExKt.getLabel(task.getPage()),
                getName(),
                getSinkCollection(),
                task.getUrl());
        this.taskLogger.info(message);
    }

    /** Terminates the application and throws if the filter is no longer active. */
    private final void checkState() {
        if (isActive()) {
            return;
        }
        AppContext.INSTANCE.terminate();
        throw new IllegalApplicationStateException(this + " is closed");
    }
}
