Scalability Considerations for Impala
This section explains how the size of your cluster and the volume of data influences SQL performance and schema design for Impala tables. Typically, adding more cluster capacity reduces problems due to memory limits or disk throughput. On the other hand, larger clusters are more likely to have other kinds of scalability issues, such as a single slow node that causes performance problems for queries.
Continue reading:
- Impact of Many Tables or Partitions on Impala Catalog Performance and Memory Usage
- Scalability Consideration for Large Clusters
- Scalability Considerations for the Impala Statestore
- SQL Operations that Spill to Disk
- Limits on Query Size and Complexity
- Scalability Considerations for Impala I/O
- Scalability Considerations for Table Layout
- Kerberos-Related Network Overhead for Large Clusters
- Kerberos-Related Memory Overhead for Large Clusters
- Avoiding CPU Hotspots for HDFS Cached Data
A good source of tips related to scalability and performance tuning is the Impala Cookbook presentation. These slides are updated periodically as new features come out and new benchmarks are performed.
Impact of Many Tables or Partitions on Impala Catalog Performance and Memory Usage
Because Hadoop I/O is optimized for reading and writing large files, Impala is optimized for tables containing relatively few, large data files. Schemas containing thousands of tables, or tables containing thousands of partitions, can encounter performance issues during startup or during DDL operations such as ALTER TABLE statements.
-
Check current memory usage for the catalogd daemon by running the following commands on the host where that daemon runs on your cluster:
jcmd catalogd_pid VM.flags jmap -heap catalogd_pid
-
Decide on a large enough value for the catalogd heap. You express it as an environment variable value as follows:
JAVA_TOOL_OPTIONS="-Xmx8g"
-
On systems managed by Cloudera Manager, include this value in the configuration field Java Heap Size of Catalog Server in Bytes (Cloudera Manager 5.7 and higher), or Impala Catalog Server Environment Advanced Configuration Snippet (Safety Valve) (prior to Cloudera Manager 5.7). Then restart the Impala service.
-
On systems not managed by Cloudera Manager, put this environment variable setting into the startup script for the catalogd daemon, then restart the catalogd daemon.
-
Use the same jcmd and jmap commands as earlier to verify that the new settings are in effect.
Scalability Consideration for Large Clusters
When processing queries, Impala daemons (Impalads) frequently exchange large volumes of data with other Impala daemons in the cluster, for example during a partitioned hash join. This communication between the Impala daemons happens through remote procedure calls (RPCs). In CDH 5.14 / CDH 6.0 and lower, intercommunication was done exclusively using the Apache Thrift library. With Thrift RPC, the number of network connections per host sharply increases as the number of nodes goes up:
number of connections per host = (number of nodes) x (average number of query fragments per host)
For example, in a 100-node cluster with 32 concurrent queries, each of which had 50 fragments, there would be 100 x 32 x 50 = 160,000 connections and threads per host. Across the entire cluster, there would be 16 million connections. As more nodes are added to a CDH cluster due to increase in data and workloads, the excessive number of RPC threads and network connections can lead to instability and poor performance in Impala in CDH 5.14 / CDH 6.0 and lower.
In CDH 5.15.0 / CDH 6.1 and higher, Impala can use an alternate RPC option via KRPC that provides improvements in throughput and reliability while reducing the resource usage significantly. Using KRPC, Impala can reliably handle concurrent complex queries on large data sets. See this blog for detail information on KRPC for communications among Impala daemons.
- Use dedicated coordinators.
- Isolate some workloads.
- Increase the status reporting interval by setting the start up flag ‑‑status_report_interval to a larger value. By default, it's set to 5 seconds.
- Use Workload Experience Manager analysis to identify any other improvements.
Scalability Considerations for the Impala Statestore
Before CDH 5.5 / Impala 2.3, the statestore sent only one kind of message to its subscribers. This message contained all updates for any topics that a subscriber had subscribed to. It also served to let subscribers know that the statestore had not failed, and conversely the statestore used the success of sending a heartbeat to a subscriber to decide whether or not the subscriber had failed.
Combining topic updates and failure detection in a single message led to bottlenecks in clusters with large numbers of tables, partitions, and HDFS data blocks. When the statestore was overloaded with metadata updates to transmit, heartbeat messages were sent less frequently, sometimes causing subscribers to time out their connection with the statestore. Increasing the subscriber timeout and decreasing the frequency of statestore heartbeats worked around the problem, but reduced responsiveness when the statestore failed or restarted.
As of CDH 5.5 / Impala 2.3, the statestore now sends topic updates and heartbeats in separate messages. This allows the statestore to send and receive a steady stream of lightweight heartbeats, and removes the requirement to send topic updates according to a fixed schedule, reducing statestore network overhead.
The statestore now has the following relevant configuration flags for the statestored daemon:
- -statestore_num_update_threads
- The number of threads inside the statestore dedicated to sending topic updates. You should not typically need to change this value.
Default: 10
- -statestore_update_frequency_ms
- The frequency, in milliseconds, with which the statestore tries to send topic updates to each subscriber. This is a best-effort value; if the statestore is unable to meet this
frequency, it sends topic updates as fast as it can. You should not typically need to change this value.
Default: 2000
- -statestore_num_heartbeat_threads
- The number of threads inside the statestore dedicated to sending heartbeats. You should not typically need to change this value.
Default: 10
- -statestore_heartbeat_frequency_ms
- The frequency, in milliseconds, with which the statestore tries to send heartbeats to each subscriber. This value should be good for large catalogs and clusters up to approximately 150
nodes. Beyond that, you might need to increase this value to make the interval longer between heartbeat messages.
Default: 1000 (one heartbeat message every second)
As of CDH 5.5 / Impala 2.3 , not all of these flags are present in the Cloudera Manager user interface. Some must be set using the Advanced Configuration Snippet fields for the statestore component.
If it takes a very long time for a cluster to start up, and impala-shell consistently displays This Impala daemon is not ready to accept user requests, the statestore might be taking too long to send the entire catalog topic to the cluster. In this case, consider adding --load_catalog_in_background=false to your catalog service configuration. This setting stops the statestore from loading the entire catalog into memory at cluster startup. Instead, metadata for each table is loaded when the table is accessed for the first time.
SQL Operations that Spill to Disk
Certain memory-intensive operations write temporary data to disk (known as spilling to disk) when Impala is close to exceeding its memory limit on a particular host.
The result is a query that completes successfully, rather than failing with an out-of-memory error. The tradeoff is decreased performance due to the extra disk I/O to write the temporary data and read it back in. The slowdown could be potentially be significant. Thus, while this feature improves reliability, you should optimize your queries, system parameters, and hardware configuration to make this spilling a rare occurrence.
What kinds of queries might spill to disk:
Several SQL clauses and constructs require memory allocations that could activat the spilling mechanism:
-
when a query uses a GROUP BY clause for columns with millions or billions of distinct values, Impala keeps a similar number of temporary results in memory, to accumulate the aggregate results for each value in the group.
-
When large tables are joined together, Impala keeps the values of the join columns from one table in memory, to compare them to incoming values from the other table.
-
When a large result set is sorted by the ORDER BY clause, each node sorts its portion of the result set in memory.
-
The DISTINCT and UNION operators build in-memory data structures to represent all values found so far, to eliminate duplicates as the query progresses.
When the spill-to-disk feature is activated for a join node within a query, Impala does not produce any runtime filters for that join operation on that host. Other join nodes within the query are not affected.
How Impala handles scratch disk space for spilling:
By default, intermediate files used during large sort, join, aggregation, or analytic function operations are stored in the directory /tmp/impala-scratch . These files are removed when the operation finishes. (Multiple concurrent queries can perform operations that use the "spill to disk" technique, without any name conflicts for these temporary files.) You can specify a different location by starting the impalad daemon with the --scratch_dirs="path_to_directory" configuration option or the equivalent configuration option in the Cloudera Manager user interface. You can specify a single directory, or a comma-separated list of directories. The scratch directories must be on the local filesystem, not in HDFS. You might specify different directory paths for different hosts, depending on the capacity and speed of the available storage devices. In CDH 5.5 / Impala 2.3 or higher, Impala successfully starts (with a warning written to the log) if it cannot create or read and write files in one of the scratch directories. If there is less than 1 GB free on the filesystem where that directory resides, Impala still runs, but writes a warning message to its log. If Impala encounters an error reading or writing files in a scratch directory during a query, Impala logs the error and the query fails.
Memory usage for SQL operators:
The infrastructure of the spilling feature affects the way the affected SQL operators, such as GROUP BY, DISTINCT, and joins, use memory. On each host that participates in the query, each such operator in a query accumulates memory while building the data structure to process the aggregation or join operation. The amount of memory used depends on the portion of the data being handled by that host, and thus might be different from one host to another. When the amount of memory being used for the operator on a particular host reaches a threshold amount, Impala reserves an additional memory buffer to use as a work area in case that operator causes the query to exceed the memory limit for that host. After allocating the memory buffer, the memory used by that operator remains essentially stable or grows only slowly, until the point where the memory limit is reached and the query begins writing temporary data to disk.
Prior to CDH 5.4 / Impala 2.2 , the extra memory buffer for an operator that might spill to disk was allocated when the data structure used by the applicable SQL operator reaches 16 MB in size, and the memory buffer itself was 512 MB. In CDH 5.4 / Impala 2.2 , these values are halved: the threshold value is 8 MB and the memory buffer is 256 MB. In CDH 5.5 / Impala 2.3 and higher, the memory for the buffer is allocated in pieces, only as needed, to avoid sudden large jumps in memory usage. A query that uses multiple such operators might allocate multiple such memory buffers, as the size of the data structure for each operator crosses the threshold on a particular host.
Therefore, a query that processes a relatively small amount of data on each host would likely never reach the threshold for any operator, and would never allocate any extra memory buffers. A query that did process millions of groups, distinct values, join keys, and so on might cross the threshold, causing its memory requirement to rise suddenly and then flatten out. The larger the cluster, less data is processed on any particular host, thus reducing the chance of requiring the extra memory allocation.
Added in: This feature was added to the ORDER BY clause in CDH 5.1 / Impala 1.4 . This feature was extended to cover join queries, aggregation functions, and analytic functions in CDH 5.2 / Impala 2.0 . The size of the memory work area required by each operator that spills was reduced from 512 megabytes to 256 megabytes in CDH 5.4 / Impala 2.2 .
Avoiding queries that spill to disk:
Because the extra I/O can impose significant performance overhead on these types of queries, try to avoid this situation by using the following steps:
- Detect how often queries spill to disk, and how much temporary data is written. Refer to the following sources:
- The output of the PROFILE command in the impala-shell interpreter. This data shows the memory usage for each host and in total across the cluster. The BlockMgr.BytesWritten counter reports how much data was written to disk during the query.
- The Impala Queries dialog in Cloudera Manager. You can see the peak memory usage for a query, combined across all nodes in the cluster.
- The Queries tab in the Impala debug web user interface. Select the query to examine and click the corresponding Profile link. This data breaks down the memory usage for a single host within the cluster, the host whose web interface you are connected to.
- Use one or more techniques to reduce the possibility of the queries spilling to disk:
- Increase the Impala memory limit if practical, for example, if you can increase the available memory by more than the amount of temporary data written to disk on a particular node. Remember that in Impala 2.0 and later, you can issue SET MEM_LIMIT as a SQL statement, which lets you fine-tune the memory usage for queries from JDBC and ODBC applications.
- Increase the number of nodes in the cluster, to increase the aggregate memory available to Impala and reduce the amount of memory required on each node.
- Increase the overall memory capacity of each DataNode at the hardware level.
- On a cluster with resources shared between Impala and other Hadoop components, use resource management features to allocate more memory for Impala. See Resource Management for Impala for details.
- If the memory pressure is due to running many concurrent queries rather than a few memory-intensive ones, consider using the Impala admission control feature to lower the limit on the number of concurrent queries. By spacing out the most resource-intensive queries, you can avoid spikes in memory usage and improve overall response times. See Admission Control and Query Queuing for details.
- Tune the queries with the highest memory requirements, using one or more of the following techniques:
- Run the COMPUTE STATS statement for all tables involved in large-scale joins and aggregation queries.
- Minimize your use of STRING columns in join columns. Prefer numeric values instead.
- Examine the EXPLAIN plan to understand the execution strategy being used for the most resource-intensive queries. See Using the EXPLAIN Plan for Performance Tuning for details.
- If Impala still chooses a suboptimal execution strategy even with statistics available, or if it is impractical to keep the statistics up to date for huge or rapidly changing tables, add hints to the most resource-intensive queries to select the right execution strategy. See Query Hints in Impala SELECT Statements for details.
- If your queries experience substantial performance overhead due to spilling, enable the DISABLE_UNSAFE_SPILLS query option. This option prevents queries whose memory usage is likely to be exorbitant from spilling to disk. See DISABLE_UNSAFE_SPILLS Query Option (CDH 5.2 or higher only) for details. As you tune problematic queries using the preceding steps, fewer and fewer will be cancelled by this option setting.
Testing performance implications of spilling to disk:
To artificially provoke spilling, to test this feature and understand the performance implications, use a test environment with a memory limit of at least 2 GB. Issue the SET command with no arguments to check the current setting for the MEM_LIMIT query option. Set the query option DISABLE_UNSAFE_SPILLS=true. This option limits the spill-to-disk feature to prevent runaway disk usage from queries that are known in advance to be suboptimal. Within impala-shell, run a query that you expect to be memory-intensive, based on the criteria explained earlier. A self-join of a large table is a good candidate:
select count(*) from big_table a join big_table b using (column_with_many_values);
Issue the PROFILE command to get a detailed breakdown of the memory usage on each node during the query. The crucial part of the profile output concerning memory is the BlockMgr portion. For example, this profile shows that the query did not quite exceed the memory limit.
BlockMgr: - BlockWritesIssued: 1 - BlockWritesOutstanding: 0 - BlocksCreated: 24 - BlocksRecycled: 1 - BufferedPins: 0 - MaxBlockSize: 8.00 MB (8388608) - MemoryLimit: 200.00 MB (209715200) - PeakMemoryUsage: 192.22 MB (201555968) - TotalBufferWaitTime: 0ns - TotalEncryptionTime: 0ns - TotalIntegrityCheckTime: 0ns - TotalReadBlockTime: 0ns
In this case, because the memory limit was already below any recommended value, I increased the volume of data for the query rather than reducing the memory limit any further.
Set the MEM_LIMIT query option to a value that is smaller than the peak memory usage reported in the profile output. Do not specify a memory limit lower than about 300 MB, because with such a low limit, queries could fail to start for other reasons. Now try the memory-intensive query again.
Check if the query fails with a message like the following:
WARNINGS: Spilling has been disabled for plans that do not have stats and are not hinted to prevent potentially bad plans from using too many cluster resources. Compute stats on these tables, hint the plan or disable this behavior via query options to enable spilling.
If so, the query could have consumed substantial temporary disk space, slowing down so much that it would not complete in any reasonable time. Rather than rely on the spill-to-disk feature in this case, issue the COMPUTE STATS statement for the table or tables in your sample query. Then run the query again, check the peak memory usage again in the PROFILE output, and adjust the memory limit again if necessary to be lower than the peak memory usage.
At this point, you have a query that is memory-intensive, but Impala can optimize it efficiently so that the memory usage is not exorbitant. You have set an artificial constraint through the MEM_LIMIT option so that the query would normally fail with an out-of-memory error. But the automatic spill-to-disk feature means that the query should actually succeed, at the expense of some extra disk I/O to read and write temporary work data.
Try the query again, and confirm that it succeeds. Examine the PROFILE output again. This time, look for lines of this form:
- SpilledPartitions: N
If you see any such lines with N greater than 0, that indicates the query would have failed in Impala releases prior to 2.0, but now it succeeded because of the spill-to-disk feature. Examine the total time taken by the AGGREGATION_NODE or other query fragments containing non-zero SpilledPartitions values. Compare the times to similar fragments that did not spill, for example in the PROFILE output when the same query is run with a higher memory limit. This gives you an idea of the performance penalty of the spill operation for a particular query with a particular memory limit. If you make the memory limit just a little lower than the peak memory usage, the query only needs to write a small amount of temporary data to disk. The lower you set the memory limit, the more temporary data is written and the slower the query becomes.
Now repeat this procedure for actual queries used in your environment. Use the DISABLE_UNSAFE_SPILLS setting to identify cases where queries used more memory than necessary due to lack of statistics on the relevant tables and columns, and issue COMPUTE STATS where necessary.
When to use DISABLE_UNSAFE_SPILLS:
You might wonder, why not leave DISABLE_UNSAFE_SPILLS turned on all the time. Whether and how frequently to use this option depends on your system environment and workload.
DISABLE_UNSAFE_SPILLS is suitable for an environment with ad hoc queries whose performance characteristics and memory usage are not known in advance. It prevents "worst-case scenario" queries that use large amounts of memory unnecessarily. Thus, you might turn this option on within a session while developing new SQL code, even though it is turned off for existing applications.
Organizations where table and column statistics are generally up-to-date might leave this option turned on all the time, again to avoid worst-case scenarios for untested queries or if a problem in the ETL pipeline results in a table with no statistics. Turning on DISABLE_UNSAFE_SPILLS lets you "fail fast" in this case and immediately gather statistics or tune the problematic queries.
Some organizations might leave this option turned off. For example, you might have tables large enough that the COMPUTE STATS takes substantial time to run, making it impractical to re-run after loading new data. If you have examined the EXPLAIN plans of your queries and know that they are operating efficiently, you might leave DISABLE_UNSAFE_SPILLS turned off. In that case, you know that any queries that spill will not go overboard with their memory consumption.
Limits on Query Size and Complexity
There are hardcoded limits on the maximum size and complexity of queries. Currently, the maximum number of expressions in a query is 2000. You might exceed the limits with large or deeply nested queries produced by business intelligence tools or other query generators.
If you have the ability to customize such queries or the query generation logic that produces them, replace sequences of repetitive expressions with single operators such as IN or BETWEEN that can represent multiple values or ranges. For example, instead of a large number of OR clauses:
WHERE val = 1 OR val = 2 OR val = 6 OR val = 100 ...
use a single IN clause:
WHERE val IN (1,2,6,100,...)
Scalability Considerations for Impala I/O
Impala parallelizes its I/O operations aggressively, therefore the more disks you can attach to each host, the better. Impala retrieves data from disk so quickly using bulk read operations on large blocks, that most queries are CPU-bound rather than I/O-bound.
Because the kind of sequential scanning typically done by Impala queries does not benefit much from the random-access capabilities of SSDs, spinning disks typically provide the most cost-effective kind of storage for Impala data, with little or no performance penalty as compared to SSDs.
Resource management features such as YARN, Llama, and admission control typically constrain the amount of memory, CPU, or overall number of queries in a high-concurrency environment. Currently, there is no throttling mechanism for Impala I/O.
Scalability Considerations for Table Layout
Due to the overhead of retrieving and updating table metadata in the metastore database, try to limit the number of columns in a table to a maximum of approximately 2000. Although Impala can handle wider tables than this, the metastore overhead can become significant, leading to query performance that is slower than expected based on the actual data volume.
To minimize overhead related to the metastore database and Impala query planning, try to limit the number of partitions for any partitioned table to a few tens of thousands.
If the volume of data within a table makes it impractical to run exploratory queries, consider using the TABLESAMPLE clause to limit query processing to only a percentage of data within the table. This technique reduces the overhead for query startup, I/O to read the data, and the amount of network, CPU, and memory needed to process intermediate results during the query. See TABLESAMPLE Clause for details.
Kerberos-Related Network Overhead for Large Clusters
When Impala starts up, or after each kinit refresh, Impala sends a number of simultaneous requests to the KDC. For a cluster with 100 hosts, the KDC might be able to process all the requests within roughly 5 seconds. For a cluster with 1000 hosts, the time to process the requests would be roughly 500 seconds. Impala also makes a number of DNS requests at the same time as these Kerberos-related requests.
While these authentication requests are being processed, any submitted Impala queries will fail, with errors like the following:
Query Status: Couldn't open transport for hostname (SASL(-1): generic failure: GSSAPI Error: Unspecified GSS failure. Minor code may provide more information (Cannot contact any KDC for realm 'realm'))
During this period, the KDC and DNS may be slow to respond to requests from components other than Impala, so other secure services might be affected temporarily.
If you encounter this problem, consider taking one or more of these actions to address it:
-
Scale your KDC implementation to support more load. Contact your KDC provider support to discuss options.
-
Initiate the KDC ticket acquisition process after the cluster starts up by issuing a warm-up query. An ideal query is a SHUFFLE JOIN between two tables, so that all Impala daemons try to communicate with all others. Repeat the warm-up query until it successfully completes. For example, a 175 node cluster might take approximately 5 minutes.
-
To reduce the frequency of the kinit renewal that initiates a new set of authentication requests, increase the kerberos_reinit_interval configuration setting for the impalad daemons. Currently, the default for a cluster not managed by Cloudera Manager is 60 minutes, while the default under Cloudera Manager is 10 minutes. Consider using a higher value such as 360 (6 hours).
Kerberos-Related Memory Overhead for Large Clusters
Failed to obtain Kerberos ticket for principal: <varname>principal_details</varname> Failed to execute shell cmd: 'kinit -k -t <varname>keytab_details</varname>', error was: Error(12): Cannot allocate memory
echo 1 > /proc/sys/vm/overcommit_memory
vm.overcommit_memory=1
Then run sysctl -p. No reboot is needed.
Avoiding CPU Hotspots for HDFS Cached Data
You can use the HDFS caching feature, described in Using HDFS Caching with Impala (CDH 5.3 or higher only), with Impala to reduce I/O and memory-to-memory copying for frequently accessed tables or partitions.
In the early days of this feature, you might have found that enabling HDFS caching resulted in little or no performance improvement, because it could result in "hotspots": instead of the I/O to read the table data being parallelized across the cluster, the I/O was reduced but the CPU load to process the data blocks might be concentrated on a single host.
To avoid hotspots, include the WITH REPLICATION clause with the CREATE TABLE or ALTER TABLE statements for tables that use HDFS caching. This clause allows more than one host to cache the relevant data blocks, so the CPU load can be shared, reducing the load on any one host. See CREATE TABLE Statement and ALTER TABLE Statement for details.
Hotspots with high CPU load for HDFS cached data could still arise in some cases, due to the way that Impala schedules the work of processing data blocks on different hosts. In CDH 5.7 / Impala 2.5 and higher, scheduling improvements mean that the work for HDFS cached data is divided better among all the hosts that have cached replicas for a particular data block. When more than one host has a cached replica for a data block, Impala assigns the work of processing that block to whichever host has done the least work (in terms of number of bytes read) for the current query. If hotspots persist even with this load-based scheduling algorithm, you can enable the query option SCHEDULE_RANDOM_REPLICA=TRUE to further distribute the CPU load. This setting causes Impala to randomly pick a host to process a cached data block if the scheduling algorithm encounters a tie when deciding which host has done the least work.