Over the last few years, Spark’s popularity in the Big Data world has grown remarkably, and it is perhaps the most successful open-source compute engine for solving problems that involve extracting and transforming enormous amounts of data.
Among the available data formats for storing and querying big data, Parquet has emerged as the de-facto standard, leaving behind ORC as an option only if you have an existing Hive setup to manage your data.
S3, on the other hand, has long been touted as one of the best (reliable, available & cheap) object stores available to mankind.
Today we explore the various approaches one could take to improve performance when writing a Spark job that reads and writes parquet data to and from S3. The contents of this post are purely based on our experience with these technologies, and your mileage may vary depending on your use-case and the volume of data you are dealing with.
ParquetOutputCommitter vs DirectParquetOutputCommitter
By default, Spark uses the ParquetOutputCommitter that comes bundled with Hadoop. The default implementation first writes the data to a temporary directory on S3 and, once it finishes successfully, renames the temporary directory to the final location. This is slow by design when you work with an eventually consistent object store like S3, where “rename” is a very costly operation. So the recommendation here is to use Spark’s own DirectParquetOutputCommitter, which writes directly to S3 and thereby avoids the extra “rename” (a configuration sketch follows the list below). There are two catches here:
- Spark Speculation – If you have speculation turned on for your jobs, this configuration is ignored and the default output committer is used. The reason is that if a task fails and is retried while the committer is writing directly to S3, Spark has no way to clean up the data already written, and it ends up creating multiple copies of the same data. The default committer, on the other hand, writes to a temporary location first and copies to the final S3 location only once the task succeeds, making it easy for Spark to perform the cleanup in case of task failure.
- If you are already using Spark 2.0, you are out of luck and stuck with the default committer: Spark 2.0 removes support for DirectParquetOutputCommitter entirely, on the grounds that the direct committer is not safe when there is a network partition (e.g. the Spark driver might not be aware of a task still running on an executor), which could result in data loss.
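For completeness, here is a minimal sketch of wiring up the direct committer on Spark 1.6.x. The fully qualified class name of DirectParquetOutputCommitter moved packages between Spark releases, and the bucket paths are purely illustrative, so treat both as assumptions and verify them against your Spark version.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Speculation must stay off, otherwise Spark ignores the committer setting
// and falls back to the default ParquetOutputCommitter.
val conf = new SparkConf()
  .setAppName("direct-committer-sketch")
  .set("spark.speculation", "false")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// In 1.6.x this property is read from the Hadoop Configuration rather than SQLConf.
// The class path below is an assumption; older releases used org.apache.spark.sql.parquet.DirectParquetOutputCommitter.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")

// Read and write parquet straight to S3, skipping the temporary-directory rename.
sqlContext.read.parquet("s3a://my-bucket/input/")    // illustrative paths
  .write.parquet("s3a://my-bucket/output/")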
So what to do now?
It turns out that if either of these constraints applies to your use-case, the best you can do is stick with the default ParquetOutputCommitter and instead tune some other configuration parameters that affect the whole read-write process.
- Disable Metadata Caching – By default spark.sql.parquet.cacheMetadata is set to true, which means Spark caches the parquet metadata, since schema discovery can be expensive if performed on every read. But it also means that every time you append something, however small, to an existing parquet dataset, Spark performs schema discovery for the new files and updates the cache. While caching is effective for reading and querying static data, for most write-intensive use-cases you can gain something by disabling the metadata cache.
- Disable Writing Summary Files – Schema merging for Parquet has been disabled by default since Spark 1.5.0, and without schema merging, writing summary files is not really useful: Spark assumes all parquet part-files have an identical schema, so the footer can be read from any part-file. More importantly, writing summary files can be expensive because the footers of all part-files must be read and merged. For these two reasons, writing summary files for Parquet has been disabled by default starting with Spark 2.0, but if you are on 1.6.1 or below you need to turn it off yourself by setting parquet.enable.summary-metadata to false. Note that this is a Hadoop configuration, not a Spark one, so it needs to be set via Hadoop’s core-site.xml (both settings are illustrated in the sketch after this list).
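Assuming Spark 1.6.x, here is a rough sketch of applying both of these settings programmatically; setting the summary-metadata flag through sparkContext.hadoopConfiguration is shown as an alternative to core-site.xml and should be verified for your setup.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-write-tuning"))
val sqlContext = new SQLContext(sc)

// Spark SQL setting: skip caching parquet metadata for write-heavy workloads.
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")

// Hadoop setting: skip writing the _metadata / _common_metadata summary files.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")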
Some other workarounds worth mentioning:
- Prefer using s3a over s3 to access and store data, as s3a provides better performance than the block-based layout that the older s3 filesystem uses. Although if you are using EMR, then s3 works fine (a small s3a sketch follows this list).
- Increase Hadoop’s FileOutputCommitter algorithm version to 2 if it isn’t already. The reasoning can be found here. Since this is a Hadoop configuration, it must be set accordingly: either via core-site.xml
<property>
  <name>mapreduce.fileoutputcommitter.algorithm.version</name>
  <value>2</value>
</property>
or by calling the hadoopConfiguration method on the SparkContext:
sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
- Create your directory structure on S3 such that it doesn’t have deep directory trees with only a few files per directory – i.e. prefer shallow trees with many files.
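As a hedged illustration of the s3a point above, the following sketch reads and writes parquet through s3a URIs and sets the committer algorithm version from code. The bucket name and credential placeholders are assumptions, and on EMR or with IAM instance roles the credential properties are typically unnecessary.

// Assumes the hadoop-aws module (which provides the s3a filesystem) is on the classpath.
sc.hadoopConfiguration.set("fs.s3a.access.key", "<AWS_ACCESS_KEY>")   // placeholder
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<AWS_SECRET_KEY>")   // placeholder
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

// Shallow, wide layout: one directory per day holding many part-files.
val events = sqlContext.read.parquet("s3a://my-bucket/events/2016-10-20/")
events.write.parquet("s3a://my-bucket/aggregates/2016-10-20/")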
At the time of this writing, there is active work going on to improve consistency for S3A, which will be the foundation for an output committer that handles speculative commits and bypasses the rename. But until that fix lands in a Hadoop distribution and Spark is modified to work with it, it looks like we have to live with these workarounds.
If you have any additional workarounds that could help us achieve better speeds, kindly drop them in the comments.