Uniffle Shuffle Client Guide

Uniffle is designed as a unified shuffle engine for multiple computing frameworks, including Apache Spark and Apache Hadoop. It provides client plugins that enable remote shuffle in Spark and MapReduce.

Deploy

This document describes how to deploy the Uniffle client plugins with Spark and MapReduce.

Deploy Spark Client Plugin

  1. Add the client jar to the Spark classpath, e.g., <SPARK_HOME>/jars/

    The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-XXXXX-shaded.jar

    The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar

  2. Update the Spark conf to enable Uniffle, e.g.,

    spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
    spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
    # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
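
The same settings can also be supplied per job through `spark-submit --conf`. The following is a minimal Python sketch, not part of Uniffle: the helper name `spark_submit_conf_args` and the coordinator addresses are hypothetical placeholders, and only the property names come from the configuration above.

```python
def spark_submit_conf_args(coordinators, spark_major_version=3):
    """Build `--conf key=value` arguments that enable Uniffle for one job.

    `coordinators` is a list of "host:port" strings (placeholders here).
    """
    conf = {
        "spark.shuffle.manager": "org.apache.spark.shuffle.RssShuffleManager",
        "spark.rss.coordinator.quorum": ",".join(coordinators),
    }
    if spark_major_version == 2:
        # Per the note above, AQE must stay disabled on Spark 2.
        conf["spark.sql.adaptive.enabled"] = "false"

    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Hypothetical coordinator hosts; replace with real addresses.
    print(" ".join(spark_submit_conf_args(["coordinator1:19999", "coordinator2:19999"])))
```

The returned list can be appended to an existing `spark-submit` command line, which keeps cluster-wide defaults untouched while a single job is being tested.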

Support Spark Dynamic Allocation

To support Spark dynamic allocation with Uniffle, the Spark code must be patched. Two reference patches, for spark-2.4.6 and spark-3.1.2, are provided in the spark-patches folder.

After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:

spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true

Deploy MapReduce Client Plugin

  1. Add the client jar to the classpath of each NodeManager, e.g., <HADOOP_HOME>/share/hadoop/mapreduce/

    The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar

  2. Update the MapReduce conf to enable Uniffle, e.g.,

    -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
    -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
    -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
    -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle

    Note that the RssMRAppMaster will automatically disable slow start (i.e., mapreduce.job.reduce.slowstart.completedmaps=1) and job recovery (i.e., yarn.app.mapreduce.am.job.recovery.enable=false)
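
When jobs are launched from a script, the four options above can be assembled in one place. This is a minimal Python sketch under stated assumptions: the helper name `mapreduce_rss_options` and the coordinator addresses are hypothetical, and the option names and values are copied from the example above without change.

```python
def mapreduce_rss_options(coordinators):
    """Return the -D options that enable Uniffle for a MapReduce job.

    `coordinators` is a list of "host:port" strings (placeholders here).
    """
    quorum = ",".join(coordinators)
    return [
        f"-Dmapreduce.rss.coordinator.quorum={quorum}",
        "-Dyarn.app.mapreduce.am.command-opts="
        "org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster",
        "-Dmapreduce.job.map.output.collector.class="
        "org.apache.hadoop.mapred.RssMapOutputCollector",
        "-Dmapreduce.job.reduce.shuffle.consumer.plugin.class="
        "org.apache.hadoop.mapreduce.task.reduce.RssShuffle",
    ]


if __name__ == "__main__":
    # Hypothetical coordinator hosts; replace with real addresses.
    for option in mapreduce_rss_options(["coordinator1:19999", "coordinator2:19999"]):
        print(option)
```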

Configuration

The important client configurations are listed below.

Common Setting

These configurations are shared by all types of clients.

| Property Name | Default | Description |
| --- | --- | --- |
| <client_type>.rss.coordinator.quorum | - | Coordinator quorum |
| <client_type>.rss.writer.buffer.size | 3m | Buffer size for the data of a single partition |
| <client_type>.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
| <client_type>.rss.client.read.buffer.size | 14m | The max data size read from storage |
| <client_type>.rss.client.send.threadPool.size | 5 | The thread pool size for sending shuffle data to shuffle servers |
| <client_type>.rss.client.assignment.tags | - | The comma-separated list of tags used to decide which shuffle servers are assigned. Note that SHUFFLE_SERVER_VERSION is always used as an assignment tag, whether or not this conf is set |
| <client_type>.rss.client.data.commit.pool.size | The number of assigned shuffle servers | The thread pool size for sending commits to shuffle servers |
| <client_type>.rss.client.assignment.shuffle.nodes.max | -1 | The number of shuffle servers required for assignment. If it is less than or equal to 0, or greater than the coordinator's "rss.coordinator.shuffle.nodes.max", the value of "rss.coordinator.shuffle.nodes.max" is used |

Notice:

  1. <client_type> should be spark or mapreduce

  2. <client_type>.rss.coordinator.quorum is compulsory, and other configurations are optional when coordinator dynamic configuration is enabled.
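
The fallback rule for `<client_type>.rss.client.assignment.shuffle.nodes.max` can be restated as a small function. This is an illustrative Python sketch of the rule as described in the table, not Uniffle's actual implementation; the function name `effective_shuffle_nodes_max` is hypothetical.

```python
def effective_shuffle_nodes_max(client_value, coordinator_max):
    """Return the number of shuffle servers that would be requested.

    Mirrors the documented rule: a client value that is <= 0, or larger than
    the coordinator's rss.coordinator.shuffle.nodes.max, falls back to the
    coordinator's value.
    """
    if client_value <= 0 or client_value > coordinator_max:
        return coordinator_max
    return client_value
```

For example, with a coordinator limit of 9, the default client value of -1 resolves to 9, a client value of 4 is kept as 4, and a client value of 12 is capped to 9.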

Adaptive Remote Shuffle Enabling

To choose between the built-in shuffle and remote shuffle automatically, Uniffle supports adaptive enabling. The client should use DelegationRssShuffleManager and provide its unique <access_id> so that the coordinator can decide whether to enable remote shuffle for it.

spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager
spark.rss.access.id <access_id>

Notice: Currently, this feature only supports Spark.

Other configuration:

| Property Name | Default | Description |
| --- | --- | --- |
| spark.rss.access.timeout.ms | 10000 | The timeout for accessing the Uniffle coordinator |
| spark.rss.client.access.retry.interval.ms | 20000 | The interval between access retries before falling back to SortShuffleManager |
| spark.rss.client.access.retry.times | 0 | The number of access retries before falling back to SortShuffleManager |

Client Quorum Setting

Uniffle supports a client-side quorum protocol to tolerate shuffle server crashes. This is client-side behaviour: the shuffle writer sends each block to multiple servers, and shuffle readers can fetch block data from any one of them. Since sending multiple replicas of each block reduces shuffle performance and increases resource consumption, it is designed as an optional feature.

| Property Name | Default | Description |
| --- | --- | --- |
| <client_type>.rss.data.replica | 1 | The max number of servers that the client sends each block to in the quorum protocol |
| <client_type>.rss.data.replica.write | 1 | The min number of servers to which the client must successfully send each block |
| <client_type>.rss.data.replica.read | 1 | The min number of servers from which the client must successfully fetch metadata |

Notice:

  1. spark.rss.data.replica.write + spark.rss.data.replica.read > spark.rss.data.replica

Recommended examples:

  1. Performance First (default)
spark.rss.data.replica 1
spark.rss.data.replica.write 1
spark.rss.data.replica.read 1
  2. Fault-tolerant First
spark.rss.data.replica 3
spark.rss.data.replica.write 2
spark.rss.data.replica.read 2
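
The constraint in the notice above can be checked before a job is submitted. This is a minimal Python sketch, not part of Uniffle: the function name `check_replica_settings` is hypothetical, and the first two checks (values of at least 1, and write/read not exceeding the replica count) are assumptions added for illustration. Only the third check comes from this document.

```python
def check_replica_settings(replica, replica_write, replica_read):
    """Return a list of problems with the three replica settings (empty if valid)."""
    problems = []
    # Assumption: each setting counts servers, so it should be at least 1.
    if min(replica, replica_write, replica_read) < 1:
        problems.append("all replica settings must be at least 1")
    # Assumption: a minimum cannot exceed the number of replicas written.
    if replica_write > replica or replica_read > replica:
        problems.append("replica.write and replica.read cannot exceed replica")
    # Documented rule: replica.write + replica.read > replica.
    if replica_write + replica_read <= replica:
        problems.append("replica.write + replica.read must be greater than replica")
    return problems
```

Both recommended examples pass this check: (1, 1, 1) gives 1 + 1 > 1 and (3, 2, 2) gives 2 + 2 > 3. A setting such as (3, 1, 2) fails, because 1 + 2 is not greater than 3, so a read quorum would not be guaranteed to overlap a successful write.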

Spark Specialized Setting

The important configurations are listed below.

| Property Name | Default | Description |
| --- | --- | --- |
| spark.rss.writer.buffer.spill.size | 128m | Buffer size for the data of all partitions |
| spark.rss.client.send.size.limit | 16m | The max data size sent to a shuffle server |
| spark.rss.client.unregister.thread.pool.size | 10 | The max size of the thread pool used for unregistering |
| spark.rss.client.unregister.request.timeout.sec | 10 | The max timeout, in seconds, when unregistering from remote shuffle servers |

MapReduce Specialized Setting

| Property Name | Default | Description |
| --- | --- | --- |
| mapreduce.rss.client.max.buffer.size | 3k | The max buffer size in map side |
| mapreduce.rss.client.batch.trigger.num | 50 | The max batch of buffers to send data in map side |

Remote Spill (Experimental)

In cloud environments, VMs may have very limited disk space and performance. This experimental feature allows reduce tasks to spill data to remote storage (e.g., HDFS).

| Property Name | Default | Description |
| --- | --- | --- |
| mapreduce.rss.reduce.remote.spill.enable | false | Whether to use remote spill |
| mapreduce.rss.reduce.remote.spill.attempt.inc | 1 | The number of extra reduce attempts to allow, because spilling to HDFS is more likely to fail than spilling to local disk |
| mapreduce.rss.reduce.remote.spill.replication | 1 | The replication number used when spilling data to HDFS |
| mapreduce.rss.reduce.remote.spill.retries | 5 | The number of retries when spilling data to HDFS |

Notice: this feature requires the MEMORY_LOCALFILE_HDFS mode.