» Core Development > Code coverage > Lib/xml/dom/pulldom.py

Python code coverage for Lib/xml/dom/pulldom.py

# count content
import xml.sax
import xml.sax.handler
3n/a
# Event type tags.  Every event queued by PullDOM is a pair whose first item
# is one of these strings, e.g. (START_ELEMENT, node).
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"
COMMENT = "COMMENT"
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
CHARACTERS = "CHARACTERS"
12n/a
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that queues DOM nodes as pull events.

    Events are stored in a singly linked list of two-item cells
    ``[(event_type, node), next_cell]``.  ``firstEvent`` is a sentinel head
    cell (its payload slot is unused) and ``lastEvent`` is the cell that was
    appended most recently; a consumer drains the queue starting from
    ``firstEvent[1]``.
    """

    _locator = None
    document = None

    def __init__(self, documentFactory=None):
        """Create a handler with an empty event queue.

        documentFactory -- object whose ``createDocument(uri, tagname, None)``
            builds the document node.  When None, startDocument() installs
            the xml.dom.minidom implementation.
        """
        from xml.dom import XML_NAMESPACE
        self.documentFactory = documentFactory
        self.firstEvent = [None, None]
        self.lastEvent = self.firstEvent
        self.elementStack = []
        # Bind the list methods directly.  The instance attribute ``pop``
        # shadows the pop() method defined on the class.
        self.push = self.elementStack.append
        self.pop = self.elementStack.pop
        self._ns_contexts = [{XML_NAMESPACE: 'xml'}]  # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        # Comments / processing instructions seen before a document exists.
        self.pending_events = []

    def pop(self):
        """Remove and return the top entry of ``elementStack``.

        Instances created through __init__ shadow this with the list's own
        bound ``pop``; the method stays available for explicit
        ``PullDOM.pop(handler)`` calls.
        """
        result = self.elementStack[-1]
        del self.elementStack[-1]
        return result

    def setDocumentLocator(self, locator):
        """Remember the locator object handed over by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration.

        The ``(prefix, uri)`` pair is queued so that the next
        startElementNS() can add it as an xmlns attribute (an empty prefix
        is stored under the name 'xmlns').  A copy of the current
        uri -> prefix context is pushed so endPrefixMapping() can restore it.
        """
        if not hasattr(self, '_xmlns_attrs'):
            self._xmlns_attrs = []
        self._xmlns_attrs.append((prefix or 'xmlns', uri))
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Restore the uri -> prefix context saved by startPrefixMapping()."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName, attrs):
        """Create an element node and queue a START_ELEMENT event.

        name    -- ``(uri, localname)`` pair.
        tagName -- qualified name, or None to derive one from the current
                   namespace context.
        attrs   -- attribute collection keyed by ``(uri, localname)``; its
                   ``items()`` method and private ``_attrs`` mapping are used.

        The first element seen also creates the document (buildDocument).
        """
        # Retrieve xml namespace declaration attributes.
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'
        xmlns_attrs = getattr(self, '_xmlns_attrs', None)
        if xmlns_attrs is not None:
            for aname, value in xmlns_attrs:
                # Written into the private mapping so the declarations show
                # up in attrs.items() below.
                attrs._attrs[(xmlns_uri, aname)] = value
        self._xmlns_attrs = []
        uri, localname = name
        if uri:
            # When using namespaces, the reader may or may not
            # provide us with the original name. If not, create
            # *a* valid tagName from the current context.
            if tagName is None:
                prefix = self._current_context[uri]
                if prefix:
                    tagName = prefix + ":" + localname
                else:
                    tagName = localname
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)
        else:
            # When the tagname is not prefixed, it just appears as
            # localname
            if self.document:
                node = self.document.createElement(localname)
            else:
                node = self.buildDocument(None, localname)

        for aname, value in attrs.items():
            a_uri, a_localname = aname
            if a_uri == xmlns_uri:
                # Namespace declaration: 'xmlns' or 'xmlns:<prefix>'.
                if a_localname == 'xmlns':
                    qname = a_localname
                else:
                    qname = 'xmlns:' + a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            elif a_uri:
                prefix = self._current_context[a_uri]
                if prefix:
                    qname = prefix + ":" + a_localname
                else:
                    qname = a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            else:
                attr = self.document.createAttribute(a_localname)
                node.setAttributeNode(attr)
            attr.value = value

        self.lastEvent[1] = [(START_ELEMENT, node), None]
        self.lastEvent = self.lastEvent[1]
        self.push(node)

    def endElementNS(self, name, tagName):
        """Pop the current element and queue an END_ELEMENT event for it."""
        self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
        self.lastEvent = self.lastEvent[1]

    def startElement(self, name, attrs):
        """Create an element node (no namespaces) and queue START_ELEMENT.

        The first element seen also creates the document (buildDocument).
        """
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for aname, value in attrs.items():
            attr = self.document.createAttribute(aname)
            attr.value = value
            node.setAttributeNode(attr)

        self.lastEvent[1] = [(START_ELEMENT, node), None]
        self.lastEvent = self.lastEvent[1]
        self.push(node)

    def endElement(self, name):
        """Pop the current element and queue an END_ELEMENT event for it."""
        self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
        self.lastEvent = self.lastEvent[1]

    def comment(self, s):
        """Queue a COMMENT event, deferring it if no document exists yet."""
        if self.document:
            node = self.document.createComment(s)
            self.lastEvent[1] = [(COMMENT, node), None]
            self.lastEvent = self.lastEvent[1]
        else:
            # No document to create the node with; buildDocument() converts
            # this raw entry into a real event later.
            event = [(COMMENT, s), None]
            self.pending_events.append(event)

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, deferring it if needed."""
        if self.document:
            node = self.document.createProcessingInstruction(target, data)
            self.lastEvent[1] = [(PROCESSING_INSTRUCTION, node), None]
            self.lastEvent = self.lastEvent[1]
        else:
            # Stored as a raw (type, target, data) triple until
            # buildDocument() can create the node.
            event = [(PROCESSING_INSTRUCTION, target, data), None]
            self.pending_events.append(event)

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event carrying a text node."""
        node = self.document.createTextNode(chars)
        self.lastEvent[1] = [(IGNORABLE_WHITESPACE, node), None]
        self.lastEvent = self.lastEvent[1]

    def characters(self, chars):
        """Queue a CHARACTERS event carrying a text node."""
        node = self.document.createTextNode(chars)
        self.lastEvent[1] = [(CHARACTERS, node), None]
        self.lastEvent = self.lastEvent[1]

    def startDocument(self):
        """Install the xml.dom.minidom implementation as default factory."""
        if self.documentFactory is None:
            import xml.dom.minidom
            self.documentFactory = xml.dom.minidom.Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document, queue START_DOCUMENT and flush pending events.

        Returns ``firstChild`` of the new document (presumably the root
        element produced by ``createDocument`` -- depends on the factory).
        Raises AssertionError for a pending event of unknown type.
        """
        # Can't do that in startDocument, since we need the tagname
        # XXX: obtain DocumentType
        node = self.documentFactory.createDocument(uri, tagname, None)
        self.document = node
        self.lastEvent[1] = [(START_DOCUMENT, node), None]
        self.lastEvent = self.lastEvent[1]
        self.push(node)
        # Put everything we have seen so far into the document
        for e in self.pending_events:
            if e[0][0] == PROCESSING_INSTRUCTION:
                _, target, data = e[0]
                n = self.document.createProcessingInstruction(target, data)
                e[0] = (PROCESSING_INSTRUCTION, n)
            elif e[0][0] == COMMENT:
                n = self.document.createComment(e[0][1])
                e[0] = (COMMENT, n)
            else:
                raise AssertionError("Unknown pending event ", e[0][0])
            # The pending cell itself becomes the next cell of the queue.
            self.lastEvent[1] = e
            self.lastEvent = e
        self.pending_events = None
        return node.firstChild

    def endDocument(self):
        """Queue END_DOCUMENT and pop the document off the element stack."""
        self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
        self.pop()

    def clear(self):
        "clear(): Explicitly release parsing structures"
        self.document = None
