package com.ververica.cdc.connectors.mysql.source.assigners;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.class */
public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitAssigner.class);
    private final List<TableId> alreadyProcessedTables;
    private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
    private final Map<TableId, TableChanges.TableChange> tableSchemas;
    private final Map<String, BinlogOffset> splitFinishedOffsets;
    private final MySqlSourceConfig sourceConfig;
    private final int currentParallelism;
    private final List<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    private final MySqlPartition partition;
    private final Object lock;
    private volatile Throwable uncaughtSplitterException;
    private AssignerStatus assignerStatus;
    private MySqlChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;
    private ExecutorService executor;

    @Nullable
    private Long checkpointIdToFinish;

    public MySqlSnapshotSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, List<TableId> list, boolean z) {
        this(mySqlSourceConfig, i, new ArrayList(), new ArrayList(), new LinkedHashMap(), new HashMap(), new HashMap(), AssignerStatus.INITIAL_ASSIGNING, list, z, true, ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
    }

    public MySqlSnapshotSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, SnapshotPendingSplitsState snapshotPendingSplitsState) {
        this(mySqlSourceConfig, i, snapshotPendingSplitsState.getAlreadyProcessedTables(), snapshotPendingSplitsState.getRemainingSplits(), snapshotPendingSplitsState.getAssignedSplits(), snapshotPendingSplitsState.getTableSchemas(), snapshotPendingSplitsState.getSplitFinishedOffsets(), snapshotPendingSplitsState.getSnapshotAssignerStatus(), snapshotPendingSplitsState.getRemainingTables(), snapshotPendingSplitsState.isTableIdCaseSensitive(), snapshotPendingSplitsState.isRemainingTablesCheckpointed(), snapshotPendingSplitsState.getChunkSplitterState());
    }

    private MySqlSnapshotSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, List<TableId> list, List<MySqlSchemalessSnapshotSplit> list2, Map<String, MySqlSchemalessSnapshotSplit> map, Map<TableId, TableChanges.TableChange> map2, Map<String, BinlogOffset> map3, AssignerStatus assignerStatus, List<TableId> list3, boolean z, boolean z2, ChunkSplitterState chunkSplitterState) {
        this.lock = new Object();
        this.sourceConfig = mySqlSourceConfig;
        this.currentParallelism = i;
        this.alreadyProcessedTables = list;
        this.remainingSplits = new CopyOnWriteArrayList(list2);
        this.assignedSplits = (Map) map.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }, (mySqlSchemalessSnapshotSplit, mySqlSchemalessSnapshotSplit2) -> {
            return mySqlSchemalessSnapshotSplit;
        }, LinkedHashMap::new));
        this.tableSchemas = map2;
        this.splitFinishedOffsets = map3;
        this.assignerStatus = assignerStatus;
        this.remainingTables = new CopyOnWriteArrayList(list3);
        this.isRemainingTablesCheckpointed = z2;
        this.isTableIdCaseSensitive = z;
        this.chunkSplitter = createChunkSplitter(mySqlSourceConfig, z, chunkSplitterState);
        this.partition = new MySqlPartition(mySqlSourceConfig.getMySqlConnectorConfig().getLogicalName());
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void open() {
        this.chunkSplitter.open();
        discoveryCaptureTables();
        captureNewlyAddedTables();
        startAsynchronouslySplit();
    }

    private void discoveryCaptureTables() {
        if (!needToDiscoveryTables()) {
            if (this.isRemainingTablesCheckpointed || AssignerStatus.isSnapshotAssigningFinished(this.assignerStatus)) {
                return;
            }
            try {
                JdbcConnection openJdbcConnection = DebeziumUtils.openJdbcConnection(this.sourceConfig);
                Throwable th = null;
                try {
                    List<TableId> discoverCapturedTables = DebeziumUtils.discoverCapturedTables(openJdbcConnection, this.sourceConfig);
                    discoverCapturedTables.removeAll(this.alreadyProcessedTables);
                    this.remainingTables.addAll(discoverCapturedTables);
                    this.isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(openJdbcConnection);
                    if (openJdbcConnection != null) {
                        if (0 != 0) {
                            try {
                                openJdbcConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openJdbcConnection.close();
                        }
                    }
                    return;
                } finally {
                }
            } catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        LOG.debug("The remainingTables is empty, start to discovery tables");
        try {
            JdbcConnection openJdbcConnection2 = DebeziumUtils.openJdbcConnection(this.sourceConfig);
            Throwable th3 = null;
            try {
                try {
                    this.remainingTables.addAll(DebeziumUtils.discoverCapturedTables(openJdbcConnection2, this.sourceConfig));
                    this.isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(openJdbcConnection2);
                    if (openJdbcConnection2 != null) {
                        if (0 != 0) {
                            try {
                                openJdbcConnection2.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            openJdbcConnection2.close();
                        }
                    }
                    LOG.debug("Discovery tables success, time cost: {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                } catch (Throwable th5) {
                    th3 = th5;
                    throw th5;
                }
            } finally {
            }
        } catch (Exception e2) {
            throw new FlinkRuntimeException("Failed to discovery tables to capture", e2);
        }
    }

    private void captureNewlyAddedTables() {
        if (this.sourceConfig.isScanNewlyAddedTableEnabled()) {
            try {
                JdbcConnection openJdbcConnection = DebeziumUtils.openJdbcConnection(this.sourceConfig);
                Throwable th = null;
                try {
                    List<TableId> discoverCapturedTables = DebeziumUtils.discoverCapturedTables(openJdbcConnection, this.sourceConfig);
                    HashSet hashSet = new HashSet();
                    hashSet.addAll((List) this.remainingSplits.stream().map((v0) -> {
                        return v0.getTableId();
                    }).collect(Collectors.toList()));
                    hashSet.addAll(this.alreadyProcessedTables);
                    hashSet.addAll(this.remainingTables);
                    HashSet hashSet2 = new HashSet(hashSet);
                    hashSet2.removeAll(discoverCapturedTables);
                    discoverCapturedTables.removeAll(hashSet);
                    if (!hashSet2.isEmpty()) {
                        LinkedList linkedList = new LinkedList();
                        for (Map.Entry<String, MySqlSchemalessSnapshotSplit> entry : this.assignedSplits.entrySet()) {
                            if (hashSet2.contains(entry.getValue().getTableId())) {
                                linkedList.add(entry.getKey());
                            }
                        }
                        Set<String> keySet = this.assignedSplits.keySet();
                        keySet.getClass();
                        linkedList.forEach((v1) -> {
                            r1.remove(v1);
                        });
                        Set<String> keySet2 = this.splitFinishedOffsets.keySet();
                        keySet2.getClass();
                        linkedList.forEach((v1) -> {
                            r1.remove(v1);
                        });
                        this.tableSchemas.entrySet().removeIf(entry2 -> {
                            return hashSet2.contains(entry2.getKey());
                        });
                        this.remainingSplits.removeIf(mySqlSchemalessSnapshotSplit -> {
                            return hashSet2.contains(mySqlSchemalessSnapshotSplit.getTableId());
                        });
                        this.remainingTables.removeAll(hashSet2);
                        this.alreadyProcessedTables.removeIf(tableId -> {
                            return hashSet2.contains(tableId);
                        });
                    }
                    if (!discoverCapturedTables.isEmpty()) {
                        LOG.info("Found newly added tables, start capture newly added tables process");
                        this.remainingTables.addAll(discoverCapturedTables);
                        if (AssignerStatus.isAssigningFinished(this.assignerStatus)) {
                            LOG.info("Found newly added tables, start capture newly added tables process under binlog reading phase");
                            startAssignNewlyAddedTables();
                        }
                    }
                    if (openJdbcConnection != null) {
                        if (0 != 0) {
                            try {
                                openJdbcConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openJdbcConnection.close();
                        }
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
            }
        }
    }

    private void startAsynchronouslySplit() {
        if (this.chunkSplitter.hasNextChunk() || !this.remainingTables.isEmpty()) {
            if (this.executor == null) {
                this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build());
            }
            this.executor.submit(this::splitChunksForRemainingTables);
        }
    }

    private void splitTable(TableId tableId) {
        LOG.info("Start splitting table {} into chunks...", tableId);
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        boolean z = false;
        do {
            synchronized (this.lock) {
                try {
                    List<MySqlSnapshotSplit> splitChunks = this.chunkSplitter.splitChunks(this.partition, tableId);
                    if (!z && !splitChunks.isEmpty()) {
                        z = true;
                        HashMap hashMap = new HashMap();
                        hashMap.putAll(splitChunks.iterator().next().getTableSchemas());
                        this.tableSchemas.putAll(hashMap);
                    }
                    List list = (List) splitChunks.stream().map((v0) -> {
                        return v0.toSchemalessSnapshotSplit();
                    }).collect(Collectors.toList());
                    i += splitChunks.size();
                    this.remainingSplits.addAll(list);
                    if (!this.chunkSplitter.hasNextChunk()) {
                        this.remainingTables.remove(tableId);
                    }
                    this.lock.notify();
                } catch (Exception e) {
                    throw new IllegalStateException("Error when splitting chunks for " + tableId, e);
                }
            }
        } while (this.chunkSplitter.hasNextChunk());
        LOG.info("Split table {} into {} chunks, time cost: {}ms.", new Object[]{tableId, Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public Optional<MySqlSplit> getNext() {
        waitTableDiscoveryReady();
        synchronized (this.lock) {
            checkSplitterErrors();
            if (!this.remainingSplits.isEmpty()) {
                MySqlSchemalessSnapshotSplit next = this.remainingSplits.iterator().next();
                this.remainingSplits.remove(next);
                this.assignedSplits.put(next.splitId(), next);
                addAlreadyProcessedTablesIfNotExists(next.getTableId());
                return Optional.of(next.toMySqlSnapshotSplit(this.tableSchemas.get(next.getTableId())));
            }
            if (this.remainingTables.isEmpty()) {
                closeExecutorService();
                return Optional.empty();
            }
            try {
                this.lock.wait();
                return getNext();
            } catch (InterruptedException e) {
                throw new FlinkRuntimeException("InterruptedException while waiting for asynchronously snapshot split");
            }
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public boolean waitingForFinishedSplits() {
        return !allSnapshotSplitsFinished();
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        if (waitingForFinishedSplits()) {
            LOG.error("The assigner is not ready to offer finished split information, this should not be called");
            throw new FlinkRuntimeException("The assigner is not ready to offer finished split information, this should not be called");
        }
        ArrayList<MySqlSchemalessSnapshotSplit> arrayList = new ArrayList(this.assignedSplits.values());
        ArrayList arrayList2 = new ArrayList();
        for (MySqlSchemalessSnapshotSplit mySqlSchemalessSnapshotSplit : arrayList) {
            arrayList2.add(new FinishedSnapshotSplitInfo(mySqlSchemalessSnapshotSplit.getTableId(), mySqlSchemalessSnapshotSplit.splitId(), mySqlSchemalessSnapshotSplit.getSplitStart(), mySqlSchemalessSnapshotSplit.getSplitEnd(), this.splitFinishedOffsets.get(mySqlSchemalessSnapshotSplit.splitId())));
        }
        return arrayList2;
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void onFinishedSplits(Map<String, BinlogOffset> map) {
        this.splitFinishedOffsets.putAll(map);
        if (allSnapshotSplitsFinished() && AssignerStatus.isAssigningSnapshotSplits(this.assignerStatus)) {
            if (this.currentParallelism != 1) {
                LOG.info("Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
            } else {
                this.assignerStatus = this.assignerStatus.onFinish();
                LOG.info("Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
            }
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void addSplits(Collection<MySqlSplit> collection) {
        for (MySqlSplit mySqlSplit : collection) {
            this.tableSchemas.putAll(mySqlSplit.asSnapshotSplit().getTableSchemas());
            this.remainingSplits.add(mySqlSplit.asSnapshotSplit().toSchemalessSnapshotSplit());
            this.assignedSplits.remove(mySqlSplit.splitId());
            this.splitFinishedOffsets.remove(mySqlSplit.splitId());
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public SnapshotPendingSplitsState snapshotState(long j) {
        SnapshotPendingSplitsState snapshotPendingSplitsState = new SnapshotPendingSplitsState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.tableSchemas, this.splitFinishedOffsets, this.assignerStatus, this.remainingTables, this.isTableIdCaseSensitive, true, this.chunkSplitter.snapshotState(j));
        if (this.checkpointIdToFinish == null && !AssignerStatus.isSnapshotAssigningFinished(this.assignerStatus) && allSnapshotSplitsFinished()) {
            this.checkpointIdToFinish = Long.valueOf(j);
        }
        return snapshotPendingSplitsState;
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void notifyCheckpointComplete(long j) {
        if (this.checkpointIdToFinish != null && AssignerStatus.isAssigningSnapshotSplits(this.assignerStatus) && allSnapshotSplitsFinished()) {
            if (j >= this.checkpointIdToFinish.longValue()) {
                this.assignerStatus = this.assignerStatus.onFinish();
            }
            LOG.info("Snapshot split assigner is turn into finished status.");
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public AssignerStatus getAssignerStatus() {
        return this.assignerStatus;
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void startAssignNewlyAddedTables() {
        Preconditions.checkState(AssignerStatus.isAssigningFinished(this.assignerStatus), "Invalid assigner status {}", new Object[]{this.assignerStatus});
        this.assignerStatus = this.assignerStatus.startAssignNewlyTables();
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void onBinlogSplitUpdated() {
        Preconditions.checkState(AssignerStatus.isNewlyAddedAssigningSnapshotFinished(this.assignerStatus), "Invalid assigner status {}", new Object[]{this.assignerStatus});
        this.assignerStatus = this.assignerStatus.onBinlogSplitUpdated();
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void close() {
        closeExecutorService();
        if (this.chunkSplitter != null) {
            try {
                this.chunkSplitter.close();
            } catch (Exception e) {
                LOG.warn("Fail to close the chunk splitter.");
            }
        }
    }

    private void closeExecutorService() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
        if (this.alreadyProcessedTables.contains(tableId)) {
            return;
        }
        this.alreadyProcessedTables.add(tableId);
    }

    private void waitTableDiscoveryReady() {
        while (needToDiscoveryTables()) {
            LOG.debug("Current assigner is discovering tables, wait tables ready...");
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
            }
        }
    }

    public boolean noMoreSnapshotSplits() {
        return !needToDiscoveryTables() && this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    public boolean needToDiscoveryTables() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty() && this.alreadyProcessedTables.isEmpty();
    }

    public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    public Map<TableId, TableChanges.TableChange> getTableSchemas() {
        return this.tableSchemas;
    }

    public Map<String, BinlogOffset> getSplitFinishedOffsets() {
        return this.splitFinishedOffsets;
    }

    private boolean allSnapshotSplitsFinished() {
        return noMoreSnapshotSplits() && this.assignedSplits.size() == this.splitFinishedOffsets.size();
    }

    private void splitChunksForRemainingTables() {
        try {
            if (this.chunkSplitter.hasNextChunk()) {
                LOG.info("Start splitting remaining chunks for table {}", this.chunkSplitter.getCurrentSplittingTableId());
                splitTable(this.chunkSplitter.getCurrentSplittingTableId());
            }
            Iterator<TableId> it = this.remainingTables.iterator();
            while (it.hasNext()) {
                splitTable(it.next());
            }
        } catch (Throwable th) {
            synchronized (this.lock) {
                if (this.uncaughtSplitterException == null) {
                    this.uncaughtSplitterException = th;
                } else {
                    this.uncaughtSplitterException.addSuppressed(th);
                }
                this.lock.notify();
            }
        }
    }

    private void checkSplitterErrors() {
        if (this.uncaughtSplitterException != null) {
            throw new FlinkRuntimeException("Chunk splitting has encountered exception", this.uncaughtSplitterException);
        }
    }

    private static MySqlChunkSplitter createChunkSplitter(MySqlSourceConfig mySqlSourceConfig, boolean z, ChunkSplitterState chunkSplitterState) {
        MySqlSchema mySqlSchema = new MySqlSchema(mySqlSourceConfig, z);
        if (ChunkSplitterState.NO_SPLITTING_TABLE_STATE.equals(chunkSplitterState)) {
            return new MySqlChunkSplitter(mySqlSchema, mySqlSourceConfig);
        }
        TableId currentSplittingTableId = chunkSplitterState.getCurrentSplittingTableId();
        return new MySqlChunkSplitter(mySqlSchema, mySqlSourceConfig, (currentSplittingTableId == null || !mySqlSourceConfig.getTableFilters().dataCollectionFilter().isIncluded(currentSplittingTableId)) ? ChunkSplitterState.NO_SPLITTING_TABLE_STATE : chunkSplitterState);
    }
}
