ยปCore Development>Code coverage>Demo/sockets/mcast.py

Python code coverage for Demo/sockets/mcast.py

#countcontent
1n/a#!/usr/bin/env python3
2n/a#
3n/a# Send/receive UDP multicast packets.
4n/a# Requires that your OS kernel supports IP multicast.
5n/a#
6n/a# Usage:
7n/a# mcast -s (sender, IPv4)
8n/a# mcast -s -6 (sender, IPv6)
9n/a# mcast (receivers, IPv4)
10n/a# mcast -6 (receivers, IPv6)
11n/a
12n/aMYPORT = 8123
13n/aMYGROUP_4 = '225.0.0.250'
14n/aMYGROUP_6 = 'ff15:7079:7468:6f6e:6465:6d6f:6d63:6173'
15n/aMYTTL = 1 # Increase to reach other networks
16n/a
17n/aimport time
18n/aimport struct
19n/aimport socket
20n/aimport sys
21n/a
22n/adef main():
23n/a group = MYGROUP_6 if "-6" in sys.argv[1:] else MYGROUP_4
24n/a
25n/a if "-s" in sys.argv[1:]:
26n/a sender(group)
27n/a else:
28n/a receiver(group)
29n/a
30n/a
31n/adef sender(group):
32n/a addrinfo = socket.getaddrinfo(group, None)[0]
33n/a
34n/a s = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
35n/a
36n/a # Set Time-to-live (optional)
37n/a ttl_bin = struct.pack('@i', MYTTL)
38n/a if addrinfo[0] == socket.AF_INET: # IPv4
39n/a s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
40n/a else:
41n/a s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)
42n/a
43n/a while True:
44n/a data = repr(time.time()).encode('utf-8') + b'\0'
45n/a s.sendto(data, (addrinfo[4][0], MYPORT))
46n/a time.sleep(1)
47n/a
48n/a
49n/adef receiver(group):
50n/a # Look up multicast group address in name server and find out IP version
51n/a addrinfo = socket.getaddrinfo(group, None)[0]
52n/a
53n/a # Create a socket
54n/a s = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
55n/a
56n/a # Allow multiple copies of this program on one machine
57n/a # (not strictly needed)
58n/a s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
59n/a
60n/a # Bind it to the port
61n/a s.bind(('', MYPORT))
62n/a
63n/a group_bin = socket.inet_pton(addrinfo[0], addrinfo[4][0])
64n/a # Join group
65n/a if addrinfo[0] == socket.AF_INET: # IPv4
66n/a mreq = group_bin + struct.pack('=I', socket.INADDR_ANY)
67n/a s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
68n/a else:
69n/a mreq = group_bin + struct.pack('@I', 0)
70n/a s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
71n/a
72n/a # Loop, printing any data we receive
73n/a while True:
74n/a data, sender = s.recvfrom(1500)
75n/a while data[-1:] == '\0': data = data[:-1] # Strip trailing \0's
76n/a print(str(sender) + ' ' + repr(data))
77n/a
78n/a
79n/aif __name__ == '__main__':
80n/a main()