etl4s

Powerful, whiteboard-style ETL.

import etl4s._

/* Define building blocks */
val fiveExtract = Extract(5)
val timesTwo    = Transform[Int, Int](_ * 2)
val plusFive    = Transform[Int, Int](_ + 5)
val exclaim     = Transform[Int, String](x => s"Result: $x!")
val consoleLoad = Load[String, Unit](println)
val dbLoad      = Load[String, Unit](x => println(s"[DB] $x"))

/* Compose with `andThen` */
val timesTwoPlusFive = timesTwo andThen plusFive

/* Stitch with ~> */
val pipeline = fiveExtract ~> timesTwoPlusFive ~> exclaim ~> (consoleLoad & dbLoad)

/* Run */
pipeline.unsafeRun()
// Result: 15!
// [DB] Result: 15!
import etl4s._

case class Env(path: String)

/* Placeholder upstream nodes, for illustration */
val extract   = Extract("some,raw,data")
val transform = Transform[String, String](_.toUpperCase)

val load = Load[String, Unit].requires[Env] { env => data =>
  println(s"Writing to ${env.path}")
}

val pipeline = extract ~> transform ~> load

pipeline.provide(Env("s3://dev")).unsafeRun()
pipeline.provide(Env("s3://prod")).unsafeRun()
import etl4s._

val A = Node[String, String](identity)
  .lineage(name = "A", inputs = List("s1", "s2"), outputs = List("s3"))

val B = Node[String, String](identity)
  .lineage(name = "B", inputs = List("s3"), outputs = List("s4", "s5"))

Seq(A, B).toMermaid
// Generates:
graph LR
    classDef pipeline fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000
    classDef dataSource fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000

    A["A"]
    B["B"]
    s1(["s1"])
    s2(["s2"])
    s3(["s3"])
    s4(["s4"])
    s5(["s5"])

    s1 --> A
    s2 --> A
    A --> s3
    s3 --> B
    B --> s4
    B --> s5

    class A pipeline
    class B pipeline
    class s1,s2,s3,s4,s5 dataSource
import etl4s._

case class Row(isValid: Boolean)  /* placeholder record type, for illustration */
val rows = List(Row(true), Row(false), Row(true))

val process = Transform[List[Row], List[Row]] { rows =>
  Tel.addCounter("rows.processed", rows.size)
  Tel.setGauge("batch.size", rows.size.toDouble)
  rows.filter(_.isValid)
}

process.unsafeRun(rows)  // no-ops (zero cost)

implicit val t: Etl4sTelemetry = Prometheus()
process.unsafeRun(rows)  // metrics flowing
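The zero-cost default can be sketched with plain Scala implicit resolution, independent of etl4s (the `Tel` trait and `recordRows` helper below are illustrative, not the library's API): a no-op instance lives in the companion object as a low-priority fallback, and any implicit brought into local scope overrides it.

```scala
import scala.collection.mutable

// A single-abstract-method trait so backends are easy to swap
trait Tel { def addCounter(name: String, n: Long): Unit }
object Tel {
  // Fallback found via the companion object when no other Tel is in
  // scope, so instrumented code no-ops by default
  implicit val noop: Tel = (_, _) => ()
}

def recordRows(n: Long)(implicit t: Tel): Unit =
  t.addCounter("rows.processed", n)

recordRows(10)  // resolves to Tel.noop: nothing happens

// A backend in local scope takes precedence over the companion fallback
val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)
implicit val inMemory: Tel = (name, n) => counts(name) += n

recordRows(10)
recordRows(5)
counts("rows.processed")  // 15
```

Because lexical-scope implicits win over implicit-scope ones, the instrumented code never changes; only the instance in scope does.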

How it works

1. Import: drop one file into your project. No dependencies, no framework lock-in.
2. Chain: connect nodes with ~>, branch with &, inject config with .requires.
3. Run: call .unsafeRun(). Works in scripts, Spark, Flink, anywhere Scala runs.
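Conceptually, the chain-then-run model boils down to function composition with deferred execution. Here is a toy model in plain Scala (not the actual etl4s internals):

```scala
// Each node is just a wrapped function, and ~> is composition.
// Nothing executes until run is called at the end.
case class Node[A, B](run: A => B) {
  def ~>[C](next: Node[B, C]): Node[A, C] = Node(run andThen next.run)
}

val extract = Node[Unit, Int](_ => 5)
val double  = Node[Int, Int](_ * 2)
val render  = Node[Int, String](n => s"got $n")

val pipe = extract ~> double ~> render  // just a value; nothing has run yet
pipe.run(())                            // "got 10"
```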

Pipelines as values.

One file, zero dependencies. Lazy, composable, testable. Since pipelines are values, you can attach metadata, generate lineage diagrams, and share them across teams.

// One import. That's it.
import etl4s._
val pipeline =
  extract ~> transform ~> load

Type-safe composition.

Types must align or it won't compile. Misconnections are compile errors.

E[A, Int] ~> T[Int, Str] ~> L[Str, B]
// output and input types must match at every ~>; a mismatch won't compile
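The same guarantee falls out of plain function composition, which is a fair mental model for what the compiler checks at each `~>`:

```scala
// Each step's output type must be the next step's input type.
val parse:  String => Int    = _.trim.toInt
val double: Int    => Int    = _ * 2
val render: Int    => String = n => s"n=$n"

val pipeline = parse andThen double andThen render
pipeline(" 21 ")  // "n=42"

// Reordering the chain is rejected at compile time:
// parse andThen render andThen double
//               won't compile: render produces a String, double needs an Int
```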

Dependency injection, inferred.

Nodes declare what they need. Chain freely. The compiler merges and infers the combined type.

Needs[Db] ~> Needs[Api]  =  Needs[Db & Api]
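The `Db & Api` merge can be sketched with compound types in plain Scala (the `Db` and `Api` services below are illustrative): each step declares only the capability it needs, and a chain that uses both can only run against an environment providing both.

```scala
trait Db  { def query(sql: String): String }
trait Api { def fetch(id: Int): String }

// Each step asks only for what it uses
val readDb:  Db  => String = db  => db.query("select 1")
val callApi: Api => String = api => api.fetch(42)

// Running both steps demands an environment satisfying BOTH traits
def both(env: Db with Api): (String, String) = (readDb(env), callApi(env))

val env = new Db with Api {
  def query(sql: String) = s"db:$sql"
  def fetch(id: Int)     = s"api:$id"
}
both(env)  // ("db:select 1", "api:42")
```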

Built-in tracing.

Shared execution state across pipeline nodes. Write logs, flag errors, react to upstream failures, track timing. Retrieve with .unsafeRunTrace().

E (log) ~> T (check, log) ~> L (check)
// Example trace: logs "read 1420 rows", "validated 89"
// Trace[B](result: B, logs: 2, errors: 1, time: 4ms)
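As a rough mental model only (not the library's actual implementation), `.unsafeRunTrace()` behaves like running the pipeline with a shared, mutable trace threaded through it, then returning the result alongside what was recorded:

```scala
// Collect logs and timing around a computation, then return both.
case class Trace[A](result: A, logs: List[String], timeMs: Long)

def runTraced[A](body: (String => Unit) => A): Trace[A] = {
  val buf   = scala.collection.mutable.ListBuffer.empty[String]
  val start = System.nanoTime()
  val out   = body(s => { buf += s; () })
  Trace(out, buf.toList, (System.nanoTime() - start) / 1000000)
}

val t = runTraced { log =>
  log("read 1420 rows")
  log("validated 89")
  1420 - 89
}
// t.logs has 2 entries; t.result == 1331
```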

Runs anywhere.

JVM, JavaScript, WebAssembly, native binaries via LLVM. Same code, zero platform-specific APIs.


Why etl4s?

Chaotic, framework-coupled ETL codebases bring dev teams to their knees. etl4s lets you structure your code as clean, typed graphs of pure functions.

"(~>) is just *chef's kiss*. There are so many synergies here; I haven't pushed for something this hard in a while."
Sr Engineering Manager, Instacart

"...the advantages of full-blown effect systems without the complexities and awkward monad syntax!"
u/RiceBroad4552