package org.eclipse.hono.cli.app;

import io.quarkus.runtime.Quarkus;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.cli.UsageMessageFormatter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.cli.util.CommandUtils;
import org.eclipse.hono.cli.util.PropertiesVersionProvider;
import org.eclipse.hono.util.Constants;
import picocli.CommandLine;

/**
 * The {@code consume} sub-command: creates telemetry and/or event consumers for a
 * tenant and prints every received message to standard out, one message per line.
 * <p>
 * If neither {@code --telemetry} nor {@code --event} is given, both message types
 * are consumed.
 */
@Singleton
@CommandLine.Command(
        name = "consume",
        description = { "Consume telemetry and/or event messages from devices." },
        mixinStandardHelpOptions = true,
        versionProvider = PropertiesVersionProvider.class,
        sortOptions = false)
public class TelemetryAndEvent implements Callable<Integer> {

    private static final String MESSAGE_TYPE_TELEMETRY = "telemetry";
    private static final String MESSAGE_TYPE_EVENT = "event";
    /** Printed in place of a content type or payload that is {@code null}. */
    private static final String MISSING_VALUE_PLACEHOLDER = "-";
    /** Delay before consumers are re-created after the close handler has fired. */
    private static final long REOPEN_DELAY_MILLIS = 1000L;

    @CommandLine.ParentCommand
    NorthBoundApis appCommand;

    @CommandLine.Option(
            names = { "-t", "--tenant" },
            description = { "The tenant to consume messages for (default: ${DEFAULT-VALUE})." },
            defaultValue = Constants.DEFAULT_TENANT,
            order = 15,
            scope = CommandLine.ScopeType.INHERIT)
    String tenantId;

    @CommandLine.Option(
            names = { "--telemetry" },
            description = {
                "Consume telemetry messages.",
                "If not specified, both telemetry and event messages will be consumed.",
                "Messages are printed to standard out one message per line using the following format:",
                "t 4711 text/plain This is the message payload {key1=value1,key2=value2,...}"
            },
            order = 20)
    boolean consumeTelemetry;

    @CommandLine.Option(
            names = { "--event" },
            description = {
                "Consume event messages.",
                "If not specified, both telemetry and event messages will be consumed.",
                "Messages are printed to standard out one message per line using the following format:",
                "e 4711 text/plain This is the message payload {key1=value1,key2=value2,...}"
            },
            order = 21)
    boolean consumeEvent;

    @Inject
    Vertx vertx;

    /** The message types selected on the command line; populated by {@link #call()}. */
    private final Set<String> supportedMessageTypes = new HashSet<>();

    /**
     * Creates a consumer for each selected message type.
     * <p>
     * Every consumer is given the same close handler, which prints a notice and,
     * after {@value #REOPEN_DELAY_MILLIS} ms, invokes this method again.
     *
     * @param applicationClient The client to create the consumers with.
     * @return A future that succeeds once all requested consumers have been created
     *         and fails if creation of any of them fails.
     */
    private Future<Void> createConsumers(final ApplicationClient<? extends MessageContext> applicationClient) {

        // NOTE(review): the handler is shared by all consumers and re-creates every
        // selected consumer, not only the one that was closed - confirm this cannot
        // leave a still-open consumer duplicated.
        final Handler<Throwable> closeHandler = cause -> {
            System.err.println("peer has closed message consumer(s) unexpectedly, trying to reopen ...");
            vertx.setTimer(REOPEN_DELAY_MILLIS, timerId -> createConsumers(applicationClient)
                    // do not drop the outcome silently: a failed reopen would otherwise
                    // leave the command running without any consumer and without a hint
                    .onFailure(t -> System.err.println(
                            "failed to reopen message consumer(s): %s".formatted(t))));
        };

        // CompositeFuture.all accepts a list of raw Future instances
        @SuppressWarnings("rawtypes")
        final ArrayList<Future> consumerFutures = new ArrayList<>();

        if (supportedMessageTypes.contains(MESSAGE_TYPE_EVENT)) {
            consumerFutures.add(applicationClient.createEventConsumer(
                    tenantId,
                    msg -> printMessage(MESSAGE_TYPE_EVENT, msg),
                    closeHandler));
        }
        if (supportedMessageTypes.contains(MESSAGE_TYPE_TELEMETRY)) {
            consumerFutures.add(applicationClient.createTelemetryConsumer(
                    tenantId,
                    msg -> printMessage(MESSAGE_TYPE_TELEMETRY, msg),
                    closeHandler));
        }
        return CompositeFuture.all(consumerFutures).mapEmpty();
    }

    /**
     * Prints a single message to standard out.
     * <p>
     * The line consists of the first character of the message type, the device ID,
     * the content type, the payload and the message's properties map, separated by
     * single spaces. A missing content type or payload is printed as
     * {@value #MISSING_VALUE_PLACEHOLDER}.
     *
     * @param str The message type; its first character is used as the line prefix.
     * @param downstreamMessage The message to print.
     */
    private void printMessage(final String str, final DownstreamMessage<? extends MessageContext> downstreamMessage) {
        final String contentType = Optional.ofNullable(downstreamMessage.getContentType())
                .orElse(MISSING_VALUE_PLACEHOLDER);
        final String payload = Optional.ofNullable(downstreamMessage.getPayload())
                .map(Object::toString)
                .orElse(MISSING_VALUE_PLACEHOLDER);

        System.out.println("%s %s %s %s %s".formatted(
                str.charAt(0),
                downstreamMessage.getDeviceId(),
                contentType,
                payload,
                downstreamMessage.getProperties().getPropertiesMap()));
    }

    /**
     * Determines the message types to consume, creates the consumers and then blocks
     * until Quarkus signals exit.
     *
     * @return {@link CommandLine.ExitCode#OK} after Quarkus has exited, or
     *         {@link CommandLine.ExitCode#SOFTWARE} if the consumers could not be created.
     */
    @Override
    public Integer call() {
        if (consumeEvent) {
            supportedMessageTypes.add(MESSAGE_TYPE_EVENT);
        }
        if (consumeTelemetry) {
            supportedMessageTypes.add(MESSAGE_TYPE_TELEMETRY);
        }
        if (supportedMessageTypes.isEmpty()) {
            // no explicit selection means "consume everything"
            supportedMessageTypes.add(MESSAGE_TYPE_EVENT);
            supportedMessageTypes.add(MESSAGE_TYPE_TELEMETRY);
        }

        try {
            // join() blocks until the consumers exist and wraps a failure
            // in a CompletionException
            appCommand.getApplicationClient()
                    .compose(this::createConsumers)
                    .onSuccess(ok -> System.err.println(
                            "Consuming messages for tenant [%s], ctrl-c to exit.\n".formatted(tenantId)))
                    .toCompletionStage()
                    .toCompletableFuture()
                    .join();
            Quarkus.waitForExit();
            return CommandLine.ExitCode.OK;
        } catch (final CompletionException e) {
            CommandUtils.printError(e.getCause());
            System.err.println("failed to create message consumer(s): %s".formatted(e.getMessage()));
            return CommandLine.ExitCode.SOFTWARE;
        }
    }
}
