In this post, I will give an example of using Python's asyncio package for monitoring. The asyncio APIs differ slightly between Python 3.6 and 3.7. Here I use version 3.7 since it requires less boilerplate. All source code can be found in this repo.

The starting point is a serial implementation of the monitor. For simplicity, I also assume the health checks are so fast that timing drift is not a concern.

import time
from typing import Callable

tic = time.monotonic()

def check_health(i: int, predicate: Callable) -> bool:
    """
    Return True if something goes wrong
    """
    print(f'check health {i} @{time.monotonic() - tic:0.1f}')
    return predicate()

def main():
    any_problem = lambda: False
    while True:
        if check_health(1, any_problem):
            break
        if check_health(2, any_problem):
            break
        if check_health(3, any_problem):
            break
        time.sleep(3)

    do_cleanup()

Here the main function contains an event loop which stops if any health check fails. The dummy predicate any_problem always returns False, which keeps the loop alive forever. In a real monitor, each check would get its own predicate.

Running this code, we see that the health checks run together periodically, as expected.

chronos (master *+) async-monitor $ python3 serial.py
check health 1 @0.0
check health 2 @0.0
check health 3 @0.0
check health 1 @3.0
check health 2 @3.0
check health 3 @3.0
check health 1 @6.0
check health 2 @6.0
check health 3 @6.0
...
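
The serial loop sleeps a fixed 3 seconds after the checks, so each cycle really lasts 3 seconds plus however long the checks took. If the checks were slow enough for that drift to matter, one option is to sleep until a precomputed deadline instead. The following is only a sketch: the helper name run_periodically and the max_rounds parameter are made up for illustration and are not part of the repo.

```python
import time
from typing import Callable, List


def run_periodically(period: float, checks: List[Callable[[], bool]],
                     max_rounds: int) -> int:
    """Run all checks every `period` seconds.

    Returns the number of rounds that passed without a problem.
    `max_rounds` exists only so that this sketch terminates.
    """
    next_deadline = time.monotonic()
    for round_no in range(max_rounds):
        if any(check() for check in checks):
            return round_no  # a check reported a problem
        # Schedule relative to the previous deadline, not to "now",
        # so the time spent inside the checks does not accumulate.
        next_deadline += period
        time.sleep(max(0.0, next_deadline - time.monotonic()))
    return max_rounds
```

With a 20 ms check and a 50 ms period, four rounds take about 200 ms here, whereas the check-then-sleep pattern would take about 280 ms.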

More realistically, we will do different health checks at different intervals. This simple requirement actually has 3 components:

  1. health checks should run periodically
  2. they don’t block each other
  3. one failing check can (optionally) stop other checks

The first component is basically an event loop, as in the serial code. The second component can be done in various ways. An obvious choice is to run different checks in different threads (which I won’t show here). The third component requires communication either between different health checks or between the health checks and the main event loop. Again, there are various ways to do it. For example, we can use Event or Condition for the coordination.
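
The threaded variant is not part of this post's code, but for comparison, here is a rough sketch of how a threading.Event could cover all three components. The names run_at_interval, monitor and the failed list are illustrative choices; the Event doubles as an interruptible sleep and as the stop signal.

```python
import threading
from typing import Callable, List, Tuple


def run_at_interval(dt: float, predicate: Callable[[], bool],
                    stop: threading.Event, failed: List[float]) -> None:
    """Run `predicate` every `dt` seconds until `stop` is set."""
    # Event.wait returns False on timeout and True once the event is set,
    # so the loop body runs once per interval until someone calls stop.set().
    while not stop.wait(timeout=dt):
        if predicate():
            failed.append(dt)  # record which checker failed
            stop.set()         # wake up and stop every other checker


def monitor(checks: List[Tuple[float, Callable[[], bool]]]) -> List[float]:
    """Run each (interval, predicate) pair in its own thread."""
    stop = threading.Event()
    failed: List[float] = []
    threads = [
        threading.Thread(target=run_at_interval, args=(dt, pred, stop, failed))
        for dt, pred in checks
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns once a failing check has set the event
    return failed
```

Calling monitor([(0.01, lambda: False), (0.05, lambda: True)]) returns once the second check fails; cleanup would then run in the caller.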

Before going to the Python asyncio implementation, let’s first look at a Go implementation. This snippet shares a similar overall structure with the later Python asyncio code, and reveals some implementation details hidden behind the Python APIs.

package main

import (
    "fmt"
    "time"
)

var tic = time.Now()

func runAtInterval(dt time.Duration, checker func() bool, stop chan<- struct{}) {
    ticker := time.NewTicker(dt)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            {
                fmt.Printf("check health %s @%.3s\n", dt, time.Now().Sub(tic))
                bad := checker()
                if bad {
                    stop <- struct{}{}
                }
            }
        }
    }
}

func main() {
    ch1 := make(chan struct{})
    ch2 := make(chan struct{})
    ch3 := make(chan struct{})
    anyProblem := func() bool { return false }
    doomSoon := func() bool { return true }

    go runAtInterval(time.Second*3, anyProblem, ch1)
    go runAtInterval(time.Second*5, anyProblem, ch2)
    go runAtInterval(time.Second*16, doomSoon, ch3)

    select {
    case <-ch1:
        fmt.Println("Problem detected on first checker")
    case <-ch2:
        fmt.Println("Problem detected on second checker")
    case <-ch3:
        fmt.Println("Problem detected on third checker")
    }
    doCleanup()
}

Here the runAtInterval function runs a ticker to do the health check periodically (the first component). Three of them run concurrently in separate goroutines (the second component). When a problem is detected, a stop signal is sent back to the main goroutine via a channel. At that point, the select statement unblocks and cleanup takes place. The unbuffered channels together with the select statement in Go make it very easy to reason about the order of events.

This code actually has a potential problem in the third component. Suppose doCleanup takes some time to complete. While it’s doing the work, the other health checks are still running, which may or may not be desirable. If terminating pending health checks is preferred, we can pass another channel to runAtInterval to signal the cancellation, handled as a second case in its select statement.

Since the focus of this post is not on Go, I won’t elaborate on further variations.

Running this code, we get the interleaving health checks and the mock problem at 16 seconds.

chronos (master *) async-monitor $ go run monitor.go
check health 3s @3.0
check health 5s @5.0
check health 3s @6.0
check health 3s @9.0
check health 5s @10.
check health 3s @12.
check health 5s @15.
check health 3s @15.
check health 16s @16.
Problem detected on third checker

This overall code structure can be mapped to the Python asyncio version almost exactly.

import asyncio
import time
from typing import Callable

tic = time.monotonic()

async def run_at_interval(t: float, predicate: Callable):
    while True:
        await asyncio.sleep(t)
        print(f'check health {t}: @{time.monotonic() - tic:0.1f}')
        feedback = predicate()
        if feedback:
            return feedback

async def main():
    any_problem = lambda: False
    t1 = asyncio.create_task(run_at_interval(3, any_problem))
    t2 = asyncio.create_task(run_at_interval(5, any_problem))
    timeout = asyncio.create_task(asyncio.sleep(16))
    done, pending = await asyncio.wait({t1, t2, timeout},
                                       return_when=asyncio.FIRST_COMPLETED)
    for t in pending:
        t.cancel()
    feedback = await done.pop()
    return feedback

if __name__ == '__main__':
    feedback = asyncio.run(main())
    do_cleanup(feedback)

Here the function run_at_interval loops until its health check fails. In the main function, we create 2 health checks with different periods, as well as a timeout event. The wait function plays the same role as the select statement in Go. When the first task completes (a detected problem or the timeout), we cancel the remaining tasks. Here I also use a feedback object to facilitate the cleanup process.
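
To see the feedback path in action, here is a sketch that swaps the timeout for a check that fails, mirroring doomSoon in the Go version. The intervals are scaled down by 100 so it finishes quickly, and the "disk full" string is a made-up failure report. It also awaits the cancelled tasks, which is an addition of mine rather than something the code above does.

```python
import asyncio
from typing import Callable, Optional


async def run_at_interval(t: float, predicate: Callable[[], Optional[str]]) -> str:
    # Same loop as above, without the printing.
    while True:
        await asyncio.sleep(t)
        feedback = predicate()
        if feedback:
            return feedback


async def main() -> str:
    healthy = lambda: None
    doom_soon = lambda: "disk full"  # hypothetical failure report
    tasks = {
        asyncio.create_task(run_at_interval(0.03, healthy)),
        asyncio.create_task(run_at_interval(0.05, healthy)),
        asyncio.create_task(run_at_interval(0.16, doom_soon)),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks finish unwinding before leaving main().
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Only the failing check ever returns, so done holds exactly one task and its result is the feedback handed to cleanup.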

If you are new to async/await, this code may look quite different from any “regular” Python code. But in fact, most of it is just boilerplate, e.g., creating tasks and waiting for them. My quick and dirty way to understand async/await is as follows:

  • async marks a function as a coroutine function; calling it returns a coroutine object instead of running the body.
  • The thing after await should be an awaitable (a coroutine, a Task, or a Future) that is not CPU demanding, for example, an IO-bound operation, or sleep() as in this example.
  • await is like yield from. On one hand, it marks the points where a function can give control back to its caller. On the other hand, it takes care of retrieving data through layers of coroutines.
  • Using the await keyword does not by itself imply concurrency, just like using a coroutine does not imply concurrency.
  • By creating Tasks out of coroutines, we can run them concurrently. In other words, creating a Task does not block. As a result, a Task may not have finished at any given point in time, and there are APIs to query and manipulate its state.
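
The last two points can be checked with a small experiment. In this sketch (the function names nap, sequential and concurrent are mine), awaiting two coroutines one after the other takes the sum of their sleeps, while wrapping them in Tasks first lets the sleeps overlap.

```python
import asyncio
import time


async def nap(seconds: float) -> float:
    await asyncio.sleep(seconds)
    return seconds


async def sequential() -> float:
    start = time.monotonic()
    # Each await finishes before the next coroutine even starts.
    await nap(0.1)
    await nap(0.1)
    return time.monotonic() - start


async def concurrent() -> float:
    start = time.monotonic()
    # create_task schedules both coroutines on the event loop immediately;
    # the awaits below only wait for their results.
    t1 = asyncio.create_task(nap(0.1))
    t2 = asyncio.create_task(nap(0.1))
    await t1
    await t2
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(sequential()):.1f}s")
    print(f"concurrent: {asyncio.run(concurrent()):.1f}s")
```

The sequential version should take roughly twice as long as the concurrent one.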

If you are not familiar with coroutines, search “David Beazley” on YouTube or Google. At a high level, a coroutine is a mechanism that lets a function run part of the way and give control back to its caller. In Python, it is implemented with the keyword yield, the built-in next(), and the generator methods send() and close().
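
As a small illustration of that mechanism, here is a generator-based coroutine that keeps a running average. The example is my own and not from the repo; it only uses yield, next(), send() and close().

```python
def running_average():
    """Generator-based coroutine: receives numbers, yields the average so far."""
    total, count = 0.0, 0
    average = None
    try:
        while True:
            # Execution pauses here and control returns to the caller
            # until the caller resumes it with send().
            value = yield average
            total += value
            count += 1
            average = total / count
    except GeneratorExit:
        # close() raises GeneratorExit at the paused yield; clean up here.
        pass


avg = running_average()
next(avg)             # run to the first yield ("prime" the coroutine)
first = avg.send(10)  # 10.0
second = avg.send(20) # 15.0
avg.close()
```

The function body runs "half way" between calls: its local state (total, count) survives while the caller is in control.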

Running this code, we get the same interleaving as the Go code, up to the 16-second timeout:

chronos (master *+) async-monitor $ python3 async.py
check health 3: @3.0
check health 5: @5.0
check health 3: @6.0
check health 3: @9.0
check health 5: @10.0
check health 3: @12.0
check health 5: @15.0
check health 3: @15.0

If we do not cancel the pending tasks in main, the code still runs, but asyncio complains:

Task was destroyed but it is pending!

Overall, the Python asyncio version is slightly easier to write than the Go version once the cancellation logic is included. The asyncio.wait function takes care of most of the coordination for us. Besides FIRST_COMPLETED, it also supports FIRST_EXCEPTION and ALL_COMPLETED.
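
As a sketch of one of the other modes, suppose a health check signals a problem by raising instead of returning a value. With FIRST_EXCEPTION, asyncio.wait keeps waiting through normal completions and returns as soon as one task raises. The check coroutine and its error message below are invented for this example.

```python
import asyncio


async def check(delay: float, fail: bool) -> str:
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"check after {delay}s failed")
    return f"check after {delay}s ok"


async def main():
    tasks = [
        asyncio.create_task(check(0.01, fail=False)),
        asyncio.create_task(check(0.05, fail=True)),
        asyncio.create_task(check(1.0, fail=False)),
    ]
    # Returns as soon as any task raises; if none raises, this behaves
    # like ALL_COMPLETED.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Task.exception() is None for tasks that finished normally.
    errors = [str(t.exception()) for t in done if t.exception() is not None]
    return len(done), len(pending), errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the first check has already finished normally when the second one raises, so done contains two tasks and the slow third check is still pending and gets cancelled.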

further readings