Python multiprocessing with Pipe: subprocesses sleep forever with no error raised when training a deep reinforcement learning agent

I am training a PyTorch-based deep reinforcement learning (DRL) agent on my environment with the multiprocessing package, so that training runs across multiple processes (the setup follows the well-known elegantrl framework).

To be specific, the processes communicating through Pipes are:

- 1 learner, which receives environment data from the workers, updates the networks, and sends the updated actor to the workers and the evaluator;
- 2 or more workers, which receive the actor from the learner, explore the environment, and send the training data back to the learner;
- 1 evaluator, which receives the actor from the learner, evaluates the trained network, decides whether to stop training, and sends a break signal to the learner so that all subprocesses stop.
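
For reference, the pipe ends are wired as follows (this is only the wiring, extracted from the full code further down; with duplex=False the first end of the pair is receive-only and the second is send-only):

from multiprocessing import Pipe

num_workers = 2  # e.g. args.num_workers
worker_pipes = [Pipe(duplex=False) for _ in range(num_workers)]  # learner -> worker i
learner_pipe = Pipe(duplex=False)                                # workers -> learner
evaluator_pipe = Pipe(duplex=True)                               # learner <-> evaluator

# the learner holds   learner_pipe[0] (recv), every worker_pipes[i][1] (send), and evaluator_pipe[1]
# worker i holds      worker_pipes[i][0] (recv) and learner_pipe[1] (send)
# the evaluator holds evaluator_pipe[0]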

Here is the problem: when training reaches the break_step I set, only the evaluator subprocess finishes and breaks out of its loop, while the other subprocesses sleep forever with no error raised, and the main process cannot proceed.
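
For what it's worth, a timed join makes it easy to see which processes never exit (a small sketch, using the same process_list that is built in train_agent_multiprocessing further down):

# Join with a timeout instead of blocking forever, then report the survivors
for process in process_list:
    process.join(timeout=60)

for process in process_list:
    status = "still alive" if process.is_alive() else f"exited with code {process.exitcode}"
    print(process.name, status)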

All input states and state transitions in the environment are numpy arrays before training starts.

Pressing Ctrl-C shows where each process was sleeping: the worker subprocesses are blocked in recv(), waiting for a signal from the learner through their pipes, and the learner itself is blocked in a send() to the evaluator (did the learner fail to receive the 'stop' signal from the evaluator?):

^CProcess Worker-5:
Process Worker-3:
Process Worker-4:
Process Learner-1:
Process Worker-2:
Traceback (most recent call last):
  File "/Users/zhangtuo_shan/Documents/Resarch/test_week.py", line 175, in <module>
Traceback (most recent call last):
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 291, in run
    actor = self.recv_pipe.recv()
Traceback (most recent call last):
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/process.py", lTraceback (most recent call last):
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 231, in run
    self.eval_pipe.send((actor, num_steps, exp_r, logging_tuple))
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
KeyboardInterrupt
ine 315, in _bootstrap
    self.run()
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 291, in run
    actor = self.recv_pipe.recv()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 291, in run
    actor = self.recv_pipe.recv()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 291, in run
    actor = self.recv_pipe.recv()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
    train_ddpg_td3_sac_for_multi_echelon_inventory(lr, nd)
  File "/Users/zhangtuo_shan/Documents/Resarch/test_week.py", line 153, in train_ddpg_td3_sac_for_multi_echelon_inventory
    train_agent_multiprocessing(args)  # train_agent(args)
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 136, in train_agent_multiprocessing
    [process.join() for process in process_list]
  File "/Users/zhangtuo_shan/Documents/Resarch/elegantrl/train/run.py", line 136, in <listcomp>
    [process.join() for process in process_list]
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/popen_fork.py", line 43, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/Users/zhangtuo_shan/anaconda3/envs/RL/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
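
Besides Ctrl-C, the stacks can also be dumped without killing the processes by registering faulthandler on a signal inside each subprocess (POSIX only; install_stack_dump is just an illustrative helper, not part of elegantrl):

import faulthandler
import signal

def install_stack_dump():
    # Dump every thread's traceback to stderr when this process receives SIGUSR1
    faulthandler.register(signal.SIGUSR1, all_threads=True)

Calling install_stack_dump() at the top of each run() and then running kill -USR1 <pid> on a stuck process prints where it is blocked without terminating it.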

Another weird thing is that the problem does not always occur (though it is reproducible for the same input). When I change the scale of the environment (the input and output sizes of the DRL networks), either reducing or enlarging it, the problem can disappear.

I have also tried different operating systems (macOS/Windows/Linux) and both CPU and GPU; the problem persists.
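
I wonder whether the size dependence is related to how much a Pipe can buffer: as far as I understand, send() on a multiprocessing connection returns as soon as the pickled object fits into the OS pipe buffer, but blocks once it does not and nothing reads the other end. A minimal sketch of that behaviour (a toy example, unrelated to my code):

import numpy as np
from multiprocessing import Pipe

recv_conn, send_conn = Pipe(duplex=False)

send_conn.send(np.zeros(100))        # small payload: fits in the pipe buffer, returns even with no reader
print("small send returned")

send_conn.send(np.zeros(1_000_000))  # ~8 MB payload: blocks forever because nothing ever recv()s it
print("never reached")

If something like that happens to the learner, a larger actor network would make eval_pipe.send() block while a smaller one would slip through, which might explain why resizing the environment makes the problem appear and disappear.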

The main multiprocessing code is copied below; it is consistent with elegantrl.

import os
import time

import multiprocessing as mp
from multiprocessing import Pipe, Process

import numpy as np
import torch

# plus Config, build_env, ReplayBuffer and Evaluator, which come from elegantrl's train package


def train_agent_multiprocessing(args: Config):
    args.init_before_training()

    """Don't set method='fork' when send tensor in GPU"""
    # method = 'spawn' if os.name == 'nt' else 'forkserver'  # os.name == 'nt' means Windows NT operating system (WinOS)
    # method = 'forkserver'
    method = 'spawn'
    mp.set_start_method(method=method, force=True)

    '''build the Pipe'''

    worker_pipes = [Pipe(duplex=False) for _ in range(args.num_workers)]  # receive, send
    learner_pipe = Pipe(duplex=False)
    evaluator_pipe = Pipe(duplex=True)

    '''build Process'''
    learner = Learner(learner_pipe=learner_pipe, worker_pipes=worker_pipes, evaluator_pipe=evaluator_pipe, args=args)
    workers = [Worker(worker_pipe=worker_pipe, learner_pipe=learner_pipe, worker_id=worker_id, args=args)
               for worker_id, worker_pipe in enumerate(worker_pipes)]
    evaluator = EvaluatorProc(evaluator_pipe=evaluator_pipe, args=args)

    '''start Process'''
    process_list = [learner, *workers, evaluator]
    [process.start() for process in process_list]
    [process.join() for process in process_list]
    # print('Process finished')


class Learner(Process):
    def __init__(self, learner_pipe: Pipe, worker_pipes: [Pipe], evaluator_pipe: Pipe, args: Config):
        super().__init__()
        self.recv_pipe = learner_pipe[0]
        self.send_pipes = [worker_pipe[1] for worker_pipe in worker_pipes]
        self.eval_pipe = evaluator_pipe[1]
        self.args = args

    def run(self):
        args = self.args
        torch.set_grad_enabled(False)

        '''init agent'''
        agent = args.agent_class(args.net_dims, args.state_dim, args.action_dim, gpu_id=args.gpu_id, args=args)
        agent.save_or_load_agent(args.cwd, if_save=False)

        '''init buffer'''
        if args.if_off_policy:
            buffer = ReplayBuffer(
                gpu_id=args.gpu_id,
                num_seqs=args.num_envs * args.num_workers,
                max_size=args.buffer_size,
                state_dim=args.state_dim,
                action_dim=1 if args.if_discrete else args.action_dim,
                if_use_per=args.if_use_per,
                args=args,
            )
        else:
            buffer = []

        '''loop'''
        if_off_policy = args.if_off_policy
        if_save_buffer = args.if_save_buffer
        num_workers = args.num_workers
        num_envs = args.num_envs
        state_dim = args.state_dim
        action_dim = args.action_dim
        horizon_len = args.horizon_len
        num_seqs = args.num_envs * args.num_workers
        num_steps = args.horizon_len * args.num_workers
        cwd = args.cwd
        del args

        agent.last_state = torch.empty((num_seqs, state_dim), dtype=torch.float32, device=agent.device)

        states = torch.empty((horizon_len, num_seqs, state_dim), dtype=torch.float32, device=agent.device)
        actions = torch.empty((horizon_len, num_seqs, action_dim), dtype=torch.float32, device=agent.device)
        rewards = torch.empty((horizon_len, num_seqs), dtype=torch.float32, device=agent.device)
        undones = torch.empty((horizon_len, num_seqs), dtype=torch.bool, device=agent.device)
        if if_off_policy:
            buffer_items_tensor = (states, actions, rewards, undones)
        else:
            logprobs = torch.empty((horizon_len, num_seqs), dtype=torch.float32, device=agent.device)
            buffer_items_tensor = (states, actions, logprobs, rewards, undones)

        if_train = True
        while if_train:
            '''Learner send actor to Workers'''
            for send_pipe in self.send_pipes:
                send_pipe.send(agent.act)

            '''Learner receive (buffer_items, last_state) from Workers'''
            for _ in range(num_workers):
                worker_id, buffer_items, last_state = self.recv_pipe.recv()

                buf_i = worker_id * num_envs
                buf_j = worker_id * num_envs + num_envs
                for buffer_item, buffer_tensor in zip(buffer_items, buffer_items_tensor):
                    buffer_tensor[:, buf_i:buf_j] = buffer_item
                agent.last_state[buf_i:buf_j] = last_state

            '''Learner update training data to (buffer, agent)'''
            if if_off_policy:
                buffer.update(buffer_items_tensor)
            else:
                buffer[:] = buffer_items_tensor

            '''agent update network using training data'''
            torch.set_grad_enabled(True)
            logging_tuple = agent.update_net(buffer)
            torch.set_grad_enabled(False)

            '''Learner receive training signal from Evaluator'''
            if self.eval_pipe.poll():  # check whether there is any data to read from this pipe
                if_train = self.eval_pipe.recv()  # True means the evaluator is idle,
                actor = agent.act  # so the Learner sends an actor to the evaluator for evaluation.
            else:
                actor = None

            '''Learner send actor and training log to Evaluator'''
            exp_r = buffer_items_tensor[2].mean().item()  # the average rewards of exploration
            self.eval_pipe.send((actor, num_steps, exp_r, logging_tuple))
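            # note: this send still runs on the final iteration, after recv() above
            # has already returned if_train=False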

        '''Learner send the terminal signal to workers after break the loop'''
        for send_pipe in self.send_pipes:
            send_pipe.send(None)

        '''save'''
        agent.save_or_load_agent(cwd, if_save=True)
        if if_save_buffer and hasattr(buffer, 'save_or_load_history'):
            print(f"| LearnerPipe.run: ReplayBuffer saving in {cwd}")
            buffer.save_or_load_history(cwd, if_save=True)
            print(f"| LearnerPipe.run: ReplayBuffer saved  in {cwd}")
        print('Learner finished')


class Worker(Process):
    def __init__(self, worker_pipe: Pipe, learner_pipe: Pipe, worker_id: int, args: Config):
        super().__init__()
        self.recv_pipe = worker_pipe[0]
        self.send_pipe = learner_pipe[1]
        self.worker_id = worker_id
        self.args = args

    def run(self):
        args = self.args
        worker_id = self.worker_id
        torch.set_grad_enabled(False)

        '''init environment'''
        env = build_env(args.env_class, args.env_args, args.gpu_id)

        '''init agent'''
        agent = args.agent_class(args.net_dims, args.state_dim, args.action_dim, gpu_id=args.gpu_id, args=args)
        agent.save_or_load_agent(args.cwd, if_save=False)

        '''init agent.last_state'''
        state = env.reset()
        if args.num_envs == 1:
            assert state.shape == (args.state_dim,)
            assert isinstance(state, np.ndarray)
            state = torch.tensor(state, dtype=torch.float32, device=agent.device).unsqueeze(0)
        else:
            assert state.shape == (args.num_envs, args.state_dim)
            assert isinstance(state, torch.Tensor)
            state = state.to(agent.device)
        assert state.shape == (args.num_envs, args.state_dim)
        assert isinstance(state, torch.Tensor)
        agent.last_state = state.detach()

        '''init buffer'''
        horizon_len = args.horizon_len
        if args.if_off_policy:
            buffer_items = agent.explore_env(env, args.horizon_len, if_random=True)
            self.send_pipe.send((worker_id, buffer_items, agent.last_state))

        '''loop'''
        del args

        while True:
            '''Worker receive actor from Learner'''
            actor = self.recv_pipe.recv()
            if actor is None:
                break

            '''Worker send the training data to Learner'''
            agent.act = actor
            buffer_items = agent.explore_env(env, horizon_len)
            self.send_pipe.send((worker_id, buffer_items, agent.last_state))

        env.close() if hasattr(env, 'close') else None
        print(f'Worker {self.worker_id} finished')


class EvaluatorProc(Process):
    def __init__(self, evaluator_pipe: Pipe, args: Config):
        super().__init__()
        self.pipe = evaluator_pipe[0]
        self.args = args

    def run(self):
        args = self.args
        torch.set_grad_enabled(False)

        '''wandb(weights & biases): Track and visualize all the pieces of your machine learning pipeline.'''
        wandb = None
        if getattr(args, 'if_use_wandb', False):
            import wandb
            wandb_project_name = "train"
            wandb.init(project=wandb_project_name)

        '''init evaluator'''
        eval_env_class = args.eval_env_class if args.eval_env_class else args.env_class
        eval_env_args = args.eval_env_args if args.eval_env_args else args.env_args
        eval_env = build_env(eval_env_class, eval_env_args, args.gpu_id)
        evaluator = Evaluator(cwd=args.cwd, env=eval_env, args=args, if_tensorboard=False)

        '''loop'''
        cwd = args.cwd
        break_step = args.break_step
        device = torch.device(f"cuda:{args.gpu_id}" if (torch.cuda.is_available() and (args.gpu_id >= 0)) else "cpu")
        del args

        if_train = True
        while if_train:
            '''Evaluator receive training log from Learner'''
            actor, steps, exp_r, logging_tuple = self.pipe.recv()
            wandb.log({"obj_cri": logging_tuple[0], "obj_act": logging_tuple[1]}) if wandb else None

            '''Evaluator evaluate the actor and save the training log'''
            if actor is None:
                evaluator.total_step += steps  # update total_step but don't update recorder
            else:
                actor = actor.to(device)
                evaluator.evaluate_and_save(actor, steps, exp_r, logging_tuple)

            '''Evaluator send the training signal to Learner'''
            if_train = (evaluator.total_step <= break_step) and (not os.path.exists(f"{cwd}/stop"))
            self.pipe.send(if_train)
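            # note: once if_train is False the loop exits right after this send, and this
            # end of the pipe is never read again before the process finishes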

        '''Evaluator save the training log and draw the learning curve'''
        evaluator.save_training_curve_jpg()
        print(f'| UsedTime: {time.time() - evaluator.start_time:>7.0f} | SavedDir: {cwd}')

        eval_env.close() if hasattr(eval_env, 'close') else None
        print('Evaluator finished')
