AsyncGraphs


AsyncGraphs is a tiny ETL framework that uses asyncio to run the steps of a pipeline concurrently while they are blocked on I/O.

Source: https://github.com/SamVermeulen42/asyncgraphs

Documentation: https://samvermeulen42.github.io/asyncgraphs/


Features

  • Typed
  • Simple concurrency based on asyncio
  • Easy construction of ETL graphs
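
To illustrate why asyncio helps when steps are blocked on I/O, here is a minimal sketch in plain asyncio. It does not use AsyncGraphs and is not how the library is implemented; `fake_io`, `run_sequential`, `run_concurrent`, and `timed` are hypothetical names, and `asyncio.sleep` stands in for a real network call.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Tuple


async def fake_io(item: int, delay: float = 0.1) -> int:
    # Stand-in for a network or disk call (assumption): while sleeping,
    # control goes back to the event loop so other tasks can run.
    await asyncio.sleep(delay)
    return item * 2


async def run_sequential(items: List[int]) -> List[int]:
    # Each call starts only after the previous one has finished.
    return [await fake_io(i) for i in items]


async def run_concurrent(items: List[int]) -> List[int]:
    # All calls wait on their simulated I/O at the same time.
    return list(await asyncio.gather(*(fake_io(i) for i in items)))


def timed(
    runner: Callable[[List[int]], Awaitable[List[int]]], items: List[int]
) -> Tuple[List[int], float]:
    # Run one of the coroutines above and measure the wall-clock time.
    start = time.perf_counter()
    result = asyncio.run(runner(items))
    return result, time.perf_counter() - start


if __name__ == "__main__":
    data = [1, 2, 3, 4, 5]
    print(timed(run_sequential, data))
    print(timed(run_concurrent, data))
```

Both runs return the same values, but the concurrent one takes roughly the time of a single call, because the waits overlap instead of adding up.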

Installation

pip install asyncgraphs

Example

The following example picks a random Pokémon (ID 1 to 151) every 10 seconds, fetches its data from PokéAPI, and prints its name along with the games it appears in.

import aiohttp
from asyncgraphs import Graph, run
import asyncio
from functools import partial
from random import randint
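from typing import Any, AsyncIterator, Dict


async def random_pokemon_id() -> AsyncIterator[int]:
    # Source: emit a random Pokémon ID, then wait 10 seconds before the next one.
    while True:
        yield randint(1, 151)
        await asyncio.sleep(10)

async def get_pokemon_info(session: aiohttp.ClientSession, pokemon_id: int) -> AsyncIterator[Dict[str, Any]]:
    # This is an async generator, so it is annotated as an iterator of JSON payloads.
    pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    async with session.get(pokemon_url) as response:
        yield await response.json()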

def format_pokemon(pokemon_info: Dict[str, Any]) -> str:
    name = pokemon_info["name"]
    versions = (game['version']['name'] for game in pokemon_info['game_indices'])
    return f"{name}: {', '.join(versions)}"

async def main():
    async with aiohttp.ClientSession() as session:
        g = Graph()
        g | random_pokemon_id() | partial(get_pokemon_info, session) | format_pokemon | print
        await run(g)

asyncio.run(main())
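
To see what a printed line looks like without calling the network, the formatting step can be run on its own. The sketch below repeats `format_pokemon` from the example and feeds it a hand-written dictionary. The `sample` payload is an assumption that contains only the two fields the function reads (`name` and `game_indices`); a real PokéAPI response contains many more.

```python
from typing import Any, Dict


def format_pokemon(pokemon_info: Dict[str, Any]) -> str:
    # Same logic as in the example above.
    name = pokemon_info["name"]
    versions = (game["version"]["name"] for game in pokemon_info["game_indices"])
    return f"{name}: {', '.join(versions)}"


# Hypothetical, trimmed-down payload used only for illustration.
sample = {
    "name": "bulbasaur",
    "game_indices": [
        {"version": {"name": "red"}},
        {"version": {"name": "blue"}},
    ],
}

line = format_pokemon(sample)
print(line)  # bulbasaur: red, blue
```

Because `format_pokemon` is a plain synchronous function with no I/O, it can be tested this way independently of the rest of the graph.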