Skip to main content

Parallel Operations

Overview

As discussed in Flow and Action Functions, flow functions can orchestrate work across actions and child flows. Sometimes, it is desirable to launch multiple flow and action functions in parallel and then join on results later. Oryonix supports this use case with simple APIs for launching parallel work and waiting on results.

  • All flow and action functions have .start(...) methods for launching them in parallel.
  • .start(...) returns an object that can be used to wait on and fetch results once the function has completed.
  • onix.wait() can be used to collect the results of multiple parallel operations.

Launch Parallel Work with .start()

Directly calling a flow or action function is blocking--the caller will wait for the callee to finish before proceeding. To launch a flow or action function in parallel, call .start(...) on it instead. .start(...) returns a Future-like object that keeps track of the running function.

When you are ready to retrieve the result, call .result() on the future returned by .start(...). This will block until the result is available.

import onix

@onix.action
def fetch_profile(user_id: str):
# e.g. API call
return {"user_id": user_id, "name": "Ada"}

@onix.action
def fetch_recent_orders(user_id: str):
# e.g. DB query
return [{"id": "ord-1"}, {"id": "ord-2"}]

@onix.flow
def get_dashboard(user_id: str):
# Calls fetch_profile(user_id=user_id) in the background, and immediately
# proceeds to the next line without waiting for the result.
profile_future = fetch_profile.start(user_id=user_id)

# Calls fetch_recent_orders(...) in the background, similar to the above.
orders_future = fetch_recent_orders.start(user_id=user_id)

# Waits for the earlier call to fetch_profile() to finish, and assigns the
# result to `profile`. Note that the call to fetch_recent_orders() is still
# running in the background while we wait for fetch_profile() to finish.
profile = profile_future.result()

# Waits for the earlier call to fetch_recent_orders() to finish, similar to
# the above.
orders = orders_future.result()

return {
"profile": profile,
"orders": orders,
}

Wait for Multiple Operations with onix.wait()

There may be times when you wish to wait for multiple operations to finish, but you don't care about the order in which they finish. For example, you may wish to wait for all operations to finish before proceeding, or you may wish to proceed as soon as the first operation finishes.

All of these scenarios can be handled using onix.wait(). Similar to Python's concurrent.futures.wait(), onix.wait() takes a collection of futures and a return_when condition, and returns two sets in a named tuple:

  1. onix.wait(...).done is a set that contains all completed futures.
  2. onix.wait(...).not_done is a set that contains all incomplete futures.

Note that, because done and not_done are sets, the futures may be returned in a different order than they were passed in. If the order is important, you can keep track of the original order separately and use the sets returned by onix.wait() to determine which futures have completed.

Wait for All Operations

For fan-out/fan-in workflows, you can wait on a collection of futures using onix.wait(...) with return_when=onix.ALL_COMPLETED. This will block until all of the futures have completed. This is useful in situations where you want to do a lot of work in parallel, for example, performing different validation checks in parallel, or doing batch processing of large datasets.

In this case, the done set returned by onix.wait(...) will contain all of the futures, and the not_done set will always be empty. This is because the function only returns once all futures have completed.

import onix

@onix.action
def check_inventory(order_id: str):
if order_id == "ord-1":
return [] # No errors
else:
return ["item out of stock"]

# ...

VALIDATION_CHECKS = [check_inventory, check_shipping_address, check_fraud]

@onix.flow
def validate_order(order_id: str):
# Launch all three function calls in parallel
check_futures = [f.start(order_id=order_id) for f in VALIDATION_CHECKS]

# Wait for all three to complete.
done, not_done = onix.wait(check_futures, return_when=onix.ALL_COMPLETED)

# After ALL_COMPLETED, all three futures are in the done set.
# The not_done set is always empty.
errors = []
for f in done:
errors.extend(f.result())

return {"ok": not errors, "errors": errors}

Wait for the Fastest Operation

When onix.wait() is called with return_when=onix.FIRST_COMPLETED, onix.wait(...) returns as soon as at least one child flow finishes. This is useful if you are doing any kind of incremental processing or streaming and want to process results as they come in. It is also useful if you want to "race" different operations in parallel and use the fastest result--for example, fetching from two different caches in different locations.

