package org.eclipse.microprofile.reactive.messaging.tck.acknowledgement;

import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.tck.ArchiveExtender;
import org.eclipse.microprofile.reactive.messaging.tck.TckBase;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.shrinkwrap.api.Archive;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.Test;

/* loaded from: input_file:org/eclipse/microprofile/reactive/messaging/tck/acknowledgement/EmitterOfMessageAcknowledgementTest.class */
public class EmitterOfMessageAcknowledgementTest extends TckBase {

    @Inject
    private EmitterBean bean;

    @Inject
    private MessageConsumer processor;

    @ApplicationScoped
    /* loaded from: input_file:org/eclipse/microprofile/reactive/messaging/tck/acknowledgement/EmitterOfMessageAcknowledgementTest$MessageConsumer.class */
    public static class MessageConsumer {
        private boolean failureModeEnabled = false;

        public void enableFailureMode() {
            this.failureModeEnabled = true;
        }

        public void disableFailureMode() {
            this.failureModeEnabled = false;
        }

        @Incoming("data")
        public CompletionStage<Void> process(String str) {
            if (this.failureModeEnabled) {
                if (str.equalsIgnoreCase("b")) {
                    throw new IllegalArgumentException("b");
                }
                if (str.equalsIgnoreCase("c")) {
                    CompletableFuture completableFuture = new CompletableFuture();
                    completableFuture.completeExceptionally(new IllegalArgumentException("c"));
                    return completableFuture;
                }
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    @Deployment
    public static Archive<JavaArchive> deployment() {
        JavaArchive addAsManifestResource = ShrinkWrap.create(JavaArchive.class).addClasses(new Class[]{EmitterBean.class, MessageConsumer.class, ArchiveExtender.class}).addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml");
        ServiceLoader.load(ArchiveExtender.class).iterator().forEachRemaining(archiveExtender -> {
            archiveExtender.extend(addAsManifestResource);
        });
        return addAsManifestResource;
    }

    @Test
    public void testThatEmitterReceiveAcksAfterSuccessfulProcessingOfPayload() {
        this.processor.disableFailureMode();
        Emitter<String> emitter = this.bean.getEmitter();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(null);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        emitter.send(Message.of("a", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("b", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th2 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("c", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th3 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("d", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th4 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("e", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th5 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(atomicInteger.get() == 5);
        });
        Assertions.assertThat(atomicInteger2).hasValue(0);
    }

    @Test
    public void testThatEmitterReceiveNacksAfterFailingProcessingOfPayload() {
        Emitter<String> emitter = this.bean.getEmitter();
        this.processor.enableFailureMode();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(null);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        emitter.send(Message.of("a", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("b", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th2 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("c", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th3 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("d", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th4 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        emitter.send(Message.of("e", () -> {
            atomicInteger.incrementAndGet();
            return completedFuture;
        }, th5 -> {
            atomicInteger2.incrementAndGet();
            return completedFuture;
        }));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(atomicInteger.get() == 3);
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(atomicInteger2.get() == 2);
        });
    }
}
