Code Monkey home page Code Monkey logo

sus4s's Introduction

sus4s

A Direct-Style Scala Wrapper Around the Structured Concurrency of Project Loom

Dependency

The library is available on Maven Central. To use it, add the following dependency to your build.sbt files:

libraryDependencies += "in.rcard.sus4s" %% "core" % "0.0.2"

The library is only available for Scala 3.

Usage

The library provides a direct-style API for Project Loom's structured concurrency. It requires JDK 21 and Scala 3. Moreover, Java preview features must be enabled in the Scala compiler.

The main entry point is the sus4s package object. The following code snippet shows how to use the library:

import in.rcard.sus4s.sus4s.*

val result: Int = structured {
  val job1: Job[Int] = fork {
    Thread.sleep(1000)
    42
  }
  val job2: Job[Int] = fork {
    Thread.sleep(500)
    43
  }
  job1.value + job2.value
}
println(result) // 85

The structured method creates a new structured concurrency scope represented by the Suspend trait. It's built on the java.util.concurrent.StructuredTaskScope class. Hence, the threads forked inside the structured block are Java Virtual Threads.

The fork method creates a new Java Virtual Thread that executes the given code block. The fork method executes functions declared with the capability of suspend:

def findUserById(id: UserId): Suspend ?=> User

Coloring a function with the Suspend capability tells the caller that the function performs a suspendable operation, aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads.

A type alias is available for the Suspend capability:

type Suspended[A] = Suspend ?=> A

So, the above function can be rewritten as:

def findUserById(id: UserId): Suspended[User]

The structured function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the thread executing the block waits to complete all the forked tasks. The structured blocks terminate when:

  • all the forked tasks complete successfully
  • one of the forked tasks throws an exception
  • the block throws an exception

The Job Class

Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented by the Job class. The Job class provides the value method that waits for the completion of the virtual thread and returns the result of the function:

val job1: Job[Int] = fork {
  Thread.sleep(1000)
  42
}
val meaningOfLife: Int = job1.value

If you're not interested in the result of the function, you can use the join method:

val job1: Job[Int] = fork {
  Thread.sleep(1000)
  println("The meaning of life is 42")
}
job1.join()

The structured function is entirely transparent to any exception thrown by the block or forked tasks.

Canceling a Job

Canceling a job is possible by calling the cancel method on the Job instance. The following code snippet shows how:

val queue = new ConcurrentLinkedQueue[String]()
val result = structured {
  val job1 = fork {
    val innerCancellableJob = fork {
      while (true) {
        Thread.sleep(2000)
        queue.add("cancellable")
      }
    }
    Thread.sleep(1000)
    innerCancellableJob.cancel()
    queue.add("job1")
  }
  val job = fork {
    Thread.sleep(500)
    queue.add("job2")
    43
  }
  job.value
}
queue.toArray should contain theSameElementsInOrderAs List("job2", "job1")
result shouldBe 43

Cancellation is collaborative. In the above example, the job innerCancellableJob is marked for cancellation by the call innerCancellableJob.cancel(). However, the job is not immediately canceled. The job is canceled when it reaches the first point operation that can be interrupted by the JVM. Hence, cancellation is based on the concept of interruption. In the above example, the innerCancellableJob is canceled when it reaches the Thread.sleep(2000) operation. The job will never be canceled if we remove the Thread.sleep operation. A similar behavior is implemented by Kotlin coroutines (see Kotlin Coroutines - A Comprehensive Introduction / Cancellation for further details).

Cancelling a job follows the relationship between parent and child jobs. If a parent's job is canceled, all the children's jobs are canceled as well:

val expectedQueue = structured {
  val queue = new ConcurrentLinkedQueue[String]()
  val job1 = fork {
    val innerJob = fork {
      fork {
        Thread.sleep(3000)
        println("inner-inner-Job")
        queue.add("inner-inner-Job")
      }
      Thread.sleep(2000)
      println("innerJob")
      queue.add("innerJob")
    }
    Thread.sleep(1000)
    queue.add("job1")
  }
  val job = fork {
    Thread.sleep(500)
    job1.cancel()
    queue.add("job2")
    43
  }
  queue
}
expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")

Trying to get the value from a canceled job will throw an InterruptedException. However, joining a canceled job will not throw any exception.

You won't pay any additional cost for canceling a job. The cancellation mechanism is based on the interruption of the virtual thread. No new structured scope is created for the cancellation mechanism.

Contributing

If you want to contribute to the project, please do it! Any help is welcome.

Acknowledgments

This project is inspired by the Ox and the Unwrapped libraries.

sus4s's People

Contributors

baldram avatar rcardin avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar

Forkers

baldram

sus4s's Issues

Add custom error management to structured concurrency

Now, the structured construct responds only to exceptions. If any forked jobs throw an exception, the other jobs are immediately canceled, and the exception is bubbled up to the caller.

It should be possible to have the same behavior using a custom error management, like using Either[E, A] instead of exceptions.

Add a wrapper to the `Thread.sleep` function called `delay`

Add a wrapper to the Thread.sleep function. The wrapper should be available only inside a structured block, and classes from the package scala.concurrent.duration._ should be used as input.

Update the tests with the new delay function

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.