# RxJS Primer

Most state in the Browser SDK — devices, participants, capabilities, call status — is exposed as **RxJS observables**. This page covers the subset of RxJS used by the SDK. For everything else, see the [RxJS docs](https://rxjs.dev/).

## Observables and subscriptions

An observable is a stream of values over time. It does nothing until `.subscribe()` is called; `.subscribe()` returns a subscription, and `.unsubscribe()` ends it. A subscription that is never ended keeps its handler, and everything the handler references, alive, which makes forgotten subscriptions a common source of memory leaks.

SDK observables behave like `BehaviorSubject`s: subscribing emits the current value synchronously, then every subsequent change.

Transform a stream with `.pipe()` and operators (`filter`, `map`, `take`, `combineLatest`, `switchMap`, `debounceTime`, etc.).

## The `$` suffix

A property ending in `$` is the observable. The same name without `$` is the current snapshot.

```js
const status = call.status;        // current value
call.status$.subscribe(handler);   // current value + every change
```

**Snapshots are not always populated.** For state the SDK already holds in memory (`call.status`, `call.participants`, device lists), the snapshot is the current value and reading it is fine. For lazily-loaded collections — `directory.addresses`, `address.textMessages`, `address.history` — the snapshot starts empty until something subscribes to the corresponding `$` observable (and, for paginated collections, until `loadMore()` is called). Reading the snapshot synchronously right after connecting will give you `[]`. Subscribe to the `$` form, then call `loadMore()` to trigger the first page.

## Subscribing

```js
const sub = client.audioInputDevices$.subscribe((devices) => {
  console.log("mics:", devices);
});

sub.unsubscribe();
```

The first emission fires synchronously with the current device list; subsequent emissions fire on device changes.

## Common patterns

Wait for a value, then proceed once. `take(1)` ends the subscription after the first match.

```js
import { filter, take } from "rxjs";

client.ready$
  .pipe(filter(Boolean), take(1))
  .subscribe(async () => {
    const call = await client.dial(destination);
  });
```

React on every match.

```js
import { filter } from "rxjs";

call.status$.pipe(filter((s) => s === "connected")).subscribe(showCallControls);
```

Combine the latest of multiple streams.

```js
import { combineLatest } from "rxjs";

combineLatest([client.audioInputDevices$, client.selectedAudioInputDevice$])
  .subscribe(([devices, selected]) => {
    const activeMic = devices.find((d) => d.deviceId === selected?.deviceId);
  });
```

Skip the initial value when only changes matter.

```js
import { skip } from "rxjs";

call.status$.pipe(skip(1)).subscribe(showStatusNotification);
```

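Throttle high-frequency streams. `throttleTime(100)` lets at most one value through per 100 ms. `debounceTime(100)` would instead wait for a 100 ms pause in the stream, which a continuously updating source may never provide.

```js
import { throttleTime } from "rxjs";

call.localAudioLevel$.pipe(throttleTime(100)).subscribe(updateVolumeIndicator);
```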

## Cleanup

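Two patterns: collect the subscriptions in an array and unsubscribe each one, or end them all at once with `takeUntil` and a shared `destroy$` subject.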

```js
class CallManager {
  subs = [];
  start(call) {
    this.subs.push(
      // Arrow wrappers keep `this` bound when the handlers run.
      call.status$.subscribe((s) => this.onStatus(s)),
      call.participants$.subscribe((p) => this.onParticipants(p)),
    );
  }
  stop() {
    this.subs.forEach((s) => s.unsubscribe());
    this.subs = [];
  }
}
```

```js
import { Subject, takeUntil } from "rxjs";

const destroy$ = new Subject();

call.status$.pipe(takeUntil(destroy$)).subscribe(updateStatus);
call.participants$.pipe(takeUntil(destroy$)).subscribe(updateParticipants);

destroy$.next();
destroy$.complete();
```

`takeUntil` scales better with many subscriptions; an array is fine for a few.

## References

* [RxJS docs](https://rxjs.dev/)
* [Observables](https://rxjs.dev/guide/observable)
* [Operators](https://rxjs.dev/guide/operators)
* [Subjects](https://rxjs.dev/guide/subject) (including `BehaviorSubject`)