I’m trying to count all the tokens in a catalog. Due to the large number of documents, I’d like to do the counting with multiprocessing (or any other parallel computation tool you’re free to suggest). My problem is that my naive construction does not work.
Here is the minimal example I constructed:
import multiprocessing
import random

def count_tokens(document):
    counter = dict()
    for token in document:
        if token in counter:
            counter[token] += 1
        else:
            counter[token] = 1
    return counter

tokens = ['tok' + str(i) for i in range(9)]
catalog = [random.choices(tokens, k=8) for _ in range(100)]

token_counts = {token: 0 for token in tokens}

def callback(result):
    global token_counts
    for token, count in result.items():
        token_counts[token] += count
    return token_counts

with multiprocessing.Pool() as pool:
    for document in catalog:
        pool.apply_async(count_tokens, args=(document,), callback=callback)
The problem is that the resulting token_counts is not the same as in the non-parallel calculation:
token_counts = {token: 0 for token in tokens}
for document in catalog:
    callback(count_tokens(document))
To be sure, I put together the complete script below:
import multiprocessing
import random

def count_tokens(document):
    counter = dict()
    for token in document:
        if token in counter:
            counter[token] += 1
        else:
            counter[token] = 1
    return counter

tokens = ['tok' + str(i) for i in range(9)]
catalog = [random.choices(tokens, k=8) for _ in range(100)]

token_counts = {token: 0 for token in tokens}

def callback(result):
    global token_counts
    for token, count in result.items():
        token_counts[token] += count
    return token_counts

# parallel version
with multiprocessing.Pool() as pool:
    for document in catalog:
        pool.apply_async(count_tokens, args=(document,), callback=callback)

count_multiprocessing = dict(**token_counts)
print(count_multiprocessing)

# serial version
token_counts = {token: 0 for token in tokens}
for document in catalog:
    callback(count_tokens(document))

count_onecpu = dict(**token_counts)
print(count_onecpu)

# the two results should agree
for token, count in count_onecpu.items():
    assert count == count_multiprocessing[token]
for token, count in count_multiprocessing.items():
    assert count == count_onecpu[token]
This always ends with an assertion error on my machine (Python 3.10.9, if that matters).
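For what it's worth, I assume a blocking approach, e.g. pool.map followed by merging per-document collections.Counter results in the parent process, would sidestep whatever is going wrong (untested sketch below; the names and structure are my own guess), but I would mainly like to understand why the apply_async + callback version above gives a different result.

import multiprocessing
import random
from collections import Counter

# sketch of the alternative I have in mind (my own guess, not verified)
def count_tokens(document):
    # same per-document counting as above, just using Counter
    return Counter(document)

if __name__ == '__main__':
    tokens = ['tok' + str(i) for i in range(9)]
    catalog = [random.choices(tokens, k=8) for _ in range(100)]

    # pool.map blocks until every document has been processed,
    # so all partial results exist before they are merged
    with multiprocessing.Pool() as pool:
        partial_counts = pool.map(count_tokens, catalog)

    token_counts = Counter()
    for partial in partial_counts:
        token_counts.update(partial)
    print(dict(token_counts))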