feat: Start kafka as a service
This commit is contained in:
parent
f8bf68feb7
commit
24795af52b
2 changed files with 286 additions and 11 deletions
89
devenv.nix
89
devenv.nix
|
|
@ -7,12 +7,13 @@
|
||||||
}:
|
}:
|
||||||
|
|
||||||
{
|
{
|
||||||
imports = [ ];
|
imports = [ ./nix/kafka.nix ];
|
||||||
|
|
||||||
# https://devenv.sh/packages/
|
# https://devenv.sh/packages/
|
||||||
packages = [
|
packages = [
|
||||||
pkgs.git
|
pkgs.git
|
||||||
pkgs.jetbrains.idea-community-bin
|
pkgs.jetbrains.idea-community-bin
|
||||||
|
pkgs.yaml-language-server
|
||||||
];
|
];
|
||||||
|
|
||||||
languages.java.enable = true;
|
languages.java.enable = true;
|
||||||
|
|
@ -22,11 +23,83 @@
|
||||||
# https://devenv.sh/processes/
|
# https://devenv.sh/processes/
|
||||||
# processes.cargo-watch.exec = "cargo-watch";
|
# processes.cargo-watch.exec = "cargo-watch";
|
||||||
|
|
||||||
processes.flexinale-monolith.exec = ''
|
scripts =
|
||||||
java -Dspring.datasource.url=jdbc:postgresql://localhost:5432/monolith -jar flex-training-flexinale/flexinale-monolith/target/flexinale-monolith-2024.3.0-spring-boot-fat-jar.jar
|
let
|
||||||
'';
|
datasource = "-Dspring.datasource.url=jdbc:postgresql://${config.services.postgres.listen_addresses}:5432";
|
||||||
|
startFlexinale = name: db: ''
|
||||||
|
java ${datasource}/${db} -jar flex-training-flexinale/flexinale-${name}/target/flexinale-${name}-2024.3.0-spring-boot-fat-jar.jar
|
||||||
|
'';
|
||||||
|
in
|
||||||
|
{
|
||||||
|
"monolith-run".exec = startFlexinale "monolith" "monolith";
|
||||||
|
"modulith-1-run".exec = startFlexinale "modulith-1-onion" "modulith-1";
|
||||||
|
"modulith-2-run".exec = startFlexinale "modulith-2-components" "modulith-2";
|
||||||
|
"monolith-init".exec = ''
|
||||||
|
cd ${config.env.DEVENV_ROOT}/flex-training-flexinale
|
||||||
|
mvn test -Dtest=testdata.TestDataLoader -pl flexinale-monolith -am
|
||||||
|
'';
|
||||||
|
"modulith-1-init".exec = ''
|
||||||
|
cd ${config.env.DEVENV_ROOT}/flex-training-flexinale
|
||||||
|
mvn test -Dtest=testdata.TestDataLoader -pl flexinale-modulith-1-onion -am
|
||||||
|
'';
|
||||||
|
"modulith-2-init".exec = ''
|
||||||
|
cd ${config.env.DEVENV_ROOT}/flex-training-flexinale
|
||||||
|
mvn test -Dtest=testdata.AllTestDataLoader -pl flexinale-modulith-2-components -am
|
||||||
|
'';
|
||||||
|
"distributed-init".exec = ''
|
||||||
|
cd ${config.env.DEVENV_ROOT}/flex-training-flexinale
|
||||||
|
mvn test -Dtest=testdata.AllTestDataLoader -pl flexinale-distributed -am
|
||||||
|
'';
|
||||||
|
"distributed-portal".exec = ''
|
||||||
|
java ${datasource}/distributed-besucherportal \
|
||||||
|
-Dbootstrap.servers=localhost:29092 \
|
||||||
|
-Dspring.kafka.consumer.bootstrap-servers=localhost:29092 \
|
||||||
|
-Dspring.kafka.producer.bootstrap-servers=localhost:29092 \
|
||||||
|
-jar flex-training-flexinale/flexinale-distributed/flexinale-distributed-besucherportal/target/flexinale-distributed-besucherportal-2024.3.0-spring-boot-fat-jar.jar
|
||||||
|
'';
|
||||||
|
"distributed-backoffice".exec = ''
|
||||||
|
java ${datasource}/distributed-backoffice \
|
||||||
|
-Dbootstrap.servers=localhost:29092 \
|
||||||
|
-Dspring.kafka.consumer.bootstrap-servers=localhost:29092 \
|
||||||
|
-Dspring.kafka.producer.bootstrap-servers=localhost:29092 \
|
||||||
|
-jar flex-training-flexinale/flexinale-distributed/flexinale-distributed-backoffice/target/flexinale-distributed-backoffice-2024.3.0-spring-boot-fat-jar.jar
|
||||||
|
'';
|
||||||
|
"distributed-ticketing".exec = ''
|
||||||
|
java ${datasource}/distributed-ticketing \
|
||||||
|
-Dbootstrap.servers=localhost:29092 \
|
||||||
|
-Dspring.kafka.consumer.bootstrap-servers=localhost:29092 \
|
||||||
|
-Dspring.kafka.producer.bootstrap-servers=localhost:29092 \
|
||||||
|
-jar flex-training-flexinale/flexinale-distributed/flexinale-distributed-ticketing/target/flexinale-distributed-ticketing-2024.3.0-spring-boot-fat-jar.jar
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
|
||||||
|
services.apache-kafka = {
|
||||||
|
enable = true;
|
||||||
|
clusterId = "foobar";
|
||||||
|
settings = {
|
||||||
|
"broker.id" = 1;
|
||||||
|
"node.id" = 1;
|
||||||
|
"process.roles" = [
|
||||||
|
"broker"
|
||||||
|
"controller"
|
||||||
|
];
|
||||||
|
"listeners" = [
|
||||||
|
"PLAINTEXT://:9092"
|
||||||
|
"FLEXINALE://:29092"
|
||||||
|
"CONTROLLER://:9093"
|
||||||
|
];
|
||||||
|
"listener.security.protocol.map" = [
|
||||||
|
"PLAINTEXT:PLAINTEXT"
|
||||||
|
"FLEXINALE:PLAINTEXT"
|
||||||
|
"CONTROLLER:PLAINTEXT"
|
||||||
|
];
|
||||||
|
"controller.quorum.voters" = [
|
||||||
|
"1@localhost:9093"
|
||||||
|
];
|
||||||
|
"controller.listener.names" = [ "CONTROLLER" ];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
# https://devenv.sh/services/
|
|
||||||
services.postgres = {
|
services.postgres = {
|
||||||
enable = true;
|
enable = true;
|
||||||
listen_addresses = "127.0.0.1";
|
listen_addresses = "127.0.0.1";
|
||||||
|
|
@ -51,12 +124,6 @@
|
||||||
]);
|
]);
|
||||||
};
|
};
|
||||||
|
|
||||||
# https://devenv.sh/scripts/
|
|
||||||
scripts.monolith-load.exec = ''
|
|
||||||
cd ${config.env.DEVENV_ROOT}/flex-training-flexinale
|
|
||||||
mvn test -Dtest=testdata.TestDataLoader -pl flexinale-monolith -am
|
|
||||||
'';
|
|
||||||
|
|
||||||
# https://devenv.sh/tests/
|
# https://devenv.sh/tests/
|
||||||
enterTest = ''
|
enterTest = ''
|
||||||
echo "Running tests"
|
echo "Running tests"
|
||||||
|
|
|
||||||
208
nix/kafka.nix
Normal file
208
nix/kafka.nix
Normal file
|
|
@ -0,0 +1,208 @@
|
||||||
|
{
|
||||||
|
config,
|
||||||
|
lib,
|
||||||
|
pkgs,
|
||||||
|
...
|
||||||
|
}:
|
||||||
|
let
|
||||||
|
cfg = config.services.apache-kafka;
|
||||||
|
|
||||||
|
# The `javaProperties` generator takes care of various escaping rules and
|
||||||
|
# generation of the properties file, but we'll handle stringly conversion
|
||||||
|
# ourselves in mkPropertySettings and stringlySettings, since we know more
|
||||||
|
# about the specifically allowed format eg. for lists of this type, and we
|
||||||
|
# don't want to coerce-downsample values to str too early by having the
|
||||||
|
# coercedTypes from javaProperties directly in our NixOS option types.
|
||||||
|
#
|
||||||
|
# Make sure every `freeformType` and any specific option type in `settings` is
|
||||||
|
# supported here.
|
||||||
|
|
||||||
|
mkPropertyString =
|
||||||
|
let
|
||||||
|
render = {
|
||||||
|
bool = lib.boolToString;
|
||||||
|
int = toString;
|
||||||
|
list = lib.concatMapStringsSep "," mkPropertyString;
|
||||||
|
string = lib.id;
|
||||||
|
};
|
||||||
|
in
|
||||||
|
v: render.${builtins.typeOf v} v;
|
||||||
|
|
||||||
|
stringlySettings = lib.mapAttrs (_: mkPropertyString) (
|
||||||
|
lib.filterAttrs (_: v: v != null) cfg.settings
|
||||||
|
);
|
||||||
|
|
||||||
|
generator = (pkgs.formats.javaProperties { }).generate;
|
||||||
|
|
||||||
|
startScript = pkgs.writeShellScript "kafka-startup" ''
|
||||||
|
${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
|
||||||
|
${cfg.jre}/bin/java \
|
||||||
|
-cp "${cfg.package}/libs/*" \
|
||||||
|
-Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
|
||||||
|
${toString cfg.jvmOptions} \
|
||||||
|
kafka.Kafka \
|
||||||
|
${cfg.configFiles.serverProperties}
|
||||||
|
'';
|
||||||
|
|
||||||
|
kafConfig = pkgs.writeText "kaf-local-kafka.yaml" ''
|
||||||
|
clusters:
|
||||||
|
- name: local
|
||||||
|
brokers:
|
||||||
|
- localhost:29092
|
||||||
|
SASL: null
|
||||||
|
TLS: null
|
||||||
|
security-protocol: ""
|
||||||
|
version: "1.0.0"
|
||||||
|
'';
|
||||||
|
in
|
||||||
|
{
|
||||||
|
options.services.apache-kafka = {
|
||||||
|
enable = lib.mkEnableOption "Apache Kafka event streaming broker";
|
||||||
|
|
||||||
|
settings = lib.mkOption {
|
||||||
|
description = ''
|
||||||
|
[Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
|
||||||
|
{file}`server.properties`.
|
||||||
|
|
||||||
|
Note that .properties files contain mappings from string to string.
|
||||||
|
Keys with dots are NOT represented by nested attrs in these settings,
|
||||||
|
but instead as quoted strings (ie. `settings."broker.id"`, NOT
|
||||||
|
`settings.broker.id`).
|
||||||
|
'';
|
||||||
|
type = lib.types.submodule {
|
||||||
|
freeformType =
|
||||||
|
with lib.types;
|
||||||
|
let
|
||||||
|
primitive = oneOf [
|
||||||
|
bool
|
||||||
|
int
|
||||||
|
str
|
||||||
|
];
|
||||||
|
in
|
||||||
|
lazyAttrsOf (nullOr (either primitive (listOf primitive)));
|
||||||
|
|
||||||
|
options = {
|
||||||
|
"broker.id" = lib.mkOption {
|
||||||
|
description = "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
|
||||||
|
default = null;
|
||||||
|
type = with lib.types; nullOr int;
|
||||||
|
};
|
||||||
|
|
||||||
|
"log.dirs" = lib.mkOption {
|
||||||
|
description = "Log file directories.";
|
||||||
|
default = [ "${config.env.DEVENV_STATE}/apache-kafka" ];
|
||||||
|
type = with lib.types; listOf path;
|
||||||
|
};
|
||||||
|
|
||||||
|
"listeners" = lib.mkOption {
|
||||||
|
description = ''
|
||||||
|
Kafka Listener List.
|
||||||
|
See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
|
||||||
|
'';
|
||||||
|
type = lib.types.listOf lib.types.str;
|
||||||
|
default = [ "PLAINTEXT://localhost:29092" ];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
clusterId = lib.mkOption {
|
||||||
|
description = ''
|
||||||
|
KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
|
||||||
|
'';
|
||||||
|
type = with lib.types; nullOr str;
|
||||||
|
default = null;
|
||||||
|
};
|
||||||
|
|
||||||
|
configFiles.serverProperties = lib.mkOption {
|
||||||
|
description = ''
|
||||||
|
Kafka server.properties configuration file path.
|
||||||
|
Defaults to the rendered `settings`.
|
||||||
|
'';
|
||||||
|
type = lib.types.path;
|
||||||
|
};
|
||||||
|
|
||||||
|
configFiles.log4jProperties = lib.mkOption {
|
||||||
|
description = "Kafka log4j property configuration file path";
|
||||||
|
type = lib.types.path;
|
||||||
|
default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
|
||||||
|
defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
|
||||||
|
};
|
||||||
|
|
||||||
|
formatLogDirs = lib.mkOption {
|
||||||
|
description = ''
|
||||||
|
Whether to format log dirs in KRaft mode if all log dirs are
|
||||||
|
unformatted, ie. they contain no meta.properties.
|
||||||
|
'';
|
||||||
|
type = lib.types.bool;
|
||||||
|
default = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
formatLogDirsIgnoreFormatted = lib.mkOption {
|
||||||
|
description = ''
|
||||||
|
Whether to ignore already formatted log dirs when formatting log dirs,
|
||||||
|
instead of failing. Useful when replacing or adding disks.
|
||||||
|
'';
|
||||||
|
type = lib.types.bool;
|
||||||
|
default = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
log4jProperties = lib.mkOption {
|
||||||
|
description = "Kafka log4j property configuration.";
|
||||||
|
default = ''
|
||||||
|
log4j.rootLogger=INFO, stdout
|
||||||
|
|
||||||
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
|
||||||
|
'';
|
||||||
|
type = lib.types.lines;
|
||||||
|
};
|
||||||
|
|
||||||
|
jvmOptions = lib.mkOption {
|
||||||
|
description = "Extra command line options for the JVM running Kafka.";
|
||||||
|
default = [ ];
|
||||||
|
type = lib.types.listOf lib.types.str;
|
||||||
|
example = [
|
||||||
|
"-Djava.net.preferIPv4Stack=true"
|
||||||
|
"-Dcom.sun.management.jmxremote"
|
||||||
|
"-Dcom.sun.management.jmxremote.local.only=true"
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
|
package = lib.mkPackageOption pkgs "apacheKafka" { };
|
||||||
|
|
||||||
|
jre = lib.mkOption {
|
||||||
|
description = "The JRE with which to run Kafka";
|
||||||
|
default = cfg.package.passthru.jre;
|
||||||
|
defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre";
|
||||||
|
type = lib.types.package;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
config = lib.mkIf cfg.enable {
|
||||||
|
services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
|
||||||
|
|
||||||
|
scripts.kaf.exec = ''
|
||||||
|
exec ${pkgs.kaf}/bin/kaf --config ${kafConfig} "$@"
|
||||||
|
'';
|
||||||
|
|
||||||
|
processes.apache-kafka = {
|
||||||
|
exec = "${startScript}";
|
||||||
|
|
||||||
|
# process-compose = {
|
||||||
|
# readiness_probe = {
|
||||||
|
# exec.command = "${pkgs.curl}/bin/curl -f -k http://localhost:29092"; # ${cfg.listenAddress}:${toString cfg.port}";
|
||||||
|
# initial_delay_seconds = 2;
|
||||||
|
# period_seconds = 10;
|
||||||
|
# timeout_seconds = 2;
|
||||||
|
# success_threshold = 1;
|
||||||
|
# failure_threshold = 5;
|
||||||
|
# };
|
||||||
|
|
||||||
|
# # https://github.com/F1bonacc1/process-compose#-auto-restart-if-not-healthy
|
||||||
|
# availability.restart = "on_failure";
|
||||||
|
# };
|
||||||
|
};
|
||||||
|
};
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue