Creating Custom RxJS Operators
It provides APIs for the creation of applications and libraries using asynchronous streams of data and reactive methods. It’s one of the foundation libraries of Angular .
Included in it are over 100 operators - functions that take an Observable stream of data and return values for use in chains of operators.
Many of the operators are low level, and combining them through the
pipe method they create a powerful way to work
Creating custom operators for a domain
The good news is it’s also very easy to create new higher-level operators for our domain code - these can be used where you find duplicate or complicated operations.
Creating operators we can also ensure well-tested code using marble testing and they can be shared among your team to make your code more readable and stable.
There are two types of operators that can be created - a
OperatorFunction and all
operators must do two things:
- Return a function which accepts as its parameter a source from the previous Observable value in the stream
- Return a value of the same type for
MonoTypeOperatorFunctionor different type for an
OperatorFunctionby using the source value with
Below we’ll have an example of each, but first, to support creating the operators we need some code to simplify:
Creating MonoTypeOperatorFunction for single types
As the name suggests a
MonoTypeOperatorFunction is a function that works with a single type of data - the input and
output value must be of the same type.
Looking at our code we can identify two multiplication operations in our code that are the same. To turn this into an operator the function will look like this:
Here, we are returning an arrow function that takes the previous source - which must be an
source is piped to map
which allows the source value to be converted to a new
value, in our case we multiply by the
TypeScript understands that the output must also be a number - and if you try to return another value type it will throw a compile error.
Writing a marble test
Marble testing is a way to write tests for RxJS operators that deal with data over time - data is not static due to its asynchronous nature and cannot always be guaranteed in a specific order. Luckily the test for this operator is simple.
My personal preference has been to write these test in Jest using rxjs-marbles and jest-marbles , but there are other libraries to write these test available.
Using marbles, we can set up a mock source that will emit 5 numbers at the specified frames.
The test result contains two things:
- A subscriptions string which is used to check that the operator handle subscription ending properly
- An output Observable that will contain the results of the operator and compared against the expectations
In this test, we’ll pass a source of numbers and multiply by
Now the operator is created it can be used in the existing code from above - ideally the operator should be part of a shared library of code:
Already much more readable! Our code explains our intent, but we haven’t really reduced the duplication of our sources.
Changing the API with OperatorFunction
In our domain, we know we always want more than one value from a source and using the
OperatorFunction we can use that
to reduce our duplicate code even more.
This would introduce an API change, but with proper tests, we should be able to migrate our code easily.
For our source value, it is still a single number value, but in the API we’ve changed:
- The input
factorcan be a single value or an array of values
- The return value is now an array of values, regardless of the input.
Instead of forcing the users to check the type of response, this single API can be well documented and expected when we use it in our code:
Updating the tests
First, we need to update the existing test - here we only have to change the values in our
expected Observable - we
now expect an array of numbers regardless of the input - but with a single value our array length will be
To ensure full coverage, we should also test for the case where were have an array input for the multiplication factor:
We can now update the code further - here we can now remove the two additional cold Observables and create a single one
using our new
multiply operator, passing it an array containing out factors:
Now we can subscribe to the
multiplyValues$ source and get both our new result which contains the multiplication of
You can see a working version of this operator on StackBlitz by opening the console to see the result.
This operator is just a taste of what’s possible with RxJS - diving into the API you’ll find many more operators to help work with data in other synchronous and asynchronous operations.
A collection of pre-built operators for your projects
Now for a shameless plug - my own library - RxJS Ninja - is a collection of over 130 operators for working with various types of data (such as arrays or numbers ) and streams allowing for modifying, filtering and querying the data.
Still in active development, you might find useful operators that provide clearer intent for your RxJS code.
You can check out the source code on GitHub . There you can also find a starter project for creating your own TypeScript libraries like this.