Rolling history logs in Spark History UI

Versions: Apache Spark 3.5.0

Stream processing is great but it brings some gotchas that are not obvious. Logs are one of them.

The issue with logs in the streaming context is related to the long-running character of a streaming job. After all, it's supposed to never stop, so are the generated log entries. If you don't have a way to reduce this volume that, let's be honest, will never be useful in 100%, you're in trouble.

I remember one of my first data engineering contracts. We've been running a Spark Streaming [yes, no Structured Streaming!] job on AWS EMR with YARN, and the job failed one day, without any prior infrastructure- or application-related change. Turns out, the disk was full because we didn't define any logs retention policy. Lessons learned, I never ever set up a cluster without taking care of this logs retention aspect.

If you want to be better than me back then, this blog post should be helpful as it explains how Apache Spark rolls the logs to avoid indefinite storage when Spark History is enabled. The feature may also help improve the Spark History UI responsiveness by reducing the amount of logs to parse, hence by reducing the parsing time. Curious to see what this feature is? Let's deep dive!

Rolling event log writer

Apache Spark 3.0.0 has got a new configuration option, disabled by default, called spark.eventLog.rolling.enabled. In that configuration, Apache Spark initializes a SingleEventLogFileWriter that, as per the Scaladoc:

/**
 * The writer to write event logs into single file.
 */

Consequently, your streaming logs will grow. An alternative proposed by the rolling logs configuration is the RollingEventLogFilesWriter. You can already grasp some details from the Scaladoc:

 * The writer to write event logs into multiple log files, rolled over via configured size.
// ...
The name of directory and files in the directory would follow:
 *
 * - The name of directory: eventlog_v2_appId(_[appAttemptId])
 * - The prefix of name on event files: events_[index]_[appId](_[appAttemptId])(.[codec])
 *   - "index" would be monotonically increasing value (say, sequence)
 * - The name of metadata (app. status) file name: appstatus_[appId](_[appAttemptId])(.inprogress)
 *

How does it work? Besides the boolean flag, you need to configure the size for each log file in the spark.eventLog.rolling.maxFileSize property. Now whenever the accumulated event logs reach this threshold, the RollingEventLogFilesWriter creates a new file and flushes there all buffered events:

// RollingEventLogFilesWriter

def getEventLogFilePath(appLogDir: Path, appId: String, appAttemptId: Option[String], index: Long, codecName: Option[String]): Path = {
  val base = s"${EVENT_LOG_FILE_NAME_PREFIX}${index}_" +
  EventLogFileWriter.nameForAppAndAttempt(appId, appAttemptId)
  val codec = codecName.map("." + _).getOrElse("")
  new Path(appLogDir, base + codec)
}

private[history] def rollEventLogFile(): Unit = {
  closeWriter()

  index += 1
  currentEventLogFilePath = getEventLogFilePath(logDirForAppPath, appId, appAttemptId, index, compressionCodecName)

  initLogFile(currentEventLogFilePath) { os =>
    countingOutputStream = Some(new CountingOutputStream(os))
    new PrintWriter(
      new OutputStreamWriter(countingOutputStream.get, StandardCharsets.UTF_8))
  }
}

Cleaning

However, the rolling itself doesn't reduce the log size. It'll only split event log files into multiple smaller ones. To reduce the logs size in for streaming applications in Spark History you also need to configure the spark.history.fs.eventLog.rolling.maxFilesToRetain that by default is defined as:

  private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = ConfigBuilder("spark.history.fs.eventLog.rolling.maxFilesToRetain")
  .doc("The maximum number of event log files which will be retained as non-compacted. By default, all event log files will be retained. Please set the configuration " +
    s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control the overall size of event log files.")
  .version("3.0.0")
  .intConf
  .checkValue(_ > 0, "Max event log files to retain should be higher than 0.")
  .createWithDefault(Integer.MAX_VALUE)

Remember, the MaxValue is 2 147 483 647, so you can still end up storing the full logs history. Therefore, you will also need to change this value. This value is used by a EventLogFileCompactor to compact the event logs. What does it mean, "compact" the event logs? No, it's not about the compression. The compaction consists of removing all irrelevant information, such as completed jobs. The implementation is very smart as it uses the...listeners that adds or removes the aforementioned completed components while the compactor replies the events, as shown below:

private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
  private val liveJobToStages = new mutable.HashMap[Int, Set[Int]]
  private val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
  private val stageToRDDs = new mutable.HashMap[Int, Set[Int]]
  private val _liveExecutors = new mutable.HashSet[String]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
	totalJobs += 1
	totalStages += jobStart.stageIds.length
	liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
	val stages = liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
	liveJobToStages -= jobEnd.jobId
	stageToTasks --= stages
	stageToRDDs --= stages
  }

After applying these filters, the compactor verifies whether the compaction should occur. It does it by measuring the filtered out events and triggering the rewriting. The filter outcome is compared with the minimal ratio set in the spark.history.fs.eventLog.rolling.compaction.score.threshold property. The method responsible for generating the ratio is just below:

private def calculateScore(stats: FilterStatistics): Double = {
  // For now it's simply measuring how many task events will be filtered out (rejected)
  // but it can be sophisticated later once we get more heuristic information and found
  // the case where this simple calculation doesn't work.
  (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks
}

In the end, the compactor writes the filtered events into a dedicated log file and removes the raw event files.

So that's the story and configuration that might help you reduce the storage overhead in History UI. I'm aware that I took some shortcuts, especially when it comes to the logging itself. For that reason, I've already planned some follow-up blog posts on that matter! Not only for Apache Spark by the way, as it's worth seeing now, once I'm delving into details, this part in Apache Flink!