Big Data SQL: Spark SQL Optimization with the Catalyst Optimizer

Catalyst is a query plan optimizer. It is a rule-based framework that
• allows developers to plug in custom rules, specific to their DSL, that add new forms of optimization to query execution; and
• allows existing rules to be extended with data-source-specific rules that push filtering or aggregation into external storage systems or that support new data types.
The Catalyst optimizer is based on functional programming constructs available in the Scala language. It supports both rule-based and cost-based optimization.
One of the first steps in executing an SQL query is transforming it into the DataFrame API, which shares the same logical plan representation. The logical plan is a tree representation of the query: every transformation in Spark is essentially modeled as a tree, and it is this tree that the Catalyst optimizer's built-in rules operate on. The logical plan goes through a series of rules that resolve and optimize it, and after optimization, the logical plan is converted to a physical plan for the actual execution of the query.
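You can watch these stages concretely by asking Spark to print every plan it builds for a query. The following is a minimal, self-contained sketch (the table and column names are made up for illustration); explain(true) prints the parsed, analyzed, and optimized logical plans, followed by the physical plan.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("catalyst-explain")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// A tiny in-memory DataFrame; names are illustrative only
val events = Seq(("US", 10), ("IN", 20), ("US", 30))
  .toDF("country", "clicks")

// explain(true) prints the parsed, analyzed, and optimized logical
// plans, followed by the physical plan Catalyst finally selects
events.filter($"country" === "US")
  .groupBy($"country")
  .sum("clicks")
  .explain(true)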
Spark SQL, together with the Catalyst optimizer, helps read less data by converting data to more efficient formats and by modifying the logical plan so that the query executes with the lowest possible latency.
Catalyst optimizes the logical plan by rearranging query operators and lower-level operations. As an example, the Catalyst optimizer might decide to move a filter operation before a join operation, a very typical rewrite that reduces the amount of data operated on during the join execution phase, as the sketch below shows.
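Here is a small sketch of that rewrite, reusing the SparkSession and implicits from the previous example (the DataFrames and names are again illustrative). The filter is written after the join, but it appears below the join in the optimized plan.

// Two small DataFrames; names are made up for this example
val users  = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val orders = Seq((1, 100.0), (2, 5.0)).toDF("userId", "amount")

// Written as: join first, then filter
val q = users.join(orders, users("id") === orders("userId"))
  .filter(orders("amount") > 50.0)

// In the optimized logical plan, Catalyst pushes the amount > 50.0
// predicate below the join, so fewer rows reach the join operator
q.explain(true)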
Because Spark is a lazy execution framework, optimization occurs as late as possible; therefore, Spark SQL can optimize across functions. Some of the optimizations that Catalyst performs include the following:
• It pushes filter predicates down to the data source, so irrelevant data can be skipped right at the source, thus reducing unnecessary data movement.
• When reading Parquet files, it skips entire blocks and turns string comparisons into integer comparisons via dictionary encoding, both of which result in faster operations.
• Catalyst compiles operations into physical plans and generates highly optimized Java virtual machine (JVM) bytecode.
• It intelligently chooses between broadcast joins and shuffle joins to reduce network traffic (see the sketch after this list).
• It eliminates expensive object allocations and reduces virtual function calls.
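The following sketch shows the broadcast case, reusing the events DataFrame from the earlier example. The broadcast() function is a Spark hint; Catalyst also picks broadcast joins on its own for tables smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default).

import org.apache.spark.sql.functions.broadcast

// A small lookup table; contents are illustrative
val countries = Seq(("US", "United States"), ("IN", "India"))
  .toDF("code", "countryName")

// The hint ships the small table to every executor, so the large
// side is never shuffled across the network
val joined = events.join(broadcast(countries),
  events("country") === countries("code"))

joined.explain()  // the physical plan shows a BroadcastHashJoin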
When a DataFrame is cached in memory by Spark code, the Catalyst optimizer automatically calculates statistics (the maximum and minimum values of a column and the number of distinct and NULL values), which it later uses to skip some partitions while running filter-based SQL queries, resulting in additional performance improvements, as sketched below.
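A brief sketch of how that plays out, continuing with the same events DataFrame; the behavior noted in the comments follows from the statistics described above.

// Caching builds an in-memory columnar copy, recording per-column
// statistics (min, max, null counts) for each cached batch
events.cache()
events.count()  // materializes the cache and its statistics

// Cached batches whose statistics prove country can never equal
// "US" are skipped outright when this filter runs
events.filter($"country" === "US").count()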
The Catalyst optimizer is, at its core, a library for manipulating trees, in this case a logical plan tree specific to relational query processing: moving nodes around, removing edges, and short-circuiting branches. Internally, it applies pattern matching recursively across the tree to apply the optimization rules and update the tree toward the best execution plan.
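The following is a toy sketch, not the real Catalyst API, of what rule-based tree manipulation looks like in Scala: a small expression tree and one rule, applied bottom-up with pattern matching, that folds constants and eliminates additions of zero.

sealed trait Expr
case class Lit(value: Int)       extends Expr
case class Attr(name: String)    extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class Mul(l: Expr, r: Expr) extends Expr

// One simplification rule, applied recursively from the leaves up,
// in the same spirit as Catalyst's rule batches
def simplify(e: Expr): Expr = e match {
  case Add(l, r) =>
    (simplify(l), simplify(r)) match {
      case (Lit(a), Lit(b)) => Lit(a + b) // fold two constants
      case (Lit(0), x)      => x          // 0 + x => x
      case (x, Lit(0))      => x          // x + 0 => x
      case (x, y)           => Add(x, y)
    }
  case Mul(l, r) => Mul(simplify(l), simplify(r))
  case other     => other
}

// simplify(Add(Lit(0), Mul(Attr("col1"), Lit(2))))
// returns Mul(Attr("col1"), Lit(2))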
An SQL query passes through several steps before it is ready for execution, and these steps are generally the same across most SQL engines. The core difference between SQL engines lies in the middle, where the optimizer steps in to build the best optimized plan based on different criteria: rules, code generation, and query execution costs. The core intellectual property of an SQL engine lies in this optimizer layer, which lowers SQL data-processing latencies.

The final piece of work that the Catalyst optimizer does is code generation. For workloads that are CPU-bound, any optimization at the code-execution level for each row of data can result in massive speedups on large data sets. If you can shave 1 microsecond of query execution time off a single row of data, then for a data set of a billion rows, the query engine finishes 1,000 seconds faster. This is the extreme to which the Catalyst optimizer goes to improve query speed.

Code generation is an extensive and complex topic in itself, and we will not go into too much detail about it here. Essentially, the Catalyst optimizer uses a special feature of the Scala language called quasiquotes, which allows automatic generation of abstract syntax trees (ASTs) that are fed to the Scala compiler to generate bytecode.
An expression used in an SQL query is converted to an AST by Catalyst, which Scala code then evaluates before compiling and running the generated code. Without code generation, even a simple expression within an SQL query, such as (Col1 + Col2)*2, would have to be parsed and interpreted for each row of data. This incurs a lot of overhead, especially from CPU branching and virtual function calls, which slows down processing.
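To make the quasiquote idea concrete, here is a minimal sketch that builds a Scala AST for exactly that expression and compiles it at runtime. This mimics the mechanism the text describes rather than Spark's actual internal code-generation path, and it requires scala-reflect and scala-compiler on the classpath (Scala 2.x).

import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()

// q"..." produces an abstract syntax tree, not a string, so the
// structure of the generated code is checked by the compiler
val tree = q"(col1: Int, col2: Int) => (col1 + col2) * 2"

// Compile the tree to JVM bytecode and get back a callable function
val f = tb.eval(tree).asInstanceOf[(Int, Int) => Int]
println(f(3, 4)) // 14, computed by compiled code, not interpretation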