Configuring Flume Security with Kafka
Cloudera Manager does not provide configuration options for Flume to work with Kafka sources and channels over TLS. When Kafka is configured with TLS, additional manual configuration is required before Flume can communicate with it.
This topic describes how to configure Flume to communicate with Kafka over TLS.
Set Up Cloudera Manager to Generate flume.keytab
If you have already set up Kerberos because of a kerberized HDFS or Solr dependency on Flume, this step is already done; verify your Kerberos keytab files. Otherwise, add a generateKeytabFor property to the Flume agent configuration so that Cloudera Manager generates flume.keytab. Make sure that the property:
- Begins with the agent's name.
- Contains the $KERBEROS_PRINCIPAL string.
- Is syntactically correct.
For example:
tier1.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL
The property name generateKeytabFor is an arbitrary name that is not used by Flume. There is no dependency on HDFS or any other service.
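The three checks above can be automated before the configuration is saved. The following is a minimal Python sketch, not part of Flume or Cloudera Manager. The function name find_keytab_property and the regular expression are assumptions made for illustration, and "syntactically correct" is interpreted here as matching the pattern agent.(sources|channels|sinks).component.property = value.

```python
import re

# Hypothetical helper; the pattern below is an illustrative interpretation of
# "syntactically correct": <agent>.<sources|channels|sinks>.<component>.<property> = <value>
_PROPERTY_RE = re.compile(
    r"^(?P<agent>[\w-]+)\.(?P<kind>sources|channels|sinks)\.(?P<component>[\w-]+)"
    r"\.(?P<prop>[\w.-]+)\s*=\s*(?P<value>.+)$"
)


def find_keytab_property(config_text, agent_name):
    """Return the first configuration line that passes all three checks, or None."""
    for raw_line in config_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        match = _PROPERTY_RE.match(line)
        if match is None:
            continue  # not a well-formed component property
        if match.group("agent") != agent_name:
            continue  # does not begin with the agent's name
        if "$KERBEROS_PRINCIPAL" not in match.group("value"):
            continue  # does not contain the placeholder string
        return line
    return None
```

Calling find_keytab_property(text, "tier1") on the agent configuration returns the matching line, or None if no line qualifies.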
Verify the flume.keytab
After you configure and restart the agent, the keytab file is generated at the following location:
/var/run/cloudera-scm-agent/process/<latest_id>-flume-AGENT/flume.keytab
The file must not be empty on any host that runs a kerberized Flume agent.
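One way to confirm that the keytab is not empty is to list the size of every generated flume.keytab. The sketch below is illustrative only: the function name keytab_sizes is hypothetical, and it assumes that it runs on the agent host as a user allowed to read the process directory. It reports every matching process directory, not only the latest one.

```python
import glob
import os


def keytab_sizes(process_root="/var/run/cloudera-scm-agent/process"):
    """Map each <id>-flume-AGENT/flume.keytab path under process_root to its size in bytes."""
    pattern = os.path.join(process_root, "*-flume-AGENT", "flume.keytab")
    return {path: os.path.getsize(path) for path in sorted(glob.glob(pattern))}


if __name__ == "__main__":
    for path, size in keytab_sizes().items():
        status = "EMPTY" if size == 0 else "ok"
        print("{}  {} bytes  {}".format(path, size, status))
```

An empty result means that no keytab was found under the given directory; a size of 0 indicates an empty keytab.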
Cloudera Manager handles principal management for Flume, just as it does for other services. For example, the Flume principals are listed in Cloudera Manager.
Create jaas.conf
Create a flafka_jaas.conf file on each host that runs a Flume agent. This configuration is used both to communicate with Kafka and to provide normal Flume Kerberos support. The flafka_jaas.conf file contains two entries for the Flume principal: Client and KafkaClient. Note that the principal property is host specific. The Unix user flume must have read permission for this file.
/opt/cloudera/security/flafka_jaas.conf:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="flume.keytab"
  principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="flume.keytab"
  principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM";
};
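Because the principal is host specific, the file differs between hosts only in that one value. The following Python sketch renders the two entries from a host name and a realm. The names render_flafka_jaas, host_fqdn, and realm are hypothetical; writing the result to /opt/cloudera/security/flafka_jaas.conf and granting read permission to the flume user is left to the administrator.

```python
# Template for the two JAAS entries; doubled braces are literal braces for str.format.
JAAS_TEMPLATE = """\
Client {{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="{keytab}"
  principal="{principal}";
}};

KafkaClient {{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="{keytab}"
  principal="{principal}";
}};
"""


def render_flafka_jaas(host_fqdn, realm, keytab="flume.keytab"):
    """Return the JAAS file text for the Flume principal of one host."""
    principal = "flume/{}@{}".format(host_fqdn, realm)
    return JAAS_TEMPLATE.format(keytab=keytab, principal=principal)


if __name__ == "__main__":
    print(render_flafka_jaas("cornhost-1.gce.acmecorn.com", "GCE.ACMECORN.COM"))
```

Run as a script, the sketch prints the same two entries as the example file for cornhost-1.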
Set Up JAAS for Flume in Cloudera Manager
In the Flume service configuration, the Java Configuration Options for FlumeAgent property requires the following:
-Djava.security.auth.login.config=/opt/cloudera/security/flafka_jaas.conf
Do not use the Flume Service Environment Advanced Configuration Snippet (Safety Valve) to set this property through FLUME_AGENT_JAVA_OPTS, because doing so overrides the existing Java command-line options.
Update Flume Service Configuration: Kerberos Authentication and TLS Encryption
Add the relevant properties to the Flume source, channel, or sink, depending on the broker.protocol type (in this case, SASL_SSL).
The default secure port of Kafka brokers is 9093 instead of 9092, so update kafka.bootstrap.servers as well.
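A quick way to catch broker entries that still point at the non-secure port is to scan the kafka.bootstrap.servers value. The sketch below is an illustrative helper, not a Flume feature: servers_not_on_port is a hypothetical name, and expected_port defaults to 9093 only because that is the default mentioned above. Clusters configured with a different secure port need a different value.

```python
def servers_not_on_port(bootstrap_servers, expected_port=9093):
    """Return the entries of a comma-separated host:port list whose port differs."""
    mismatched = []
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        _, separator, port = entry.rpartition(":")
        # An entry without ":" has no explicit port and is reported as well.
        if not separator or port != str(expected_port):
            mismatched.append(entry)
    return mismatched
```

An empty list means that every listed broker uses the expected port.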
Kafka Source
Update the Flume Kafka source entries to include the following security configuration. Replace the truststore location and password with the correct values for the cluster.
tier1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
tier1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.sources.source1.kafka.consumer.ssl.truststore.location = /opt/cloudera/security/jks/truststore.jks
tier1.sources.source1.kafka.consumer.ssl.truststore.password = cloudera
tier1.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL
Kafka Channel
Update the Flume Kafka channel entries to include the following security configuration. Replace the truststore location and password with the correct values for the cluster. Both producer and consumer configurations are required.
tier1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
tier1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.kafka.consumer.ssl.truststore.location = /opt/cloudera/security/jks/truststore.jks
tier1.channels.channel1.kafka.consumer.ssl.truststore.password = cloudera
tier1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
tier1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.kafka.producer.ssl.truststore.location = /opt/cloudera/security/jks/truststore.jks
tier1.channels.channel1.kafka.producer.ssl.truststore.password = cloudera
tier1.channels.channel1.generateKeytabFor = $KERBEROS_PRINCIPAL
Kafka Sink
Update the Flume Kafka sink entries to include the following security configuration. Replace the truststore location and password with the correct values for the cluster.
tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.sinks.sink1.kafka.producer.ssl.truststore.location = /opt/cloudera/security/jks/truststore.jks
tier1.sinks.sink1.kafka.producer.ssl.truststore.password = cloudera
tier1.sinks.sink1.generateKeytabFor = $KERBEROS_PRINCIPAL
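The source, channel, and sink settings above follow one pattern: a source uses the consumer prefix, a sink uses the producer prefix, and a channel uses both. As an illustration of that pattern, the following Python sketch generates the SASL_SSL property lines for one component. The function kafka_security_properties and its parameter names are hypothetical and not part of Flume or Cloudera Manager.

```python
# Kafka client prefixes needed by each Flume component type:
# sources consume, sinks produce, channels do both.
CLIENTS_BY_KIND = {
    "sources": ("consumer",),
    "channels": ("consumer", "producer"),
    "sinks": ("producer",),
}


def kafka_security_properties(agent, kind, name, truststore_location, truststore_password):
    """Return the SASL_SSL property lines for one Kafka source, channel, or sink."""
    if kind not in CLIENTS_BY_KIND:
        raise ValueError("kind must be one of: " + ", ".join(sorted(CLIENTS_BY_KIND)))
    prefix = "{}.{}.{}".format(agent, kind, name)
    lines = []
    for client in CLIENTS_BY_KIND[kind]:
        base = "{}.kafka.{}".format(prefix, client)
        lines.append(base + ".security.protocol = SASL_SSL")
        lines.append(base + ".sasl.kerberos.service.name = kafka")
        lines.append(base + ".ssl.truststore.location = " + truststore_location)
        lines.append(base + ".ssl.truststore.password = " + truststore_password)
    # Placeholder property that triggers keytab generation in Cloudera Manager.
    lines.append(prefix + ".generateKeytabFor = $KERBEROS_PRINCIPAL")
    return lines


if __name__ == "__main__":
    for line in kafka_security_properties(
        "tier1", "channels", "channel1",
        "/opt/cloudera/security/jks/truststore.jks", "cloudera",
    ):
        print(line)
```

Run as a script, the sketch prints the nine channel properties; passing "sources" or "sinks" yields the five-line variants.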
Update Flume Service Configuration: Kerberos Authentication with No Encryption
Add the relevant properties to the Flume source, channel, or sink, depending on the broker.protocol type (in this case, SASL_PLAINTEXT).
The default secure port of Kafka brokers is 9093 rather than 9092, so update kafka.bootstrap.servers as well.
Kafka Source
Update the Flume Kafka source entries to include the following security configuration. No truststore settings are needed, because the connection is not encrypted.
tier1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
tier1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL
Kafka Channel
Update the Flume Kafka channel entries to include the following security configuration. Both producer and consumer configurations are required.
tier1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
tier1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
tier1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.generateKeytabFor = $KERBEROS_PRINCIPAL
Kafka Sink
Update the Flume Kafka sink entries to include the following security configuration. No truststore settings are needed, because the connection is not encrypted.
tier1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.sinks.sink1.generateKeytabFor = $KERBEROS_PRINCIPAL
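Because SASL_PLAINTEXT does not use TLS, truststore settings that remain after copying a SASL_SSL configuration are most likely leftovers. The following Python sketch lists such entries so they can be reviewed. The function name stray_ssl_properties is hypothetical, and the parsing assumes simple key = value lines without line continuations.

```python
def stray_ssl_properties(config_text):
    """Return ssl.* property keys of Kafka clients whose protocol is SASL_PLAINTEXT."""
    props = {}
    for raw_line in config_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and lines that are not key = value
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

    suffix = ".security.protocol"
    # Client prefixes such as "tier1.sinks.sink1.kafka.producer".
    plaintext_clients = [
        key[: -len(suffix)]
        for key, value in props.items()
        if key.endswith(suffix) and value == "SASL_PLAINTEXT"
    ]
    return sorted(
        key
        for key in props
        if any(key.startswith(client + ".ssl.") for client in plaintext_clients)
    )
```

An empty list means that no ssl.* property is attached to a SASL_PLAINTEXT client in the given text.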
Example
The following sample is a complete, working Flume configuration with two tiers. Tier1 tails an input log and writes each new event to the sectest topic through a Kafka sink (the tailed file must exist before the agent starts). Tier2 reads the sectest topic through a Kafka source and logs every event.
########################################################
# Tier1
########################################################
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 1000
tier1.channels.channel1.transactionCapacity = 100

tier1.sinks.sink1.channel = channel1
tier1.sources.source1.channels = channel1

tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F /tmp/input/input.log

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.topic = sectest
tier1.sinks.sink1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093

###
# Security related setup
###
tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.sinks.sink1.kafka.producer.ssl.truststore.location = /etc/cdep-ssl-conf/CA_STANDARD/truststore.jks
tier1.sinks.sink1.kafka.producer.ssl.truststore.password = cloudera
tier1.sinks.sink1.generateKeytabFor = $KERBEROS_PRINCIPAL

########################################################
# Tier2
########################################################
tier2.sources = source1
tier2.channels = channel1
tier2.sinks = sink1

tier2.channels.channel1.type = memory
tier2.channels.channel1.capacity = 1000
tier2.channels.channel1.transactionCapacity = 100

tier2.sinks.sink1.channel = channel1
tier2.sources.source1.channels = channel1

tier2.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier2.sources.source1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093
tier2.sources.source1.kafka.topics = sectest
tier2.sources.source1.kafka.offsets.storage = kafka
tier2.sources.source1.kafka.consumer.group.id = flume
tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest

tier2.sinks.sink1.type = logger

###
# Security related setup
###
tier2.sources.source1.kafka.consumer.security.protocol = SASL_SSL
tier2.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
tier2.sources.source1.kafka.consumer.ssl.truststore.location = /etc/cdep-ssl-conf/CA_STANDARD/truststore.jks
tier2.sources.source1.kafka.consumer.ssl.truststore.password = cloudera
tier2.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL