Skip to content

Execution

Executing a Graph can be done by calling run(graph).

As an example, let's look at what happens when executing the following graph.

from asyncgraphs import Graph, run

g = Graph()
transformed = g | [1, 2, 3] | (lambda x: x + 1)
transformed | (lambda x: x * 2) | (lambda x: print(f"Doubled: {x}"))
transformed | (lambda x: print(f"Regular: {x}"))

await run(g)

The graph thus looks like this:

flowchart LR
    Source["[1, 2, 3]"]
    AddOne["x + 1"]
    Double["x * 2"]
    PrintDouble["print(f#quot;Doubled: {x}#quot;)"]
    PrintRegular["print(f#quot;Regular: {x}#quot;)"]

    Source --> AddOne
    AddOne --> Double --> PrintDouble
    AddOne --> PrintRegular

When calling run(g), this library adds asyncio Queues between each pair of nodes. At runtime, this is the graph:

flowchart LR
    Source["[1, 2, 3]"]
    AddOne["x + 1"]
    Double["x * 2"]
    PrintDouble["print(f#quot;Doubled: {x}#quot;)"]
    PrintRegular["print(f#quot;Regular: {x}#quot;)"]


    Source --> SourceQ([Queue]) --> AddOne
    AddOne --> DoubleQ([Queue]) --> Double --> PrintDoubleQ([Queue]) --> PrintDouble
    AddOne --> PrintRegularQ([Queue]) --> PrintRegular

Concurrency

Inter-node concurrency

There is concurrency between nodes. When the execution of 1 node is blocked by I/O, other nodes get to process their input events.

Intra-node concurrency

Currently, intra-node concurrency (a.k.a processing handling multiple events in 1 node concurrently) is not supported.

This will be implemented in this ticket.