I’m working on processing chess games and saving data to a database. My partner developed this batch processing and concurrent threads to run a few in parallel. We want to write each game accuracy, opening_accuracy, middlegame_accuracy, endgame_accuracy, rating, and moves to the MongoDB but I think right now we are writing the average/mean over and over again. I’m not super familiar with MongoDB or concurrent.futures so I’m hoping someone could provide a bit of guidance:
def process_games(self, records: list[dict]):
game_list = pd.DataFrame(columns=PlayerAnalysis.game_columns)
start_time = time.time()
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
# This dict is used to hold the futures objects and their labels (in
# this case indexes).
numbered_futures = {}
# This dict is used to hold the partial data frames created by the
# futures while their results are being corrected.
game_detail_dframe_dict = {}
index = 1
timer = 999
# This loop generates the futures. They start automatically.
for record in records:
body: dict = json.loads(record.get("body", None))
time_control = body.get("time_control", None)
game_str: str = body.get("game", None)
depth: int = body.get("depth", None)
job_id: str = body.get("job_id", None)
username: str = body.get("username", None)
# ADD ERROR HANDLING IF ANY MISSING....
processed_game = chess.pgn.read_game(io.StringIO(game_str))
if processed_game is None:
raise InvalidGame
# For game_detail() to run properly in its own thread it needs its
# own Limit and Engine objects. This factory function builds a
# PlayerAnalysis object with private copies of both.
player_analysis_obj = PlayerAnalysis.from_depth_and_timer(
depth, timer
)
# verbose_game_detail() is just a game_detail() call plus it
# prints when it's starting and ending.
future_obj = executor.submit(
player_analysis_obj.verbose_game_detail,
index,
processed_game,
username,
job_id,
)
numbered_futures[future_obj] = index
index += 1
# This loop collects the results of the futures in the order they
# complete.
for game_detail_future in concurrent.futures.as_completed(numbered_futures):
index = numbered_futures[game_detail_future]
try:
result: FutureResponse = game_detail_future.result()
# Process game_detail_future result and store in game_detail_dframe_dict
game_detail_dframe_dict[index] = result.game_details
game_list = pd.concat(
[game_list, game_detail_dframe_dict[index]], axis=0
).fillna(0)
jobs_collections.insert_one(
{
"job_id": result.job_id,
"status": "COMPLETE",
"overall_accuracy": game_list["ACC"].mean(),
"opening_accuracy": game_list["Opening_ACC"].mean(),
"middlegame_accuracy": game_list["Middlegame_ACC"].mean(),
"endgame_accuracy": game_list["Endgame_ACC"].mean(),
"stephan_rating": game_list["Rating"].mean(),
"number_of_moves": game_list["Moves"].mean(),
},
)
self.logger.info(
f"Thread #{index} Completed Job ID: [{result.job_id}]"
)
except Exception as exc:
self.logger.error(
f"thread job for game #{index} generated an exception: {exc}"
)
end_time = time.time()
time_elapsed = end_time - start_time
self.logger.info("completed in %.1f seconds" % time_elapsed)
return game_list
except Exception as e:
self.logger.error(traceback.print_exc())
self.logger.error(e)
When I look at the database, I would expect the number of moves and the rating to be integer values. But when I looked, some are integers and some are floats (31.666 moves). When I saw this, I knew the insert_one…mean() is the problem. But I’m not exactly sure how to index each game to send it to the mongoDB.
Stephan is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.