195n/a
class ErrorHandler:
    """Minimal error handler: print warnings, re-raise everything else."""

    def warning(self, exception):
        """Print the warning and carry on."""
        print(exception)

    def error(self, exception):
        """Raise the given exception object."""
        raise exception

    def fatalError(self, exception):
        """Raise the given exception object."""
        raise exception
203n/a
class DOMEventStream:
    """Iterator over the ``(event_type, node)`` pairs parsed from a stream.

    The parser is fed ``bufsize`` units at a time and events are handed out
    as soon as they are queued.  A parser without a ``feed`` method is run
    to completion on first use instead (see _slurp).
    """

    def __init__(self, stream, parser, bufsize):
        """Attach ``parser`` to ``stream``.

        stream  -- object with ``read(size)``.
        parser  -- SAX parser; incremental use requires ``feed``/``close``.
        bufsize -- size passed to each ``stream.read`` call.
        """
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # No incremental interface: replace getEvent on this instance.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Install a fresh PullDOM handler on the parser."""
        self.pulldom = PullDOM()
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __getitem__(self, pos):
        """Legacy sequence protocol: return the next event, ignoring ``pos``.

        Raises IndexError when no event is left.
        """
        rc = self.getEvent()
        if rc:
            return rc
        raise IndexError

    def __next__(self):
        """Return the next event; raise StopIteration when none is left."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        return self

    def expandNode(self, node):
        """Consume events until ``node`` ends, attaching children to it.

        Every node pulled from the stream is appended to the innermost open
        element; the loop stops at the event whose node is ``node`` itself
        or when the stream runs out of events.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def getEvent(self):
        """Return the next queued event, feeding the parser as needed.

        Returns None once the stream is exhausted (after closing the parser).
        """
        # use IncrementalParser interface, so we get the desired
        # pull effect
        if not self.pulldom.firstEvent[1]:
            # Queue is empty: new events must be linked from the head cell.
            self.pulldom.lastEvent = self.pulldom.firstEvent
        while not self.pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                self.parser.close()
                return None
            self.parser.feed(buf)
        rc = self.pulldom.firstEvent[1][0]
        self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
        return rc

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None once the queue is drained, like getEvent(), so
            that iteration ends with StopIteration / IndexError instead of
            failing on a subscript of None.
        """
        cell = self.pulldom.firstEvent[1]
        if not cell:
            return None
        self.pulldom.firstEvent[1] = cell[1]
        return cell[0]

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
288n/a
class SAX2DOM(PullDOM):
    """PullDOM variant that also links created nodes into the DOM tree.

    Each overridden callback lets PullDOM create the node and queue the
    event, then appends the node to its parent taken from ``elementStack``.
    """

    def startElementNS(self, name, tagName, attrs):
        """Queue the element and append it to the enclosing node."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        # PullDOM pushed the new element; its parent sits right below it.
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Queue the element and append it to the enclosing node."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Queue the processing instruction and append it to the open node.

        A processing instruction seen before the document exists is only
        deferred by PullDOM (no node is created and nothing is on
        ``elementStack``), so there is nothing to attach at that point.
        """
        PullDOM.processingInstruction(self, target, data)
        if not self.document:
            # Deferred into pending_events: lastEvent holds no PI node and
            # elementStack is empty, so indexing either would fail.
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Queue the whitespace text node and append it to the open node."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Queue the text node and append it to the open node."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
320n/a
321n/a
# Read size used by parse() when the caller does not pass a bufsize.
default_bufsize = (2 ** 14) - 20
323n/a
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream over a file name or an open stream.

    stream_or_string -- a ``str`` is treated as a file name and opened in
        binary mode; anything else is used as the stream itself.
    parser  -- SAX parser to use; a falsy value selects
        ``xml.sax.make_parser()``.
    bufsize -- read size for the stream; None selects ``default_bufsize``.

    A file opened here is handed to the returned DOMEventStream; this
    function does not close it.
    """
    if bufsize is None:
        bufsize = default_bufsize
    if isinstance(stream_or_string, str):
        stream = open(stream_or_string, 'rb')
    else:
        stream = stream_or_string
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(stream, parser, bufsize)
334n/a
def parseString(string, parser=None):
    """Return a DOMEventStream over an in-memory document.

    string -- the document as ``str``, or as ``bytes``/``bytearray``
        (wrapped in a BytesIO, like the binary file that parse() opens).
    parser -- SAX parser to use; a falsy value selects
        ``xml.sax.make_parser()``.

    The buffer size equals ``len(string)``, so one read returns the whole
    document.
    """
    from io import BytesIO, StringIO

    bufsize = len(string)
    if isinstance(string, (bytes, bytearray)):
        buf = BytesIO(string)
    else:
        buf = StringIO(string)
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)
342n/a return DOMEventStream(buf, parser, bufsize)