Apache Hive Optimization Techniques — 1

Ankit Prakash Gupta
Towards Data Science
6 min read · Aug 9, 2019


Apache Hive is a query and analysis engine built on top of Apache Hadoop that uses the MapReduce programming model. It provides an abstraction layer for querying big data with familiar SQL syntax, implementing those traditional SQL queries as MapReduce jobs through its Java API. The main components of Hive are as follows:

  • Metastore
  • Driver
  • Compiler
  • Optimizer
  • Executor
  • Client

While Hadoop/Hive can process nearly any amount of data, optimizations can lead to big savings in processing time and cost, proportional to the amount of data. There are a whole lot of optimizations that can be applied in Hive. Let us look at the techniques we are going to cover:

  1. Partitioning
  2. Bucketing
  3. Using Tez as Execution Engine
  4. Using Compression
  5. Using ORC Format
  6. Join Optimizations
  7. Cost-based Optimizer

Partitioning

Partitioning divides the table into parts based on the values of particular columns. A table can have multiple partition columns, which together identify a particular partition. Partitioning makes it easy to run queries on slices of the data. The values of the partition columns are not stored in the data files; if you check the file structure, you will notice that Hive creates a folder for each combination of partition column values. This ensures that only relevant data is read for the execution of a particular job, decreasing the I/O time required by the query and thus increasing query performance.

When we query a partitioned table, Hive scans only the relevant partitions and skips the irrelevant ones. If the data within a single partition is still quite large, we can use bucketing to divide it further into more manageable chunks.

CREATE TABLE table_name (column1 data_type, column2 data_type, …) PARTITIONED BY (partition1 data_type, partition2 data_type, …);

  • Partition columns are not defined in the column list of the table.
  • In INSERT queries, the partition is specified up front in a PARTITION clause. For a static partition the partition values are given in that clause, so only the remaining columns appear in the VALUES list; for a dynamic partition only the partition column names are listed, and their values come at the end, after the other columns' values.

INSERT INTO TABLE table_name PARTITION (partition1 = 'partition1_val', partition2 = 'partition2_val', …) VALUES (col1_val, col2_val, …);
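
As a concrete, minimal sketch (the sales table, its columns, and the values here are hypothetical), partitioning by year and month creates one folder per (yr, mnth) pair, and a query filtering on those columns reads only that folder:

CREATE TABLE sales (txn_id INT, amount DOUBLE) PARTITIONED BY (yr INT, mnth INT);

INSERT INTO TABLE sales PARTITION (yr = 2019, mnth = 8) VALUES (101, 49.99);

-- Only the folder yr=2019/mnth=8 is scanned for this query:
SELECT SUM(amount) FROM sales WHERE yr = 2019 AND mnth = 8;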

  • Partitioning is basically of two types: static and dynamic. The names are pretty much self-explanatory.
  • Static Partitioning
    This is used when we know, before loading, which partitions the data belongs to. It should be preferred when loading large files into a table. It is performed in strict mode, as in the load sketch below:

set hive.mapred.mode = strict;
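
For example, a static-partition load might look like the following (a sketch reusing the hypothetical sales table from above; the file path is made up):

LOAD DATA LOCAL INPATH '/tmp/sales_2019_08.csv' INTO TABLE sales PARTITION (yr = 2019, mnth = 8);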

  • Dynamic Partitioning
    It is used when we do not know the partitions of the data in advance, so loading data into the table takes more time. Usually, we load a partitioned table from another table that holds the same data in non-partitioned form (a full example follows after the configuration properties below).
    To enable dynamic partitioning in Hive:

SET hive.exec.dynamic.partition = true;

There are two modes of dynamic partitioning:
Strict: This requires at least one partition column to be static while loading the data.
Non-strict: This allows all the partition columns to have dynamic values.

SET hive.exec.dynamic.partition.mode = nonstrict;

A few other properties also need to be configured when using dynamic partitioning, such as:

hive.exec.max.dynamic.partitions.pernode: Maximum number of dynamic partitions that can be created by each mapper/reducer node

hive.exec.max.dynamic.partitions: Maximum number of dynamic partitions allowed to be created in total

hive.exec.max.created.files: Maximum number of HDFS files created by all mappers/reducers in a MapReduce job

hive.error.on.empty.partition: Whether to throw an exception if the dynamic partition insert generates empty results
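
Putting these together, a typical dynamic-partition load from a non-partitioned staging table might look like this (a sketch; sales_staging is a hypothetical table with the partition values stored as ordinary columns):

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

-- The partition columns are named without values and must come last in the SELECT:
INSERT INTO TABLE sales PARTITION (yr, mnth)
SELECT txn_id, amount, yr, mnth FROM sales_staging;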

Bucketing

Bucketing further segregates the data into more manageable sections called buckets, or clusters. Bucketing is based on a hash function, which depends on the type of the bucketing column, and records with the same bucketing-column value are always saved in the same bucket. The CLUSTERED BY clause is used to divide the table into buckets, and it works well for columns with high cardinality.

CREATE TABLE table_name (column1 data_type, column2 data_type, …) PARTITIONED BY (partition1 data_type, partition2 data_type, …) CLUSTERED BY (clus_col1) SORTED BY (sort_col2) INTO n BUCKETS;

In Hive, each partition is created as a directory, whereas each bucket is created as a file within it. For Hive to distribute the data into the declared number of buckets on insert, bucketing has to be enforced:

set hive.enforce.bucketing = true;

Using bucketing we can also keep the data sorted by one or more columns (with SORTED BY). Since each table's data is split into a fixed number of comparable bucket files, map-side joins are faster on bucketed tables.

Bucketing has additional benefits when used with ORC files and when the bucketing column is also the join column. We will discuss these benefits further.
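
As a minimal sketch (again with hypothetical names), a table bucketed and sorted on user_id could be created and loaded like this:

set hive.enforce.bucketing = true;

CREATE TABLE user_events (user_id BIGINT, event_type STRING)
PARTITIONED BY (dt STRING)
CLUSTERED BY (user_id) SORTED BY (user_id) INTO 32 BUCKETS;

-- Each of the 32 buckets lands as a separate file inside the dt=2019-08-09 folder:
INSERT INTO TABLE user_events PARTITION (dt = '2019-08-09')
SELECT user_id, event_type FROM user_events_staging;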

Using Tez as Execution Engine

Apache Tez is a client-side library that operates as an execution engine underneath Hive and Pig, as an alternative to the traditional MapReduce engine, and allows faster processing of jobs by modeling them as a directed acyclic graph (DAG) of tasks.

To see how Tez helps optimize jobs, let us first look at the typical processing sequence of a MapReduce job:

  • The mapper reads data from the file system and processes it into key-value pairs, which are stored temporarily on the local disk. These key-value pairs, grouped by key, are sent to the reducers over the network.
  • On the nodes where the reducers run, the incoming data is saved to the local disk while waiting for the output of all the mappers to arrive. The entire set of values for a key is then read by a single reducer, processed, and the output is written out and replicated according to the configuration.
  • As you can see, a lot of unnecessary read/write overhead is involved in a single MapReduce job. Moreover, multiple MapReduce jobs are run to accomplish a single Hive query, and since there is no coordination between two MapReduce jobs, the output of each job is first written to the DFS and then transferred back to nodes, repeating the cycle.

Apache Tez avoids this by not breaking a Hive query into multiple MapReduce jobs; being a client-side library, it orchestrates the processing itself as a single DAG. Tez optimizes jobs using steps like the following:

  • Skipping the DFS write by the reducers and piping a reducer's output directly into the subsequent mapper as input.
  • Cascading a series of Reducers without intervening Mapper steps.
  • Re-use of containers for successive phases of processing.
  • Optimal Resource usage using Pre-warmed containers.
  • Cost-based Optimizations.
  • Vectorized Query Processing.

We can set the execution engine using the following statement, or by setting it in hive-site.xml.

set hive.execution.engine = tez;   -- or mr for the classic MapReduce engine

Using Compression

As you might have noticed, Hive queries involve a lot of disk I/O and network I/O, which can easily be reduced by shrinking the size of the data through compression. Most of the data formats in Hive are text-based, which makes them very compressible and can lead to big savings. But there is a trade-off to take into consideration: the CPU cost of compression and decompression.

Following are the main situations where I/O operations are performed and compression can save cost:

  • Reading data from a local DFS directory
  • Reading data from a non-local DFS directory
  • Moving data from reducers to the next-stage mappers/reducers
  • Moving the final output back to the DFS.

Also, since the DFS replicates data to be fault-tolerant, every write involves even more I/O operations due to replication.
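
For example, compression for intermediate (shuffle) data and for the final job output can be enabled per session like this (a sketch; Snappy is just one common codec choice):

-- Compress map output shuffled to reducers and Hive's intermediate data between jobs:
SET hive.exec.compress.intermediate = true;
SET mapreduce.map.output.compress = true;
SET mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;

-- Compress the final job output written back to the DFS:
SET hive.exec.compress.output = true;
SET mapreduce.output.fileoutputformat.compress = true;
SET mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;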

You can import text files compressed with Gzip or Bzip2 directly into a table stored as TextFile. Compressed data can be loaded into Hive directly, using the LOAD statement or by creating the table over a location that already holds compressed data. The compression is detected automatically, and the files are decompressed on the fly during query execution. In the Gzip case, however, Hadoop will not be able to split the file into chunks/blocks and run multiple maps in parallel, since Gzip files are not splittable. Compressed sequence files, on the other hand, can be split.
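
For instance, a Gzip-compressed log file can be loaded as-is into a plain TextFile table (a sketch with made-up names and path):

CREATE TABLE raw_logs (line STRING) STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/tmp/access_log_2019-08-09.gz' INTO TABLE raw_logs;

-- Hive detects the compression and decompresses the file on the fly at query time:
SELECT COUNT(*) FROM raw_logs;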

The above optimizations will save a whole lot of execution cost and lead to much quicker execution of jobs. In the next article, we will discuss the remaining techniques: optimizations using ORC files, optimizations in join queries, and the cost-based optimizer.

I hope you find this article informative and easy to follow. If you have any queries, feel free to reach me at info.ankitp@gmail.com.
