Creating Custom RxJS Operators

 Published: Jan 25, 2021  (last updated: Jan 25, 2021)~ 1500 words~ 7 minutes reading time

RxJS is a popular library available for TypeScript and JavaScript.

It provides APIs for the creation of applications and libraries using asynchronous streams of data and reactive methods. It’s one of the foundation libraries of Angular.

Included in it are over 100 operators: functions that take an Observable stream of data and return a new Observable, so they can be used in chains of operators.

Many of the operators are low level, and combining them through the pipe method creates a powerful way to work with data.

Creating custom operators for a domain

The good news is that it’s also very easy to create new higher-level operators for our domain code. These can be used wherever you find duplicated or complicated operations.

Custom operators can also be covered by marble tests to keep the code well tested, and they can be shared among your team to make your code more readable and stable.

There are two types of operators that can be created, a MonoTypeOperatorFunction and an OperatorFunction, and every operator must do two things:

  • Return a function that accepts the source Observable (the previous step in the stream) as its parameter
  • Return an Observable, built by piping the source, that emits values of the same type for a MonoTypeOperatorFunction or of a different type for an OperatorFunction

Below we’ll have an example of each, but first we need some code to simplify:

import { from } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Create a cold source that will emit each number
const source$ = from([1, 2, 3, 4, 5]);

// Create a cold source that multiplies each number by `5`
const multiplyByFive$ = source$.pipe(map(value => value * 5));
// Create a cold source that multiplies each number by `10`
const multiplyByTen$ = source$.pipe(map(value => value * 10));

// Subscribe to the sources and console.log the output
multiplyByFive$.pipe(tap(console.log)).subscribe();
// Output: `5, 10, 15, 20, 25`

multiplyByTen$.pipe(tap(console.log)).subscribe();
// Output: `10, 20, 30, 40, 50`

Creating MonoTypeOperatorFunction for single types

As the name suggests, a MonoTypeOperatorFunction is a function that works with a single type of data: the input and output values must be of the same type.

Looking at our code, we can identify two multiplication operations that are the same apart from the factor. Turned into an operator, the function looks like this:

import { MonoTypeOperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

export function multiply(factor: number): MonoTypeOperatorFunction<number> {
  return (source) => source.pipe(map(value => value * factor))
}

Here, we are returning an arrow function that takes the previous source, which must be an Observable<number>. The source is piped to map, which allows the source value to be converted to a new value; in our case we multiply it by the factor.

TypeScript understands that the output must also be a number, and if you try to return another value type the compiler will report an error.

Writing a marble test

Marble testing is a way to write tests for RxJS operators that deal with data over time. Because the data arrives asynchronously, it is not static and cannot always be guaranteed to arrive in a specific order. Luckily the test for this operator is simple.

My personal preference has been to write these tests in Jest using rxjs-marbles and jest-marbles, but other libraries for writing them are available.

Using marbles, we can set up a mock source that will emit 5 numbers at the specified frames.

The test result contains two things:

  • A subscriptions string, used with toHaveSubscriptions to check that the operator handles the end of the subscription properly
  • An output Observable that contains the results of the operator and is compared against the expectations using toBeObservable

In this test, we’ll pass a source of numbers and multiply by 10:

import { marbles } from "rxjs-marbles/jest";
import { multiply } from './multiply';

describe("multiply", () => {
  it("should multiply by 10", marbles(m => {
    const input = m.hot('-a-b-c-d-e-|', {a: 2, b: 3, c: 4, d: 5, e: 6});
    const subs = '^----------!';
    const expected = m.cold('-a-b-c-d-e-|', {a: 20, b: 30, c: 40, d: 50, e: 60});
    // Pipe the mock source through the operator under test
    m.expect(input.pipe(multiply(10))).toBeObservable(expected);
    m.expect(input).toHaveSubscriptions(subs);
  }));
});
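
If you would rather not add a marble library, roughly the same test can be sketched with the TestScheduler that ships in rxjs/testing. This is a swapped-in technique rather than the rxjs-marbles setup used in this post, and the JSON comparison below is a crude stand-in for a test runner's deep-equality assertion:

```typescript
import { MonoTypeOperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Same operator as above, repeated so the sketch is self-contained
function multiply(factor: number): MonoTypeOperatorFunction<number> {
  return (source) => source.pipe(map((value) => value * factor));
}

// The scheduler needs a deep-equality check; in Jest you would normally
// pass something like expect(actual).toEqual(expected) instead
const scheduler = new TestScheduler((actual, expected) => {
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble assertion failed');
  }
});

let completed = false;

scheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
  const input = hot('-a-b-c-d-e-|', { a: 2, b: 3, c: 4, d: 5, e: 6 });
  const subs = '^----------!';
  const expected = '-a-b-c-d-e-|';
  const values = { a: 20, b: 30, c: 40, d: 50, e: 60 };

  // Assertions are queued here and checked when run() flushes the scheduler
  expectObservable(input.pipe(multiply(10))).toBe(expected, values);
  expectSubscriptions(input.subscriptions).toBe(subs);
});

completed = true;
```

The marble strings are the same as in the rxjs-marbles version; only the helper names differ.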

Update Code

Now that the operator is created, it can be used in the existing code from above. Ideally the operator should be part of a shared library of code:

import { from } from 'rxjs';
import { multiply } from '@myorg/rxjs-library'

const source$ = from([1, 2, 3, 4, 5]);

const multiplyByFive$ = source$.pipe(multiply(5));
const multiplyByTen$ = source$.pipe(multiply(10));

Already much more readable! Our code explains our intent, but we haven’t really reduced the duplication of our sources.

Changing the API with OperatorFunction

In our domain, we know we always want more than one value from a source, and with an OperatorFunction we can use that knowledge to reduce our duplicated code even more.

This would introduce an API change, but with proper tests, we should be able to migrate our code easily.

Our source value is still a single number, but in the API we’ve changed two things:

  • The input factor can be a single value or an array of values
  • The return value is now an array of values, regardless of the input.

Instead of forcing users to check the type of the response, this single return type can be well documented and relied on wherever we use the operator:

import { OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

export function multiply(factor: number | number[]): OperatorFunction<number, number[]> {
  return source => source.pipe(map(value => (Array.isArray(factor) ? factor : [factor]).map(f => value * f)))
}

Updating the tests

First, we need to update the existing test. Here we only have to change the values in our expected Observable: we now expect an array of numbers regardless of the input, and with a single factor the array length will be 1:

it("should multiply by 10", marbles(m => {
  const input = m.hot('-a-b-c-d-e-|', {a: 2, b: 3, c: 4, d: 5, e: 6});
  const subs = '^----------!';
  const expected = m.cold('-a-b-c-d-e-|', {a: [20], b: [30], c: [40], d: [50], e: [60]});
  m.expect(input.pipe(multiply(10))).toBeObservable(expected);
  m.expect(input).toHaveSubscriptions(subs);
}));

To ensure full coverage, we should also test the case where we have an array input for the multiplication factor:

it("should multiply by 5 and 10", marbles(m => {
  const input = m.hot('-a-b-c-d-e-|', {a: 2, b: 3, c: 4, d: 5, e: 6});
  const subs = '^----------!';
  const expected = m.cold('-a-b-c-d-e-|', {a: [10, 20], b: [15, 30], c: [20, 40], d: [25, 50], e: [30, 60]});
  m.expect(input.pipe(multiply([5, 10]))).toBeObservable(expected);
  m.expect(input).toHaveSubscriptions(subs);
}));

Update Code

We can now update the code further. Here we can remove the two additional cold Observables and create a single one using our new multiply operator, passing it an array containing our factors:

import { from } from 'rxjs';
import { multiply } from '@myorg/rxjs-library'

const source$ = from([1, 2, 3, 4, 5]);

const multiplyValues$ = source$.pipe(multiply([5, 10]));

Now we can subscribe to the multiplyValues$ source and get a single result for each source value, containing the multiplication by both factors:

multiplyValues$.pipe(tap(console.log)).subscribe();
// Output: `[5, 10], [10, 20], [15, 30], [20, 40], [25, 50]`

Next Steps

You can see a working version of this operator on StackBlitz by opening the console to see the result.

This operator is just a taste of what’s possible with RxJS - diving into the API you’ll find many more operators to help work with data in other synchronous and asynchronous operations.

A collection of pre-built operators for your projects


Now for a shameless plug - my own library - RxJS Ninja - is a collection of over 130 operators for working with various types of data (such as arrays or numbers) and streams allowing for modifying, filtering and querying the data.

The library is still in active development, and you might find useful operators in it that provide clearer intent for your RxJS code.

You can check out the source code on GitHub. There you can also find a starter project for creating your own TypeScript libraries like this.

RxJS Ninja Updates - New operators for math and working with streams

 Published: Jan 14, 2021  (last updated: Jan 14, 2021)~ 300 words~ 2 minutes reading time

Since the last update on RxJS Ninja a few new operators have been added. Below are some details and links to StackBlitz demos showing them in action.

Numbers and Math operators

In @rxjs-ninja/rxjs-number the missing toFixed operator has been added, alongside a new custom toHex operator and the corresponding parseHex operator, allowing hex numbers to be worked with (such as when converting colours).
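
As a rough idea of the conversions involved, here is a plain-function sketch of hex formatting and parsing applied to a colour. The function bodies and the rgbToHex helper are assumptions for illustration, not the library's operators or implementation:

```typescript
// Plain-function sketch; these are not the library's operators
function toHex(value: number): string {
  return value.toString(16);
}

function parseHex(hex: string): number {
  return parseInt(hex, 16);
}

// Hypothetical helper: convert RGB channels to a CSS hex colour,
// left-padding each channel to two hex digits
function rgbToHex(r: number, g: number, b: number): string {
  return '#' + [r, g, b].map((channel) => ('0' + toHex(channel)).slice(-2)).join('');
}

console.log(rgbToHex(255, 128, 0)); // '#ff8000'
console.log(parseHex('ff')); // 255
```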

There are also new operators for some basic math: add, sub, div, mul, mod and pow. Each modifies the source number and accepts either a number or an Observable number source.

Working with Browser Streams

Some new operators have been added to @rxjs-ninja/rxjs-utility that allow interoperability between RxJS and the Streams API. These APIs are not in all browsers but there is an available polyfill.

fromReadableStream

This operator accepts a ReadableStream and provides the emitted values as an Observable, allowing you to use RxJS operators to work with the data.

  • Demo - Using fetch body with fromReadableStream to show partial images
  • Demo - A basic infinite number stream that ticks per second

toWritableStream

This operator accepts a WritableStream. The operator emits the source value and also writes to the stream. The operator takes care of closing the writer when the Observable subscription is closed.

  • Demo - Streams an Observable interval value into a WritableStream with optional ability to stop the writer without ending the Observable subscription.

fromFetchWithProgress

This operator uses fetch to make an HTTP request. Instead of the body response, it emits either a number, which is the current progress, or a Uint8Array containing the final response from the body.

  • Demo - Fetches an image and shows a progress bar with the current percentage and once complete shows the image.

RxJS Primitives is now RxJS Ninja

 Published: Nov 23, 2020  (last updated: Nov 23, 2020)~ 100 words~ 1 minutes reading time


Today I’ve re-released RxJS Primitives as RxJS Ninja. The new libraries have been published at the same versions as the last releases under their old names, so it’s easy to migrate to the new packages. All are now published under @rxjs-ninja instead of @tinynodes (a deprecation notice has also been left on the old packages).

New RxJS Primitives release, new operators + Typescript 4

 Published: Nov 18, 2020  (last updated: Nov 18, 2020)~ 900 words~ 5 minutes reading time

This week I released new versions of my RxJS libraries in rxjs-primitives. Since the first release I’ve added a few new utility operators. Some of these have been out for a while since I originally wrote about the release, so I’ve highlighted them here as they may be useful to some developers.

You can check out the full docs here.

Typescript 4

Upgrading to TypeScript 4 has allowed polymorphic (overloaded) functions to be replaced with Variadic Tuple Types, which is why there is a major version bump on all packages.

This can be seen in the old and new concat operator in the rxjs-string package.
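
For readers who have not met the feature yet, here is a generic illustration of what variadic tuple types allow in TypeScript 4.0 and later. It is not the library's concat signature, and joinTuples is a hypothetical name used only for this sketch:

```typescript
// Generic illustration of variadic tuple types (TypeScript 4.0+),
// not the actual signature used in rxjs-string
function joinTuples<A extends unknown[], B extends unknown[]>(
  first: [...A],
  second: [...B],
): [...A, ...B] {
  // Spreading both tuples keeps the position of every element type
  return [...first, ...second];
}

// Each position keeps its own type instead of widening to a union array
const joined = joinTuples([1, 'a'], [true]);
console.log(joined); // [1, 'a', true]
```

Before this feature, describing a function like this for every tuple length needed one overload per combination.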

Most of the tests have also been converted to rxjs-marbles allowing for more robust Observable testing (if you are working with RxJS I highly recommend checking it out, it integrates well with runners like Jest).

it(
  'should return string value of string ending with passed character',
  marbles((m) => {
    const input = m.hot('-a-b-c-|', { a: 'test', b: 'testing', c: 'gone' });
    const subs = '^------!';
    const expected = m.cold('---y---|', { y: 'testing' });
    m.expect(input.pipe(filterEndsWith('g'))).toBeObservable(expected);
    m.expect(input).toHaveSubscriptions(subs);
  }),
);

rxjs-array

npm install @tinynodes/rxjs-array

In the array module there are some operators for finding the difference or intersection between a source and a passed array, for example:

of(['a', 'b', 'd'])
 .pipe(difference(['a', 'c']))
 .subscribe(console.log) // ['b', 'd']

of(['a', 'b', 'd'])
 .pipe(intersects(['a', 'c']))
 .subscribe(console.log) // ['a']

These methods accept an array, or an Observable<Array> of items to compare against.
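
The comparison each operator performs can be sketched on plain arrays. This is a minimal illustration assuming simple equality between items, not the library's implementation:

```typescript
// Plain-array sketch of the comparisons; not the library's implementation

// Items in source that do not appear in compare
function difference<T>(source: T[], compare: T[]): T[] {
  return source.filter((item) => compare.indexOf(item) === -1);
}

// Items in source that also appear in compare
function intersects<T>(source: T[], compare: T[]): T[] {
  return source.filter((item) => compare.indexOf(item) !== -1);
}

console.log(difference(['a', 'b', 'd'], ['a', 'c'])); // ['b', 'd']
console.log(intersects(['a', 'b', 'd'], ['a', 'c'])); // ['a']
```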

The module also includes a binarySearch operator which returns a custom BinarySearchResult tuple.

rxjs-boolean

npm install @tinynodes/rxjs-boolean

A new Luhn algorithm operator luhnCheck is provided that does validation on numbers such as credit cards, ID cards and other value schemes that use the check.

fromString('4485275742308327')
    .pipe(luhnCheck())
    .subscribe(console.log) // true, this is a valid credit card
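
For reference, the Luhn check itself fits in a few lines. The function below is a standalone sketch of the algorithm on a string of digits, written for this post rather than taken from the library:

```typescript
// Standalone sketch of the Luhn algorithm; not the library's source
function luhnCheck(input: string): boolean {
  // Reject anything that is not a non-empty run of digits
  if (!/^\d+$/.test(input)) {
    return false;
  }
  let sum = 0;
  // Walk from the rightmost digit, doubling every second one
  for (let i = 0; i < input.length; i++) {
    let digit = Number(input.charAt(input.length - 1 - i));
    if (i % 2 === 1) {
      digit *= 2;
      // A doubled digit above 9 contributes the sum of its two digits
      if (digit > 9) {
        digit -= 9;
      }
    }
    sum += digit;
  }
  // Valid numbers produce a total that is a multiple of 10
  return sum % 10 === 0;
}

console.log(luhnCheck('4485275742308327')); // true
console.log(luhnCheck('4485275742308328')); // false
```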

rxjs-number

npm install @tinynodes/rxjs-number

inRange / outOfRange and filterInRange / filterOutOfRange all accept two numbers. The filter methods emit only the source values that fall within (or outside) the range of those values, while the other methods emit a boolean indicating whether the value is in range. An optional third parameter controls whether the boundary values themselves are included or excluded, depending on the method:

fromNumber([-1, 0, 1, 2, 10, 11])
 .pipe(filterInRange(0, 10))
 .subscribe(console.log) // [0, 1, 2, 10]

// Passing true as the third parameter, the range numbers will also be excluded
fromNumber([-1, 0, 1, 2, 10, 11])
 .pipe(filterInRange(0, 10, true))
 .subscribe(console.log) // [1, 2]
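
The boundary handling can be sketched as a plain predicate. The excludeBounds parameter name is an assumption for illustration; the behaviour mirrors the two filterInRange calls above:

```typescript
// Plain predicate sketch; excludeBounds is a hypothetical parameter name
function inRange(value: number, min: number, max: number, excludeBounds = false): boolean {
  return excludeBounds
    ? value > min && value < max
    : value >= min && value <= max;
}

const values = [-1, 0, 1, 2, 10, 11];

// Default: the boundary values 0 and 10 are kept
const inclusive = values.filter((v) => inRange(v, 0, 10));
console.log(inclusive); // [0, 1, 2, 10]

// With the flag set, the boundary values are dropped as well
const exclusive = values.filter((v) => inRange(v, 0, 10, true));
console.log(exclusive); // [1, 2]
```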

rxjs-string

npm install @tinynodes/rxjs-string

New operators such as titleize, repeat and match add new utility features for strings. Where they can, they also support localisation:

fromString('Mary had a little lamb')
 .pipe(titleize())
 .subscribe(console.log) // 'Mary Had A Little Lamb'

fromString('Mary had ä little lamb')
 .pipe(titleize('de-DE'))
 .subscribe(console.log) // 'Mary Had Ä Little Lamb'
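
Locale support here most likely comes down to locale-aware casing. The following is a minimal sketch that assumes words are separated by single spaces and uses String.prototype.toLocaleUpperCase; it is not the library's implementation:

```typescript
// Sketch only: capitalise the first character of each space-separated word
function titleize(input: string, locale?: string): string {
  return input
    .split(' ')
    .map((word) => word.charAt(0).toLocaleUpperCase(locale) + word.slice(1))
    .join(' ');
}

console.log(titleize('Mary had a little lamb')); // 'Mary Had A Little Lamb'
console.log(titleize('Mary had ä little lamb', 'de-DE')); // 'Mary Had Ä Little Lamb'
```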

rxjs-utility

npm install @tinynodes/rxjs-utility

The utility module contains some specialised tap operators such as tapIf, startWithTap and tapOnSubscribe. These provide ways to run side effects. startWithTap can be used with Angular to touch a form on its first value change, and tapOnSubscribe fires when there is a subscription to the Observable:

// Only touch on first value change
form.valueChanges.pipe(
 startWithTap(() => this.onTouch())
).subscribe()

// Fire when a component subscribes to the service bus
this.serviceBus.pipe(
  tapOnSubscribe((name: string) => console.log(`New Subscription to ${name}`))
).subscribe()

The tapIf operator will only fire its callback if the passed predicate returns a truthy result:

fromNumber([1, 2, 3, 4, 5, 6]).pipe(
  tapIf((val) => val % 2 === 0, (val) => console.log(val))
).subscribe() // 2, 4, 6
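
An operator like this is a thin wrapper around the standard tap operator. The sketch below uses a hypothetical tapWhen name to make clear it is an illustration of the idea and not the library's source:

```typescript
import { from, MonoTypeOperatorFunction } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical stand-in for tapIf, built on the standard tap operator
function tapWhen<T>(
  predicate: (value: T) => boolean,
  effect: (value: T) => void,
): MonoTypeOperatorFunction<T> {
  return (source) =>
    source.pipe(
      tap((value) => {
        // Run the side effect only for values that pass the predicate
        if (predicate(value)) {
          effect(value);
        }
      }),
    );
}

const seen: number[] = [];
const emitted: number[] = [];

from([1, 2, 3, 4, 5, 6])
  .pipe(tapWhen((val) => val % 2 === 0, (val) => seen.push(val)))
  .subscribe((val) => emitted.push(val));

console.log(seen); // [2, 4, 6]
console.log(emitted); // [1, 2, 3, 4, 5, 6]
```

The side effect only sees the even numbers, while every value still flows through to the subscriber unchanged.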

The last operator is mapIfSource which might be a bit of a weird one but I hope might become useful.

The operator takes the value from the source and passes it to a predicate method, then maps the value with one of two passed methods depending on the result. A simple example would be:

fromNumber([1, 2, 3, 4, 5, 6]).pipe(
  mapIfSource(
    (value) => value % 2 === 0,
    (value) => value * 10,
    (value) => value * 20
  )
).subscribe() // 20, 20, 60, 40, 100, 60

Here, if the result of the predicate is true we multiply by 10, otherwise by 20. The method is typed to allow different return values based on the result (so you will have to handle the type later). For example, we could even turn it into a FizzBuzz operator:

export function fizzbuzz(): OperatorFunction<number, string | number> {
  return (source: Observable<number>) =>
    source.pipe(
      mapIfSource<number, string, number>(
        (value) => value % 15 == 0 || value % 3 == 0 || value % 5 == 0,
        (value) => (value % 15 == 0 ? `FizzBuzz` : value % 3 === 0 ? 'Fizz' : 'Buzz'),
        (value) => value
     )
  );
}

// And now we use it in our code
fromNumber([1, 3, 5, 15, 16]).pipe(
  fizzbuzz(),
).subscribe() // 1, 'Fizz', 'Buzz', 'FizzBuzz', 16
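
Stripped of the Observable wrapper, the branching that mapIfSource performs can be sketched as a plain function used with Array.prototype.map. The mapIf name and its signature are assumptions for illustration only:

```typescript
// Hypothetical plain-function version of the predicate-based mapping
function mapIf<T, A, B>(
  predicate: (value: T) => boolean,
  whenTrue: (value: T) => A,
  whenFalse: (value: T) => B,
): (value: T) => A | B {
  return (value) => (predicate(value) ? whenTrue(value) : whenFalse(value));
}

const result = [1, 2, 3, 4, 5, 6].map(
  mapIf(
    (value: number) => value % 2 === 0,
    (value: number) => value * 10,
    (value: number) => value * 20,
  ),
);

console.log(result); // [20, 20, 60, 40, 100, 60]
```

The A | B return type is what lets the two branches produce different types, as in the FizzBuzz operator above.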

Hopefully you’ll find these operators useful and feel free to leave feedback and suggestions.

Polymorphic TypeScript - Function overloading with rest parameters

 Published: May 12, 2020  (last updated: May 12, 2020)~ 600 words~ 3 minutes reading time

Recently in RxJS Primitives I encountered a situation with one of the methods, concat. It was initially designed to take an argument list of strings, using rest (...args) parameters to mimic the signature of String.prototype.concat and pass the arguments on to it.

I’ve created a StackBlitz Project with the code for each step that can be followed along.

I wanted to refactor it to also support an array of strings, but found that with the current implementation this is not possible, and passing an array throws a TypeScript error:

export function concat(...args: string[]): MonoTypeOperatorFunction<string> {
  return (source: Observable<string>) => source.pipe(map(value => value.concat(...args)))
}

fromString('Testing').pipe(concat([' one', ' two'])).subscribe(console.log)
Argument of type 'string[]' is not assignable to parameter of type 'string'.

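Because of how TypeScript treats rest parameters, each argument in the list must itself be a string, and TypeScript collects them into the args array. Passing an array as a single argument therefore supplies a string[] where a string is expected.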
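
The same behaviour can be seen without RxJS. In this sketch, concatAll is a hypothetical plain function with the same rest-parameter shape as the operator; a caller that already holds an array can spread it at the call site:

```typescript
// Same rest-parameter shape as the operator, minus the Observable wrapper
function concatAll(base: string, ...args: string[]): string {
  return base.concat(...args);
}

const parts = [' one', ' two'];

// concatAll('Testing', parts) would not compile: string[] is not a string.
// Spreading the array passes each item as its own argument instead.
const spreadResult = concatAll('Testing', ...parts);
console.log(spreadResult); // 'Testing one two'
```

That works, but it pushes the conversion onto every caller.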

To get around this, we can use TypeScript function overloading.

How to overload functions with rest parameters

My first attempt at writing this method led to this code, which attempts to keep the type information in all signatures:

function concat(...args: string[]): MonoTypeOperatorFunction<string>;
function concat(args: string[]): MonoTypeOperatorFunction<string>;
function concat(...args: string[]): MonoTypeOperatorFunction<string> {
  return (source: Observable<string>) => source.pipe(map(value => value.concat(...args)))
}

export { concat }

This resulted in an error on the second overload signature:

function concat(args: string[]): MonoTypeOperatorFunction<string> (+1 overload)
This overload signature is not compatible with its implementation signature.

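TypeScript requires every overload signature to be compatible with the implementation signature, and an overload whose first parameter is a string[] does not match an implementation whose rest parameter only accepts string values. To get around this, we need to use the any type in the implementation signature: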

function concat(...args: string[]): MonoTypeOperatorFunction<string>;
function concat(args: string[]): MonoTypeOperatorFunction<string>;
function concat(...args: any): MonoTypeOperatorFunction<string> {
  return (source: Observable<string>) => source.pipe(map(value => value.concat(...args)))
}

export { concat }

So now the TypeScript compiler stops complaining, and we can test it using both supported argument types:

fromString('Testing').pipe(concat(' one', ' two')).subscribe(console.log) // Testing one two
fromString('Testing').pipe(concat([' one', ' two'])).subscribe(console.log) // Testing one, two

However, if you look at the output you’ll notice a bug in our array case, where a comma appears in the text. The issue is that the implementation now treats the array as the first argument in a list of arguments, so the whole array is converted to a string. Changing the implementation signature to args: any would avoid that, but it would remove the rest parameter that the string form relies on.

To solve this, we copy the any-typed arguments into an array and check whether the first one is an array. If it is, we spread that array into the String.prototype.concat method; if it’s a string, we spread all of the arguments instead:

function concat(...args: string[]): MonoTypeOperatorFunction<string>;
function concat(args: string[]): MonoTypeOperatorFunction<string>;
function concat(...args: any): MonoTypeOperatorFunction<string> {
  const inArgs: any[] = [...args];
  if (inArgs[0] instanceof Array) {
    return (source: Observable<string>) => source.pipe(map(value => value.concat(...inArgs[0])))
  }
  return (source: Observable<string>) => source.pipe(map(value => value.concat(...inArgs)))
}

export { concat }

Now the implementation works as intended: our method can accept one or more string arguments, or an array of strings as the first argument. It’s not possible to mix the two up, because if you try to add a second array to the arguments, the TypeScript compiler will expect single string arguments.
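
If you would prefer to avoid any in the implementation signature, one possible variation is to type the rest parameter as a list of strings or string arrays. The sketch below shows the idea on a hypothetical plain function, concatStrings, rather than on the operator itself, to keep it free of dependencies:

```typescript
// Overloads describe the two supported call styles
function concatStrings(base: string, ...args: string[]): string;
function concatStrings(base: string, args: string[]): string;
// The implementation accepts either style without resorting to any
function concatStrings(base: string, ...args: Array<string | string[]>): string {
  const first = args[0];
  // If the first argument is an array, use it; otherwise use the whole list
  const parts: string[] = Array.isArray(first) ? first : (args as string[]);
  return base.concat(...parts);
}

console.log(concatStrings('Testing', ' one', ' two')); // 'Testing one two'
console.log(concatStrings('Testing', [' one', ' two'])); // 'Testing one two'
```

The cast in the non-array branch is still needed, since the implementation type alone does not rule out a mixed list; the overloads are what stop callers from passing one.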