July 23, 2024
Unlocking Efficiency and Performance: Navigating the Spark 3 and EMR 6 Upgrade Journey at Slack

Slack Data Engineering recently migrated its data workloads from AWS EMR 5 (Spark 2/Hive 2 processing engine) to EMR 6 (Spark 3 processing engine). In this blog, we will share our migration journey, challenges, and the performance gains we observed in the process. This blog aims to help Data Engineers, Data Infrastructure Engineers, and Product Managers who may be considering migrating to EMR 6/Spark 3.

In Data Engineering, our primary goal is to support internal teams—such as Product Engineering, Machine Learning, and Data Science—by providing essential datasets and a reliable data infrastructure that lets them build their own datasets. We ensure the reliability and timeliness of critical billing and usage data for our customers. Maintaining landing time SLAs (Service Level Agreements) serves as a measure of keeping these promises.

Over time, the rapid growth of our data volume frequently led to violations of our critical data pipelines' SLAs. As we sought alternatives to Spark 2 and Hive 2, Spark 3 emerged as a compelling solution for all our data processing needs, particularly due to its Adaptive Query Execution (AQE) feature, which could improve performance for some of our skewed datasets. We embarked on this EMR 6/Spark 3 migration for the enhanced performance, the improved security—with updated Log4j libraries—and the potential for significant cost savings.

This year-long project consisted of two major phases:

  • Phase 1: Upgrade EMR from 5.3x to 6.x.
  • Phase 2: Upgrade from Hive 2.x/Spark 2.x to Spark 3.x.

Migration journey

Current landscape 

We at Slack Data Engineering use a federated AWS EMR cluster model to handle all data analytics requirements. The data that lives in the data warehouse is physically stored in S3, and its metadata is stored in the Hive Metastore schema on an RDS database. SQL handles most of our use cases. Additionally, we rely on Scala/PySpark for certain complex workloads. We use Apache Airflow to orchestrate our workflows, and we have designed custom Spark Airflow operators for submitting SparkSQL, PySpark, and Scala jobs to the EMR cluster via the Livy Batches API using authenticated HTTP requests.

Here is an example of our hierarchical custom Airflow Spark operators:

BaseAirflowOperator → SparkBaseAirflowOperator → CustomPySparkAirflowOperator or CustomSparkSqlAirflowOperator
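
A minimal sketch of this hierarchy (not our actual implementation: the class responsibilities follow the description above, while the payload, authentication, and error handling are simplified):

    import requests
    from airflow.models import BaseOperator


    class SparkBaseAirflowOperator(BaseOperator):
        """Submits a Spark job to an EMR cluster through the Livy Batches API."""

        def __init__(self, livy_endpoint, file, args=None, conf=None, **kwargs):
            super().__init__(**kwargs)
            self.livy_endpoint = livy_endpoint  # e.g. the EMR master's Livy URL (hypothetical)
            self.file = file                    # S3 path to a JAR or PySpark script
            self.args = args or []
            self.conf = conf or {}

        def execute(self, context):
            payload = {"file": self.file, "args": self.args, "conf": self.conf}
            # Authenticated HTTP call to Livy's POST /batches endpoint (auth details omitted).
            response = requests.post(f"{self.livy_endpoint}/batches", json=payload)
            response.raise_for_status()
            return response.json()["id"]  # Livy batch id, polled elsewhere for status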

Here is an example of how we use CustomSparkSqlAirflowOperator to schedule an Airflow job:
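
The snippet below is a hedged illustration of what that DAG code might look like; the DAG id, schedule, and SQL path are hypothetical, and the operator's real signature may differ.

    from datetime import datetime

    from airflow import DAG

    with DAG(
        dag_id="daily_usage_rollup",           # hypothetical DAG id
        start_date=datetime(2024, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        build_usage_rollup = CustomSparkSqlAirflowOperator(  # internal operator from the hierarchy above
            task_id="build_usage_rollup",
            sql="sql/build_usage_rollup.sql",  # SparkSQL script submitted to the cluster via Livy
        )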


Our data warehouse infrastructure comprises over 60 EMR clusters, catering to the needs of over 40 teams and supporting thousands of Airflow Directed Acyclic Graphs (DAGs). Prior to this migration, all workloads were executed on EMR 5.36, Spark 2.4.8, and Hive 2.3.9.

Migration challenges

As the majority of our workloads were managed by Hive 2, transitioning to Hive 3 in EMR 6 was the preferred choice for our internal customers due to the minimal changes required in the codebase. However, we opted to consolidate onto a single compute engine, Spark 3. This strategic decision was made to leverage Spark 3's Adaptive Query Execution (AQE) feature, develop Spark 3 expertise across our teams, and fine-tune our Hadoop clusters exclusively for Spark operations for efficiency.

Given the scale of this migration, a phased approach was essential. We therefore decided to support both AWS EMR 5 and EMR 6 until the migration was complete, allowing us to transition workloads without disrupting roadmaps for existing teams.

However, maintaining two different cluster setups (Hive 2.x/Spark 2.x in EMR 5.x and Spark 3.x in EMR 6) presented several challenges:

  • How do we support the same Hive catalog across Spark 2/Spark 3 workloads?
  • How do we provision different versions of EMR clusters?
  • How do we control cost?
  • How do we support different versions of our job libraries across these clusters?
  • How do we submit and route jobs across these different cluster versions?

Pre-migration planning

Hive catalog migration

How do we support the same Hive catalog across Spark 2/Spark 3 workloads?

We needed to use the same Hive Metastore catalog for our workloads across EMR 5/Spark 2 and EMR 6/Spark 3, as migrating our pipelines from Spark 2 to Spark 3 would take several quarters. We solved this problem by migrating our existing HMS 2.3.0 catalog to an HMS 3.1.0 catalog using the Hive Schema Tool. We executed the following commands on the EMR 5 master host connected to the catalog database.
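
For a MySQL-backed metastore on RDS, a Hive Schema Tool upgrade generally looks along these lines (illustrative; the endpoint, credentials, and target version are placeholders that depend on your setup):

    # Verify the current metastore schema version first.
    schematool -dbType mysql -info \
      -userName hive -passWord '<password>' \
      -url 'jdbc:mysql://<rds-endpoint>:3306/hive'

    # Upgrade the metastore schema from Hive 2.3.0 to the target Hive 3.x version.
    schematool -dbType mysql -upgradeSchemaFrom 2.3.0 \
      -userName hive -passWord '<password>' \
      -url 'jdbc:mysql://<rds-endpoint>:3306/hive'

    # Validate the upgraded schema.
    schematool -dbType mysql -validate \
      -userName hive -passWord '<password>' \
      -url 'jdbc:mysql://<rds-endpoint>:3306/hive'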

 

Before the migration we took backups of our Hive Metastore database, and we also took some downtime on job processing during the migration for the schema upgrade.

After the schema upgrade, both our EMR 5 and EMR 6 clusters could talk to the same upgraded HMS 3 catalog DB, as it was backward compatible with Hive 2 and Spark 2 applications.

EMR cluster provisioning

How do we provision different versions of EMR clusters? How do we control cost? 

We use EMR's Golang SDK to launch EMR clusters via the RunJobFlow API. This API accepts a JSON-based launch configuration for an EMR cluster. We maintain a base JSON config for all clusters and override custom parameters like instance fleets, capacity, and release label at the cluster configuration level. We created dedicated EMR 6 configurations for new EMR 6 clusters, with auto-scaling enabled and low minimum capacity to keep costs under control. During the migration, we created more such EMR 6 cluster configurations for each new cluster. We regulated capacity and overall cluster usage costs by gradually reducing EMR 5 fleet sizes and growing EMR 6 fleets based on usage.
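
As a pared-down sketch of the kind of per-cluster overrides involved (field values are placeholders, and the real configs carry many more settings), an EMR 6 entry might look like this:

    {
      "Name": "data-eng-emr6-cluster",
      "ReleaseLabel": "emr-6.x.0",
      "Instances": {
        "InstanceFleets": [
          {"InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1},
          {"InstanceFleetType": "CORE",   "TargetOnDemandCapacity": 2},
          {"InstanceFleetType": "TASK",   "TargetSpotCapacity": 10}
        ]
      },
      "ManagedScalingPolicy": {
        "ComputeLimits": {
          "UnitType": "InstanceFleetUnits",
          "MinimumCapacityUnits": 2,
          "MaximumCapacityUnits": 100
        }
      }
    }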

Job builds across different Spark versions

How do we support different versions of our job libraries across these clusters?

We use Bazel as the primary tool to build our codebase. Using Bazel, we implemented parallel build streams for Spark JARs across versions 2.x and 3.x. We propagated all ongoing config changes to both the Spark 2 and Spark 3 JARs for consistency. Enabling the build --config=spark3 flag in the .bazelrc file allowed building local JARs with the required version for testing. In our Airflow pipelines, as we migrated jobs to EMR 6, the Airflow operator would pick the Spark 3 JARs automatically based on the flag approach described below.
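
For example, building the Spark 3 variant of a job JAR locally for testing might look like this (the target label is hypothetical):

    # Build the Spark 3 flavor of a job for local testing.
    bazel build --config=spark3 //dataeng/jobs/billing:billing_rollup_deploy.jar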

Airflow operators enhancement

How do we submit and route jobs across these different cluster versions?

We enhanced our custom Airflow Spark operator to route jobs to different cluster versions using a boolean flag. This flag provided the convenience of submitting jobs to either the pre-migration or the post-migration cluster with a simple toggle.

Additionally, we introduced four logical groups of Spark config sizing options (SMALL, DEFAULT, LARGE, and EXTRA_LARGE) embedded in the Airflow Spark operator. Each option has its own executor memory, driver memory, and executor ranges. The sizing options helped some of our end users migrate existing Hive jobs with minimal understanding of Spark configurations.

Here is an example of how the enhanced CustomSparkSqlAirflowOperator is used:
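
The snippet below is an illustrative reconstruction rather than the exact internal API; the use_emr6 and sizing argument names are hypothetical stand-ins for the boolean routing flag and sizing options described above.

    # Inside a DAG definition (see the earlier example); argument names are hypothetical.
    load_billing_summary = CustomSparkSqlAirflowOperator(
        task_id="load_billing_summary",
        sql="sql/load_billing_summary.sql",  # hypothetical SparkSQL script
        use_emr6=True,   # route this job to an EMR 6 / Spark 3 cluster
        sizing="LARGE",  # one of SMALL, DEFAULT, LARGE, EXTRA_LARGE
    )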

Code changes 

In most cases, the existing Hive and Spark 2 code ran fine in Spark 3. There were a few cases where we had to make code changes for Spark 3 compatibility.

One example of a code change from Hive to Spark 3 was the use of salting for skewed joins. While some code used cumbersome subqueries to generate salt keys, other code used RAND() in the join key as a workaround for handling skew. While RAND() in the join key works in Hive, it throws an error in Spark 3: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate, or Window. We removed all of the skew-handling code and let Spark 3's Adaptive Query Execution (AQE) handle the data skew. More about AQE in the 'Migration gains and impact' section.
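
As an illustration of the change (table and column names are made up), the Hive-era workaround and its Spark 3 replacement look roughly like this:

    -- Hive-era workaround (illustrative): salt skewed NULL keys with RAND()
    SELECT e.*, t.team_name
    FROM events e
    LEFT JOIN teams t
      ON CASE WHEN e.team_id IS NULL
              THEN CONCAT('salt_', CAST(FLOOR(RAND() * 100) AS STRING))
              ELSE e.team_id END = t.team_id;

    -- Spark 3 with AQE (illustrative): a plain join; skewed partitions are split at runtime
    SELECT e.*, t.team_name
    FROM events e
    LEFT JOIN teams t
      ON e.team_id = t.team_id;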

Additionally, Spark 3 threw errors for certain data type casting scenarios that worked fine in Spark 2. We had to change the default value of some Spark 3 configurations. One example is setting spark.sql.storeAssignmentPolicy to 'LEGACY' instead of the default Spark 3 value 'ANSI'.

We faced a few scenarios where a Spark 3 job inferred the schema from the Hive Metastore but failed to consolidate schemas, erroring out with java.lang.StackOverflowError. This occurred due to a lack of synchronization between the underlying Parquet data and the Hive Metastore schema. By setting spark.sql.hive.convertMetastoreParquet to false, we successfully resolved the issue.
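
For reference, both overrides can be set at the session level (or passed as --conf at submit time). A minimal PySpark sketch:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("emr6-compatibility-example")  # hypothetical app name
        # Keep Spark 2-style casting behavior on writes instead of ANSI checks.
        .config("spark.sql.storeAssignmentPolicy", "LEGACY")
        # Read Parquet-backed Hive tables through Hive SerDes rather than Spark's
        # built-in Parquet reader, avoiding the schema-merge StackOverflowError.
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .enableHiveSupport()
        .getOrCreate()
    )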

Post-migration data validation 

We compared two tables:

  • prod_table_hive2_or_spark2 (EMR 5 table)
  • test_table_spark3 (EMR 6 table)

We aimed for an exact data match between the tables rather than relying on sampling, particularly because some of our data, such as customer billing data, is mission-critical.

We used config files and macros to allow our SQL scripts to read from the production schema and write to the test schema in the test environment. This helped us populate the exact prod data in the test schema using Spark 3 for easy comparison. We then ran EXCEPT and COUNT SQL queries between prod_table_hive2_or_spark2 and test_table_spark3 in Trino to speed up the validation process.
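
The validation queries themselves are straightforward; in Trino they look roughly like this:

    -- Row counts should match exactly.
    SELECT COUNT(*) FROM prod_table_hive2_or_spark2;
    SELECT COUNT(*) FROM test_table_spark3;

    -- EXCEPT in both directions should return zero rows if the tables match.
    SELECT * FROM prod_table_hive2_or_spark2
    EXCEPT
    SELECT * FROM test_table_spark3;

    SELECT * FROM test_table_spark3
    EXCEPT
    SELECT * FROM prod_table_hive2_or_spark2;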

In case of a mismatch in the EXCEPT or COUNT query output, we used our in-house Python framework with the Trino engine for detailed analysis. We continuously monitored the post-migration production runtime of our pipelines using the Airflow metadata DB tables and tuned pipelines as required.

There were a few sources of uncertainty in the validation process. For example:

  • When the code relied on the current timestamp, it caused differences between production and development runs. We excluded timestamp-related columns while validating these tables.
  • Random rows appeared when there was no deterministic ORDER BY clause in the code to resolve ties. We fixed the code to use a deterministic ORDER BY clause going forward.
  • Discrepancies appeared in the behavior of certain built-in functions between Hive and Spark. For instance, GREATEST, which returns the greatest value from a list of arguments, behaves differently when one of the arguments is NULL. We made code changes to adhere to the correct business logic.

Migration gains and impact

After the migration, we observed substantial runtime performance improvements across the majority of our pipeline tasks. Most of our Airflow tasks showed improvements ranging from 30% to 60%, with some jobs seeing an impressive 90% improvement in runtime. We used the Airflow metadata DB tables (the duration column in the task_instance table) to get runtime comparison numbers; the runtime of one of our most critical tasks, for example, improved significantly post-migration.
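
As a hedged illustration of how such a comparison can be pulled from the Airflow metadata DB (the DAG id below is hypothetical, and column names vary slightly across Airflow versions):

    -- Average successful task runtimes from the Airflow metadata DB.
    SELECT task_id,
           AVG(duration) AS avg_runtime_seconds
    FROM task_instance
    WHERE dag_id = 'critical_billing_pipeline'  -- hypothetical DAG id
      AND state = 'success'
    GROUP BY task_id;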

The EMR 6 EMRFS S3-optimized committer fixed the problem of incomplete writes and misleading SUCCESS statuses for some of our Spark jobs that handled text-based input and output formats. It also improves application performance by avoiding the list and rename operations done in S3 during job and task commit phases. Prior to EMR 6.4.0, this feature was only available for the Apache Parquet file format. From EMR 6.4.0 on, it was extended to all common formats, including Parquet, ORC, and text-based formats (including CSV and JSON).

As expected, we noticed several Adaptive Query Execution (AQE) improvements in our query execution plans. One of the key improvements was dynamically optimizing skew joins. This allowed us to remove many lines of skew-handling logic from our codebase and replace them with a simple join condition on the keys; in the post-migration query plans, these joins appear with an AQE skew=true hint.

Another improvement was dynamically coalescing shuffle partitions. This feature simplified tuning of the shuffle partition number by selecting the right number of partitions at runtime. We only had to provide a large enough initial shuffle partition number via the spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. In one of our query plans, for example, AQE coalesced the partition count from 3,000 down to 348.
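
The relevant AQE knobs are a handful of session-level configs. A minimal sketch, assuming an existing SparkSession named spark (the initial partition number shown is illustrative, not our production value):

    # AQE-related settings behind the behavior described above.
    spark.conf.set("spark.sql.adaptive.enabled", "true")  # enabled by default since Spark 3.2
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "3000")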

Conclusion

The migration to EMR 6 has resulted in significant improvements in the runtime performance, efficiency, and reliability of our data processing pipelines.

AQE improvements, such as dynamically optimizing skew joins and coalescing shuffle partitions, have simplified query optimization and reduced the need for manual intervention in tuning parameters. The S3-optimized committer has addressed issues related to incomplete writes and misleading statuses in Spark jobs, leading to improved stability. The entire migration process described here ran quite smoothly and did not cause any incidents at any step! We improved our pipeline codebase along the way, making it easier for new engineers to onboard onto a clean foundation and work solely off the Spark 3 engine. The migration has also laid the foundation for a more scalable lakehouse, with newer table formats like Iceberg and Hudi available in EMR 6. We recommend that data organizations invest in such long-term modernization initiatives, as they bring efficiencies across the board.

Interested in joining our Data Engineering team? Apply now