Near Real Time (NRT) Indexing Tweets Using Flume
The following section describes how to use Flume for near real time (NRT) indexing using tweets from the Twitter public stream as an example. Near real time indexing is generally used when new data needs to be returned in query results in time frames measured in seconds. Before continuing, make sure that you have completed the procedures in Preparing to Index Sample Tweets with Cloudera Search.
Install Flume
If you have not already done so, install Flume. For Cloudera Manager installations, Flume is included in CDH, and no additional action is required for installation. Add the Flume service to the cluster following the instructions in Adding a Service.
For instructions on installing Flume in an unmanaged environment, see Setting Up Apache Flume Using the Command Line.
Sentry Configuration for NRT Indexing Using Flume
If your cluster has security enabled and is using Apache Sentry for authorization, make sure that the Flume system user (flume by default) has permission to update the collection (cloudera_tutorial_tweets in this example):
- Switch to the Sentry admin user (solr in this example) using kinit:
kinit solr@EXAMPLE.COM
- Create a Sentry role:
solrctl sentry --create-role cloudera_tutorial_flume
- Map the flume group to this role:
solrctl sentry --add-role-group cloudera_tutorial_flume flume
- Grant Update privileges to the cloudera_tutorial_flume role for the cloudera_tutorial_tweets collection:
solrctl sentry --grant-privilege cloudera_tutorial_flume 'collection=cloudera_tutorial_tweets->action=update'
For more information on the Sentry privilege model for Cloudera Search, see Authorization Privilege Model for Solr.
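The three solrctl calls above can also be generated from a script, which helps when the same role setup has to be repeated for several collections. The following is a minimal sketch, not part of Cloudera Search: the function names sentry_commands and apply are hypothetical, and the only solrctl options used are the ones shown in the steps above.

```python
import shlex
import subprocess


def sentry_commands(role, group, collection):
    """Return the solrctl argument lists for the role, group mapping, and grant."""
    privilege = f"collection={collection}->action=update"
    return [
        ["solrctl", "sentry", "--create-role", role],
        ["solrctl", "sentry", "--add-role-group", role, group],
        ["solrctl", "sentry", "--grant-privilege", role, privilege],
    ]


def apply(commands, dry_run=True):
    """Print each command, or run it when dry_run is False."""
    for argv in commands:
        if dry_run:
            # shlex.join quotes the privilege string so it can be pasted into a shell.
            print(shlex.join(argv))
        else:
            subprocess.run(argv, check=True)
```

Calling apply(sentry_commands("cloudera_tutorial_flume", "flume", "cloudera_tutorial_tweets")) prints the commands without running them. Run kinit as the Sentry admin user first if you pass dry_run=False.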
Copy Configuration Template Files
Copy the configuration files as follows:
- Cloudera Manager: For Cloudera Manager environments, the Flume agent is configured in a later section. Skip to Configuring the Flume Solr Sink.
- Unmanaged CDH:
sudo cp -r $HOME/cloudera_tutorial_tweets_config /etc/flume-ng/conf/cloudera_tutorial_tweets
sudo cp /usr/share/doc/search*/examples/solr-nrt/twitter-flume.conf /etc/flume-ng/conf/flume.conf
sudo cp /usr/share/doc/search*/examples/solr-nrt/test-morphlines/tutorialReadAvroContainer.conf /etc/flume-ng/conf/morphline.conf
Configuring the Flume Solr Sink
- For Cloudera Manager deployments, use Cloudera Manager to edit the configuration files similar to the process described in Configuring the Flume Agents.
- For unmanaged CDH installations, use command-line tools (such as vi) to edit files.
- Modify the Flume configuration for a single agent to specify the Flume source details and configure the flow. You must set the relative or absolute path to the morphline configuration file.
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the Flume agent that you want to use. When prompted to make configuration changes on the service configuration page, click Cancel. Set Agent Name to twitter_stream and modify Configuration File exactly as follows. You will replace the YOUR_TWITTER_* values in a later step:
twitter_stream.sources = twitterSrc
twitter_stream.channels = memoryChannel
twitter_stream.sinks = solrSink
twitter_stream.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
twitter_stream.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
twitter_stream.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
twitter_stream.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
twitter_stream.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
twitter_stream.sources.twitterSrc.maxBatchDurationMillis = 200
twitter_stream.sources.twitterSrc.channels = memoryChannel
twitter_stream.channels.memoryChannel.type = memory
twitter_stream.channels.memoryChannel.capacity = 10000
twitter_stream.channels.memoryChannel.transactionCapacity = 1000
twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
twitter_stream.sinks.solrSink.channel = memoryChannel
twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf
Click Save Changes.
- Unmanaged CDH: If you copied the configuration templates as described in Copy Configuration Template Files, no further action is required in this step.
- Edit the Morphline configuration to specify Solr environment details.
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the same Flume agent that you selected in step 1. When prompted to make configuration changes on the service configuration page, click Cancel. Edit the SOLR_LOCATOR directive in the Morphlines File as follows. Edit the SOLR_LOCATOR entry only. Leave the rest of the configuration unedited.
SOLR_LOCATOR : {
  # Name of solr collection
  collection : cloudera_tutorial_tweets

  # ZooKeeper ensemble
  zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
}
Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.
Click Save Changes.
- Unmanaged CDH: Edit the SOLR_LOCATOR section in /etc/flume-ng/conf/morphline.conf as follows:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : cloudera_tutorial_tweets

  # ZooKeeper ensemble
  zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
}
Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.
- (Unmanaged CDH only) Copy flume-env.sh.template to flume-env.sh:
sudo cp /etc/flume-ng/conf/flume-env.sh.template /etc/flume-ng/conf/flume-env.sh
- Update the Java heap size.
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the same Flume agent that you selected in step 1. When prompted to make configuration changes on the service configuration page, click Cancel. Set Java Heap Size of Agent in Bytes to 500 and select MiB in the unit drop-down menu. Click Save Changes.
- Unmanaged CDH: Edit /etc/flume-ng/conf/flume-env.sh, inserting or replacing JAVA_OPTS as follows:
JAVA_OPTS="-Xmx500m"
- (Optional) Modify Flume logging settings to facilitate monitoring and debugging:
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the same Flume agent that you selected in step 1. When prompted to make configuration changes on the service configuration page, click Cancel. Modify Agent Logging Advanced Configuration Snippet (Safety Valve) to include:
log4j.logger.org.apache.flume.sink.solr=DEBUG
log4j.logger.org.kitesdk.morphline=TRACE
- Unmanaged CDH: Run the following commands:
sudo bash -c 'echo "log4j.logger.org.apache.flume.sink.solr=DEBUG" >> /etc/flume-ng/conf/log4j.properties'
sudo bash -c 'echo "log4j.logger.org.kitesdk.morphline=TRACE" >> /etc/flume-ng/conf/log4j.properties'
- (Optional) In an unmanaged environment, you can use SEARCH_HOME to configure where Flume finds Cloudera Search dependencies for the Flume Solr Sink. To set SEARCH_HOME, use a command similar to the following:
export SEARCH_HOME=/usr/lib/search
Alternatively, you can add the same setting to /etc/flume-ng/conf/flume-env.sh.
In a Cloudera Manager managed environment, Cloudera Manager automatically updates the SOLR_HOME location with any required dependencies.
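A typo in a channel name is an easy mistake to make when editing the agent configuration by hand, because the source, channel, and sink are connected only by matching names. The following is a minimal sketch of a wiring check, assuming the flat key = value layout shown in the steps above. The helper names parse_properties and wiring_problems are hypothetical and are not part of Flume, and the check covers only the properties used in this tutorial.

```python
MORPHLINE_SINK = "org.apache.flume.sink.solr.morphline.MorphlineSolrSink"

# Trimmed copy of the tutorial configuration, used only for illustration.
EXAMPLE = """
twitter_stream.sources = twitterSrc
twitter_stream.channels = memoryChannel
twitter_stream.sinks = solrSink
twitter_stream.sources.twitterSrc.channels = memoryChannel
twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
twitter_stream.sinks.solrSink.channel = memoryChannel
twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf
"""


def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def wiring_problems(props, agent):
    """Return a list of problems in the source -> channel -> sink wiring."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        channels = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not channels:
            problems.append(f"source {source} has no channels")
        for channel in channels:
            if channel not in declared:
                problems.append(f"source {source} uses undeclared channel {channel}")
    for sink in props.get(f"{agent}.sinks", "").split():
        prefix = f"{agent}.sinks.{sink}"
        channel = props.get(f"{prefix}.channel")
        if channel not in declared:
            problems.append(f"sink {sink} uses undeclared channel {channel}")
        # A morphline sink without a morphline file cannot transform records.
        if props.get(f"{prefix}.type") == MORPHLINE_SINK and not props.get(f"{prefix}.morphlineFile"):
            problems.append(f"sink {sink} has no morphlineFile")
    return problems
```

For the configuration in this section, wiring_problems(parse_properties(EXAMPLE), "twitter_stream") returns an empty list. In an unmanaged environment, pass the contents of /etc/flume-ng/conf/flume.conf and the agent name used in that file instead.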
Configuring Flume Solr Sink to Access the Twitter Public Stream
Use the Twitter developer site to generate credentials to access the Twitter public stream:
- Sign in to https://apps.twitter.com with a Twitter account.
- On the Application Management page, click Create New App.
- Fill in the form to represent the Search installation. This can represent multiple clusters, and does not require the callback URL. Because this is not a publicly distributed application, the values you enter for the required name, description, and website fields are not important.
- Read and accept the Developer Agreement, then click Create your Twitter application at the bottom of the page.
- Click on the Keys and Access Tokens tab, then click Create my access token button at the bottom.
The Flume TwitterSource uses the Twitter 1.1 API, which requires authentication of both the consumer (application) and the user (you). Consider this information confidential, just like your regular Twitter credentials. Edit the Flume configuration and replace the following properties with the credentials you generated:
agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
To edit the Flume configuration:
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the same Flume agent that you configured earlier. When prompted to make configuration changes on the service configuration page, click Cancel. Modify the Configuration File parameter.
- Unmanaged CDH: Edit /etc/flume-ng/conf/flume.conf.
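Before starting the agent, it can be worth confirming that none of the YOUR_TWITTER_* placeholders remain in the configuration. The following is a minimal sketch of such a check. The function name unreplaced_placeholders is hypothetical, and the check assumes the one-property-per-line layout used in this tutorial.

```python
import re

# Matches uncommented 'key = YOUR_TWITTER_...' lines; [ \t]* keeps each match on one line.
PLACEHOLDER = re.compile(
    r"^[ \t]*([\w.]+)[ \t]*=[ \t]*(YOUR_TWITTER_\w+)[ \t]*$", re.MULTILINE
)


def unreplaced_placeholders(config_text):
    """Return the property names that still hold a YOUR_TWITTER_* placeholder."""
    return [match.group(1) for match in PLACEHOLDER.finditer(config_text)]
```

An empty list means that every credential property has been given a value. The check does not verify that the values themselves are valid credentials.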
For authentication to succeed, you must make sure the system clock is set correctly on the Flume agent host that connects to Twitter. You can install NTP and keep the host synchronized by running the ntpd service, or manually synchronize by using the command sudo ntpdate pool.ntp.org. To confirm that the time is set correctly, make sure that the output of the command date --utc matches the time shown at http://www.timeanddate.com/worldclock/timezone/utc. You can also set the time manually using the date command.
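As an alternative to comparing times by eye, the clock offset can be estimated from the Date header of any HTTP response from a server with a trusted clock. The following is a minimal sketch. The function name skew_seconds is hypothetical, and the amount of skew that Twitter tolerates is not stated in this document, so the sketch only reports the offset and leaves the threshold to you.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def skew_seconds(http_date_header, local_now=None):
    """Return local time minus the reference time, in seconds.

    http_date_header is the value of an HTTP Date header, for example
    'Thu, 06 Sep 2012 14:14:33 GMT'. A positive result means the local
    clock is ahead of the reference.
    """
    reference = parsedate_to_datetime(http_date_header)
    if local_now is None:
        local_now = datetime.now(timezone.utc)
    return (local_now - reference).total_seconds()
```

The header value can be read with urllib.request.urlopen(url).headers["Date"], where url is any HTTP endpoint that you trust. The result includes network latency, so treat it as an approximation and use NTP for actual synchronization.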
Starting the Flume Agent
- If you are using Kerberos, kinit as the user that has privileges to update the collection:
kinit jdoe@EXAMPLE.COM
Replace EXAMPLE.COM with your Kerberos realm name.
- Delete all existing documents in the cloudera_tutorial_tweets collection. If your cluster does not have security enabled, run the following command as the solr user by adding sudo -u solr before the command:
solrctl collection --deletedocs cloudera_tutorial_tweets
- Start or restart the Flume agent configured in Configuring the Flume Solr Sink:
- Cloudera Manager: Start or restart the agent from the Flume service. Make sure that you select the same agent that you configured earlier.
- Unmanaged CDH:
sudo /etc/init.d/flume-ng-agent restart
- Monitor progress in the Flume log file and watch for errors:
tail -f /var/log/flume-ng/flume*.log
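- Run a Solr query to confirm that new tweets are being indexed. For example, for a Solr server running on search01.example.com, go to one of the following URLs in a browser, depending on whether you have enabled security on your cluster:
- Security Enabled: https://search01.example.com:8985/solr/cloudera_tutorial_tweets/select?q=*%3A*&sort=created_at+desc&wt=json&indent=true
- Security Disabled: http://search01.example.com:8983/solr/cloudera_tutorial_tweets/select?q=*%3A*&sort=created_at+desc&wt=json&indent=true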
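The query URLs above can also be built and checked from a script, which is convenient for polling the document count while the agent runs. The following is a minimal sketch. The function names select_url and num_found are hypothetical, and the port and scheme pairing (8985 with https, 8983 with http) is taken from the example URLs in this section rather than from your cluster's configuration.

```python
import json
from urllib.parse import urlencode


def select_url(host, collection, tls=False, sort=None):
    """Build a select URL that matches all documents and returns indented JSON."""
    scheme, port = ("https", 8985) if tls else ("http", 8983)
    params = {"q": "*:*"}
    if sort:
        params["sort"] = sort
    params.update({"wt": "json", "indent": "true"})
    # safe="*" keeps the asterisks literal, as in the URLs shown above.
    query = urlencode(params, safe="*")
    return f"{scheme}://{host}:{port}/solr/{collection}/select?{query}"


def num_found(response_body):
    """Extract the number of matching documents from a Solr JSON response."""
    return json.loads(response_body)["response"]["numFound"]
```

Fetching the URL is left out of the sketch because a secured cluster requires Kerberos authentication, which the Python standard library does not provide. If the value returned by num_found grows between two requests, the agent is indexing tweets.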
To print diagnostic information, such as the content of records as they pass through the morphline commands, enable TRACE log level diagnostics by adding the following to your log4j.properties file:
log4j.logger.org.kitesdk.morphline=TRACE
In Cloudera Manager, you can use the safety valve to add the setting. After setting this value, restart the service.
Indexing a File Containing Tweets with Flume HTTPSource
HTTPSource lets you ingest data into Solr by POSTing a file over HTTP. HTTPSource sends data using a channel to a sink, in this case a SolrSink. For more information, see Flume Solr BlobHandler Configuration Options.
- Stop the Flume agent that you configured in Configuring the Flume Solr Sink:
- Cloudera Manager: Stop the agent from the Flume service. Make sure that you select the same agent that you configured earlier.
- Unmanaged CDH:
sudo /etc/init.d/flume-ng-agent stop
- If you are using Kerberos, kinit as the user that has privileges to update the collection:
kinit jdoe@EXAMPLE.COM
Replace EXAMPLE.COM with your Kerberos realm name.
- Delete all existing documents in the cloudera_tutorial_tweets collection. If your cluster does not have security enabled, run the following command as the solr user by adding sudo -u solr before the command:
solrctl collection --deletedocs cloudera_tutorial_tweets
- Modify the Flume configuration:
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the Flume agent that you configured earlier. When prompted to make configuration changes on the service configuration page, click Cancel. Replace the Configuration File configuration with the following:
twitter_stream.sources = httpSrc
twitter_stream.channels = memoryChannel
twitter_stream.sinks = solrSink
twitter_stream.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource
twitter_stream.sources.httpSrc.port = 5140
twitter_stream.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler
twitter_stream.sources.httpSrc.handler.maxBlobLength = 2000000000
twitter_stream.sources.httpSrc.channels = memoryChannel
twitter_stream.channels.memoryChannel.type = memory
twitter_stream.channels.memoryChannel.capacity = 10000
twitter_stream.channels.memoryChannel.transactionCapacity = 1000
twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
twitter_stream.sinks.solrSink.channel = memoryChannel
twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf
Click Save Changes.
- Unmanaged CDH: If you copied the configuration templates as described in Copy Configuration Template Files, edit /etc/flume-ng/conf/flume.conf and comment out all sources except the HTTP source as follows:
#agent.sources = twitterSrc
agent.sources = httpSrc
#agent.sources = spoolSrc
#agent.sources = avroSrc
- Start the Flume agent:
- Cloudera Manager: Start the agent from the Flume service. Make sure that you select the same agent that you configured earlier.
- Unmanaged CDH:
sudo /etc/init.d/flume-ng-agent start
- Send a file containing tweets to the HTTP source. Run the following commands on a cluster host, replacing flume01.example.com with the hostname of the Flume agent you configured as the HTTP source:
- Cloudera Manager:
cd /opt/cloudera/parcels/CDH/share/doc/search-*/examples/test-documents
curl --data-binary @sample-statuses-20120906-141433-medium.avro 'http://flume01.example.com:5140?resourceName=sample-statuses-20120906-141433-medium.avro' --header 'Content-Type:application/octet-stream' --verbose
- Unmanaged CDH:
cd /usr/share/doc/search-*/examples/test-documents
curl --data-binary @sample-statuses-20120906-141433-medium.avro 'http://flume01.example.com:5140?resourceName=sample-statuses-20120906-141433-medium.avro' --header 'Content-Type:application/octet-stream' --verbose
- Check the log for status or errors:
tail /var/log/flume-ng/flume*.log
- Run a Solr query. For example, for a Solr server running on search01.example.com, go to one of the following URLs in a browser, depending on whether you have enabled TLS on your cluster:
- Security Enabled: https://search01.example.com:8985/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
- Security Disabled: http://search01.example.com:8983/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
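The curl command in this procedure sends the file as the raw body of an HTTP POST and passes the file name in the resourceName query parameter. The same request can be built in Python when you want to post files from a script. The following is a minimal sketch using only the standard library. The function name blob_request is hypothetical, and the port default of 5140 is taken from the agent configuration shown above.

```python
from urllib.parse import urlencode
from urllib.request import Request


def blob_request(flume_host, resource_name, body, port=5140):
    """Build a POST request that mirrors the curl command in this procedure.

    body is the file content as bytes. The request is only constructed here;
    pass it to urllib.request.urlopen to send it.
    """
    query = urlencode({"resourceName": resource_name})
    url = f"http://{flume_host}:{port}/?{query}"
    headers = {"Content-Type": "application/octet-stream"}
    return Request(url, data=body, headers=headers, method="POST")
```

To send the sample file, read it with open(path, "rb").read(), pass the bytes as body, and call urllib.request.urlopen on the returned request. Building the request separately from sending it makes it possible to inspect the URL and headers before anything reaches the agent.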
Indexing a File Containing Tweets with Flume SpoolDirectorySource
SpoolDirectorySource specifies a directory on a local disk that Flume monitors. Flume automatically transfers data from files in this directory to Solr. SpoolDirectorySource sends data over a channel to a sink, in this case a SolrSink.
- Stop the Flume agent configured in Configuring the Flume Solr Sink:
- Cloudera Manager: Stop the agent from the Flume service. Make sure that you select the same agent that you configured earlier.
- Unmanaged CDH:
sudo /etc/init.d/flume-ng-agent stop
- If you are using Kerberos, kinit as the user that has privileges to update the collection:
kinit jdoe@EXAMPLE.COM
Replace EXAMPLE.COM with your Kerberos realm name.
- Delete all existing documents in the cloudera_tutorial_tweets collection. If your cluster does not have security enabled, run the following command as the solr user by adding sudo -u solr before the command:
solrctl collection --deletedocs cloudera_tutorial_tweets
- Modify the Flume configuration:
- Cloudera Manager: In the Cloudera Manager Admin Console, go to the configuration page for the Flume agent that you configured earlier. When prompted to make configuration changes on the service configuration page, click Cancel. Replace the Configuration File configuration with the following:
twitter_stream.sources = spoolSrc
twitter_stream.channels = memoryChannel
twitter_stream.sinks = solrSink
twitter_stream.sources.spoolSrc.type = spooldir
twitter_stream.sources.spoolSrc.spoolDir = /tmp/myspooldir
twitter_stream.sources.spoolSrc.ignorePattern = \.
twitter_stream.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
twitter_stream.sources.spoolSrc.deserializer.maxBlobLength = 2000000000
twitter_stream.sources.spoolSrc.batchSize = 1
twitter_stream.sources.spoolSrc.fileHeader = true
twitter_stream.sources.spoolSrc.fileHeaderKey = resourceName
twitter_stream.sources.spoolSrc.channels = memoryChannel
twitter_stream.channels.memoryChannel.type = memory
twitter_stream.channels.memoryChannel.capacity = 10000
twitter_stream.channels.memoryChannel.transactionCapacity = 1000
twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
twitter_stream.sinks.solrSink.channel = memoryChannel
twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf
Click Save Changes.
- Unmanaged CDH: If you copied the configuration templates as described in Copy Configuration Template Files, edit /etc/flume-ng/conf/flume.conf and comment out all sources except the spool source as follows:
#agent.sources = twitterSrc
#agent.sources = httpSrc
agent.sources = spoolSrc
#agent.sources = avroSrc
- Delete the spool directory if it exists, and then create a new spool directory. Run the following commands on the host running the Flume agent that you configured:
rm -rf /tmp/myspooldir
sudo -u flume mkdir /tmp/myspooldir
- Start the Flume agent:
- Cloudera Manager: Start the agent from the Flume service. Make sure that you select the same agent that you configured earlier.
- Unmanaged CDH:
sudo /etc/init.d/flume-ng-agent start
- Copy a file containing tweets to the /tmp/myspooldir directory. To ensure that no partial files are ingested, copy and then atomically move files. Run the following commands on the host running the Flume agent that you configured:
- Cloudera Manager:
sudo -u flume cp /opt/cloudera/parcels/CDH/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
- Unmanaged CDH:
sudo -u flume cp /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
- Check the log for status or errors:
tail /var/log/flume-ng/flume*.log
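- Check the completion status. By default, the spooling directory source renames a fully ingested file by appending the .COMPLETED suffix: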
find /tmp/myspooldir
- Run a Solr query. For example, for a Solr server running on search01.example.com, go to one of the following URLs in a browser, depending on whether you have enabled security on your cluster:
- Security Enabled: https://search01.example.com:8985/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
- Security Disabled: http://search01.example.com:8983/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
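The copy-then-move pattern used in this procedure can be wrapped in a small helper when files are delivered to the spool directory by a script. The following is a minimal sketch. The function name spool_atomically is hypothetical, and the leading-dot temporary name mirrors the cp and mv commands shown above. The rename is atomic only because the temporary file and the final file are in the same directory, and therefore on the same filesystem.

```python
import os
import shutil


def spool_atomically(source_path, spool_dir):
    """Copy a file into spool_dir under a hidden name, then rename it into place."""
    name = os.path.basename(source_path)
    hidden = os.path.join(spool_dir, "." + name)
    final = os.path.join(spool_dir, name)
    # The slow part: the copy happens under the hidden name.
    shutil.copyfile(source_path, hidden)
    # The rename exposes the complete file under its final name in one step.
    os.replace(hidden, final)
    return final
```

Run the script as a user that can write to the spool directory, such as the flume user in this tutorial.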