/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.microprofile.reactive.messaging.tck.acknowledgement;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.tck.ArchiveExtender;
import org.eclipse.microprofile.reactive.messaging.tck.TckBase;
import org.eclipse.microprofile.reactive.messaging.tck.acknowledgement.EmitterBean;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.shrinkwrap.api.Archive;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.Asset;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.Test;

/**
 * Checks how the {@link CompletionStage} returned by {@code Emitter.send(payload)} completes
 * depending on the outcome of the downstream {@link MessageConsumer#process(String)} call:
 * normally when processing succeeds, exceptionally (root cause
 * {@link IllegalArgumentException}) when processing fails.
 */
public class EmitterOfPayloadAcknowledgementTest extends TckBase {

    @Inject
    private EmitterBean bean;

    @Inject
    private MessageConsumer processor;

    /**
     * Builds the test archive: {@link EmitterBean}, {@link MessageConsumer} and
     * {@link ArchiveExtender}, plus an empty {@code beans.xml} manifest resource. Every
     * {@link ArchiveExtender} found through {@link ServiceLoader} is then given the chance to
     * extend the archive.
     *
     * @return the archive to deploy
     */
    @Deployment
    public static Archive<JavaArchive> deployment() {
        JavaArchive archive = ShrinkWrap.create(JavaArchive.class)
                .addClasses(EmitterBean.class, MessageConsumer.class, ArchiveExtender.class)
                .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml");
        ServiceLoader.load(ArchiveExtender.class).iterator().forEachRemaining(ext -> ext.extend(archive));
        return archive;
    }

    /**
     * With failure mode disabled, sends five payloads and expects the combined future to
     * finish without an exception and without being cancelled.
     */
    @Test
    public void testThatEmitterReceiveAcksAfterSuccessfulProcessingOfPayload() {
        this.processor.disableFailureMode();
        Emitter<String> emitter = this.bean.getEmitter();

        // send(T) is the overload that returns a CompletionStage<Void>; the payloads must be
        // passed as String for that overload to be selected.
        CompletableFuture<Void> all = CompletableFuture.allOf(
                emitter.send("a").toCompletableFuture(),
                emitter.send("b").toCompletableFuture(),
                emitter.send("c").toCompletableFuture(),
                emitter.send("d").toCompletableFuture(),
                emitter.send("e").toCompletableFuture());

        Awaitility.await().until(all::isDone);
        Assertions.assertThat(all.isCompletedExceptionally()).isFalse();
        Assertions.assertThat(all.isCancelled()).isFalse();
    }

    /**
     * With failure mode enabled, "a" and "d" must complete normally (join() would throw
     * otherwise), while "b" (consumer throws) and "c" (consumer returns a failed stage) must
     * fail with an {@link IllegalArgumentException} as root cause.
     */
    @Test
    public void testThatEmitterReceiveNacksAfterFailingProcessingOfPayload() {
        Emitter<String> emitter = this.bean.getEmitter();
        this.processor.enableFailureMode();

        emitter.send("a").toCompletableFuture().join();
        Assertions.assertThatThrownBy(() -> emitter.send("b").toCompletableFuture().join())
                .hasRootCauseInstanceOf(IllegalArgumentException.class);
        Assertions.assertThatThrownBy(() -> emitter.send("c").toCompletableFuture().join())
                .hasRootCauseInstanceOf(IllegalArgumentException.class);
        emitter.send("d").toCompletableFuture().join();
    }

    /**
     * Consumer of the {@code data} channel (presumably the channel the emitter exposed by
     * {@code EmitterBean} writes to - confirm against that class). It can be switched into a
     * failure mode in which the payloads "b" and "c" are rejected.
     */
    @ApplicationScoped
    public static class MessageConsumer {

        // Written by the test methods and read in process(). Marked volatile so the latest
        // value is visible if the implementation under test invokes process() on a thread
        // other than the test thread (the threading model is not visible from this file).
        private volatile boolean failureModeEnabled = false;

        /** Makes {@link #process(String)} fail for the payloads "b" and "c". */
        public void enableFailureMode() {
            this.failureModeEnabled = true;
        }

        /** Makes {@link #process(String)} succeed for every payload. */
        public void disableFailureMode() {
            this.failureModeEnabled = false;
        }

        /**
         * Processes one payload.
         *
         * @param s the received payload
         * @return an already completed stage on success; in failure mode, a stage completed
         *         exceptionally with an {@link IllegalArgumentException} for "c"
         * @throws IllegalArgumentException synchronously, in failure mode, for "b"
         */
        @Incoming("data")
        public CompletionStage<Void> process(String s) {
            if (this.failureModeEnabled) {
                if (s.equalsIgnoreCase("b")) {
                    // Failure reported by throwing from the method itself.
                    throw new IllegalArgumentException("b");
                }
                if (s.equalsIgnoreCase("c")) {
                    // Failure reported through the returned stage.
                    CompletableFuture<Void> cf = new CompletableFuture<>();
                    cf.completeExceptionally(new IllegalArgumentException("c"));
                    return cf;
                }
            }
            return CompletableFuture.completedFuture(null);
        }
    }
}

