Overview

espo: extensions for stateful programming in JavaScript.

Library for stateful and reactive programming: queues, observables, ownership and lifetimes, procedural reactivity. Lightweight alternative to RxJS for GUI programs.

See sibling library fpx for functional programming utils.

Install with npm (version 0.1.2):

npm i --save --save-exact espo

All examples imply an import:

const {someFunction} = require('espo')

On this page, all Espo words are exported into global scope. You can run examples in the browser console.


Interfaces

Espo’s “interfaces” are abstract definitions and runtime boolean tests. Espo utilities and classes check their inputs with these interfaces rather than with instanceof.

isDeinitable(value)

interface isDeinitable {
  deinit(): void
}

Interface for objects that have a lifetime and must be deinitialised before you can leave them to the GC. .deinit() should make the object inert, releasing any resources it owns, tearing down any subscriptions, etc.

.deinit() must be idempotent and reentrant: redundant calls to .deinit(), even when accidentally overlapping with an ongoing .deinit() call, should have no adverse effects.

isDeinitable(null)            // false
isDeinitable(new Que())       // true
isDeinitable({deinit () {}})  // true

See complementary functions deinit and deinitDiff.
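
The idempotency requirement can be illustrated with a minimal sketch. The Timer class and its fields are hypothetical names invented for this example and are not part of Espo; the point is only that the resource is detached from the object before it is released, so redundant or overlapping calls find nothing left to do.

```javascript
// Hypothetical resource; not part of Espo.
class Timer {
  constructor () {
    this.deinitCount = 0
    this.id = setInterval(() => {}, 1000)
  }

  deinit () {
    // Detach the resource from `this` BEFORE releasing it, so that
    // redundant or overlapping calls find nothing left to release.
    const id = this.id
    this.id = undefined
    if (id === undefined) return
    this.deinitCount += 1
    clearInterval(id)
  }
}

const timer = new Timer()
timer.deinit()
timer.deinit()  // no-op
console.info(timer.deinitCount)  // 1
```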


isOwner(value)

interface isOwner extends isDeinitable {
  unwrap(): any
}

Interface for objects that wrap a value, automatically managing its lifetime. Deiniting an owner should also deinit the inner value. See ownership in Rust.

.unwrap() should remove the inner value from the owner without deiniting it, and return it to the caller. See move in Rust.

See Agent and agent.unwrap() for practical examples.

See complementary function unwrap.

isOwner(new Atom())   // false
isOwner(new Agent())  // true
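
As a rough illustration of the contract, here is a minimal sketch of an owner. Box is a hypothetical class written for this example; it is not Espo's Agent and only shows the difference between deiniting the owner and moving the value out with .unwrap().

```javascript
// Hypothetical owner; illustrates the isOwner contract, not Espo's Agent.
class Box {
  constructor (value) {this.value = value}

  // Deiniting the owner also deinits the inner value.
  deinit () {
    const value = this.unwrap()
    if (value && typeof value.deinit === 'function') value.deinit()
  }

  // Moves the inner value out without deiniting it.
  unwrap () {
    const value = this.value
    this.value = undefined
    return value
  }
}

const log = []
const resource = {deinit () {log.push('deinited')}}

const moved = new Box(resource).unwrap()
// The resource was moved out untouched; the caller now owns it.
console.info(moved === resource, log.length)  // true 0

new Box(resource).deinit()
console.info(log)  // ['deinited']
```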

isRef(value)

interface isRef {
  deref(): any
}

Interface for objects that wrap a value, such as Atom or any other observable ref. .deref() should return the underlying value. Note: an object may point to itself, returning this from .deref().

isRef(new Atom())      // true
new Atom(100).deref()  // 100

isObservable(value)

interface isObservable extends isDeinitable {
  subscribe(subscriber: ƒ(...any)): isSubscription
  unsubscribe(subscription: isSubscription): void
}

Interface for objects that let you subscribe to notifications, such as MessageQue, Atom or Computation. See isSubscription below.


isObservableRef(value)

interface isObservableRef extends isRef, isDeinitable {
  subscribe(subscriber: ƒ(observable)): isSubscription
  unsubscribe(subscription: isSubscription): void
}

Signifies that you can subscribe to be notified whenever the value wrapped by the object changes, and call .deref() to get the new value.

Example: Atom.


isAtom(value)

interface isAtom extends isObservableRef {
  swap(ƒ(...any), ...any): void
  reset(any): void
}

Interface for observable references with FP-style state transitions, in the style of clojure.core/atom.

See Atom.


isAgent(value)

interface isAgent extends isAtom, isOwner {}

Interface for observable references with FP-style state transitions that automatically manage the lifetimes of owned resources.

See Agent.


isSubscription(value)

interface isSubscription extends isDeinitable {
  trigger(...any): void
}

Interface for subscription objects returned by observable.subscribe(). The .trigger() method is called by the observable that created the subscription. Calling .deinit() should stop the subscription immediately, even if the observable has a pending notification.


Classes

Espo provides several utility classes. Some of them are intended for direct use, some should be subclassed.

Que(deque)

implements isDeinitable

Synchronous, unbounded, FIFO queue. Takes a deque function that will process the values pushed into the queue in a strictly linear order. The calls to deque never overlap.

Resilient to exceptions. If deque throws an exception when more values are pending, the other values will still be processed, and the exception will be delayed until the end.

function deque (value) {
  if (value === 'first') {
    que.push('second')
    que.push('third')
  }
  console.info(value)
}

const que = new Que(deque)

que.push('first')

// prints:
// 'first'
// 'second'
// 'third'
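
The exception behaviour described above could be implemented roughly as follows. SketchQue is a hypothetical, simplified stand-in and not Espo's source; in particular, rethrowing the first exception (rather than the last) is an assumption of this sketch.

```javascript
// Hypothetical minimal queue; not Espo's actual implementation.
class SketchQue {
  constructor (deque) {
    this.deque = deque
    this.pending = []
    this.flushing = false
  }

  push (value) {
    this.pending.push(value)
    // A push during an ongoing flush only enqueues; calls never overlap.
    if (!this.flushing) this.flush()
  }

  flush () {
    this.flushing = true
    let failed = false
    let firstError
    try {
      while (this.pending.length) {
        try {
          this.deque(this.pending.shift())
        }
        catch (err) {
          // Remember the first exception, keep processing the rest.
          if (!failed) {
            failed = true
            firstError = err
          }
        }
      }
    }
    finally {
      this.flushing = false
    }
    if (failed) throw firstError
  }
}

const seen = []
const que = new SketchQue(value => {
  if (value === 'first') {
    que.push('second')
    throw Error('first failed')
  }
  seen.push(value)
})

let message
try {
  que.push('first')
}
catch (err) {
  message = err.message
}
console.info(message)  // 'first failed'
console.info(seen)     // ['second']
```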

que.push(value)

Adds value to the end of the queue. It will be processed by deque after all other values that are already in the queue. If the que is not dammed, this automatically triggers .flush().

const que = new Que(
  function deque (value) {
    if (value === 'first') {
      que.push('second')
      que.push('third')
    }
    console.info(value)
  }
)

que.push('first')

// prints:
// 'first'
// 'second'
// 'third'

que.dam()

Pauses an idle que. A dammed que accumulates values added by .push(), but doesn’t flush automatically. This allows you to delay processing, batching multiple values. Call .flush() to unpause and resume processing.

Has no effect if the que is already flushing at the time of the call.

const que = new Que(
  function deque (value) {console.info(value)}
)

que.dam()

// nothing happens yet
que.push('first')
que.push('second')

// prints 'first' and 'second'
que.flush()

que.flush()

Unpauses and resumes processing. You only need to call it after .dam().

que.isEmpty()

Returns true if the que has no pending values.

que.isDammed()

Returns true if the que is currently dammed, that is, paused by .dam() and not yet flushed.

que.deinit()

Empties the pending value buffer. The que remains usable afterwards.


TaskQue()

extends Que

Special case of Que: a synchronous, unbounded, FIFO task queue. You push functions into it, and they execute in a strictly linear order.

const taskQue = new TaskQue()

function first () {
  console.info('first started')
  taskQue.push(second)
  console.info('first ended')
}

function second () {
  console.info('second')
}

taskQue.push(first)

// prints:
// 'first started'
// 'first ended'
// 'second'

taskQue.push(task, ...args)

Adds task to the end of the queue. It will be called with args as arguments and taskQue as this after executing all other tasks that are already in the queue. If the que is not dammed, this automatically triggers .flush().

Returns a function that removes the task from the que when called.

const taskQue = new TaskQue()

// optional, for batching
taskQue.dam()

const abort = taskQue.push(function report () {console.info('reporting')})

// if you wish to abort
// abort()

// if dammed
taskQue.flush()

MessageQue()

extends Que

implements isObservable

An “event que” / “message bus” / “many-to-many channel” / “event emitter”. Call .subscribe() to add subscribers, then .push() to broadcast messages. Can be dammed and flushed just like a normal Que.

Resilient to exceptions. Subscribers don’t interfere with each other. If a subscriber throws an exception, others will still be notified, and the exception will be delayed until the end of the broadcast. Similarly, exceptions in one broadcast don’t interfere with the other pending broadcasts.

const mq = new MessageQue()

const sub = mq.subscribe((...args) => {
  console.info(args)
})

// will print ['hello', 'world!']
mq.push('hello', 'world!')

sub.deinit()

messageQue.subscribe(subscriber)

where subscriber: ƒ(...any)

Conscripts subscriber to be called on every broadcast. Returns a subscription object that you can .deinit(). Deiniting a subscription is immediate, even during an ongoing broadcast.

const mq = new MessageQue()

const sub = mq.subscribe(function subscriber (...args) {})

// call when you're done
sub.deinit()

messageQue.unsubscribe(subscription)

Same as subscription.deinit().

messageQue.push(...args)

Broadcasts ...args to all current subscribers.


Observable()

implements isDeinitable, isObservable

Abstract class for implementing observables and observable refs. Not useful on its own. See Atom and Computation, which are based on this.

Uses subscription counting to lazily initialise and deinitialise. Calls .onInit() when adding the first subscription, and .onDeinit() when removing the last. May initialise and deinitialise repeatedly over the course of its lifetime. A subclass may override .onInit() and .onDeinit() to set up and tear down any external resources it needs, such as HTTP requests or websockets.
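
To make subscription counting concrete, here is a minimal self-contained sketch. CountingObservable and Clock are hypothetical classes written for this example; they mimic the .onInit() / .onDeinit() hooks but are not Espo's Observable and omit triggering entirely.

```javascript
// Hypothetical base class; illustrates subscription counting only.
class CountingObservable {
  constructor () {this.subscribers = new Set()}

  subscribe (subscriber) {
    if (!this.subscribers.size) this.onInit()
    // Wrap in an object so the same function can subscribe more than once.
    const sub = {subscriber, deinit: () => this.unsubscribe(sub)}
    this.subscribers.add(sub)
    return sub
  }

  unsubscribe (sub) {
    // Idempotent: only the call that actually removes the sub can deinit.
    if (this.subscribers.delete(sub) && !this.subscribers.size) this.onDeinit()
  }

  onInit () {}
  onDeinit () {}
}

const log = []

// Hypothetical subclass that acquires a resource lazily.
class Clock extends CountingObservable {
  onInit () {
    log.push('init')
    this.timer = setInterval(() => {}, 1000)
  }
  onDeinit () {
    log.push('deinit')
    clearInterval(this.timer)
  }
}

const clock = new Clock()

const one = clock.subscribe(() => {})
const two = clock.subscribe(() => {})  // already initialised
one.deinit()
two.deinit()  // last subscription removed
two.deinit()  // redundant, no effect

console.info(log)  // ['init', 'deinit']
```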

observable.subscribe(subscriber)

where subscriber: ƒ(...any)

Conscripts subscriber to be called every time the observable is triggered.

Returns a subscription object that you can .deinit(). Deiniting a subscription is immediate, even during an ongoing trigger.

const sub = someObservable.subscribe(function subscriber (...args) {
  // ...
})

// call when you're done
sub.deinit()

observable.unsubscribe(subscription)

Same as subscription.deinit().

observable.trigger(...args)

Call to notify subscribers, passing ...args to each. Triggers never overlap: if .trigger() is called during another ongoing trigger, the redundant call is put on an internal Que to be executed later.

observable.onInit()

Called when adding the first subscription.

observable.onDeinit()

Called when removing the last subscription.

observable.deinit()

Deinits all current subscriptions. This incidentally triggers .onDeinit() if the observable is active.


Atom(value)

extends Observable

implements isAtom

Basic observable reference. Inspired by clojure.core/atom. Should be paired with Emerge for efficient nested updates.

const atom = new Atom(10)

atom.deref()  // 10

const sub = atom.subscribe(atom => {
  console.info(atom.deref())
})

atom.swap(value => value + 1)
// prints 11

atom.swap(value => value + 100)
// prints 111

sub.deinit()

atom.swap(mod, ...args)

where mod: ƒ(currentValue, ...args)

Sets the value of atom to the result of calling mod with the current value and the optional args. Triggers subscribers if the value has changed at all.

const atom = new Atom(10)

atom.deref()  // 10

// no additional args
atom.swap(value => value * 2)

atom.deref()  // 20

const add = (a, b, c) => a + b + c

// additional args
atom.swap(add, 1, 2)

atom.deref()  // add(20, 1, 2) = 23

atom.reset(value)

Resets atom’s value to the provided value and triggers subscribers if the value has changed at all.

const atom = new Atom(10)

atom.reset(20)

atom.deref()  // 20

Agent(value)

extends Atom

implements isAgent

Combines three big ideas. It’s a tool for building:

In addition to its Atom qualities, an agent automatically manages the lifetimes of the objects it contains, directly or indirectly. Modifying an agent’s value via agent.swap() or agent.reset() invokes deinitDiff on the previous and next value, automatically deiniting any removed objects that implement isDeinitable.

const {patch} = require('emerge')

class Resource {
  constructor (name) {this.name = name}
  deinit () {console.info('deiniting:', this.name)}
}

const agent = new Agent({first: new Resource('first')})

agent.swap(patch, {second: new Resource('second')})

agent.deref()
// {first: Resource{name: 'first'}, second: Resource{name: 'second'}}

// Any replaced or removed object is automatically deinited

agent.swap(patch, {first: new Resource('third')})
// 'deiniting: first'

agent.swap(patch, {second: null})
// 'deiniting: second'

agent.deref()
// {first: Resource{name: 'third'}}

agent.deinit()
// 'deiniting: third'

agent.deref()
// undefined

agent.swap(mod, ...args)

In addition to modifying the agent’s value (see atom.swap()), diffs the previous and the next value, deiniting any removed objects.

See the example above.

agent.reset(value)

In addition to modifying the agent’s value (see atom.reset()), diffs the previous and the next value, deiniting any removed objects.

See the example above.

agent.deinit()

In addition to deiniting subscriptions (see observable.deinit()), resets the agent to undefined, deiniting the previous value.

See the example above.

agent.unwrap()

Resets agent to undefined, returning the previous value as-is, without deiniting it. If one of the subscriptions triggered by .unwrap() produces an exception before .unwrap() returns, the value is automatically deinited to avoid leaks.

In Rust terms, .unwrap() implies moving the value out of the agent. The caller must take responsibility for the lifetime of the returned value.

const atom = new Atom(10)

const sub = atom.subscribe(atom => {
  console.info('updated:', atom.deref())
})

const agent = new Agent({sub})

agent.deref()
// {sub: Subscription{state: 'ACTIVE', ...}}

atom.reset(20)
// 'updated: 20'

const value = agent.unwrap()
// {sub: Subscription{state: 'ACTIVE', ...}}

// The value has been moved out of the agent
agent.deref()
// undefined

// The subscription is still active
atom.reset(30)
// 'updated: 30'

// We must take responsibility for its lifetime
value.sub.deinit()

For comparison, .reset() will diff and deinit the previous value:

const atom = new Atom(10)

const sub = atom.subscribe(atom => {
  console.info('updated:', atom.deref())
})

const agent = new Agent({sub})

agent.deref()
// {sub: Subscription{state: 'ACTIVE', ...}}

atom.reset(20)
// 'updated: 20'

agent.reset(undefined)

sub
// Subscription{state: 'IDLE', ...}

atom.reset(30)
// nothing

Reaction()

implements isDeinitable

Enables implicit reactivity driven by procedural data access. Write code that looks like a plain synchronous function, but is actually reactive. With Reaction, you should never again subscribe or unsubscribe manually. Simply pull data from observable refs. The subscriptions are updated on each run, and therefore may change over time.

See Computation for an observable variant.

const one = new Atom(10)
const other = new Atom(20)

const reaction = Reaction.loop(({deref}) => {
  console.info(deref(one), deref(other))
})
// prints 10, 20

one.swap(value => 'hello')
// prints 'hello', 20

other.swap(value => 'world')
// prints 'hello', 'world'

reaction.deinit()

reaction.run(fun, onTrigger)

where fun: ƒ(reaction), onTrigger: ƒ(reaction)

Runs fun in the context of the reaction, subscribing to any observable refs passed to .deref() during the run. Returns the result of fun. onTrigger will be called when any of those observable refs is triggered.

The subscriptions created during a .run() race with each other. As soon as one is triggered, all subscriptions are invalidated and onTrigger is called. No further triggers occur until the next .run(), which is typically immediate. The old subscriptions remain “active” until the end of the next .run(), at which point they’re replaced with the new subscriptions and deinited. They’re also deinited on .deinit(). Overlapping the subscription lifetimes avoids premature deinitialisation of lazy observables.

const atom = new Atom(10)

const reaction = new Reaction()

reaction.run(
  function effect ({deref}) {
    return deref(atom)
  },
  function update () {
    console.info('notified')
    // maybe rerun
  }
)
// 10

atom.swap(value => 20)
// 'notified'

reaction.deinit()
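
The idea of subscribing implicitly through data access can be sketched without Espo. Ref, runTracked and loop below are hypothetical stand-ins for Atom, reaction.run() and Reaction.loop(). This sketch drops old subscriptions as soon as one is triggered; it does not reproduce the overlapping subscription lifetimes described above.

```javascript
// Hypothetical minimal observable ref, standing in for Atom.
class Ref {
  constructor (value) {
    this.value = value
    this.subs = new Set()
  }
  deref () {return this.value}
  reset (value) {
    this.value = value
    // Copy before iterating: subscribers may unsubscribe during the loop.
    for (const sub of [...this.subs]) sub()
  }
}

// Hypothetical: runs `fun`, subscribing to every ref passed to `deref`.
// The first ref to trigger drops all subscriptions, then calls `onTrigger`.
function runTracked (fun, onTrigger) {
  const cleanups = []
  const invalidate = () => {
    while (cleanups.length) cleanups.pop()()
    onTrigger()
  }
  const deref = ref => {
    if (!ref.subs.has(invalidate)) {
      ref.subs.add(invalidate)
      cleanups.push(() => ref.subs.delete(invalidate))
    }
    return ref.deref()
  }
  return fun({deref})
}

// Rerunning on every trigger gives the behaviour of a loop.
function loop (fun) {
  runTracked(fun, () => loop(fun))
}

const one = new Ref(10)
const other = new Ref(20)
const printed = []

loop(({deref}) => {printed.push([deref(one), deref(other)])})

one.reset('hello')
other.reset('world')

console.info(printed)
// [[10, 20], ['hello', 20], ['hello', 'world']]
```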

reaction.deref(ref) bound method

Outside a .run(), equivalent to deref(ref). During a .run(), if ref implements isObservable, it also implicitly subscribes to ref. See the examples above.

.deref() is instance-bound for compatibility with destructuring.

static Reaction.loop(fun)

Creates and starts a reaction that reruns fun on every change. See the examples above.


Computation(def, equal)

where def: ƒ(Reaction), equal: ƒ(any, any): bool

extends Observable

implements isObservableRef

Defines a reactive computation that pulls data from multiple observable refs. Filters redundant updates using the equal function. Based on Reaction. Lazy: doesn’t update when it has no subscribers.

Inspired by Reagent’s reaction.

const eq = (a, b) => a === b
const one = new Atom(10)
const other = new Atom({outer: {inner: 20}})
const inOther = new PathQuery(other, ['outer', 'inner'], eq)

const computation = new Computation(({deref}) => {
  return deref(one) + deref(inOther)
}, eq)

computation.deref()  // undefined

const sub = computation.subscribe(({deref}) => {
  console.info(deref())
})

computation.deref()  // 30

one.swap(value => 'hello')
// 'hello20'

other.swap(value => ({outer: {inner: ' world'}}))
// 'hello world'

sub.deinit()

// computation is now inert and safe to leave to GC
// alternatively, call computation.deinit() to drop all subs

Query(observableRef, query, equal)

where query: ƒ(any): any, equal: ƒ(any, any): bool

extends Observable

implements isObservableRef

Creates an observable that derives its value from observableRef by calling query and filters redundant updates by calling equal. Lazy: doesn’t update when it has no subscribers.

const eq = (a, b) => a === b
const atom = new Atom({outer: {inner: 10}})
const query = new Query(atom, (value => value.outer.inner * 2), eq)

query.deref()  // undefined

const sub = query.subscribe(query => {
  console.info(query.deref())
})

query.deref()  // 20

atom.swap(value => ({outer: {inner: 20}}))
// prints 40

// now the query is inert again
sub.deinit()

In RxJS terms, new Query(observableRef, query, equal) is equivalent to observable.map(query).distinctUntilChanged(equal).
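
The map-then-filter behaviour can be shown in isolation. deriveDistinct is a hypothetical helper written for this example, not an Espo or RxJS function: it applies query to each source value and forwards the result only when equal reports a change.

```javascript
// Hypothetical helper: derives a value with `query` and notifies
// `subscriber` only when `equal` reports a change.
function deriveDistinct (query, equal, subscriber) {
  let initialised = false
  let prev
  return function onSourceValue (sourceValue) {
    const next = query(sourceValue)
    if (initialised && equal(prev, next)) return
    initialised = true
    prev = next
    subscriber(next)
  }
}

const eq = (a, b) => a === b
const seen = []
const feed = deriveDistinct(value => value.outer.inner * 2, eq, value => seen.push(value))

feed({outer: {inner: 10}})            // 20: first value
feed({outer: {inner: 10}, other: 1})  // 20 again: filtered out
feed({outer: {inner: 20}})            // 40: changed

console.info(seen)  // [20, 40]
```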


PathQuery(observableRef, path, equal)

where path: [string|number], equal: ƒ(any, any): bool

extends Query

implements isObservableRef

Special case of Query. Shortcut to accessing value by path.

new PathQuery(observableRef, path, equal)
// equivalent to:
new Query(observableRef, value => derefIn(value, path), equal)

const eq = (a, b) => a === b
const atom = new Atom({outer: {inner: 10}})
const query = new PathQuery(atom, ['outer', 'inner'], eq)

query.deref()  // undefined

const sub = query.subscribe(query => {
  console.info(query.deref())
})

query.deref()  // 10

atom.swap(value => ({outer: {inner: 20}}))
// prints 20

// now the query is inert again
sub.deinit()

Utils

global

Current global context. Browser: window, Node.js: global, webworkers: self, and so on.


isMutable(value)

True if value can be mutated (add/remove/modify properties). This includes most objects and functions. False if value is frozen or a primitive (nil, string, number, etc).

isMutable({})                  // true
isMutable(isMutable)           // true
isMutable(null)                // false
isMutable(Object.freeze({}))   // false
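
A test with this behaviour could be written as follows. isMutableSketch is a hypothetical re-implementation based only on the description above; Espo's source may differ.

```javascript
// Hypothetical re-implementation for illustration; Espo's source may differ.
function isMutableSketch (value) {
  const isObjectLike =
    (typeof value === 'object' && value !== null) || typeof value === 'function'
  return isObjectLike && !Object.isFrozen(value)
}

console.info(isMutableSketch({}))                 // true
console.info(isMutableSketch(isMutableSketch))    // true
console.info(isMutableSketch(null))               // false
console.info(isMutableSketch('string'))           // false
console.info(isMutableSketch(Object.freeze({})))  // false
```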

bindAll(object)

Takes a mutable object and binds all of its methods to it, via Function.prototype.bind. They become bound methods and can be freely detached.

Currently supports only enumerable properties (both own and inherited), and therefore doesn’t work with spec-compliant classes.

Returns the same object.

// Setup
const object = {getSelf () {return this}}
object.getSelf() === object  // true

// Detached unbound method: doesn't work
const unbound = object.getSelf
unbound() === global  // true in sloppy mode; in strict mode `this` is undefined

// Detached bound method: works
bindAll(object)
const bound = object.getSelf
bound() === object  // true
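
The limitation with classes follows from property enumeration. In the sketch below, bindAllSketch is a hypothetical stand-in that assumes the binding is done with a for..in loop, as the description of enumerable properties suggests. Methods declared in a class body are non-enumerable, so the loop never visits them.

```javascript
// Hypothetical sketch: binds every enumerable function property,
// own or inherited, which is what `for..in` visits.
function bindAllSketch (object) {
  for (const key in object) {
    if (typeof object[key] === 'function') object[key] = object[key].bind(object)
  }
  return object
}

const plain = bindAllSketch({getSelf () {return this}})
const {getSelf} = plain
console.info(getSelf() === plain)  // true

// Class methods are non-enumerable, so `for..in` never sees them.
class Example {
  getSelf () {return this}
}
const instance = bindAllSketch(new Example())
console.info(Object.prototype.hasOwnProperty.call(instance, 'getSelf'))  // false
```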

assign(object, ...sources)

Similar to Object.assign. Mutates object, assigning enumerable properties (own and inherited) from each source. Returns the same object.

Be wary: mutation is often misused. When dealing with data, you should program in a functional style, treating your data structures as immutable. Use a library like Emerge for data transformations.

assign()                        // {}
assign({})                      // {}
assign({}, {one: 1}, {two: 2})  // {one: 1, two: 2}

pull(array, value)

Mutates array, removing the first occurrence of value, if any, comparing by fpx.is. Returns array.

Counterpart to the built-ins Array.prototype.push and Array.prototype.unshift.

const array = [10, 20]
pull(array, 10)  // returns `array`
array  // [20]
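
A function with this behaviour might look like the sketch below. pullSketch is hypothetical, and the local is helper is an assumption standing in for fpx.is: it treats NaN as equal to itself and otherwise compares with ===.

```javascript
// Assumption: identity comparison where NaN equals itself.
const is = (a, b) => a === b || (a !== a && b !== b)

// Hypothetical sketch of `pull`; Espo's source may differ.
function pullSketch (array, value) {
  const index = array.findIndex(item => is(item, value))
  if (index !== -1) array.splice(index, 1)
  return array
}

const array = [10, 20, 10, NaN]
pullSketch(array, 10)   // removes only the first 10
pullSketch(array, NaN)  // found, unlike with indexOf
pullSketch(array, 30)   // absent: no change

console.info(array)  // [20, 10]
```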

deinit(ref)

Complementary function for isDeinitable. Calls ref.deinit() if available. Safe to call on values that don’t implement isDeinitable.

const ref = {
  deinit () {
    console.info('deiniting')
  }
}

deinit(ref)
// 'deiniting'

// calling with a non-deinitable does nothing
deinit('non-deinitable')

deinitDiff(prev, next)

Utility for automatic management of object lifetimes. See isDeinitable, isOwner, Agent for more details and examples.

Diffs prev and next, deiniting any objects that implement isDeinitable and are present in prev but not in next. The diff algorithm recursively traverses plain data structures (fpx.isDict and fpx.isArray), but stops at non-plain objects, allowing you to safely include third party objects of unknown size and structure.

Resilient to exceptions: if a deiniter or a property accessor produces an exception, deinitDiff will still traverse the rest of the tree, delaying exceptions until the end.

Detects and avoids circular references.

class Resource {
  constructor (name) {this.name = name}
  deinit () {console.info('deiniting:', this.name)}
}

class BlackBox {
  constructor (inner) {this.inner = inner}
}

const prev = {
  root: new Resource('Sirius'),
  dict: {
    inner: new Resource('Arcturus'),
  },
  list: [new Resource('Rigel')],
  // Sun is untouchable to deinitDiff because it's wrapped
  // into a non-plain object that doesn't implement isDeinitable
  blackBox: new BlackBox(new Resource('Sun'))
}

const next = {
  root: prev.root,
  dict: {
    inner: new Resource('Bellatrix')
  },
  list: null,
}

deinitDiff(prev, next)

// 'deiniting: Arcturus'
// 'deiniting: Rigel'

deinitDiff(next, null)

// 'deiniting: Sirius'
// 'deiniting: Bellatrix'
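
The traversal can be approximated with a short recursive function. Everything below is a hypothetical sketch: deinitDiffSketch compares values key by key, treats only arrays and prototype-less or Object-prototype objects as plain, and omits the exception handling described above. The real deinitDiff may decide "present in next" differently.

```javascript
// Hypothetical helpers; Espo and fpx define their own versions.
const isDeinitable = value => value != null && typeof value.deinit === 'function'
const isPlain = value => {
  if (Array.isArray(value)) return true
  if (value === null || typeof value !== 'object') return false
  const proto = Object.getPrototypeOf(value)
  return proto === Object.prototype || proto === null
}

// Simplified sketch: compares values key by key and omits
// the exception handling of the real function.
function deinitDiffSketch (prev, next, visited = new Set()) {
  if (prev === next) return
  if (isDeinitable(prev)) return void prev.deinit()
  // Stop at primitives, non-plain objects, and already visited nodes.
  if (!isPlain(prev) || visited.has(prev)) return
  visited.add(prev)
  for (const key of Object.keys(prev)) {
    deinitDiffSketch(prev[key], isPlain(next) ? next[key] : undefined, visited)
  }
}

const log = []
const resource = name => ({deinit () {log.push(name)}})

class BlackBox {
  constructor (inner) {this.inner = inner}
}

const prev = {
  kept: resource('kept'),
  dict: {inner: resource('replaced')},
  list: [resource('removed')],
  blackBox: new BlackBox(resource('hidden')),
}
prev.self = prev  // circular reference

const next = {kept: prev.kept, dict: {inner: resource('new')}, list: null}

deinitDiffSketch(prev, next)
console.info(log)  // ['replaced', 'removed']
```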

unwrap(ref)

Complementary function for isOwner. Calls ref.unwrap(), returning the inner value. Safe to call on values that don’t implement isOwner.

See agent.unwrap() for examples.


deref(ref)

Complementary function for isRef. Calls ref.deref() and continues recursively, eventually returning a non-ref. Safe to call on values that don’t implement isRef.

deref('value')                      // 'value'
deref(new Atom('value'))            // 'value'
deref(new Atom(new Atom('value')))  // 'value'
deref({deref () {return 'value'}})  // 'value'
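
The recursion can be sketched as a loop. isRefSketch and derefSketch are hypothetical names for this example; the guard against a ref that returns itself is an assumption made here so that self-pointing refs do not loop forever.

```javascript
// Hypothetical sketch of the isRef test and recursive deref.
const isRefSketch = value => value != null && typeof value.deref === 'function'

function derefSketch (value) {
  // Keep unwrapping until the result is no longer a ref.
  // Stops if a ref returns itself, which would otherwise loop forever.
  while (isRefSketch(value)) {
    const inner = value.deref()
    if (inner === value) break
    value = inner
  }
  return value
}

const ref = value => ({deref () {return value}})
const self = {deref () {return this}}

console.info(derefSketch('value'))            // 'value'
console.info(derefSketch(ref(ref('value'))))  // 'value'
console.info(derefSketch(self) === self)      // true
```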

derefIn(ref, path)

Like deref, but on a nested path. Recursively dereferences any nested refs while drilling down. Safe to call on values that don’t implement isRef.

derefIn(10, [])  // 10

const ref = {
  deref () {
    return {
      nested: {
        deref () {
          return 100
        }
      }
    }
  }
}

derefIn(ref, ['nested'])  // 100

derefAt(path, ref)

Same as derefIn(ref, path). Useful for partial application when path is known in advance.
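
Partial application with a fixed path might look like this. The *Sketch functions are hypothetical stand-ins defined so the example runs on its own; returning undefined for a missing intermediate value is an assumption of the sketch.

```javascript
// Hypothetical stand-ins so the example runs on its own.
const isRefSketch = value => value != null && typeof value.deref === 'function'
const derefSketch = value => {
  while (isRefSketch(value)) value = value.deref()
  return value
}
function derefInSketch (ref, path) {
  let value = derefSketch(ref)
  for (const key of path) {
    if (value == null) return undefined
    value = derefSketch(value[key])
  }
  return value
}
const derefAtSketch = (path, ref) => derefInSketch(ref, path)

// The path is fixed once, then reused across many refs.
const getName = derefAtSketch.bind(undefined, ['user', 'name'])

const box = value => ({deref () {return value}})
const refs = [
  box({user: {name: 'Mira'}}),
  box({user: box({name: 'Kai'})}),
  box({}),
]

const names = refs.map(ref => getName(ref))
console.info(names)  // ['Mira', 'Kai', undefined]
```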


Author

Nelo Mitranim: https://mitranim.com