package org.eclipse.hono.cli.app;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.ArrayList;
import java.util.Objects;
import java.util.function.BiConsumer;
import javax.annotation.PostConstruct;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.application.client.amqp.AmqpApplicationClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Profile({"receiver"})
@Component
/* loaded from: input_file:BOOT-INF/classes/org/eclipse/hono/cli/app/Receiver.class */
public class Receiver extends AbstractApplicationClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";

    @Value("${message.type}")
    protected String messageType = TYPE_ALL;
    private BiConsumer<String, DownstreamMessage<? extends MessageContext>> messageHandler = this::handleMessage;
    private ApplicationClient<? extends MessageContext> client;

    @Autowired
    public final void setApplicationClient(ApplicationClient<? extends MessageContext> applicationClient) {
        this.client = (ApplicationClient) Objects.requireNonNull(applicationClient);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setMessageHandler(BiConsumer<String, DownstreamMessage<? extends MessageContext>> biConsumer) {
        this.messageHandler = (BiConsumer) Objects.requireNonNull(biConsumer);
    }

    @PostConstruct
    Future<CompositeFuture> start() {
        Future succeededFuture;
        if (this.client instanceof AmqpApplicationClient) {
            AmqpApplicationClient amqpApplicationClient = (AmqpApplicationClient) this.client;
            succeededFuture = amqpApplicationClient.connect().onComplete2(asyncResult -> {
                amqpApplicationClient.addReconnectListener(honoConnection -> {
                    createConsumer();
                });
            }).mapEmpty();
        } else {
            succeededFuture = Future.succeededFuture();
        }
        return succeededFuture.compose(r3 -> {
            return createConsumer();
        }).onComplete2(this::handleCreateConsumerStatus);
    }

    private CompositeFuture createConsumer() {
        Handler<Throwable> handler = th -> {
            this.log.info("close handler of consumer is called", th);
            this.vertx.setTimer(this.connectionRetryInterval, l -> {
                this.log.info("attempting to re-create the consumer ...");
                createConsumer();
            });
        };
        ArrayList arrayList = new ArrayList();
        if (this.messageType.equals("event") || this.messageType.equals(TYPE_ALL)) {
            arrayList.add(this.client.createEventConsumer(this.tenantId, downstreamMessage -> {
                this.messageHandler.accept("event", downstreamMessage);
            }, handler));
        }
        if (this.messageType.equals("telemetry") || this.messageType.equals(TYPE_ALL)) {
            arrayList.add(this.client.createTelemetryConsumer(this.tenantId, downstreamMessage2 -> {
                this.messageHandler.accept("telemetry", downstreamMessage2);
            }, handler));
        }
        if (arrayList.isEmpty()) {
            arrayList.add(Future.failedFuture(String.format("Invalid message type [\"%s\"]. Valid types are \"telemetry\", \"event\" or \"all\"", this.messageType)));
        }
        return CompositeFuture.all(arrayList);
    }

    private void handleMessage(String str, DownstreamMessage<? extends MessageContext> downstreamMessage) {
        this.log.info("received {} message [device: {}, content-type: {}]: {}", str, downstreamMessage.getDeviceId(), downstreamMessage.getContentType(), downstreamMessage.getPayload());
        this.log.info("... with properties: {}", downstreamMessage.getProperties().getPropertiesMap());
    }

    private void handleCreateConsumerStatus(AsyncResult<CompositeFuture> asyncResult) {
        if (asyncResult.succeeded()) {
            this.log.info("Receiver [tenant: {}, mode: {}] created successfully, hit ctrl-c to exit", this.tenantId, this.messageType);
        } else {
            this.log.error("Error occurred during initialization of receiver: {}", asyncResult.cause().getMessage());
            this.vertx.close();
        }
    }
}
