MapReduceIndexerTool

MapReduceIndexerTool is a MapReduce batch job driver that takes a morphline, creates a set of Solr index shards from a set of input files, and writes the indexes into HDFS in a flexible, scalable, and fault-tolerant manner.

For more information on Morphlines, see:

MapReduceIndexerTool also supports merging the output shards into a set of live customer-facing Solr servers, typically a SolrCloud.

The indexer creates an offline index on HDFS in the output directory specified by the --output-dir parameter. If the --go-live parameter is specified, Solr merges the resulting offline index into the live running service. Thus, the Solr service must have read access to the contents of the output directory to complete the --go-live step. In an environment with restrictive permissions, such as one with an HDFS umask of 077, the Solr user may not be able to read the contents of the newly created directory. To address this issue, the indexer automatically applies the HDFS ACLs to enable Solr to read the output directory contents. These ACLs are only applied if HDFS ACLs are enabled on the HDFS NameNode. For more information, see HDFS Extended ACLs.

The indexer only makes ACL updates to the output directory and its contents. If the output directory's parent directories do not grant the execute permission, the Solr service cannot access the output directory. Solr must have execute permission, granted through standard permissions or ACLs, on every parent directory of the output directory.
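
The permission requirements above can be checked from the command line. The following is a minimal sketch, not part of the indexer itself. It assumes a hypothetical output directory /user/foo/test and a Solr service user named solr. It only prints the hdfs commands (a lookup of the dfs.namenode.acls.enabled property and one getfacl call per parent directory) so that you can review them before running them.

```shell
#!/bin/sh
# Assumptions: the output directory below is a placeholder, and the Solr
# service user is named "solr". Adjust both for your cluster.
OUTPUT_DIR=/user/foo/test

# Print every ancestor of an absolute path, from / down to the direct parent.
parent_dirs() {
  parent=$(dirname "$1")
  [ "$parent" = "/" ] || [ "$parent" = "." ] || parent_dirs "$parent"
  dirname "$1"
}

# ACLs are only applied by the indexer if this NameNode property is true.
echo "hdfs getconf -confKey dfs.namenode.acls.enabled"

# Solr needs execute (x) permission on each of these directories.
parent_dirs "$OUTPUT_DIR" | while IFS= read -r dir; do
  echo "hdfs dfs -getfacl $dir"
done
```

If a parent directory turns out to lack execute permission for Solr, one option, assuming ACLs are enabled, is an entry such as hdfs dfs -setfacl -m user:solr:--x /user/foo.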

MapReduceIndexerTool Input Splits

Unlike some other indexing tools, the MapReduceIndexerTool does not operate on HDFS blocks as input splits. Instead, each input file is processed as a whole. This means that when indexing a small number of large files, fewer hosts may be involved. For example, indexing two files that are each one GB results in two hosts acting as mappers. If these files were stored on a system with a 128 MB block size, a tool that splits on HDFS blocks might divide the work on the two files among 16 mappers, corresponding to the 16 HDFS blocks that store the two files.
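
The arithmetic behind this example can be reproduced with a few lines of shell. This is only an illustration of the numbers quoted above. The helper name block_splits is made up for this sketch, and it assumes all files have the same size.

```shell
#!/bin/sh
# Number of block-based input splits for a set of equally sized files.
# $1 = number of files, $2 = file size in MB, $3 = HDFS block size in MB
block_splits() {
  blocks_per_file=$(( ($2 + $3 - 1) / $3 ))   # round up to whole blocks
  echo $(( $1 * blocks_per_file ))
}

# Two 1 GB (1024 MB) files on a system with a 128 MB block size.
echo "mappers with one split per file:  2"
echo "mappers with one split per block: $(block_splits 2 1024 128)"
```

With these inputs the second line reports 16, matching the 16 HDFS blocks mentioned above.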

This design choice is intentional: it allows MapReduceIndexerTool to index non-splittable file formats such as JSON, XML, JPG, or log4j.

In theory, this could result in inefficient use of resources when a single host indexes a large file while many other hosts sit idle. In reality, this indexing strategy typically results in satisfactory performance in production environments because in most cases the number of files is large enough that work is spread throughout the cluster.

While dividing tasks by input file does not present problems in most cases, users may still want to divide indexing tasks along HDFS splits. In that case, use the CrunchIndexerTool, which can work with Hadoop input splits using the --input-file-format option.

MapReduceIndexerTool Metadata

The MapReduceIndexerTool generates metadata fields for each input file when indexing. These fields can be used in morphline commands. They can also be stored in Solr, so that after the MapReduce indexing process completes the indexed fields are searchable through Solr. To store them, add definitions like the following to your Solr schema.xml file:
<!-- file metadata -->
<field name="file_download_url" type="string" indexed="false" stored="true" />
<field name="file_upload_url" type="string" indexed="false" stored="true" />
<field name="file_scheme" type="string" indexed="true" stored="true" />
<field name="file_host" type="string" indexed="true" stored="true" />
<field name="file_port" type="int" indexed="true" stored="true" />
<field name="file_path" type="string" indexed="true" stored="true" />
<field name="file_name" type="string" indexed="true" stored="true" />
<field name="file_length" type="tlong" indexed="true" stored="true" />
<field name="file_last_modified" type="tlong" indexed="true" stored="true" />
<field name="file_owner" type="string" indexed="true" stored="true" />
<field name="file_group" type="string" indexed="true" stored="true" />
<field name="file_permissions_user" type="string" indexed="true" stored="true" />
<field name="file_permissions_group" type="string" indexed="true" stored="true" />
<field name="file_permissions_other" type="string" indexed="true" stored="true" />
<field name="file_permissions_stickybit" type="boolean" indexed="true" stored="true" />

Example output:

"file_upload_url":"foo/test-documents/sample-statuses-20120906-141433.avro",
"file_download_url":"hdfs://host1.mycompany.com:8020/user/foo/test-documents/sample-statuses-20120906-141433.avro",
"file_scheme":"hdfs",
"file_host":"host1.mycompany.com",
"file_port":8020,
"file_name":"sample-statuses-20120906-141433.avro",
"file_path":"/user/foo/test-documents/sample-statuses-20120906-141433.avro",
"file_last_modified":1357193447106,
"file_length":1512,
"file_owner":"foo",
"file_group":"foo",
"file_permissions_user":"rw-",
"file_permissions_group":"r--",
"file_permissions_other":"r--",
"file_permissions_stickybit":false,

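Once these fields are in the schema, they can be used in ordinary Solr queries. The sketch below builds a query URL for Solr's standard select handler. The base URL http://localhost:8983/solr, the collection name collection1, and the helper name metadata_query_url are assumptions made for illustration. Note that file_download_url is declared with indexed="false" in the schema above, so it can be returned in results but not searched on.

```shell
#!/bin/sh
# Build a Solr select URL (no URL-encoding is applied, so keep the query simple).
# $1 = Solr base URL, $2 = collection, $3 = query, $4 = comma-separated fields to return
metadata_query_url() {
  printf '%s/%s/select?q=%s&fl=%s&wt=json\n' "$1" "$2" "$3" "$4"
}

# Find documents that came from files owned by user "foo" and return
# the file name, length, and download URL of each.
metadata_query_url http://localhost:8983/solr collection1 \
  'file_owner:foo' 'file_name,file_length,file_download_url'
```

The printed URL can then be passed to a client such as curl, for example curl -s "$(metadata_query_url ...)".
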
Invoking Command-Line Help

  • To invoke the command-line help in a default parcels installation, use:
    $ hadoop jar /opt/cloudera/parcels/CDH-*/jars/search-mr-*-job.jar \
    org.apache.solr.hadoop.MapReduceIndexerTool --help
  • To invoke the command-line help in a default packages installation, use:
    $ hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar \
    org.apache.solr.hadoop.MapReduceIndexerTool --help
usage: hadoop [GenericOptions]... jar search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool
       [--help] --output-dir HDFS_URI [--input-list URI]
       --morphline-file FILE [--morphline-id STRING] [--solr-home-dir DIR]
       [--update-conflict-resolver FQCN] [--mappers INTEGER]
       [--reducers INTEGER] [--max-segments INTEGER]
       [--fair-scheduler-pool STRING] [--dry-run] [--log4j FILE]
       [--verbose] [--show-non-solr-cloud] [--zk-host STRING] [--go-live]
       [--collection STRING] [--go-live-threads INTEGER]
       [HDFS_URI [HDFS_URI ...]]

MapReduce batch job driver that  takes  a  morphline  and creates a set of
Solr index shards from a set  of  input  files and writes the indexes into
HDFS, in a flexible, scalable and  fault-tolerant manner. It also supports
merging the  output  shards  into  a  set  of  live  customer  facing Solr
servers,  typically  a  SolrCloud.   The   program   proceeds  in  several
consecutive MapReduce based phases, as follows:

1) Randomization phase:  This  (parallel)  phase  randomizes  the  list of
input files in  order  to  spread  indexing  load  more  evenly  among the
mappers of the subsequent phase.

2) Mapper phase: This  (parallel)  phase  takes  the input files, extracts
the relevant content, transforms it and  hands SolrInputDocuments to a set
of reducers. The  ETL  functionality  is  flexible  and customizable using
chains  of  arbitrary  morphline  commands  that  pipe  records  from  one
transformation command to another. Commands  to  parse and transform a set
of standard data formats such as  Avro,  CSV,  Text, HTML, XML, PDF, Word,
Excel, etc. are provided out  of  the  box, and additional custom commands
and parsers for additional file or data  formats can be added as morphline
plugins. This  is  done  by  implementing  a  simple  Java  interface that
consumes a record (e.g. a file  in  the  form  of an InputStream plus some
headers plus contextual metadata)  and  generates  as  output zero or more
records. Any kind of data  format  can  be  indexed and any Solr documents
for any kind of Solr schema  can  be  generated,  and any custom ETL logic
can be registered and executed.
Record fields, including MIME  types,  can  also  explicitly  be passed by
force  from  the  CLI  to  the  morphline,  for  example:  hadoop  ...  -D
morphlineField._attachment_mimetype=text/csv

3)   Reducer   phase:   This   (parallel)   phase   loads   the   mapper's
SolrInputDocuments into  one  EmbeddedSolrServer  per  reducer.  Each such
reducer and Solr server can be seen  as  a (micro) shard. The Solr servers
store their data in HDFS.

4) Mapper-only merge  phase:  This  (parallel)  phase  merges  the  set of
reducer shards into the number of solr  shards expected by the user, using
a mapper-only job. This  phase  is  omitted  if  the  number  of shards is
already equal to the number of shards expected by the user.

5) Go-live phase: This optional (parallel)  phase merges the output shards
of the previous phase into  a  set  of  live customer facing Solr servers,
typically a SolrCloud. If this phase  is  omitted you can explicitly point
each Solr server to one of the HDFS output shard directories.

Fault Tolerance: Mapper and reducer  task  attempts are retried on failure
per the standard MapReduce semantics. On  program  startup all data in the
--output-dir is deleted if that  output  directory  already exists. If the
whole job fails you can retry simply  by rerunning the program again using
the same arguments.

positional arguments:
  HDFS_URI               HDFS URI of  file  or  directory  tree  to index.
                         (default: [])

optional arguments:
  --help, -help, -h      Show this help message and exit
  --input-list URI       Local URI or HDFS  URI  of  a  UTF-8 encoded file
                         containing a list of HDFS  URIs to index, one URI
                         per line in the file.  If  '-' is specified, URIs
                         are read from  the  standard  input.  Multiple --
                         input-list arguments can be specified.
  --morphline-id STRING  The identifier of  the  morphline  that  shall be
                         executed  within   the   morphline   config  file
                         specified  by   --morphline-file.   If   the   --
                         morphline-id option is  ommitted  the first (i.e.
                         top-most) morphline  within  the  config  file is
                         used. Example: morphline1
  --solr-home-dir DIR    Optional relative or  absolute  path  to  a local
                         dir containing Solr conf/  dir  and in particular
                         conf/solrconfig.xml  and  optionally   also  lib/
                         dir. This directory will  be  uploaded to each MR
                         task. Example: src/test/resources/solr/minimr
  --update-conflict-resolver FQCN
                         Fully qualified class name  of  a Java class that
                         implements the  UpdateConflictResolver interface.
                         This enables  deduplication  and  ordering  of  a
                         series of document  updates  for  the same unique
                         document key. For example,  a MapReduce batch job
                         might index multiple files in  the same job where
                         some of the files  contain  old  and new versions
                         of the very same document,  using the same unique
                         document key.
                         Typically,  implementations  of   this  interface
                         forbid collisions by  throwing  an  exception, or
                         ignore all but the  most recent document version,
                         or, in the general  case, order colliding updates
                         ascending  from  least  recent   to  most  recent
                         (partial) update. The  caller  of  this interface
                         (i.e. the Hadoop  Reducer)  will  then  apply the
                         updates to Solr  in  the  order  returned  by the
                         orderUpdates() method.
                         The                                       default
                         RetainMostRecentUpdateConflictResolver
                         implementation ignores all  but  the  most recent
                         document  version,   based   on   a  configurable
                         numeric  Solr  field,   which   defaults  to  the
                         file_last_modified   timestamp   (default:   org.
                         apache.solr.hadoop.dedup.
                         RetainMostRecentUpdateConflictResolver)
  --mappers INTEGER      Tuning knob that indicates  the maximum number of
                         MR mapper tasks to use.  -1 indicates use all map
                         slots available on the cluster. (default: -1)
  --reducers INTEGER     Tuning  knob  that   indicates   the   number  of
                         reducers to  index  into.  -1  indicates  use all
                         reduce  slots  available   on   the   cluster.  0
                         indicates  use  one  reducer  per  output  shard,
                         which disables the mtree  merge MR algorithm. The
                         mtree merge MR algorithm  improves scalability by
                         spreading load (in particular  CPU  load) among a
                         number of  parallel  reducers  that  can  be much
                         larger than the  number  of  solr shards expected
                         by the user. It can  be  seen  as an extension of
                         concurrent  lucene  merges   and   tiered  lucene
                         merges to  the  clustered  case.  The  subsequent
                         mapper-only  phase  merges  the  output  of  said
                         large number of reducers to  the number of shards
                         expected by the  user,  again  by  utilizing more
                         available parallelism on  the  cluster. (default:
                         -1)
  --max-segments INTEGER
                         Tuning knob that indicates  the maximum number of
                         segments to be contained  on  output in the index
                         of each reducer shard. After  a reducer has built
                         its output index  it  applies  a  merge policy to
                         merge segments  until  there  are  <= maxSegments
                         lucene  segments  left  in  this  index.  Merging
                         segments involves reading and  rewriting all data
                         in all these segment  files, potentially multiple
                         times, which  is  very  I/O  intensive  and  time
                         consuming. However, an index  with fewer segments
                         can later be merged faster,  and  it can later be
                         queried faster  once  deployed  to  a  live  Solr
                         serving shard. Set maxSegments  to  1 to optimize
                         the index for low  query  latency. In a nutshell,
                         a  small   maxSegments   value   trades  indexing
                         latency for subsequently  improved query latency.
                         This can  be  a  reasonable  trade-off  for batch
                         indexing systems. (default: 1)
  --fair-scheduler-pool STRING
                         Optional tuning knob that  indicates  the name of
                         the fair scheduler pool  to  submit  jobs to. The
                         Fair   Scheduler   is   a   pluggable   MapReduce
                         scheduler that  provides  a  way  to  share large
                         clusters.  Fair  scheduling   is   a   method  of
                         assigning resources to  jobs  such  that all jobs
                         get, on  average,  an  equal  share  of resources
                         over time. When there  is  a  single job running,
                         that job  uses  the  entire  cluster.  When other
                         jobs are submitted, tasks slots  that free up are
                         assigned to the new jobs,  so  that each job gets
                         roughly the same amount  of  CPU time. Unlike the
                         default Hadoop scheduler, which  forms a queue of
                         jobs, this lets short  jobs  finish in reasonable
                         time while not starving long  jobs. It is also an
                         easy way to share  a  cluster between multiple of
                         users.  Fair  sharing  can  also  work  with  job
                         priorities - the priorities  are  used as weights
                         to determine the fraction  of  total compute time
                         that each job gets.
  --dry-run              Run in local mode  and  print documents to stdout
                         instead of loading them  into Solr. This executes
                         the morphline  in  the  client  process  (without
                         submitting a job  to  MR)  for quicker turnaround
                         during early trial  &  debug  sessions. (default:
                         false)
  --log4j FILE           Relative or absolute  path  to a log4j.properties
                         config file on the  local  file system. This file
                         will  be  uploaded  to  each  MR  task.  Example:
                         /path/to/log4j.properties
  --verbose, -v          Turn on verbose output. (default: false)
  --show-non-solr-cloud  Also show options for  Non-SolrCloud mode as part
                         of --help. (default: false)

Required arguments:
  --output-dir HDFS_URI  HDFS directory to write  Solr  indexes to. Inside
                         there one  output  directory  per  shard  will be
                         generated.    Example:    hdfs://c2202.mycompany.
                         com/user/$USER/test
  --morphline-file FILE  Relative or absolute path to  a local config file
                         that contains one  or  more  morphlines. The file
                         must     be      UTF-8      encoded.     Example:
                         /path/to/morphline.conf

Cluster arguments:
  Arguments that provide information about your Solr cluster.

  --zk-host STRING       The address of  a  ZooKeeper  ensemble being used
                         by a SolrCloud  cluster.  This ZooKeeper ensemble
                         will be  examined  to  determine  the  number  of
                         output shards to create as  well as the Solr URLs
                         to merge the output shards into when using the --
                         go-live option. Requires that  you  also pass the
                         --collection to merge the shards into.

                         The  --zk-host   option   implements   the   same
                         partitioning semantics as  the standard SolrCloud
                         Near-Real-Time (NRT)  API.  This  enables  to mix
                         batch  updates  from   MapReduce  ingestion  with
                         updates from standard Solr  NRT  ingestion on the
                         same SolrCloud  cluster,  using  identical unique
                         document keys.

                         Format is: a  list  of  comma separated host:port
                         pairs,  each  corresponding   to   a  zk  server.
                         Example:               '127.0.0.1:2181,127.0.0.1:
                         2182,127.0.0.1:2183'  If   the   optional  chroot
                         suffix is  used  the  example  would  look  like:
                         '127.0.0.1:2181/solr,127.0.0.1:2182/solr,
                         127.0.0.1:2183/solr' where  the  client  would be
                         rooted  at  '/solr'  and   all   paths  would  be
                         relative     to     this      root     -     i.e.
                         getting/setting/etc...  '/foo/bar'  would  result
                         in operations being run  on '/solr/foo/bar' (from
                         the server perspective).

                         If --solr-home-dir  is  not  specified,  the Solr
                         home  directory  for   the   collection  will  be
                         downloaded from this ZooKeeper ensemble.

Go live arguments:
  Arguments for merging  the  shards  that  are  built  into  a  live Solr
  cluster. Also see the Cluster arguments.

  --go-live              Allows you to  optionally  merge  the final index
                         shards into a live  Solr  cluster  after they are
                         built. You can pass the ZooKeeper address with --
                         zk-host  and  the  relevant  cluster  information
                         will be auto detected.  (default: false)
  --collection STRING    The SolrCloud  collection  to  merge  shards into
                         when  using  --go-live  and  --zk-host.  Example:
                         collection1
  --go-live-threads INTEGER
                         Tuning knob that indicates  the maximum number of
                         live merges  to  run  in  parallel  at  one time.
                         (default: 1000)

Generic options supported are
  --conf <configuration file>
                         specify an application configuration file
  -D <property=value>    use value for given property
  --fs <local|namenode:port>
                         specify a namenode
  --jt <local|jobtracker:port>
                         specify a job tracker
  --files <comma separated list of files>
                         specify comma separated  files  to  be  copied to
                         the map reduce cluster
  --libjars <comma separated list of jars>
                         specify comma separated jar  files  to include in
                         the classpath.
  --archives <comma separated list of archives>
                         specify   comma   separated    archives   to   be
                         unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

Examples:

# (Re)index an Avro based Twitter tweet file:
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro

# (Re)index all files that match all of the following conditions:
# 1) File is contained in dir tree hdfs:///user/$USER/solrloadtest/twitter/tweets
# 2) file name matches the glob pattern 'sample-statuses*.gz'
# 3) file was last modified less than 1000000 minutes ago
# 4) file size is between 1 MB and 100 MB
# Also include extra library jar file containing JSON tweet Java parser:
hadoop jar target/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool \
  -find hdfs:///user/$USER/solrloadtest/twitter/tweets \
  -type f \
  -name 'sample-statuses*.gz' \
  -mmin -1000000 \
  -size -100000000c \
  -size +1000000c \
| sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --libjars /path/to/kite-morphlines-twitter-0.10.0.jar \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadJsonTestTweets.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 100 \
  --input-list -

# Go live by merging resulting index shards into a live Solr cluster
# (explicitly specify Solr URLs - for a SolrCloud cluster see next example):
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shard-url http://solr001.mycompany.com:8983/solr/collection1 \
  --shard-url http://solr002.mycompany.com:8983/solr/collection1 \
  --go-live \
  hdfs:///user/foo/indir

# Go live by merging resulting index shards into a live SolrCloud cluster
# (discover shards and Solr URLs through ZooKeeper):
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --go-live \
  hdfs:///user/foo/indir

# MapReduce on Yarn - Pass custom JVM arguments (including a custom tmp directory)
HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/'; \
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapreduce.map.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  -D 'mapreduce.reduce.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro

# MapReduce on MR1 - Pass custom JVM arguments (including a custom tmp directory)
HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/'; \
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro
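
The options described above can be combined into a two-step workflow: first check the morphline with --dry-run, then run the job with --go-live. The following is a minimal sketch that only prints the two command lines for review. The helper name indexer_cmd is made up for this sketch, and the jar location, morphline path, host names, and collection name are placeholders modeled on the examples on this page. Only flags listed in the help output are used.

```shell
#!/bin/sh
# Placeholder values; adjust for your cluster.
JOB_JAR='/opt/cloudera/parcels/CDH-*/jars/search-mr-*-job.jar'
MORPHLINE=/path/to/morphline.conf
OUTPUT_DIR='hdfs://c2202.mycompany.com/user/$USER/test'
ZK_HOST=zk01.mycompany.com:2181/solr
COLLECTION=collection1
INPUT=hdfs:///user/foo/indir

# Print an indexer command line. Extra flags such as --dry-run or
# --go-live are passed as arguments and placed before the input URI.
indexer_cmd() {
  printf 'hadoop jar %s org.apache.solr.hadoop.MapReduceIndexerTool' "$JOB_JAR"
  printf ' --morphline-file %s --output-dir %s' "$MORPHLINE" "$OUTPUT_DIR"
  printf ' --zk-host %s --collection %s' "$ZK_HOST" "$COLLECTION"
  for flag in "$@"; do printf ' %s' "$flag"; done
  printf ' %s\n' "$INPUT"
}

indexer_cmd --dry-run    # step 1: run the morphline locally, print documents
indexer_cmd --go-live    # step 2: build the shards and merge them into Solr
```

Keep in mind the fault tolerance note in the help text: on startup, all data in an existing --output-dir is deleted, so point the job at a directory that holds nothing you need to keep.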