package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.vertx.core.Future;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.DownstreamAmqpMessageFactory;
import org.eclipse.hono.client.amqp.GenericSenderLink;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/client/command/amqp/ProtonBasedCommandResponseSender.class */
/**
 * A {@link CommandResponseSender} that forwards command responses downstream as AMQP messages.
 * <p>
 * A dedicated sender link is opened for every response and closed again once the
 * send attempt has finished, regardless of its outcome.
 */
public class ProtonBasedCommandResponseSender extends AbstractServiceClient implements CommandResponseSender {
    /** Endpoint name used for the sender link, the sampler and the message address. */
    private static final String COMMAND_RESPONSE_ENDPOINT = "command_response";

    /** Passed through to the message factory when building downstream messages. */
    private final boolean jmsVendorPropsEnabled;

    /**
     * Creates a new sender.
     *
     * @param honoConnection The connection handed to the super class and used for creating sender links.
     * @param factory The factory handed to the super class and used for creating message samplers.
     * @param z The JMS vendor properties flag forwarded to the downstream message factory.
     */
    public ProtonBasedCommandResponseSender(HonoConnection honoConnection, SendMessageSampler.Factory factory, boolean z) {
        super(honoConnection, factory);
        this.jmsVendorPropsEnabled = z;
    }

    /**
     * Opens a sender link for the command response endpoint on the connection's context.
     *
     * @param str The first address segment (callers pass the tenant identifier).
     * @param str2 The second address segment (callers pass the reply-to identifier).
     * @return A future completed with the outcome of the link creation.
     */
    private Future<GenericSenderLink> createSender(String str, String str2) {
        return this.connection.executeOnContext(promise -> {
            // The callback argument is deliberately a no-op.
            // NOTE(review): presumably this is the remote-close hook of the link - confirm.
            GenericSenderLink.create(this.connection, COMMAND_RESPONSE_ENDPOINT, str, str2, this.samplerFactory.create(COMMAND_RESPONSE_ENDPOINT), str3 -> {
            }).onComplete2(promise);
        });
    }

    /**
     * Builds the downstream message for a command response.
     * <p>
     * The message is addressed to {@code command_response/<tenant>/<reply-to-id>} and carries the
     * response's correlation id, status, tenant id and device id.
     *
     * @param commandResponse The response to forward.
     * @param tenantObject The tenant whose defaults are handed to the message factory.
     * @param registrationAssertion The assertion whose defaults are handed to the message factory.
     * @return The message to send.
     */
    private Message createDownstreamMessage(CommandResponse commandResponse, TenantObject tenantObject, RegistrationAssertion registrationAssertion) {
        ResourceIdentifier from = ResourceIdentifier.from(COMMAND_RESPONSE_ENDPOINT, commandResponse.getTenantId(), commandResponse.getReplyToId());
        Message newMessage = DownstreamAmqpMessageFactory.newMessage(from, commandResponse.getContentType(), commandResponse.getPayload(), tenantObject, tenantObject.getDefaults().getMap(), registrationAssertion.getDefaults(), commandResponse.getAdditionalProperties(), this.jmsVendorPropsEnabled);
        newMessage.setAddress(from.toString());
        newMessage.setCorrelationId(commandResponse.getCorrelationId());
        AmqpUtils.addStatus(newMessage, commandResponse.getStatus());
        AmqpUtils.addTenantId(newMessage, commandResponse.getTenantId());
        AmqpUtils.addDeviceId(newMessage, commandResponse.getDeviceId());
        return newMessage;
    }

    /**
     * Sends a command response downstream.
     * <p>
     * A failure to create the sender link is mapped via {@link StatusCodeMapper#toServerError(Throwable)}.
     * Once a link has been created it is closed after the send attempt has finished, whether the
     * attempt succeeded or failed.
     *
     * @param tenantObject The tenant that the device belongs to.
     * @param registrationAssertion The registration assertion for the device.
     * @param commandResponse The response to send.
     * @param spanContext The context used as parent for the span tracking the send operation (not null-checked here).
     * @return A future that succeeds once the send operation reported a successful outcome and fails otherwise.
     * @throws NullPointerException if tenant, registration assertion or response are {@code null}.
     */
    @Override // org.eclipse.hono.client.command.CommandResponseSender
    public Future<Void> sendCommandResponse(TenantObject tenantObject, RegistrationAssertion registrationAssertion, CommandResponse commandResponse, SpanContext spanContext) {
        Objects.requireNonNull(tenantObject, "tenantObject");
        Objects.requireNonNull(registrationAssertion, "registrationAssertion");
        Objects.requireNonNull(commandResponse, "commandResponse");
        Future<GenericSenderLink> createSender = createSender(commandResponse.getTenantId(), commandResponse.getReplyToId());
        return createSender.recover(th -> {
            return Future.failedFuture(StatusCodeMapper.toServerError(th));
        }).compose(genericSenderLink -> {
            Message createDownstreamMessage = createDownstreamMessage(commandResponse, tenantObject, registrationAssertion);
            Span newChildSpan = newChildSpan(spanContext, "forward Command response");
            if (commandResponse.getMessagingType() != getMessagingType()) {
                newChildSpan.log(String.format("using messaging type %s instead of type %s used for the original command", getMessagingType(), commandResponse.getMessagingType()));
            }
            return genericSenderLink.sendAndWaitForOutcome(createDownstreamMessage, newChildSpan);
        }).onComplete2(outcome -> {
            // A link is opened per response, so it must be released on failure as well;
            // closing only on success would leave the link open whenever sending fails.
            if (createSender.succeeded()) {
                createSender.result().close();
            }
        }).mapEmpty();
    }
}
