This is the fourth milestone where we speed up the sub-command execution for multiple repos.

The other posts in this series are

introduction

In the previous posts, the gita fetch command executes git fetch in each repo sequentially. This is NOT efficient for multiple repos: if the remote server of the first repo responds slowly, the execution of all other repos is delayed.

There are two obvious solutions

  1. use multiple CPU cores for multiple repos
  2. switch to the next repo while waiting for the current one

The second solution doesn’t require multiple CPU cores. And the two solutions can be easily combined.

The first solution is parallelism. For example, each core can work on one task. When a core finishes a task, it grabs a new one, say, from a task queue. This is commonly implemented as a process pool, where the number of workers equal to the number of CPU cores. Our git delegation tasks are particularly simple as they are independent of each other: they are embarrassingly parallelizable.

The second solution is context switching, which doesn’t require multiple cores. It can be implemented with either a thread pool or a process pool. When an active task blocks, the scheduler (the program that maintains the pool) suspends it and puts the CPU resource on another task. Overall, CPU idle time is reduced.

If you have never heard of processes and threads, it is a good time to look them up. Roughly speaking, a running program is a process (you can see them with ps or top command in terminal), And a process can have subordinate processes, or sub-processes. Threads are light-weight version of processes and they live inside processes (try ps -T or top -H and look for rows with the same PIDs). The main difference is that processes don’t share memories whereas threads of the same process do. Thus one needs to be extra careful in writing multithreading code.

More generally, we can categorize bottlenecks in any running programs as

  • CPU-bound: the waiting is due to real computation, or
  • IO-bound: the waiting is due to data reading/writing, e.g., disk, web, etc.

The CPU-bound blocks require more computation powers, whereas the IO-bound blocks can benefit from context switching.

An example is in order.

# thread_pool_example.py
import time
from timeit import default_timer as timer
from multiprocessing.pool import ThreadPool

def run_task(interval: int):
    print(f'{interval}s: start')
    time.sleep(interval)  # This waiting mimics IO block
    print(f'{interval}s: end')

if __name__ == '__main__':
    t0 = timer()
    with ThreadPool() as pool:
        pool.map(run_task, [1, 2, 1])
    t1 = timer()
    print(t1 - t0)

Here I use a thread pool to execute 3 trivial tasks. Each task simply sleeps for some time, which mimics IO block.

Executing this code on my computer gives the following output

$ python3.7 thread_pool_example.py
1s: start
2s: start
1s: start
1s: end
1s: end
2s: end
2.013366460800171

The total execution time is slightly more than 2 seconds, instead of 4 seconds in the serial case. By default, the Python thread libraries don’t run across different cores. Thus the speedup in this example is fully due to context switching.

There is excellent support for multiprocessing and multithreading in Python. The relevant libraries are

I encourage you to try them for gita speedup, and compare their performances.

In the gita project, I used the Python asyncio library (It requires Python3.6 and the API changes quite much in Python3.7). In theory it can be more efficient than process pool or thread pool since it can switch context faster. In practice, in particular for the gita project, I doubt there are meaningful differences. I chose it mainly because it’s slightly trickier to code.

The Python asyncio library hides all the low-level details. To understand asynchronous execution, take a look at David Beazley’s curious course on coroutines and concurrency. Another casual reading is A gentle introduction to multithreading. A more technical reading is The C10K problem.

In the following, I will first present a simple but problematic implementation, then its fix, and finally unit test.

a simple implementation with interleaving output

In my first trial, the code looks like this (and the cmds is ['git', 'fetch']):

async def run_async(path: str, cmds: List[str]):
    process = await asyncio.create_subprocess_exec(*cmds, cwd=path)
    await process.wait()

Here we create a new sub-process to execute git fetch in the specified path. Another piece of code will call this async function for each repo. And the asyncio library takes care of the asynchronous context switching when IO block happens.

However, all these sub-processes share the standard output. And the git fetch outputs from different repos are mixed together. This problem can be reproduced by the following code

tasks = [
    run_async('.', [
        'python3', '-c',
        f"print({i});import time; time.sleep({i});print({i})"
    ]) for i in range(4)
]
if platform.system() == 'Windows':
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(asyncio.gather(*tasks))
finally:
    loop.close()

Here the tasks simply wait. The waiting time is printed twice, once before and once after the sleep. On my terminal, the output is

0
0
2
1
3
1
2
3

Try to reason the ordering.

To fix the output order, we need to stop sharing stdout, which can be achieved by a PIPE.

async def run_async(path: str, cmds: List[str]):
    """
    Run `cmds` asynchronously in `path` directory
    """
    process = await asyncio.create_subprocess_exec(
        *cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
    stdout, _ = await process.communicate()
    stdout and print(stdout.decode())

With this update, I managed to disentangle the outputs.

0
0
1
1
2
2
3
3

test async functions

Mocking an async input/output is tricky. But with the previous example, it is easy to test the output directly

def test_async_output(capfd):
    tasks = [
        utils.run_async('myrepo', '.', [
            'python3', '-c',
            f"print({i});import time; time.sleep({i});print({i})"
        ]) for i in range(4)
    ]
    # I don't fully understand why a new loop is needed here. Without a new
    # loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest
    # itself uses asyncio (or maybe pytest-xdist does)?
    asyncio.set_event_loop(asyncio.new_event_loop())
    utils.exec_async_tasks(tasks)

    out, err = capfd.readouterr()
    assert err == ''
    assert out == 'myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n'

You should also take a look at this article Unit Testing AsyncIO Code by Miguel Grinberg.

v0.4: clean up and tag

This completes milestone 4. At this point, you can optionally tag the code base using

git tag v0.4