Piping Streams with pipe()
The pipe() method is the simplest way to move data from a readable stream into a writable stream. Instead of manually wiring up data events and calling write() yourself, pipe() connects the two streams and handles the flow automatically — including backpressure, so a fast source never overwhelms a slow destination. Understanding pipe() (and its limitations) is the foundation for working with files, network sockets, compression, and any other streaming workload in Node.js.
How pipe() works
readable.pipe(writable) attaches the writable as a consumer of the readable. Internally Node listens for data events on the source, writes each chunk to the destination, and pauses the source whenever the destination’s internal buffer fills up. When the destination drains, the source resumes. This means data flows at the pace the slowest stream can handle.
The method returns the destination stream, which is what makes chaining possible.
import { createReadStream, createWriteStream } from "node:fs";
const source = createReadStream("input.txt");
const destination = createWriteStream("output.txt");
source.pipe(destination);
destination.on("finish", () => {
console.log("Copy complete");
});
Output:
Copy complete
Compare this to the manual equivalent, which is far more verbose and easy to get wrong:
import { createReadStream, createWriteStream } from "node:fs";
const source = createReadStream("input.txt");
const destination = createWriteStream("output.txt");
source.on("data", (chunk) => {
// write() returns false when the buffer is full
const ok = destination.write(chunk);
if (!ok) {
source.pause();
destination.once("drain", () => source.resume());
}
});
source.on("end", () => destination.end());
pipe() collapses all of that into a single call.
Chaining multiple pipes
Because pipe() returns its destination, you can chain calls to build a processing pipeline. Each stage must be a duplex or transform stream (readable and writable) so it can act as both a destination for the previous stage and a source for the next.
A common example is reading a file, gzip-compressing it, and writing the result:
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
createReadStream("report.txt")
.pipe(createGzip())
.pipe(createWriteStream("report.txt.gz"))
.on("finish", () => console.log("Compressed report.txt -> report.txt.gz"));
Output:
Compressed report.txt -> report.txt.gz
Data flows left to right: the file stream feeds raw bytes into the gzip transform, which emits compressed bytes into the file writer. Backpressure propagates through the entire chain automatically.
Automatic backpressure handling
Backpressure is the mechanism that keeps memory usage flat when a producer is faster than a consumer. With pipe() you get it for free: if the writable side reports that its buffer is full (its write() returns false), pipe() pauses the readable side until the writable emits a drain event.
This is the single biggest reason to prefer pipe() (or pipeline()) over buffering an entire resource into memory. A 4 GB file copy with pipe() uses only a small, bounded buffer regardless of file size.
Tip: If you ever find yourself accumulating chunks into an array and joining them at the end, ask whether a pipe could do the job with constant memory instead.
The error-handling gap
The big caveat with pipe() is that it does not forward errors and it does not clean up streams when something goes wrong. If the source emits an error, the destination is left open; if the destination errors, the source keeps reading. Unhandled stream errors crash the process.
You must attach an error listener to every stream in the chain manually:
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
const source = createReadStream("missing.txt");
const gzip = createGzip();
const dest = createWriteStream("out.gz");
source.on("error", (err) => console.error("source:", err.message));
gzip.on("error", (err) => console.error("gzip:", err.message));
dest.on("error", (err) => console.error("dest:", err.message));
source.pipe(gzip).pipe(dest);
Output:
source: ENOENT: no such file or directory, open 'missing.txt'
Even with listeners attached, pipe() won’t destroy the other streams for you, which can leak file descriptors. This is exactly why Node introduced pipeline() — it pipes streams together and propagates errors and destroys every stream on failure or completion.
pipe() vs pipeline()
| Concern | pipe() | pipeline() |
|---|---|---|
| Backpressure | Handled | Handled |
| Error propagation | Manual, per stream | Centralized callback / promise |
| Cleanup on error | None (FD leaks) | Destroys all streams |
| Completion signal | finish / end events | Callback or await |
| Recommended for production | No | Yes |
For anything beyond a quick script, reach for pipeline() from node:stream (or its promise form):
import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
try {
await pipeline(
createReadStream("report.txt"),
createGzip(),
createWriteStream("report.txt.gz"),
);
console.log("Done");
} catch (err) {
console.error("Pipeline failed:", err.message);
}
Note: In CommonJS, swap the imports for
const { pipeline } = require("node:stream/promises")andconst { createReadStream } = require("node:fs"). The stream APIs are identical.
Best Practices
- Use
pipe()only for short scripts or experiments where error handling is not critical. - Prefer
pipeline()/stream.pipelinein production — it propagates errors and cleans up every stream. - Never forget an
errorlistener on each piped stream if you do usepipe(); unhandled errors crash the process. - Let
pipe()andpipeline()manage backpressure for you instead of buffering whole resources in memory. - Chain transform streams (gzip, hashing, parsing) between source and destination to build readable, composable pipelines.
- Listen for
finishon the final writable (orawaitthe pipeline promise) to know when the data has fully flushed.