Using Java Futures in Scala

Occasionally, you may need to interface with Java Futures in Scala. Unfortunately, the experience is “an abomination”. Several older Java libraries, like Amazon's SDK, provide asynchronous interfaces using Java's Futures. java.util.concurrent.Future offers a blocking .get and an isDone check, but no mechanism to react to completion. Short of blocking a thread until the result arrives (or simply calling .get inside another thread), the best solution is to periodically check for completion.
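For contrast, here is a minimal sketch of the "call .get inside another thread" approach that polling tries to avoid. The names BlockingBridge, toScalaBlocking and demo are made up for illustration; only JDK and Scala standard library calls are used.

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BlockingBridge {
  // Hypothetical helper: parks one pool thread inside javaFuture.get
  // until the result is ready. `blocking` only hints to the execution
  // context that this thread is about to block.
  def toScalaBlocking[T](javaFuture: JFuture[T])(implicit ec: ExecutionContext): Future[T] =
    Future(blocking(javaFuture.get()))

  // Small demo: submit a task to a plain Java executor and bridge its Future.
  def demo(): Int = {
    import ExecutionContext.Implicits.global
    val pool = Executors.newSingleThreadExecutor()
    try {
      val jf: JFuture[Int] = pool.submit(new Callable[Int] {
        override def call(): Int = 42
      })
      Await.result(toScalaBlocking(jf), 5.seconds)
    } finally pool.shutdown()
  }
}
```

This works, but every outstanding Java future ties up a thread for as long as it takes to complete, which is the cost the polling approach trades for a small delay.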

Searching for solutions turns up the post quoted above and an (outdated) Play-dependent implementation. Here's a simpler solution using netty's HashedWheelTimer. Please note that new HashedWheelTimer creates a thread for its event loop, which lives until you stop the timer. So, you likely want only one of these for your entire application; keep it in a singleton. You will also want to tune pollIntervalMs, which sets both the timer's tick duration and the delay passed to newTimeout. As written, the Scala future resolves roughly 100ms after the Java future completes (possibly a little more, since the timer fires on tick boundaries); in exchange, fewer resources are spent checking for completion than with a much shorter interval. I've found that most IO tasks are not latency sensitive, so they can happily tolerate these delays.

import java.util.concurrent.{CancellationException, Future => JFuture, TimeUnit}
import org.jboss.netty.util.{Timeout, TimerTask, HashedWheelTimer}
import scala.concurrent.{Promise, Future}
import scala.util.Try

object Implicits {

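  // Poll interval; also used as the timer's tick duration.
  private val pollIntervalMs = 100L
  // One shared timer: each HashedWheelTimer owns a worker thread until stop() is called.
  private val timer = new HashedWheelTimer(pollIntervalMs, TimeUnit.MILLISECONDS)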

  implicit class JFutureHelpers[T](javaFuture: JFuture[T]) {
    def toScala: Future[T] = {
      val promise = Promise[T]()

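      // Completes the promise if the Java future has finished; otherwise polls again later.
      def checkCompletion(): Unit = {
        if (javaFuture.isCancelled) {
          promise.failure(new CancellationException())
        } else if (javaFuture.isDone) {
          // get returns immediately here; a failed task surfaces as an ExecutionException.
          promise.complete(Try(javaFuture.get))
        } else {
          scheduleTimeout()
        }
        ()
      }

      // Re-runs checkCompletion on the timer's thread after pollIntervalMs.
      def scheduleTimeout(): Unit = {
        timer.newTimeout(new TimerTask {
          override def run(timeout: Timeout): Unit = checkCompletion()
        }, pollIntervalMs, TimeUnit.MILLISECONDS)
        ()
      }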

      checkCompletion()
      promise.future
    }
  }

}
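If you'd rather not pull in netty, the same polling idea can be sketched with the JDK's ScheduledExecutorService. This is a swapped-in alternative rather than the implementation above: the names JdkPolling and demo and the thread name "jfuture-poller" are made up for illustration, and the 100ms interval is simply carried over.

```scala
import java.util.concurrent.{Callable, CancellationException, Executors,
  ScheduledExecutorService, ThreadFactory, TimeUnit, Future => JFuture}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object JdkPolling {
  private val pollIntervalMs = 100L

  // One shared daemon thread for all conversions, so it does not keep the JVM alive.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "jfuture-poller")
        t.setDaemon(true)
        t
      }
    })

  def toScala[T](javaFuture: JFuture[T]): Future[T] = {
    val promise = Promise[T]()

    // Same logic as checkCompletion above, rescheduling itself on the JDK scheduler.
    def check(): Unit = {
      if (javaFuture.isCancelled) {
        promise.failure(new CancellationException())
      } else if (javaFuture.isDone) {
        promise.complete(Try(javaFuture.get()))
      } else {
        scheduler.schedule(new Runnable {
          override def run(): Unit = check()
        }, pollIntervalMs, TimeUnit.MILLISECONDS)
      }
      ()
    }

    check()
    promise.future
  }

  // Usage: bridge a Java future that takes a few poll intervals to finish.
  def demo(): String = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val jf: JFuture[String] = pool.submit(new Callable[String] {
        override def call(): String = { Thread.sleep(250); "done" }
      })
      Await.result(toScala(jf), 5.seconds)
    } finally pool.shutdown()
  }
}
```

The Await in demo is only there to make the sketch observable; in real code you would map or flatMap over the returned Scala Future instead.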

