» Core Development > Code coverage > Lib/csv.py

Python code coverage for Lib/csv.py

# count    content
1n/a
2n/a"""
3n/acsv.py - read/write/investigate CSV files
4n/a"""
5n/a
6n/aimport re
7n/afrom _csv import Error, __version__, writer, reader, register_dialect, \
8n/a unregister_dialect, get_dialect, list_dialects, \
9n/a field_size_limit, \
10n/a QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
11n/a __doc__
12n/afrom _csv import Dialect as _Dialect
13n/a
14n/afrom collections import OrderedDict
15n/afrom io import StringIO
16n/a
# Public API: constants and callables re-exported from the C accelerator
# (_csv) plus the pure-Python dialects and helpers defined below.
__all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
           "Error", "Dialect", "__doc__", "excel", "excel_tab",
           "field_size_limit", "reader", "writer",
           "register_dialect", "get_dialect", "list_dialects", "Sniffer",
           "unregister_dialect", "__version__", "DictReader", "DictWriter",
           "unix_dialect"]
23n/a
class Dialect:
    """Describe a CSV dialect.

    This must be subclassed (see csv.excel). Valid attributes are:
    delimiter, quotechar, escapechar, doublequote, skipinitialspace,
    lineterminator, quoting.

    """
    _name = ""
    _valid = False
    # Placeholder attributes; concrete subclasses override these.
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self):
        # Only subclasses are considered valid dialects; the bare base
        # class has all-None attributes and will fail validation below.
        if type(self) is not Dialect:
            self._valid = True
        self._validate()

    def _validate(self):
        # Delegate the attribute checks to the C implementation and
        # surface any TypeError as csv.Error.
        try:
            _Dialect(self)
        except TypeError as e:
            # We do this for compatibility with py2.3
            raise Error(str(e))
54n/a
class excel(Dialect):
    """Describe the usual properties of Excel-generated CSV files."""
    delimiter = ','
    quotechar = '"'
    doublequote = True       # a doubled quotechar escapes a quote in a field
    skipinitialspace = False
    lineterminator = '\r\n'  # Excel writes DOS-style line endings
    quoting = QUOTE_MINIMAL  # quote only fields that need it
register_dialect("excel", excel)
64n/a
class excel_tab(excel):
    """Describe the usual properties of Excel-generated TAB-delimited files."""
    delimiter = '\t'  # identical to "excel" apart from the delimiter
register_dialect("excel-tab", excel_tab)
69n/a
class unix_dialect(Dialect):
    """Describe the usual properties of Unix-generated CSV files."""
    delimiter = ','
    quotechar = '"'
    doublequote = True
    skipinitialspace = False
    lineterminator = '\n'  # Unix line endings, unlike the excel dialects
    quoting = QUOTE_ALL    # quote every field unconditionally
register_dialect("unix", unix_dialect)
79n/a
80n/a
class DictReader:
    """Iterate over CSV rows as ordered mappings keyed by field names."""

    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="excel", *args, **kwds):
        self._fieldnames = fieldnames   # keys for the per-row dicts
        self.restkey = restkey          # collects overflow values of long rows
        self.restval = restval          # filler value for short rows
        self.reader = reader(f, dialect, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        # Lazily consume the first row of the file as the header when no
        # explicit field names were supplied.
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        if self.line_num == 0:
            # Accessed purely for its header-consuming side effect.
            self.fieldnames
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # Unlike the basic reader we skip blank rows entirely; mapping a
        # blank row would yield a dict full of restval/None values.
        while not row:
            row = next(self.reader)

        names = self.fieldnames
        result = OrderedDict(zip(names, row))
        if len(names) < len(row):
            # Long row: stash the extra values in a list under restkey.
            result[self.restkey] = row[len(names):]
        else:
            # Short (or exact) row: pad any missing fields with restval.
            for missing in names[len(row):]:
                result[missing] = self.restval
        return result
129n/a
130n/a
class DictWriter:
    """Write mappings out as CSV rows in a fixed field order."""

    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        self.fieldnames = fieldnames    # ordered keys to emit per row
        self.restval = restval          # value substituted for missing keys
        if extrasaction.lower() not in ("raise", "ignore"):
            raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
                             % extrasaction)
        self.extrasaction = extrasaction
        self.writer = writer(f, dialect, *args, **kwds)

    def writeheader(self):
        # A header row is just a row whose values equal the field names.
        self.writerow(dict(zip(self.fieldnames, self.fieldnames)))

    def _dict_to_list(self, rowdict):
        # Reject unexpected keys up front when configured to do so.
        if self.extrasaction == "raise":
            extras = rowdict.keys() - self.fieldnames
            if extras:
                raise ValueError("dict contains fields not in fieldnames: "
                                 + ", ".join([repr(x) for x in extras]))
        # Lazy generator: values pulled in fieldname order, gaps filled
        # with restval.
        return (rowdict.get(key, self.restval) for key in self.fieldnames)

    def writerow(self, rowdict):
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        return self.writer.writerows(map(self._dict_to_list, rowdicts))
159n/a
# Guard Sniffer's type checking against builds that exclude complex()
try:
    complex
