/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.api.ApiGenerator;
import com.alibaba.nacos.istio.api.ApiGeneratorFactory;
import com.alibaba.nacos.istio.common.AbstractConnection;
import com.alibaba.nacos.istio.common.Event;
import com.alibaba.nacos.istio.common.NacosResourceManager;
import com.alibaba.nacos.istio.common.ResourceSnapshot;
import com.alibaba.nacos.istio.common.WatchedStatus;
import com.alibaba.nacos.istio.mcp.McpConnection;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.util.NonceGenerator;
import io.grpc.stub.StreamObserver;
import istio.mcp.v1alpha1.Mcp;
import istio.mcp.v1alpha1.ResourceSourceGrpc;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class NacosMcpService
extends ResourceSourceGrpc.ResourceSourceImplBase {
    private final Map<String, AbstractConnection<Mcp.Resources>> connections = new ConcurrentHashMap<String, AbstractConnection<Mcp.Resources>>(16);
    @Autowired
    ApiGeneratorFactory apiGeneratorFactory;
    @Autowired
    NacosResourceManager resourceManager;

    public boolean hasClientConnection() {
        return this.connections.size() != 0;
    }

    @Override
    public StreamObserver<Mcp.RequestResources> establishResourceStream(final StreamObserver<Mcp.Resources> responseObserver) {
        this.resourceManager.initResourceSnapshot();
        final McpConnection newConnection = new McpConnection(responseObserver);
        return new StreamObserver<Mcp.RequestResources>(){
            private boolean initRequest = true;

            public void onNext(Mcp.RequestResources requestResources) {
                if (this.initRequest) {
                    newConnection.setConnectionId(requestResources.getSinkNode().getId());
                    NacosMcpService.this.connections.put(newConnection.getConnectionId(), newConnection);
                    this.initRequest = false;
                }
                NacosMcpService.this.process(requestResources, newConnection);
            }

            public void onError(Throwable throwable) {
                Loggers.MAIN.error("mcp: {} stream error.", (Object)newConnection.getConnectionId(), (Object)throwable);
                this.clear();
            }

            public void onCompleted() {
                responseObserver.onCompleted();
                this.clear();
            }

            private void clear() {
                NacosMcpService.this.connections.remove(newConnection.getConnectionId());
            }
        };
    }

    private void process(Mcp.RequestResources requestResources, AbstractConnection<Mcp.Resources> connection) {
        if (!this.shouldPush(requestResources, connection)) {
            return;
        }
        Mcp.Resources response = this.buildMcpResourcesResponse(requestResources.getCollection(), this.resourceManager.getResourceSnapshot());
        connection.push(response, connection.getWatchedStatusByType(requestResources.getCollection()));
    }

    private boolean shouldPush(Mcp.RequestResources requestResources, AbstractConnection<Mcp.Resources> connection) {
        String type = requestResources.getCollection();
        String connectionId = connection.getConnectionId();
        if (requestResources.getErrorDetail().getCode() != 0) {
            Loggers.MAIN.error("mcp: ACK error, connection-id: {}, code: {}, message: {}", new Object[]{connectionId, requestResources.getErrorDetail().getCode(), requestResources.getErrorDetail().getMessage()});
            return false;
        }
        if (requestResources.getResponseNonce().isEmpty()) {
            Loggers.MAIN.info("mcp: init request, type {}, connection-id {}, is incremental {}", new Object[]{type, connectionId, requestResources.getIncremental()});
            WatchedStatus watchedStatus = new WatchedStatus();
            watchedStatus.setType(type);
            connection.addWatchedResource(type, watchedStatus);
            return true;
        }
        WatchedStatus watchedStatus = connection.getWatchedStatusByType(type);
        if (watchedStatus == null) {
            Loggers.MAIN.info("mcp: reconnect, type {}, connection-id {}, is incremental {}", new Object[]{type, connectionId, requestResources.getIncremental()});
            watchedStatus = new WatchedStatus();
            watchedStatus.setType(type);
            connection.addWatchedResource(type, watchedStatus);
            return true;
        }
        if (!watchedStatus.getLatestNonce().equals(requestResources.getResponseNonce())) {
            Loggers.MAIN.warn("mcp: request dis match, type {}, connection-id {}", (Object)type, (Object)connectionId);
            return false;
        }
        watchedStatus.setAckedNonce(requestResources.getResponseNonce());
        Loggers.MAIN.info("mcp: ack, type {}, connection-id {}, nonce {}", new Object[]{type, connectionId, requestResources.getResponseNonce()});
        return false;
    }

    public void handleEvent(ResourceSnapshot resourceSnapshot, Event event) {
        switch (event.getType()) {
            case Service: {
                if (this.connections.size() == 0) {
                    return;
                }
                Loggers.MAIN.info("xds: event {} trigger push.", (Object)event.getType());
                Mcp.Resources serviceEntryMcpResponse = this.buildMcpResourcesResponse("istio/networking/v1alpha3/serviceentries", resourceSnapshot);
                for (AbstractConnection<Mcp.Resources> connection : this.connections.values()) {
                    WatchedStatus watchedStatus = connection.getWatchedStatusByType("istio/networking/v1alpha3/serviceentries");
                    if (watchedStatus == null) continue;
                    connection.push(serviceEntryMcpResponse, watchedStatus);
                }
                break;
            }
            default: {
                Loggers.MAIN.warn("Invalid event {}, ignore it.", (Object)event.getType());
            }
        }
    }

    private Mcp.Resources buildMcpResourcesResponse(String type, ResourceSnapshot resourceSnapshot) {
        ApiGenerator<?> serviceEntryGenerator = this.apiGeneratorFactory.getApiGenerator(type);
        List<?> rawResources = serviceEntryGenerator.generate(resourceSnapshot);
        String nonce = NonceGenerator.generateNonce();
        return Mcp.Resources.newBuilder().setCollection(type).addAllResources(rawResources).setSystemVersionInfo(resourceSnapshot.getVersion()).setNonce(nonce).build();
    }
}

