ยปCore Development>Code coverage>Lib/pty.py

Python code coverage for Lib/pty.py

#countcontent
1n/a"""Pseudo terminal utilities."""
2n/a
3n/a# Bugs: No signal handling. Doesn't set slave termios and window size.
4n/a# Only tested on Linux.
5n/a# See: W. Richard Stevens. 1992. Advanced Programming in the
6n/a# UNIX Environment. Chapter 19.
7n/a# Author: Steen Lumholt -- with additions by Guido.
8n/a
9n/afrom select import select
10n/aimport os
11n/aimport tty
12n/a
13n/a__all__ = ["openpty","fork","spawn"]
14n/a
15n/aSTDIN_FILENO = 0
16n/aSTDOUT_FILENO = 1
17n/aSTDERR_FILENO = 2
18n/a
19n/aCHILD = 0
20n/a
21n/adef openpty():
22n/a """openpty() -> (master_fd, slave_fd)
23n/a Open a pty master/slave pair, using os.openpty() if possible."""
24n/a
25n/a try:
26n/a return os.openpty()
27n/a except (AttributeError, OSError):
28n/a pass
29n/a master_fd, slave_name = _open_terminal()
30n/a slave_fd = slave_open(slave_name)
31n/a return master_fd, slave_fd
32n/a
33n/adef master_open():
34n/a """master_open() -> (master_fd, slave_name)
35n/a Open a pty master and return the fd, and the filename of the slave end.
36n/a Deprecated, use openpty() instead."""
37n/a
38n/a try:
39n/a master_fd, slave_fd = os.openpty()
40n/a except (AttributeError, OSError):
41n/a pass
42n/a else:
43n/a slave_name = os.ttyname(slave_fd)
44n/a os.close(slave_fd)
45n/a return master_fd, slave_name
46n/a
47n/a return _open_terminal()
48n/a
49n/adef _open_terminal():
50n/a """Open pty master and return (master_fd, tty_name)."""
51n/a for x in 'pqrstuvwxyzPQRST':
52n/a for y in '0123456789abcdef':
53n/a pty_name = '/dev/pty' + x + y
54n/a try:
55n/a fd = os.open(pty_name, os.O_RDWR)
56n/a except OSError:
57n/a continue
58n/a return (fd, '/dev/tty' + x + y)
59n/a raise OSError('out of pty devices')
60n/a
61n/adef slave_open(tty_name):
62n/a """slave_open(tty_name) -> slave_fd
63n/a Open the pty slave and acquire the controlling terminal, returning
64n/a opened filedescriptor.
65n/a Deprecated, use openpty() instead."""
66n/a
67n/a result = os.open(tty_name, os.O_RDWR)
68n/a try:
69n/a from fcntl import ioctl, I_PUSH
70n/a except ImportError:
71n/a return result
72n/a try:
73n/a ioctl(result, I_PUSH, "ptem")
74n/a ioctl(result, I_PUSH, "ldterm")
75n/a except OSError:
76n/a pass
77n/a return result
78n/a
79n/adef fork():
80n/a """fork() -> (pid, master_fd)
81n/a Fork and make the child a session leader with a controlling terminal."""
82n/a
83n/a try:
84n/a pid, fd = os.forkpty()
85n/a except (AttributeError, OSError):
86n/a pass
87n/a else:
88n/a if pid == CHILD:
89n/a try:
90n/a os.setsid()
91n/a except OSError:
92n/a # os.forkpty() already set us session leader
93n/a pass
94n/a return pid, fd
95n/a
96n/a master_fd, slave_fd = openpty()
97n/a pid = os.fork()
98n/a if pid == CHILD:
99n/a # Establish a new session.
100n/a os.setsid()
101n/a os.close(master_fd)
102n/a
103n/a # Slave becomes stdin/stdout/stderr of child.
104n/a os.dup2(slave_fd, STDIN_FILENO)
105n/a os.dup2(slave_fd, STDOUT_FILENO)
106n/a os.dup2(slave_fd, STDERR_FILENO)
107n/a if (slave_fd > STDERR_FILENO):
108n/a os.close (slave_fd)
109n/a
110n/a # Explicitly open the tty to make it become a controlling tty.
111n/a tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
112n/a os.close(tmp_fd)
113n/a else:
114n/a os.close(slave_fd)
115n/a
116n/a # Parent and child process.
117n/a return pid, master_fd
118n/a
119n/adef _writen(fd, data):
120n/a """Write all the data to a descriptor."""
121n/a while data:
122n/a n = os.write(fd, data)
123n/a data = data[n:]
124n/a
125n/adef _read(fd):
126n/a """Default read function."""
127n/a return os.read(fd, 1024)
128n/a
129n/adef _copy(master_fd, master_read=_read, stdin_read=_read):
130n/a """Parent copy loop.
131n/a Copies
132n/a pty master -> standard output (master_read)
133n/a standard input -> pty master (stdin_read)"""
134n/a fds = [master_fd, STDIN_FILENO]
135n/a while True:
136n/a rfds, wfds, xfds = select(fds, [], [])
137n/a if master_fd in rfds:
138n/a data = master_read(master_fd)
139n/a if not data: # Reached EOF.
140n/a fds.remove(master_fd)
141n/a else:
142n/a os.write(STDOUT_FILENO, data)
143n/a if STDIN_FILENO in rfds:
144n/a data = stdin_read(STDIN_FILENO)
145n/a if not data:
146n/a fds.remove(STDIN_FILENO)
147n/a else:
148n/a _writen(master_fd, data)
149n/a
150n/adef spawn(argv, master_read=_read, stdin_read=_read):
151n/a """Create a spawned process."""
152n/a if type(argv) == type(''):
153n/a argv = (argv,)
154n/a pid, master_fd = fork()
155n/a if pid == CHILD:
156n/a os.execlp(argv[0], *argv)
157n/a try:
158n/a mode = tty.tcgetattr(STDIN_FILENO)
159n/a tty.setraw(STDIN_FILENO)
160n/a restore = 1
161n/a except tty.error: # This is the same as termios.error
162n/a restore = 0
163n/a try:
164n/a _copy(master_fd, master_read, stdin_read)
165n/a except OSError:
166n/a if restore:
167n/a tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
168n/a
169n/a os.close(master_fd)
170n/a return os.waitpid(pid, 0)[1]