Intro To Effect, Part 5: Software Transactional Memory in Effect

Intro to Effect series:

  1. What is Effect?
  2. Handling Errors
  3. Managing Dependencies
  4. Concurrency in Effect
  5. Software Transactional Memory in Effect

Software Transactional Memory is an advanced concurrency control technique that simplifies concurrent programming by treating shared memory updates as transactions, allowing multiple operations to be composed in an atomic and consistent manner.

An in-depth understanding of STM requires diving into various aspects, such as its design principles, the core data structures, and the operations that work within the STM context. Let’s explore each of these features to gain a solid understanding of STM and see it in action.

What is Software Transactional Memory

Design Principles

STM in Effect is built upon the following core principles:

  • Atomicity: All operations inside an STM transaction are atomic, meaning either all updates are committed, or none are committed.
  • Consistency: Transactions maintain the consistency of the shared state, ensuring any updates preserve the invariants of your data structures.
  • Isolation: Transactions are isolated from each other, and the updates of one transaction are invisible to others until they are committed.
  • Composability: STMs are effects that can be easily combined and composed together, retaining their functional error-handling capabilities.

Core Data Structures

STM is built around two main data structures: TRef and STM.

TRef

The primary data structure is TRef<A>, a mutable transactional reference to a shared memory location that allows transactional operations. TRefs encapsulate the state and provide the necessary operations to manage state updates transactionally.

A TRef<A> provides a way to transactionally modify shared state as part of a transactional effect. The primary operations of a TRef are get, set, and update. When you inspect the API of TRef, you’ll see that all these operations and their combinations return the STM type, which can be thought of as another effectful type akin to Effect<R, E, A>:

// module: TRef.ts
declare const get: <A>(self: TRef<A>) => STM<never, never, A>;
declare const set: <A>(self: TRef<A>, value: A) => STM<never, never, void>;
declare const update: <A>(self: TRef<A>, f: (a: A) => A) => STM<never, never, void>;

It is important to note that while a TRef<A> serves as a transactional mutable reference, the value it contains should be immutable.
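To see why immutability matters, remember that a transaction body may be run more than once before it commits. The sketch below is plain TypeScript, not Effect code: `runTwice` is a hypothetical helper that imitates such a re-run by applying the same update function twice to the same stored value and keeping only the second result.

```typescript
// Hypothetical helper (not part of Effect): imitates a transaction body that
// is re-run after a conflict. The first result is thrown away, the second is kept.
function runTwice<A>(stored: A, f: (a: A) => A): A {
  f(stored); // first attempt, result discarded
  return f(stored); // second attempt, result kept
}

// Mutating update: pushes into the stored array itself,
// so the discarded attempt still leaves its mark.
const mutated = runTwice<number[]>([1, 2], (xs) => {
  xs.push(3);
  return xs;
});

// Immutable update: builds a new array and leaves the stored one alone.
const immutable = runTwice<readonly number[]>([1, 2], (xs) => [...xs, 3]);

console.log(mutated); // [ 1, 2, 3, 3 ]
console.log(immutable); // [ 1, 2, 3 ]
```

With the mutating version, the change from the discarded attempt leaks into the final value. With the immutable version, re-running the update is harmless.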

STM

STM<R, E, A> is a data type that models a transactional effect and defines the context within which STM operations and combinators are executed. It is essentially an effect type that can fail with an error (of type E) or succeed with a value (of type A) while requiring a context of type R to execute.

STM models an atomic operation that works with TRefs and provides the building blocks for creating consistent and isolated transactions in a concurrent setting. Its transactions consist of read, write, and update operations on TRefs, as well as combinators like check to add conditions, and orElse to compose multiple STM effects. These transactions are atomic, consistent, and isolated from each other.

To run an STM transaction, we use the commit operation, which turns an STM effect into a regular effect (STM<R, E, A> → Effect<R, E, A>). Once converted, the committed transaction can be executed in the broader effect context, benefiting from all the error handling and resource management capabilities.

Let’s take a look at a short example of using STM to model a counter that can be safely accessed from any fiber. To begin, we need to add the @effect/stm package to the project:

npm i @effect/stm

First, we will use the STM effect to implement the increment and decrement operations. They will be working off a simple TRef<number> that represents our shared concurrent counter:

import * as STM from '@effect/stm/STM';
import * as TRef from '@effect/stm/TRef';

const increment = (counter: TRef.TRef<number>): STM.STM<never, never, void> =>
  TRef.update(counter, (count) => count + 1);

const decrement = (counter: TRef.TRef<number>): STM.STM<never, never, void> =>
  TRef.update(counter, (count) => count - 1);

Now we can write a program that increments the counter 10 times in one fiber, and decrements 10 times in another one:

import { Effect, Fiber } from 'effect';

const program = Effect.gen(function* (_) {
  const counter = yield* _(TRef.make(0));

  const incrementFiber = yield* _(Effect.forkAll(Array.from({ length: 10 }, () => STM.commit(increment(counter)))));
  const decrementFiber = yield* _(Effect.forkAll(Array.from({ length: 10 }, () => STM.commit(decrement(counter)))));

  yield* _(Fiber.joinAll([incrementFiber, decrementFiber]));

  const finalCount = yield* _(STM.commit(TRef.get(counter)));
  yield* _(Effect.log(`Final count: ${finalCount}`)); // will be 0, because all increments are matched with a decrement
});

Effect.runPromise(program);

We launch 10 increment fibers and 10 decrement fibers concurrently. Each increment and decrement operation updates the shared counter atomically, demonstrating STM’s ability to handle concurrent state changes without race conditions.
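For contrast, here is a rough sketch of the kind of race this protects against. It is plain TypeScript rather than Effect code, and both `unsafeUpdate` and `runInterleaved` are made-up names. Each task reads the counter, gives other tasks a chance to run, and only then writes, so the read and the write are not atomic.

```typescript
let counter = 0;

// A non-atomic update: the read and the write are separated by a `yield`,
// which stands in for a point where another fiber may run.
function* unsafeUpdate(delta: number): Generator<void, void, void> {
  const seen = counter; // read
  yield; // other tasks may run here
  counter = seen + delta; // write based on a possibly stale read
}

// Hypothetical round-robin scheduler: advances every pending task by one step
// per round until all of them have finished.
function runInterleaved(tasks: Generator<void, void, void>[]): void {
  let pending = tasks;
  while (pending.length > 0) {
    pending = pending.filter((task) => !task.next().done);
  }
}

runInterleaved([
  ...Array.from({ length: 10 }, () => unsafeUpdate(+1)),
  ...Array.from({ length: 10 }, () => unsafeUpdate(-1)),
]);

console.log(counter); // -1, not 0: every task read 0 before anyone wrote
```

All twenty tasks read `0` in the first round, so the writes in the second round overwrite each other and the last decrement wins. An atomic update, which is what TRef.update gives us inside a transaction, rules this interleaving out.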

Journal

There is a third data structure, Journal, that plays a crucial role in the STM runtime. It is an internal data structure used to record all the changes made to TRefs within a transaction.

Essentially, a journal is an in-memory log of a transaction’s read, write, and conditional check operations on TRefs (transactional references). The journal isolates a transaction’s updates and applies them to the shared memory (the TRefs) only when the transaction commits successfully; if the transaction fails, the recorded changes are discarded. This is what makes atomicity, consistency, and isolation possible. Under the hood, Journal is just a Map<TRef<unknown>, Entry>, where Entry is a single versioned entry in the journal that represents the most recent state of the TRef.

During a transaction, the STM runtime consults the journal for read or write operations on TRefs. If a TRef is accessed for the first time, the runtime creates a new journal entry with the TRef’s initial state, ensuring a consistent snapshot of all involved TRefs. Changes to TRefs are recorded in the journal without altering the shared memory location.

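Upon commit, the runtime verifies that the journal is still valid, that is, that none of the TRefs it touched were changed by another transaction in the meantime. If it is valid, the updates are applied atomically to the shared memory. Otherwise, the recorded changes are discarded and the transaction is run again. A transaction that explicitly asks to retry (for example, through check) is suspended instead, and the runtime re-runs it when any of the involved references change.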
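The journal mechanics described above can be modelled in a few dozen lines of plain TypeScript. This is a toy, single-threaded sketch and not Effect’s actual implementation: all names (`ToyRef`, `Entry`, `Journal`, `atomically`, and so on) are illustrative, and on a conflict it simply re-runs the transaction without modelling suspension.

```typescript
// Toy model of journal-based commits. Every name here is illustrative.
interface ToyRef<A> {
  value: A;
  version: number; // bumped on every committed write
}

interface Entry {
  expectedVersion: number; // version of the ref when first accessed
  current: unknown; // tentative value inside the transaction
  changed: boolean; // whether the transaction wrote to the ref
}

type Journal = Map<ToyRef<unknown>, Entry>;

const makeRef = <A>(value: A): ToyRef<A> => ({ value, version: 0 });

// The first access snapshots the ref into the journal; later accesses reuse it.
function entryFor<A>(journal: Journal, ref: ToyRef<A>): Entry {
  let entry = journal.get(ref);
  if (entry === undefined) {
    entry = { expectedVersion: ref.version, current: ref.value, changed: false };
    journal.set(ref, entry);
  }
  return entry;
}

const read = <A>(journal: Journal, ref: ToyRef<A>): A =>
  entryFor(journal, ref).current as A;

function write<A>(journal: Journal, ref: ToyRef<A>, value: A): void {
  const entry = entryFor(journal, ref);
  entry.current = value; // the shared ref itself stays untouched until commit
  entry.changed = true;
}

// The journal is valid if no ref changed since the transaction first saw it.
function isValid(journal: Journal): boolean {
  let valid = true;
  journal.forEach((entry, ref) => {
    if (ref.version !== entry.expectedVersion) valid = false;
  });
  return valid;
}

// Runs the body against a fresh journal, validates it, then commits.
function atomically<A>(tx: (journal: Journal) => A): A {
  for (;;) {
    const journal: Journal = new Map();
    const result = tx(journal);
    if (!isValid(journal)) continue; // conflict: discard the journal and re-run
    journal.forEach((entry, ref) => {
      if (entry.changed) {
        ref.value = entry.current;
        ref.version += 1;
      }
    });
    return result;
  }
}

// Usage: move 30 units between two refs while "another fiber" interferes once.
const from = makeRef(100);
const to = makeRef(0);
let attempts = 0;

atomically((journal) => {
  attempts += 1;
  const balance = read(journal, from);
  if (attempts === 1) {
    // Simulate a concurrent commit that lands between our read and our commit.
    from.value = 80;
    from.version += 1;
  }
  write(journal, from, balance - 30);
  write(journal, to, read(journal, to) + 30);
});

console.log(attempts, from.value, to.value); // 2 50 30
```

The first attempt computes its result from a stale balance of 100, fails validation, and leaves no trace in the shared refs. The second attempt sees the fresh balance of 80 and commits both writes together.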

STM In Action

Counters are a simple use case, and perhaps do not demonstrate the full capabilities of STM. To better illustrate the power of STM, let’s explore a more realistic example, such as reserving seats for flights.

Our main data model will hold a single field of TRef<number> type that represents the current number of available seats on the flight:

interface Flight {
  readonly seats: TRef.TRef<number>;
}

Next, let’s imagine that a passenger wants to book a flight. This operation decreases the number of available seats, but it cannot decrease it below zero. In STM, there is a function called check that ensures that a transaction continues only if a predicate holds true. The downside of this function is that it retries silently, so we cannot cap the number of retries or somehow react to the failure. So in this example, I will explicitly return STM.fail:

import * as STM from '@effect/stm/STM';
import * as TRef from '@effect/stm/TRef';
import { Duration, Effect, Schedule } from 'effect';

class OverbookedError {
  readonly _tag: 'OverbookedError' = 'OverbookedError';
}

const bookSeats = (flight: Flight, seatsToReserve: number): Effect.Effect<never, OverbookedError, void> =>
  STM.gen(function* (_) {
    const seats = yield* _(TRef.get(flight.seats));

    if (seats < seatsToReserve) {
      return yield* _(STM.fail(new OverbookedError()));
    }

    yield* _(TRef.set(flight.seats, seats - seatsToReserve));
  }).pipe(
    STM.commit,
    Effect.delay(`${Math.ceil(Math.random() * 100)} millis`),
    Effect.retry(Schedule.addDelay(Schedule.recurs(5), () => Duration.millis(50)))
  );

What happens in .pipe needs additional explanation. First, we turn the STM into an Effect by committing our transaction with STM.commit. Next, we delay the whole effect by a random number of milliseconds. This simulates the passenger’s decision-making, or a database or network delay, whichever motivates the delay better for you, my reader. Finally, we make the whole effect retry, and for that we construct a custom schedule, that is, a retry strategy: at most 5 retries with a 50-millisecond delay between attempts.
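To make the retry budget concrete, here is a small plain-TypeScript model of "retry at most N times with a fixed delay". It is only a sketch of the semantics as I understand them (one initial attempt plus up to N retries), not how Effect implements schedules. `retryFixed` and `Outcome` are hypothetical names, and the delay is only accumulated rather than actually slept, so the example stays deterministic.

```typescript
type Outcome<A> =
  | { readonly ok: true; readonly value: A; readonly attempts: number; readonly waitedMs: number }
  | { readonly ok: false; readonly error: unknown; readonly attempts: number; readonly waitedMs: number };

// Hypothetical helper: runs `attempt` once, then retries it up to `maxRetries`
// times, adding `delayMs` of (simulated) waiting before every retry.
function retryFixed<A>(attempt: (n: number) => A, maxRetries: number, delayMs: number): Outcome<A> {
  let waitedMs = 0;
  for (let attempts = 1; ; attempts++) {
    try {
      return { ok: true, value: attempt(attempts), attempts, waitedMs };
    } catch (error) {
      // The retry budget is spent once we have made maxRetries + 1 attempts.
      if (attempts > maxRetries) return { ok: false, error, attempts, waitedMs };
      waitedMs += delayMs;
    }
  }
}

// A flight that never frees up: the booking fails on every attempt.
const alwaysFull = retryFixed<void>(() => {
  throw new Error("overbooked");
}, 5, 50);
console.log(alwaysFull.ok, alwaysFull.attempts, alwaysFull.waitedMs); // false 6 250

// Seats become available by the third attempt.
const freesUp = retryFixed((n) => {
  if (n < 3) throw new Error("overbooked");
  return "booked";
}, 5, 50);
console.log(freesUp.ok, freesUp.attempts, freesUp.waitedMs); // true 3 100
```

Under this reading, a booking that keeps failing is attempted six times in total before the error is surfaced to the caller.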

N.B. See how OverbookedError was defined? Having the _tag field present in this class enables us to catch that specific error using Effect.catchTag! This is one of the recommended ways of modelling errors when using Effect.

If we return to our problem domain, we can imagine that sometimes a passenger wants to return their ticket. It is a straightforward function — we just increment the number of available seats by the returned amount:

const unbookSeats = (flight: Flight, seatsToUnbook: number): Effect.Effect<never, never, void> =>
  STM.gen(function* (_) {
    const seats = yield* _(TRef.get(flight.seats));
    yield* _(TRef.set(flight.seats, seats + seatsToUnbook));
  }).pipe(STM.commit);

// or even simpler, using a single `update`:

const unbookSeats = (flight: Flight, seatsToUnbook: number): Effect.Effect<never, never, void> =>
  STM.commit(TRef.update(flight.seats, (seats) => seats + seatsToUnbook));

Now we can start writing our main program.

import { Either, Random } from 'effect';

const program = Effect.gen(function* (_) {
  const seats = yield* _(TRef.make(50)); // Canadair Regional Jet 200 with 50 seats
  const flight: Flight = { seats };

As usual, we construct a new TRef instance. Note that it is an effectful operation, so you need to await its completion with yield* _(..).

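Next, let’s spawn 30 fibers, each representing a passenger that wants to book N seats for their family. Random.nextIntBetween treats its upper bound as exclusive, so N ∈ [1, 4]. Afterwards, a passenger might return M < N seats (in the code below this branch is taken roughly 80% of the time, and M may turn out to be 0, in which case nothing is returned):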

  yield* _(
    Effect.all(
      Array.from({ length: 30 }, (_, i) => {
        const idx = `${i + 1}`.padStart(2, '0');

        return Effect.gen(function* (_) {
          const seatsToReserve = yield* _(Random.nextIntBetween(1, 5));
          const bookingResult = yield* _(Effect.either(bookSeats(flight, seatsToReserve)));

          yield* _(
            Either.match(bookingResult, {
              onLeft: () =>
                Effect.logError(`Failed to reserve ${seatsToReserve} seats for passenger #${idx} after 5 attempts`),

              onRight: () =>
                Effect.gen(function* (_) {
                  yield* _(Effect.log(`Reserved ${seatsToReserve} seats for passenger #${idx}`));

                  // Sometimes a passenger will return their booked seats:
                  if (Math.random() > 0.2) {
                    // We simulate passenger's hesitance here:
                    yield* _(Effect.delay(Effect.succeed(undefined), `${Math.ceil(Math.random() * 100)} millis`));

                    const seatsReturned = yield* _(Random.nextIntBetween(0, seatsToReserve));

                    if (seatsReturned > 0) {
                      yield* _(unbookSeats(flight, seatsReturned));
                      yield* _(Effect.log(`Unbooked ${seatsReturned} seats for passenger #${idx}`));
                    }
                  }
                }),
            })
          );
        }).pipe(Effect.withLogSpan(`Reserve#${idx}`));
      }),
      { concurrency: 'unbounded' }
    )
  );

Finally, we want to see how many seats remain after all bookings:

  const remainingSeats = yield* _(STM.commit(TRef.get(flight.seats)));

  yield* _(Effect.log(`Remaining seats: ${remainingSeats}`));
});

When we run this program using Effect.runPromise(program), we might see something similar to this:

timestamp=2023-08-22T12:48:30.335Z level=INFO fiber=#19 message="Reserved 1 seats for passenger #19" Reserve#19=2ms
timestamp=2023-08-22T12:48:30.337Z level=INFO fiber=#21 message="Reserved 1 seats for passenger #21" Reserve#21=4ms
timestamp=2023-08-22T12:48:30.339Z level=INFO fiber=#7 message="Reserved 4 seats for passenger #07" Reserve#07=8ms
timestamp=2023-08-22T12:48:30.342Z level=INFO fiber=#16 message="Reserved 3 seats for passenger #16" Reserve#16=10ms
timestamp=2023-08-22T12:48:30.344Z level=INFO fiber=#2 message="Reserved 4 seats for passenger #02" Reserve#02=14ms
timestamp=2023-08-22T12:48:30.346Z level=INFO fiber=#20 message="Reserved 3 seats for passenger #20" Reserve#20=13ms
timestamp=2023-08-22T12:48:30.348Z level=INFO fiber=#6 message="Reserved 3 seats for passenger #06" Reserve#06=17ms
timestamp=2023-08-22T12:48:30.349Z level=INFO fiber=#22 message="Reserved 2 seats for passenger #22" Reserve#22=16ms
timestamp=2023-08-22T12:48:30.349Z level=INFO fiber=#1 message="Reserved 3 seats for passenger #01" Reserve#01=20ms
timestamp=2023-08-22T12:48:30.351Z level=INFO fiber=#27 message="Reserved 1 seats for passenger #27" Reserve#27=18ms
timestamp=2023-08-22T12:48:30.355Z level=INFO fiber=#10 message="Reserved 3 seats for passenger #10" Reserve#10=23ms
timestamp=2023-08-22T12:48:30.356Z level=INFO fiber=#13 message="Reserved 4 seats for passenger #13" Reserve#13=24ms
timestamp=2023-08-22T12:48:30.360Z level=INFO fiber=#17 message="Reserved 1 seats for passenger #17" Reserve#17=28ms
timestamp=2023-08-22T12:48:30.362Z level=INFO fiber=#25 message="Reserved 2 seats for passenger #25" Reserve#25=29ms
timestamp=2023-08-22T12:48:30.363Z level=INFO fiber=#14 message="Reserved 1 seats for passenger #14" Reserve#14=31ms
timestamp=2023-08-22T12:48:30.367Z level=INFO fiber=#5 message="Reserved 3 seats for passenger #05" Reserve#05=36ms
timestamp=2023-08-22T12:48:30.369Z level=INFO fiber=#18 message="Reserved 2 seats for passenger #18" Reserve#18=36ms
timestamp=2023-08-22T12:48:30.369Z level=INFO fiber=#26 message="Reserved 3 seats for passenger #26" Reserve#26=36ms
timestamp=2023-08-22T12:48:30.372Z level=INFO fiber=#28 message="Reserved 2 seats for passenger #28" Reserve#28=39ms
timestamp=2023-08-22T12:48:30.372Z level=INFO fiber=#29 message="Reserved 1 seats for passenger #29" Reserve#29=39ms
timestamp=2023-08-22T12:48:30.372Z level=INFO fiber=#22 message="Unbooked 1 seats for passenger #22" Reserve#22=39ms
timestamp=2023-08-22T12:48:30.377Z level=INFO fiber=#16 message="Unbooked 2 seats for passenger #16" Reserve#16=45ms
timestamp=2023-08-22T12:48:30.379Z level=INFO fiber=#15 message="Reserved 3 seats for passenger #15" Reserve#15=47ms
timestamp=2023-08-22T12:48:30.383Z level=INFO fiber=#7 message="Unbooked 3 seats for passenger #07" Reserve#07=52ms
timestamp=2023-08-22T12:48:30.387Z level=INFO fiber=#23 message="Reserved 2 seats for passenger #23" Reserve#23=54ms
timestamp=2023-08-22T12:48:30.387Z level=INFO fiber=#24 message="Reserved 1 seats for passenger #24" Reserve#24=54ms
timestamp=2023-08-22T12:48:30.392Z level=INFO fiber=#30 message="Reserved 1 seats for passenger #30" Reserve#30=59ms
timestamp=2023-08-22T12:48:30.396Z level=INFO fiber=#18 message="Unbooked 1 seats for passenger #18" Reserve#18=63ms
timestamp=2023-08-22T12:48:30.403Z level=INFO fiber=#5 message="Unbooked 2 seats for passenger #05" Reserve#05=72ms
timestamp=2023-08-22T12:48:30.417Z level=INFO fiber=#9 message="Reserved 3 seats for passenger #09" Reserve#09=86ms
timestamp=2023-08-22T12:48:30.417Z level=INFO fiber=#23 message="Unbooked 1 seats for passenger #23" Reserve#23=84ms
timestamp=2023-08-22T12:48:30.418Z level=INFO fiber=#4 message="Reserved 1 seats for passenger #04" Reserve#04=88ms
timestamp=2023-08-22T12:48:30.426Z level=INFO fiber=#11 message="Reserved 1 seats for passenger #11" Reserve#11=94ms
timestamp=2023-08-22T12:48:30.436Z level=INFO fiber=#15 message="Unbooked 2 seats for passenger #15" Reserve#15=104ms
timestamp=2023-08-22T12:48:30.448Z level=INFO fiber=#26 message="Unbooked 1 seats for passenger #26" Reserve#26=115ms
timestamp=2023-08-22T12:48:30.453Z level=INFO fiber=#9 message="Unbooked 1 seats for passenger #09" Reserve#09=122ms
timestamp=2023-08-22T12:48:30.465Z level=INFO fiber=#28 message="Unbooked 1 seats for passenger #28" Reserve#28=132ms
timestamp=2023-08-22T12:48:30.482Z level=INFO fiber=#3 message="Reserved 4 seats for passenger #03" Reserve#03=152ms
timestamp=2023-08-22T12:48:31.166Z level=ERROR fiber=#12 message="Failed to reserve 3 seats for passenger #12 after 5 attempts" Reserve#12=834ms
timestamp=2023-08-22T12:48:31.176Z level=ERROR fiber=#8 message="Failed to reserve 3 seats for passenger #08 after 5 attempts" Reserve#08=845ms
timestamp=2023-08-22T12:48:31.179Z level=INFO fiber=#0 message="Remaining seats: 2"

Here we see how different passengers book various numbers of seats, sometimes returning some of them. As for the last two poor fellas, the system could not satisfy their demands, so once their retries were exhausted their bookings were rejected.
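As a quick sanity check, the log above is internally consistent. The snippet below simply tallies the seat counts copied from the "Reserved" and "Unbooked" lines, in order of appearance, and compares the result with the reported remainder for the 50-seat flight.

```typescript
// Seat counts copied from the sample log, in order of appearance.
const reserved = [
  1, 1, 4, 3, 4, 3, 3, 2, 3, 1, 3, 4, 1, 2,
  1, 3, 2, 3, 2, 1, 3, 2, 1, 1, 3, 1, 1, 4,
];
const unbooked = [1, 2, 3, 1, 2, 1, 2, 1, 1, 1];

const sum = (xs: readonly number[]): number => xs.reduce((a, b) => a + b, 0);

const totalSeats = 50;
const remaining = totalSeats - sum(reserved) + sum(unbooked);

console.log(sum(reserved), sum(unbooked), remaining); // 63 15 2
```

50 − 63 + 15 = 2, which matches the "Remaining seats: 2" line: no booking was lost or applied twice, even though all 30 fibers were hammering the same TRef.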

Conclusion

As we saw, Software Transactional Memory offers an effective and sophisticated approach to managing concurrency in functional programming. By adhering to core principles such as atomicity, consistency, isolation, and composability, STM simplifies the coordination of shared mutable states, ensuring a more robust and reliable concurrent programming experience. Leveraging transactional references (TRefs) and STM effects, developers can construct concurrent programs that are both composable and maintainable, avoiding common issues and race conditions that often arise in concurrent software development. Overall, STM allows for the creation of dependable, scalable, and efficient concurrent applications that align well with the functional programming paradigm.