# -*- coding: utf-8 -*-
#
# Poio Tools for Linguists
#
# Copyright (C) 2009-2013 Poio Project
# Author: Peter Bouda <pbouda@cidles.eu>
# URL: <http://media.cidles.eu/poio/>
# For license information, see LICENSE

"""
Several classes to tokenize text.

"""

from __future__ import absolute_import, unicode_literals

import abc
import codecs
import collections

from . import character


class Tokenizer(object):
    """
    Base class for all tokenizers.

    """

    # Python 2 metaclass declaration; under Python 3 this attribute is
    # ignored, but the abstract methods below still raise
    # NotImplementedError when they are not overridden.
    __metaclass__ = abc.ABCMeta

    def __init__(self, stream, blankspaces=character.blankspaces,
                 separators=character.separators):
        """
        Constructor of the Tokenizer base class.

        Parameters
        ----------
        stream : str or io.IOBase
            The stream to tokenize. Can be a filename or any open IO stream.

        blankspaces : str
            The characters that represent empty spaces.

        separators : str
            The characters that separate token units (e.g. word boundaries).

        """
        self.separators = separators
        self.blankspaces = blankspaces
        self.lowercase = False

        # Offsets into the text: `offbeg` marks the first character,
        # `offend` the last, and `offset` the current position.
        self.offbeg = 0
        self.offset = None
        self.offend = None

    def is_blankspace(self, char):
        """
        Test if a character is a blankspace.

        Parameters
        ----------
        char : str
            The character to test.

        Returns
        -------
        ret : bool
            True if character is a blankspace, False otherwise.

        """
        if len(char) > 1:
            raise TypeError("Expected a char.")
        return char in self.blankspaces

    def is_separator(self, char):
        """
        Test if a character is a separator.

        Parameters
        ----------
        char : str
            The character to test.

        Returns
        -------
        ret : bool
            True if character is a separator, False otherwise.

        """
        if len(char) > 1:
            raise TypeError("Expected a char.")
        return char in self.separators

    @abc.abstractmethod
    def count_characters(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def reset_stream(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def count_tokens(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def has_more_tokens(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def next_token(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def progress(self):
        raise NotImplementedError("Method must be implemented")

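# A minimal sketch of how a concrete subclass fulfils the Tokenizer
# interface, using a hypothetical ListTokenizer over an already-split token
# list (illustration only; the stream-based implementations follow below):
#
#     class ListTokenizer(Tokenizer):
#         def __init__(self, tokens):
#             Tokenizer.__init__(self, "")
#             self.tokens = list(tokens)
#             self.reset_stream()
#
#         def count_characters(self):
#             return sum(len(t) for t in self.tokens)
#
#         def count_tokens(self):
#             return len(self.tokens)
#
#         def has_more_tokens(self):
#             return self.offset < len(self.tokens)
#
#         def next_token(self):
#             token = self.tokens[self.offset]
#             self.offset += 1
#             return token
#
#         def progress(self):
#             return float(self.offset) / len(self.tokens)
#
#         def reset_stream(self):
#             self.offset = 0
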

class ForwardTokenizer(Tokenizer):
    """
    Tokenizer that walks through the text from the beginning to the end.

    """

    def __init__(self, stream, blankspaces=character.blankspaces,
                 separators=character.separators):
        Tokenizer.__init__(self, stream, blankspaces, separators)
        # Accept either an open IO stream or a filename.
        if not hasattr(stream, 'read'):
            stream = codecs.open(stream, "r", "utf-8")
        self.text = stream.read()
        stream.close()

        self.offend = self.count_characters() - 1
        self.reset_stream()

    def count_tokens(self):
        count = 0
        while self.has_more_tokens():
            count += 1
            self.next_token()

        self.reset_stream()

        return count

    def count_characters(self):
        """
        Counts the number of unicode characters in the IO stream.

        """
        return len(self.text)

    def has_more_tokens(self):
        # True as long as at least one non-delimiter character remains
        # after the current offset.
        return any(not (self.is_blankspace(c) or self.is_separator(c))
                   for c in self.text[self.offset:])

    def next_token(self):
        # Skip any leading blankspaces and separators.
        while (self.offset <= self.offend
               and (self.is_blankspace(self.text[self.offset])
                    or self.is_separator(self.text[self.offset]))):
            self.offset += 1

        # Collect characters until the next delimiter or the end of the text.
        token = ""
        while self.offset <= self.offend:
            current = self.text[self.offset]
            if self.is_blankspace(current) or self.is_separator(current):
                break
            if self.lowercase:
                current = current.lower()
            token += current
            self.offset += 1

        return token

    def progress(self):
        return float(self.offset) / self.offend

    def reset_stream(self):
        self.offset = 0

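# A usage sketch for ForwardTokenizer, assuming "corpus.txt" is a
# placeholder path to a UTF-8 text file:
#
#     tokenizer = ForwardTokenizer("corpus.txt")
#     tokenizer.lowercase = True
#     while tokenizer.has_more_tokens():
#         print(tokenizer.next_token())
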

class ReverseTokenizer(Tokenizer):
    """
    Tokenizer that walks through the text from the end to the beginning.

    """

    def __init__(self, stream, blankspaces=character.blankspaces,
                 separators=character.separators):
        Tokenizer.__init__(self, stream, blankspaces, separators)
        # Accept either an open IO stream or a filename.
        if not hasattr(stream, 'read'):
            stream = codecs.open(stream, "r", "utf-8")
        self.text = stream.read()
        stream.close()

        self.offend = self.count_characters() - 1
        self.offset = self.offend

    def count_tokens(self):
        curroff = self.offset
        self.offset = self.offend
        count = 0
        while self.has_more_tokens():
            self.next_token()
            count += 1
        self.offset = curroff
        return count

    def count_characters(self):
        """
        Counts the number of unicode characters in the IO stream.

        """
        return len(self.text)

    def has_more_tokens(self):
        # True as long as at least one non-delimiter character remains
        # at or before the current offset.
        return any(not (self.is_blankspace(c) or self.is_separator(c))
                   for c in self.text[self.offbeg:self.offset + 1])

    def next_token(self):
        # Skip any trailing blankspaces and separators.
        while (self.offbeg <= self.offset
               and (self.is_blankspace(self.text[self.offset])
                    or self.is_separator(self.text[self.offset]))):
            self.offset -= 1

        # Collect characters backwards until the previous delimiter or the
        # beginning of the text.
        token = ""
        while self.offbeg <= self.offset:
            current = self.text[self.offset]
            if self.is_blankspace(current) or self.is_separator(current):
                break
            if self.lowercase:
                current = current.lower()
            token = current + token
            self.offset -= 1

        return token

    def progress(self):
        return float(self.offend - self.offset) / (self.offend - self.offbeg)

    def reset_stream(self):
        self.offset = self.offend

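# A usage sketch for ReverseTokenizer: the same text yields its tokens in
# reverse order, e.g. "The quick fox" gives "fox", "quick", "The".
# "corpus.txt" is a placeholder path:
#
#     tokenizer = ReverseTokenizer("corpus.txt")
#     while tokenizer.has_more_tokens():
#         print(tokenizer.next_token())
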

def forward_tokenize_file(infile, ngram_size, lowercase=False, cutoff=0):
    ngram_map = collections.defaultdict(int)
    ngram_list = []
    tokenizer = ForwardTokenizer(infile)
    tokenizer.lowercase = lowercase

    # Fill the sliding window with the first `ngram_size - 1` tokens.
    for _ in range(ngram_size - 1):
        if not tokenizer.has_more_tokens():
            break
        ngram_list.append(tokenizer.next_token())

    # Slide the window over the remaining tokens and count each ngram.
    while tokenizer.has_more_tokens():
        token = tokenizer.next_token()
        ngram_list.append(token)
        ngram_map[tuple(ngram_list)] += 1
        ngram_list.pop(0)

    # Drop rare ngrams. Iterate over a copy of the keys so that entries
    # can be deleted safely while looping.
    if cutoff > 0:
        for k in list(ngram_map.keys()):
            if ngram_map[k] <= cutoff:
                del ngram_map[k]

    return ngram_map
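
# A usage sketch for forward_tokenize_file, assuming "corpus.txt" is a
# placeholder path: count the bigrams in a file, case-insensitively, and
# keep only those that occur more than once (cutoff=1):
#
#     bigrams = forward_tokenize_file("corpus.txt", 2, lowercase=True,
#                                     cutoff=1)
#     for ngram, count in bigrams.items():
#         print(" ".join(ngram), count)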