except NameError:
    complex = float  # fall back so has_header()'s typecast list still works
165n/a
class Sniffer:
    '''
    "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
    Returns a Dialect object.
    '''
    def __init__(self):
        # in case there is more than one possible delimiter
        self.preferred = [',', '\t', ';', ' ', ':']


    def sniff(self, sample, delimiters=None):
        """
        Returns a dialect (or None) corresponding to the sample.

        *sample* is a chunk of presumed-CSV text; *delimiters*, if
        given, restricts the candidate delimiter characters.  Tries the
        quote-based heuristic first, then the character-frequency
        heuristic.  Raises Error when no delimiter can be determined.
        """

        quotechar, doublequote, delimiter, skipinitialspace = \
                   self._guess_quote_and_delimiter(sample, delimiters)
        if not delimiter:
            delimiter, skipinitialspace = self._guess_delimiter(sample,
                                                                delimiters)

        if not delimiter:
            raise Error("Could not determine delimiter")

        class dialect(Dialect):
            _name = "sniffed"
            lineterminator = '\r\n'
            quoting = QUOTE_MINIMAL
            # escapechar = ''

        dialect.doublequote = doublequote
        dialect.delimiter = delimiter
        # _csv.reader won't accept a quotechar of ''
        dialect.quotechar = quotechar or '"'
        dialect.skipinitialspace = skipinitialspace

        return dialect


    def _guess_quote_and_delimiter(self, data, delimiters):
        """
        Looks for text enclosed between two identical quotes
        (the probable quotechar) which are preceded and followed
        by the same character (the probable delimiter).
        For example:
                         ,'some text',
        The quote with the most wins, same with the delimiter.
        If there is no quotechar the delimiter can't be determined
        this way.

        Returns (quotechar, doublequote, delimiter, skipinitialspace).
        """

        matches = []
        # The four patterns cover a quoted field that is: delimited on both
        # sides, at the start of a line, at the end of a line, and a whole
        # line on its own.
        # BUG FIX: the third pattern's delim group previously contained a
        # stray ">" (r'(?P<delim>>[^\w\n"\'])'), which made it require a
        # literal ">" before the field and thus never match normal data.
        for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
                      r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
            regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
            matches = regexp.findall(data)
            if matches:
                break

        if not matches:
            # (quotechar, doublequote, delimiter, skipinitialspace)
            return ('', False, None, 0)
        quotes = {}
        delims = {}
        spaces = 0
        groupindex = regexp.groupindex
        for m in matches:
            # findall returns tuples (or plain strings for the last,
            # single-group pattern); index groups positionally.
            n = groupindex['quote'] - 1
            key = m[n]
            if key:
                quotes[key] = quotes.get(key, 0) + 1
            try:
                n = groupindex['delim'] - 1
                key = m[n]
            except KeyError:
                continue
            if key and (delimiters is None or key in delimiters):
                delims[key] = delims.get(key, 0) + 1
            try:
                n = groupindex['space'] - 1
            except KeyError:
                continue
            if m[n]:
                spaces += 1

        quotechar = max(quotes, key=quotes.get)

        if delims:
            delim = max(delims, key=delims.get)
            # if every delimiter was followed by a space, assume
            # skipinitialspace
            skipinitialspace = delims[delim] == spaces
            if delim == '\n': # most likely a file with a single column
                delim = ''
        else:
            # there is *no* delimiter, it's a single column of quoted data
            delim = ''
            skipinitialspace = 0

        # if we see an extra quote between delimiters, we've got a
        # double quoted format
        dq_regexp = re.compile(
                               r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
                               {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)



        if dq_regexp.search(data):
            doublequote = True
        else:
            doublequote = False

        return (quotechar, doublequote, delim, skipinitialspace)


    def _guess_delimiter(self, data, delimiters):
        """
        The delimiter /should/ occur the same number of times on
        each row. However, due to malformed data, it may not. We don't want
        an all or nothing approach, so we allow for small variations in this
        number.
          1) build a table of the frequency of each character on every line.
          2) build a table of frequencies of this frequency (meta-frequency?),
             e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
             7 times in 2 rows'
          3) use the mode of the meta-frequency to determine the /expected/
             frequency for that character
          4) find out how often the character actually meets that goal
          5) the character that best meets its goal is the delimiter
        For performance reasons, the data is evaluated in chunks, so it can
        try and evaluate the smallest portion of the data possible, evaluating
        additional chunks as necessary.

        Returns (delimiter, skipinitialspace); ('', 0) if none is found.
        """

        data = list(filter(None, data.split('\n')))

        # candidate characters: all of 7-bit ASCII (renamed from "ascii"
        # to avoid shadowing the ascii() builtin)
        ascii_chars = [chr(c) for c in range(127)]

        # build frequency tables
        chunkLength = min(10, len(data))
        iteration = 0
        charFrequency = {}
        modes = {}
        delims = {}
        start, end = 0, chunkLength
        while start < len(data):
            iteration += 1
            for line in data[start:end]:
                for char in ascii_chars:
                    metaFrequency = charFrequency.get(char, {})
                    # must count even if frequency is 0
                    freq = line.count(char)
                    # value is the mode
                    metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
                    charFrequency[char] = metaFrequency

            for char in charFrequency.keys():
                items = list(charFrequency[char].items())
                if len(items) == 1 and items[0][0] == 0:
                    continue
                # get the mode of the frequencies
                if len(items) > 1:
                    modes[char] = max(items, key=lambda x: x[1])
                    # adjust the mode - subtract the sum of all
                    # other frequencies
                    items.remove(modes[char])
                    modes[char] = (modes[char][0], modes[char][1]
                                   - sum(item[1] for item in items))
                else:
                    modes[char] = items[0]

            # build a list of possible delimiters
            modeList = modes.items()
            total = float(min(chunkLength * iteration, len(data)))
            # (rows of consistent data) / (number of rows) = 100%
            consistency = 1.0
            # minimum consistency threshold
            threshold = 0.9
            # progressively relax the consistency requirement until at
            # least one candidate delimiter survives
            while len(delims) == 0 and consistency >= threshold:
                for k, v in modeList:
                    if v[0] > 0 and v[1] > 0:
                        if ((v[1]/total) >= consistency and
                            (delimiters is None or k in delimiters)):
                            delims[k] = v
                consistency -= 0.01

            if len(delims) == 1:
                delim = list(delims.keys())[0]
                skipinitialspace = (data[0].count(delim) ==
                                    data[0].count("%c " % delim))
                return (delim, skipinitialspace)

            # analyze another chunkLength lines
            start = end
            end += chunkLength

        if not delims:
            return ('', 0)

        # if there's more than one, fall back to a 'preferred' list
        if len(delims) > 1:
            for d in self.preferred:
                if d in delims.keys():
                    skipinitialspace = (data[0].count(d) ==
                                        data[0].count("%c " % d))
                    return (d, skipinitialspace)

        # nothing else indicates a preference, pick the character that
        # dominates(?)
        items = [(v,k) for (k,v) in delims.items()]
        items.sort()
        delim = items[-1][1]

        skipinitialspace = (data[0].count(delim) ==
                            data[0].count("%c " % delim))
        return (delim, skipinitialspace)


    def has_header(self, sample):
        """
        Return True when the first row of *sample* looks like a header.

        Creates a dictionary of types of data in each column. If any
        column is of a single type (say, integers), *except* for the first
        row, then the first row is presumed to be labels. If the type
        can't be determined, it is assumed to be a string in which case
        the length of the string is the determining factor: if all of the
        rows except for the first are the same length, it's a header.
        Finally, a 'vote' is taken at the end for each column, adding or
        subtracting from the likelihood of the first row being a header.
        """

        rdr = reader(StringIO(sample), self.sniff(sample))

        header = next(rdr) # assume first row is header

        columns = len(header)
        columnTypes = {}
        for i in range(columns): columnTypes[i] = None

        checked = 0
        for row in rdr:
            # arbitrary number of rows to check, to keep it sane
            if checked > 20:
                break
            checked += 1

            if len(row) != columns:
                continue # skip rows that have irregular number of columns

            for col in list(columnTypes.keys()):

                for thisType in [int, float, complex]:
                    try:
                        thisType(row[col])
                        break
                    except (ValueError, OverflowError):
                        pass
                else:
                    # fallback to length of string
                    thisType = len(row[col])

                if thisType != columnTypes[col]:
                    if columnTypes[col] is None: # add new column type
                        columnTypes[col] = thisType
                    else:
                        # type is inconsistent, remove column from
                        # consideration
                        del columnTypes[col]

        # finally, compare results against first row and "vote"
        # on whether it's a header
        hasHeader = 0
        for col, colType in columnTypes.items():
            if type(colType) == type(0): # it's a length
                if len(header[col]) != colType:
                    hasHeader += 1
                else:
                    hasHeader -= 1
            else: # attempt typecast
                try:
                    colType(header[col])
                except (ValueError, TypeError):
                    hasHeader += 1
                else:
                    hasHeader -= 1

        return hasHeader > 0