Powerful, whiteboard-style ETL.
Used in production at Instacart
import etl4s._
/* Define building blocks */
val fiveExtract = Extract(5)
val timesTwo = Transform[Int, Int](_ * 2)
val plusFive = Transform[Int, Int](_ + 5)
val exclaim = Transform[Int, String](x => s"Result: $x!")
val consoleLoad = Load[String, Unit](println)
val dbLoad = Load[String, Unit](x => println(s"[DB] $x"))
/* Compose with `andThen` */
val timesTwoPlusFive = timesTwo `andThen` plusFive
/* Stitch with ~> */
val pipeline =
  fiveExtract ~> timesTwoPlusFive ~> exclaim ~> (consoleLoad & dbLoad)
/* Run */
pipeline.unsafeRun()
// Result: 15!
// [DB] Result: 15!
import etl4s._
case class Env(path: String)
val load = Load[String, Unit].requires[Env] { env => data =>
  println(s"Writing to ${env.path}")
}
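/* Placeholder upstream nodes so the snippet stands alone (values are illustrative) */
val extract = Extract("data")
val transform = Transform[String, String](_.toUpperCase)
val pipeline = extract ~> transform ~> load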
pipeline.provide(Env("s3://dev")).unsafeRun()
pipeline.provide(Env("s3://prod")).unsafeRun()
import etl4s._
val A = Node[String, String](identity)
  .lineage(name = "A", inputs = List("s1", "s2"), outputs = List("s3"))
val B = Node[String, String](identity)
  .lineage(name = "B", inputs = List("s3"), outputs = List("s4", "s5"))
Seq(A, B).toMermaid
One file, zero dependencies. Lazy, composable, testable. Because pipelines are values, you can attach metadata, generate lineage diagrams, and share them across teams.
Types must align or it won't compile. Misconnections are compile errors.
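The idea can be seen without the library at all. The following is a minimal sketch in which plain Scala functions stand in for etl4s nodes (an assumption made for illustration; `shout` and `ok` are hypothetical names). A chain compiles only when each stage's output type matches the next stage's input type.

```scala
// Plain Scala functions stand in for pipeline nodes (illustrative only, not etl4s types).
val timesTwo: Int => Int = _ * 2
val exclaim: Int => String = x => s"Result: $x!"
val shout: String => String = _.toUpperCase

// Each output type matches the next input type, so this chain compiles.
val ok: Int => String = timesTwo andThen exclaim andThen shout

// Uncommenting the next line is a compile error:
// `shout` produces a String, but `timesTwo` expects an Int.
// val broken = shout andThen timesTwo

println(ok(5)) // RESULT: 10!
```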
Nodes declare what they need. Chain freely. The compiler merges and infers the combined type.
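One way to picture how requirements could combine is a small reader-style wrapper. This is a sketch under stated assumptions, not etl4s's implementation: `Stage`, `HasPath`, and `HasRegion` are hypothetical names, and the `~>` defined here only mirrors the operator shown earlier. Chaining two stages yields a stage whose environment type is the intersection of both requirements.

```scala
// Hypothetical capability traits; not part of etl4s.
trait HasPath { def path: String }
trait HasRegion { def region: String }

// A stage that needs an environment R to turn an A into a B.
final case class Stage[R, A, B](run: R => A => B) {
  // Chaining requires an environment that satisfies both stages.
  def ~>[R2, C](next: Stage[R2, B, C]): Stage[R with R2, A, C] =
    Stage[R with R2, A, C](env => a => next.run(env)(run(env)(a)))
}

val read = Stage[HasPath, Unit, String](env => _ => s"rows from ${env.path}")
val tag = Stage[HasRegion, String, String](env => rows => s"$rows [${env.region}]")

// The combined requirement is the intersection of both environments.
val both: Stage[HasPath with HasRegion, Unit, String] = read ~> tag

val env = new HasPath with HasRegion {
  val path = "s3://dev"
  val region = "us-east-1"
}

println(both.run(env)(())) // rows from s3://dev [us-east-1]
```

Supplying an environment that lacks either trait would be rejected by the compiler, which is the property the paragraph above describes.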
Shared execution state across pipeline nodes. Write logs, flag errors, react to upstream failures, track timing. Retrieve with .unsafeRunTrace().
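The general pattern of shared run state can be sketched in plain Scala. Everything below is hypothetical (`RunState`, `Trace`, `runTraced`, and their fields are assumptions for illustration) and says nothing about what `.unsafeRunTrace()` actually returns; it only shows steps writing logs, flagging an error, and a later step reacting to it, with timing collected around the run.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical trace record; field names are assumptions, not etl4s's types.
final case class Trace[A](result: A, logs: List[String], errors: List[String], elapsedMs: Long)

// Mutable state shared by every step in a single run.
final class RunState {
  val logs = ListBuffer.empty[String]
  val errors = ListBuffer.empty[String]
}

// Runs a pipeline against fresh state and returns the result with what was recorded.
def runTraced[A](pipeline: RunState => A): Trace[A] = {
  val state = new RunState
  val start = System.nanoTime()
  val result = pipeline(state)
  val elapsedMs = (System.nanoTime() - start) / 1000000L
  Trace(result, state.logs.toList, state.errors.toList, elapsedMs)
}

val trace = runTraced { state =>
  state.logs += "extracted 3 rows"
  val rows = List(1, -2, 3)
  // An upstream step flags a problem.
  if (rows.exists(_ < 0)) state.errors += "negative value seen"
  // A downstream step reacts to the upstream error flag.
  if (state.errors.nonEmpty) rows.filter(_ >= 0) else rows
}

println(trace.result) // List(1, 3)
println(trace.errors) // List(negative value seen)
```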
JVM, JavaScript, WebAssembly, native binaries via LLVM. Same code, zero platform-specific APIs.
Chaotic, framework-coupled ETL codebases bring dev teams to their knees. etl4s lets you structure your code as clean, typed graphs of pure functions.