chore: Initial import of FLEX training material

Alexander Kobjolke 2024-11-07 21:02:53 +01:00
parent c01246d4f7
commit 414504d225
1020 changed files with 53940 additions and 0 deletions


@@ -0,0 +1,105 @@
package de.accso.flexinale.common.api.event;
import com.fasterxml.jackson.annotation.JsonSetter;
import de.accso.flexinale.common.shared_kernel.DeveloperMistakeException;
import de.accso.flexinale.common.shared_kernel.DoNotCheckInArchitectureTests;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.lang.reflect.Field;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
@SuppressWarnings("CanBeFinal")
public abstract class AbstractEvent implements Event {
public final Id id = Identifiable.Id.of();
public final Id correlationId;
@DoNotCheckInArchitectureTests
protected LocalDateTime timestamp = LocalDateTime.now();
@DoNotCheckInArchitectureTests
protected final EventContext eventContext;
// event is the start of a new event chain
protected AbstractEvent() {
this.correlationId = Identifiable.Id.of();
this.eventContext = new EventContext( this.correlationId,
List.of( new EventContext.EventHistoryElement(id, this.getClass())) );
}
// event is part of an existing event chain but without a history
protected AbstractEvent(final Id correlationId) {
this.correlationId = correlationId;
this.eventContext = new EventContext( this.correlationId,
List.of( new EventContext.EventHistoryElement(id, this.getClass())) );
}
// event is part of an existing event chain
protected AbstractEvent(final Event predecessorEvent) {
this(predecessorEvent.eventContext());
}
// event is part of an existing event chain
protected AbstractEvent(final EventContext predecessorEventContext) {
this.correlationId = predecessorEventContext.correlationId;
this.eventContext = new EventContext( this.correlationId,
List.copyOf(predecessorEventContext.eventHistory),
new EventContext.EventHistoryElement(id, this.getClass()) // append this event to history
);
}
@Override
public Id id() {
return id;
}
@Override
public LocalDateTime timestamp() {
return timestamp;
}
@Override
public Id correlationId() {
return correlationId;
}
@Override
public EventContext eventContext() {
return eventContext;
}
@JsonSetter("version") // needed as otherwise the static field version (in one of the subclasses) is not (de)serialized
public void setVersion(Version newVersion) {
String fieldName = "version";
try {
Field field = this.getClass().getDeclaredField(fieldName); // static field
field.setAccessible(true);
field.set(this, newVersion);
}
catch (NoSuchFieldException | IllegalAccessException ex) {
throw new DeveloperMistakeException("version field cannot be set via reflection in an Event class", ex);
}
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
@Override
public boolean equals(final Object o) {
if (this == o) {return true;}
if (o == null || getClass() != o.getClass()) {return false;}
final AbstractEvent that = (AbstractEvent) o;
return Objects.equals(id, that.id);
}
@Override
public int hashCode() {
return id.hashCode();
}
}
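
To illustrate the constructor chaining above, a minimal sketch; TicketReservedEvent and TicketPaidEvent are hypothetical event types, and the Versionable members a real subclass would have to provide are omitted for brevity:

import de.accso.flexinale.common.api.event.AbstractEvent;
import de.accso.flexinale.common.api.event.Event;

// hypothetical event types, for illustration only
class TicketReservedEvent extends AbstractEvent {
    TicketReservedEvent() { super(); }                          // starts a new event chain
}
class TicketPaidEvent extends AbstractEvent {
    TicketPaidEvent(Event predecessor) { super(predecessor); }  // continues the predecessor's chain
}

class EventChainDemo {
    public static void main(String[] args) {
        var reserved = new TicketReservedEvent();
        var paid = new TicketPaidEvent(reserved);
        // both events carry the correlation id created by the chain's first event
        System.out.println(paid.correlationId().equals(reserved.correlationId())); // true
        // the successor's history lists the predecessor plus itself
        System.out.println(paid.eventContext().eventHistory.size());               // 2
    }
}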


@@ -0,0 +1,14 @@
package de.accso.flexinale.common.api.event;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import de.accso.flexinale.common.shared_kernel.Versionable;
import java.io.Serializable;
import java.time.LocalDateTime;
@SuppressWarnings("unused")
public interface Event extends Identifiable, Versionable, Serializable {
LocalDateTime timestamp();
Id correlationId();
EventContext eventContext();
}


@@ -0,0 +1,65 @@
package de.accso.flexinale.common.api.event;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.ArrayList;
import java.util.List;
@SuppressWarnings("unused")
public class EventContext {
public record EventHistoryElement(Identifiable.Id id, Class<? extends Event> eventType) {}
public Identifiable.Id correlationId;
public List<EventHistoryElement> eventHistory;
private EventContext() {} // used by Jackson deserialization
public EventContext(final Identifiable.Id correlationId, final List<EventHistoryElement> eventHistory) {
this.correlationId = correlationId;
this.eventHistory = List.copyOf(eventHistory);
}
public EventContext(final Identifiable.Id correlationId,
final List<EventHistoryElement> eventHistory, final EventHistoryElement additionalHistoryElement) {
this.correlationId = correlationId;
// pretty ugly to copy plus add to immutable list
List<EventHistoryElement> tempList = new ArrayList<>(eventHistory);
tempList.add(additionalHistoryElement);
this.eventHistory = List.copyOf(tempList);
}
public boolean equalsByCorrelationId(EventContext that) {
return this.correlationId.equals(that.correlationId);
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
@Override
public boolean equals(final Object o) {
if (this == o) {return true;}
if (o == null || getClass() != o.getClass()) {return false;}
EventContext that = (EventContext) o;
return new EqualsBuilder()
.append(eventHistory, that.eventHistory)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(this.eventHistory) // must be consistent with equals(), which compares the eventHistory
.toHashCode();
}
}


@@ -0,0 +1,20 @@
package de.accso.flexinale.common.api.eventbus;
import de.accso.flexinale.common.api.event.Event;
public interface EventBus<E extends Event> {
// Event type specific subscription
void subscribe(Class<E> eventType, EventSubscriber<E> subscriber, EventSubscriptionAtStart eventSubscriptionAtStart);
void unsubscribe(Class<E> eventType, EventSubscriber<E> subscriber);
// subscription for _all_ Events (i.e. based on Event.class)
void subscribe(EventSubscriber<Event> genericSubscriber, EventSubscriptionAtStart eventSubscriptionAtStart);
void unsubscribe(EventSubscriber<Event> genericSubscriber);
void unsubscribeAll();
void publish(Class<E> eventType, E event);
EventContextHolder getEventContextHolder();
}


@@ -0,0 +1,7 @@
package de.accso.flexinale.common.api.eventbus;
import de.accso.flexinale.common.api.event.Event;
public interface EventBusFactory {
<E extends Event> EventBus<E> createOrGetEventBusFor(Class<E> type);
}


@@ -0,0 +1,10 @@
package de.accso.flexinale.common.api.eventbus;
import de.accso.flexinale.common.api.event.EventContext;
public interface EventContextHolder {
EventContext get();
void set(EventContext eventContext);
void remove();
boolean isEmpty();
}


@@ -0,0 +1,7 @@
package de.accso.flexinale.common.api.eventbus;
import de.accso.flexinale.common.api.event.Event;
public interface EventNotification {
void notify(Event event);
}


@@ -0,0 +1,56 @@
package de.accso.flexinale.common.api.eventbus;
import de.accso.flexinale.common.api.event.Event;
@SuppressWarnings("unused")
public interface EventPublisher<E1 extends Event> {
String getName();
void post(Class<E1> eventType, E1 event);
interface EventPublisher2<E1 extends Event, E2 extends Event>
extends EventPublisher<E1> {
void post2(Class<E2> eventType, E2 event);
}
interface EventPublisher3<E1 extends Event, E2 extends Event, E3 extends Event>
extends EventPublisher2<E1, E2> {
void post3(Class<E3> eventType, E3 event);
}
interface EventPublisher4<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event>
extends EventPublisher3<E1, E2, E3> {
void post4(Class<E4> eventType, E4 event);
}
interface EventPublisher5<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event>
extends EventPublisher4<E1, E2, E3, E4> {
void post5(Class<E5> eventType, E5 event);
}
interface EventPublisher6<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event>
extends EventPublisher5<E1, E2, E3, E4, E5> {
void post6(Class<E6> eventType, E6 event);
}
interface EventPublisher7<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event, E7 extends Event>
extends EventPublisher6<E1, E2, E3, E4, E5, E6> {
void post7(Class<E7> eventType, E7 event);
}
interface EventPublisher8<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event, E7 extends Event, E8 extends Event>
extends EventPublisher7<E1, E2, E3, E4, E5, E6, E7> {
void post8(Class<E8> eventType, E8 event);
}
interface EventPublisher9<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event, E7 extends Event, E8 extends Event, E9 extends Event>
extends EventPublisher8<E1, E2, E3, E4, E5, E6, E7, E8> {
void post9(Class<E9> eventType, E9 event);
}
}


@@ -0,0 +1,67 @@
package de.accso.flexinale.common.api.eventbus;
import de.accso.flexinale.common.api.event.Event;
@SuppressWarnings("unused") // methods are called via reflection
public interface EventSubscriber<E1 extends Event> {
String getName();
String getGroupName();
void receive(E1 event);
interface EventSubscriber2<E1 extends Event, E2 extends Event>
extends EventSubscriber<E1> {
void receive2(E2 event);
}
interface EventSubscriber3<E1 extends Event, E2 extends Event, E3 extends Event>
extends EventSubscriber2<E1, E2> {
void receive3(E3 event);
}
interface EventSubscriber4<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event>
extends EventSubscriber3<E1, E2, E3> {
void receive4(E4 event);
}
interface EventSubscriber5<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event>
extends EventSubscriber4<E1, E2, E3, E4> {
void receive5(E5 event);
}
interface EventSubscriber6<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event>
extends EventSubscriber5<E1, E2, E3, E4, E5> {
void receive6(E6 event);
}
interface EventSubscriber7<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event, E7 extends Event>
extends EventSubscriber6<E1, E2, E3, E4, E5, E6> {
void receive7(E7 event);
}
interface EventSubscriber8<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event, E7 extends Event, E8 extends Event>
extends EventSubscriber7<E1, E2, E3, E4, E5, E6, E7> {
void receive8(E8 event);
}
interface EventSubscriber9<E1 extends Event, E2 extends Event, E3 extends Event, E4 extends Event,
E5 extends Event, E6 extends Event, E7 extends Event, E8 extends Event, E9 extends Event>
extends EventSubscriber8<E1, E2, E3, E4, E5, E6, E7, E8> {
void receive9(E9 event);
}
String RECEIVE = "receive";
String RECEIVE2 = "receive2";
String RECEIVE3 = "receive3";
String RECEIVE4 = "receive4";
String RECEIVE5 = "receive5";
String RECEIVE6 = "receive6";
String RECEIVE7 = "receive7";
String RECEIVE8 = "receive8";
String RECEIVE9 = "receive9";
}
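
As a sketch of the naming contract that the reflective dispatching (see EventSubscriberReceiveDispatcher below) relies on, a subscriber for two event types implements EventSubscriber2 and provides receive/receive2; the event types are the hypothetical ones sketched above:

import de.accso.flexinale.common.api.eventbus.EventSubscriber;

// hypothetical subscriber handling the two hypothetical event types from the earlier sketch
class BookingSubscriber implements EventSubscriber.EventSubscriber2<TicketReservedEvent, TicketPaidEvent> {
    @Override public String getName()      { return "bookingSubscriber"; }
    @Override public String getGroupName() { return "booking"; }

    @Override // dispatched via reflection under the method name RECEIVE ("receive")
    public void receive(TicketReservedEvent event) { /* handle the reservation */ }

    @Override // dispatched via reflection under the method name RECEIVE2 ("receive2")
    public void receive2(TicketPaidEvent event) { /* handle the payment */ }
}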


@@ -0,0 +1,19 @@
package de.accso.flexinale.common.api.eventbus;
// Behaviour of how an EventSubscriber (i.e. consumer) reads new messages after having subscribed to the EventBus
// - How many of the old messages are read or re-read (if any)?
// - Where do we start, at which offset?
public enum EventSubscriptionAtStart {
// (re)read all messages from the EventBus from the very beginning (i.e. from offset 0)
// In Kafka this is - for a new ConsumerGroup - "earliest".
START_READING_FROM_BEGINNING,
// read all messages from now on (so don't (re)read anything published before now)
// In Kafka this is - for a new ConsumerGroup - "latest".
START_READING_FROM_NOW,
// read all messages from the EventBus, starting where we left off last time (so last offset we had + 1)
// (if we had not connected before at all: start from beginning, i.e. from offset 0)
// In Kafka this is - for a known ConsumerGroup - the default.
START_READING_FROM_LAST_TIME
}
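
A hedged sketch of how the three modes are passed on subscription, reusing the hypothetical bus and subscriber from the sketches above:

import de.accso.flexinale.common.api.eventbus.EventBus;
import static de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart.*;

class SubscriptionModeDemo {
    static void subscribeVariants(EventBus<TicketReservedEvent> bus, BookingSubscriber subscriber) {
        // replay everything (Kafka: new consumer group, auto.offset.reset = "earliest")
        bus.subscribe(TicketReservedEvent.class, subscriber, START_READING_FROM_BEGINNING);
        // only future events (Kafka: new consumer group, auto.offset.reset = "latest")
        bus.subscribe(TicketReservedEvent.class, subscriber, START_READING_FROM_NOW);
        // resume at the stored offset (Kafka: known consumer group, default behaviour)
        bus.subscribe(TicketReservedEvent.class, subscriber, START_READING_FROM_LAST_TIME);
    }
}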


@@ -0,0 +1,10 @@
package de.accso.flexinale.common.application;
public interface Config {
int getQuoteOnline();
int getMinZeitZwischenVorfuehrungenInMinuten();
String getApplicationTitle();
String getBuildVersion();
String getBuildDate();
}


@@ -0,0 +1,9 @@
package de.accso.flexinale.common.application;
public enum PersistMode {
// updates on existing entities allowed (beware of inconsistencies with derived entities!)
UPDATE,
// only new data can be added, no updates on existing entities allowed
ADD_ONLY
}


@@ -0,0 +1,12 @@
package de.accso.flexinale.common.application;
import de.accso.flexinale.common.shared_kernel.Identifiable;
public record PersistedEntityAndResult<E extends Identifiable>
(E entity, PersistedResult result) {
public enum PersistedResult {
UPDATED,
ADDED
}
}


@@ -0,0 +1,129 @@
package de.accso.flexinale.common.application.caching;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public final class InMemoryCache<ID, OBJECT> {
// as an alternative: extract interface, use Cache2K as a backing implementation
// see https://cache2k.org/docs/latest/user-guide.html#getting-started
private final ConcurrentHashMap<ID, OBJECT> backingMap = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
public OBJECT get(final ID id) {
lock.lock();
try {
return backingMap.get(id);
}
finally {
lock.unlock();
}
}
public OBJECT put(final ID id, final OBJECT o) {
lock.lock();
try {
return backingMap.put(id, o);
}
finally {
lock.unlock();
}
}
public OBJECT remove(final ID id) {
lock.lock();
try {
return backingMap.remove(id);
}
finally {
lock.unlock();
}
}
public Set<ID> keys() {
lock.lock();
try {
return backingMap.keySet().stream().collect(Collectors.toUnmodifiableSet());
}
finally {
lock.unlock();
}
}
public Set<OBJECT> values() {
lock.lock();
try {
return backingMap.values().stream().collect(Collectors.toUnmodifiableSet());
}
finally {
lock.unlock();
}
}
public Set<Map.Entry<ID, OBJECT>> entrySet() {
lock.lock();
try {
return backingMap.entrySet(); // note: returns the live view, unlike the copies from keys() and values()
}
finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return backingMap.size();
}
finally {
lock.unlock();
}
}
public boolean isEmpty() {
lock.lock();
try {
return backingMap.isEmpty();
}
finally {
lock.unlock();
}
}
public boolean contains(final OBJECT o) {
lock.lock();
try {
return backingMap.containsValue(o); // contains(Object) is a legacy alias of containsValue(Object)
}
finally {
lock.unlock();
}
}
public boolean containsKey(final ID key) {
lock.lock();
try {
return backingMap.containsKey(key);
}
finally {
lock.unlock();
}
}
@Override
public boolean equals(final Object o) {
if (this == o) {return true;}
if (o == null || getClass() != o.getClass()) {return false;}
final InMemoryCache<?, ?> that = (InMemoryCache<?, ?>) o;
return backingMap.equals(that.backingMap);
}
@Override
public int hashCode() {
return backingMap.hashCode();
}
@Override
public String toString() {
return backingMap.toString();
}
}
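
A brief usage sketch (the key and value types are illustrative); note that the ReentrantLock serializes all accesses on top of the already thread-safe ConcurrentHashMap:

import de.accso.flexinale.common.application.caching.InMemoryCache;

class CacheDemo {
    public static void main(String[] args) {
        InMemoryCache<String, Integer> cache = new InMemoryCache<>();
        cache.put("answer", 42);
        System.out.println(cache.get("answer")); // 42
        System.out.println(cache.keys());        // [answer]
        cache.remove("answer");
        System.out.println(cache.isEmpty());     // true
    }
}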


@@ -0,0 +1,131 @@
package de.accso.flexinale.common.application.services;
import de.accso.flexinale.common.application.PersistMode;
import de.accso.flexinale.common.application.PersistedEntityAndResult;
import de.accso.flexinale.common.domain.model.AbstractDao;
import de.accso.flexinale.common.shared_kernel.DoNotCheckInArchitectureTests;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import de.accso.flexinale.common.shared_kernel.Mergeable;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import static de.accso.flexinale.common.application.PersistedEntityAndResult.PersistedResult.ADDED;
import static de.accso.flexinale.common.application.PersistedEntityAndResult.PersistedResult.UPDATED;
@SuppressWarnings("unused")
@DoNotCheckInArchitectureTests
public abstract class AbstractExcelDataUploadService<E extends Identifiable & Mergeable<E>> implements ExcelDataUploadService<E> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExcelDataUploadService.class);
@Override
public void beforeLoad(Object... o) {}
@Override
public void afterLoad(Object... o) {}
@Override
public final Collection<E> loadDataFromExcelSheet(final String resourceName) throws IOException {
try (InputStream stream = AbstractExcelDataUploadService.class.getResourceAsStream(resourceName)) {
if (stream == null) {
throw new FileNotFoundException("resource '%s' not found!".formatted(resourceName));
}
return loadDataFromExcelSheet(stream);
}
}
@Override
public final Collection<E> loadDataFromExcelSheet(final InputStream stream) throws IOException {
Collection<E> dataSet = new HashSet<>();
// Excel sheet (the sheet, not the file!) needs to be named exactly like the entity's class name
// (example: "Kino")
String sheetName = getNameOfExcelDataType();
try (Workbook workbook = new XSSFWorkbook(stream)) {
// iterate through each row of the desired sheet from the workbook; fail fast if the sheet is missing
var sheet = workbook.getSheet(sheetName);
if (sheet == null) { throw new FileNotFoundException("sheet '%s' not found!".formatted(sheetName)); }
for (Row excelRow : sheet) {
// ignore comments, starting with # , load all other rows
Cell firstCell = excelRow.getCell(0);
if (firstCell != null && !firstCell.getStringCellValue().startsWith("#")) {
E data = createDataFromExcelRow(excelRow);
dataSet.add(data);
}
}
}
return dataSet;
}
@Override
public final int loadDataFromExcelSheetAndPersist(
final String resourceName, final PersistMode mode) throws IOException {
try (InputStream stream = AbstractExcelDataUploadService.class.getResourceAsStream(resourceName)) {
if (stream == null) {
throw new FileNotFoundException("resource '%s' not found!".formatted(resourceName));
}
return loadDataFromExcelSheetAndPersist(stream, mode);
}
}
@Override
public final int loadDataFromExcelSheetAndPersist(
final InputStream stream, final PersistMode mode) throws IOException {
return loadDataFromExcelSheetAndPersistAndReturn(stream, mode).size();
}
@Override
public Collection<PersistedEntityAndResult<E>> loadDataFromExcelSheetAndPersistAndReturn(
final InputStream stream, final PersistMode mode) throws IOException {
Collection<E> dataCollectionFromExcelSheet = loadDataFromExcelSheet(stream);
return loadDataAndPersistAndReturn(dataCollectionFromExcelSheet, mode);
}
@Override
public Collection<PersistedEntityAndResult<E>> loadDataAndPersistAndReturn(
final Collection<E> dataCollection, final PersistMode mode) {
Collection<PersistedEntityAndResult<E>> dataAddedAndUpdated = new HashSet<>();
AbstractDao<E> dao = getDao();
String sheetName = getNameOfExcelDataType();
for (E dataFromExcelSheet : dataCollection) {
Identifiable.Id id = dataFromExcelSheet.id();
Optional<E> optionalEntityFromDatabase = dao.findById(id);
if (optionalEntityFromDatabase.isPresent()) {
E entityFromDatabase = optionalEntityFromDatabase.get();
switch(mode) {
case ADD_ONLY ->
LOGGER.warn("%s %s already exists. Not persisted as mode 'ADD_ONLY' is active".formatted(sheetName, id));
case UPDATE -> {
LOGGER.info("%s %s already exists. Merged as mode 'UPDATE' is active".formatted(sheetName, id));
E mergedEntity = entityFromDatabase.merge(dataFromExcelSheet);
dao.save(mergedEntity);
dataAddedAndUpdated.add( new PersistedEntityAndResult<>(dataFromExcelSheet, UPDATED) );
}
}
}
else {
LOGGER.info("%s %s does not yet exist. Added as new entity".formatted(sheetName, id));
dao.save(dataFromExcelSheet);
dataAddedAndUpdated.add( new PersistedEntityAndResult<>(dataFromExcelSheet, ADDED) );
}
}
return dataAddedAndUpdated;
}
}
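
A sketch of a concrete upload service; the Kino entity, its (name, stadt) constructor and the row layout are assumptions for illustration (Kino would have to implement Identifiable and Mergeable&lt;Kino&gt;):

import de.accso.flexinale.common.application.services.AbstractExcelDataUploadService;
import de.accso.flexinale.common.domain.model.AbstractDao;
import org.apache.poi.ss.usermodel.Row;

// hypothetical service for a hypothetical Kino entity
class KinoExcelDataUploadService extends AbstractExcelDataUploadService<Kino> {
    private final AbstractDao<Kino> dao;

    KinoExcelDataUploadService(AbstractDao<Kino> dao) { this.dao = dao; }

    @Override public AbstractDao<Kino> getDao() { return dao; }

    // must match the name of the Excel sheet exactly, as noted above
    @Override public String getNameOfExcelDataType() { return "Kino"; }

    @Override public Kino createDataFromExcelRow(Row row) {
        // assumed row layout: column 0 = name, column 1 = city
        return new Kino(row.getCell(0).getStringCellValue(),
                        row.getCell(1).getStringCellValue());
    }
}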


@@ -0,0 +1,32 @@
package de.accso.flexinale.common.application.services;
import de.accso.flexinale.common.application.PersistMode;
import de.accso.flexinale.common.application.PersistedEntityAndResult;
import de.accso.flexinale.common.domain.model.AbstractDao;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import org.apache.poi.ss.usermodel.Row;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
@SuppressWarnings("unused")
public interface ExcelDataUploadService<E extends Identifiable> {
Collection<E> loadDataFromExcelSheet(final String resourceName) throws IOException;
Collection<E> loadDataFromExcelSheet(final InputStream stream) throws IOException;
int loadDataFromExcelSheetAndPersist(final String resourceName, PersistMode mode) throws IOException;
int loadDataFromExcelSheetAndPersist(final InputStream stream, PersistMode mode) throws IOException;
Collection<PersistedEntityAndResult<E>>
loadDataFromExcelSheetAndPersistAndReturn(final InputStream stream, PersistMode mode) throws IOException;
Collection<PersistedEntityAndResult<E>>
loadDataAndPersistAndReturn(Collection<E> data, PersistMode mode);
AbstractDao<E> getDao();
String getNameOfExcelDataType();
void beforeLoad(Object... o);
void afterLoad(Object... o);
E createDataFromExcelRow(final Row row);
}


@@ -0,0 +1,19 @@
package de.accso.flexinale.common.domain.model;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import java.util.List;
import java.util.Optional;
@SuppressWarnings("unused")
public interface AbstractDao<E extends Identifiable> {
List<E> findAll();
Optional<E> findById(final Identifiable.Id id);
E save(final E entity);
void delete(final E data);
void deleteById(final Identifiable.Id id);
void deleteAll();
}


@@ -0,0 +1,61 @@
package de.accso.flexinale.common.infrastructure;
import de.accso.flexinale.common.api.eventbus.EventBusFactory;
import de.accso.flexinale.common.infrastructure.eventbus.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Configuration
public class FlexinaleCommonSpringFactory {
// Eventbus via Kafka
// default: used for scenarios using real distributed remote communication via Kafka
// i.e. when starting all three apps (Besucherportal, Backoffice, Ticketing) and also for the "test-distributed" profile
@Bean
@Profile({"!test-integrated & !testdata & !configtest & !smoketest & !local"})
public EventBusFactory createKafkaAsyncEventBusFactory(final KafkaAvailabilityChecker kafkaAvailabilityChecker) {
return new KafkaAsyncEventBusFactory(kafkaAvailabilityChecker);
}
@Bean
@Profile({"!test-integrated & !testdata & !configtest & !smoketest & !local"})
public KafkaAvailabilityChecker createKafkaAvailabilityChecker() {
return new KafkaAvailabilityChecker();
}
// --------------------------------------------------------------------------------------------------------------
// Eventbus as in-memory with Spy
// used in tests where publishers and subscribers need to communicate via an in-memory event bus
// and where we need to look into the event chain via the spy
@Bean
@Profile({"test-integrated"})
public EventBusFactory createInMemorySyncEventBusSpyFactory() {
return new InMemorySyncEventBusSpyFactory();
}
// --------------------------------------------------------------------------------------------------------------
// Eventbus as in-memory
// used in scenarios where publishers and subscribers need to communicate via an in-memory event bus
@Bean
@Profile({"local"})
public EventBusFactory createInMemorySyncEventBusFactory() {
return new InMemorySyncEventBusFactory();
}
// --------------------------------------------------------------------------------------------------------------
// Eventbus doing nothing
// used in scenarios to clean and load test data where we do not want to send any event
@Bean
@Profile({"testdata | configtest | smoketest"})
public EventBusFactory createNopeEventBusSpyFactory() {
return new NopeEventBusSpyFactory();
}
}


@@ -0,0 +1,70 @@
package de.accso.flexinale.common.infrastructure;
import de.accso.flexinale.common.application.Config;
import de.accso.flexinale.common.shared_kernel.FlexinaleIllegalArgumentException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlexinaleSpringConfig implements Config {
@Value("${application.title}")
public String applicationTitle;
@Value("${build.version}")
public String buildVersion;
@Value("${build.date}")
public String buildDate;
@Override
public String getApplicationTitle() {
return applicationTitle;
}
@Override
public String getBuildVersion() {
return buildVersion;
}
@Override
public String getBuildDate() {
return buildDate;
}
// -----------------------------------------------------------------------------------------------------------
private final int quoteOnline;
@Override
public int getQuoteOnline() {
return quoteOnline;
}
// -----------------------------------------------------------------------------------------------------------
private final int minZeitZwischenVorfuehrungenInMinuten;
@Override
public int getMinZeitZwischenVorfuehrungenInMinuten() {
return minZeitZwischenVorfuehrungenInMinuten;
}
// -----------------------------------------------------------------------------------------------------------
public FlexinaleSpringConfig(@Value("${de.accso.flexinale.kontingent.quote.online:33}") final int quoteOnline,
@Value("${de.accso.flexinale.vorfuehrung.min-zeit-zwischen-vorfuehrungen-in-minuten:30}") final int minZeitZwischenVorfuehrungenInMinuten) {
if (quoteOnline < 0 || quoteOnline > 100) {
String message = "Quote online should be a percentage, i.e. 0 <= quote online <= 100, but was " + quoteOnline;
throw new FlexinaleIllegalArgumentException(message);
}
this.quoteOnline = quoteOnline;
if (minZeitZwischenVorfuehrungenInMinuten < 0) {
String message = "minZeitZwischenVorfuehrungenInMinuten should be positiv, but was "
+ minZeitZwischenVorfuehrungenInMinuten;
throw new FlexinaleIllegalArgumentException(message);
}
this.minZeitZwischenVorfuehrungenInMinuten = minZeitZwischenVorfuehrungenInMinuten;
}
}


@@ -0,0 +1,51 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.module.SimpleModule;
import de.accso.flexinale.common.api.event.Event;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public final class EventSerializationHelper {
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new ObjectMapper();
jsonMapper.findAndRegisterModules();
jsonMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
// register module for date/time (de)serialization in ISO-8601 format, e.g. 2022-05-06T11:12:13 (LocalDateTime carries no zone suffix)
SimpleModule localDateTimeModule = new SimpleModule();
localDateTimeModule.addSerializer(LocalDateTime.class, new JsonSerializer<>() {
@Override
public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeString(value.format(dateTimeFormatter));
}
});
localDateTimeModule.addDeserializer(LocalDateTime.class, new JsonDeserializer<>() {
@Override
public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return LocalDateTime.parse(p.getValueAsString(), dateTimeFormatter);
}
});
jsonMapper.registerModule(localDateTimeModule);
}
public static String serializeEvent2JsonString(final Event event) throws JsonProcessingException {
return jsonMapper.writeValueAsString(event);
}
public static <E extends Event> E deserializeJsonString2Event(
final String jsonString, final Class<E> eventType) throws JsonProcessingException {
return jsonMapper.readValue(jsonString, eventType);
}
}
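
A round-trip sketch using the hypothetical event type from the earlier sketch:

import com.fasterxml.jackson.core.JsonProcessingException;
import de.accso.flexinale.common.infrastructure.eventbus.EventSerializationHelper;

class SerializationDemo {
    public static void main(String[] args) throws JsonProcessingException {
        var event = new TicketReservedEvent();
        String json = EventSerializationHelper.serializeEvent2JsonString(event);
        var restored = EventSerializationHelper.deserializeJsonString2Event(json, TicketReservedEvent.class);
        // field-based (de)serialization preserves the ids
        System.out.println(event.id().equals(restored.id())); // true
    }
}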


@@ -0,0 +1,88 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.event.Event;
import de.accso.flexinale.common.shared_kernel.DeveloperMistakeException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
final class EventSubscriberReceiveDispatcher<E extends Event> {
void callSubscriberReceiveMethod(final Class<E> eventType, final E event, final EventSubscriber<E> subscriber) {
boolean called = false;
if (subscriber instanceof EventSubscriber.EventSubscriber9<?, ?, ?, ?, ?, ?, ?, ?, ?>) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE9);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber8<?, ?, ?, ?, ?, ?, ?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE8);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber7<?, ?, ?, ?, ?, ?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE7);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber6<?, ?, ?, ?, ?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE6);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber5<?, ?, ?, ?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE5);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber4<?, ?, ?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE4);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber3<?, ?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE3);
}
if (!called && (subscriber instanceof EventSubscriber.EventSubscriber2<?, ?>)) {
called = tryToGetAndInvokeReceiveMethod(eventType, event, subscriber, EventSubscriber.RECEIVE2);
}
if (!called) {
subscriber.receive(event);
}
}
private record ReflectionMethodCacheKey<E extends Event>(
EventSubscriber<E> subscriber, String methodName, Class<E> eventType) {}
private final Map<ReflectionMethodCacheKey<E>, Method> reflectionMethodCache = new HashMap<>();
private boolean tryToGetAndInvokeReceiveMethod(
final Class<E> eventType, final E event, final EventSubscriber<E> subscriber, final String methodName)
{
if (subscriber == null) {
throw new DeveloperMistakeException("could not call a method for a null subscriber for event type %s, event=%s"
.formatted(eventType, event.toString()));
}
ReflectionMethodCacheKey<E> key = new ReflectionMethodCacheKey<>(subscriber, methodName, eventType);
Method receiveMethod = reflectionMethodCache.get(key);
try {
// we could extend this so that receive methods are also found whose parameter type is a
// superclass of 'eventType', cf. test case
// InMemorySyncEventBusAndEventBusSpyTest.testGenericEventSubscriberWithExtraSubscriptionCannotBeUsedDueToDispatchingError()
if (receiveMethod == null) {
receiveMethod = subscriber.getClass().getMethod(methodName, eventType);
receiveMethod.setAccessible(true);
reflectionMethodCache.put(key, receiveMethod);
}
receiveMethod.invoke(subscriber, event);
return true;
}
catch (NoSuchMethodException nsmex) {
return false;
}
// any exception thrown because of "wrong" reflection is a developer mistake (untested - we could not create a test setup for this)
catch (IllegalAccessException | IllegalArgumentException | NullPointerException | ExceptionInInitializerError ex) {
throw new DeveloperMistakeException(("DeveloperMistakeException when calling the receive method %s for the subscriber %s " +
"for event type %s, event=%s").formatted(methodName, subscriber, eventType, event.toString()), ex);
}
// Any other exception thrown at runtime - which is ok and hence not a developer mistake.
// Such exceptions are always wrapped in an InvocationTargetException.
catch (InvocationTargetException ex) {
throw new RuntimeException("Exception when calling the receive method '%s' for the subscriber %s for event type %s, event=%s"
.formatted(methodName, subscriber, eventType, event.toString()), ex);
}
}
}


@@ -0,0 +1,144 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.api.event.Event;
import de.accso.flexinale.common.shared_kernel.DeveloperMistakeException;
import de.accso.flexinale.common.shared_kernel.Selection;
import de.accso.flexinale.common.shared_kernel.SelectionMultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
@SuppressWarnings("unused")
public class InMemorySyncEventBus<E extends Event> implements EventBus<E> {
private static final Logger LOGGER = LoggerFactory.getLogger(InMemorySyncEventBus.class);
protected record SubscriberGroup(String consumerGroupName) {
static SubscriberGroup of(EventSubscriber<?> subscriber) {
return new SubscriberGroup(subscriber.getGroupName());
}
}
protected final Class<E> eventType;
protected static final EventContextHolder eventContextHolder = new InMemorySyncEventContextHolder();
protected final EventSubscriberReceiveDispatcher<E> subscriberReceiveDispatcher = new EventSubscriberReceiveDispatcher<>();
protected final EventSubscriberReceiveDispatcher<Event> genericSubscriberReceiveDispatcher = new EventSubscriberReceiveDispatcher<>();
protected final SelectionMultiMap<SubscriberGroup, EventSubscriber<E>> subscribersPerGroupUsingSelection =
new SelectionMultiMap<>(Selection.SelectionAlgorithm.ROUND_ROBIN);
protected final SelectionMultiMap<SubscriberGroup, EventSubscriber<Event>> genericSubscribersPerGroupUsingSelection =
new SelectionMultiMap<>(Selection.SelectionAlgorithm.ROUND_ROBIN);
InMemorySyncEventBus(final Class<E> eventType) {
this.eventType = eventType;
}
@Override
public EventContextHolder getEventContextHolder() {
return eventContextHolder;
}
@Override
public void subscribe(final Class<E> eventType, final EventSubscriber<E> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.debug("InMemorySyncEventBus<%s>: subscriber %s registered".formatted(eventType.getSimpleName(), subscriber));
subscribersPerGroupUsingSelection.put(SubscriberGroup.of(subscriber), subscriber);
handle_SubscriptionAtStart(eventSubscriptionAtStart);
}
@Override
public void subscribe(final EventSubscriber<Event> genericSubscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.debug("InMemorySyncEventBus<%s>: generic subscriber %s registered".formatted(eventType.getSimpleName(), genericSubscriber));
genericSubscribersPerGroupUsingSelection.put(SubscriberGroup.of(genericSubscriber), genericSubscriber);
handle_SubscriptionAtStart(eventSubscriptionAtStart);
}
private void handle_SubscriptionAtStart(final EventSubscriptionAtStart eventSubscriptionAtStart) {
switch (eventSubscriptionAtStart) {
case START_READING_FROM_BEGINNING, START_READING_FROM_LAST_TIME -> {
throw new DeveloperMistakeException("not implemented. Probably want to use InMemorySyncEventBusSpy");
}
case START_READING_FROM_NOW -> {
// nope, nothing to do
}
}
}
@Override
public void unsubscribe(final Class<E> eventType, final EventSubscriber<E> subscriber) {
LOGGER.debug("InMemorySyncEventBus<%s>: subscriber %s unregistered".formatted(eventType.getSimpleName(), subscriber));
subscribersPerGroupUsingSelection.removeMapping(SubscriberGroup.of(subscriber), subscriber);
}
@Override
public void unsubscribe(EventSubscriber<Event> genericSubscriber) {
LOGGER.debug("InMemorySyncEventBus<%s>: subscriber %s unregistered".formatted(eventType.getSimpleName(), genericSubscriber));
genericSubscribersPerGroupUsingSelection.removeMapping(SubscriberGroup.of(genericSubscriber), genericSubscriber);
}
@Override
public void unsubscribeAll() {
LOGGER.debug("InMemorySyncEventBus<%s>: all subscribers and generic subscribers unregistered".formatted(eventType.getSimpleName()));
subscribersPerGroupUsingSelection.clear();
genericSubscribersPerGroupUsingSelection.clear();
}
@Override
public void publish(final Class<E> eventType, final E event) {
LOGGER.info("InMemorySyncEventBus<%s>: publish new event %s".formatted(eventType, event));
// publish event to subscribers (depending on the selection algorithm, e.g. RoundRobin)
try {
eventContextHolder.set(event.eventContext());
// for each group of the subscribers, select one (or more) subscribers, using the Selection algorithm
subscribersPerGroupUsingSelection.keySet()
.forEach(subscriberGroup -> {
Selection<EventSubscriber<E>> allSubscribersInGroup =
(Selection<EventSubscriber<E>>) subscribersPerGroupUsingSelection.get(subscriberGroup);
Set<EventSubscriber<E>> subscribersToSendTo = allSubscribersInGroup.next();
subscribersToSendTo.forEach(subscriberToSendTo ->
internal_sendToSubscriber(eventType, event, subscriberToSendTo));
});
// for each group of the generic subscribers, select one (or more) subscribers, using the Selection algorithm
genericSubscribersPerGroupUsingSelection.keySet()
.forEach(subscriberGroup -> {
Selection<EventSubscriber<Event>> allGenericSubscribersInGroup =
(Selection<EventSubscriber<Event>>) genericSubscribersPerGroupUsingSelection.get(subscriberGroup);
Set<EventSubscriber<Event>> genericSubscribersToSendTo = allGenericSubscribersInGroup.next();
genericSubscribersToSendTo.forEach(genericSubscriberToSendTo ->
internal_sendToGenericSubscriber(event, genericSubscriberToSendTo));
});
}
finally {
eventContextHolder.remove();
}
}
// -----------------------------------------------------------------------------------------------------------------
// send event to one subscriber
protected void internal_sendToSubscriber(Class<E> eventType, E event, EventSubscriber<E> subscriber) {
subscriberReceiveDispatcher.callSubscriberReceiveMethod(eventType, event, subscriber);
}
// send event to one generic subscriber
protected void internal_sendToGenericSubscriber(E event, EventSubscriber<Event> subscriber) {
genericSubscriberReceiveDispatcher.callSubscriberReceiveMethod(Event.class, event, subscriber);
}
}


@@ -0,0 +1,23 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.eventbus.EventBusFactory;
import de.accso.flexinale.common.api.event.Event;
import java.util.HashMap;
import java.util.Map;
public class InMemorySyncEventBusFactory implements EventBusFactory {
protected final Map<Class<? extends Event>, EventBus<? extends Event>> eventTypeToEventBusMap = new HashMap<>();
@SuppressWarnings("unchecked")
@Override
public <E extends Event> EventBus<E> createOrGetEventBusFor(final Class<E> eventType) {
EventBus<? extends Event> eventBus = eventTypeToEventBusMap.get(eventType);
if (eventBus == null) {
eventBus = new InMemorySyncEventBus<>(eventType);
eventTypeToEventBusMap.put(eventType, eventBus);
}
return (EventBus<E>) eventBus;
}
}
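
A usage sketch with the hypothetical event type and subscriber from the sketches above; events are delivered synchronously on the publisher's thread:

import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.eventbus.EventBusFactory;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.infrastructure.eventbus.InMemorySyncEventBusFactory;

class InMemoryBusDemo {
    public static void main(String[] args) {
        EventBusFactory factory = new InMemorySyncEventBusFactory();
        // the factory caches one bus instance per event type
        EventBus<TicketReservedEvent> bus = factory.createOrGetEventBusFor(TicketReservedEvent.class);
        bus.subscribe(TicketReservedEvent.class, new BookingSubscriber(),
                EventSubscriptionAtStart.START_READING_FROM_NOW);
        bus.publish(TicketReservedEvent.class, new TicketReservedEvent());
    }
}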


@@ -0,0 +1,130 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.api.event.Event;
import de.accso.flexinale.common.shared_kernel.ThreadSafeCounterMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
public class InMemorySyncEventBusSpy<E extends Event> extends InMemorySyncEventBus<E> {
private static final Logger LOGGER = LoggerFactory.getLogger(InMemorySyncEventBusSpy.class);
// localEventQueue is used as an in-memory storage and in tests as a spy
private final ConcurrentLinkedQueue<E> localEventQueue = new ConcurrentLinkedQueue<>(); // TODO is currently never cleared - might want to use Cache implementation with automatic time-to-live?
private final ThreadSafeCounterMap<SubscriberGroup> consumerGroupOffsetMap = new ThreadSafeCounterMap<>();
InMemorySyncEventBusSpy(final Class<E> eventType) {
super(eventType);
}
public List<E> allEvents() {
return localEventQueue.stream().toList();
}
public void clear() {
localEventQueue.clear();
}
public int size() {
return localEventQueue.size();
}
// ----------------------------------------------------------------------------------------------------------------
@Override
public void subscribe(final Class<E> eventType, final EventSubscriber<E> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.debug("InMemorySyncEventBusSpy<%s>: subscriber %s registered".formatted(eventType.getSimpleName(), subscriber));
subscribersPerGroupUsingSelection.put(SubscriberGroup.of(subscriber), subscriber);
handle_SubscriptionAtStart(eventType, subscriber, eventSubscriptionAtStart);
}
private void handle_SubscriptionAtStart(final Class<E> eventType,
final EventSubscriber<E> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
switch (eventSubscriptionAtStart) {
case START_READING_FROM_BEGINNING -> {
// re-publish all events from beginning
localEventQueue.forEach(event -> this.internal_sendToSubscriber(eventType, event, subscriber));
}
case START_READING_FROM_NOW -> {
// nope, nothing to do
}
case START_READING_FROM_LAST_TIME -> {
long offset = consumerGroupOffsetMap.get(SubscriberGroup.of(subscriber));
// also covers the case that the consumer group was not known before:
// the offset from the map is then 0, so nothing is skipped and the
// behaviour equals START_READING_FROM_BEGINNING
localEventQueue.stream()
.skip(offset)
.forEach(event -> this.internal_sendToSubscriber(eventType, event, subscriber));
}
}
}
@Override
public void subscribe(EventSubscriber<Event> genericSubscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.debug("InMemorySyncEventBusSpy<%s>: generic subscriber %s registered".formatted(eventType.getSimpleName(), genericSubscriber));
genericSubscribersPerGroupUsingSelection.put(SubscriberGroup.of(genericSubscriber), genericSubscriber);
handle_SubscriptionAtStart(genericSubscriber, eventSubscriptionAtStart);
}
private void handle_SubscriptionAtStart(final EventSubscriber<Event> genericSubscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
switch (eventSubscriptionAtStart) {
case START_READING_FROM_BEGINNING -> {
// re-publish all events from beginning
localEventQueue.forEach(event -> this.internal_sendToGenericSubscriber(event, genericSubscriber));
}
case START_READING_FROM_NOW -> {
// nope, nothing to do
}
case START_READING_FROM_LAST_TIME -> {
long offset = consumerGroupOffsetMap.get(SubscriberGroup.of(genericSubscriber));
// also covers the case that the consumer group was not known before (i.e. offset is 0),
// so nothing is skipped and the behaviour equals START_READING_FROM_BEGINNING
localEventQueue.stream()
.skip(offset)
.forEach(event -> this.internal_sendToGenericSubscriber(event, genericSubscriber));
}
}
}
@Override
public void publish(final Class<E> eventType, final E event) {
LOGGER.info("InMemorySyncEventBusSpy<%s>: publish new event %s".formatted(eventType, event));
super.publish(eventType, event);
// persist event in local (in-memory) storage (i.e. spy)
localEventQueue.add(event);
}
// -----------------------------------------------------------------------------------------------------------------
// send event to one subscriber
@Override
protected void internal_sendToSubscriber(final Class<E> eventType, final E event, final EventSubscriber<E> subscriber) {
subscriberReceiveDispatcher.callSubscriberReceiveMethod(eventType, event, subscriber);
consumerGroupOffsetMap.increment(SubscriberGroup.of(subscriber));
}
// send event to one generic subscriber
@Override
protected void internal_sendToGenericSubscriber(final E event, final EventSubscriber<Event> genericSubscriber) {
genericSubscriberReceiveDispatcher.callSubscriberReceiveMethod(Event.class, event, genericSubscriber);
consumerGroupOffsetMap.increment(SubscriberGroup.of(genericSubscriber));
}
}
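
A test-style sketch of the spy, again with the hypothetical event type from above; the cast is safe because the spy factory only creates InMemorySyncEventBusSpy instances:

import de.accso.flexinale.common.infrastructure.eventbus.InMemorySyncEventBusSpy;
import de.accso.flexinale.common.infrastructure.eventbus.InMemorySyncEventBusSpyFactory;

class SpyDemo {
    public static void main(String[] args) {
        var factory = new InMemorySyncEventBusSpyFactory();
        var spy = (InMemorySyncEventBusSpy<TicketReservedEvent>)
                factory.createOrGetEventBusFor(TicketReservedEvent.class);
        spy.publish(TicketReservedEvent.class, new TicketReservedEvent());
        spy.publish(TicketReservedEvent.class, new TicketReservedEvent());
        // every published event is recorded and can be asserted on
        System.out.println(spy.size());       // 2
        System.out.println(spy.allEvents());  // both events, in publish order
    }
}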


@@ -0,0 +1,17 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.event.Event;
public class InMemorySyncEventBusSpyFactory extends InMemorySyncEventBusFactory {
@SuppressWarnings("unchecked")
@Override
public <E extends Event> EventBus<E> createOrGetEventBusFor(final Class<E> eventType) {
EventBus<? extends Event> eventBus = eventTypeToEventBusMap.get(eventType);
if (eventBus == null) {
eventBus = new InMemorySyncEventBusSpy<>(eventType);
eventTypeToEventBusMap.put(eventType, eventBus);
}
return (EventBus<E>) eventBus;
}
}


@@ -0,0 +1,52 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.event.EventContext;
import de.accso.flexinale.common.shared_kernel.DeveloperMistakeException;
import java.util.concurrent.atomic.AtomicInteger;
public class InMemorySyncEventContextHolder implements EventContextHolder {
// this implementation holds only one context:
// - as long as a context with the same correlation id is used, it is kept, i.e. counter is increased (so that a remove is not deleting it yet)
// - as soon as a context with a different correlation id is used, it is reset, i.e. counter is set to 1
// -> So it is not possible to use a separate event chain and go back - for that, we could change that to a stack
// where context instances are pushed as new and popped to be cleared
private EventContext CONTEXT = null;
private AtomicInteger counter = new AtomicInteger(0);
@Override
public EventContext get() {
return CONTEXT;
}
@Override
public void set(final EventContext eventContext) {
if (CONTEXT != null && CONTEXT.equalsByCorrelationId(eventContext)) {
counter.incrementAndGet();
}
else {
counter = new AtomicInteger(1);
CONTEXT = eventContext;
}
}
@Override
public void remove() {
if (counter.get() == 0) {
throw new DeveloperMistakeException("remove on InMemorySyncEventContextHolder once too much, should not happen");
}
else {
counter.decrementAndGet();
if (counter.get() == 0) {
CONTEXT = null;
}
}
}
@Override
public boolean isEmpty() {
return CONTEXT == null;
}
}


@@ -0,0 +1,140 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.api.event.Event;
import de.accso.flexinale.common.shared_kernel.DeveloperMistakeException;
import de.accso.flexinale.common.shared_kernel.FlexinaleIllegalStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("unused")
public final class KafkaAsyncEventBus<E extends Event> implements EventBus<E> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAsyncEventBus.class);
private static final EventContextHolder eventContextHolder = new ThreadLocalEventContextHolder();
private KafkaProducerAdapter<E> kafkaProducerAdapter;
private KafkaConsumerAdapter<E> kafkaConsumerAdapter;
private final KafkaAvailabilityChecker kafkaAvailabilityChecker;
private final Class<E> eventType; // TODO might want to use this in order to simplify the interface and get rid of the method's first parameter
KafkaAsyncEventBus(final Class<E> eventType, final KafkaAvailabilityChecker kafkaAvailabilityChecker) {
this.eventType = eventType;
this.kafkaAvailabilityChecker = kafkaAvailabilityChecker;
}
@Override
public void subscribe(final Class<E> eventType, final EventSubscriber<E> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.info("KafkaAsyncEventBus<%s>: subscriber %s registered".formatted(eventType, subscriber));
if (kafkaConsumerAdapter == null) {
kafkaConsumerAdapter = createKafkaConsumerAdapter(eventType, subscriber, eventSubscriptionAtStart);
startKafkaConsumerAdapterInSeparateThread(kafkaConsumerAdapter);
}
kafkaConsumerAdapter.addSubscriber(subscriber);
}
@Override
public void subscribe(EventSubscriber<Event> genericSubscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.info("KafkaAsyncEventBus<%s>: generic subscriber %s registered".formatted(eventType, genericSubscriber));
if (kafkaConsumerAdapter == null) {
kafkaConsumerAdapter = createKafkaConsumerAdapter(eventType, genericSubscriber, eventSubscriptionAtStart);
startKafkaConsumerAdapterInSeparateThread(kafkaConsumerAdapter);
}
kafkaConsumerAdapter.addGenericSubscriber(genericSubscriber);
}
@Override
public void unsubscribe(final Class<E> eventType, final EventSubscriber<E> subscriber) {
LOGGER.info("KafkaAsyncEventBus<%s>: subscriber %s unregistered".formatted(eventType, subscriber));
if (kafkaConsumerAdapter == null) {
throw new DeveloperMistakeException("unsubscribe a subscriber from non-existing kafkaConsumerAdapter");
}
kafkaConsumerAdapter.removeSubscriber(subscriber);
}
@Override
public void unsubscribe(EventSubscriber<Event> genericSubscriber) {
LOGGER.info("KafkaAsyncEventBus<%s>: generic subscriber %s unregistered".formatted(eventType, genericSubscriber));
if (kafkaConsumerAdapter == null) {
throw new DeveloperMistakeException("unsubscribe a generic subscriber from non-existing kafkaConsumerAdapter");
}
kafkaConsumerAdapter.removeGenericSubscriber(genericSubscriber);
}
@Override
public void unsubscribeAll() {
LOGGER.info("KafkaAsyncEventBus<%s>: all subscribers unregistered".formatted(eventType));
if (kafkaConsumerAdapter == null) {
throw new DeveloperMistakeException("unsubscribe all subscribers from non-existing kafkaConsumerAdapter");
}
kafkaConsumerAdapter.removeAllSubscribers();
kafkaConsumerAdapter.removeAllGenericSubscribers();
kafkaConsumerAdapter.shutdown(); // shutdown adapter with last subscriber
kafkaConsumerAdapter = null;
// might want to add a common shutdown method?
}
@Override
public void publish(final Class<E> eventType, final E event) {
LOGGER.info("KafkaAsyncEventBus<%s>: publish new event %s".formatted(eventType, event));
if (kafkaProducerAdapter == null) {
kafkaProducerAdapter = createKafkaProducerAdapter(eventType);
}
kafkaProducerAdapter.publish(eventType, event);
}
@Override
public EventContextHolder getEventContextHolder() {
return eventContextHolder;
}
// --------------------------------------------------------------------------------------------------------------
private KafkaConsumerAdapter<E> createKafkaConsumerAdapter(final Class<E> eventType, final EventSubscriber<?> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
// check this once for every new KafkaConsumerAdapter
checkKafkaAvailability();
return new KafkaConsumerAdapter<>(eventType, subscriber, eventContextHolder, eventSubscriptionAtStart);
}
private void startKafkaConsumerAdapterInSeparateThread(final KafkaConsumerAdapter<E> kafkaConsumerAdapter) {
String threadName = "thread4%s<%s>".formatted(kafkaConsumerAdapter.getClass().getName(), eventType);
new Thread(kafkaConsumerAdapter, threadName).start(); // use the parameter, not the (identical) field
}
// --------------------------------------------------------------------------------------------------------------
private KafkaProducerAdapter<E> createKafkaProducerAdapter(final Class<E> eventType) {
// check this once for every new KafkaProducerAdapter
checkKafkaAvailability();
return new KafkaProducerAdapter<>(eventType);
}
// --------------------------------------------------------------------------------------------------------------
private void checkKafkaAvailability() {
try {
if (! kafkaAvailabilityChecker.isKafkaAvailable()) {
throw new FlexinaleIllegalStateException("Kafka is not available.");
}
}
catch (Exception ex) {
throw new FlexinaleIllegalStateException("Kafka is not available.", ex);
}
}
}


@@ -0,0 +1,30 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.eventbus.EventBusFactory;
import de.accso.flexinale.common.api.event.Event;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("unused")
public final class KafkaAsyncEventBusFactory implements EventBusFactory {
private final Map < Class<? extends Event>,
EventBus<? extends Event> > eventTypeToEventBusMap = new HashMap<>();
private final KafkaAvailabilityChecker kafkaAvailabilityChecker;
public KafkaAsyncEventBusFactory(final KafkaAvailabilityChecker kafkaAvailabilityChecker) {
this.kafkaAvailabilityChecker = kafkaAvailabilityChecker;
}
@SuppressWarnings("unchecked")
@Override
public <E extends Event> EventBus<E> createOrGetEventBusFor(final Class<E> eventType) {
EventBus<? extends Event> eventBus = eventTypeToEventBusMap.get(eventType);
if (eventBus == null) {
eventBus = new KafkaAsyncEventBus<>(eventType, kafkaAvailabilityChecker);
eventTypeToEventBusMap.put(eventType, eventBus);
}
return (EventBus<E>) eventBus;
}
}


@@ -0,0 +1,54 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.shared_kernel.FlexinaleIllegalStateException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaConfiguration.adminConfig;
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaConfiguration.kafkaTimeoutInMs;
public final class KafkaAvailabilityChecker {
// Kafka availability is checked intentionally in Spring init phase
// so the whole application (or test) won't start if no Kafka is available
public KafkaAvailabilityChecker() {
try {
if (!isKafkaAvailable()) {
// intentionally not written to the log, as this runs in the init phase where a logger
// might not be available yet.
System.err.println("ERROR - Kafka is not available! Start Kafka before starting this application or test!");
throw new FlexinaleIllegalStateException("Kafka is not available.");
}
}
catch (Exception ex) {
System.err.println("ERROR - Kafka is not available! Start Kafka before starting this application or test!");
throw new FlexinaleIllegalStateException("Kafka is not available.", ex);
}
}
@SuppressWarnings({"SameReturnValue", "BooleanMethodIsAlwaysInverted"})
public boolean isKafkaAvailable() throws ExecutionException {
Map<String, Object> kafkaAdminConfig = adminConfig();
try (var adminClient = AdminClient.create( kafkaAdminConfig )) {
adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(kafkaTimeoutInMs)).nodes().get();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return true;
}
/*
public static void main(String[] args) throws ExecutionException {
boolean kafkaAvailable = new KafkaAvailabilityChecker().isKafkaAvailable();
if (kafkaAvailable) {
System.out.println("All fine, Kafka is available");
}
}
*/
}


@@ -0,0 +1,92 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.shared_kernel.FlexinaleIllegalStateException;
import java.util.HashMap;
import java.util.Map;
import static de.accso.flexinale.common.shared_kernel.Identifiable.Id.uuidString;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
//TODO externalize to a kafka properties file (but then it is fixed for all topics - in the Java code we can use different configs per topic)
//Note that some settings are already configured in application.properties (like the (de)serializer classes)
public final class KafkaConfiguration {
public static final int kafkaTimeoutInMs = 30_000;
public static final String KEY_BOOTSTRAP_SERVERS = BOOTSTRAP_SERVERS_CONFIG;
public static final String defaultBootstrapServers = "localhost:29092";
public static final String KEY_SERIALIZER = KEY_SERIALIZER_CLASS_CONFIG;
public static final String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
public static final String VALUE_SERIALIZER = VALUE_SERIALIZER_CLASS_CONFIG;
public static final String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
public static final String KEY_DESERIALIZER = KEY_DESERIALIZER_CLASS_CONFIG;
public static final String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public static final String VALUE_DESERIALIZER = VALUE_DESERIALIZER_CLASS_CONFIG;
public static final String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public static final String CONSUMER_GROUP_ID_CONFIG = GROUP_ID_CONFIG;
public static Map<String, Object> producerConfig() {
return Map.of(
KEY_BOOTSTRAP_SERVERS, getBootstrapServers(),
KEY_SERIALIZER, getClassFor(keySerializer),
VALUE_SERIALIZER, getClassFor(valueSerializer)
);
}
@SuppressWarnings("SameParameterValue")
public static Map<String, Object> consumerConfig(final Class<?> eventType, final EventSubscriber<?> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
Map<String, Object> config = new HashMap<>();
config.put(KEY_BOOTSTRAP_SERVERS, getBootstrapServers());
config.put(KEY_DESERIALIZER, getClassFor(keyDeserializer));
config.put(VALUE_DESERIALIZER, getClassFor(valueDeserializer));
switch (eventSubscriptionAtStart) {
case START_READING_FROM_BEGINNING -> {
// make new consumer group and read from earliest offset (works only for new consumer groups)
config.put(CONSUMER_GROUP_ID_CONFIG, subscriber.getGroupName() + "<" + eventType + ">" + uuidString());
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
}
case START_READING_FROM_NOW -> {
// make new consumer group and read from latest offset (works only for new consumer groups)
config.put(CONSUMER_GROUP_ID_CONFIG, subscriber.getGroupName() + "<" + eventType + ">" + uuidString());
config.put(AUTO_OFFSET_RESET_CONFIG, "latest");
}
case START_READING_FROM_LAST_TIME -> {
config.put(CONSUMER_GROUP_ID_CONFIG, subscriber.getGroupName() + "<" + eventType + ">");
}
}
return config;
}
public static String getBootstrapServers() {
String systemPropertyBootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_CONFIG);
return (systemPropertyBootstrapServers == null) ? defaultBootstrapServers : systemPropertyBootstrapServers;
}
public static Map<String, Object> adminConfig() {
return Map.of(
KEY_BOOTSTRAP_SERVERS, getBootstrapServers()
);
}
private static Class<?> getClassFor(final String classAsString) {
try {
return Class.forName(classAsString);
}
catch (ClassNotFoundException ex) {
throw new FlexinaleIllegalStateException(ex);
}
}
}
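A short usage sketch: getBootstrapServers() falls back to localhost:29092 unless the standard Kafka key is set as a JVM system property, which is handy for pointing tests at a different broker.

// snippet only; "bootstrap.servers" is the value of CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
System.setProperty("bootstrap.servers", "localhost:9092");
Map<String, Object> producerConfig = KafkaConfiguration.producerConfig();
// producerConfig now targets localhost:9092 instead of the default localhost:29092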

View file

@ -0,0 +1,182 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.api.event.Event;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaConfiguration.CONSUMER_GROUP_ID_CONFIG;
final class KafkaConsumerAdapter<E extends Event> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerAdapter.class);
private final KafkaConsumer<String, String> internal_KafkaConsumer;
private final static Lock lock4KafkaConsumer = new ReentrantLock(); // the internal_KafkaConsumer is not thread-safe
private final String kafkaTopicName;
private final AtomicBoolean consumerIsShutdownAndNoLongerConsumingFromKafka = new AtomicBoolean(false);
private final CommonErrorHandler errorHandler = new KafkaConsumerErrorHandler();
private final Class<E> eventType;
private final EventContextHolder eventContextHolder;
private final ConcurrentLinkedQueue<EventSubscriber<E>> subscribers = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<EventSubscriber<Event>> genericSubscribers = new ConcurrentLinkedQueue<>();
private final EventSubscriberReceiveDispatcher<E> eventSubscriberReceiveDispatcher = new EventSubscriberReceiveDispatcher<>();
private final EventSubscriberReceiveDispatcher<Event> eventGenericSubscriberReceiveDispatcher = new EventSubscriberReceiveDispatcher<>();
// --------------------------------------------------------------------------------------------------------------
KafkaConsumerAdapter(final Class<E> eventType, final EventSubscriber<?> subscriber, EventContextHolder eventContextHolder,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
this.eventType = eventType;
this.eventContextHolder = eventContextHolder; //TODO This is probably not working correctly:
// each KafkaConsumerAdapter runs on its own thread, so the adapters cannot share the same ThreadLocal.
// The eventContextHolder should not be passed in from the bus but instantiated separately in each KafkaConsumerAdapter.
this.kafkaTopicName = KafkaTopicHelper.createTopicNameFromEventClazz(eventType);
final Map<String, Object> kafkaConfig = KafkaConfiguration.consumerConfig(eventType, subscriber, eventSubscriptionAtStart);
final String kafkaConsumerGroupId = (String) kafkaConfig.get(CONSUMER_GROUP_ID_CONFIG);
LOGGER.info("create new subscription from %s (%s), for Kafka topic %s, using Kafka consumer group id %s".
formatted(subscriber.getName(),
subscriber.getGroupName(),
kafkaTopicName,
kafkaConsumerGroupId));
internal_KafkaConsumer = new KafkaConsumer<>(kafkaConfig);
addShutDownHook();
}
@Override
public void run() {
// keep running forever or until shutdown() is called from another thread.
try {
internal_KafkaConsumer.subscribe(List.of(kafkaTopicName));
Duration kafkaTimeoutInMs = Duration.ofMillis( KafkaConfiguration.kafkaTimeoutInMs );
// Note: this does not necessarily read messages "from the beginning", i.e. records that had
// already been published before this consumer group first connected to the topic!
while (!consumerIsShutdownAndNoLongerConsumingFromKafka.get()) {
ConsumerRecords<String, String> kafkaRecords = internal_KafkaConsumer.poll(kafkaTimeoutInMs);
processAllKafkaRecords(kafkaTopicName, kafkaRecords);
}
}
catch (WakeupException ex) {
// a wakeup during shutdown is expected and ignored; anything else is rethrown
if (!consumerIsShutdownAndNoLongerConsumingFromKafka.get()) {
throw ex;
}
}
}
private void processAllKafkaRecords(final String kafkaTopicName, final ConsumerRecords<String, String> records) {
LOGGER.debug("consuming from %s now %d records ...".formatted(kafkaTopicName, records.count()));
records.forEach(record -> {
try {
LOGGER.debug("consuming from %s: %s".formatted(kafkaTopicName, record));
E event = EventSerializationHelper.deserializeJsonString2Event(record.value(), eventType);
processEventAndCallReceiveOnAllSubscribers(eventType, event);
}
catch (Exception ex) {
this.errorHandler.handleOne(ex, record,
/* not used */ null,
/* not used */ null);
}
});
LOGGER.debug("consuming from %s now %d records ... done".formatted(kafkaTopicName, records.count()));
}
private void processEventAndCallReceiveOnAllSubscribers(final Class<E> eventType, final E event) {
eventContextHolder.set(event.eventContext());
try {
subscribers.forEach(subscriber ->
eventSubscriberReceiveDispatcher.callSubscriberReceiveMethod(eventType, event, subscriber));
genericSubscribers.forEach(subscriber ->
eventGenericSubscriberReceiveDispatcher.callSubscriberReceiveMethod(Event.class, event, subscriber));
}
finally {
eventContextHolder.remove();
}
}
// --------------------------------------------------------------------------------------------------------------
void addSubscriber(final EventSubscriber<E> subscriber) {
subscribers.add(subscriber);
}
void addGenericSubscriber(final EventSubscriber<Event> genericSubscriber) {
genericSubscribers.add(genericSubscriber);
}
void removeSubscriber(final EventSubscriber<E> subscriber) {
subscribers.remove(subscriber);
}
void removeGenericSubscriber(final EventSubscriber<Event> genericSubscriber) {
genericSubscribers.remove(genericSubscriber);
}
void removeAllSubscribers() {
subscribers.clear();
}
void removeAllGenericSubscribers() {
genericSubscribers.clear();
}
// --------------------------------------------------------------------------------------------------------------
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
// note that currently there is no way of reactivating the KafkaConsumerAdapter once it has been shut down
void shutdown() {
try {
/*
* the lock does not seem to help, one still sees (with Java 21) these kind of errors:
*
* 20240327 14:42:20.284 [Thread-5] ERROR o.a.k.clients.consumer.KafkaConsumer - caught exception during shutdown
* java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: Thread-5, id: 43) otherThread(id: 44)
* at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484)
* at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2343)
* at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2321)
* at de.accso.flexinale.common.infrastructure.eventbus.KafkaConsumerAdapter.shutdown(KafkaConsumerAdapter.java:147)
* at java.base/java.lang.Thread.run(Thread.java:1583)
*/
boolean isLockAcquired = lock4KafkaConsumer.tryLock(1, TimeUnit.SECONDS);
if (isLockAcquired) {
try {
consumerIsShutdownAndNoLongerConsumingFromKafka.set(true);
internal_KafkaConsumer.close();
} finally {
lock4KafkaConsumer.unlock();
}
}
else {
LOGGER.warn("could not acquire lock within 1 second during shutdown, Kafka consumer was not closed");
}
}
catch (Exception ex) {
LOGGER.error("caught exception during shutdown", ex);
}
}
}
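A lifecycle sketch under stated assumptions (SeatReservedEvent, subscriber and holder are hypothetical; the constructor is package-private, so this only works from within the eventbus package): the adapter is a Runnable that polls until shutdown() is called.

var adapter = new KafkaConsumerAdapter<>(SeatReservedEvent.class, subscriber, holder,
        EventSubscriptionAtStart.START_READING_FROM_LAST_TIME);
adapter.addSubscriber(subscriber);
new Thread(adapter, "kafka-consumer-SeatReservedEvent").start(); // run() polls in a loop
// ... later, e.g. on application stop:
adapter.shutdown(); // ends the poll loop and closes the underlying KafkaConsumer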

View file

@ -0,0 +1,76 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.MessageListenerContainer;
// Error handling for Kafka consumers using retry pattern and finally writing error messages to DLT
/*
A typical Spring Kafka error handler would look like below, but this only works for consumers
which are annotated with @KafkaListener
...
private int numberOfMaxRetriesForRetryableErrors = 3;
@Bean
DefaultErrorHandler kafkaConsumerErrorHandler() {
DefaultErrorHandler errorHandler = new DefaultErrorHandler((record, ex) -> {
LOGGER.error("error while processing Kafka record " + record + ex);
// finally send to dead letter topics
new DeadLetterPublishingRecoverer(internal_KafkaErrorDLTTemplate).accept(record, ex);
},
// Default error handling is to retry
new ExponentialBackOffWithMaxRetries(numberOfMaxRetriesForRetryableErrors));
// ... but do not retry on FlexinaleKafkaNotRetryableException and its subclasses
errorHandler.addNotRetryableExceptions(FlexinaleKafkaNotRetryableException.class);
return errorHandler;
}
...
*/
public final class KafkaConsumerErrorHandler implements CommonErrorHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerErrorHandler.class);
// Kafka template for sending error message to DLT
static final KafkaTemplate<String, String> internal_KafkaErrorDLTTemplate = new KafkaTemplate<>(
new DefaultKafkaProducerFactory<>(
KafkaConfiguration.producerConfig()
));
@Override
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {
try {
LOGGER.error("error while processing Kafka record %s, now sending to DLT, error: %s".formatted(record, thrownException));
new DeadLetterPublishingRecoverer(internal_KafkaErrorDLTTemplate).accept(record, thrownException);
return true;
}
catch (Exception ex) {
LOGGER.error("could not publish record %s to the DLT".formatted(record), ex);
return false;
}
}
}
abstract class FlexinaleKafkaNotRetryableException extends RuntimeException {
public FlexinaleKafkaNotRetryableException(final String message, final Throwable cause) {
super(message, cause);
}
}
@SuppressWarnings("unused")
class FlexinaleKafkaMessageDeserializationException extends FlexinaleKafkaNotRetryableException {
public FlexinaleKafkaMessageDeserializationException(final String message, final Throwable cause) {
super(message, cause);
}
}

View file

@ -0,0 +1,60 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.event.Event;
import de.accso.flexinale.common.shared_kernel.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
final class KafkaProducerAdapter<E extends Event> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerAdapter.class);
private final KafkaTemplate<String, String> internal_KafkaTemplate;
private final String kafkaTopicName;
private final Class<E> eventType;
public KafkaProducerAdapter(final Class<E> eventType) {
this.eventType = eventType;
this.kafkaTopicName = KafkaTopicHelper.createTopicNameFromEventClazz(eventType);
LOGGER.info("create new %s for Kafka topic %s".formatted(getName(), kafkaTopicName));
this.internal_KafkaTemplate = new KafkaTemplate<>(
new DefaultKafkaProducerFactory<>(
KafkaConfiguration.producerConfig()
));
}
public void publish(final Class<E> eventType, final E event) {
try {
Identifiable.Id kafkaKey = event.id();
String kafkaMessage = EventSerializationHelper.serializeEvent2JsonString(event);
internal_KafkaTemplate
.send(kafkaTopicName, kafkaKey.id(), kafkaMessage)
.whenComplete((result, ex) -> {
if (ex == null) {
LOGGER.debug(String.format("message was sent to topic %s " +
"for event type %s, event id %s",
kafkaTopicName,
eventType.getCanonicalName(),
event.id()));
} else {
LOGGER.error(String.format("Kafka message could not be sent to topic %s " +
"for event type %s, event id %s - error: %s",
kafkaTopicName, eventType.getCanonicalName(),
event.id(),
ex.getMessage()));
}
});
}
catch (Exception ex) {
LOGGER.error("Kafka message will not sent out for event " + event, ex);
}
}
String getName() {
return "KafkaProducerAdapter" + "<" + eventType + ">";
}
}
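A usage sketch (event class and instance are hypothetical): publish() is fire-and-forget; the send result is only reported asynchronously via the whenComplete() callback.

var producer = new KafkaProducerAdapter<>(SeatReservedEvent.class);
producer.publish(SeatReservedEvent.class, event); // returns immediately, outcome is logged later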

View file

@ -0,0 +1,57 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.event.Event;
import de.accso.flexinale.common.shared_kernel.EventClassHelper;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaConfiguration.adminConfig;
import static de.accso.flexinale.common.infrastructure.eventbus.KafkaConfiguration.kafkaTimeoutInMs;
@SuppressWarnings("unused")
public final class KafkaTopicHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTopicHelper.class);
static <E extends Event> String createTopicNameFromEventClazz(final Class<E> eventType) {
return EventClassHelper.getEventClazzName(eventType)
+ ".v"
+ EventClassHelper.getEventClazzVersion(eventType).version();
}
static <E extends Event> String createTopicNameFromEventClazzAndInstance(final Class<E> eventType, final Event event) {
return eventType.getCanonicalName()
+ ".v"
+ event.version().version();
}
public static void deleteKafkaTopicsWithPrefixAndSuffix(final String prefix, final String suffix)
throws ExecutionException, TimeoutException
{
Map<String, Object> kafkaAdminConfig = adminConfig();
try (var adminClient = AdminClient.create( kafkaAdminConfig )) {
List<String> topicNames = adminClient.listTopics().names().get()
.stream()
.filter(name -> name.startsWith(prefix) && name.endsWith(suffix))
.toList();
adminClient.deleteTopics(topicNames).all().get(kafkaTimeoutInMs, TimeUnit.MILLISECONDS);
if (topicNames.isEmpty()) {
LOGGER.info("did not delete any Kafka topics");
}
else {
LOGGER.info("tried to delete these Kafka topics: " + topicNames);
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
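An illustration of the topic naming scheme (the event class is hypothetical): the topic name is the event's canonical class name plus a version suffix.

// assuming a class de.accso.flexinale.ticketing.api.event.SeatReservedEvent with version 2:
String topic = KafkaTopicHelper.createTopicNameFromEventClazz(SeatReservedEvent.class);
// -> "de.accso.flexinale.ticketing.api.event.SeatReservedEvent.v2"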

View file

@ -0,0 +1,32 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.event.EventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class NopeContextHolder implements EventContextHolder {
private static final Logger LOGGER = LoggerFactory.getLogger(NopeContextHolder.class);
@Override
public EventContext get() {
LOGGER.debug("NopeContextHolder.get() does nothing, always returns null");
return null;
}
@Override
public void set(EventContext eventContext) {
LOGGER.debug("NopeContextHolder.set() does nothing");
}
@Override
public void remove() {
LOGGER.debug("NopeContextHolder.remove() does nothing");
}
@Override
public boolean isEmpty() {
LOGGER.debug("NopeContextHolder.isEmpty() does nothing, always returns true");
return true;
}
}

View file

@ -0,0 +1,64 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.eventbus.EventSubscriber;
import de.accso.flexinale.common.api.eventbus.EventSubscriptionAtStart;
import de.accso.flexinale.common.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("unused")
public final class NopeEventBusSpy<E extends Event> implements EventBus<E> {
private static final Logger LOGGER = LoggerFactory.getLogger(NopeEventBusSpy.class);
private static final EventContextHolder eventContextHolder = new NopeContextHolder();
private final Class<E> eventType; // might want to use this to simplify the interface and get rid of the method's first parameter
NopeEventBusSpy(final Class<E> eventType) {
this.eventType = eventType;
}
@Override
public void subscribe(final Class<E> eventType, final EventSubscriber<E> subscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.debug("NopeEventBusSpy<%s>: subscriber %s registered but does nothing"
.formatted(eventType.getSimpleName(), subscriber));
}
@Override
public void subscribe(final EventSubscriber<Event> genericSubscriber,
final EventSubscriptionAtStart eventSubscriptionAtStart) {
LOGGER.debug("NopeEventBusSpy<%s>: subscriber %s registered but does nothing"
.formatted(eventType.getSimpleName(), genericSubscriber));
}
@Override
public void unsubscribe(final Class<E> eventType, final EventSubscriber<E> subscriber) {
LOGGER.debug("NopeEventBusSpy<%s>: subscriber %s unregistered but does nothing"
.formatted(eventType.getSimpleName(), subscriber));
}
@Override
public void unsubscribe(EventSubscriber<Event> genericSubscriber) {
LOGGER.debug("NopeEventBusSpy<%s>: subscriber %s unregistered but does nothing"
.formatted(eventType.getSimpleName(), genericSubscriber));
}
@Override
public void unsubscribeAll() {
LOGGER.debug("NopeEventBusSpy<%s>: all subscribers unregistered but does nothing"
.formatted(eventType.getSimpleName()));
}
@Override
public void publish(final Class<E> eventType, final E event) {
LOGGER.info("NopeEventBusSpy<%s>: publish new event %s but does nothing".formatted(eventType, event));
}
@Override
public EventContextHolder getEventContextHolder() {
return eventContextHolder;
}
}

View file

@ -0,0 +1,17 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventBus;
import de.accso.flexinale.common.api.event.Event;
public final class NopeEventBusSpyFactory extends InMemorySyncEventBusFactory {
@SuppressWarnings("unchecked")
@Override
public <E extends Event> EventBus<E> createOrGetEventBusFor(final Class<E> eventType) {
EventBus<? extends Event> eventBus = eventTypeToEventBusMap.get(eventType);
if (eventBus == null) {
eventBus = new NopeEventBusSpy<>(eventType);
eventTypeToEventBusMap.put(eventType, eventBus);
}
return (EventBus<E>) eventBus;
}
}

View file

@ -0,0 +1,28 @@
package de.accso.flexinale.common.infrastructure.eventbus;
import de.accso.flexinale.common.api.eventbus.EventContextHolder;
import de.accso.flexinale.common.api.event.EventContext;
public final class ThreadLocalEventContextHolder implements EventContextHolder {
private static final ThreadLocal<EventContext> CONTEXT = new ThreadLocal<>();
@Override
public EventContext get() {
return CONTEXT.get();
}
@Override
public void set(EventContext eventContext) {
CONTEXT.set(eventContext);
}
@Override
public void remove() {
CONTEXT.remove();
}
@Override
public boolean isEmpty() {
return CONTEXT.get() == null;
}
}

View file

@ -0,0 +1,19 @@
package de.accso.flexinale.common.shared_kernel;
import java.util.Arrays;
public final class ClazzHelper {
public static boolean clazzImplementsInterface(final Class<?> implementationClazz, final Class<?> interfaceClazz) {
if (implementationClazz == null || implementationClazz.equals(Object.class)) return false; // null guards interfaces, whose superclass is null
Class<?>[] directInterfaces = implementationClazz.getInterfaces(); // _only_ the direct interfaces, not from the super class!
boolean implementsInterface = Arrays.stream(directInterfaces).toList().contains(interfaceClazz);
if (implementsInterface) {
return true;
}
else {
// recursive call
return clazzImplementsInterface(implementationClazz.getSuperclass(), interfaceClazz);
}
}
}
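A small sketch of the semantics: at each level of the superclass chain only the directly declared interfaces are checked, so super-interfaces are not followed.

ClazzHelper.clazzImplementsInterface(java.util.ArrayList.class, java.util.List.class);
// -> true: List is declared directly on ArrayList
ClazzHelper.clazzImplementsInterface(java.util.ArrayList.class, Iterable.class);
// -> false: Iterable is only inherited via the Collection interface, which is not walked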

View file

@ -0,0 +1,14 @@
package de.accso.flexinale.common.shared_kernel;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public final class DateTimeHelper {
public static LocalDateTime fromEpochSeconds(final long epochSeconds) {
return LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC);
}
public static long toEpochSeconds(final LocalDateTime dateTime) {
return dateTime.toEpochSecond(ZoneOffset.UTC);
}
}
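A round-trip sketch: both directions use UTC, so they invert each other.

long epochSeconds = DateTimeHelper.toEpochSeconds(LocalDateTime.of(2024, 11, 7, 21, 2, 53));
LocalDateTime roundTripped = DateTimeHelper.fromEpochSeconds(epochSeconds); // 2024-11-07T21:02:53 again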

View file

@ -0,0 +1,24 @@
package de.accso.flexinale.common.shared_kernel;
public class DeveloperMistakeException extends RuntimeException {
public DeveloperMistakeException(final String message) {
super(message);
}
public DeveloperMistakeException() {
super();
}
public DeveloperMistakeException(final String message, final Throwable cause) {
super(message, cause);
}
public DeveloperMistakeException(final Throwable cause) {
super(cause);
}
protected DeveloperMistakeException(final String message, final Throwable cause,
final boolean enableSuppression, final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View file

@ -0,0 +1,8 @@
package de.accso.flexinale.common.shared_kernel;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(value = RetentionPolicy.RUNTIME)
public @interface DoNotCheckInArchitectureTests {
}

View file

@ -0,0 +1,6 @@
package de.accso.flexinale.common.shared_kernel;
@SuppressWarnings("unused")
public interface EqualsByContent {
boolean equalsByContent(final Object o);
}

View file

@ -0,0 +1,58 @@
package de.accso.flexinale.common.shared_kernel;
import de.accso.flexinale.common.api.event.Event;
import java.lang.reflect.Constructor;
import static de.accso.flexinale.common.shared_kernel.ClazzHelper.clazzImplementsInterface;
public final class EventClassHelper {
@SuppressWarnings("unchecked")
public static <E extends Event> Class<E> getClassFor(final String clazzName) {
try {
return (Class<E>) Class.forName(clazzName);
}
catch (ClassNotFoundException ex) {
throw new FlexinaleIllegalStateException(ex);
}
}
public static <E extends Event> String getEventClazzName(final String clazzName) {
Class<? extends Event> clazz = getClassFor(clazzName);
if (!clazzImplementsInterface(clazz, Event.class)) {
throw new FlexinaleIllegalArgumentException("wrong clazz %s used to retrieve Event's clazz name and version"
.formatted(clazzName));
}
return clazz.getCanonicalName();
}
public static <E extends Event> String getEventClazzName(final Class<E> clazz) {
return clazz.getCanonicalName();
}
public static <E extends Event> Versionable.Version getEventClazzVersion(String clazzName) {
Class<? extends Event> clazz = getClassFor(clazzName);
if (!clazzImplementsInterface(clazz, Event.class)) {
throw new FlexinaleIllegalArgumentException("need to use Event class %s to retrieve class name"
.formatted(clazzName));
}
return getEventClazzVersion(clazz);
}
public static <E extends Event> Versionable.Version getEventClazzVersion(final Class<E> clazz) {
try {
Constructor<E> constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
Event e = constructor.newInstance();
return e.version();
}
catch (Exception ex) {
throw new FlexinaleIllegalArgumentException("error when instantiating Event class %s or retrieving its version"
.formatted(getEventClazzName(clazz)), ex);
}
}
}

View file

@ -0,0 +1,24 @@
package de.accso.flexinale.common.shared_kernel;
public class FlexinaleIllegalArgumentException extends DeveloperMistakeException {
public FlexinaleIllegalArgumentException() {
super();
}
public FlexinaleIllegalArgumentException(final String message) {
super(message);
}
public FlexinaleIllegalArgumentException(final String message, final Throwable cause) {
super(message, cause);
}
public FlexinaleIllegalArgumentException(final Throwable cause) {
super(cause);
}
protected FlexinaleIllegalArgumentException(final String message, final Throwable cause,
final boolean enableSuppression, final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View file

@ -0,0 +1,20 @@
package de.accso.flexinale.common.shared_kernel;
@SuppressWarnings("unused")
public class FlexinaleIllegalStateException extends IllegalStateException {
public FlexinaleIllegalStateException() {
super();
}
public FlexinaleIllegalStateException(final String message) {
super(message);
}
public FlexinaleIllegalStateException(final String message, final Throwable cause) {
super(message, cause);
}
public FlexinaleIllegalStateException(final Throwable cause) {
super(cause);
}
}

View file

@ -0,0 +1,31 @@
package de.accso.flexinale.common.shared_kernel;
import java.io.Serializable;
import java.util.UUID;
public interface Identifiable {
record Id(String id) implements Serializable, RawWrapper<String> {
public static Id of() {
return uuid();
}
public static Id of(String id) {
return new Id(id);
}
public static Id uuid() {
return Id.of(UUID.randomUUID().toString());
}
public static String uuidString() {
return UUID.randomUUID().toString();
}
@Override
public String raw() {
return id;
}
}
Id id();
}

View file

@ -0,0 +1,21 @@
package de.accso.flexinale.common.shared_kernel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public final class MapWithCounter<K> {
private final ConcurrentHashMap<K, AtomicLong> backingMap = new ConcurrentHashMap<>();
public long increment(K key) {
return backingMap.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
}
public long get(K key) {
AtomicLong counter = backingMap.get(key);
return counter != null ? counter.get() : 0;
}
public void set(K key, Long value) {
backingMap.put(key, new AtomicLong(value));
}
}
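A usage sketch: increments are atomic per key, and absent keys read as zero.

MapWithCounter<String> receivedEvents = new MapWithCounter<>();
receivedEvents.increment("SeatReservedEvent"); // -> 1
receivedEvents.increment("SeatReservedEvent"); // -> 2
receivedEvents.get("unknown");                 // -> 0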

View file

@ -0,0 +1,5 @@
package de.accso.flexinale.common.shared_kernel;
public interface Mergeable<E extends Identifiable> {
E merge(E newData);
}

View file

@ -0,0 +1,25 @@
package de.accso.flexinale.common.shared_kernel;
import java.io.Serializable;
public interface RawWrapper<T> extends Serializable {
T raw();
default String print() { // unfortunately an interface cannot override Object's toString() with a default method; doing so is a compiler error
return raw().toString();
}
@SuppressWarnings("unchecked")
static <T> T getRawOrNull(Object o) {
if (o == null)
return null;
else
try {
RawWrapper<T> castedObject = (RawWrapper<T>)o;
return castedObject.raw();
}
catch (ClassCastException ccex) {
throw new DeveloperMistakeException(ccex);
}
}
}
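A usage sketch: Identifiable.Id implements RawWrapper&lt;String&gt;, so getRawOrNull() unwraps it while null passes through.

Identifiable.Id id = Identifiable.Id.uuid();
String raw = RawWrapper.getRawOrNull(id);    // the wrapped UUID string
String none = RawWrapper.getRawOrNull(null); // null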

View file

@ -0,0 +1,71 @@
package de.accso.flexinale.common.shared_kernel;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
public interface Selection<T> extends Set<T> {
Set<T> next(); // select one or more elements from the set, based on the selection algorithm
enum SelectionAlgorithm {
RANDOM, CONSTANT_FIRST, CONSTANT_LAST, ROUND_ROBIN, ALL
}
final class RandomSelector<T> extends SynchronizedSet<T> implements Selection<T> {
@Override
public Set<T> next() {
if (this.size() == 0) return Set.of();
int randomIndex = ThreadLocalRandom.current().nextInt(this.size());
return Set.of(this.stream()
.skip(randomIndex)
.findFirst()
.orElseThrow(DeveloperMistakeException::new));
}
}
final class ConstantFirstSelector<T> extends SynchronizedSet<T> implements Selection<T> {
@Override
public Set<T> next() {
if (this.size() == 0) return Set.of();
return Set.of(this.stream()
.findFirst()
.orElseThrow(DeveloperMistakeException::new));
}
}
final class ConstantLastSelector<T> extends SynchronizedSet<T> implements Selection<T> {
@Override
public Set<T> next() {
if (this.size() == 0) return Set.of();
return Set.of(this.stream()
.skip(this.size()-1)
.findFirst()
.orElseThrow(DeveloperMistakeException::new));
}
}
final class RoundRobinSelector<T> extends SynchronizedSet<T> implements Selection<T> {
private int roundRobinIndex;
@Override
public Set<T> next() {
if (this.size() == 0) return Set.of();
if (roundRobinIndex >= this.size()) roundRobinIndex = 0; // re-wrap in case elements were removed since the last call
T result = this.stream().toList().get(roundRobinIndex++);
if (roundRobinIndex == this.size()) roundRobinIndex = 0;
return Set.of(result);
}
}
final class AllSelector<T> extends SynchronizedSet<T> implements Selection<T> {
@Override
public Set<T> next() {
if (this.size() == 0) return Set.of();
return this;
}
}
}
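A usage sketch for the round-robin variant: each next() call returns exactly one element, cycling over the set's (unspecified) iteration order.

Selection<String> targets = new Selection.RoundRobinSelector<>();
targets.addAll(java.util.Set.of("node-a", "node-b", "node-c"));
targets.next(); // one element
targets.next(); // the next one, wrapping around after the last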

View file

@ -0,0 +1,30 @@
package de.accso.flexinale.common.shared_kernel;
import org.apache.commons.collections4.multimap.AbstractSetValuedMap;
import java.util.*;
public final class SelectionMultiMap<K, V> extends AbstractSetValuedMap<K, V> {
private final Selection.SelectionAlgorithm selectionAlgorithm;
public SelectionMultiMap(Selection.SelectionAlgorithm selectionAlgorithm) {
super(new HashMap<K, Selection<V>>());
this.selectionAlgorithm = selectionAlgorithm;
}
@Override
protected Set<V> createCollection() {
return switch (selectionAlgorithm) {
case RANDOM -> new Selection.RandomSelector<>();
case CONSTANT_FIRST -> new Selection.ConstantFirstSelector<>();
case CONSTANT_LAST -> new Selection.ConstantLastSelector<>();
case ROUND_ROBIN -> new Selection.RoundRobinSelector<>();
case ALL -> new Selection.AllSelector<>();
};
}
@Override
public Set<V> get(final K key) {
return getMap().get(key); // returns the backing Selection directly (may be null), not a wrapped live view
}
}
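A usage sketch: every key's value collection is a Selection built for the chosen algorithm, so e.g. subscribers can be picked round-robin per key (names are hypothetical).

SelectionMultiMap<String, String> routing = new SelectionMultiMap<>(Selection.SelectionAlgorithm.ROUND_ROBIN);
routing.put("SeatReservedEvent", "subscriber-1");
routing.put("SeatReservedEvent", "subscriber-2");
Selection<String> candidates = (Selection<String>) routing.get("SeatReservedEvent");
candidates.next(); // alternates between subscriber-1 and subscriber-2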

View file

@ -0,0 +1,127 @@
package de.accso.flexinale.common.shared_kernel;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
// needed as one cannot derive from the type returned by Collections.synchronizedSet() (it is package-protected, sigh)
class SynchronizedSet<T> implements Set<T> {
private final Set<T> backingSet = new HashSet<T>();
private final Object lock = new Object();
@Override
public boolean add(T object) {
synchronized (lock) {
return backingSet.add(object);
}
}
@Override
public boolean addAll(Collection<? extends T> collection) {
synchronized (lock) {
return backingSet.addAll(collection);
}
}
@Override
public void clear() {
synchronized (lock) {
backingSet.clear();
}
}
@Override
public boolean contains(Object object) {
synchronized (lock) {
return backingSet.contains(object);
}
}
@Override
public boolean containsAll(Collection<?> collection) {
synchronized (lock) {
return backingSet.containsAll(collection);
}
}
@Override
public boolean isEmpty() {
synchronized (lock) {
return backingSet.isEmpty();
}
}
// iteration must be synchronized externally by the caller!
@Override
public Iterator<T> iterator() {
return backingSet.iterator();
}
@Override
public Object[] toArray() {
synchronized (lock) {
return backingSet.toArray();
}
}
@Override
public <U> U[] toArray(U[] array) {
synchronized (lock) {
return backingSet.toArray(array);
}
}
@Override
public boolean remove(Object object) {
synchronized (lock) {
return backingSet.remove(object);
}
}
@Override
public boolean removeAll(Collection<?> collection) {
synchronized (lock) {
return backingSet.removeAll(collection);
}
}
@Override
public boolean retainAll(Collection<?> collection) {
synchronized (lock) {
return backingSet.retainAll(collection);
}
}
@Override
public int size() {
synchronized (lock) {
return backingSet.size();
}
}
@Override
public boolean equals(Object object) {
synchronized (lock) {
if (object == this) {
return true;
}
return backingSet.equals(object);
}
}
@Override
public int hashCode() {
synchronized (lock) {
return backingSet.hashCode();
}
}
@Override
public String toString() {
synchronized (lock) {
return backingSet.toString();
}
}
}

View file

@ -0,0 +1,21 @@
package de.accso.flexinale.common.shared_kernel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public final class ThreadSafeCounterMap<K> {
private final ConcurrentHashMap<K, AtomicLong> backingMap = new ConcurrentHashMap<>();
public long increment(K key) {
return backingMap.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
}
public long get(K key) {
AtomicLong counter = backingMap.get(key);
return counter != null ? counter.get() : 0;
}
public void set(K key, Long value) {
backingMap.put(key, new AtomicLong(value));
}
}

View file

@ -0,0 +1,14 @@
package de.accso.flexinale.common.shared_kernel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class TimeFormatter {
public static String calculateString(final LocalDateTime from, final Integer durationInMinutes) {
DateTimeFormatter hourMinuteFormatter = DateTimeFormatter.ofPattern("HH:mm");
return hourMinuteFormatter.format(from)
+ " - "
+ hourMinuteFormatter.format(from.plusMinutes(durationInMinutes));
}
}
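A quick example of the produced format:

TimeFormatter.calculateString(LocalDateTime.of(2024, 11, 7, 21, 0), 90); // -> "21:00 - 22:30"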

View file

@ -0,0 +1,32 @@
package de.accso.flexinale.common.shared_kernel;
import java.io.Serializable;
public interface Versionable {
record Version(Integer version) implements Serializable, RawWrapper<Integer> {
public static Version of(Integer version) {
return new Version(version);
}
public Version inc() {
return new Version(this.version()+1);
}
@Override
public Integer raw() {
return version;
}
}
Version version();
static Version unknownVersion() {
Integer UNKNOWN = -1;
return Version.of(UNKNOWN);
}
static Version initialVersion() {
Integer INITIAL = 0;
return Version.of(INITIAL);
}
}
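A sketch of the version lifecycle:

Versionable.Version v0 = Versionable.initialVersion(); // version 0
Versionable.Version v1 = v0.inc();                     // version 1
Versionable.unknownVersion();                          // version -1, marker for "not yet known"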

View file

@ -0,0 +1,20 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyyMMdd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org.springframework.orm.jpa" level="INFO"/>
<logger name="org.springframework.boot.autoconfigure.domain.EntityScan" level="INFO"/>
<logger name="org.apache.kafka" level="WARN"/>
<logger name="org.apache.kafka.clients.admin.AdminClient" level="INFO"/>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="INFO"/>
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="INFO"/>
<logger name="de.accso" level="INFO"/>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>