chore: Initial import of FLEX training material
This commit is contained in:
parent
c01246d4f7
commit
12235acc42
1020 changed files with 53940 additions and 0 deletions
|
|
@ -0,0 +1,164 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<relativePath>../pom.xml</relativePath> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
|
||||
<artifactId>flexinale-distributed-test-distributed</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<name>Flexinale Distributed Test - testing done between distributed services</name>
|
||||
<description>Flexinale - FLEX case-study "film festival", distributed services - testing done between distributed services</description>
|
||||
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.poi</groupId>
|
||||
<artifactId>poi</artifactId>
|
||||
<version>${apache-poi.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.poi</groupId>
|
||||
<artifactId>poi-ooxml</artifactId>
|
||||
<version>${apache-poi.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>${awaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-besucherportal</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-common</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-common</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-security</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-security_api_contract</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-backoffice</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-backoffice_api_contract</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-ticketing</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.accso</groupId>
|
||||
<artifactId>flexinale-distributed-ticketing_api_contract</artifactId>
|
||||
<version>2024.3.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>${maven-jar-plugin.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>com.github.spotbugs</groupId>
|
||||
<artifactId>spotbugs-maven-plugin</artifactId>
|
||||
<version>${spotbugs-maven-plugin.version}</version>
|
||||
<configuration>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>com.h3xstream.findsecbugs</groupId>
|
||||
<artifactId>findsecbugs-plugin</artifactId>
|
||||
<version>${findsecbugs-maven-plugin.version}</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<!-- overwrite dependency on spotbugs if you want to specify the version of spotbugs -->
|
||||
<dependency>
|
||||
<groupId>com.github.spotbugs</groupId>
|
||||
<artifactId>spotbugs</artifactId>
|
||||
<version>${spotbugs.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>verify</phase>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
package de.accso.flexinale;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class FlexinaleDistributedApplicationTestDistributed {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(FlexinaleDistributedApplicationTestDistributed.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,410 @@
|
|||
package de.accso.flexinale.backoffice.api_in.rest;
|
||||
|
||||
import de.accso.flexinale.FlexinaleDistributedApplicationTestDistributed;
|
||||
import de.accso.flexinale.backoffice.api_contract.event.FilmCreatedEvent;
|
||||
import de.accso.flexinale.backoffice.api_contract.event.KinoCreatedEvent;
|
||||
import de.accso.flexinale.backoffice.api_contract.event.VorfuehrungCreatedEvent;
|
||||
import de.accso.flexinale.backoffice.domain.model.KinoSaal;
|
||||
import de.accso.flexinale.besucherportal.api_in.event.FilmSubscriber;
|
||||
import de.accso.flexinale.besucherportal.api_in.event.KinoSubscriber;
|
||||
import de.accso.flexinale.besucherportal.api_in.event.KontingentSubscriber;
|
||||
import de.accso.flexinale.besucherportal.api_in.event.VorfuehrungSubscriber;
|
||||
import de.accso.flexinale.common.api.eventbus.EventBus;
|
||||
import de.accso.flexinale.common.api.eventbus.EventBusFactory;
|
||||
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
|
||||
import de.accso.flexinale.common.api.event.Event;
|
||||
import de.accso.flexinale.common.infrastructure.eventbus.KafkaAvailabilityChecker;
|
||||
import de.accso.flexinale.common.shared_kernel.DoNotCheckInArchitectureTests;
|
||||
import de.accso.flexinale.common.shared_kernel.Identifiable.Id;
|
||||
import de.accso.flexinale.ticketing.api_contract.event.OnlineKontingentChangedEvent;
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart.START_READING_FROM_BEGINNING;
|
||||
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaConfiguration.adminConfig;
|
||||
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaTopicHelper.deleteKafkaTopicsWithPrefixAndSuffix;
|
||||
import static de.accso.flexinale.common.shared_kernel.Identifiable.Id.uuidString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.with;
|
||||
|
||||
@SpringBootTest(classes = {FlexinaleDistributedApplicationTestDistributed.class})
|
||||
@ActiveProfiles("test-distributed")
|
||||
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
|
||||
@DoNotCheckInArchitectureTests
|
||||
@Disabled("needs Kafka and hence only to be called manually and explicitely")
|
||||
public class End2EndWithDistributedKafkaEventBusTest {
|
||||
|
||||
private static final Class<End2EndWithDistributedKafkaEventBusTest> CLAZZ =
|
||||
End2EndWithDistributedKafkaEventBusTest.class;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CLAZZ);
|
||||
|
||||
private static final String xlsDataFile =
|
||||
"/de/accso/flexinale/backoffice/api_in/rest/NewDataForRestCallIntegrationTest.xlsx";
|
||||
private static InputStream xlsDataFileIS() {
|
||||
return Objects.requireNonNull(CLAZZ.getResourceAsStream(xlsDataFile));
|
||||
}
|
||||
|
||||
@Autowired
|
||||
FilmRestController filmRestControllerInBackoffice;
|
||||
@Autowired
|
||||
KinoKinoSaalRestController kinoKinoSaalRestControllerInBackoffice;
|
||||
@Autowired
|
||||
VorfuehrungRestController vorfuehrungRestControllerInBackoffice;
|
||||
|
||||
@Autowired
|
||||
FilmSubscriber filmSubscriberInBesucherportal;
|
||||
@Autowired
|
||||
KinoSubscriber kinoSubscriberInBesucherportal;
|
||||
@Autowired
|
||||
VorfuehrungSubscriber vorfuehrungSubscriberInBesucherportal;
|
||||
@Autowired
|
||||
KontingentSubscriber kontingentSubscriberInBesucherportal;
|
||||
|
||||
@Autowired
|
||||
KafkaAvailabilityChecker kafkaAvailabilityChecker;
|
||||
|
||||
@Autowired
|
||||
EventBusFactory eventBusfactory;
|
||||
|
||||
private static final Duration awaitilityTimeOutInSeconds = Duration.ofSeconds(60);
|
||||
private static final Duration awaitilityPollIntervalInSeconds = Duration.ofSeconds(1);
|
||||
|
||||
// sniffing all the *created events
|
||||
private static final List<FilmCreatedEvent> listOfFilmCreatedEventsReceived = new ArrayList<>();
|
||||
private static final List<KinoCreatedEvent> listOfKinoCreatedEventsReceived = new ArrayList<>();
|
||||
private static final List<VorfuehrungCreatedEvent> listOfVorfuehrungCreatedEventsReceived = new ArrayList<>();
|
||||
private static final List<OnlineKontingentChangedEvent> listOfOnlineKontingentChangedEventsReceived = new ArrayList<>();
|
||||
|
||||
|
||||
@Test
|
||||
void testEnd2EndSendingAndRetrievingEventsOnTheDistributedEventBus() throws IOException, ExecutionException, TimeoutException {
|
||||
assertThatKafkaIsRunning();
|
||||
|
||||
// make sure we start clean, i.e. we are deleting the topics (incl. its content)
|
||||
// (alternative: set subscribers to "START_READING_FROM_NOW" (but due to timing issues, consumers hence
|
||||
// could miss some early events sent)
|
||||
deleteKafkaTopicsWithPrefixAndSuffix("de.accso.flexinale", "");
|
||||
|
||||
// at least check if there are no DLT topics containing errors
|
||||
assertThatNoFlexinaleDLTTopicsExist();
|
||||
|
||||
/*
|
||||
// commented out, as all subscribers of this test have the EventSubscriptionAtStart set to START_READING_FROM_BEGINNING
|
||||
// -> so no more need to wait for consumers at startup
|
||||
// Need to wait a while until all Kafka consumers are connected
|
||||
// (as otherwise they might connect too late and miss messages)
|
||||
Duration waitForConsumersToGetStarted = Duration.ofSeconds(10);
|
||||
|
||||
LOGGER.info("Need to wait a while as first all Kafka consumers need to start and connect ...");
|
||||
await()
|
||||
.pollDelay(waitForConsumersToGetStarted.minusMillis(100))
|
||||
.timeout(waitForConsumersToGetStarted)
|
||||
.until(() -> true);
|
||||
LOGGER.info("Need to wait a while as first all Kafka consumers need to start and connect ... done");
|
||||
*/
|
||||
|
||||
// Loading order is important because of foreign key relations
|
||||
testUploadNewDataViaRestAndCheckPersistenceAndPublishedEvents4Filme();
|
||||
testUploadNewDataViaRestAndCheckPersistenceAndPublishedEvents4Kinos();
|
||||
testUploadNewDataViaRestAndCheckPersistenceAndPublishedEvents4VorfuehrungenAndKontingente();
|
||||
|
||||
// Does not work: A check at the end (to see any produced errors) does not seem to work, such DLT topics are created
|
||||
// but not found here (but in the next consecutive run at the beginning of this test method).
|
||||
assertThatNoFlexinaleDLTTopicsExist();
|
||||
}
|
||||
|
||||
void testUploadNewDataViaRestAndCheckPersistenceAndPublishedEvents4Filme() throws IOException {
|
||||
EventSubscriber<FilmCreatedEvent> filmCreatedSniffer = createAndSubscribeFilmCreatedSniffer();
|
||||
|
||||
// read file and sent events
|
||||
try (BufferedInputStream stream = new BufferedInputStream(xlsDataFileIS())) {
|
||||
|
||||
// act
|
||||
ResponseEntity<RestResponseMessage> result = null;
|
||||
try {
|
||||
// data is loaded and published
|
||||
result = filmRestControllerInBackoffice.loadAndPersistAndPublishNewFilme(stream, xlsDataFile);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
LOGGER.error("loadAndPersistAndPublishNewKinosKinoSaele failed...");
|
||||
}
|
||||
|
||||
// assert
|
||||
assertThat(result).isNotNull();
|
||||
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
}
|
||||
|
||||
// async asserts - check if all events were retrieved
|
||||
SequencedCollection<Id> expectedReceivedFilmIds = List.of(Id.of("F00"), Id.of("F01"), Id.of("F02"), Id.of("F03"));
|
||||
Set<Id> alreadyCheckedFilmIds = new HashSet<>();
|
||||
AtomicInteger repeatCounter = new AtomicInteger();
|
||||
|
||||
with().pollDelay(awaitilityPollIntervalInSeconds)
|
||||
.pollInterval(awaitilityPollIntervalInSeconds)
|
||||
.await()
|
||||
.atMost(awaitilityTimeOutInSeconds)
|
||||
.untilAsserted( () ->
|
||||
{
|
||||
LOGGER.info("Waiting for FilmCreatedEvents to be retrieved by subscribers ..." +
|
||||
".".repeat(repeatCounter.getAndIncrement()));
|
||||
|
||||
for (Id filmId : expectedReceivedFilmIds) {
|
||||
if (!alreadyCheckedFilmIds.contains(filmId)) {
|
||||
assertThat(filmSubscriberInBesucherportal.getCache().film(filmId)).isNotNull();
|
||||
|
||||
// when the assertion was ok, we remember the id as 'already checked'
|
||||
// (as otherwise the loop will do _all_ assertions , as "untilAsserted()" fails
|
||||
// when only one assert fails - so the whole loop would be done every time
|
||||
alreadyCheckedFilmIds.add(filmId);
|
||||
}
|
||||
}
|
||||
assertThat(filmSubscriberInBesucherportal.getCache().filme()).hasSize(expectedReceivedFilmIds.size());
|
||||
|
||||
assertThat(listOfFilmCreatedEventsReceived.size()).isEqualTo(expectedReceivedFilmIds.size());
|
||||
});
|
||||
}
|
||||
|
||||
void testUploadNewDataViaRestAndCheckPersistenceAndPublishedEvents4Kinos() throws IOException {
|
||||
EventSubscriber<KinoCreatedEvent> kinoCreatedSniffer = createAndSubscribeKinoCreatedSniffer();
|
||||
|
||||
// read file for KinoSaele
|
||||
Collection<KinoSaal> kinoSaele;
|
||||
try (BufferedInputStream stream = new BufferedInputStream(xlsDataFileIS())) {
|
||||
kinoSaele = kinoKinoSaalRestControllerInBackoffice.loadKinoSaele(stream);
|
||||
}
|
||||
|
||||
// read file and sent events
|
||||
try (BufferedInputStream stream = new BufferedInputStream(xlsDataFileIS())) {
|
||||
|
||||
// act
|
||||
ResponseEntity<RestResponseMessage> result = null;
|
||||
try {
|
||||
// data is loaded and published
|
||||
result = kinoKinoSaalRestControllerInBackoffice.loadAndPersistAndPublishNewKinosKinoSaele(stream, xlsDataFile, kinoSaele);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
LOGGER.error("testUploadViaRestAndCheckPersistenceAndPublishedEventsNewKinos failed...");
|
||||
}
|
||||
|
||||
// assert
|
||||
assertThat(result).isNotNull();
|
||||
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
}
|
||||
|
||||
// async asserts - check if all events were retrieved
|
||||
SequencedCollection<Id> expectedReceivedKinoIds = List.of(Id.of("K00"), Id.of("K01"), Id.of("K02"), Id.of("K03"));
|
||||
Set<Id> alreadyCheckedKinoIds = new HashSet<>();
|
||||
|
||||
AtomicInteger repeatCounter = new AtomicInteger();
|
||||
|
||||
with().pollDelay(awaitilityPollIntervalInSeconds)
|
||||
.pollInterval(awaitilityPollIntervalInSeconds)
|
||||
.await()
|
||||
.atMost(awaitilityTimeOutInSeconds)
|
||||
.untilAsserted( () ->
|
||||
{
|
||||
LOGGER.info("Waiting for KinoCreatedEvents to be retrieved by subscribers ..." +
|
||||
".".repeat(repeatCounter.getAndIncrement()));
|
||||
|
||||
for (Id kinoId : expectedReceivedKinoIds) {
|
||||
if (!alreadyCheckedKinoIds.contains(kinoId)) {
|
||||
assertThat(kinoSubscriberInBesucherportal.getCache().kino(kinoId)).isNotNull();
|
||||
assertThat(kinoSubscriberInBesucherportal.getCache().kino(kinoId).kinoSaele()).isNotNull();
|
||||
|
||||
assertThat(kinoSubscriberInBesucherportal.getCache().kino(kinoId).kinoSaele()).isNotEmpty();
|
||||
kinoSubscriberInBesucherportal.getCache().kino(kinoId).kinoSaele().forEach(kinoSaalTO -> {
|
||||
assertThat(kinoSaalTO.kino()).isNotNull();
|
||||
assertThat(kinoSaalTO.kino().id()).isEqualTo(kinoId);
|
||||
});
|
||||
|
||||
assertThat(kinoSubscriberInBesucherportal.getCache().kinos()).hasSize(expectedReceivedKinoIds.size());
|
||||
|
||||
// when the assertion was ok, we remember the id as 'already checked'
|
||||
// (as otherwise the loop will do _all_ assertions , as "untilAsserted()" fails
|
||||
// when only one assert fails - so the whole loop would be done every time
|
||||
alreadyCheckedKinoIds.add(kinoId);
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(listOfKinoCreatedEventsReceived.size()).isEqualTo(expectedReceivedKinoIds.size());
|
||||
});
|
||||
}
|
||||
|
||||
void testUploadNewDataViaRestAndCheckPersistenceAndPublishedEvents4VorfuehrungenAndKontingente() throws IOException {
|
||||
EventSubscriber<VorfuehrungCreatedEvent> vorfuehrungCreatedSniffer = createAndSubscribeVorfuehrungCreatedSniffer();
|
||||
EventSubscriber<OnlineKontingentChangedEvent> onlineKontingentChangedSniffer = createAndSubscribeOnlineKontingentChangedSniffer();
|
||||
|
||||
// read file and sent events
|
||||
try (BufferedInputStream stream = new BufferedInputStream(xlsDataFileIS())) {
|
||||
|
||||
// act
|
||||
ResponseEntity<RestResponseMessage> result = null;
|
||||
try {
|
||||
// data is loaded and published
|
||||
result = vorfuehrungRestControllerInBackoffice.loadAndPersistAndPublishNewVorfuehrungen(stream, xlsDataFile);
|
||||
} catch (Exception ex) {
|
||||
LOGGER.error("testUploadViaRestAndCheckPersistenceAndPublishedEventsNewVorfuehrungenAndKontingente failed...");
|
||||
}
|
||||
|
||||
// assert
|
||||
assertThat(result).isNotNull();
|
||||
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||||
}
|
||||
|
||||
// async asserts - check if all events were retrieved:
|
||||
|
||||
// VorfuehrungCreatedEvents are consumed both, in-parallel, in Besucherportal _and_ in Ticketing.
|
||||
Map<Id, Integer> expectedReceivedKontingenteOfVorfuehrung =
|
||||
Map.of( Id.of("V00"), 1, Id.of("V01"), 1, Id.of("V02"), 2,
|
||||
Id.of("V03"), 3, Id.of("V04"), 4, Id.of("V05"), 5,
|
||||
Id.of("V06"), 6, Id.of("V07"), 7, Id.of("V08"), 8); // each Vorfuehrung's has its KinoSaal's Kapazitaet, divided by 3
|
||||
Set<Id> expectedReceivedVorfuehrungIds = expectedReceivedKontingenteOfVorfuehrung.keySet();
|
||||
|
||||
Set<Id> alreadyCheckedVorfuehrungIds = new HashSet<>();
|
||||
|
||||
AtomicInteger repeatCounter = new AtomicInteger();
|
||||
|
||||
with().pollDelay(awaitilityPollIntervalInSeconds)
|
||||
.pollInterval(awaitilityPollIntervalInSeconds)
|
||||
.await()
|
||||
.atMost(awaitilityTimeOutInSeconds)
|
||||
.untilAsserted( () ->
|
||||
{
|
||||
LOGGER.info("Waiting for all VorfuehrungCreatedEvents to be retrieved by subscribers ..." +
|
||||
".".repeat(repeatCounter.getAndIncrement()));
|
||||
|
||||
for (Id vorfuehrungId : expectedReceivedVorfuehrungIds) {
|
||||
if (!alreadyCheckedVorfuehrungIds.contains(vorfuehrungId)) {
|
||||
// Did the Besucherportal consume the VorfuehrungCreatedEvent and has the Vorfuehrung in its cache?
|
||||
assertThat(vorfuehrungSubscriberInBesucherportal.getCache().vorfuehrung(vorfuehrungId)).isNotNull();
|
||||
|
||||
// Ticketing also consumes VorfuehrungCreatedEvent, does produce OnlineKontingentChangedEvent -
|
||||
// which are consumed in Besucherportal. Only then the Vorfuehrung has a Kontingent > 0.
|
||||
assertThat(vorfuehrungSubscriberInBesucherportal.getCache().getRestkontingentOnline(vorfuehrungId))
|
||||
.isEqualTo(expectedReceivedKontingenteOfVorfuehrung.get(vorfuehrungId));
|
||||
|
||||
// assert that Kontingente are created due to the uploaded Vorfuehrungen
|
||||
assertThat(kontingentSubscriberInBesucherportal.getCache().getRestkontingentOnline(vorfuehrungId))
|
||||
.isEqualTo(expectedReceivedKontingenteOfVorfuehrung.get(vorfuehrungId));
|
||||
|
||||
// when the assertion was ok, we remember the id as 'already checked'
|
||||
// (as otherwise the loop will do _all_ assertions , as "untilAsserted()" fails
|
||||
// when only one assert fails - so the whole loop would be done every time
|
||||
alreadyCheckedVorfuehrungIds.add(vorfuehrungId);
|
||||
}
|
||||
}
|
||||
assertThat(kontingentSubscriberInBesucherportal.getCache().vorfuehrungen()).hasSize(expectedReceivedVorfuehrungIds.size());
|
||||
|
||||
assertThat(listOfVorfuehrungCreatedEventsReceived.size()).isEqualTo(expectedReceivedVorfuehrungIds.size());
|
||||
assertThat(listOfOnlineKontingentChangedEventsReceived.size()).isEqualTo(expectedReceivedVorfuehrungIds.size());
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------
|
||||
|
||||
abstract class TestEventSubscriberSniffer<E extends Event> implements EventSubscriber<E> {
|
||||
final Class<E> eventType;
|
||||
|
||||
TestEventSubscriberSniffer(final Class<E> eventType) {
|
||||
this.eventType = eventType;
|
||||
EventBus<E> eventBus = eventBusfactory.createOrGetEventBusFor(eventType);
|
||||
eventBus.subscribe(eventType, this, START_READING_FROM_BEGINNING); // or START_READING_FROM_NOW?
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return End2EndWithDistributedKafkaEventBusTest.class.getSimpleName() + "-TestSniffer4" + eventType.getSimpleName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroupName() {
|
||||
return uuidString();
|
||||
}
|
||||
}
|
||||
|
||||
private EventSubscriber<FilmCreatedEvent> createAndSubscribeFilmCreatedSniffer() {
|
||||
return new TestEventSubscriberSniffer<>(FilmCreatedEvent.class) {
|
||||
@Override
|
||||
public void receive(FilmCreatedEvent event) {
|
||||
listOfFilmCreatedEventsReceived.add(event);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private EventSubscriber<KinoCreatedEvent> createAndSubscribeKinoCreatedSniffer() {
|
||||
return new TestEventSubscriberSniffer<>(KinoCreatedEvent.class) {
|
||||
@Override
|
||||
public void receive(KinoCreatedEvent event) {
|
||||
listOfKinoCreatedEventsReceived.add(event);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private EventSubscriber<VorfuehrungCreatedEvent> createAndSubscribeVorfuehrungCreatedSniffer() {
|
||||
return new TestEventSubscriberSniffer<>(VorfuehrungCreatedEvent.class) {
|
||||
@Override
|
||||
public void receive(VorfuehrungCreatedEvent event) {
|
||||
listOfVorfuehrungCreatedEventsReceived.add(event);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private EventSubscriber<OnlineKontingentChangedEvent> createAndSubscribeOnlineKontingentChangedSniffer() {
|
||||
return new TestEventSubscriberSniffer<>(OnlineKontingentChangedEvent.class) {
|
||||
@Override
|
||||
public void receive(OnlineKontingentChangedEvent event) {
|
||||
listOfOnlineKontingentChangedEventsReceived.add(event);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------
|
||||
|
||||
private void assertThatKafkaIsRunning() throws ExecutionException {
|
||||
assertThat(kafkaAvailabilityChecker.isKafkaAvailable()).isTrue();
|
||||
}
|
||||
|
||||
private void assertThatNoFlexinaleTopicsExist() throws ExecutionException {
|
||||
assertThat( getKafkaTopicNamesWithPrefixAndSuffix("de.accso.flexinale", "") ).isEmpty();
|
||||
}
|
||||
|
||||
private void assertThatNoFlexinaleDLTTopicsExist() throws ExecutionException {
|
||||
assertThat( getKafkaTopicNamesWithPrefixAndSuffix("de.accso.flexinale", "DLT") ).isEmpty();
|
||||
}
|
||||
|
||||
private List<String> getKafkaTopicNamesWithPrefixAndSuffix(String prefix, String suffix) throws ExecutionException {
|
||||
List<String> topicNamesEndingWithDLT;
|
||||
|
||||
try (var adminClient = AdminClient.create( adminConfig() )) {
|
||||
topicNamesEndingWithDLT = adminClient.listTopics().names().get()
|
||||
.stream()
|
||||
.filter(name -> name.startsWith(prefix) && name.endsWith(suffix))
|
||||
.toList();
|
||||
return topicNamesEndingWithDLT;
|
||||
}
|
||||
catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
application.title=FLEXinale as Distributed Services, Test
|
||||
application.version=test
|
||||
spring.banner.location=classpath:/flexinale-banner.txt
|
||||
|
||||
#########################################################################
|
||||
# For endpoint /version
|
||||
#########################################################################
|
||||
build.version=@pom.version@
|
||||
build.date=@maven.build.timestamp@
|
||||
|
||||
#########################################################################
|
||||
# Persistence
|
||||
#########################################################################
|
||||
spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/distributed-test
|
||||
spring.datasource.name=flexinale
|
||||
spring.datasource.username=flexinale
|
||||
spring.datasource.password=flexinale
|
||||
spring.datasource.driver-class-name=org.postgresql.Driver
|
||||
spring.jpa.database=postgresql
|
||||
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
|
||||
spring.jpa.generate-ddl=true
|
||||
# spring.jpa.show-sql=true
|
||||
spring.jpa.hibernate.ddl-auto=create
|
||||
server.port=9093
|
||||
|
||||
#########################################################################
|
||||
# Web
|
||||
#########################################################################
|
||||
spring.thymeleaf.prefix=classpath:/templates/
|
||||
spring.thymeleaf.suffix=.html
|
||||
server.error.path=/error
|
||||
|
||||
#########################################################################
|
||||
# flexinale properties
|
||||
#########################################################################
|
||||
# Quote for online kontingent in percent
|
||||
de.accso.flexinale.kontingent.quote.online=33
|
||||
# time in minutes
|
||||
de.accso.flexinale.vorfuehrung.min-zeit-zwischen-vorfuehrungen-in-minuten=30
|
||||
Binary file not shown.
|
|
@ -0,0 +1,20 @@
|
|||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{YYYYMMdd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="org.springframework.orm.jpa" level="INFO"/>
|
||||
<logger name="org.springframework.boot.autoconfigure.domain.EntityScan" level="INFO"/>
|
||||
|
||||
<logger name="org.apache.kafka" level="WARN"/>
|
||||
<logger name="org.apache.kafka.clients.admin.AdminClient" level="INFO"/>
|
||||
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="INFO"/>
|
||||
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="INFO"/>
|
||||
|
||||
<logger name="de.accso" level="INFO"/>
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
||||
Loading…
Add table
Add a link
Reference in a new issue