MapReduceIndexerTool
MapReduceIndexerTool is a MapReduce batch job driver that takes a morphline and creates a set of Solr index shards from a set of input files and writes the indexes into HDFS in a flexible, scalable, and fault-tolerant manner.
For more information on Morphlines, see:
- Extracting, Transforming, and Loading Data With Cloudera Morphlines for an introduction to Morphlines.
- Example Morphline Usage for morphline examples, discussion of those examples, and links to additional information.
The indexer creates an offline index on HDFS in the output directory specified by the --output-dir parameter. If the --go-live parameter is specified, Solr merges the resulting offline index into the live running service. Thus, the Solr service must have read access to the contents of the output directory to complete the --go-live step. In an environment with restrictive permissions, such as one with an HDFS umask of 077, the Solr user may not be able to read the contents of the newly created directory. To address this issue, the indexer automatically applies the HDFS ACLs to enable Solr to read the output directory contents. These ACLs are only applied if HDFS ACLs are enabled on the HDFS NameNode. For more information, see HDFS Extended ACLs.
The indexer only makes ACL updates to the output directory and its contents. If the output directory's parent directories do not include the execute permission, the Solr service cannot access the output directory. Solr must have execute permission, granted through standard permissions or ACLs, on the parent directories of the output directory.
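As a minimal sketch of the parent-directory requirement above, the following shell function prints (but does not run) one hdfs dfs -setfacl command per parent directory of an output path, granting traverse-only access. The function name print_parent_acl_commands, the service user name solr, and the path /user/foo/outdir are assumptions for illustration; review the printed commands before running any of them on a real cluster.

```shell
# Print one setfacl command per parent directory of an absolute HDFS path.
# Assumptions: the Solr service user is named "solr" (hypothetical default)
# and HDFS ACLs are enabled on the NameNode. Nothing is executed here.
print_parent_acl_commands() {
  local path="${1%/}"       # strip one trailing slash, if any
  local user="${2:-solr}"   # hypothetical default user name

  # Only absolute paths are supported, so the upward walk always terminates.
  case "$path" in
    /*) ;;
    *) echo "expected an absolute path: $1" >&2; return 1 ;;
  esac

  local parent="${path%/*}"
  # Walk upward; the loop stops before the root directory itself.
  while [ -n "$parent" ]; do
    echo "hdfs dfs -setfacl -m user:${user}:--x ${parent}"
    parent="${parent%/*}"
  done
}

print_parent_acl_commands /user/foo/outdir
```

The sketch grants only the execute bit (--x), which is enough to traverse a directory without listing it. The root directory is skipped on the assumption that it is already traversable.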
MapReduceIndexerTool Input Splits
Unlike some other indexing tools, MapReduceIndexerTool does not operate on HDFS blocks as input splits. This means that when indexing a smaller number of large files, fewer hosts may be involved. For example, indexing two files that are each one GB results in two hosts acting as mappers. If these files were stored on a system with a 128 MB block size, other indexing tools might divide the work on the two files among 16 mappers, corresponding to the 16 HDFS blocks that store the two files.
This design choice is intentional: it allows MapReduceIndexerTool to index non-splittable file formats such as JSON, XML, jpg, or log4j.
In theory, this could result in inefficient use of resources when a single host indexes a large file while many other hosts sit idle. In reality, this indexing strategy typically results in satisfactory performance in production environments because in most cases the number of files is large enough that work is spread throughout the cluster.
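The arithmetic behind the two-file example can be sketched as follows. The function names mappers_by_file and mappers_by_block are hypothetical, and the per-block figure only models a tool that creates one split per HDFS block. It does not describe how any specific tool schedules work.

```shell
# Number of mappers when each input file is one unit of work
# (the per-file strategy described above).
mappers_by_file() {
  echo "$1"
}

# Number of mappers when each HDFS block becomes its own split:
# files * ceil(file_size / block_size), all sizes in MB.
mappers_by_block() {
  local files="$1" file_mb="$2" block_mb="$3"
  echo $(( files * ( (file_mb + block_mb - 1) / block_mb ) ))
}

echo "per file:  $(mappers_by_file 2)"
echo "per block: $(mappers_by_block 2 1024 128)"
```

With two 1 GB files and a 128 MB block size, the per-file count is 2 and the per-block count is 16, matching the figures in the text.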
While dividing tasks by input file does not present problems in most cases, users may still want to divide indexing tasks along HDFS splits. In that case, use the CrunchIndexerTool, which can work with Hadoop input splits using the --input-file-format option.
MapReduceIndexerTool Metadata
MapReduceIndexerTool generates metadata fields for each input file when indexing. These fields can be stored in Solr by adding definitions such as the following to the Solr schema.xml file:
<!-- file metadata -->
<field name="file_download_url" type="string" indexed="false" stored="true" />
<field name="file_upload_url" type="string" indexed="false" stored="true" />
<field name="file_scheme" type="string" indexed="true" stored="true" />
<field name="file_host" type="string" indexed="true" stored="true" />
<field name="file_port" type="int" indexed="true" stored="true" />
<field name="file_path" type="string" indexed="true" stored="true" />
<field name="file_name" type="string" indexed="true" stored="true" />
<field name="file_length" type="tlong" indexed="true" stored="true" />
<field name="file_last_modified" type="tlong" indexed="true" stored="true" />
<field name="file_owner" type="string" indexed="true" stored="true" />
<field name="file_group" type="string" indexed="true" stored="true" />
<field name="file_permissions_user" type="string" indexed="true" stored="true" />
<field name="file_permissions_group" type="string" indexed="true" stored="true" />
<field name="file_permissions_other" type="string" indexed="true" stored="true" />
<field name="file_permissions_stickybit" type="boolean" indexed="true" stored="true" />
Example output:
  "file_upload_url":"foo/test-documents/sample-statuses-20120906-141433.avro",
  "file_download_url":"hdfs://host1.mycompany.com:8020/user/foo/test-documents/sample-statuses-20120906-141433.avro",
  "file_scheme":"hdfs",
  "file_host":"host1.mycompany.com",
  "file_port":8020,
  "file_name":"sample-statuses-20120906-141433.avro",
  "file_path":"/user/foo/test-documents/sample-statuses-20120906-141433.avro",
  "file_last_modified":1357193447106,
  "file_length":1512,
  "file_owner":"foo",
  "file_group":"foo",
  "file_permissions_user":"rw-",
  "file_permissions_group":"r--",
  "file_permissions_other":"r--",
  "file_permissions_stickybit":false,
Invoking Command-Line Help
- To invoke the command-line help in a default parcels installation, use:
  $ hadoop jar /opt/cloudera/parcels/CDH-*/jars/search-mr-*-job.jar \
      org.apache.solr.hadoop.MapReduceIndexerTool --help
- To invoke the command-line help in a default packages installation, use:
  $ hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar \
      org.apache.solr.hadoop.MapReduceIndexerTool --help
usage: hadoop [GenericOptions]... jar search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool
       [--help] --output-dir HDFS_URI [--input-list URI]
       --morphline-file FILE [--morphline-id STRING]
       [--solr-home-dir DIR] [--update-conflict-resolver FQCN]
       [--mappers INTEGER] [--reducers INTEGER] [--max-segments INTEGER]
       [--fair-scheduler-pool STRING] [--dry-run] [--log4j FILE]
       [--verbose] [--show-non-solr-cloud] [--zk-host STRING] [--go-live]
       [--collection STRING] [--go-live-threads INTEGER]
       [HDFS_URI [HDFS_URI ...]]

MapReduce batch job driver that takes a morphline and creates a set of Solr index shards from a set of input files and writes the indexes into HDFS, in a flexible, scalable and fault-tolerant manner. It also supports merging the output shards into a set of live customer facing Solr servers, typically a SolrCloud. The program proceeds in several consecutive MapReduce based phases, as follows:

1) Randomization phase: This (parallel) phase randomizes the list of input files in order to spread indexing load more evenly among the mappers of the subsequent phase.

2) Mapper phase: This (parallel) phase takes the input files, extracts the relevant content, transforms it and hands SolrInputDocuments to a set of reducers. The ETL functionality is flexible and customizable using chains of arbitrary morphline commands that pipe records from one transformation command to another. Commands to parse and transform a set of standard data formats such as Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional file or data formats can be added as morphline plugins. This is done by implementing a simple Java interface that consumes a record (e.g. a file in the form of an InputStream plus some headers plus contextual metadata) and generates as output zero or more records. Any kind of data format can be indexed and any Solr documents for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed. Record fields, including MIME types, can also explicitly be passed by force from the CLI to the morphline, for example:
   hadoop ... -D morphlineField._attachment_mimetype=text/csv

3) Reducer phase: This (parallel) phase loads the mapper's SolrInputDocuments into one EmbeddedSolrServer per reducer. Each such reducer and Solr server can be seen as a (micro) shard. The Solr servers store their data in HDFS.

4) Mapper-only merge phase: This (parallel) phase merges the set of reducer shards into the number of solr shards expected by the user, using a mapper-only job. This phase is omitted if the number of shards is already equal to the number of shards expected by the user.

5) Go-live phase: This optional (parallel) phase merges the output shards of the previous phase into a set of live customer facing Solr servers, typically a SolrCloud. If this phase is omitted you can explicitly point each Solr server to one of the HDFS output shard directories.

Fault Tolerance: Mapper and reducer task attempts are retried on failure per the standard MapReduce semantics. On program startup all data in the --output-dir is deleted if that output directory already exists. If the whole job fails you can retry simply by rerunning the program again using the same arguments.

positional arguments:
  HDFS_URI
        HDFS URI of file or directory tree to index. (default: [])

optional arguments:
  --help, -help, -h
        Show this help message and exit
  --input-list URI
        Local URI or HDFS URI of a UTF-8 encoded file containing a list of HDFS URIs to index, one URI per line in the file. If '-' is specified, URIs are read from the standard input. Multiple --input-list arguments can be specified.
  --morphline-id STRING
        The identifier of the morphline that shall be executed within the morphline config file specified by --morphline-file. If the --morphline-id option is omitted the first (i.e. top-most) morphline within the config file is used. Example: morphline1
  --solr-home-dir DIR
        Optional relative or absolute path to a local dir containing Solr conf/ dir and in particular conf/solrconfig.xml and optionally also lib/ dir. This directory will be uploaded to each MR task. Example: src/test/resources/solr/minimr
  --update-conflict-resolver FQCN
        Fully qualified class name of a Java class that implements the UpdateConflictResolver interface. This enables deduplication and ordering of a series of document updates for the same unique document key. For example, a MapReduce batch job might index multiple files in the same job where some of the files contain old and new versions of the very same document, using the same unique document key. Typically, implementations of this interface forbid collisions by throwing an exception, or ignore all but the most recent document version, or, in the general case, order colliding updates ascending from least recent to most recent (partial) update. The caller of this interface (i.e. the Hadoop Reducer) will then apply the updates to Solr in the order returned by the orderUpdates() method. The default RetainMostRecentUpdateConflictResolver implementation ignores all but the most recent document version, based on a configurable numeric Solr field, which defaults to the file_last_modified timestamp (default: org.apache.solr.hadoop.dedup.RetainMostRecentUpdateConflictResolver)
  --mappers INTEGER
        Tuning knob that indicates the maximum number of MR mapper tasks to use. -1 indicates use all map slots available on the cluster. (default: -1)
  --reducers INTEGER
        Tuning knob that indicates the number of reducers to index into. -1 indicates use all reduce slots available on the cluster. 0 indicates use one reducer per output shard, which disables the mtree merge MR algorithm. The mtree merge MR algorithm improves scalability by spreading load (in particular CPU load) among a number of parallel reducers that can be much larger than the number of solr shards expected by the user. It can be seen as an extension of concurrent lucene merges and tiered lucene merges to the clustered case. The subsequent mapper-only phase merges the output of said large number of reducers to the number of shards expected by the user, again by utilizing more available parallelism on the cluster. (default: -1)
  --max-segments INTEGER
        Tuning knob that indicates the maximum number of segments to be contained on output in the index of each reducer shard. After a reducer has built its output index it applies a merge policy to merge segments until there are <= maxSegments lucene segments left in this index. Merging segments involves reading and rewriting all data in all these segment files, potentially multiple times, which is very I/O intensive and time consuming. However, an index with fewer segments can later be merged faster, and it can later be queried faster once deployed to a live Solr serving shard. Set maxSegments to 1 to optimize the index for low query latency. In a nutshell, a small maxSegments value trades indexing latency for subsequently improved query latency. This can be a reasonable trade-off for batch indexing systems. (default: 1)
  --fair-scheduler-pool STRING
        Optional tuning knob that indicates the name of the fair scheduler pool to submit jobs to. The Fair Scheduler is a pluggable MapReduce scheduler that provides a way to share large clusters. Fair scheduling is a method of assigning resources to jobs such that all jobs get, on average, an equal share of resources over time. When there is a single job running, that job uses the entire cluster. When other jobs are submitted, task slots that free up are assigned to the new jobs, so that each job gets roughly the same amount of CPU time. Unlike the default Hadoop scheduler, which forms a queue of jobs, this lets short jobs finish in reasonable time while not starving long jobs. It is also an easy way to share a cluster between multiple users. Fair sharing can also work with job priorities - the priorities are used as weights to determine the fraction of total compute time that each job gets.
  --dry-run
        Run in local mode and print documents to stdout instead of loading them into Solr. This executes the morphline in the client process (without submitting a job to MR) for quicker turnaround during early trial & debug sessions. (default: false)
  --log4j FILE
        Relative or absolute path to a log4j.properties config file on the local file system. This file will be uploaded to each MR task. Example: /path/to/log4j.properties
  --verbose, -v
        Turn on verbose output. (default: false)
  --show-non-solr-cloud
        Also show options for Non-SolrCloud mode as part of --help. (default: false)

Required arguments:
  --output-dir HDFS_URI
        HDFS directory to write Solr indexes to. Inside there one output directory per shard will be generated. Example: hdfs://c2202.mycompany.com/user/$USER/test
  --morphline-file FILE
        Relative or absolute path to a local config file that contains one or more morphlines. The file must be UTF-8 encoded. Example: /path/to/morphline.conf

Cluster arguments:
  Arguments that provide information about your Solr cluster.

  --zk-host STRING
        The address of a ZooKeeper ensemble being used by a SolrCloud cluster. This ZooKeeper ensemble will be examined to determine the number of output shards to create as well as the Solr URLs to merge the output shards into when using the --go-live option. Requires that you also pass the --collection to merge the shards into.
        The --zk-host option implements the same partitioning semantics as the standard SolrCloud Near-Real-Time (NRT) API. This enables mixing batch updates from MapReduce ingestion with updates from standard Solr NRT ingestion on the same SolrCloud cluster, using identical unique document keys.
        Format is: a list of comma separated host:port pairs, each corresponding to a zk server. Example: '127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183' If the optional chroot suffix is used the example would look like: '127.0.0.1:2181/solr,127.0.0.1:2182/solr,127.0.0.1:2183/solr' where the client would be rooted at '/solr' and all paths would be relative to this root - i.e. getting/setting/etc... '/foo/bar' would result in operations being run on '/solr/foo/bar' (from the server perspective).
        If --solr-home-dir is not specified, the Solr home directory for the collection will be downloaded from this ZooKeeper ensemble.

Go live arguments:
  Arguments for merging the shards that are built into a live Solr cluster. Also see the Cluster arguments.

  --go-live
        Allows you to optionally merge the final index shards into a live Solr cluster after they are built. You can pass the ZooKeeper address with --zk-host and the relevant cluster information will be auto detected. (default: false)
  --collection STRING
        The SolrCloud collection to merge shards into when using --go-live and --zk-host. Example: collection1
  --go-live-threads INTEGER
        Tuning knob that indicates the maximum number of live merges to run in parallel at one time. (default: 1000)

Generic options supported are
  --conf <configuration file>
        specify an application configuration file
  -D <property=value>
        use value for given property
  --fs <local|namenode:port>
        specify a namenode
  --jt <local|jobtracker:port>
        specify a job tracker
  --files <comma separated list of files>
        specify comma separated files to be copied to the map reduce cluster
  --libjars <comma separated list of jars>
        specify comma separated jar files to include in the classpath.
  --archives <comma separated list of archives>
        specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

Examples:

# (Re)index an Avro based Twitter tweet file:
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro

# (Re)index all files that match all of the following conditions:
# 1) File is contained in dir tree hdfs:///user/$USER/solrloadtest/twitter/tweets
# 2) file name matches the glob pattern 'sample-statuses*.gz'
# 3) file was last modified less than 100000 minutes ago
# 4) file size is between 1 MB and 1 GB
# Also include extra library jar file containing JSON tweet Java parser:
hadoop jar target/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool \
  -find hdfs:///user/$USER/solrloadtest/twitter/tweets \
  -type f \
  -name 'sample-statuses*.gz' \
  -mmin -1000000 \
  -size -100000000c \
  -size +1000000c \
| sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --libjars /path/to/kite-morphlines-twitter-0.10.0.jar \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadJsonTestTweets.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 100 \
  --input-list -

# Go live by merging resulting index shards into a live Solr cluster
# (explicitly specify Solr URLs - for a SolrCloud cluster see next example):
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shard-url http://solr001.mycompany.com:8983/solr/collection1 \
  --shard-url http://solr002.mycompany.com:8983/solr/collection1 \
  --go-live \
  hdfs:///user/foo/indir

# Go live by merging resulting index shards into a live SolrCloud cluster
# (discover shards and Solr URLs through ZooKeeper):
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --go-live \
  hdfs:///user/foo/indir

# MapReduce on Yarn - Pass custom JVM arguments (including a custom tmp directory)
HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/'; \
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapreduce.map.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  -D 'mapreduce.reduce.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro

# MapReduce on MR1 - Pass custom JVM arguments (including a custom tmp directory)
HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/'; \
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro
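
Because the help text describes --dry-run as a way to get quicker turnaround before submitting a real job, one possible workflow is to run the same arguments twice: first with --dry-run, then with --go-live. The sketch below only assembles and prints the two command lines, so they can be reviewed before anything is run. The function name build_indexer_cmd, the JOB_JAR placeholder path, and the sample file names are assumptions, and combining --dry-run with --zk-host in this way should be verified against your installed version.

```shell
# Placeholder jar location (assumption); override JOB_JAR for a real install.
JOB_JAR="${JOB_JAR:-/path/to/search-mr-job.jar}"

# Print a MapReduceIndexerTool command line for the given mode.
# Arguments: mode (dry-run|go-live), morphline file, output dir,
# ZooKeeper address, collection name, then one or more input URIs.
# Values are joined unquoted, so this sketch assumes they contain no spaces.
build_indexer_cmd() {
  local mode="$1" morphline="$2" outdir="$3" zk="$4" collection="$5"
  local mode_flag
  shift 5

  case "$mode" in
    dry-run) mode_flag="--dry-run" ;;
    go-live) mode_flag="--go-live" ;;
    *) echo "unknown mode: $mode" >&2; return 1 ;;
  esac

  echo "hadoop jar $JOB_JAR org.apache.solr.hadoop.MapReduceIndexerTool" \
    "--morphline-file $morphline --output-dir $outdir" \
    "--zk-host $zk --collection $collection $mode_flag $*"
}

# Review the dry-run command first, then the go-live command.
for mode in dry-run go-live; do
  build_indexer_cmd "$mode" morphline.conf hdfs:///user/foo/outdir \
    zk01.mycompany.com:2181/solr collection1 hdfs:///user/foo/indir
done
```

Printing rather than executing keeps the sketch safe to run anywhere. Keep in mind that a real run deletes any existing data in the --output-dir on startup, as noted in the Fault Tolerance section of the help text.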