# flex/nix/kafka.nix

{
  config,
  lib,
  pkgs,
  ...
}:
let
  cfg = config.services.kafka;

  # The `javaProperties` generator takes care of the escaping rules and the
  # generation of the properties file, but we handle the conversion of values
  # to strings ourselves in mkPropertyString and stringlySettings: we know
  # more about the specific formats allowed here (e.g. for lists), and we
  # don't want values coerced down to str too early, as would happen if the
  # coerced types from javaProperties appeared directly in our NixOS option
  # types.
  #
  # Make sure every `freeformType` and any specific option type in `settings`
  # is supported here.
  mkPropertyString =
    let
      render = {
        bool = lib.boolToString;
        int = toString;
        list = lib.concatMapStringsSep "," mkPropertyString;
        string = lib.id;
      };
    in
    v: render.${builtins.typeOf v} v;
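
  # Render the settings to their string form, dropping nulls first: a null
  # value means "leave the key out of server.properties entirely".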
  stringlySettings = lib.mapAttrs (_: mkPropertyString) (
    lib.filterAttrs (_: v: v != null) cfg.settings
  );

  generator = (pkgs.formats.javaProperties { }).generate;
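
  # Format the KRaft storage directories (when a clusterId is configured),
  # then launch the broker in the foreground. Formatting uses
  # --ignore-formatted, so it is a no-op on already-formatted directories;
  # the formatLogDirs* options below are declared but not consulted yet.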
  startScript = pkgs.writeShellScript "kafka-startup" ''
    set -e
    # Interpolating a null clusterId would be a Nix evaluation error, so only
    # format storage when one is configured.
    ${lib.optionalString (cfg.clusterId != null) ''
      ${cfg.package}/bin/kafka-storage.sh format \
        -t "${cfg.clusterId}" \
        -c ${cfg.configFiles.serverProperties} \
        --ignore-formatted
    ''}
    # exec so the JVM replaces the wrapper shell and receives signals directly.
    exec ${cfg.jre}/bin/java \
      -cp "${cfg.package}/libs/*" \
      -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
      ${toString cfg.jvmOptions} \
      kafka.Kafka \
      ${cfg.configFiles.serverProperties}
  '';
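
  # Cluster definition for the kaf CLI (https://github.com/birdayz/kaf).
  # This assumes a broker listening on localhost:29092; adjust it if your
  # `listeners` setting uses a different port (the module default is 9092).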
  kafConfig = pkgs.writeText "kaf-local-kafka.yaml" ''
    clusters:
      - name: local
        brokers:
          - localhost:29092
        SASL: null
        TLS: null
        security-protocol: ""
        version: "1.0.0"
  '';
in
{
  options.services.kafka = {
    enable = lib.mkEnableOption "Apache Kafka event streaming broker";
    settings = lib.mkOption {
      description = ''
        [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
        {file}`server.properties`.

        Note that .properties files contain mappings from string to string.
        Keys with dots are NOT represented by nested attrs in these settings,
        but instead as quoted strings (i.e. `settings."broker.id"`, NOT
        `settings.broker.id`).
      '';
      type = lib.types.submodule {
        freeformType =
          with lib.types;
          let
            primitive = oneOf [
              bool
              int
              str
            ];
          in
          lazyAttrsOf (nullOr (either primitive (listOf primitive)));
        options = {
          "broker.id" = lib.mkOption {
            description = "Broker ID. -1 or null to auto-allocate in ZooKeeper mode.";
            default = null;
            type = with lib.types; nullOr int;
          };
          "log.dirs" = lib.mkOption {
            description = "Log file directories.";
            default = [ "${config.env.DEVENV_STATE}/apache-kafka" ];
            type = with lib.types; listOf path;
          };
          "listeners" = lib.mkOption {
            description = ''
              Kafka listener list.
              See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
            '';
            type = lib.types.listOf lib.types.str;
            default = [ "PLAINTEXT://localhost:9092" ];
          };
        };
      };
    };
    clusterId = lib.mkOption {
      description = ''
        KRaft mode cluster ID used for formatting log directories. Can be
        generated with `kafka-storage.sh random-uuid`.
      '';
      type = with lib.types; nullOr str;
      default = null;
    };
    configFiles.serverProperties = lib.mkOption {
      description = ''
        Kafka server.properties configuration file path.
        Defaults to the rendered `settings`.
      '';
      type = lib.types.path;
    };
    configFiles.log4jProperties = lib.mkOption {
      description = "Kafka log4j property configuration file path.";
      type = lib.types.path;
      default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
      defaultText = lib.literalExpression ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
    };
    formatLogDirs = lib.mkOption {
      description = ''
        Whether to format log dirs in KRaft mode if all log dirs are
        unformatted, i.e. they contain no meta.properties.
      '';
      type = lib.types.bool;
      default = false;
    };
    formatLogDirsIgnoreFormatted = lib.mkOption {
      description = ''
        Whether to ignore already-formatted log dirs when formatting,
        instead of failing. Useful when replacing or adding disks.
      '';
      type = lib.types.bool;
      default = false;
    };
    log4jProperties = lib.mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = lib.types.lines;
    };
    jvmOptions = lib.mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [ ];
      type = lib.types.listOf lib.types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };
    package = lib.mkPackageOption pkgs "apacheKafka" { };
    jre = lib.mkOption {
      description = "The JRE with which to run Kafka.";
      default = cfg.package.passthru.jre;
      defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre";
      type = lib.types.package;
    };
  };
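
  # Wire everything into the surrounding devenv configuration: render the
  # properties file, expose a preconfigured `kaf` wrapper script, and
  # register the broker process.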
  config = lib.mkIf cfg.enable {
    services.kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
    scripts.kaf.exec = ''
      exec ${pkgs.kaf}/bin/kaf --config ${kafConfig} "$@"
    '';
    processes.kafka = {
      exec = "${startScript}";
      # Readiness probe kept disabled below: the Kafka protocol is not HTTP,
      # so a curl against the broker port will not succeed as a health check.
      # process-compose = {
      #   readiness_probe = {
      #     exec.command = "${pkgs.curl}/bin/curl -f -k http://localhost:29092";
      #     initial_delay_seconds = 2;
      #     period_seconds = 10;
      #     timeout_seconds = 2;
      #     success_threshold = 1;
      #     failure_threshold = 5;
      #   };
      #   # https://github.com/F1bonacc1/process-compose#-auto-restart-if-not-healthy
      #   availability.restart = "on_failure";
      # };
    };
  };
}
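
# A minimal usage sketch, assuming this module is imported into a devenv
# configuration. The clusterId below is a placeholder; generate your own with
# `kafka-storage.sh random-uuid`. A complete KRaft setup also needs settings
# such as "process.roles", "node.id", "controller.quorum.voters" and
# "controller.listener.names".
#
#   services.kafka = {
#     enable = true;
#     clusterId = "q0k4PQrpT4qQYxV0MMPn6g";
#     settings."listeners" = [ "PLAINTEXT://localhost:29092" ];
#   };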