HIGHLAND

The high-level streams library for Node.js and the browser.

$ npm install highland

Download Download (minified) Source code

Introduction

Re-thinking the JavaScript utility belt, Highland manages synchronous and asynchronous code easily, using nothing more than standard JavaScript and Node-like streams. You may be familiar with Promises, EventEmitters and callbacks, but moving between them is far from seamless. Thankfully, there exists a deeper abstraction which can free our code. By updating the tools we use on Arrays, and applying them to values distributed in time instead of space, we can discard plumbing and focus on the important things. With Highland, you can switch between synchronous and asynchronous data sources at will, without having to re-write your code. Time to dive in!

Made by @caolan, with help and patience from friends - Leave a tip or fork this :)

Usage in the browser

Highland can be used both in Node.js and in the browser. When you install the highland package, you will find a dist/highland.js file in the package hierarchy. This file has been prepared with browserify in order to bring a browser-ready version of highland.

Simply load this script with a <script> tag, and a highland variable will be made available in the global javascript scope.

If you prefer to refer to highland as _, as the examples below do, simply assign it:

var _ = highland

You can also integrate highland in your own browserify bundle using a classical browserify workflow, but this is beyond the scope of this documentation.

Supported Platforms

Highland supports Node, IE8+, and recent versions of evergreen browsers (e.g., Firefox and Chrome).

Examples

Converting to/from Highland Streams

_([1, 2, 3, 4]).toArray(function (xs) {
    console.log(xs); // => [1, 2, 3, 4]
});

Mapping over a Stream

_([1, 2, 3, 4]).map(function (x) {
    return x * 2;
}).toArray(function (xs) {
    console.log(xs); // => [2, 4, 6, 8]
});

Reading files in parallel (4 at once). See the Async section or the method's docs for an explanation of wrapCallback

var readFile = _.wrapCallback(fs.readFile);
var filenames = ['file1', 'file2', 'file3', 'file4', 'file5', 'file6']
var data = _(filenames) // Creates a stream from an array of filenames
    .map(readFile)      // Maps each filename into a Highland stream that reads from that file.
    .parallel(4)        // Reads from 4 files at once and merges the results in order.
    .each(function (fileContent) {
        // Consumes the data.
        // fileContent is a Buffer.
        console.log(fileContent.toString());
    });

Piping to a Node Stream

var data = _([1, 2, 3, 4]).map(function (x) {
    return x + '\n';
});
var output = fs.createWriteStream('output');

// Writes 1, 2, 3, 4 to a file, one on each line.
data.pipe(output);

Piping in data from Node Streams

function isBlogpost(doc) {
    return doc.type === 'blogpost';
}

var output = fs.createWriteStream('output');
var docs = db.createReadStream();

// Wrap a node stream and pipe to file
_(docs).filter(isBlogpost).pipe(output);

// or, pipe in a node stream directly:
// useful if you need a TransformStream-like object for external APIs.
var transformStream = _.pipeline(_.filter(isBlogpost));
docs.pipe(transformStream).pipe(output);

Handling errors

_(fs.createReadStream('I do not exist.'))
    .errors(function (err, push) {
        console.log('Caught error:', err.message);

        // You may also rethrow the error by calling
        // push(err);

        // or replace it with another error
        // push(new Error('oops'));

        // or even replace it with a value
        // push(null, 'We got an error.')
    }).toArray(function (xs) {
        // Array is empty because the stream couldn't read from the file.
        console.log('Data:', xs); // => []
    });

Handling events

var clicks = _('click', btn).map(1);
var counter = clicks.scan(0, _.add);

counter.each(function (n) {
    $('#count').text(n);
});

Arrays

To work with data in Arrays, just wrap it in _(). The Highland methods are then available on it:

function toUpperCase(string) {
    return string.toUpperCase();
}
var shouty = _(['foo', 'bar', 'baz']).map(toUpperCase); // => 'FOO' 'BAR' 'BAZ'

These methods return Stream objects, not Arrays, so you can chain together method calls:

_(['foo', 'bar', 'baz']).map(toUpperCase).map(function (x) {
    return {name: x};
}); // => {name: 'FOO'}, {name: 'BAR'}, {name: 'BAZ'}

When using the Highland APIs there is little reason to turn this back into an Array, but if you're calling an outside library you may need to convert it back:

_(['foo', 'bar', 'baz']).map(toUpperCase).toArray(function (xs) {
    console.log(xs); // => ['FOO', 'BAR', 'BAZ']
});

Passing a function to the toArray call may seem a little unfamiliar, but this enables an important trick in Highland. Now, without changing any of your existing code, you could swap out ['foo', 'bar', 'baz'] for an asynchronous data source, and it would just work!

You can also pass Arrays into the top-level functions instead of using methods on the Stream object:

_.map(doubled, [1, 2, 3, 4])  // => 2, 4, 6, 8

Note, this still returns a Stream.

Async

Now, let's see how we might swap out an Array source for an asynchronous one. By passing a function to the Stream constructor we can manually push values onto the Stream:

function readFile(filename) {
    // create a new Stream
    return _(function (push, next) {
        // do something async when we read from the Stream
        fs.readFile(filename, function (err, data) {
            push(err, data);
            push(null, _.nil);
        });
    });
}

First, we return a new Stream which when read from will read a file (this is called lazy evaluation). When fs.readFile calls its callback, we push the error and data values onto the Stream. Finally, we push _.nil onto the Stream. This is the "end of stream" marker and will tell any consumers of this stream to stop reading.

Since wrapping a callback is a fairly common thing to do, there is a convenience function:

var readFile = _.wrapCallback(fs.readFile);

Now we have a new asynchronous source, we can run the exact same code from the Array examples on it:

readFile('myfile').map(toUpperCase).map(function (x) {
    return {name: x};
});

With Highland, we really can have one language to work with both synchronous and asynchronous data, whether it's from a Node Stream, an EventEmitter, a callback or an Array. You can even wrap ES6 or jQuery promises:

var foo = _($.getJSON('/api/foo'));

Laziness

When you call map in Highland, it doesn't go off and immediately map over all your data. Rather it defines your intention, and the hard work occurs as you pull data from the Stream. This is lazy evaluation and it's what enables Highland to manage back-pressure and also the sequencing of asynchronous actions, such as reading from a file.

var calls = 0;

var nums = _(['1', '2', '3']).map(function (x) {
    calls++;
    return Number(x);
});

// calls === 0

To get the map iterator to be called, we must consume the stream. A number of Highland methods will do so (e.g., each, done, apply, toArray, pipe, resume). A stream may only be consumed once and consuming an already-consumed stream will result in undefined behavior.

nums.each(function (n) { console.log(n); });

// calls === 3

Equally, when we tell Highland to map a Stream of filenames to the readFile function, it doesn't actually go and read all the files at once; it lets us decide how we want to read them:

var readFile = _.wrapCallback(fs.readFile);
filenames.map(readFile).series();     // Reads one file at a time.
filenames.map(readFile).parallel(10); // Reads 10 files at a time.

Back-pressure

Since Highland is designed to play nicely with Node Streams, it also supports back-pressure. This means that a fast source will not overwhelm a slow consumer.

fastSource.map(slowTransform)

In the above example, fastSource will be paused while slowTransform does its processing.

Some streams (such as those based on events) cannot be paused. In these cases data is buffered until the consumer is ready to handle it. If you expect a non-pausable source to be consumed by a slow consumer, then you should use methods such as throttle or latest to selectively drop data and regulate the flow.

Occasionally, you'll need to split Streams in your program. At this point, Highland will force you to choose between sharing back-pressure with the new consumer, or letting the existing consumer regulate back-pressure while the new consumer simply observes values as they arrive. Attempting to add two consumers to a Stream without calling fork or observe will throw an error.

// shared back-pressure
// transform1 and transform2 will operate in lock-step.
source.map(transform1);
source.fork().map(transform2);

// Let the first handle backpressure and the second simply observe.
// Only transform1 will slow down the source. If transform2 is slow,
// data from the source will be buffered.
source.map(transform1);
source.observe().map(transform2);

Currying

As well as calling functions as methods on the Stream object, Highland also exports them at the top-level.

mystream.map(doubled)
// is equivalent to
_.map(doubled, mystream)

By convention, all top-level functions are "curryable", meaning you can partially apply their arguments. In the above example, this could be called as:

_.map(doubled)(mystream);

Warning: curried transforms do not respect optional arguments. You must pass every declared argument to the function, using null for arguments that you don't want to provide. For example, stream.slice(1) is equivalent to _.slice(1, null)(stream) and not _.slice(1)(stream).
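
The warning follows from how arity-based currying generally behaves: a curried function keeps returning new functions until it has received as many arguments as the original declares. The sketch below is plain JavaScript, not Highland's source. Both curry and slice are hypothetical stand-ins that operate on Arrays rather than Streams, and they only illustrate why the null placeholder is needed.

```javascript
// Hypothetical helper: a minimal curry driven by fn.length (assumption:
// this mirrors the general idea, not Highland's actual implementation).
function curry(fn) {
    var arity = fn.length;
    return function curried() {
        var args = Array.prototype.slice.call(arguments);
        if (args.length >= arity) {
            // every declared argument is present, so call the function
            return fn.apply(null, args);
        }
        // otherwise wait for the remaining arguments
        return function () {
            var more = Array.prototype.slice.call(arguments);
            return curried.apply(null, args.concat(more));
        };
    };
}

// Array-based stand-in for a transform with an optional `end` argument.
var slice = curry(function (start, end, xs) {
    return xs.slice(start, end === null ? undefined : end);
});

var xs = [1, 2, 3, 4];

// All three declared arguments supplied (null fills the optional slot).
var complete = slice(1, null)(xs);   // [2, 3, 4]

// Only two of three arguments supplied: the result is still a function.
var incomplete = slice(1)(xs);
```

Here incomplete is a function waiting for one more argument rather than a result, which is the situation the warning describes.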

In real-world use, this means you can define the behaviour you'd like before knowing what stream you'd like to perform it on:

// partially apply the filter() function to create a new function
var getBlogposts = _.filter(function (doc) {
    return doc.type === 'blogpost';
});

// now we can use the new function by completing its arguments
getBlogposts(data); // => new Stream of blogposts

Using seq, you can even define complex transformations.

var getBlogContents = _.seq(
    _.filter(function (doc) { return doc.type === 'blogpost'; }),
    _.map(function (doc) { return doc.contents; })
);
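
To make the order of application explicit, here is a plain-JavaScript sketch of left-to-right composition. It assumes seq applies its arguments in the order given, as the example above suggests. The seq, keepBlogposts and toContents names are hypothetical Array-based stand-ins, not Highland's implementation.

```javascript
// Hypothetical stand-in for seq: applies the given functions left to right.
function seq() {
    var fns = Array.prototype.slice.call(arguments);
    return function (input) {
        return fns.reduce(function (acc, fn) {
            return fn(acc);
        }, input);
    };
}

// Array-based counterparts of the curried filter and map transforms.
var keepBlogposts = function (docs) {
    return docs.filter(function (doc) { return doc.type === 'blogpost'; });
};
var toContents = function (docs) {
    return docs.map(function (doc) { return doc.contents; });
};

// The filter runs first, then the map sees only the blogposts.
var getBlogContents = seq(keepBlogposts, toContents);

var contents = getBlogContents([
    {type: 'blogpost', contents: 'first post'},
    {type: 'comment', contents: 'nice!'},
    {type: 'blogpost', contents: 'second post'}
]);
// contents is ['first post', 'second post']
```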

You can curry your own functions too:

var myCurryableFn = _.curry(fn);

Events

Streams can be used to handle events as well as data, control-flow and error propagation. This is often a convenient way to filter and combine events into groups, a common goal on dynamically updated sites.

var inbox = _('message', client).where({recipient: 'me'});

If you expect to receive a lot of events, and perform an async process on each of them, then you should sample the events instead of buffering all of them.

// get a frequent event source
var text = _('keyup', $('#searchbox'));

// Regulate event stream:
// - wait until no keyup events for 1s
// - when read from, only return the latest value
var searches = text.debounce(1000).latest();

// map the search events to an AJAX request
var results = searches.map(searchRequest);

// for each response, display it
results.each(function (result) {
    // display result
});

Stream Objects

_(source)

The Stream constructor accepts an array of values or a generator function as an optional argument. This is typically the entry point to the Highland APIs, providing a convenient way of chaining calls together.

Arrays - Streams created from Arrays will emit each value of the Array and then emit a nil value to signal the end of the Stream.

Generators - These are functions which provide values for the Stream. They are lazy and can be infinite; they can also be asynchronous (for example, making an HTTP request). You emit values on the Stream by calling push(err, val), much like a standard Node.js callback. Once it has been called, the generator function will not be called again unless you call next(). This call to next() will signal you've finished processing the current data and allow for the generator function to be called again. If the Stream is still being consumed the generator function will then be called again.

You can also redirect a generator Stream by passing a new source Stream to next. For example, after calling next(other_stream), any subsequent calls will be made to the new source.
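
The push/next contract can be modelled in a few lines of plain JavaScript. This is a deliberately simplified, synchronous sketch: drive is a hypothetical helper, and a real Highland Stream is driven by consumer demand and may be asynchronous. It only shows that a generator is re-invoked when, and only when, it calls next().

```javascript
// Hypothetical driver mimicking the push/next contract (not Highland's code).
// It calls the generator, then calls it again only if next() was invoked
// and fewer than `limit` values have been collected so far.
function drive(generator, limit) {
    var values = [];
    var calledNext;
    function push(err, val) {
        if (err) { throw err; }
        values.push(val);
    }
    function next() {
        calledNext = true;
    }
    do {
        calledNext = false;
        generator(push, next);
    } while (calledNext && values.length < limit);
    return values;
}

// A potentially infinite generator: it asks to be called again every time.
var n = 0;
var counting = drive(function (push, next) {
    push(null, n++);
    next();
}, 3);
// counting is [0, 1, 2]

// A generator that never calls next() runs exactly once.
var single = drive(function (push, next) {
    push(null, 'only');
}, 3);
// single is ['only']
```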

Node Readable Stream - Pass in a Node Readable Stream object to wrap it with the Highland API. Reading from the resulting Highland Stream will begin piping the data from the Node Stream to the Highland Stream.

A stream constructed in this way relies on Readable#pipe to end the Highland Stream once there is no more data. Not all Readable Streams do this. For example, IncomingMessage will only emit close when the client aborts communications and will not properly call end. In this case, you can provide an optional onFinished function with the signature onFinished(readable, callback) as the second argument.

This function will be passed the Readable and a callback that should be called when the Readable ends. If the Readable ended from an error, the error should be passed as the first argument to the callback. onFinished should bind to whatever listener is necessary to detect the Readable's completion. If the callback is called multiple times, only the first invocation counts. If the callback is called after the Readable has already ended (e.g., the pipe method already called end), it will be ignored.

The onFinished function may optionally return one of the following:

  • A cleanup function that will be called when the stream ends. It should unbind any listeners that were added.
  • An object with the following optional properties:
    • onDestroy - the cleanup function.
    • continueOnError - Whether or not to continue the stream when an error is passed to the callback. Set this to true if the Readable may continue to emit values after errors. Default: false.

See this issue for a discussion on why Highland cannot reliably detect stream completion for all implementations and why the onFinished function is required.

EventEmitter / jQuery Elements - Pass in both an event name and an event emitter as the two arguments to the constructor and the first argument emitted to the event handler will be written to the new Stream.

You can pass a mapping hint as the third argument, which specifies how event arguments are pushed into the stream. If no mapping hint is provided, only the first value emitted with the event will be pushed onto the Stream.

If mappingHint is a number, an array of that length will be pushed onto the stream, containing exactly that many parameters from the event. If it's an array, it's used as keys to map the arguments into an object which is pushed to the stream. If it is a function, it's called with the event arguments, and the returned value is pushed.
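
The three hint types can be summarised with a small plain-JavaScript sketch. applyMappingHint is a hypothetical helper written from the description above, not Highland's own code; it takes the hint and the array of arguments an event handler received and returns the value that would be pushed.

```javascript
// Hypothetical helper mirroring the mapping rules described above.
function applyMappingHint(hint, args) {
    if (typeof hint === 'number') {
        // number: keep that many leading arguments, as an array
        return args.slice(0, hint);
    }
    if (Array.isArray(hint)) {
        // array: use the entries as keys for the arguments, in order
        return hint.reduce(function (obj, key, i) {
            obj[key] = args[i];
            return obj;
        }, {});
    }
    if (typeof hint === 'function') {
        // function: call it with the arguments and use its return value
        return hint.apply(null, args);
    }
    // no hint: only the first argument is used
    return args[0];
}

var eventArgs = ['a', 'b', 'c'];

var asArray = applyMappingHint(2, eventArgs);                // ['a', 'b']
var asObject = applyMappingHint(['req', 'res'], eventArgs);  // {req: 'a', res: 'b'}
var asValue = applyMappingHint(function (req, res) {
    return res;
}, eventArgs);                                               // 'b'
var noHint = applyMappingHint(undefined, eventArgs);         // 'a'
```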

Promise - Accepts an ES6 / jQuery style promise and returns a Highland Stream which will emit a single value (or an error). If you use bluebird cancellation, the Highland Stream will be empty for a cancelled promise.

Iterator - Accepts an ES6 style iterator that implements the iterator protocol: yields all the values from the iterator using its next() method and terminates when the iterator's done value returns true. If the iterator's next() method throws, the exception will be emitted as an error, and the stream will be ended with no further calls to next().

Iterable - Accepts an object that implements the iterable protocol, i.e., contains a method that returns an object that conforms to the iterator protocol. The stream will use the iterator defined in the Symbol.iterator property of the iterable object to generate emitted values.
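
For readers unfamiliar with the two protocols, the following plain-JavaScript sketch builds a hand-written iterator and an iterable and drains them without Highland. The countdown and launch names are hypothetical. According to the descriptions above, either countdown(3) or launch should be usable as a source for the Stream constructor.

```javascript
// An iterator: an object whose next() returns {value, done} records.
function countdown(from) {
    var current = from;
    return {
        next: function () {
            if (current > 0) {
                return {value: current--, done: false};
            }
            return {value: undefined, done: true};
        }
    };
}

// An iterable: an object whose Symbol.iterator method returns an iterator.
var launch = {};
launch[Symbol.iterator] = function () {
    return countdown(3);
};

// Draining the iterator by hand, calling next() until done is true.
var seen = [];
var it = countdown(3);
for (var step = it.next(); !step.done; step = it.next()) {
    seen.push(step.value);
}
// seen is [3, 2, 1]

// Built-ins that understand the iterable protocol can read `launch` directly.
var fromIterable = Array.from(launch);
// fromIterable is [3, 2, 1]
```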

Parameters
  • source - Array | Function | Iterator | Iterable | Promise | Readable Stream | String - (optional) source to take values from
  • onFinished - Function - (optional) a function that detects when the readable completes. Second argument. Only valid if source is a Readable.
  • eventEmitter - EventEmitter | jQuery Element - (optional) An event emitter. Second argument. Only valid if source is a String.
  • mappingHint - Array | Function | Number - (optional) how to pass the arguments to the callback. Only valid if source is a String.
// from an Array
_([1, 2, 3, 4]);

// using a generator function
_(function (push, next) {
    push(null, 1);
    push(err);
    next();
});

// a stream with no source, can pipe node streams through it etc.
var through = _();

// wrapping a Node Readable Stream so you can easily manipulate it
_(readable).filter(hasSomething).pipe(writable);

// wrapping a Readable that may signify completion by emitting `close`
// (e.g., IncomingMessage).
_(req, function (req, callback) {
    req.on('end', callback)
        .on('close', callback)
        .on('error', callback);

    return function () {
        req.removeListener('end', callback);
        req.removeListener('close', callback);
        req.removeListener('error', callback);
    };
}).pipe(writable);

// wrapping a Readable that may emit values after errors.
_(req, function (req, callback) {
    req.on('error', callback);

    return {
        onDestroy: function () {
            req.removeListener('error', callback);
        },
        continueOnError: true
    };
}).pipe(writable);

// creating a stream from events
_('click', btn).each(handleEvent);

// creating a stream from events with a mapping array
_('request', httpServer, ['req', 'res']).each(handleEvent);
//=> { req: IncomingMessage, res: ServerResponse }

// creating a stream from events with a mapping function
_('request', httpServer, function(req, res) {
    return res;
}).each(handleEvent);
//=> ServerResponse

// from a Promise object
var foo = _($.getJSON('/api/foo'));

//from an iterator
var map = new Map([['a', 1], ['b', 2]]);
var bar = _(map.values()).toArray(_.log);
//=> [1, 2]

//from an iterable
var set = new Set([1, 2, 2, 3, 4]);
var bar = _(set).toArray(_.log);
//=> [1, 2, 3, 4]
Stream.destroy()

Destroys a stream by unlinking it from any consumers and sources. This will stop all consumers from receiving events from this stream and removes this stream as a consumer of any source stream.

This function calls end() on the stream and unlinks it from any piped-to streams.

Stream.end()

Ends a Stream. This is the same as sending a nil value as data. You shouldn't need to call this directly, rather it will be called by any Node Readable Streams you pipe in.

Only call this function on streams that were constructed with no source (i.e., with _()).

mystream.end();
Stream.pause()

Pauses the stream. All Highland Streams start in the paused state.

It is unlikely that you will need to manually call this method.

var xs = _(generator);
xs.pause();
Stream.resume()

Resumes a paused Stream. This will either read from the Stream's incoming buffer or request more data from an upstream source. Never call this method on a stream that has been consumed (via a call to consume or any other transform).

var xs = _(generator);
xs.resume();
Stream.write(x)

Writes a value to the Stream. If the Stream is paused it will go into the Stream's incoming buffer, otherwise it will be immediately processed and sent to the Stream's consumers (if any). Returns false if the Stream is paused, true otherwise. This lets Node's pipe method handle back-pressure.

You shouldn't need to call this yourself, but it may be called by Node functions which treat Highland Streams as a Node Writable Stream.

Only call this function on streams that were constructed with no source (i.e., with _()).

Parameters
  • x - the value to write to the Stream
var xs = _();
xs.write(1);
xs.write(2);
xs.end();

xs.toArray(function (ys) {
    // ys will be [1, 2]
});

// Do *not* do this.
var xs2 = _().toArray(_.log);
xs2.write(1); // This call is illegal.

Transforms

Stream.append(y)

Adds a value to the end of a Stream.

Parameters
  • y - the value to append to the Stream
_([1, 2, 3]).append(4)  // => 1, 2, 3, 4
Stream.batch(n)

Takes one Stream and batches incoming data into arrays of the given length.

Parameters
  • n - Number - length of the array to batch
_([1, 2, 3, 4, 5]).batch(2)  // => [1, 2], [3, 4], [5]
Stream.batchWithTimeOrCount(ms, n)

Takes one Stream and batches incoming data within a maximum time frame into arrays of a maximum length.

Parameters
  • ms - Number - the maximum milliseconds to buffer a batch
  • n - Number - the maximum length of the array to batch
_(function (push) {
    // push takes (err, value), so values go in the second argument
    push(null, 1);
    push(null, 2);
    push(null, 3);
    setTimeout(push, 20, null, 4);
}).batchWithTimeOrCount(10, 2)

// => [1, 2], [3], [4]
Stream.collect()

Groups all values into an Array and passes down the stream as a single data event. This is a bit like doing toArray, but instead of accepting a callback and consuming the stream, it passes the value on.

_(['foo', 'bar']).collect().toArray(function (xs) {
    // xs will be [['foo', 'bar']]
});
Stream.compact()

Filters a Stream to drop all non-truthy values.

var compacted = _([0, 1, false, 3, null, undefined, 6]).compact();
// => 1, 3, 6
Stream.consume(f)

Consumes values from a Stream (once resumed) and returns a new Stream for you to optionally push values onto using the provided push / next functions.

This function forms the basis of many higher-level Stream operations. It will not cause a paused stream to immediately resume, but behaves more like a 'through' stream, handling values as they are read.

Parameters
  • f - Function - the function to handle errors and values
var filter = function (f, source) {
    return source.consume(function (err, x, push, next) {
        if (err) {
            // pass errors along the stream and consume next value
            push(err);
            next();
        }
        else if (x === _.nil) {
            // pass nil (end event) along the stream
            push(null, x);
        }
        else {
            // pass on the value only if the value passes the predicate
            if (f(x)) {
                push(null, x);
            }
            next();
        }
    });
};
Stream.debounce(ms)

Holds off pushing data events downstream until there has been no more data for ms milliseconds. Sends the last value that occurred before the delay, discarding all other values.

Implementation Note: This transform will not wait the full ms delay to emit a pending value (if any) once it sees a nil, as that guarantees that there will be no more values.

Parameters
  • ms - Number - the milliseconds to wait before sending data
function delay(x, ms, push) {
    setTimeout(function () {
        push(null, x);
    }, ms);
}

// sends last keyup event after user has stopped typing for 1 second
_('keyup', textbox).debounce(1000);

// A nil triggers the emit immediately
_(function (push, next) {
    delay(0, 100, push);
    delay(1, 200, push);
    delay(_.nil, 250, push);
}).debounce(75);
// => after 175ms => 0
// => after 250ms (not 275ms!) => 0 1
Stream.doto(f)

Creates a new Stream which applies a function to each value from the source and re-emits the source value. Useful when you want to mutate the value or perform side effects.

Parameters
  • f - Function - the function to apply
var appended = _([[1], [2], [3], [4]]).doto(function (x) {
    x.push(1);
});

_([1, 2, 3]).doto(console.log)
// 1
// 2
// 3
// => 1, 2, 3
Stream.drop(n)

Acts as the inverse of take(n) - instead of returning the first n values, it ignores the first n values and then emits the rest. n must be of type Number; if it is not, the whole stream will be returned. All errors (even ones emitted before the nth value) will be emitted.

Parameters
  • n - Number - integer representing the number of values to drop from the source
_([1, 2, 3, 4]).drop(2) // => 3, 4
Stream.errors(f)

Extracts errors from a Stream and applies them to an error handler function. Returns a new Stream with the errors removed (unless the error handler chooses to rethrow them using push). Errors can also be transformed and put back onto the Stream as values.

Parameters
  • f - Function - the function to pass all errors to
getDocument.errors(function (err, push) {
    if (err.statusCode === 404) {
        // not found, return empty doc
        push(null, {});
    }
    else {
        // otherwise, re-throw the error
        push(err);
    }
});
Stream.filter(f)

Creates a new Stream that includes only the values that pass a truth test.

Parameters
  • f - Function - the truth test function
var evens = _([1, 2, 3, 4]).filter(function (x) {
    return x % 2 === 0;
});
Stream.find(f)

A convenient form of filter, which returns the first object from a Stream that passes the provided truth test.

Parameters
  • f - Function - the truth test function
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

var f = function (x) {
    return x.type == 'blogpost';
};

_(docs).find(f);
// => {type: 'blogpost', title: 'foo'}

// example with partial application
var firstBlogpost = _.find(f);

firstBlogpost(docs)
// => {type: 'blogpost', title: 'foo'}
Stream.findWhere(props)

A convenient form of where, which returns the first object from a Stream that matches a set of property values. findWhere is to where as find is to filter.

Parameters
  • props - Object - the properties to match against
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

_(docs).findWhere({type: 'blogpost'})
// => {type: 'blogpost', title: 'foo'}

// example with partial application
var firstBlogpost = _.findWhere({type: 'blogpost'});

firstBlogpost(docs)
// => {type: 'blogpost', title: 'foo'}
Stream.group(f)

A convenient form of reduce, which groups items based on a function or property name

Parameters
  • f - Function | String - the function or property name on which to group, toString() is called on the result of a function.
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

var f = function (x) {
    return x.type;
};

_(docs).group(f); // or: _(docs).group('type');
// => {
// =>    'blogpost': [{type: 'blogpost', title: 'foo'}, {type: 'blogpost', title: 'bar'}],
// =>    'comment': [{type: 'comment', title: 'foo'}]
// =>  }
Stream.intersperse(sep)

Creates a new Stream with the separator interspersed between the elements of the source.

intersperse is effectively the inverse of splitBy.

Parameters
  • sep - String - the value to intersperse between the source elements
_(['ba', 'a', 'a']).intersperse('n')  // => 'ba', 'n', 'a', 'n', 'a'
_(['mississippi']).splitBy('ss').intersperse('ss')  // => 'mi', 'ss', 'i', 'ss', 'ippi'
_(['foo']).intersperse('bar')  // => 'foo'
Stream.invoke(method, args)

Calls a named method on each object from the Stream - returning a new stream with the result of those calls.

Parameters
  • method - String - the method name to call
  • args - Array - the arguments to call the method with
_(['foo', 'bar']).invoke('toUpperCase', [])  // => 'FOO', 'BAR'

var readFile = _.wrapCallback(fs.readFile);
filenames.flatMap(readFile).invoke('toString', ['utf8']);
Stream.last()

Drops all values from the Stream apart from the last one (if any).

_([1, 2, 3, 4]).last()  // => 4
Stream.latest()

Creates a new Stream, which when read from, only returns the last seen value from the source. The source stream does not experience back-pressure. Useful if you're using a Stream to model a changing property which you need to query periodically.

// slowThing will always get the last known mouse position
// when it asks for more data from the mousePosition stream
mousePosition.latest().map(slowThing)
Stream.map(f)

Creates a new Stream of transformed values by applying a function to each value from the source. The transformation function can be replaced with a non-function value for convenience, and it will emit that value for every data event on the source Stream.

Deprecation warning: The use of the convenience non-function argument for map is deprecated and will be removed in the next major version.

Parameters
  • f - Function - the transformation function or value to map to
var doubled = _([1, 2, 3, 4]).map(function (x) {
    return x * 2;
});
Stream.nfcall(args)

Takes a Stream of callback-accepting node-style functions, wraps each one into a stream-returning function, calls them with the arguments provided, and returns the results as a Stream.

This can be used as a control flow shortcut and draws parallels with some control flow functions from async. A few rough correspondences include:

  • .nfcall([]).series() to async.series()
  • .nfcall([]).parallel(n) to async.parallelLimit(n)
  • .nfcall(args) to async.applyEach(..., args)
  • .nfcall(args).series() to async.applyEachSeries(..., args)
Parameters
  • args - Array - the arguments to call each function with
_([
  function (callback) {
    setTimeout(function () {
      callback(null, 'one');
    }, 200);
  },
  function (callback) {
    setTimeout(function () {
      callback(null, 'two');
    }, 100);
  }
]).nfcall([]).parallel(2).toArray(function (xs) {
  // xs is ['one', 'two'] even though second function had a shorter timeout
});

_([enableSearch, updateSchema]).nfcall(['bucket']).toArray(callback);
// does roughly the same as
async.applyEach([enableSearch, updateSchema], 'bucket', callback);

_([
  fs.appendFile,
  fs.appendFile
]).nfcall(['example.txt', 'hello']).series().toArray(function() {
  // example.txt now contains 'hellohello'
});
Stream.pick(properties)

Retrieves copies of all elements in the collection, with only the whitelisted keys. If one of the whitelisted keys does not exist, it will be ignored.

Parameters
  • properties - Array - property names to whitelist
var dogs = [
     {breed: 'chihuahua', name: 'Princess', age: 5},
     {breed: 'labrador', name: 'Rocky', age: 3},
     {breed: 'german-shepherd', name: 'Waffles', age: 9}
];

_(dogs).pick(['breed', 'age']).toArray(function (xs) {
      // xs is now:
      [
          {breed: 'chihuahua', age: 5},
          {breed: 'labrador', age: 3},
          {breed: 'german-shepherd', age: 9}
      ]
});

_(dogs).pick(['owner']).toArray(function (xs) {
     // xs is now:
     [
         {},
         {},
         {}
     ]
});
Stream.pickBy(f)

Retrieves copies of all the elements in the collection that satisfy a given predicate. Note: When using ES3, only enumerable elements are selected. Both enumerable and non-enumerable elements are selected when using ES5.

Parameters
  • f - Function - the predicate function
 var dogs = [
     {breed: 'chihuahua', name: 'Princess', age: 5},
     {breed: 'labrador', name: 'Rocky', age: 3},
     {breed: 'german-shepherd', name: 'Waffles', age: 9}
 ];

 _(dogs).pickBy(function (key, value) {
     return value > 4;
 }).toArray(function (xs) {
   // xs is now:
   [
     { age: 5 },
     {},
     { age: 9 }
   ]
 });
Stream.pluck(property)

Retrieves values associated with a given property from all elements in the collection.

Parameters
  • property - String - the property whose values should be retrieved
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'baz'}
];

_(docs).pluck('title').toArray(function (xs) {
   // xs is now ['foo', 'bar', 'baz']
});
Stream.ratelimit(num, ms)

Limits the number of values passing through the stream to a maximum of num values per window of ms milliseconds. Errors are not limited but allowed to pass through as soon as they are read from the source.

Parameters
  • num - Number - the number of operations to perform per window
  • ms - Number - the window of time to limit the operations in (in ms)
_([1, 2, 3, 4, 5]).ratelimit(2, 100);

// after 0ms => 1, 2
// after 100ms => 1, 2, 3, 4
// after 200ms => 1, 2, 3, 4, 5
Stream.reduce(memo, iterator)

Boils down a Stream to a single value. The memo is the initial state of the reduction, and each successive step of it should be returned by the iterator function. The iterator is passed two arguments: the memo and the next value.

If the iterator throws an error, the reduction stops and the resulting stream will emit that error instead of a value.

Note: The order of the memo and iterator arguments will be flipped in the next major version release.

Parameters
  • memo - the initial state of the reduction
  • iterator - Function - the function which reduces the values
var add = function (a, b) {
    return a + b;
};

_([1, 2, 3, 4]).reduce(0, add)  // => 10
Stream.reduce1(iterator)

Same as reduce, but uses the first element as the initial state instead of passing in a memo value.

Parameters
  • iterator - Function - the function which reduces the values
_([1, 2, 3, 4]).reduce1(add)  // => 10
Stream.reject(f)

The inverse of filter.

Parameters
  • f - Function - the truth test function
var odds = _([1, 2, 3, 4]).reject(function (x) {
    return x % 2 === 0;
});
Stream.scan(memo, iterator)

Like reduce, but emits each intermediate value of the reduction as it is calculated.

If the iterator throws an error, the scan will stop and the stream will emit that error. Any intermediate values that were produced before the error will still be emitted.

Note: The order of the memo and iterator arguments will be flipped in the next major version release.

Parameters
  • memo - the initial state of the reduction
  • iterator - Function - the function which reduces the values
_([1, 2, 3, 4]).scan(0, add)  // => 0, 1, 3, 6, 10
Stream.scan1(iterator)

Same as scan, but uses the first element as the initial state instead of passing in a memo value.

Parameters
  • iterator - Function - the function which reduces the values
_([1, 2, 3, 4]).scan1(add)  // => 1, 3, 6, 10
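
The relationship between reduce, scan and scan1 can be illustrated on plain arrays. The helpers scanArray and scan1Array below are hypothetical and do not use Highland; they only mirror the documented output.

```javascript
// Hypothetical array analogue of scan: returns the initial memo followed
// by every intermediate state of the reduction.
function scanArray(memo, iterator, xs) {
    var out = [memo];
    xs.forEach(function (x) {
        memo = iterator(memo, x);
        out.push(memo);
    });
    return out;
}

// Hypothetical array analogue of scan1: the first element seeds the
// reduction, so no separate memo is passed in.
function scan1Array(iterator, xs) {
    return xs.length === 0 ? [] : scanArray(xs[0], iterator, xs.slice(1));
}

var add = function (a, b) {
    return a + b;
};

var steps = scanArray(0, add, [1, 2, 3, 4]);  // [0, 1, 3, 6, 10]
var steps1 = scan1Array(add, [1, 2, 3, 4]);   // [1, 3, 6, 10]
var total = [1, 2, 3, 4].reduce(add, 0);      // 10, the last scan value

console.log(steps, steps1, total);
```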
Stream.slice(start, end)

Creates a new Stream with the values from the source in the range of start (inclusive) to end (exclusive). start and end must be of type Number. If start is not a Number, it defaults to 0; likewise, end defaults to Infinity. This could result in the whole stream being returned.

Parameters
  • start - Number - integer representing index to start reading from source (inclusive)
  • end - Number - integer representing index to stop reading from source (exclusive)
_([1, 2, 3, 4]).slice(1, 3) // => 2, 3
Stream.sort()

Collects all values together then emits each value individually but in sorted order. The method for sorting the elements is ascending lexical.

var sorted = _(['b', 'z', 'g', 'r']).sort().toArray(_.log);
// => ['b', 'g', 'r', 'z']
Stream.sortBy(f)

Collects all values together then emits each value individually in sorted order. The method for sorting the elements is defined by the comparator function supplied as a parameter.

The comparison function takes two arguments a and b and should return

  • a negative number if a should sort before b.
  • a positive number if a should sort after b.
  • zero if a and b may sort in any order (i.e., they are equal).

This function must also define a partial order. If it does not, the resulting ordering is undefined.

Parameters
  • f - Function - the comparison function
var sorts = _([3, 1, 4, 2]).sortBy(function (a, b) {
    return b - a;
}).toArray(_.log);

//=> [4, 3, 2, 1]
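
The comparator contract described above is the same one Array.prototype.sort uses, so a comparator can be tried out on a plain array first. The sketch below does not use Highland; the dogs data and the byAgeAscending name are illustrative only.

```javascript
var dogs = [
    {name: 'Princess', age: 5},
    {name: 'Rocky', age: 3},
    {name: 'Waffles', age: 9}
];

// Negative result: a sorts before b. Positive result: a sorts after b.
// Zero: the two may appear in either order.
function byAgeAscending(a, b) {
    return a.age - b.age;
}

// slice() copies the array so that the input is left untouched.
var names = dogs.slice().sort(byAgeAscending).map(function (dog) {
    return dog.name;
});

console.log(names); // => [ 'Rocky', 'Princess', 'Waffles' ]
```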
Stream.split()

splitBy over newlines.

_(['a\n', 'b\nc\n', 'd', '\ne']).split()  // => 'a', 'b', 'c', 'd', 'e'
_(['a\r\nb\nc']).split()  // => 'a', 'b', 'c'
Stream.splitBy(sep)

Splits the source Stream by a separator and emits the pieces in between, much like splitting a string.

splitBy is effectively the inverse of intersperse.

Parameters
  • sep - String | RegExp - the separator to split on
_(['mis', 'si', 's', 'sippi']).splitBy('ss')  // => 'mi', 'i', 'ippi'
_(['ba', 'a', 'a']).intersperse('n').splitBy('n')  // => 'ba', 'a', 'a'
_(['foo']).splitBy('bar')  // => 'foo'
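
Because a separator can straddle two chunks, a splitter has to carry the unfinished tail of each chunk over to the next one. The following sketch shows that idea on an array of string chunks. It is not Highland's implementation, and splitChunks is a hypothetical helper.

```javascript
// Hypothetical helper: split a sequence of string chunks on `sep`,
// carrying the unfinished tail of each chunk over to the next one.
function splitChunks(sep, chunks) {
    var out = [];
    var buffer = '';
    chunks.forEach(function (chunk) {
        var pieces = (buffer + chunk).split(sep);
        buffer = pieces.pop(); // may still be continued by the next chunk
        pieces.forEach(function (piece) {
            out.push(piece);
        });
    });
    if (chunks.length > 0) {
        out.push(buffer); // whatever remains once the input has ended
    }
    return out;
}

console.log(splitChunks('ss', ['mis', 'si', 's', 'sippi']));
// => [ 'mi', 'i', 'ippi' ]
```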
Stream.stopOnError(f)

Like the errors method, but emits a Stream end marker after an Error is encountered.

Parameters
  • f - Function - the function to handle an error
brokenStream.stopOnError(function (err) {
    //console.error('Something broke: ' + err);
});
Stream.take(n)

Creates a new Stream with the first n values from the source. n must be of type Number; if it is not, the whole stream will be returned.

Parameters
  • n - Number - integer representing number of values to read from source
_([1, 2, 3, 4]).take(2) // => 1, 2
Stream.tap(f)

An alias for the doto method.

Parameters
  • f - Function - the function to apply
_([1, 2, 3]).tap(console.log)
Stream.throttle(ms)

Ensures that only one data event is pushed downstream (or into the buffer) every ms milliseconds; any other values are dropped.

Parameters
  • ms - Number - the minimum milliseconds between each value
_('mousemove', document).throttle(1000);
Stream.transduce(xf)

Applies the transformation defined by the given transducer to the stream. A transducer is any function that follows the Transducer Protocol. See transduce-js for more details on what transducers actually are.

The result object that is passed in through the Transformer Protocol will be the push function provided by the consume transform.

Like scan, if the transducer throws an exception, the transform will stop and emit that error. Any intermediate values that were produced before the error will still be emitted.

Parameters
  • xf - Function - The transducer.
var xf = require('transducer-js').map(_.add(1));
_([1, 2, 3, 4]).transduce(xf);
// => 2, 3, 4, 5
Stream.uniq()

Filters out all duplicate values from the stream and keeps only the first occurrence of each value, using === to define equality.

Like uniqBy, this transform needs to store a buffer containing all unique values that have been encountered. Be careful about using this transform on a stream that has many unique values.

var colors = [ 'blue', 'red', 'red', 'yellow', 'blue', 'red' ]

_(colors).uniq()
// => 'blue'
// => 'red'
// => 'yellow'
Stream.uniqBy(compare)

Filters out all duplicate values from the stream and keeps only the first occurrence of each value, using the provided function to define equality.

Note:

  • Memory: In order to guarantee that each unique item is chosen only once, we need to keep an internal buffer of all unique values. This may outgrow the available memory if you are not cautious about the size of your stream and the number of unique objects you may receive on it.
  • Errors: The comparison function should never throw an error. However, if it does, this transform will emit an error for each call that throws. This means that one value may turn into multiple errors.
Parameters
  • compare - Function - custom equality predicate
var colors = [ 'blue', 'red', 'red', 'yellow', 'blue', 'red' ]

_(colors).uniqBy(function(a, b) { return a[1] === b[1]; })
// => 'blue'
// => 'red'
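
The memory note above follows from how such a filter has to work: every value that is kept must be remembered so that later values can be compared against it. A plain-array sketch of that idea follows. uniqByArray is a hypothetical helper and not Highland's implementation.

```javascript
// Hypothetical array analogue of uniqBy: keep a value only if the
// equality predicate matches none of the values kept so far.
function uniqByArray(compare, xs) {
    var seen = []; // grows with the number of distinct values
    xs.forEach(function (x) {
        var duplicate = seen.some(function (y) {
            return compare(x, y);
        });
        if (!duplicate) {
            seen.push(x);
        }
    });
    return seen;
}

var colors = ['blue', 'red', 'red', 'yellow', 'blue', 'red'];

// Two colors count as equal when their second letters match, so
// 'yellow' is treated as a duplicate of 'red'.
var bySecondLetter = uniqByArray(function (a, b) {
    return a[1] === b[1];
}, colors);

console.log(bySecondLetter); // => [ 'blue', 'red' ]
```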
Stream.where(props)

A convenient form of filter, which returns all objects from a Stream which match a set of property values.

Parameters
  • props - Object - the properties to match against
var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

_(docs).where({title: 'foo'})
// => {type: 'blogpost', title: 'foo'}
// => {type: 'comment', title: 'foo'}

// example with partial application
var getBlogposts = _.where({type: 'blogpost'});

getBlogposts(docs)
// => {type: 'blogpost', title: 'foo'}
// => {type: 'blogpost', title: 'bar'}
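
Conceptually, where is a filter whose test checks every listed property. The sketch below expresses that on a plain array. It does not use Highland, matches is a hypothetical helper, and strict equality is an assumption of the sketch.

```javascript
// Hypothetical helper: true when obj has every property in props
// with a strictly equal value (assumed comparison).
function matches(props, obj) {
    return Object.keys(props).every(function (key) {
        return obj[key] === props[key];
    });
}

var docs = [
    {type: 'blogpost', title: 'foo'},
    {type: 'blogpost', title: 'bar'},
    {type: 'comment', title: 'foo'}
];

var foos = docs.filter(function (doc) {
    return matches({title: 'foo'}, doc);
});

console.log(foos.map(function (doc) {
    return doc.type;
})); // => [ 'blogpost', 'comment' ]
```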

Higher-order Streams

Stream.concat(ys)

Concatenates a Stream to the end of this Stream.

Be aware that in the top-level export, the args may be in the reverse order to what you'd expect: _.concat([a], [b]) => b, a. This follows the convention of other top-level exported functions which do x to y.

Parameters
  • ys - Stream | Array - the values to concatenate onto this Stream
_([1, 2]).concat([3, 4])  // => 1, 2, 3, 4
_.concat([3, 4], [1, 2])  // => 1, 2, 3, 4
Stream.flatFilter(f)

Filters using a predicate which returns a Stream. If you need to check against an asynchronous data source when filtering a Stream, this can be convenient. The Stream returned from the filter function should have a Boolean as its first value (all other values on the Stream will be disregarded).

Parameters
  • f - Function - the truth test function which returns a Stream
var checkExists = _.wrapCallback(fs.access);

filenames.flatFilter(checkExists)
Stream.flatMap(f)

Creates a new Stream of values by applying each item in a Stream to an iterator function which must return a (possibly empty) Stream. Each item on these result Streams is then emitted on a single output Stream.

This transform is functionally equivalent to .map(f).sequence().

Parameters
  • f - Function - the iterator function
var readFile = _.wrapCallback(fs.readFile);
filenames.flatMap(readFile)
Stream.flatten()

Recursively reads values from a Stream which may contain nested Streams or Arrays. As values or errors are encountered, they are emitted on a single output Stream.

_([1, [2, 3], [[4]]]).flatten();  // => 1, 2, 3, 4

var nums = _(
    _([1, 2, 3]),
    _([4, _([5, 6]) ])
);

nums.flatten();  // => 1, 2, 3, 4, 5, 6
Stream.fork()

Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will pull values, one at a time, from its source only as fast as the slowest consumer can handle them.

NOTE: Do not depend on a consistent execution order between the forks. This transform only guarantees that all forks will process a value foo before any will process a second value bar. It does not guarantee the order in which the forks process foo.

TIP: Be careful about modifying stream values within the forks (or using a library that does so). Since the same value will be passed to every fork, changes made in one fork will be visible in any fork that executes after it. Add to that the inconsistent execution order, and you can end up with subtle data corruption bugs. If you need to modify any values, you should make a copy and modify the copy instead.

Deprecation warning: It is currently possible to fork a stream after consuming it (e.g., via a transform). This will no longer be possible in the next major release. If you are going to fork a stream, always call fork on it.

function delay(x, ms) {
  return _((push) => {
    setTimeout(() => {
      push(null, x);
      push(null, _.nil);
    }, ms);
  });
}

const then = Date.now();

const source = _([1, 2, 3, 4])
    .tap((x) => console.log(`source: ${x} (${Date.now() - then})`));
const fork1 = source.fork().flatMap((x) => delay(x, 1000));
const fork2 = source.fork().flatMap((x) => delay(x, 2000));

// No values will be pulled from source until fork2 also starts consuming.
fork1.each((x) => console.log(`fork1 : ${x} (${Date.now() - then})`));

// Now both fork1 and fork2 will get values from source as fast as they both
// can process them.
fork2.each((x) => console.log(`fork2 : ${x} (${Date.now() - then})`));

// =>
// source: 1 (3)
// fork1 : 1 (1014)
// fork2 : 1 (2011)
// source: 2 (2011)
// fork1 : 2 (3012)
// fork2 : 2 (4012)
// source: 3 (4013)
// fork1 : 3 (5014)
// fork2 : 3 (6020)
// source: 4 (6020)
// fork1 : 4 (7024)
// fork2 : 4 (8032)
Stream.merge()

Takes a Stream of Streams and merges their values and errors into a single new Stream. The merged stream ends when all source streams have ended.

Note that no guarantee is made with respect to the order in which values for each stream end up in the merged stream. Values in the merged stream will, however, respect the order they were emitted from their respective streams.

var readFile = _.wrapCallback(fs.readFile);

var txt = _(['foo.txt', 'bar.txt']).flatMap(readFile)
var md = _(['baz.md']).flatMap(readFile)

_([txt, md]).merge();
// => contents of foo.txt, bar.txt and baz.md in the order they were read
Stream.mergeWithLimit(n)

Takes a Stream of Streams and merges their values and errors into a single new Stream, limiting the number of unpaused streams that can be running at any one time.

Note that no guarantee is made with respect to the order in which values for each stream end up in the merged stream. Values in the merged stream will, however, respect the order they were emitted from their respective streams.

Parameters
  • n - Number - the maximum number of streams to run in parallel
var readFile = _.wrapCallback(fs.readFile);

var txt = _(['foo.txt', 'bar.txt']).flatMap(readFile)
var md = _(['baz.md']).flatMap(readFile)
var js = _(['bosh.js']).flatMap(readFile)

_([txt, md, js]).mergeWithLimit(2);
// => contents of foo.txt, bar.txt, baz.md and bosh.js in the order
// they were read, but bosh.js is not read until either foo.txt and bar.txt
// have been completely read or baz.md has been read
Stream.observe()

Observes a stream, allowing you to handle values as they are emitted, without adding back-pressure or causing data to be pulled from the source. Unlike forks, observers are passive. They don't affect each other or the source stream. They only observe data as the source stream emits them.

An observer buffers the data that it sees from the source, so an observer that cannot process the data as fast as the source produces it can end up consuming a lot of memory.

function delay(x, ms) {
  return _((push) => {
    setTimeout(() => {
      push(null, x);
      push(null, _.nil);
    }, ms);
  });
}

const then = Date.now();

const source = _([1, 2, 3, 4])
    .tap((x) => console.log(`source: ${x} (${Date.now() - then})`));
const obs = source.observe().flatMap((x) => delay(x, 1000));
const main = source.flatMap((x) => delay(x, 10));

// obs will not receive any data yet, since it is only a passive
// observer.
obs.each((x) => console.log(`obs   : ${x} (${Date.now() - then})`));

// Now both obs and main will receive data as fast as main can handle it.
// Even though obs is very slow, main will still receive all of the
// source's data.
main.each((x) => console.log(`main  : ${x} (${Date.now() - then})`));

// =>
// source: 1 (3)
// main  : 1 (21)
// source: 2 (22)
// main  : 2 (33)
// source: 3 (37)
// main  : 3 (47)
// source: 4 (47)
// main  : 4 (57)
// obs   : 1 (1010)
// obs   : 2 (2012)
// obs   : 3 (3018)
// obs   : 4 (4052)
Stream.otherwise(ys)

Switches source to an alternate Stream if the current Stream is empty.

Parameters
  • ys - Stream | Function - alternate stream (or stream-returning function) to use if this stream is empty
_([1,2,3]).otherwise(['foo'])  // => 1, 2, 3
_([]).otherwise(['foo'])       // => 'foo'

_.otherwise(_(['foo']), _([1,2,3]))    // => 1, 2, 3
_.otherwise(_(['foo']), _([]))         // => 'foo'
Stream.parallel(n)

Takes a Stream of Streams and reads from them in parallel, buffering the results until they can be returned to the consumer in their original order.

Parameters
  • n - Number - the maximum number of concurrent reads/buffers
var readFile = _.wrapCallback(fs.readFile);
var filenames = _(['foo.txt', 'bar.txt', 'baz.txt']);

// read from up to 10 files at once
filenames.map(readFile).parallel(10);
_.pipeline(...)

Creates a 'Through Stream', which passes data through a pipeline of functions or other through Streams. This is particularly useful when combined with partial application of Highland functions to expose a Node-compatible Through Stream.

This is not a method on a Stream, and is only exposed at the top-level as _.pipeline. It takes an arbitrary number of arguments.

var through = _.pipeline(
    _.map(parseJSON),
    _.filter(isBlogpost),
    _.reduce(collectCategories),
    _.through(otherPipeline)
);

readStream.pipe(through).pipe(outStream);

// Alternatively, you can use pipeline to manipulate a stream in
// the chained method call style:

var through2 = _.pipeline(function (s) {
    return s.map(parseJSON).filter(isBlogpost); // etc.
});
Stream.sequence()

Reads values from a Stream of Streams or Arrays, emitting them on a single output Stream. This can be thought of as a flatten, just one level deep, often used for resolving asynchronous actions such as an HTTP request or reading a file.

var nums = _([
    _([1, 2, 3]),
    _([4, 5, 6])
]);

nums.sequence()  // => 1, 2, 3, 4, 5, 6

// using sequence to read from files in series
var readFile = _.wrapCallback(fs.readFile);
filenames.map(readFile).sequence()
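
The "one level deep" distinction between sequence and flatten has a close analogue in plain arrays, shown below with Array.prototype.flat. This is only an analogy on arrays and does not use Highland streams.

```javascript
var nested = [[1, [2, 3]], [[4]]];

// One level deep, comparable to what sequence does for a Stream of
// Streams or Arrays: inner nesting is left alone.
var oneLevel = nested.flat(1);          // [1, [2, 3], [4]]

// Fully recursive, comparable to flatten.
var allLevels = nested.flat(Infinity);  // [1, 2, 3, 4]

console.log(oneLevel, allLevels);
```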
Stream.series()

An alias for the sequence method.

var readFile = _.wrapCallback(fs.readFile);
filenames.map(readFile).series()
Stream.through(target)

Transforms a stream using an arbitrary target transform.

If target is a function, this transform passes the current Stream to it, returning the result.

If target is a Duplex Stream, this transform pipes the current Stream through it. It will always return a Highland Stream (instead of the piped-to target directly, as in pipe). Any errors emitted will be propagated as Highland errors.

TIP: Passing a function to through is a good way to implement complex reusable stream transforms. You can even construct the function dynamically based on certain inputs. See examples below.

Parameters
  • target - Function | Duplex Stream - the stream to pipe through or a function to call.
// This is a static complex transform.
function oddDoubler(s) {
    return s.filter(function (x) {
        return x % 2; // odd numbers only
    })
    .map(function (x) {
        return x * 2;
    });
}

// This is a dynamically-created complex transform.
function multiplyEvens(factor) {
    return function (s) {
        return s.filter(function (x) {
            return x % 2 === 0;
        })
        .map(function (x) {
            return x * factor;
        });
    };
}

_([1, 2, 3, 4]).through(oddDoubler); // => 2, 6

_([1, 2, 3, 4]).through(multiplyEvens(5)); // => 10, 20

// Can also be used with Node Through Streams
_(filenames).through(jsonParser).map(function (obj) {
    // ...
});

// All errors will be propagated as Highland errors
_(['zz{"a": 1}']).through(jsonParser).errors(function (err) {
  console.log(err); // => SyntaxError: Unexpected token z
});
Stream.zip(ys)

Takes two Streams and returns a Stream of corresponding pairs. The length of the resulting stream is that of the shorter of the two source streams.

Parameters
  • ys - Array | Stream - the other stream to combine values with
_(['a', 'b', 'c']).zip([1, 2, 3])  // => ['a', 1], ['b', 2], ['c', 3]

_(['a', 'b', 'c']).zip(_([1]))  // => ['a', 1]
Stream.zipAll(ys)

Takes a stream and a finite stream of N streams and returns a stream of the corresponding (N+1)-tuples.

Note: This transform will be renamed zipEach in the next major version release.

Parameters
  • ys - Array | Stream - the array of streams to combine values with
_([1,2,3]).zipAll([[4, 5, 6], [7, 8, 9], [10, 11, 12]])
// => [1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12]

// shortest stream determines length of output stream
_([1, 2, 3, 4]).zipAll([[5, 6, 7, 8], [9, 10, 11, 12], [13, 14]])
// => [1, 5, 9, 13], [2, 6, 10, 14]
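
The tuple construction and the "shortest input wins" rule can be sketched on plain arrays as follows. zipAllArrays is a hypothetical helper that only mirrors the documented output; it is not how Highland implements the transform.

```javascript
// Hypothetical array analogue of zipAll: combine xs with every array
// in yss, stopping at the length of the shortest input.
function zipAllArrays(yss, xs) {
    var all = [xs].concat(yss);
    var length = Math.min.apply(null, all.map(function (arr) {
        return arr.length;
    }));
    var out = [];
    for (var i = 0; i < length; i++) {
        // one tuple holding the i-th element of every input
        out.push(all.map(function (arr) {
            return arr[i];
        }));
    }
    return out;
}

console.log(zipAllArrays([[5, 6, 7, 8], [9, 10, 11, 12], [13, 14]], [1, 2, 3, 4]));
// => [ [ 1, 5, 9, 13 ], [ 2, 6, 10, 14 ] ]
```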
Stream.zipAll0()

Takes a finite stream of streams and returns a stream where the first element from each separate stream is combined into a single data event, followed by the second elements of each stream and so on until the shortest input stream is exhausted.

Note: This transform will be renamed zipAll in the next major version release.

_([
    _([1, 2, 3]),
    _([4, 5, 6]),
    _([7, 8, 9]),
    _([10, 11, 12])
]).zipAll0()
// => [1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12]

// shortest stream determines length of output stream
_([
    _([1, 2, 3, 4]),
    _([5, 6, 7, 8]),
    _([9, 10, 11, 12]),
    _([13, 14])
]).zipAll0()
// => [1, 5, 9, 13], [2, 6, 10, 14]

Consumption

Stream.apply(f)

Applies all values from a Stream as arguments to a function. This method consumes the stream. f will always be called when the nil token is encountered, even when the stream is empty.

Parameters
  • f - Function - the function to apply arguments to
_([1, 2, 3]).apply(function (a, b, c) {
    // a === 1
    // b === 2
    // c === 3
});

_([1, 2, 3]).apply(function (a) {
    // arguments.length === 3
    // a === 1
});
Stream.done(f)

Calls a function once the Stream has ended. This method consumes the stream. If the Stream has already ended, the function is called immediately.

If an error from the Stream reaches this call, it will emit an error event (i.e., it will call emit('error') on the stream being consumed). This event will cause an error to be thrown if unhandled.

As a special case, it is possible to chain done after a call to each even though both methods consume the stream.

Parameters
  • f - Function - the callback
var total = 0;
_([1, 2, 3, 4]).each(function (x) {
    total += x;
}).done(function () {
    // total will be 10
});
Stream.each(f)

Iterates over every value from the Stream, calling the iterator function on each of them. This method consumes the Stream.

If an error from the Stream reaches this call, it will emit an error event (i.e., it will call emit('error') on the stream being consumed). This event will cause an error to be thrown if unhandled.

While each consumes the stream, it is possible to chain done (and only done) after it.

Parameters
  • f - Function - the iterator function
_([1, 2, 3, 4]).each(function (x) {
    // will be called 4 times with x being 1, 2, 3 and 4
});
Stream.pipe(dest, options)

Pipes a Highland Stream to a Node Writable Stream. This will pull all the data from the source Highland Stream and write it to the destination, automatically managing flow so that the destination is not overwhelmed by a fast source.

Users may optionally pass an object that may contain any of these fields:

  • end - Ends the destination when this stream ends. Default: true. This option has no effect if the destination is either process.stdout or process.stderr. Those two streams are never ended.

Like Readable#pipe, this function will throw errors if there is no error handler installed on the stream.

This function returns the destination so you can chain together pipe calls.

NOTE: While Highland streams created via _() and pipeline support being piped to, it is almost never appropriate to pipe from a Highland stream to another Highland stream. Those two cases are meant for use when piping from Node streams. You might be tempted to use pipe to construct reusable transforms. Do not do it. See through for a better way.

Parameters
  • dest - Writable Stream - the destination to write all data to
  • options - Object - (optional) pipe options.
var source = _(generator);
var dest = fs.createWriteStream('myfile.txt')
source.pipe(dest);

// chained call
source.pipe(through).pipe(dest);

// DO NOT do this! It will not work. The stream returned by oddDoubler does
// not support being piped to.
function oddDoubler() {
    return _().filter(function (x) {
        return x % 2; // odd numbers only
    })
    .map(function (x) {
        return x * 2;
    });
}

_([1, 2, 3, 4]).pipe(oddDoubler()) // => Garbage
Stream.pull(f)

Consumes a single item from the Stream. Unlike consume, this function will not provide a new stream for you to push values onto, and it will unsubscribe as soon as it has a single error, value or nil from the source.

You probably won't need to use this directly, but it is used internally by some functions in the Highland library.

Parameters
  • f - Function - the function to handle data
xs.pull(function (err, x) {
    // do something
});
Stream.toArray(f)

Collects all values from a Stream into an Array and calls a function with the result. This method consumes the stream.

If an error from the Stream reaches this call, it will emit an error event (i.e., it will call emit('error') on the stream being consumed). This event will cause an error to be thrown if unhandled.

Parameters
  • f - Function - the callback to provide the completed Array to
_([1, 2, 3, 4]).toArray(function (x) {
    // parameter x will be [1,2,3,4]
});
Stream.toCallback(cb)

Returns the result of a stream to a nodejs-style callback function.

If the stream contains a single value, it will call cb with the single item emitted by the stream (if present). If the stream is empty, cb will be called without any arguments. If an error is encountered in the stream, this function will stop consumption and call cb with the error. If the stream contains more than one item, it will stop consumption and call cb with an error.

Parameters
  • cb - Function - the callback to provide the error/result to
_([1, 2, 3, 4]).collect().toCallback(function (err, result) {
    // parameter result will be [1,2,3,4]
    // parameter err will be null
});
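
The zero, one and many-value rules above can be summarized with a small sketch in which an array stands in for the stream's contents. singleResult and its error message are hypothetical, and errors raised inside a stream are not modeled here.

```javascript
// Hypothetical helper mirroring the rules above; `values` stands in for
// everything the stream would emit before ending.
function singleResult(values, cb) {
    if (values.length === 0) {
        cb();                     // empty: called without arguments
    } else if (values.length === 1) {
        cb(null, values[0]);      // exactly one value: success
    } else {
        cb(new Error('more than one value')); // illustrative message
    }
}

// A collected stream holds exactly one value: the array of items.
singleResult([[1, 2, 3, 4]], function (err, result) {
    console.log(err, result); // => null [ 1, 2, 3, 4 ]
});

singleResult([1, 2], function (err) {
    console.log(err.message); // => more than one value
});
```

This is why the example above calls collect() first: it turns a multi-value stream into a stream with a single array value.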
Stream.toNodeStream(options)

Converts the stream to a node Readable Stream for use in methods or pipes that depend on the native stream type.

The options parameter can be an object passed into the Readable constructor.

Parameters
  • options - Object - (optional) Readable constructor options
_(fs.createReadStream('./abc')).toNodeStream()
_(fs.createReadStream('./abc')).toNodeStream({objectMode: false})
_([{a: 1}]).toNodeStream({objectMode: true})
Stream.toPromise(PromiseCtor)

Converts the result of a stream to a Promise.

If the stream contains a single value, the promise will resolve with the single item emitted by the stream. If the stream is empty, the promise will resolve with undefined. If an error is encountered in the stream, this function will stop consumption and reject with the error. If the stream contains more than one item, it will stop consumption and reject with an error.

Parameters
  • PromiseCtor - Function - Promises/A+ compliant constructor
_([1, 2, 3, 4]).collect().toPromise(Promise).then(function (result) {
    // parameter result will be [1,2,3,4]
});

Utils

_.fromError(err)

Creates a stream that sends a single error then ends.

Parameters
  • err - the error to send
_.fromError(new Error('Single Error')).toCallback(function (err, result) {
    // err contains Error('Single Error') object
});
_.isNil(x)

Returns true if x is the end of stream marker.

Parameters
  • x - the object to test
_.isStream(x)

Returns true if x is a Highland Stream.

Parameters
  • x - the object to test
_.isStream('foo')  // => false
_.isStream(_([1,2,3]))  // => true
_.log(args..)

Logs values to the console. This is a simple wrapper around console.log that is suitable for passing to other functions by reference without having to call bind.

_.log('Hello, world!');

_([1, 2, 3, 4]).each(_.log);

_.nil

The end of stream marker. This is sent along the data channel of a Stream to tell consumers that the Stream has ended. See the example map code for an example of detecting the end of a Stream.

Note: nil is set up as a global where possible. This makes it convenient to access, but more importantly lets Streams from different Highland instances work together and detect end-of-stream properly. This is mostly useful for NPM where you may have many different Highland versions installed.

var map = function (iter, source) {
    return source.consume(function (err, val, push, next) {
        if (err) {
            push(err);
            next();
        }
        else if (val === _.nil) {
            push(null, val);
        }
        else {
            push(null, iter(val));
            next();
        }
    });
};
_.of(x)

Creates a stream that sends a single value then ends.

Parameters
  • x - the value to send
_.of(1).toArray(_.log); // => [1]
_.streamifyAll(source)

Takes an object or a constructor function and returns that object or constructor with streamified versions of its function properties. Passed constructors will also have their prototype functions streamified. This is useful for wrapping many node style async functions at once, and for preserving those functions' context.

Parameters
  • source - Object | Function - the function or object with node-style function properties.
var fs = _.streamifyAll(require('fs'));

fs.readFileStream('example.txt').apply(function (data) {
    // data is now the contents of example.txt
});
_.wrapCallback(f)

Wraps a node-style async function which accepts a callback, transforming it to a function which accepts the same arguments minus the callback and returns a Highland Stream instead. The wrapped function keeps its context, so you can safely use it as a method without binding (see the second example below).

wrapCallback also accepts an optional mappingHint, which specifies how callback arguments are pushed to the stream. This can be used to handle non-standard callback protocols that pass back more than one value.

mappingHint can be a function, number, or array. See the documentation on EventEmitter Stream Objects for details on the mapping hint. If mappingHint is a function, it will be called with all but the first argument that is passed to the callback. The first is still assumed to be the error argument.

Parameters
  • f - Function - the node-style function to wrap
  • mappingHint - Array | Function | Number - (optional) how to pass the arguments to the callback
var fs = require('fs');

var readFile = _.wrapCallback(fs.readFile);

readFile('example.txt').apply(function (data) {
    // data is now the contents of example.txt
});

function Reader(file) {
    this.file = file;
}

Reader.prototype.read = function(cb) {
    fs.readFile(this.file, cb);
};

Reader.prototype.readStream = _.wrapCallback(Reader.prototype.read);

Objects

_.extend(a, b)

Extends one object with the properties of another. Note: The arguments are in the reverse order of other libraries such as underscore. This is so it follows the convention of other functions in this library and so you can more meaningfully partially apply it.

Parameters
  • a - Object - the properties to extend b with
  • b - Object - the original object to extend
_.extend({name: 'bar'}, {name: 'foo', price: 20})
// => {name: 'bar', price: 20}

// example of partial application
var publish = _.extend({published: true});

publish({title: 'test post'})
// => {title: 'test post', published: true}
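
The "extensions first, target second" order is what makes the partial application above read naturally. The sketch below reproduces the documented results with Object.assign. extendInto is a hypothetical stand-in and not Highland's implementation.

```javascript
// Hypothetical stand-in for the documented argument order: copy the own
// enumerable properties of `extensions` onto `target` and return target.
function extendInto(extensions, target) {
    return Object.assign(target, extensions);
}

var result = extendInto({name: 'bar'}, {name: 'foo', price: 20});
console.log(result); // => { name: 'bar', price: 20 }

// Fixing the first argument gives a reusable "apply these properties"
// function, which is the point of putting the extensions first.
var publish = extendInto.bind(null, {published: true});
console.log(publish({title: 'test post'}));
// => { title: 'test post', published: true }
```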
_.get(prop, obj)

Returns a property from an object.

Parameters
  • prop - String - the property to return
  • obj - Object - the object to read properties from
var obj = {foo: 'bar', baz: 123};
_.get('foo', obj) // => 'bar'

// making use of partial application
var posts = [
  {title: 'one'},
  {title: 'two'},
  {title: 'three'}
];

_(posts).map(_.get('title'))  // => 'one', 'two', 'three'
_.keys(obj)

Returns keys from an Object as a Stream.

Parameters
  • obj - Object - the object to return keys from
_.keys({foo: 1, bar: 2, baz: 3})  // => 'foo', 'bar', 'baz'
_.pairs(obj)

Returns key/value pairs for an Object as a Stream. Reads properties lazily, so if you don't read from all keys on an object, not all properties will be read from (may have an effect where getters are used).

Parameters
  • obj - Object - the object to return key/value pairs from
_.pairs({foo: 1, bar: 2})  // => ['foo', 1], ['bar', 2]
_.set(prop, value, obj)

Updates a property on an object, returning the updated object.

Parameters
  • prop - String - the property to set
  • value - the value to set the property to
  • obj - Object - the object to set properties on
var obj = {foo: 'bar', baz: 123};
_.set('foo', 'wheeee', obj) // => {foo: 'wheeee', baz: 123}

// making use of partial application
var publish = _.set('published', true);

publish({title: 'example'})  // => {title: 'example', published: true}
_.values(obj)

Returns values from an Object as a Stream. Reads properties lazily, so if you don't read from all keys on an object, not all properties will be read from (may have an effect where getters are used).

Parameters
  • obj - Object - the object to return values from
_.values({foo: 1, bar: 2, baz: 3})  // => 1, 2, 3

Functions

_.compose(fn1, fn2, ...)

Creates a composite function, which is the application of fn1 to the results of fn2. You can pass an arbitrary number of arguments and have them composed. This means you can't partially apply the compose function itself.

var add1 = add(1);
var mul3 = mul(3);

var add1mul3 = compose(mul3, add1);
add1mul3(2) == 9
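
Composition applies the right-most function first. A minimal plain-JavaScript sketch of that ordering follows. composeFns, add1 and mul3 are defined here for illustration and are assumptions of the sketch; it handles single-argument functions only.

```javascript
// Hypothetical implementation for single-argument functions.
function composeFns() {
    var fns = Array.prototype.slice.call(arguments);
    return function (x) {
        // reduceRight applies the right-most function first
        return fns.reduceRight(function (acc, fn) {
            return fn(acc);
        }, x);
    };
}

function add1(x) { return x + 1; }
function mul3(x) { return x * 3; }

var add1mul3 = composeFns(mul3, add1);
console.log(add1mul3(2)); // => 9, i.e. mul3(add1(2))

// Swapping the arguments changes the order of application.
var mul3add1 = composeFns(add1, mul3);
console.log(mul3add1(2)); // => 7, i.e. add1(mul3(2))
```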
_.curry(fn, [*arguments])

Transforms a function with a specific arity (all arguments must be declared) so that it can be called as a chain of functions until the argument list is saturated.

This function is not itself curryable.

Parameters
  • fn - Function - the function to curry
  • args... - any number of arguments to pre-apply to the function
var fn = curry(function (a, b, c) {
    return a + b + c;
});

fn(1)(2)(3) == fn(1, 2, 3)
fn(1, 2)(3) == fn(1, 2, 3)
fn(1)(2, 3) == fn(1, 2, 3)
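
One way to picture "saturating" the argument list is the sketch below. It is a simplified illustration, not Highland's implementation: currySketch and collect are hypothetical names, and the number of expected arguments is taken from the function's length property.

```javascript
// Hypothetical sketch of currying; not Highland's actual implementation.
function currySketch(fn) {
    var preset = Array.prototype.slice.call(arguments, 1);
    return collect(fn, fn.length, preset);
}

// Keep gathering arguments until at least n have been supplied.
function collect(fn, n, args) {
    if (args.length >= n) {
        return fn.apply(null, args);
    }
    return function () {
        var more = Array.prototype.slice.call(arguments);
        return collect(fn, n, args.concat(more));
    };
}

var sum3 = currySketch(function (a, b, c) {
    return a + b + c;
});

console.log(sum3(1)(2)(3)); // => 6
console.log(sum3(1, 2)(3)); // => 6
console.log(sum3(1)(2, 3)); // => 6
```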
_.flip(fn, [x, y])

Evaluates the function fn with the argument positions swapped. Only works with functions that accept two arguments.

Parameters
  • fn - Function - function to flip argument application for
  • x - parameter to apply to the right hand side of fn
  • y - parameter to apply to the left hand side of fn
div(2, 4) == 0.5
flip(div, 2, 4) == 2
flip(div)(2, 4) == 2
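
The two call forms above can be sketched as follows. Both div and flipSketch are hypothetical helpers written for this example; the sketch only covers calling with all three arguments or with the function alone.

```javascript
// Hypothetical two-argument function to flip.
function div(a, b) { return a / b; }

// Hypothetical sketch of flip covering the two call forms shown above.
function flipSketch(fn, x, y) {
    if (arguments.length >= 3) {
        return fn(y, x); // x goes to the right-hand side, y to the left
    }
    return function (a, b) {
        return fn(b, a);
    };
}

console.log(div(2, 4));             // => 0.5
console.log(flipSketch(div, 2, 4)); // => 2
console.log(flipSketch(div)(2, 4)); // => 2
```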
_.ncurry(n, fn, [args...])

Same as curry but with a specific number of arguments. This can be useful when a function does not explicitly declare all of its parameters.

This function is not itself curryable.

Parameters
  • n - Number - the number of arguments to wait for before applying fn
  • fn - Function - the function to curry
  • args... - any number of arguments to pre-apply to the function
var fn = ncurry(3, function () {
    return Array.prototype.join.call(arguments, '.');
});

fn(1, 2, 3) == '1.2.3';
fn(1, 2)(3) == '1.2.3';
fn(1)(2)(3) == '1.2.3';
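
The reason an explicit count is needed shows up in a function's length property, which only counts declared parameters. The sketch below demonstrates this and adds a simplified ncurrySketch; joinAll, sum3 and ncurrySketch are hypothetical names, and this is not Highland's implementation.

```javascript
// Reads its input from `arguments`, so it declares no parameters.
function joinAll() {
    return Array.prototype.join.call(arguments, '.');
}
function sum3(a, b, c) { return a + b + c; }

console.log(sum3.length);    // => 3
console.log(joinAll.length); // => 0, so the arity cannot be inferred

// Hypothetical sketch of ncurry: the caller supplies n explicitly.
function ncurrySketch(n, fn) {
    var args = Array.prototype.slice.call(arguments, 2);
    if (args.length >= n) {
        return fn.apply(null, args);
    }
    return function () {
        var more = Array.prototype.slice.call(arguments);
        return ncurrySketch.apply(null, [n, fn].concat(args, more));
    };
}

var join3 = ncurrySketch(3, joinAll);
console.log(join3(1, 2)(3)); // => '1.2.3'
```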
_.partial(fn, args...)

Partially applies the function (regardless of whether it has had curry called on it). This will always postpone execution until at least the next call of the partially applied function.

Parameters
  • fn - Function - the function to partially apply
  • args... - the arguments to apply to the function
var addAll = function () {
    var args = Array.prototype.slice.call(arguments);
    // sum all arguments, two at a time
    return args.reduce(function (a, b) {
        return add(a, b);
    });
};
var f = partial(addAll, 1, 2);
f(3, 4) == 10
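
As a rough illustration of "always postpone execution", the sketch below always returns a new function, however many arguments were pre-applied. partialSketch and addAll are hypothetical helpers, not Highland's code; the built-in Function.prototype.bind is shown alongside for comparison.

```javascript
// Hypothetical sketch of partial: always returns a function, never a result.
function partialSketch(fn) {
    var preset = Array.prototype.slice.call(arguments, 1);
    return function () {
        var rest = Array.prototype.slice.call(arguments);
        return fn.apply(this, preset.concat(rest));
    };
}

// Sums however many arguments it receives.
function addAll() {
    var total = 0;
    for (var i = 0; i < arguments.length; i++) {
        total += arguments[i];
    }
    return total;
}

var f = partialSketch(addAll, 1, 2);
console.log(f(3, 4)); // => 10

// Nothing runs until f is called, even with no further arguments.
console.log(typeof f); // => 'function'
console.log(f());      // => 3

// The built-in Function.prototype.bind pre-applies arguments in a similar way.
var g = addAll.bind(null, 1, 2);
console.log(g(3, 4)); // => 10
```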
_.seq(fn1, fn2, ...)

The reversed version of compose, where the arguments are given in the order of application.

var add1 = add(1);
var mul3 = mul(3);

var add1mul3 = seq(add1, mul3);
add1mul3(2) == 9
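
Because the functions run in the order they are written, a sequence reads like a pipeline. A minimal sketch, assuming single-argument steps; seqSketch, trim, upper and exclaim are hypothetical names used only for illustration.

```javascript
// Hypothetical sketch of seq: apply each function in turn, left to right.
function seqSketch() {
    var fns = Array.prototype.slice.call(arguments);
    return function (x) {
        return fns.reduce(function (acc, fn) {
            return fn(acc);
        }, x);
    };
}

function trim(s) { return s.trim(); }
function upper(s) { return s.toUpperCase(); }
function exclaim(s) { return s + '!'; }

// Reads in execution order: trim, then upper-case, then add '!'.
var shout = seqSketch(trim, upper, exclaim);
console.log(shout('  hello ')); // => 'HELLO!'
```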

Operators

_.add(a, b)

Add two values. Can be partially applied.

_.add(1, 2) === 3
_.add(1)(5) === 6
_.not(x)

Perform logical negation on a value. If x is truthy then returns false, otherwise returns true.

Parameters
  • x - the value to negate
_.not(true)   // => false
_.not(false)  // => true
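
Since the description is phrased in terms of truthiness, non-boolean values are worth a quick look. The sketch below uses notSketch, a hypothetical stand-in built on JavaScript's ! operator, which is assumed to match the behaviour described above.

```javascript
// Hypothetical stand-in for _.not, based on JavaScript truthiness.
function notSketch(x) {
    return !x;
}

console.log(notSketch(0));    // => true  (0 is falsy)
console.log(notSketch(''));   // => true  (the empty string is falsy)
console.log(notSketch(null)); // => true
console.log(notSketch('a'));  // => false
console.log(notSketch([]));   // => false (an empty array is truthy)
```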