package org.springframework.data.elasticsearch.client.reactive;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/spring-data-elasticsearch-4.0.5.RELEASE.jar:org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.class */
public class MultiNodeHostProvider implements HostProvider {
    private final WebClientProvider clientProvider;
    private final Supplier<HttpHeaders> headersSupplier;
    private final Map<InetSocketAddress, ElasticsearchHost> hosts = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiNodeHostProvider(WebClientProvider webClientProvider, Supplier<HttpHeaders> supplier, InetSocketAddress... inetSocketAddressArr) {
        this.clientProvider = webClientProvider;
        this.headersSupplier = supplier;
        for (InetSocketAddress inetSocketAddress : inetSocketAddressArr) {
            this.hosts.put(inetSocketAddress, new ElasticsearchHost(inetSocketAddress, ElasticsearchHost.State.UNKNOWN));
        }
    }

    @Override // org.springframework.data.elasticsearch.client.reactive.HostProvider
    public Mono<HostProvider.ClusterInformation> clusterInfo() {
        return nodes(null).map(this::updateNodeState).buffer(this.hosts.size()).then(Mono.just(new HostProvider.ClusterInformation(new LinkedHashSet(this.hosts.values()))));
    }

    @Override // org.springframework.data.elasticsearch.client.reactive.HostProvider
    public WebClient createWebClient(InetSocketAddress inetSocketAddress) {
        return this.clientProvider.get(inetSocketAddress);
    }

    @Override // org.springframework.data.elasticsearch.client.reactive.HostProvider
    public Mono<InetSocketAddress> lookupActiveHost(HostProvider.Verification verification) {
        if (HostProvider.Verification.LAZY.equals(verification)) {
            for (ElasticsearchHost elasticsearchHost : hosts()) {
                if (elasticsearchHost.isOnline()) {
                    return Mono.just(elasticsearchHost.getEndpoint());
                }
            }
        }
        return findActiveHostInKnownActives().switchIfEmpty(findActiveHostInUnresolved()).switchIfEmpty(findActiveHostInDead()).switchIfEmpty(Mono.error((Supplier<? extends Throwable>) () -> {
            return new NoReachableHostException(new LinkedHashSet(getCachedHostState()));
        }));
    }

    Collection<ElasticsearchHost> getCachedHostState() {
        return this.hosts.values();
    }

    private Mono<InetSocketAddress> findActiveHostInKnownActives() {
        return findActiveForSate(ElasticsearchHost.State.ONLINE);
    }

    private Mono<InetSocketAddress> findActiveHostInUnresolved() {
        return findActiveForSate(ElasticsearchHost.State.UNKNOWN);
    }

    private Mono<InetSocketAddress> findActiveHostInDead() {
        return findActiveForSate(ElasticsearchHost.State.OFFLINE);
    }

    private Mono<InetSocketAddress> findActiveForSate(ElasticsearchHost.State state) {
        return nodes(state).map(this::updateNodeState).filter((v0) -> {
            return v0.isOnline();
        }).map((v0) -> {
            return v0.getEndpoint();
        }).next();
    }

    private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, ClientResponse> tuple2) {
        ElasticsearchHost elasticsearchHost = new ElasticsearchHost(tuple2.getT1(), tuple2.getT2().statusCode().isError() ? ElasticsearchHost.State.OFFLINE : ElasticsearchHost.State.ONLINE);
        this.hosts.put(tuple2.getT1(), elasticsearchHost);
        return elasticsearchHost;
    }

    private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable ElasticsearchHost.State state) {
        return Flux.fromIterable(hosts()).filter(elasticsearchHost -> {
            return state == null || elasticsearchHost.getState().equals(state);
        }).map((v0) -> {
            return v0.getEndpoint();
        }).flatMap(inetSocketAddress -> {
            return Mono.just(inetSocketAddress).zipWith(createWebClient(inetSocketAddress).head().uri("/", new Object[0]).headers(httpHeaders -> {
                httpHeaders.addAll(this.headersSupplier.get());
            }).exchange().doOnError(th -> {
                this.hosts.put(inetSocketAddress, new ElasticsearchHost(inetSocketAddress, ElasticsearchHost.State.OFFLINE));
                this.clientProvider.getErrorListener().accept(th);
            }));
        }).onErrorContinue((th, obj) -> {
            this.clientProvider.getErrorListener().accept(th);
        });
    }

    private List<ElasticsearchHost> hosts() {
        ArrayList arrayList = new ArrayList(this.hosts.values());
        Collections.shuffle(arrayList);
        return arrayList;
    }
}