In this case, the done set returned by onix.wait(...) will contain at least one future that has completed, and the not_done set will contain the remaining futures that are still running.

note

In rare scenarios, multiple futures may complete at the same time. In this case, onix.wait() returns all of the completed futures in the done set. You may need to account for this in scenarios where the results of futures other than the first-completed one are also relevant.

import onix

@onix.flow
def fetch_from_cache(user_id: str):
return {"user_id": user_id, "source": "cache"}

@onix.flow
def fetch_from_db(user_id: str):
return {"user_id": user_id, "source": "db"}

@onix.flow
def get_user_info(user_id: str):
# Launch multiple redundant calls in parallel, and use the result of the
# fastest one to improve latency.
f1 = fetch_from_cache.start(user_id=user_id)
f2 = fetch_from_db.start(user_id=user_id)

done, not_done = onix.wait(
[f1, f2],
return_when=onix.FIRST_COMPLETED,
)

# done contains the future that finished first.
# not_done contains the other future that is still running.

return done.pop().result()
🚧coming soon

FIRST_COMPLETED is currently supported for child flow futures only. If any action future is included, onix.wait(..., return_when=onix.FIRST_COMPLETED) raises NotImplementedError.

tip

onix.wait(..., return_when=onix.FIRST_EXCEPTION) is not currently supported. If you need this behavior, you can implement it manually by checking for exceptions in the done set after FIRST_COMPLETED returns.

Handle Failures from Parallel Operations

Just like regular calls to flow and action functions, parallel calls using .start(...) can also fail. If a blocking call to a function would fail, then attempting to fetch the result of the corresponding future will also fail.

import onix

@onix.action
def might_fail(x):
if x == 0:
raise Exception("x cannot be zero")
return 10 / x

@onix.flow
def test_parallel_failures(x):
# Launch the action that might fail in the background:
future = might_fail.start(x=x)

# The exception is thrown when result() is called:
try:
return future.result()
except Exception as e:
return f"Caught exception: {e}"
tip

As a convenience, if you want to retrieve an exception from a failed call, you can use future.exception() instead of future.result(). This will return the exception object instead of re-raising it. If the call was successful, future.exception() returns None.

The same holds true of onix.wait()--if any of the waited-on futures fail, their exceptions will be raised when calling .result() on those futures. onix.wait() itself does not throw exceptions from failed futures.

import onix

@onix.flow
def test_wait_failures(x):
future1 = might_fail.start(x=x)
future2 = might_fail.start(x=x + 1)

done, not_done = onix.wait([future1, future2], return_when=onix.ALL_COMPLETED)

results = []
for f in done:
try:
results.append(f.result())
except Exception as e:
results.append(f"Caught exception: {e}")

return results

What Happens If Futures Are Leaked?

A leaked future means a flow or action was started with .start(...), but the caller never consumes that future with .result(), .exception(), or onix.wait(...).

tip

start() should not be used as a "fire-and-forget" primitive. If you launch work, you should wait for it to complete before you return from the flow that started it, unless you truly do not care about the work being terminated prematurely.

If a future is leaked, the behavior is undefined. Specifically:

  • The work has already been dispatched, so side effects may still happen.
  • If the parent flow reaches completion without waiting for outstanding futures, those in-flight operations are not guaranteed to complete in a way that is observable to the caller. They may be prematurely terminated, or they may continue running to completion.
  • Any success/failure outcome from the leaked future is never observed by your code, so errors can be silently lost.

Leaked futures are most likely to show up when using onix.wait() with return_when=onix.FIRST_COMPLETED, since the remaining futures that are still running in the background may be accidentally forgotten about. If you use FIRST_COMPLETED and you care about the side-effects or results of these futures, make sure to wait on all the futures in the not_done set before returning from the flow.

🚧coming soon

We plan to more clearly define the behavior of leaked futures in the future. In particular, we may introduce warnings or errors for leaked futures, and we may introduce APIs for explicitly detaching or abandoning a future if you really do want "fire-and-forget" behavior. We may also introduce an API for cancelling an in-flight future.