package org.eclipse.hono.cli.app;

import io.quarkus.runtime.ShutdownEvent;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.application.client.amqp.ProtonBasedApplicationClient;
import org.eclipse.hono.application.client.kafka.impl.KafkaApplicationClientImpl;
import org.eclipse.hono.cli.util.ConnectionOptions;
import org.eclipse.hono.cli.util.PropertiesVersionProvider;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.kafka.CommonKafkaClientConfigProperties;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.config.FileFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@Singleton
@CommandLine.Command(name = "app", description = {"A client for interacting with Hono's north bound API endpoints."}, mixinStandardHelpOptions = true, versionProvider = PropertiesVersionProvider.class, sortOptions = false, subcommands = {TelemetryAndEvent.class, CommandAndControl.class})
/* loaded from: input_file:org/eclipse/hono/cli/app/NorthBoundApis.class */
public class NorthBoundApis {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) NorthBoundApis.class);
    private static final String SANDBOX_AMQP_USER = "consumer@HONO";
    private static final String SANDBOX_AMQP_PWD = "verysecret";
    private static final String SANDBOX_KAFKA_USER = "hono";
    private static final String SANDBOX_KAFKA_PWD = "hono-secret";

    @CommandLine.Mixin
    ConnectionOptions connectionOptions;

    @CommandLine.Option(names = {"--amqp"}, description = {"Connect to the AMQP 1.0 based API endpoints", "If not set, the Kafka based endpoints are used by default"}, order = 12)
    boolean useAmqp;

    @Inject
    Vertx vertx;

    @CommandLine.Spec
    CommandLine.Model.CommandSpec spec;
    ApplicationClient<? extends MessageContext> client;

    private void validateConnectionOptions() {
        if (this.connectionOptions.useSandbox) {
            return;
        }
        if (this.connectionOptions.hostname.isEmpty() || this.connectionOptions.portNumber.isEmpty()) {
            throw new CommandLine.ParameterException(this.spec.commandLine(), "Missing required option: both '--host=<hostname>' and '--port=<portNumber> need to be specified if not using '--sandbox'.\n");
        }
    }

    private String scramJaasConfig(String str, String str2) {
        return "%s required username=\"%s\" password=\"%s\";\n".formatted(ScramLoginModule.class.getName(), str, str2);
    }

    Future<KafkaApplicationClientImpl> createKafkaClient() {
        String formatted;
        HashMap hashMap = new HashMap();
        if (this.connectionOptions.useSandbox) {
            formatted = "%1$s:9092,%1$s:9094".formatted(ConnectionOptions.SANDBOX_HOST_NAME);
            hashMap.put("bootstrap.servers", formatted);
            hashMap.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
            hashMap.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
            Optional.ofNullable(this.connectionOptions.credentials).ifPresentOrElse(credentials -> {
                hashMap.put(SaslConfigs.SASL_JAAS_CONFIG, scramJaasConfig(credentials.username, credentials.password));
            }, () -> {
                hashMap.put(SaslConfigs.SASL_JAAS_CONFIG, scramJaasConfig(SANDBOX_KAFKA_USER, SANDBOX_KAFKA_PWD));
            });
        } else {
            validateConnectionOptions();
            formatted = "%s:%d".formatted(this.connectionOptions.hostname.get(), this.connectionOptions.portNumber.get());
            hashMap.put("bootstrap.servers", formatted);
            this.connectionOptions.trustStorePath.ifPresent(str -> {
                hashMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, str);
                this.connectionOptions.trustStorePassword.ifPresent(str -> {
                    hashMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, str);
                });
                Optional.ofNullable(FileFormat.detect(str)).ifPresent(fileFormat -> {
                    hashMap.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, fileFormat.name());
                });
                if (this.connectionOptions.disableHostnameVerification) {
                    hashMap.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
                }
            });
            if (this.connectionOptions.credentials != null) {
                if (this.connectionOptions.trustStorePath.isEmpty()) {
                    hashMap.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
                } else {
                    hashMap.put("security.protocol", SecurityProtocol.SASL_SSL.name);
                }
                hashMap.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
                hashMap.put(SaslConfigs.SASL_JAAS_CONFIG, scramJaasConfig(this.connectionOptions.credentials.username, this.connectionOptions.credentials.password));
            } else if (this.connectionOptions.trustStorePath.isPresent()) {
                hashMap.put("security.protocol", SecurityProtocol.SSL.name);
            }
        }
        CommonKafkaClientConfigProperties commonKafkaClientConfigProperties = new CommonKafkaClientConfigProperties();
        commonKafkaClientConfigProperties.setCommonClientConfig(hashMap);
        MessagingKafkaConsumerConfigProperties messagingKafkaConsumerConfigProperties = new MessagingKafkaConsumerConfigProperties();
        messagingKafkaConsumerConfigProperties.setCommonClientConfig(commonKafkaClientConfigProperties);
        MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties = new MessagingKafkaProducerConfigProperties();
        messagingKafkaProducerConfigProperties.setCommonClientConfig(commonKafkaClientConfigProperties);
        System.err.printf("Connecting to Kafka based messaging infrastructure [%s]%n", formatted);
        Promise promise = Promise.promise();
        KafkaApplicationClientImpl kafkaApplicationClientImpl = new KafkaApplicationClientImpl(this.vertx, messagingKafkaConsumerConfigProperties, CachingKafkaProducerFactory.sharedFactory(this.vertx), messagingKafkaProducerConfigProperties);
        kafkaApplicationClientImpl.addOnKafkaProducerReadyHandler(promise);
        return kafkaApplicationClientImpl.start().compose(r3 -> {
            return promise.future();
        }).map(r5 -> {
            this.client = kafkaApplicationClientImpl;
            return kafkaApplicationClientImpl;
        });
    }

    Future<ProtonBasedApplicationClient> createAmqpClient() {
        ClientConfigProperties clientConfigProperties = new ClientConfigProperties();
        clientConfigProperties.setReconnectAttempts(5);
        this.connectionOptions.trustStorePath.ifPresent(str -> {
            clientConfigProperties.setTrustStorePath(str);
            Optional<String> optional = this.connectionOptions.trustStorePassword;
            Objects.requireNonNull(clientConfigProperties);
            optional.ifPresent(clientConfigProperties::setTrustStorePassword);
        });
        if (this.connectionOptions.useSandbox) {
            clientConfigProperties.setHost(ConnectionOptions.SANDBOX_HOST_NAME);
            clientConfigProperties.setPort(((Integer) this.connectionOptions.trustStorePath.map(str2 -> {
                return 15671;
            }).orElse(15672)).intValue());
            clientConfigProperties.setUsername(SANDBOX_AMQP_USER);
            clientConfigProperties.setPassword(SANDBOX_AMQP_PWD);
        } else {
            validateConnectionOptions();
            Optional<String> optional = this.connectionOptions.hostname;
            Objects.requireNonNull(clientConfigProperties);
            optional.ifPresent(clientConfigProperties::setHost);
            Optional<Integer> optional2 = this.connectionOptions.portNumber;
            Objects.requireNonNull(clientConfigProperties);
            optional2.ifPresent((v1) -> {
                r1.setPort(v1);
            });
            Optional<String> optional3 = this.connectionOptions.trustStorePath;
            Objects.requireNonNull(clientConfigProperties);
            optional3.ifPresent(clientConfigProperties::setTrustStorePath);
            Optional<String> optional4 = this.connectionOptions.trustStorePassword;
            Objects.requireNonNull(clientConfigProperties);
            optional4.ifPresent(clientConfigProperties::setTrustStorePassword);
            clientConfigProperties.setHostnameVerificationRequired(!this.connectionOptions.disableHostnameVerification);
            Optional.ofNullable(this.connectionOptions.credentials).ifPresent(credentials -> {
                clientConfigProperties.setUsername(credentials.username);
                clientConfigProperties.setPassword(credentials.password);
            });
        }
        ProtonBasedApplicationClient protonBasedApplicationClient = new ProtonBasedApplicationClient(HonoConnection.newConnection(this.vertx, clientConfigProperties));
        System.err.printf("Connecting to AMQP 1.0 based messaging infrastructure [%s:%d]%n", clientConfigProperties.getHost(), Integer.valueOf(clientConfigProperties.getPort()));
        return protonBasedApplicationClient.connect().onSuccess2(honoConnection -> {
            this.client = protonBasedApplicationClient;
        }).map((Future<HonoConnection>) protonBasedApplicationClient);
    }

    public Future<ApplicationClient<? extends MessageContext>> getApplicationClient() {
        Promise promise = Promise.promise();
        if (this.client != null) {
            promise.complete(this.client);
        } else if (this.useAmqp) {
            Future<ProtonBasedApplicationClient> createAmqpClient = createAmqpClient();
            Objects.requireNonNull(promise);
            Future<ProtonBasedApplicationClient> onSuccess2 = createAmqpClient.onSuccess2((v1) -> {
                r1.complete(v1);
            });
            Objects.requireNonNull(promise);
            onSuccess2.onFailure(promise::fail);
        } else {
            Future<KafkaApplicationClientImpl> createKafkaClient = createKafkaClient();
            Objects.requireNonNull(promise);
            Future<KafkaApplicationClientImpl> onSuccess22 = createKafkaClient.onSuccess2((v1) -> {
                r1.complete(v1);
            });
            Objects.requireNonNull(promise);
            onSuccess22.onFailure(promise::fail);
        }
        return promise.future();
    }

    public void onStop(@Observes ShutdownEvent shutdownEvent) {
        if (this.client != null) {
            LOG.debug("disconnecting from Hono");
            this.client.stop().onComplete2(asyncResult -> {
                LOG.debug("stopped consumers");
            }).toCompletionStage().toCompletableFuture().join();
        }
    }
}
