Background
I am trying to run Monte Carlo sampling for reinforcement learning with Python multiprocessing to improve sampling efficiency. However, I have observed several oddities (errors) with the multiprocessing package. Below I provide the code and the causes I have found so far.
Problem statement and oddities I observed
- I have confirmed that all of the data saved to the Queue inside the loop are NumPy arrays, since a blog post says multiprocessing fails when tensors are involved (see the sketch just after this list).
- The hanging disappears when I remove all neural networks from the code (i.e., the neural network that predicts an action given a state).
- However, the neural-network version of the code (which hangs) works fine on my personal desktop (AMD Ryzen 5 5600, 16 GB RAM, GTX 1060), but hangs on the lab computer (i9, 128 GB RAM, RTX 3090) and on my laptop (i7, 16 GB RAM, no GPU).
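For illustration, here is a minimal sketch of the kind of tensor-to-NumPy check I mean; the helper name to_numpy_only and the exact set of allowed leaf types are my own illustrative choices, not part of the actual code:

    import numpy as np
    import torch

    def to_numpy_only(obj):
        # recursively detach tensors to NumPy and assert nothing else slips through
        if isinstance(obj, torch.Tensor):
            return obj.detach().cpu().numpy()
        if isinstance(obj, dict):
            return {k: to_numpy_only(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return type(obj)(to_numpy_only(v) for v in obj)
        assert obj is None or isinstance(obj, (np.ndarray, float, int, bool, str))
        return obj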
Code
This is the multiprocessing code:
    if self.total_num_workers != 1:
        workers = []
        queue = multiprocessing.Manager().Queue()
        for i, env in enumerate(self.training_envs):
            for j in range(self.num_worker_per_env):
                worker_idx = i * self.num_worker_per_env + j + 1
                if worker_idx == self.total_num_workers:
                    break
                else:
                    worker_args = (worker_idx, queue, env, policy, self.thread_batch_size, self.episode_len,
                                   deterministic, self.running_state, i, seed)
                    workers.append(multiprocessing.Process(target=self.collect_trajectory, args=worker_args, daemon=False))
        for worker in workers:
            worker.start()

    # worker index 0 runs in the main process while the others run in parallel
    memory = self.collect_trajectory(0, None, self.training_envs[-1], policy, self.thread_batch_size, self.episode_len,
                                     deterministic, self.running_state, len(self.training_envs) - 1, seed)

    if self.total_num_workers != 1:
        # collect the results; pid - 1 keeps the memories ordered by worker index
        worker_memories = [None] * len(workers)
        for worker in workers:
            pid, worker_memory = queue.get()
            worker_memories[pid - 1] = worker_memory
        '''
        for worker_memory in worker_memories:
            for k in memory:
                memory[k] = np.concatenate((memory[k], worker_memory[k]), axis=0)
        '''
        for worker_memory in worker_memories:
            memory.append(worker_memory)

    batch = memory.sample()
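For context, the structure above is a plain one-queue fan-in: each worker builds its own result and puts (pid, result) on a Manager queue, while the parent does one share of the work itself and then drains the queue. A stripped-down, self-contained version of the same pattern (without the RL specifics) would look roughly like this:

    import multiprocessing
    import numpy as np

    def worker(pid, queue):
        # stand-in for collect_trajectory: produce some NumPy data and report it
        data = {"obs": np.zeros((8, 4)), "rew": np.ones(8)}
        queue.put((pid, data))

    if __name__ == "__main__":
        num_workers = 4
        queue = multiprocessing.Manager().Queue()
        procs = [multiprocessing.Process(target=worker, args=(i + 1, queue), daemon=False)
                 for i in range(num_workers)]
        for p in procs:
            p.start()
        results = [None] * num_workers
        for _ in procs:
            pid, data = queue.get()  # blocks until some worker reports
            results[pid - 1] = data
        for p in procs:
            p.join()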
This is the sampling code (the body of collect_trajectory):
    while t < episode_len:
        with torch.no_grad():
            a, logprob = policy.actforward(e_s, deterministic=deterministic)
            a = a.numpy()
            logprob = logprob[0].numpy()
        try:
            # Gymnasium-style API: step() returns five values
            ns, rew, term, trunc, infos = env.step(a)
        except:
            # old Gym-style API: step() returns four values
            ns, rew, term, infos = env.step(a)
            trunc = True if t == episode_len else False
        _, ns, _, e_ns = policy.encode_obs((s, a, ns, [rew], mask), running_state=running_state, env_idx=env_idx, reset=False)
        s = ns
        e_s = e_ns
        cost = cost_fn(s, a, ns)
        done = trunc or term
        mask = 0 if done else 1
        _returns += rew
        try:
            success = infos['success']
        except:
            success = 0.0
        memory.push(s, a, ns, rew, cost, term, trunc, mask, logprob, env_idx, success)

    # workers report through the queue; worker 0 (main process) returns directly
    if queue is not None:
        queue.put([pid, memory])
    else:
        return memory
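Note that everything put on a Manager queue is pickled in the worker and unpickled on the receiving side, so a quick sanity check (illustrative, not something the code above currently does) is to round-trip the payload through pickle right before queue.put:

    import pickle

    # raises immediately in the worker if anything inside memory is unpicklable
    pickle.dumps([pid, memory])
    queue.put([pid, memory])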
Questions
The above code works fine on my desktop but not on the other computers. When policy.encode_obs and policy.actforward are commented out and replaced with NumPy arrays of the same dimensions (e.g., np.zeros()), the code works on all computers. Can anyone explain the reasons for these oddities?
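For clarity, the substitution that makes the hang disappear looks roughly like this; act_dim and obs_dim are placeholders for the real dimensions:

    # a, logprob = policy.actforward(e_s, deterministic=deterministic)  # commented out
    a = np.zeros(act_dim, dtype=np.float32)      # same shape as the real action
    logprob = np.zeros(1, dtype=np.float32)

    # _, ns, _, e_ns = policy.encode_obs(...)                           # commented out
    e_ns = np.zeros(obs_dim, dtype=np.float32)   # same shape as the encoded observation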