Node.js Streams

Learn to process data efficiently with streams. Handle large files, HTTP requests, and real-time data without loading everything into memory.

Streams are one of the fundamental concepts in Node.js. They allow you to process data piece by piece without loading everything into memory.

Types of Streams

  • **Readable**: Sources of data (files, HTTP requests)
  • **Writable**: Destinations for data (files, HTTP responses)
  • **Duplex**: Both readable and writable (TCP sockets)
  • **Transform**: Modify data as it passes through (compression)
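
The readable and transform types get full examples below; as a quick illustration of the writable side, here is a minimal sketch of a custom Writable that simply collects every chunk it receives (the class name is illustrative).

const { Writable } = require('stream');

// Minimal custom Writable: collects each chunk it is given
class CollectStream extends Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
  }

  _write(chunk, encoding, callback) {
    this.chunks.push(chunk);
    callback(); // signal that this chunk has been handled
  }
}

const collector = new CollectStream();
collector.write('hello ');
collector.write('world');
collector.end(() => {
  console.log(Buffer.concat(collector.chunks).toString()); // "hello world"
});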

Why Use Streams?

Streams are memory- and time-efficient: you can start processing data before it is fully available, and you can work with files far larger than your available memory.
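
For example, a hypothetical file server (the file name and port are placeholders): streaming the file keeps memory use flat per request, whereas fs.readFile would buffer the entire file before sending a single byte.

const fs = require('fs');
const http = require('http');

http.createServer((req, res) => {
  // Stream the file to the client chunk by chunk instead of
  // buffering the whole thing with fs.readFile first.
  const stream = fs.createReadStream('large-video.mp4');
  stream.on('error', () => {
    res.statusCode = 500;
    res.end('Could not read file');
  });
  stream.pipe(res);
}).listen(3000);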

Piping

The pipe method connects streams: data flows from the readable stream into the writable stream automatically, with backpressure handled for you.
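
A minimal sketch (file names are placeholders): chaining pipe calls to gzip a file.

const fs = require('fs');
const zlib = require('zlib');

// Each pipe call returns the destination stream, so calls can be chained.
// Note: pipe does not forward errors between streams; see the pipeline FAQ below.
fs.createReadStream('access.log')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('access.log.gz'))
  .on('finish', () => console.log('File compressed'));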

Stream Events

Streams emit events such as 'data', 'end', and 'error'. Understanding these events is key to working with streams effectively.
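
A short sketch of the core events on a file read stream (the file name is a placeholder):

const fs = require('fs');

const stream = fs.createReadStream('input.txt');

stream.on('data', (chunk) => {
  // 'data' fires for each chunk as it is read
  console.log('Received', chunk.length, 'bytes');
});

stream.on('end', () => {
  // 'end' fires once there is no more data
  console.log('Done reading');
});

stream.on('error', (err) => {
  // 'error' fires if the file cannot be read
  console.error('Read failed:', err.message);
});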

Code Examples

Basic Readable Stream

readable.js
const { Readable } = require('stream');

// Create custom readable stream
class CounterStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current < this.max) {
      this.push(String(this.current++) + '\n');
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

const counter = new CounterStream(10);
counter.pipe(process.stdout);

Transform Stream

transform.js
const { Transform, pipeline } = require('stream');
const fs = require('fs');

// Create transform stream that uppercases text
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Use with pipeline for proper error handling
pipeline(
  fs.createReadStream('input.txt'),
  new UppercaseTransform(),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Async Iteration

async-stream.js
const fs = require('fs');
const readline = require('readline');

async function processLargeFile(filePath) {
  const fileStream = fs.createReadStream(filePath);

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  let lineCount = 0;
  for await (const line of rl) {
    lineCount++;
    // Process each line (line numbers reported 1-based)
    if (line.includes('error')) {
      console.log('Error found on line', lineCount, ':', line);
    }
  }

  console.log('Total lines:', lineCount);
}

processLargeFile('server.log').catch(console.error);

Frequently Asked Questions

What is backpressure in streams?

Backpressure occurs when a readable source produces data faster than the writable destination can consume it. Node.js streams handle this automatically through the pipe method, pausing the readable stream when the writable stream's internal buffer is full and resuming it once the buffer drains.
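
When writing to a stream manually, the same signal is available directly: write() returns false once the internal buffer is full, and the 'drain' event says it is safe to continue. A minimal sketch (the file name is a placeholder):

const fs = require('fs');

const out = fs.createWriteStream('numbers.txt');
let i = 0;

function writeMore() {
  let ok = true;
  while (i < 1e6 && ok) {
    // write() returns false when the internal buffer is full
    ok = out.write(`${i++}\n`);
  }
  if (i < 1e6) {
    // Wait for the buffer to drain before writing again
    out.once('drain', writeMore);
  } else {
    out.end();
  }
}

writeMore();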

Should I use stream events or async iteration?

Async iteration (for await...of) is cleaner and surfaces stream errors as exceptions you can catch with try/catch. Use it for most cases. Raw events give finer-grained control but require manual error handling.
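
Readable streams are async iterables, so the same pattern used with readline above also works on raw chunks. A minimal sketch (the file name is a placeholder):

const fs = require('fs');

async function countBytes(filePath) {
  let total = 0;
  try {
    // Readable streams are async iterable: each iteration yields a chunk
    for await (const chunk of fs.createReadStream(filePath)) {
      total += chunk.length;
    }
    console.log('Total bytes:', total);
  } catch (err) {
    // Stream errors surface here as exceptions
    console.error('Read failed:', err.message);
  }
}

countBytes('server.log');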

How do I handle errors in piped streams?

Use the pipeline function from stream/promises instead of pipe. It properly handles errors and cleans up all streams if one fails. The older pipe method doesn't forward errors between streams, so a failure in one stream can leave the others open.
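
A minimal sketch of the promise-based form used with async/await (file names are placeholders); it mirrors the callback-based pipeline example above:

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compressFile(input, output) {
  // pipeline destroys every stream in the chain if any of them fails
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Pipeline succeeded');
}

compressFile('input.txt', 'input.txt.gz').catch((err) => {
  console.error('Pipeline failed:', err);
});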
