I’m working on a bot for a competition that receives its input through sys.stdin and uses Python’s print() for output. I have the following:
import sys

def main():
    while True:
        line = sys.stdin.readline()
        parts = line.split()
        if len(parts) > 0:
            pass  # do stuff
The problem is that the input arrives as a stream, and the code above blocks me from printing anything back until the stream is closed. What can I do to make this work?
By turning blocking off you can only read a character at a time, so there is no way to get readline() to work in a non-blocking context. I assume you just want to read key presses to control the robot. I have had no luck using select.select() on Linux, so I created a way that tweaks the termios settings instead. This is Linux-specific but works for me:
import atexit, termios
import sys, os
import time

old_settings = None

def init_any_key():
    # Switch the terminal to non-canonical, no-echo mode with non-blocking reads.
    global old_settings
    old_settings = termios.tcgetattr(sys.stdin)
    new_settings = termios.tcgetattr(sys.stdin)
    new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON) # lflags
    new_settings[6][termios.VMIN] = 0 # cc
    new_settings[6][termios.VTIME] = 0 # cc
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)

@atexit.register
def term_any_key():
    # Restore the original terminal settings on exit.
    global old_settings
    if old_settings:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

def any_key():
    # Return the codes of all key presses waiting on stdin (empty list if none).
    ch_set = []
    ch = os.read(sys.stdin.fileno(), 1)
    while ch is not None and len(ch) > 0:
        ch_set.append(ord(ch))
        ch = os.read(sys.stdin.fileno(), 1)
    return ch_set

init_any_key()
while True:
    key = any_key()
    if key:  # any_key() returns an empty list, never None, when idle
        print(key)
    else:
        time.sleep(0.1)
A better answer for Windows or cross-platform use is in the question "Non-blocking console input?".
You can use selectors to handle I/O multiplexing:
https://docs.python.org/3/library/selectors.html
Try this out:
#! /usr/bin/python3
import sys
import fcntl
import os
import selectors

# set sys.stdin non-blocking
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

# function to be called when enter is pressed
def got_keyboard_data(stdin):
    print('Keyboard input: {}'.format(stdin.read()))

# register event
m_selector = selectors.DefaultSelector()
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

while True:
    sys.stdout.write('Type something and hit enter: ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj)
The above code blocks on the line
for k, mask in m_selector.select():
until a registered event occurs. select() then returns a list of (key, events) tuples: a SelectorKey instance (k) and a mask of the events that are ready.
In the above example we registered only one event (stdin becoming readable, which on a terminal happens when Enter is pressed):
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)
The register method is defined as follows:
abstractmethod register(fileobj, events, data=None)
Therefore, the register method stores our callback function got_keyboard_data in k.data, and the loop calls it when the Enter key is pressed:
callback = k.data
callback(k.fileobj)
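To see the k.data mechanism in isolation, here is a minimal sketch that swaps stdin for a socket.socketpair() so it runs without a terminal. The names on_readable and received are made up for this illustration and are not part of the selectors API.

```python
import selectors
import socket

received = []  # hypothetical sink so the effect of the callback is visible

def on_readable(sock):
    # Called by the loop below once the selector reports sock as readable.
    received.append(sock.recv(1024))

sel = selectors.DefaultSelector()
reader, writer = socket.socketpair()
reader.setblocking(False)

# The third argument is stored untouched and comes back as key.data.
sel.register(reader, selectors.EVENT_READ, on_readable)

writer.sendall(b'hello\n')

# A timeout keeps the sketch from hanging if nothing ever arrives.
for key, events in sel.select(timeout=1.0):
    key.data(key.fileobj)

sel.unregister(reader)
sel.close()
reader.close()
writer.close()
```

The selector never calls the callback itself; it only hands back whatever object was registered, and the loop decides what to do with it.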
A more complete (and hopefully more useful) example multiplexes stdin data from the user with incoming connections from the network:
import selectors
import socket
import sys
import os
import fcntl

m_selector = selectors.DefaultSelector()

# set sys.stdin non-blocking
def set_input_nonblocking():
    orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

def create_socket(port, max_conn):
    server_addr = ('localhost', port)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setblocking(False)
    server.bind(server_addr)
    server.listen(max_conn)
    return server

def read(conn, mask):
    global GO_ON
    client_address = conn.getpeername()
    data = conn.recv(1024)
    print('Got {} from {}'.format(data, client_address))
    if not data:
        GO_ON = False

def accept(sock, mask):
    new_conn, addr = sock.accept()
    new_conn.setblocking(False)
    print('Accepting connection from {}'.format(addr))
    m_selector.register(new_conn, selectors.EVENT_READ, read)

def quit():
    global GO_ON
    print('Exiting...')
    GO_ON = False

def from_keyboard(arg1, arg2):
    line = arg1.read()
    if line == 'quit\n':
        quit()
    else:
        print('User input: {}'.format(line))

GO_ON = True
set_input_nonblocking()

# listen to port 10000, at most 10 connections
server = create_socket(10000, 10)
m_selector.register(server, selectors.EVENT_READ, accept)
m_selector.register(sys.stdin, selectors.EVENT_READ, from_keyboard)

while GO_ON:
    sys.stdout.write('>>> ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj, mask)

# unregister events
m_selector.unregister(sys.stdin)
# close connection (a listening socket only needs close())
server.close()
# close select
m_selector.close()
You can test this using two terminals.
In the first terminal:
$ python3 test.py
>>> bla
Open another terminal and run:
$ nc localhost 10000
hey!
Back in the first terminal:
>>> qwerqwer
Result (seen on the main terminal):
$ python3 test.py
>>> bla
User input: bla
>>> Accepting connection from ('127.0.0.1', 39598)
>>> Got b'hey!\n' from ('127.0.0.1', 39598)
>>> qwerqwer
User input: qwerqwer
>>>
#-----------------------------------------------------------------------
# Get a character from the keyboard. If Block is True wait for input,
# else return any available character or throw an exception if none is
# available. Ctrl+C isn't handled and continues to generate the usual
# SIGINT signal, but special keys like the arrows return the expected
# escape sequences.
#
# This requires:
#
# import sys, select
#
# This was tested using python 2.7 on Mac OS X. It will work on any
# Linux system, but will likely fail on Windows due to select/stdin
# limitations.
#-----------------------------------------------------------------------
import sys, select

def get_char(block = True):
    if block or select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
        return sys.stdin.read(1)
    raise select.error('NoChar')  # select.error is an alias of OSError on Python 3
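The same zero-timeout select() poll works on any file descriptor, which makes it easy to try without a keyboard. The sketch below is a variant of get_char written under a few assumptions: poll_byte is a hypothetical name, it takes a raw descriptor rather than sys.stdin, and it returns None instead of raising when nothing is waiting. Like the original, it is POSIX-only, since select() on Windows accepts only sockets.

```python
import os
import select

def poll_byte(fd):
    """Return one byte from fd if it is readable right now, else None."""
    # A timeout of 0 turns select() into a poll: it never waits.
    readable, _, _ = select.select([fd], [], [], 0)
    if readable:
        return os.read(fd, 1)  # b'' here would mean end of file
    return None

# Demonstration on a pipe instead of the keyboard.
r, w = os.pipe()
before = poll_byte(r)   # nothing written yet
os.write(w, b'x')
after = poll_byte(r)    # one byte is waiting
os.close(r)
os.close(w)
```

Reading with os.read() rather than sys.stdin.read(1) sidesteps Python's own input buffering, which can otherwise hold bytes that select() no longer sees on the descriptor.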
This is a POSIX solution, similar to the answer by swdev.
As they stated, you have to play with termios.VMIN and termios.VTIME to catch more than one char without requiring the user to press Enter. Using raw mode alone is a problem, because special keys like the arrows send several characters and can mess up the next keypress.
Here we use tty.setcbreak() or tty.setraw() as a shortcut, but their internals are short.
import termios
import tty
import sys
import select

def get_enter_key():
    fd = sys.stdin.fileno()
    orig_fl = termios.tcgetattr(fd)
    try:
        tty.setcbreak(fd) # use tty.setraw() instead to catch ^C also
        mode = termios.tcgetattr(fd)
        CC = 6
        mode[CC][termios.VMIN] = 0
        mode[CC][termios.VTIME] = 0
        termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
        keypress, _, _ = select.select([fd], [], [])
        if keypress:
            return sys.stdin.read(4095)
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, orig_fl)

try:
    while True:
        print(get_enter_key())
except KeyboardInterrupt:
    print('exiting')
    sys.exit()
Note that there are two potential timeouts you could add here:
- one is passing a timeout as the last parameter to select.select()
- another is adjusting VMIN and VTIME
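As a rough sketch of the first option, the last argument of select.select() is a timeout in seconds. The helper name read_with_timeout is hypothetical, and a pipe stands in for the terminal so the example does not depend on termios settings.

```python
import os
import select

def read_with_timeout(fd, timeout):
    """Wait up to `timeout` seconds for fd to become readable.

    Returns the bytes read, or None if the timeout expired first.
    """
    readable, _, _ = select.select([fd], [], [], timeout)
    if not readable:
        return None
    return os.read(fd, 4095)

r, w = os.pipe()
timed_out = read_with_timeout(r, 0.05)  # nothing to read: gives up after 50 ms
os.write(w, b'\x1b[A')                  # bytes a terminal typically sends for Up
got = read_with_timeout(r, 0.05)
os.close(r)
os.close(w)
```

For the second option, VTIME is expressed in tenths of a second, and with VMIN set to 0 a read returns as soon as a byte arrives or the timer expires.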
You can just use os.set_blocking now (available since Python 3.5). This works on Linux and can probably be adapted to work on other systems by changing how the fd is created.
import os
import time

class CooperativeInterrupt:
    def __init__(self):
        self.fd = open('/dev/stdin', 'rb')
        os.set_blocking(self.fd.fileno(), False)

    def __del__(self):
        if self.fd is not None:
            self.fd.close()

    def has_interrupt(self):
        data = self.fd.read()
        if data is None:
            return False  # nothing typed since the last check
        if len(data) == 0:
            raise KeyboardInterrupt() # Stdin closed (e.g. CTRL+D).
        return True

coi = CooperativeInterrupt()
for i in range(10000000000000):
    if coi.has_interrupt():
        print(i)
    time.sleep(0.1)
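To make the three cases that has_interrupt distinguishes concrete, here is a small sketch that uses a pipe in place of /dev/stdin (an assumption made so it runs without a terminal) and an unbuffered file object. On such a file in non-blocking mode, read() returns None when no data is waiting, the bytes when there is data, and b'' at end of file.

```python
import os

r, w = os.pipe()
os.set_blocking(r, False)            # same call the class above makes
stream = os.fdopen(r, 'rb', buffering=0)

nothing_yet = stream.read(4096)      # no data waiting
os.write(w, b'stop\n')
some_data = stream.read(4096)        # data was written
os.close(w)                          # writer gone: the next read hits EOF
at_eof = stream.read(4096)
stream.close()
```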
Might I suggest nodelay, if you are willing to use curses?
https://docs.python.org/3/library/curses.html#curses.window.nodelay
You should be able to read from a stream with either
sys.stdin.read(1)
to read decoded text characters or:
sys.stdin.buffer.read(1)
to read raw bytes.
I would do this if I wanted to get raw data from the stdin and do something with it in a timely manner, without reading a newline or filling up the internal buffer first. This is suitable for running programs remotely via ssh where tty is not available, see:
ssh me@host '/usr/bin/python -c "import sys; print(sys.stdin.isatty())"'
There are some other things to think about to make programs work as expected in this scenario. You need to flush the output when you’re done to avoid buffering delays, and it could be easy to assume a program hasn’t read the input, when you’ve simply not flushed the output.
sys.stdout.write("my data")
sys.stdout.flush()
But usually it’s not the input reading that’s the problem but that the terminal (or program) supplying the input stream is not handing it over when you expect, or perhaps it’s not reading your output when you expect. If you have a tty to start with (see ssh check above) you can put it into raw mode with the tty module.
import sys
import termios
import tty

old = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin)
c = None
try:
    c = sys.stdin.read(1)[0]
finally:
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old)
print(c)
… if using Mac/Linux. If using Windows you could use msvcrt.getch().
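For the line-based bot in the question, the flushing advice above suggests a loop along the following lines: read one line, answer, and flush straight away. This is only a sketch under assumptions: handle_line is a hypothetical stand-in for the bot's logic, and the streams are passed as parameters so the loop can be exercised with io.StringIO as well as with sys.stdin and sys.stdout.

```python
import io
import sys

def handle_line(parts):
    # Hypothetical placeholder for the bot's logic: echo the tokens reversed.
    return ' '.join(reversed(parts))

def run(in_stream=sys.stdin, out_stream=sys.stdout):
    # iter(readline, '') stops at end of file, where readline() returns ''.
    for line in iter(in_stream.readline, ''):
        parts = line.split()
        if parts:
            # flush=True pushes the reply out now instead of when the
            # buffer fills or the program exits.
            print(handle_line(parts), file=out_stream, flush=True)

# Exercise the loop with in-memory streams.
fake_out = io.StringIO()
run(io.StringIO('move north\n\nfire\n'), fake_out)
result = fake_out.getvalue()
```

In the real bot, calling run() with no arguments would use sys.stdin and sys.stdout.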
Use a generator. Thankfully sys.stdin already behaves like one: it is an iterator that yields one line at a time!
A generator enables you to work on an infinite stream. Each time you ask it for the next element, it returns one. In order to build your own generator you need the yield keyword.
for line in sys.stdin:
    print(line)
    if a_certain_situation_happens:
        break
Do not forget to put a break statement in the loop so that it exits when the condition you are waiting for occurs.
You can find more information about generators on:
- http://www.dabeaz.com/generators/index.html
- http://linuxgazette.net/100/pramode.html
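As a sketch of the yield keyword mentioned above, the following hypothetical generator, commands, wraps any line-oriented stream and stops once a sentinel line is seen. The sentinel 'quit' and the function name are assumptions for the example, and io.StringIO stands in for sys.stdin.

```python
import io

def commands(stream, sentinel='quit'):
    """Yield each non-empty line as a list of tokens until `sentinel` is seen."""
    for line in stream:
        parts = line.split()
        if not parts:
            continue    # skip blank lines
        if parts[0] == sentinel:
            break       # the "certain situation" that ends the loop
        yield parts

fake_stdin = io.StringIO('go 1 2\n\nturn left\nquit\nignored\n')
seen = list(commands(fake_stdin))
```

Passing sys.stdin instead of fake_stdin would process live input one line at a time in the same way.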