Here is an example:

from flask import Flask
from time import sleep
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(2)

app = Flask(__name__)

@app.route('/jobs')
def run_jobs():
    executor.submit(some_long_task1)
    executor.submit(some_long_task2, 'hello', 123)
    return 'Two jobs were launched in the background!'

def some_long_task1():
    print('Task #1 started')
    sleep(10)
    print('Task #1 is done')

def some_long_task2(arg1, arg2):
    print('Task #2 started')
    sleep(5)
    print('Task #2 is done')

if __name__ == '__main__':
    app.run()

concurrent.futures was introduced in Python 3.2. It provides a higher-level abstraction than threading and multiprocessing, and includes two classes: ThreadPoolExecutor and ProcessPoolExecutor.

For more information, see the Python documentation for concurrent.futures.

  1. Executor

Executor is an abstract base class; its subclasses ThreadPoolExecutor and ProcessPoolExecutor are used to create thread pools and process pools.

Its methods are as follows:

  • Executor.submit(fn, *args, **kwargs)

Example:

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(pow, 22, 123)
    print(future.result())

The submit method adds a task to the thread pool and returns a Future object. Calling future.result() retrieves the result, blocking until the task has finished.
Using with calls the shutdown method automatically; otherwise we would have to write the release code ourselves.

  • Executor.map(func, *iterables, timeout=None)

The map method works like Python's built-in map, but it also accepts a timeout argument (an int or float). If a call does not finish within the timeout, a TimeoutError is raised; if timeout is not set, there is no time limit.

argument    description
func        the function to call
iterables   one or more iterable objects
timeout     timeout in seconds

Example:

from concurrent.futures import ThreadPoolExecutor
import requests

URLS = ["http://www.163.com", "https://www.baidu.com", "https://github.com"]

def load_url(url):
    req = requests.get(url)
    print(f"{url} page is {len(req.content)} bytes")

executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url, URLS)
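The timeout behavior can be sketched without network calls; here sleep simulates a slow task (the slow helper and delay values are made up for illustration):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep

def slow(seconds):
    sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=2) as executor:
    try:
        # The results iterator raises TimeoutError if the next result
        # is not available within `timeout` seconds of the map() call.
        for result in executor.map(slow, [0.1, 2], timeout=0.5):
            print(result)
    except TimeoutError:
        print('a task exceeded the timeout')
```

Note that exiting the with block still waits for the slow worker to finish, because shutdown(wait=True) is called on exit.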

  • Executor.shutdown(wait=True)

This method releases the system resources used by the executor. Executor implements the __enter__ and __exit__ methods, so it can be used in a with statement.
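A minimal sketch of the two lifecycle styles (the square helper is made up for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# Explicit lifecycle: release the pool yourself when you are done.
executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(square, 4)
print(future.result())
executor.shutdown(wait=True)  # blocks until pending tasks finish, then frees the threads

# Equivalent with a context manager: shutdown() is called automatically on exit.
with ThreadPoolExecutor(max_workers=2) as pool:
    print(pool.submit(square, 5).result())
```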

  2. Future

The submit method returns a Future object, which provides many methods for inspecting the task's status. Future objects are created by Executor.submit().

method                   description
cancel()                 attempt to cancel the call; returns False if it is already running and cannot be cancelled, otherwise True
cancelled()              return True if the call was successfully cancelled
running()                return True if the call is currently being executed
done()                   return True if the call was cancelled or finished
result(timeout=None)     return the result; blocks until the task finishes if it is not done yet
exception(timeout=None)  return the exception raised by the call, if any
add_done_callback(fn)    attach fn to the future; it is called when the future finishes
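A short sketch exercising several of these methods (slow_add and report are made-up names for illustration):

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_add(a, b):
    sleep(0.2)
    return a + b

def report(fut):
    # Runs once the future finishes, in the worker thread that completed it.
    print('callback saw:', fut.result())

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_add, 1, 2)
    future.add_done_callback(report)
    print(future.result())     # blocks until the task finishes, then prints 3
    print(future.done())       # True: the call has finished
    print(future.cancelled())  # False: it ran to completion
```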
  • wait

The wait function returns a named 2-tuple of sets: the completed futures and the uncompleted ones.
Its return_when argument accepts one of three constants: FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED; the default is ALL_COMPLETED.

With the default ALL_COMPLETED, the call blocks until all tasks have completed, and only then does the main thread continue.

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import requests

URLS = ['http://www.163.com', 'http://www.baidu.com']

def load_url(url):
    req = requests.get(url)
    print(f'{url} page is {len(req.content)} bytes')

executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)

print(wait(f_list))
print('Main Thread is closed')

With FIRST_COMPLETED, wait returns as soon as the first task finishes, without waiting for the remaining tasks.

from concurrent.futures import ThreadPoolExecutor, wait, as_completed, FIRST_COMPLETED
import requests

URLS = ['http://www.163.com', 'http://www.baidu.com']

def load_url(url):
    req = requests.get(url)
    print(f'{url} page is {len(req.content)} bytes')

executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)

print(wait(f_list, return_when=FIRST_COMPLETED))
print('Main Thread is closed')
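Both snippets import as_completed but never use it. It yields futures in the order they finish, not the order they were submitted; a minimal sketch with sleep standing in for the HTTP requests:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def slow(seconds):
    sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(slow, s) for s in (0.3, 0.1)]
    for future in as_completed(futures):
        # The 0.1-second task finishes first, even though it was submitted second.
        print(future.result())
```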