External Shuffle Storage

When External Shuffle Storage is turned on, Spark writes shuffle data to a shared remote filesystem, such as S3. This makes it possible to recover shuffle data written by failed Spark Kubernetes pods, which avoids task retries. External Shuffle Storage is also useful when dynamic allocation is enabled, because it allows scaling down Spark executors that would otherwise be kept running only to serve shuffle data for other tasks. Storing shuffle data on a remote drive accessible from all executors can therefore save time and resources.

Configuration

To turn on External Shuffle Storage, add the following configuration to your Spark application:

{
  "shuffle": {
    "enabled": "true",
    "rootDir": "s3a://<bucket>/path/to/shuffle"
  }
}

The shuffle.rootDir configuration is the location where shuffle data is written. The shuffle reuse feature writes the shuffle data through the Hadoop filesystem layer and therefore supports any filesystem that Hadoop supports: rootDir can be a local path, an HDFS path, or a path on any other Hadoop-supported filesystem. When using a local path, a shared remote drive (for example, one provided by an S3 CSI driver) must be mounted on all executors in the cluster.

For instance:

{
  "shuffle": {
    "rootDir": "/opt/spark/work-dir/shuffle"
  },
  "volumes": [
    {
      "name": "spark-data",
      "persistentVolumeClaim": {
        "claimName": "s3-claim"
      }
    }
  ],
  "driver": {
    "volumeMounts": [
      {
        "mountPath": "/opt/spark/work-dir/shuffle",
        "name": "spark-data"
      }
    ]
  },
  "executor": {
    "volumeMounts": [
      {
        "mountPath": "/opt/spark/work-dir/shuffle",
        "name": "spark-data"
      }
    ]
  }
}
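
Because rootDir accepts any Hadoop-supported filesystem, an HDFS location should work in the same way as the S3 example. The following is a minimal sketch only: the namenode host, port, and path are hypothetical placeholders, not values taken from this page. As with the s3a:// example, a remote URI should not need the volume mounts shown for the local-path case.

```json
{
  "shuffle": {
    "enabled": "true",
    "rootDir": "hdfs://<namenode-host>:<port>/path/to/shuffle"
  }
}
```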

Optimizations

The External Shuffle Storage plugin shards shuffle files across different S3 folder prefixes for better performance. The configuration key spark.shuffle.s3.folderPrefixes controls the number of prefixes; the default is 10.

{
  "shuffle": {
    "rootDir": "/shuffle"
  },
  "sparkConf": {
    "spark.shuffle.s3.folderPrefixes": "2"
  },
  "volumes": [
    {
      "name": "spark-vol1",
      "persistentVolumeClaim": {
        "claimName": "s3-claim-1"
      }
    },
    {
      "name": "spark-vol2",
      "persistentVolumeClaim": {
        "claimName": "s3-claim-2"
      }
    }
  ],
  "driver": {
    "volumeMounts": [
      {
        "mountPath": "/shuffle/0",
        "name": "spark-vol1"
      },
      {
        "mountPath": "/shuffle/1",
        "name": "spark-vol2"
      }
    ]
  },
  "executor": {
    "volumeMounts": [
      {
        "mountPath": "/shuffle/0",
        "name": "spark-vol1"
      },
      {
        "mountPath": "/shuffle/1",
        "name": "spark-vol2"
      }
    ]
  }
}

The above configuration shards the shuffle data across two different PVC volumes defined in Kubernetes. A claim such as s3-claim-1 could be defined as follows:

{
  "apiVersion": "v1",
  "kind": "PersistentVolumeClaim",
  "metadata": {
    "name": "s3-claim-1"
  },
  "spec": {
    "accessModes": ["ReadWriteMany"],
    "resources": {
      "requests": {
        "storage": "200Gi"
      }
    },
    "storageClassName": "sc-ontap-nas"
  }
}
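
The sharded configuration also references a second claim, s3-claim-2, which must exist before the application starts. A matching definition could look like the sketch below. It assumes the second claim uses the same access mode, requested size, and storage class as s3-claim-1; adjust these to whatever your cluster provides.

```json
{
  "apiVersion": "v1",
  "kind": "PersistentVolumeClaim",
  "metadata": {
    "name": "s3-claim-2"
  },
  "spec": {
    "accessModes": ["ReadWriteMany"],
    "resources": {
      "requests": {
        "storage": "200Gi"
      }
    },
    "storageClassName": "sc-ontap-nas"
  }
}
```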

When using S3 as the shuffle storage medium, adjusting the spark.hadoop.fs.s3a.block.size and spark.hadoop.fs.s3a.multipart.size configurations can also improve performance.
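
As an illustration, these two keys can be set through sparkConf alongside the shuffle settings. The sizes below are placeholder values chosen for the example, not recommendations from this page; suitable values depend on the size of your shuffle files and are best found by benchmarking.

```json
{
  "shuffle": {
    "enabled": "true",
    "rootDir": "s3a://<bucket>/path/to/shuffle"
  },
  "sparkConf": {
    "spark.hadoop.fs.s3a.block.size": "128M",
    "spark.hadoop.fs.s3a.multipart.size": "128M"
  }
}
```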

Limitations

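  • Shuffle data reuse is only available with Spark 3.2 and later.
  • Where possible, set spark.dynamicAllocation.shuffleTracking.enabled to false when using External Shuffle Storage. Shuffle tracking keeps executors that hold shuffle data alive, which is unnecessary once that data is stored externally.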
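
The shuffle tracking recommendation above might be expressed as in the following sketch. It assumes dynamic allocation is turned on through the standard Spark property spark.dynamicAllocation.enabled in sparkConf; your platform may expose dynamic allocation through a different setting, in which case only the shuffleTracking line applies.

```json
{
  "shuffle": {
    "enabled": "true",
    "rootDir": "s3a://<bucket>/path/to/shuffle"
  },
  "sparkConf": {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "false"
  }
}
```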