Source code for cltk.readers.readers

"""`reader.py` - Corpus reader utility objects."""
import codecs
import json
import logging
import os
import re
import time
from typing import Any, Dict, Generator, List, Set, Tuple

from nltk import pos_tag  # Replace with CLTK
from nltk.corpus.reader import PlaintextCorpusReader
from nltk.corpus.reader.api import CorpusReader
from nltk.probability import FreqDist
from nltk.tokenize import sent_tokenize, word_tokenize  # Replace with CLTK

from cltk.prosody.lat.string_utils import flatten
from cltk.sentence.sentence import SentenceTokenizer
from cltk.tokenizers.word import WordTokenizer
from cltk.utils import get_cltk_data_dir

LOG = logging.getLogger(__name__)
LOG.addHandler(logging.NullHandler())

# TODO add your corpus here:
SUPPORTED_CORPORA = {
    "lat": ["lat_text_latin_library", "lat_text_perseus", "lat_text_tesserae"],
    "grc": ["grc_text_perseus", "grc_text_tesserae"],
}  # type: Dict[str, List[str]]


def get_corpus_reader(corpus_name: str = None, language: str = None) -> CorpusReader:
    """
    Corpus reader factory method.
    :param corpus_name: the name of the supported corpus, available as: [package].SUPPORTED_CORPORA
    :param language: the language to search in
    :return: NLTK-compatible corpus reader
    """
    BASE = get_cltk_data_dir() + "/{}/text".format(language)
    root = os.path.join(os.path.expanduser(BASE), corpus_name)
    if not os.path.exists(root) or corpus_name not in SUPPORTED_CORPORA.get(language, []):
        raise ValueError(
            "Specified corpus data not found, please install {} for language: {}".format(
                corpus_name, language
            )
        )
    sentence_tokenizer = SentenceTokenizer(language)
    the_word_tokenizer = WordTokenizer(language)
    doc_pattern = r".*\.txt"  #: Generic file ending, override below in your own CorpusReader implementation

    if language == "lat":
        if corpus_name == "lat_text_latin_library":
            skip_keywords = ["Latin", "Library"]
            return FilteredPlaintextCorpusReader(
                root=root,
                fileids=doc_pattern,
                sent_tokenizer=sentence_tokenizer,
                word_tokenizer=the_word_tokenizer,
                skip_keywords=skip_keywords,
            )
        if corpus_name == "lat_text_perseus":
            valid_json_root = os.path.join(
                root, "cltk_json"
            )  # we only support this subsection
            return JsonfileCorpusReader(
                root=valid_json_root,
                sent_tokenizer=sentence_tokenizer,
                word_tokenizer=the_word_tokenizer,
                target_language="lat",
            )  # perseus also contains English
        if corpus_name == "lat_text_tesserae":
            return TesseraeCorpusReader(
                root=root,
                fileids=r".*\.tess",
                sent_tokenizer=sentence_tokenizer,
                word_tokenizer=the_word_tokenizer,
            )

    if language == "grc":
        if corpus_name == "grc_text_perseus":
            valid_json_root = os.path.join(
                root, "cltk_json"
            )  #: we only support this subsection
            return JsonfileCorpusReader(
                root=valid_json_root,
                sent_tokenizer=sentence_tokenizer,
                word_tokenizer=the_word_tokenizer,
                target_language="grc",
            )  #: this abbreviation is required
        if corpus_name == "grc_text_tesserae":
            # tokenizers/taggers need to be replaced with CLTK version
            # most obv. for POS tagging!
            return TesseraeCorpusReader(
                root=root,
                fileids=r".*\.tess",
                sent_tokenizer=sent_tokenize,
                word_tokenizer=word_tokenize,
                pos_tagger=pos_tag,
                target_language="grc",
            )  #: this abbreviation is required
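
# Example usage (illustrative sketch, not part of the module): assumes the named
# corpus has already been downloaded into the CLTK data directory; corpus and
# language names are taken from SUPPORTED_CORPORA above.
#
#     reader = get_corpus_reader(corpus_name="lat_text_latin_library", language="lat")
#     for word in reader.words():
#         print(word)
#         break
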
# TODO add other languages and write tests for each corpus
def assemble_corpus(
    corpus_reader: CorpusReader,
    types_requested: List[str],
    type_dirs: Dict[str, List[str]] = None,
    type_files: Dict[str, List[str]] = None,
) -> CorpusReader:
    """
    Create a filtered corpus.
    :param corpus_reader: this reader gets mutated
    :param types_requested: a list of string types, which are to be found in the
    type_dirs and type_files mappings
    :param type_dirs: a dict of corpus types to directories
    :param type_files: a dict of corpus types to files
    :return: a CorpusReader object containing only the mappings desired
    """
    fileid_names = []  # type: List[str]
    try:
        all_file_ids = list(corpus_reader.fileids())
        clean_ids_types = []  # type: List[Tuple[str, str]]
        if type_files:
            for key, valuelist in type_files.items():
                if key in types_requested:
                    for value in valuelist:
                        if value in all_file_ids:
                            if key:
                                clean_ids_types.append((value, key))
        if type_dirs:
            for key, valuelist in type_dirs.items():
                if key in types_requested:
                    for value in valuelist:
                        corrected_dir = value.replace("./", "")
                        corrected_dir = "{}/".format(corrected_dir)
                        for name in all_file_ids:
                            if name and name.startswith(corrected_dir):
                                clean_ids_types.append((name, key))
        clean_ids_types.sort(key=lambda x: x[0])
        fileid_names, categories = zip(*clean_ids_types)  # type: ignore
        corpus_reader._fileids = fileid_names
        return corpus_reader
    except Exception:
        LOG.exception("failure in corpus building")
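
# Example usage (illustrative sketch): the type names and directories below are
# hypothetical; real values depend on how the installed corpus is laid out on disk.
# Note that the passed-in reader is mutated and also returned.
#
#     reader = get_corpus_reader(corpus_name="lat_text_latin_library", language="lat")
#     filtered = assemble_corpus(
#         reader,
#         types_requested=["poetry"],
#         type_dirs={"poetry": ["./vergil", "./ovid"]},  # hypothetical directories
#     )
#     print(len(filtered.fileids()))
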
class FilteredPlaintextCorpusReader(PlaintextCorpusReader, CorpusReader):
    """
    A corpus reader for plain text documents with simple filtration for streamlined
    pipeline use. A list of keywords may be provided; if any of these keywords are
    found in a document's paragraph, that whole paragraph will be skipped, and the
    same applies to sentences and words.
    """

    def __init__(
        self, root, fileids=None, encoding="utf8", skip_keywords=None, **kwargs
    ):
        """
        :param root: The file root of the corpus directory
        :param fileids: the list of file ids to consider, or wildcard expression
        :param skip_keywords: a list of words which indicate whole paragraphs that
        should be skipped by the paras() and words() methods
        :param encoding: utf8
        :param kwargs: Any values to be passed to NLTK super classes, such as
        sent_tokenizer, word_tokenizer.
        """
        if not fileids:
            fileids = r".*\.txt"
        # Initialize the NLTK corpus reader objects
        PlaintextCorpusReader.__init__(self, root, fileids, encoding)
        CorpusReader.__init__(self, root, fileids, encoding)
        if "sent_tokenizer" in kwargs:
            self._sent_tokenizer = kwargs["sent_tokenizer"]
        if "word_tokenizer" in kwargs:
            self._word_tokenizer = kwargs["word_tokenizer"]
        self.skip_keywords = skip_keywords

    def words(self, fileids=None) -> Generator[str, str, None]:
        """
        Provide the words of the corpus, skipping any paragraphs flagged by keywords
        passed to the class constructor.
        :param fileids:
        :return: words, including punctuation, one by one
        """
        if not fileids:
            fileids = self.fileids()
        for para in self.paras(fileids):
            flat_para = flatten(para)
            skip = False
            if self.skip_keywords:
                for keyword in self.skip_keywords:
                    if keyword in flat_para:
                        skip = True
            if not skip:
                for word in flat_para:
                    yield word

    def paras(self, fileids=None) -> Generator[str, str, None]:
        """
        Provide paragraphs, if possible.
        :param fileids:
        :return: a generator of paragraphs
        """
        if not fileids:
            fileids = self.fileids()
        for para in super().paras(fileids):
            flat_para = flatten(para)
            skip = False
            if self.skip_keywords:
                for keyword in self.skip_keywords:
                    if keyword in flat_para:
                        skip = True
            if not skip:
                yield para

    def sents(self, fileids=None) -> Generator[str, str, None]:
        """
        A generator for sentences in a text, or texts.
        :param fileids:
        :return: a generator of sentences
        """
        if not fileids:
            fileids = self.fileids()
        for sent in super().sents(fileids):
            skip = False
            if self.skip_keywords:
                for keyword in self.skip_keywords:
                    if keyword in sent:
                        skip = True
            if not skip:
                yield sent

    def docs(self, fileids=None) -> Generator[str, str, None]:
        """
        Returns the complete text of a text document, closing the document after
        we are done reading it and yielding it in a memory-safe fashion.
        """
        if not fileids:
            fileids = self.fileids()
        # Create a generator, loading one document into memory at a time.
        for path, encoding in self.abspaths(fileids, include_encoding=True):
            with codecs.open(path, "r", encoding=encoding) as reader:
                if self.skip_keywords:
                    tmp_data = []
                    for line in reader:
                        skip = False
                        for keyword in self.skip_keywords:
                            if keyword in line:
                                skip = True
                        if not skip:
                            tmp_data.append(line)
                    yield "".join(tmp_data)
                else:
                    yield reader.read()

    def sizes(self, fileids=None) -> Generator[int, int, None]:
        """
        Yields the size on disk of each file. This function is used to detect
        oddly large files in the corpus.
        """
        if not fileids:
            fileids = self.fileids()
        # Create a generator, getting every path and computing filesize
        for path in self.abspaths(fileids):
            yield os.path.getsize(path)

    def __iter__(self):
        """Convenience iterator for Word2Vec training."""
        for sent in self.sents():
            yield sent
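
# Example usage (illustrative sketch): constructing the reader directly; the root
# path below is a placeholder, and the skip keywords mirror the ones used by
# get_corpus_reader for the Latin Library corpus.
#
#     root = os.path.expanduser("~/cltk_data/lat/text/lat_text_latin_library")  # placeholder
#     reader = FilteredPlaintextCorpusReader(
#         root=root,
#         fileids=r".*\.txt",
#         skip_keywords=["Latin", "Library"],
#     )
#     for sent in reader.sents():
#         ...
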
class JsonfileCorpusReader(CorpusReader):
    """
    A corpus reader for JSON documents whose contents are stored in a dictionary.
    Supports any documents stored under a text key. A document may have any number
    of subsections as nested dictionaries, as long as their keys are sortable; they
    will be traversed and only string datatypes will be collected as the text.

    E.g.:
    doc['text']['1'] = "some text"
    doc['text']['2'] = "more text"

    Or with one level of subsections:
    doc['text']['1']['1'] = "some text"
    doc['text']['1']['2'] = "more text"
    """

    def __init__(
        self,
        root,
        fileids=None,
        encoding="utf8",
        skip_keywords=None,
        target_language=None,
        paragraph_separator="\n\n",
        **kwargs
    ):
        """
        :param root: The file root of the corpus directory
        :param fileids: the list of file ids to consider, or wildcard expression
        :param skip_keywords: a list of words which indicate whole paragraphs that
        should be skipped by the paras() and words() methods
        :param target_language: which files to select; sometimes a corpus contains
        English translations, and we expect these files to be named ...english.json
        -- if not, pass in fileids
        :param paragraph_separator: character sequence demarcating paragraph separation
        :param encoding: utf8
        :param kwargs: Any values to be passed to NLTK super classes, such as
        sent_tokenizer, word_tokenizer.
        """
        if not target_language:
            target_language = ""
        if not fileids:
            fileids = r".*{}\.json".format(target_language)
        # Initialize the NLTK corpus reader objects
        CorpusReader.__init__(self, root, fileids, encoding)
        if "sent_tokenizer" in kwargs:
            self._sent_tokenizer = kwargs["sent_tokenizer"]
        if "word_tokenizer" in kwargs:
            self._word_tokenizer = kwargs["word_tokenizer"]
        self.skip_keywords = skip_keywords
        self.paragraph_separator = paragraph_separator

    def words(self, fileids=None) -> Generator[str, str, None]:
        """
        Provide the words of the corpus, skipping any paragraphs flagged by keywords
        passed to the class constructor.
        :param fileids:
        :return: words, including punctuation, one by one
        """
        for sentence in self.sents(fileids):
            words = self._word_tokenizer.tokenize(sentence)
            for word in words:
                yield word

    def sents(self, fileids=None) -> Generator[str, str, None]:
        """
        :param fileids:
        :return: a generator of sentences
        """
        for para in self.paras(fileids):
            sentences = self._sent_tokenizer.tokenize(para)
            for sentence in sentences:
                yield sentence

    def paras(self, fileids=None) -> Generator[str, str, None]:
        """
        Yield paragraphs of the text, as demarcated by double new lines.
        :param fileids: single document file or files of proper JSON objects with a
        text key, and section subkey
        :return: a generator of paragraphs
        """

        def _recurse_to_strings(my_dict: Dict[str, Any]) -> List[str]:
            """Internal accumulator method."""
            vals = []  # type: List[str]
            m_keys = sorted(list(my_dict.keys()))
            for mkey in m_keys:
                if isinstance(my_dict[mkey], dict):
                    vals += _recurse_to_strings(my_dict[mkey])
                else:
                    vals += [my_dict[mkey]]
            return vals

        for doc in self.docs(fileids):
            text_data = _recurse_to_strings(doc["text"])  # type: List[str]
            text_sections = []  # type: List[str]
            for text_part in text_data:
                skip = False
                if self.skip_keywords:
                    for keyword in self.skip_keywords:
                        if keyword in text_part:
                            skip = True
                if not skip:
                    text_sections.append(text_part)
            for para in text_sections:
                yield para.strip()

    def docs(self, fileids=None) -> Generator[Dict[str, Any], Dict[str, Any], None]:
        """
        Returns the complete contents of a JSON document, closing the document after
        we are done reading it and yielding it in a memory-safe fashion.
        :return: Python dictionary of strings or nested dictionaries. The top-level
        dictionary also contains the filename from which it was loaded.
        """
        # Create a generator, loading one document into memory at a time.
        for path, encoding in self.abspaths(fileids, include_encoding=True):
            with codecs.open(path, "r", encoding=encoding) as reader:
                the_doc = json.loads(reader.read())
                if "filename" not in the_doc:
                    the_doc["filename"] = path
                yield the_doc

    def sizes(self, fileids=None) -> Generator[int, int, None]:
        """
        Yields the size on disk of each file. This function is used to detect
        oddly large files in the corpus.
        """
        if not fileids:
            fileids = self.fileids()
        # Create a generator, getting every path and computing filesize
        for path in self.abspaths(fileids):
            yield os.path.getsize(path)

    def __iter__(self) -> Generator[str, str, None]:
        """Convenience iterator for Word2Vec training."""
        for sent in self.sents():
            yield sent
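
# Example (illustrative sketch): a minimal JSON document this reader can traverse,
# following the text-key / sortable-subkey convention in the class docstring; the
# root path, tokenizers, and document contents shown here are placeholders.
#
#     {"author": "...", "text": {"1": {"1": "some text", "2": "more text"}}}
#
#     reader = JsonfileCorpusReader(
#         root=os.path.expanduser("~/cltk_data/lat/text/lat_text_perseus/cltk_json"),  # placeholder
#         sent_tokenizer=SentenceTokenizer("lat"),
#         word_tokenizer=WordTokenizer("lat"),
#         target_language="lat",
#     )
#     paragraphs = list(reader.paras())
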
class TesseraeCorpusReader(PlaintextCorpusReader):
    """
    A corpus reader for documents in the Tesserae (.tess) format, in which each
    line of text is prefixed by bracketed citation information,
    e.g. "<Ach. Tat. 1.1.0>".
    """

    def __init__(
        self, root, fileids=None, encoding="utf8", skip_keywords=None, **kwargs
    ):
        """
        :param root: The file root of the corpus directory
        :param fileids: the list of file ids to consider, or wildcard expression
        :param skip_keywords: a list of words which indicate whole paragraphs that
        should be skipped by the paras() and words() methods
        :param encoding: utf8
        :param kwargs: Any values to be passed to NLTK super classes, such as
        sent_tokenizer, word_tokenizer.
        """
        # Initialize the NLTK corpus reader objects
        PlaintextCorpusReader.__init__(self, root, fileids, encoding)
        # CorpusReader.__init__(self, root, fileids, encoding)
        if "sent_tokenizer" in kwargs:
            self._sent_tokenizer = kwargs["sent_tokenizer"]
        if "word_tokenizer" in kwargs:
            self._word_tokenizer = kwargs["word_tokenizer"]
        if "pos_tagger" in kwargs:
            self.pos_tagger = kwargs["pos_tagger"]

    def docs(self: object, fileids: str):
        """
        Returns the complete text of a .tess file, closing the document after we
        are done reading it and yielding it in a memory-safe fashion.
        """
        for path, encoding in self.abspaths(fileids, include_encoding=True):
            with codecs.open(path, "r", encoding=encoding) as f:
                yield f.read()

    def texts(self: object, fileids: str, plaintext: bool = True):
        """
        Returns the text content of a .tess file, i.e. removing the bracketed
        citation info (e.g. "<Ach. Tat. 1.1.0>").
        """
        for doc in self.docs(fileids):
            if plaintext:
                doc = re.sub(r"<.+?>\s", "", doc)  # Remove citation info
            doc = doc.rstrip()  # Clean up final line breaks
            yield doc

    def paras(self: object, fileids: str):
        """
        Returns paragraphs in a .tess file, as defined by two \n characters.
        NB: Most .tess files do not have this feature; only the Homeric poems,
        from what I have noticed so far. Perhaps a feature worth looking into.
        """
        for text in self.texts(fileids):
            for para in text.split("\n\n"):
                yield para

    def lines(self: object, fileids: str, plaintext: bool = True):
        """
        Tokenizes documents in the corpus by line.
        """
        for text in self.texts(fileids, plaintext):
            # Remove blank lines; pass the flag as a keyword so it is not
            # mistaken for re.sub's ``count`` argument.
            text = re.sub(r"\n\s*\n", "\n", text, flags=re.MULTILINE)
            for line in text.split("\n"):
                yield line

    def sents(self: object, fileids: str):
        """
        Tokenizes documents in the corpus by sentence.
        """
        for para in self.paras(fileids):
            for sent in sent_tokenize(para):
                yield sent

    def words(self: object, fileids: str):
        """
        Tokenizes documents in the corpus by word.
        """
        for sent in self.sents(fileids):
            for token in word_tokenize(sent):
                yield token

    def pos_tokenize(self: object, fileids: str):
        """
        Segments, tokenizes, and POS tags a document in the corpus.
        """
        for para in self.paras(fileids):
            yield [
                self.pos_tagger(word_tokenize(sent)) for sent in sent_tokenize(para)
            ]

    def describe(self: object, fileids: str = None):
        """
        Performs a single pass of the corpus and returns a dictionary with a
        variety of metrics concerning the state of the corpus.

        Based on (Bengfort et al, 2018: 46)
        """
        started = time.time()

        # Structures to perform counting
        counts = FreqDist()
        tokens = FreqDist()

        # Perform a single pass over paragraphs, tokenize, and count
        for para in self.paras(fileids):
            counts["paras"] += 1
            for sent in sent_tokenize(para):
                counts["sents"] += 1
                # Include POS at some point
                for word in word_tokenize(sent):
                    counts["words"] += 1
                    tokens[word] += 1

        # Compute the number of files in the corpus
        n_fileids = len(self.fileids())

        # Return data structure with information
        return {
            "files": n_fileids,
            "paras": counts["paras"],
            "sents": counts["sents"],
            "words": counts["words"],
            "vocab": len(tokens),
            "lexdiv": round((counts["words"] / len(tokens)), 3),
            "ppdoc": round((counts["paras"] / n_fileids), 3),
            "sppar": round((counts["sents"] / counts["paras"]), 3),
            "secs": round((time.time() - started), 3),
        }
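
# Example usage (illustrative sketch): summarising a Tesserae corpus with the
# metrics computed in describe() above (files, paras, sents, words, vocab,
# lexdiv, ppdoc, sppar, secs); assumes the grc_text_tesserae corpus is installed.
#
#     reader = get_corpus_reader(corpus_name="grc_text_tesserae", language="grc")
#     stats = reader.describe()
#     print(stats["files"], stats["vocab"], stats["lexdiv"])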