{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Async programming with Vaex\n", "\n", "Using the [Rich based progress bar](progressbars.ipynb) we can see that if we call two methods on a dataframe, we get two passes over the data (as indicated by the `[1]` and `[2]`). " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
  Two passes                                    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.15s     \n",
       "├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s     \n",
       "    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s[2]  \n",
       "
\n" ], "text/plain": [ " \u001b[31mTwo passes \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.15s \u001b[0m \n", "├── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "│ └── \u001b[31mvaex.agg.sum('tip_amount') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n", "└── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.06s \u001b[0m \n", " └── \u001b[31mvaex.agg.sum('passenger_count') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.06s[2]\u001b[0m \n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import vaex\n", "\n", "df = vaex.datasets.taxi()\n", "\n", "with vaex.progress.tree('rich', title=\"Two passes\"):\n", " print(df.tip_amount.sum())\n", " print(df.passenger_count.sum())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using `delay=True`\n", "\n", "If we pass `delay=True`, Vaex will not start to execute the tasks it created internally, but will return a [promise](https://en.wikipedia.org/wiki/Futures_and_promises) instead. After calling `df.execute()` all tasks will execute, and the promises will be resolved, meaning that you can use the `.get()` method to get the final value, or use the `.then()` method to represent the result." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
  Single pass using delay                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "
\n" ], "text/plain": [ " \u001b[31mSingle pass using delay \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "├── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "│ └── \u001b[31mvaex.agg.sum('tip_amount') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n", "└── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", " └── \u001b[31mvaex.agg.sum('passenger_count') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "tip_per_passenger = 0.5774000691888607\n" ] } ], "source": [ "with vaex.progress.tree('rich', title=\"Single pass using delay\"):\n", " tip_sum_promise = df.tip_amount.sum(delay=True)\n", " passengers_promise = df.passenger_count.sum(delay=True)\n", " df.execute()\n", " tip_per_passenger = tip_sum_promise.get() / passengers_promise.get()\n", " print(f\"tip_per_passenger = {tip_per_passenger}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using the `@delayed` decorator\n", "\n", "To make life easier, Vaex implements the [vaex.delayed](https://vaex.io/docs/api.html#vaex.delayed) decorator. Once all arguments are resolved, the decorated function will be executed automatically." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
  Single pass using delay + using delayed       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "
\n" ], "text/plain": [ " \u001b[31mSingle pass using delay + using delayed \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "├── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "│ └── \u001b[31mvaex.agg.sum('tip_amount') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n", "└── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", " └── \u001b[31mvaex.agg.sum('passenger_count') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "with vaex.progress.tree('rich', title=\"Single pass using delay + using delayed\"):\n", " @vaex.delayed\n", " def compute(tip_sum, passengers):\n", " return tip_sum/passengers\n", "\n", " tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),\n", " df.passenger_count.sum(delay=True))\n", " df.execute()\n", " print(f\"tip_per_passenger = {tip_per_passenger_promise.get()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Async `await`\n", "\n", "In all of the above cases, we called `df.execute()` which will synchronously execute all tasks using threads. However, if you are using Async IO in Python, this means you are blocking all other async coroutines from running.\n", "\n", "To allow other coroutines to continue running (e.g. in a FastAPI context), we can instead await `df.execute_async()`. On top of that, we can also `await` the promise to get the result, instead of calling `.get()` to make your code look more AsyncIO like.\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
  Single pass using delay + using delayed and await ━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.14s     \n",
       "├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.09s     \n",
       "│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "
\n" ], "text/plain": [ " \u001b[31mSingle pass using delay + using delayed and await\u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.14s \u001b[0m \n", "├── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.09s \u001b[0m \n", "│ └── \u001b[31mvaex.agg.sum('tip_amount') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n", "└── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", " └── \u001b[31mvaex.agg.sum('passenger_count') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "tip_per_passenger = 0.5774000691888603\n" ] } ], "source": [ "with vaex.progress.tree('rich', title=\"Single pass using delay + using delayed and await\"):\n", " @vaex.delayed\n", " def compute(tip_sum, passengers):\n", " return tip_sum/passengers\n", "\n", " tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),\n", " df.passenger_count.sum(delay=True))\n", " await df.execute_async()\n", " tip_per_passenger = await tip_per_passenger_promise\n", " print(f\"tip_per_passenger = {tip_per_passenger}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "**Note:** In the Jupyter notebook, an asyncio event loop is already running. In a script you may need to use `asyncio.run(my_top_level_coroutine())` in order to use `await`.\n", "\n", "
\n", "\n", "### Async auto execute\n", "\n", "In the previous example we manually had to call `df.execute_async()`. This enables Vaex to execute all tasks in as little passes over the data as possible.\n", "\n", "To make life easier, and your code even more AsyncIO like, we can use the `df.executor.auto_execute()` async context manager that will automatically call `df.execute_async()` for you when a promise is awaited." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
  Single pass using auto_execute                ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s     \n",
       "    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]  \n",
       "
\n" ], "text/plain": [ " \u001b[31mSingle pass using auto_execute \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "├── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", "│ └── \u001b[31mvaex.agg.sum('tip_amount') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n", "└── \u001b[31msum \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s \u001b[0m \n", " └── \u001b[31mvaex.agg.sum('passenger_count') \u001b[0m \u001b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[35m100%\u001b[0m \u001b[33m00.08s[1]\u001b[0m \n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "tip_per_passenger = 0.5774000691888609\n" ] } ], "source": [ "with vaex.progress.tree('rich', title=\"Single pass using auto_execute\"):\n", " async with df.executor.auto_execute():\n", " @vaex.delayed\n", " def compute(tip_sum, passengers):\n", " return tip_sum/passengers\n", "\n", " tip_per_passenger = await compute(df.tip_amount.sum(delay=True),\n", " df.passenger_count.sum(delay=True))\n", " print(f\"tip_per_passenger = {tip_per_passenger}\")" ] } ], "metadata": { "interpreter": { "hash": "2b337e1aa502f5cea9a92c761ad75d3ab5045107ee3446fdbe7f873d4f1936e7" }, "kernelspec": { "display_name": "Python 3.8.5 64-bit ('base': conda)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }