

Background

Shuffle is the process in distributed computing frameworks used to redistribute data between upstream and downstream tasks. It is a crucial component within computing frameworks and directly impacts their performance and stability. However, with the exploration of cloud-native architectures, traditional shuffle solutions have revealed various issues.

In a cloud-native architecture, techniques such as storage-compute separation and mixed deployment are widely used. Computational nodes therefore have relatively small disk capacity, poor IO performance, and an imbalance between CPU and IO resources. Additionally, computational nodes may be preempted by high-priority jobs due to mixed deployment.

In traditional Shuffle implementations, shuffle nodes are tightly coupled with computational nodes. However, because computational nodes and shuffle nodes have different requirements for disk, memory, and CPU, it is challenging to scale them independently based on their resource needs. By separating computational nodes from shuffle nodes, the computational node's state becomes more lightweight once the Shuffle state is offloaded to shuffle nodes, reducing job recomputation when computational nodes are preempted.

Decoupling computational and Shuffle nodes also reduces the demand for disk specifications on computational nodes, enabling an increase in the number of accessible computational nodes.

In cloud-native architectures, large Shuffle jobs can exert significant pressure on local disk drives, leading to issues such as insufficient disk capacity on computational nodes and higher disk random IO, thus affecting the performance and stability of large Shuffle jobs.

The industry has explored various new Shuffle technologies, including Google's BigQuery, Baidu DCE Shuffle, Facebook's Cosco Shuffle, Uber Zeus Shuffle, Alibaba's Celeborn Shuffle, and many others. Each system has made its own trade-offs based on different scenarios. Uniffle aims to create a fast, accurate, stable and cost-efficient cloud-native Remote Shuffle Service, considering performance, correctness, stability, and cost as its core aspects.

Architecture

[Figure: Uniffle architecture]

Coordinator

The Coordinator is responsible for managing the entire cluster, and the Shuffle Server reports the cluster's load to the Coordinator through heartbeats. Based on this load, the Coordinator assigns suitable Shuffle Servers to jobs. To facilitate operations and maintenance, the Coordinator supports configuration deployment and provides a RESTful API for external access.

Shuffle Server

The Shuffle Server is primarily responsible for receiving, aggregating, and writing shuffle data into storage. For shuffle data stored on local disks, the Shuffle Server also provides the ability to read the data.

Client

The Client is responsible for communicating with the Coordinator and Shuffle Server. It handles tasks such as requesting Shuffle Servers, sending heartbeats, and performing read and write operations on shuffle data. It provides an SDK for Spark, MapReduce and Tez to use.

Read & Write process

[Figure: Read and write process]

  1. The Driver obtains allocation information from the Coordinator.
  2. The Driver registers Shuffle information with the Shuffle Server.
  3. Based on the allocation information, the Executor sends Shuffle data to the Shuffle Server in the form of Blocks.
  4. The Shuffle Server writes the data into storage.
  5. After the write tasks complete, the Executor reports the results to the Driver.
  6. The read task retrieves successful write task information from the Driver.
  7. The read task retrieves Shuffle metadata (such as all blockIds) from the Shuffle Server.
  8. Based on the storage model, the read task reads Shuffle data from the storage side.
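The steps above can be condensed into a short sketch. The interfaces and method names below (CoordinatorClient, ShuffleServerClient, and their methods) are hypothetical simplifications for illustration only, not Uniffle's actual client API.

```java
import java.util.List;

// Hypothetical, simplified interfaces; Uniffle's real client API differs.
interface CoordinatorClient {
    List<String> getShuffleAssignment(String appId, int shuffleId);     // step 1
}

interface ShuffleServerClient {
    void registerShuffle(String appId, int shuffleId);                  // step 2
    void sendShuffleBlocks(String appId, int shuffleId, byte[] block);  // step 3 (server persists in step 4)
    byte[] readShuffleData(String appId, int shuffleId, int partition); // steps 7-8
}

class WriteReadFlowSketch {
    void run(CoordinatorClient coordinator, ShuffleServerClient server,
             String appId, int shuffleId, byte[] blockData) {
        // 1. The Driver asks the Coordinator which Shuffle Servers to use.
        List<String> servers = coordinator.getShuffleAssignment(appId, shuffleId);
        // 2. The Driver registers the shuffle with the assigned Shuffle Server(s).
        server.registerShuffle(appId, shuffleId);
        // 3-4. Executors send data blocks; the Shuffle Server writes them to storage.
        server.sendShuffleBlocks(appId, shuffleId, blockData);
        // 5-6. Executors report successful writes to the Driver; readers fetch that
        //      information from the Driver (omitted here).
        // 7-8. Readers fetch shuffle metadata and then the data itself.
        byte[] data = server.readShuffleData(appId, shuffleId, 0);
    }
}
```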

Performance

1) Hybrid storage

In our internal production environment, partition data blocks at the KB level account for more than 80% of the total. To effectively address the random IO issues caused by these small partitions, Uniffle incorporates the concept of in-memory shuffle, taking reference from Google's Dremel. Additionally, considering that 80% of the data capacity in our production environment comes from large partitions, Uniffle introduces local disk and HDFS as storage media to address the capacity problem. Together, this forms a hybrid storage solution.

2) Random IO Optimization

[Figure: Random IO]

The essence of random IO is the presence of numerous small data block operations. To avoid these operations, Uniffle first aggregates the same partition from multiple map tasks in the Shuffle Server's memory to generate larger partition data. When the Shuffle data in memory reaches the per-partition threshold or the overall memory threshold, it is written to local or remote storage.

[Figure: IO sort]

When the overall memory threshold is reached, Uniffle sorts the in-memory partition data by size and writes the larger partitions to the storage media first. Additionally, when the data remaining in memory reaches a certain size, the writing of Shuffle data to the storage media is stopped, letting some data stay in memory to further reduce random IO on the disk.
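A minimal sketch of this flush policy is shown below, assuming a simplified map from partition id to buffered bytes; the class, thresholds, and watermark names are illustrative rather than Uniffle's actual implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Illustrative flush policy: when total buffered bytes exceed a high watermark,
// flush the largest partition buffers first and stop once usage drops below a
// low watermark, keeping small partitions in memory to avoid random disk IO.
class FlushPolicySketch {
    static List<Integer> selectPartitionsToFlush(Map<Integer, Long> bufferedBytesByPartition,
                                                 long totalBuffered,
                                                 long highWatermark,
                                                 long lowWatermark) {
        List<Integer> toFlush = new ArrayList<>();
        if (totalBuffered < highWatermark) {
            return toFlush; // below the threshold, keep everything in memory
        }
        // Sort partitions by buffered size, largest first.
        List<Map.Entry<Integer, Long>> entries = new ArrayList<>(bufferedBytesByPartition.entrySet());
        entries.sort(Map.Entry.<Integer, Long>comparingByValue(Comparator.reverseOrder()));
        long remaining = totalBuffered;
        for (Map.Entry<Integer, Long> e : entries) {
            if (remaining <= lowWatermark) {
                break; // leave the remaining (small) partitions in memory
            }
            toFlush.add(e.getKey());
            remaining -= e.getValue();
        }
        return toFlush;
    }
}
```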

3) Storage media selection strategy

[Figure: Storage selection]

For writing Shuffle data to local or remote storage, Uniffle has observed through testing that larger data block sizes yield better write performance on remote storage. When the data block size exceeds 64MB, write throughput to remote storage can reach 32MB/s, which is sufficient when multiple threads write in parallel, compared with the roughly 100MB/s write speed of an HDD. Therefore, when writing data to storage media, Uniffle writes larger data blocks to remote storage and smaller data blocks to local storage, based on their size.
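As a rough illustration, this selection rule can be expressed as a simple size threshold; the 64 MB figure follows the observation above, while the real threshold in Uniffle is configurable and the class below is hypothetical.

```java
// Illustrative selection rule: large flush events go to remote storage (HDFS),
// small ones go to the local disk.
enum StorageType { LOCAL_FILE, REMOTE_HDFS }

class StorageSelectorSketch {
    private static final long REMOTE_THRESHOLD_BYTES = 64L * 1024 * 1024;

    static StorageType select(long flushEventSizeBytes) {
        return flushEventSizeBytes >= REMOTE_THRESHOLD_BYTES
            ? StorageType.REMOTE_HDFS
            : StorageType.LOCAL_FILE;
    }
}
```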

4) Write concurrency

For larger partitions, a single thread cannot meet the performance requirements of writing to remote storage. In HDFS, a file can only be written by one writer. To address this limitation, Uniffle allows multiple files to be mapped to a single partition on remote storage and uses multi-threading to increase the write performance of large partitions. However, a single partition occupying all remote-storage write threads would affect the write performance of other partitions, so there is typically a maximum limit on the number of concurrent write threads per partition. To avoid creating too many files, a partition prioritizes already existing files during the writing process; a new file is created only when all existing files are currently being written to.
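The file-selection logic can be sketched as follows; the class and field names are hypothetical and only illustrate the reuse-before-create rule and the per-partition concurrency cap described above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative writer selection for one partition on remote storage:
// prefer an existing file whose writer is idle; only create a new file when
// all existing files are busy and the per-partition concurrency cap allows it.
class PartitionFileSelectorSketch {
    static class PartitionFile {
        final String path;
        boolean writing; // true while a thread is appending to this file
        PartitionFile(String path) { this.path = path; }
    }

    private final List<PartitionFile> files = new ArrayList<>();
    private final int maxConcurrency;

    PartitionFileSelectorSketch(int maxConcurrency) { this.maxConcurrency = maxConcurrency; }

    synchronized PartitionFile acquire(String partitionPathPrefix) {
        for (PartitionFile f : files) {
            if (!f.writing) {       // reuse an idle, already-created file
                f.writing = true;
                return f;
            }
        }
        if (files.size() < maxConcurrency) { // all busy: create a new file if allowed
            PartitionFile f = new PartitionFile(partitionPathPrefix + "_" + files.size());
            f.writing = true;
            files.add(f);
            return f;
        }
        return null; // caller must wait until a writer finishes
    }

    synchronized void release(PartitionFile f) { f.writing = false; }
}
```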

5) Data Distribution

[Figure: Spark AQE data distribution]

For computational engines like Spark with AQE (Adaptive Query Execution), there are scenarios where a single task needs to read only a portion of a partition's data, as well as scenarios where multiple partitions need to be read. When reading a portion of a partition's data, randomly distributed data can cause significant read amplification, while sorting and rewriting the data after it has been written leads to considerable performance loss. Therefore, Uniffle adopts partial ordering to optimize reads of partial data. For more detailed information, please refer to [3].

[Figure: Fetching shuffle results]

In scenarios where multiple partitions need to be read, Uniffle assigns the multiple partitions read by a single task to the same Shuffle Server. This allows RPC (Remote Procedure Call) requests to be aggregated, so that multiple requests can be sent to a single Shuffle Server, minimizing network overhead and improving overall performance. For more detailed information, please refer to [4].
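The multiple-partition case can be illustrated with a simple range-based assignment: consecutive partitions map to the same Shuffle Server so that one RPC can fetch several of them. The scheme below is a simplification; in Uniffle the Coordinator decides the actual assignment based on cluster load.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative range-based assignment: consecutive partitions map to the same
// Shuffle Server, so a reader that needs partitions [0, 3) can fetch them with
// a single aggregated request instead of three separate ones.
class RangeAssignmentSketch {
    static Map<Integer, String> assign(int numPartitions, String[] servers, int partitionsPerRange) {
        Map<Integer, String> assignment = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            int rangeIndex = p / partitionsPerRange;
            assignment.put(p, servers[rangeIndex % servers.length]);
        }
        return assignment;
    }
}
```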

6) Off-heap memory management

In Uniffle's data communication, gRPC is used, and the gRPC implementation involves multiple memory copies. Additionally, the Shuffle Server currently uses heap memory for management. When running a Shuffle Server with 80GB of memory in a production environment, it may experience a significant amount of garbage collection (GC), with individual GC pauses lasting approximately 22 seconds. To address this issue, Uniffle upgraded the JDK to version 11. On the data transmission side, Uniffle referenced Spark Shuffle's communication protocol and adopted Netty for data transfer, using ByteBuf to manage off-heap memory more efficiently.
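For reference, the snippet below shows the general pattern of pooled off-heap allocation with Netty's ByteBuf that the text refers to; it is not Uniffle's actual buffer-management code.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Minimal example of pooled off-heap allocation with Netty. Direct (off-heap)
// buffers keep shuffle data out of the JVM heap, which avoids long GC pauses,
// but they are reference-counted and must be released explicitly.
class OffHeapBufferExample {
    public static void main(String[] args) {
        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(8 * 1024 * 1024);
        try {
            buffer.writeBytes(new byte[1024]);   // write shuffle data into off-heap memory
            byte[] out = new byte[buffer.readableBytes()];
            buffer.readBytes(out);               // read it back for transfer or flushing
        } finally {
            buffer.release();                    // return the memory to the pool
        }
    }
}
```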

7) Columnar Shuffle Format

The Uniffle framework itself does not natively support columnar Shuffle. Instead, Uniffle integrates with Gluten, a columnar shuffle component, and reuses the columnar Shuffle capabilities that Gluten provides. For more detailed information, please refer to [5].

[Figure: Gluten integration]

8) Barrier-Free

For batch processing in distributed computing frameworks, the commonly used model is the Bulk Synchronous Parallel (BSP) model, in which downstream tasks start only after all upstream tasks have completed. However, to reduce the impact of straggler nodes on job performance, some computing frameworks support a "slow start" mechanism that allows upstream and downstream tasks to run concurrently. For stream processing and OLAP engines, on the other hand, a pipeline model is used in which upstream and downstream tasks run simultaneously.

To cater to various computing frameworks, Uniffle employs a barrier-free design that allows upstream and downstream stages to run concurrently. The key to achieving this barrier-free execution lies in supporting efficient in-memory read/write operations and an effective index filtering mechanism. With this design, a job does not need to ask the Shuffle Server to write all data to storage media at the end of each stage. Additionally, since upstream and downstream stages run simultaneously, downstream readers may only need to read incremental data; the index filtering mechanism effectively avoids reading redundant data.

Uniffle has designed both bitmap index filtering and file index filtering mechanisms to handle in-memory and storage media data respectively. This enables Uniffle to efficiently support barrier-free execution and improve performance by avoiding redundant data reads.

Performance evaluation

When using version 0.2 of Uniffle and conducting benchmarks, Uniffle's shuffle performance is similar to Spark's vanilla shuffle for small data volumes. However, for large data volumes, Uniffle's shuffle outperforms Spark's vanilla shuffle by up to 30%. The benchmark results can be found at the following link: https://github.com/apache/incubator-uniffle/blob/master/docs/benchmark.md

Correctness

Metadata Verification

[Figure: Metadata verification]

Spark reports information about all completed tasks to the driver. In the first step of the reduce phase, the reducer retrieves a list of the unique identifiers of completed tasks from the driver. Blocks are the data units sent from mappers to the Shuffle Server, and each block has a unique identifier. A block's data may reside in memory, on local disk, or in HDFS. To ensure data integrity, Uniffle incorporates metadata verification: it designs index files for the data files stored on local disks and in HDFS. Each index entry contains the block ID, relative offset, data checksum, compressed length, uncompressed length, and task ID. Before reading a data file, Uniffle first reads the corresponding index file. To address duplicate reads, Uniffle uses a bitmap to track the block IDs that have already been read; by checking the block ID, it can determine whether a duplicate read exists.
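A simplified sketch of this check is shown below. The index fields mirror the list above, while a HashSet stands in for the bitmap Uniffle actually uses; the class names are illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative metadata check: each index entry describes one block, and a set
// of already-read block IDs (a bitmap in Uniffle) filters out duplicate reads.
class IndexEntry {
    long blockId;
    long offset;            // relative offset inside the data file
    long crc;               // checksum of the block payload
    int compressedLength;
    int uncompressedLength;
    long taskAttemptId;
}

class DuplicateReadFilterSketch {
    private final Set<Long> readBlockIds = new HashSet<>(); // stand-in for a bitmap

    // Returns true if this block should be read, i.e. it was written by a
    // successful task attempt and has not been read before.
    boolean shouldRead(IndexEntry entry, Set<Long> successfulTaskIds) {
        if (!successfulTaskIds.contains(entry.taskAttemptId)) {
            return false; // block from a failed or speculative task attempt
        }
        return readBlockIds.add(entry.blockId); // add() returns false for duplicates
    }
}
```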

Data Verification

[Figure: Data verification]

To address data corruption, Uniffle performs CRC (Cyclic Redundancy Check) verification on data blocks. When reading data, Uniffle recalculates the CRC and compares it with the CRC stored in the file to determine whether the data is corrupted, preventing incorrect data from being read.
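A minimal example of such a check using the JDK's CRC32, assuming the expected checksum was recorded at write time; this is an illustration rather than Uniffle's exact code.

```java
import java.util.zip.CRC32;

// Recompute the checksum of a block read from storage and compare it with the
// checksum recorded when the block was written.
class CrcVerificationSketch {
    static void verify(byte[] blockData, long expectedCrc) {
        CRC32 crc = new CRC32();
        crc.update(blockData, 0, blockData.length);
        if (crc.getValue() != expectedCrc) {
            throw new IllegalStateException(
                "Shuffle block corrupted: expected CRC " + expectedCrc + " but got " + crc.getValue());
        }
    }
}
```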

Stability

1) Fallback for Hybrid storage

[Figure: HDFS fallback]

Online HDFS clusters may experience fluctuations in stability, which can cause writes to HDFS to fail during certain time periods. To minimize the impact of such fluctuations, Uniffle has designed a fallback mechanism: when a write to HDFS fails, the data is stored locally instead, reducing the impact on the job.

2) Flow Control

Before sending a request, the job client will first request the memory resources corresponding to the data. If there is insufficient memory, the job will wait and stop sending data, thereby implementing flow control for the job.
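Conceptually, this is equivalent to reserving memory before each send, as in the hypothetical sketch below; in Uniffle the reservation is a request to the Shuffle Server rather than a local semaphore.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative flow control: the sender must reserve the memory needed for a
// batch before sending it. If the reservation cannot be granted, the sender
// waits and retries, which naturally throttles the write rate.
class FlowControlSketch {
    private final Semaphore availableBytes;

    FlowControlSketch(int serverBufferBytes) {
        this.availableBytes = new Semaphore(serverBufferBytes);
    }

    // Called by the client before sending `size` bytes of shuffle data.
    void reserveBeforeSend(int size) throws InterruptedException {
        while (!availableBytes.tryAcquire(size, 500, TimeUnit.MILLISECONDS)) {
            // Not enough memory available: back off and try again.
        }
    }

    // Called once the data has been flushed and the memory freed.
    void releaseAfterFlush(int size) {
        availableBytes.release(size);
    }
}
```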

3) Replication

Uniffle adopts the Quorum replica protocol, allowing jobs to configure the number of replicas for their data writes based on their own needs. This helps prevent stability issues caused by having only a single replica for the job.
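A minimal sketch of the quorum idea, assuming a block is sent to all replicas and the write succeeds once the configured number of acknowledgements arrives; replica counts in Uniffle are job-configurable, and the code below is only an illustration.

```java
import java.util.List;
import java.util.function.Predicate;

// Illustrative quorum write: send a block to every replica and succeed as soon
// as the configured number of acknowledgements (the write quorum) is reached.
class QuorumWriteSketch {
    static boolean writeWithQuorum(byte[] block,
                                   List<Predicate<byte[]>> replicaSenders,
                                   int writeQuorum) {
        int acks = 0;
        for (Predicate<byte[]> sendToReplica : replicaSenders) {
            if (sendToReplica.test(block)) { // true if the replica acknowledged the write
                acks++;
                if (acks >= writeQuorum) {
                    return true;
                }
            }
        }
        return false; // not enough replicas acknowledged; the write fails
    }
}
```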

4) Stage Recomputation

Currently, Spark supports recomputing the entire stage if there is a failure in reading from the Shuffle Server, helping the job recover and resume its execution.

5) Quota Management

When a job reaches the user's quota limit, the Coordinator can make the job fall back to the vanilla Spark mode. This prevents a single user's erroneous submission of numerous jobs from overwhelming the cluster.

6) Coordinator HA

Uniffle does not choose solutions like Zookeeper, Etcd, or Raft for high availability (HA) purposes, mainly considering the complexity introduced by these consensus protocol systems. For Uniffle, the Coordinator is stateless and does not persist any state. All state information is reported by the Shuffle Server through heartbeats, so there is no need to determine which node is the master. Deploying multiple Coordinator instances ensures high availability of the service.

Cost Effective

1) Low-Cost Remote Storage

In general, for a relatively stable business, computational resources tend to remain stable while storage resources grow linearly. These storage resources store a large amount of cold data. Uniffle supports hybrid storage, which allows the utilization of these unused storage resources, thereby reducing the overall system cost.

2) Automatic Scaling

Uniffle has developed a K8S Operator that implements scaling operations for stateful services using webhooks. By leveraging Horizontal Pod Autoscaler (HPA), automatic scaling can be achieved, further reducing system costs.

Community Engagement

Currently, Uniffle supports multiple computational frameworks such as Spark, MapReduce, and Tez.

Uniffle Spark has been adopted by companies like Tencent, Didi, iQiyi, SF Express, and Vipshop, handling PB-level data on a daily basis.

Uniffle MapReduce is employed in mixed deployment scenarios by companies like Bilibili and Zhihu.

Uniffle Tez has been jointly developed by HuoLaLa, Beike, and Shein.

The development of many important features in the community has involved contributions from well-known Chinese internet companies. For example, iQiyi has contributed support for accessing Kerberos HDFS clusters and has optimized the performance of Spark AQE on Uniffle. Didi has added support for multi-tenant job quotas. Netty data plane optimizations were jointly completed by Didi and Vipshop. The support for Gluten was contributed by Baidu and SF Express.

Currently, the community has more than 50 contributors, with over 600 commits, and has released four Apache versions. It is being used by dozens of companies. Additionally, teams and companies interested in contributing to Uniffle Flink can contact the Uniffle community at mailto:dev@uniffle.apache.org.

Currently, there are no companies participating in the community with deployment scenarios or development plans for Uniffle Flink. Your help in filling this gap in the community would be greatly appreciated. Uniffle's design incorporates a large number of mechanisms and strategies, and users are welcome to contribute strategies that suit their own scenarios.

Future Plans

Storage Optimization

  1. Integration with object storage to optimize system costs.
  2. Merging index files and data files to further reduce IO overhead.
  3. Support for heterogeneous storage resources such as SSD and HDD.
  4. Support for sorting data by key.

Computation Optimization

  1. Support for dynamic allocation of Shuffle Servers.
  2. Partial support for Slow Start feature in some engines.
  3. Continuous optimizations for Spark AQE.
  4. Support for the Flink engine.
  5. Asynchronous data reading support for compute engines.

Summary

Uniffle has been designed with a focus on performance, correctness, stability, and cost-effectiveness, making it a suitable Shuffle system for cloud-native architectures. We welcome everyone to contribute to the Uniffle project. The Uniffle project can be found at https://github.com/apache/incubator-uniffle.

Reference

[1] https://cloud.tencent.com/developer/article/1903023

[2] https://cloud.tencent.com/developer/article/1943179

[3] https://github.com/apache/incubator-uniffle/pull/137

[4] https://github.com/apache/incubator-uniffle/pull/307

[5] https://github.com/apache/incubator-uniffle/pull/950


Apache Uniffle (incubating) - 2022 summary

Introduction

At the end of 2020, Apache Uniffle (incubating)'s first line of code was written inside Tencent. It was open-sourced in November 2021 and donated to the Apache Foundation in mid-2022. Since its donation, it has attracted many developers from various companies. This article gives a brief summary of Apache Uniffle (incubating) in 2022.

Timeline

Apache Uniffle (Incubating) was accepted as an Apache incubator project on June 6, 2022. As of the end of December 2022 (at the time of writing, 2022.12.26), a total of 157 new issues have been created (76 of which have been closed or resolved) and 272 new PRs have been opened (of which 264 were merged or closed).

Apache Uniffle (incubating) has released two versions in 2022: 0.6.0 and 0.6.1. Among them:

  1. 2022.10.27: Version 0.6.0 released
  2. 2022.11.30: Version 0.6.1 released

Version 0.6.0 is the first version released after Uniffle entered the Apache incubator. It features:

  1. Optimizes the Coordinator's allocation mechanism
  2. Optimizes the scheduling strategy for Shuffle Server
  3. Optimizes performance and stability
  4. Supports HDFS that requires Kerberos authentication
  5. Supports the Uniffle K8S Operator, allowing it to be deployed and applied in a cloud-native environment.

Version 0.6.1 is an important bug fix version after version 0.6.0. It mainly fixes the following problems:

  1. In the MR computing framework, when the number of reduce tasks exceeds 1024, the partition cannot be accessed
  2. Concurrent registration of shuffle leads to failure to obtain shuffle results
  3. Handle NPE in WriteBufferManager#addRecord
  4. When there is a bad disk, there may be a memory leak

In addition to the above two versions, local order has been introduced in the current master branch to support the data skew optimization of Spark AQE. Compared with the unoptimized version, performance is improved by 3 times.

Community

Apache Uniffle (incubating) has added 22 contributors since entering the incubator, bringing the total to 33 contributors. Contributors come from Tencent, iQiyi, Ebay, Didi, SF Express, Baidu, ByteDance, JD.com, Bilibili, Databricks, and other Internet companies. Among the 22 new contributors, the Apache Uniffle (incubating) PMC voted to add 2 new committers based on their contributions. The newly recruited committers are expected to make continuous contributions to Apache Uniffle (incubating), and we are planning to add more committers in 2023.

Company Usage

According to online and offline communication with contributors and users, Apache Uniffle (incubating) is currently used in production at companies such as Tencent, iQiyi, Didi, SF Express, VIPShop, Bilibili, etc. At many of these companies, the amount of shuffle data processed by Uniffle exceeds PBs per day, with tens of thousands of applications running. In addition to solving the stability and scalability problems of native Spark shuffle, it also improves overall resource utilization.

Project Plans

In 2023, Apache Uniffle (incubating) will continue to provide an efficient and pervasive Shuffle Service. Currently, the following work is on the planning list:

  1. A more complete compute engine ecosystem
    • Support Tez computing framework
    • Support Flink computing framework
  2. Support more storage media, especially distributed storages:
    • Efficient support for object storage
    • HDD, SSD and other local hybrid storage support
  3. Performance optimization:
    • Replace gRPC with Netty
    • Off-heap memory management, application of zero-copy technology, reducing data copying
  4. More enterprise-level features:
    • Multi-tenant isolation related function development
    • Improved stability and reliability