initial scripts

from ComplexityLanguage import ComplexityLanguage
import re
import math
from functools import reduce
import freeling
import numpy as np
class ComplexityEnglish(ComplexityLanguage):
def __init__(self):
ComplexityLanguage.__init__(self,'en')
# create parsers
self.parser= freeling.chart_parser(self.DATA+self.lang+"/chunker/grammar-chunk.dat")
self.dep=freeling.dep_txala(self.DATA+self.lang+"/dep_txala/dependences.dat", self.parser.get_start_symbol())
"""
config es una lista de valores booleanos que activa o desactivan el cálculo de una medida
config = [
True|False, # MAXIMUN EMBEDDING DEPTH OF SENTENCE (MaxDEPTH)
True|False, # MINIMUN EMBEDDING DEPTH OF SENTENCE (MinDEPTH)
True|False, # AVERAGE EMBEDDING DEPTH OF SENTENCE (MeanDEPTH)
True|False, # FOG
True|False, # FLESCH
True|False, # FLESCH-KINCAID
True|False, # SMOG
]
"""
self.config += [True, True, True, True, True, True, True, True]
self.metricsStr.extend(['MaxDEPTH','MinDEPTH', 'MeanDEPTH', 'StdDEPTH', 'FOG', 'FLESCH', 'FLESCH-KINCAID', 'SMOG'])
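# A minimal sketch of how the flags line up with metricsStr (hypothetical usage, not
# part of the original code), assuming the base class keeps config and metricsStr
# index-aligned, as calcMetrics below does:
#   ce = ComplexityEnglish()
#   ce.config[ce.metricsStr.index('SMOG')] = False   # skip the SMOG measure
#   metrics = ce.calcMetrics(text)                   # 'SMOG' is then left out of the result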
def textProcessing(self, text):
text = text.replace(u'\xa0', u' ').replace('"', '')
# run the full analysis pipeline over the text (tokenize, split, analyze, tag, parse)
#ls = sen.analyze(ls)
sid=self.sp.open_session()
tokens = self.tk.tokenize(text)
#print("Tokens:", [w.get_form() for w in tokens])
#print("hay Tokens:", len(tokens))
ls = self.sp.split(sid,tokens,True)
#print("After split", len(ls))
ls = self.mf.analyze(ls)
#print("After morpho", len(ls))
ls = self.tg.analyze(ls)
#print("After tagger", len(ls))
ls = self.parser.analyze(ls)
#print("After parser", len(ls))
ls = self.dep.analyze(ls)
#print("After dependencies", len(ls))
self.sentences = ls
#print("oraciones con split:", len(ls))
self.N_sentences = len(ls)
self.sp.close_session(sid)
return self.sentences, self.N_sentences
def getDepth(self, ptree, depth=0):
node = ptree.begin()
info = node.get_info()
nch = node.num_children()
if (nch == 0) :
return depth
else :
child_depth = []
for i in range(nch) :
child = node.nth_child_ref(i)
child_depth.append(self.getDepth(child, depth+1))
return max(child_depth)
def embeddingDepth(self):
##output results
max_list = []
for s in self.sentences:
tr = s.get_parse_tree()
max_list.append(self.getDepth(tr,0))
#print('Length of the list is:', len(max_list))
#print('The list is:', max_list)
self.max_list = max_list
mean_max_list = sum(max_list)/float(len(max_list))
max_max_list = max(max_list)
min_max_list = min(max_list)
std_max_list= np.std(max_list)
#print('MAXIMUM EMBEDDING DEPTH OF SENTENCE (MaxDEPTH): ', max_max_list, '\n')
#print('MINIMUM EMBEDDING DEPTH OF SENTENCE (MinDEPTH): ', min_max_list, '\n')
#print('AVERAGE EMBEDDING DEPTH OF SENTENCE (MeanDEPTH): ', mean_max_list, '\n')
#print('STANDARD DEVIATION: ', std_max_list)
#lin=sys.stdin.readline()
self.max_max_list = max_max_list
self.min_max_list = min_max_list
self.mean_max_list = mean_max_list
self.std_max_list = std_max_list
return self.max_max_list, self.min_max_list, self.mean_max_list, self.std_max_list
def readability(self):
#Number of syllables and Number of words with 3 or more syllables:tagger
N_syllables = 0
N_syllables3 = 0
for words in self.listwords:
count=0
for character in words:
if re.match('a|e|i|o|u|y', character):
N_syllables +=1
count+=1
if count>=3:
N_syllables3 += 1
self.N_syllables = N_syllables
self.N_syllables3 = N_syllables3
#print("sílabas: ", self.N_syllables)
fogreadability = 0.4 * ( self.N_words / self.N_sentences + 100 * self.N_syllables3 / self.N_words)
#print("FOG: ", fogreadability, "\n")
self.fogreadability = fogreadability
fleschreadability = 206.835 - 84.6 * (self.N_syllables / self.N_words) - 1.015 * (self.N_words / self.N_sentences)
#print("FLESCH: ", fleschreadability, "\n")
self.fleschreadability = fleschreadability
fkincaidreadability = - 15.59 + 11.8 * (self.N_syllables / self.N_words) + 0.39 * (self.N_words / self.N_sentences)
#print("FLESCH-KINCAID: ", fkincaidreadability, "\n")
self.fkincaidreadability = fkincaidreadability
return self.fogreadability, self.fleschreadability, self.fkincaidreadability
def ageReadability(self):
smogreadability= 3.1291+1.0430*math.sqrt(self.N_syllables3*(30/self.N_sentences))
#print("READABILITY SMOG: ", smogreadability, "\n")
self.smogreadability = smogreadability
return self.smogreadability
def calcMetrics(self, text):
"""
Calcula la métricas de complejidad activadas en la configuración
Si config == None se calculan todas las métricas de complejidad soportadas
"""
self.textProcessing(text)
metrics = super().calcMetrics(text)
metricsEn = self.metricsStr
embdep = None
readability = None
for i in range(len(metrics)-1, len(metricsEn)):
if self.config == None or self.config[i] and metricsEn[i] == 'MaxDEPTH':
embdep = self.embeddingDepth()
metrics['MaxDEPTH'] = embdep[0]
if self.config == None or self.config[i] and metricsEn[i] == 'MinDEPTH':
if not embdep: embdep = self.embeddingDepth()
metrics['MinDEPTH'] = embdep[1]
if self.config == None or self.config[i] and metricsEn[i] == 'MeanDEPTH':
if not embdep: embdep = self.embeddingDepth()
metrics['MeanDEPTH'] = embdep[2]
if self.config == None or self.config[i] and metricsEn[i] == 'StdDEPTH':
if not embdep: embdep = self.embeddingDepth()
metrics['StdDEPTH'] = embdep[3]
if self.config == None or self.config[i] and metricsEn[i] == 'FOG':
readability = self.readability()
metrics['FOG'] = readability[0]
if self.config == None or self.config[i] and metricsEn[i] == 'FLESCH':
if not readability: readability = self.readability()
metrics['FLESCH'] = readability[1]
if self.config == None or self.config[i] and metricsEn[i] == 'FLESCH-KINCAID':
if not readability: readability = self.readability()
metrics['FLESCH-KINCAID'] = readability[2]
if self.config == None or self.config[i] and metricsEn[i] == 'SMOG':
metrics['SMOG'] = self.ageReadability()
return metrics
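# A minimal usage sketch (hypothetical, not part of the original file). It assumes
# FreeLing and its English data files are installed and that ComplexityLanguage sets
# up self.DATA, the tokenizer/splitter and the word counts used by readability().
if __name__ == "__main__":
    sample = "This is a short sample text. It contains two simple sentences."
    analyzer = ComplexityEnglish()
    for name, value in analyzer.calcMetrics(sample).items():
        print(name, value)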
from ComplexityLanguage import ComplexityLanguage
import freeling
import os
import re
from functools import reduce
import numpy as np
import scipy.stats
import math
class ComplexityFrench(ComplexityLanguage):
def __init__(self):
lang = 'fr'
ComplexityLanguage.__init__(self, lang)
## Modify this line to be your FreeLing installation directory
FREELINGDIR = "/usr/local"
DATA = FREELINGDIR+"/share/freeling/"
CLASSDIR = "/home/sinai/Experiments/CLEF-PAN/"
self.lang = lang
freeling.util_init_locale("default")
# create language analyzer
self.la=freeling.lang_ident(DATA+"common/lang_ident/ident.dat")
# create options set for maco analyzer. Default values are Ok, except for data files.
op= freeling.maco_options(lang)
op.set_data_files( "",
DATA + "common/punct.dat",
DATA + lang + "/dicc.src",
DATA + lang + "/afixos.dat",
"",
DATA + lang + "/locucions.dat",
DATA + lang + "/np.dat",
DATA + lang + "/quantities.dat",
DATA + lang + "/probabilitats.dat")
# create analyzers
self.tk=freeling.tokenizer(DATA+lang+"/tokenizer.dat")
self.sp=freeling.splitter(DATA+lang+"/splitter.dat")
self.mf=freeling.maco(op)
# activate morpho modules to be used in next call
self.mf.set_active_options(False, True, True, True, # select which among created
True, True, False, True, # submodules are to be used.
True, True, True, True ) # default: all created submodules are used
# create tagger and sense annotator
self.tg=freeling.hmm_tagger(DATA+lang+"/tagger.dat",True,2)
self.sen=freeling.senses(DATA+lang+"/senses.dat")
#Dale-Chall word list
CLASSDIR = "/home/sinai/Experiments/CLEF-PAN/"
f = open(CLASSDIR + 'DaleChall.txt')
lines = f.readlines()
f.close()
listDaleChall = []
for l in lines:
data = l.strip().split()
listDaleChall += data
self.listDaleChall=listDaleChall
"""
config es una lista de valores booleanos que activa o desactivan el cálculo de una medida
config = [
True|False, # KANDEL MODELS
True|False, # DALE CHALL
True|False, # SOL
]
"""
self.config += [True, True, True]
self.metricsStr.extend(['KANDEL-MODELS','DALE CHALL', 'SOL'])
def readability(self):
#Number of low frequency words
count = 0
for sentence in self.pos_content_sentences:
for w in sentence:
if w.get_form() not in self.listDaleChall:
count+=1
N_difficultwords = count
#Number of syllables and Number of words with 3 or more syllables:tagger
N_syllables = 0
N_syllables3 = 0
for words in self.listwords:
count=0
for character in words:
if re.match('a|e|i|o|u|y', character):
N_syllables +=1
count+=1
if count>=3:
N_syllables3 += 1
self.N_syllables = N_syllables
self.N_syllables3 = N_syllables3
kandelmodelsreadability = 207 - 1.015 * (self.N_words / self.N_sentences) - 73.6 * (self.N_syllables / self.N_words)
#print("KANDEL-MODELS: ", kandelmodelsreadability, "\n")
self.kandelmodelsreadability = kandelmodelsreadability
dalechallreadability =15.79 * (N_difficultwords / self.N_words) + 0.04906 * (self.N_words / self.N_sentences)
#print("DALE CHALL: ", dalechallreadability, "\n")
self.dalechallreadability = dalechallreadability
return self.kandelmodelsreadability, self.dalechallreadability
def ageReadability(self):
solreadability= - 1.35 + 0.77 * (3.1291 + 1.0430 * math.sqrt(self.N_syllables3 * (30/self.N_sentences)))
#print("READABILITY SOL: ", solreadability, "\n")
self.solreadability = solreadability
return self.solreadability
def calcMetrics(self, text):
"""
Calcula la métricas de complejidad activadas en la configuración
Si config == None se calculan todas las métricas de complejidad soportadas
"""
self.textProcessing(text)
metrics = super().calcMetrics(text)
metricsFr = self.metricsStr
readability = None
for i in range(len(metrics)-1, len(metricsFr)):
if self.config == None or self.config[i] and metricsFr[i] == 'KANDEL-MODELS':
readability = self.readability()
metrics['KANDEL-MODELS'] = readability[0]
if self.config == None or self.config[i] and metricsFr[i] == 'DALE CHALL':
if not readability: readability = self.readability()
metrics['DALE CHALL'] = readability[1]
if self.config == None or self.config[i] and metricsFr[i] == 'SOL':
metrics['SOL'] = self.ageReadability()
return metrics
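# A minimal usage sketch (hypothetical, not part of the original file). Besides FreeLing
# and its French data files, it needs the DaleChall.txt word list under CLASSDIR.
if __name__ == "__main__":
    cf = ComplexityFrench()
    print(cf.calcMetrics("Ceci est une phrase courte. En voici une autre."))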
import nltk
import re
class ComplexityPolish():
def __init__(self, lang= 'pl'):
"""
config es una lista de valores booleanos que activa o desactivan el cálculo de una medida
config = [
True|False, # PUNCTUATION MARKS
True|False, # ARI
True|False, # FOG
True|False, # FLESCH
True|False, # FLESCH-KINCAID
True|False, # PISAREK
]
Si config == None se calculan todas las métricas de complejidad soportadas
"""
self.config = [True, True, True, True, True, True]
self.metricsStr = ['AVERAGE PUNCTUATION MARKS', 'ARI', 'FOG', 'FLESCH', 'FLESCH-KINCAID', 'PISAREK']
pass
def textProcessing(self, text):
text = text.replace(u'\xa0', u' ')
'''
Each token corresponds to a term (word, number, ...) or a punctuation mark
'''
# pattern for valid tokens
pattern = r'''(?x)
(?:[A-Z]\.)+ # allow abbreviations such as EE.UU., U.S.A., etc.
| \w+(?:-\w+)* # words with internal hyphens
| \$?\d+(?:\.\d+)?%?€? # currency amounts and percentages, e.g. $12.40, 35%, 36.3€
| \.\.\. # ellipsis "..."
| \s\s(?:\s)+ # more than two whitespace characters (' ', \r, \n) count as one token; one or two are ignored
| [][.,;"'?():-_`'] # these are treated as standalone tokens
'''
# extract the tokens from the text
tokens = nltk.regexp_tokenize(text, pattern)
self.text_tokens = tokens
N_text_tokens = len(self.text_tokens)
self.N_text_tokens = N_text_tokens
#print('Tokens: ', self.N_text_tokens)
# now rebuild the sentences from the isolated end-of-sentence tokens
sentences = []
ini = 0
# end-of-sentence markers (period or newline)
sent_end = set(('.','!','?', '\n', '\r\n\r\n'))
for i, x in enumerate(self.text_tokens):
if x in sent_end:
if i > ini: # avoid sentences consisting only of the separator token
# append the sentence, dropping the end-of-sentence token
sentences.append(self.text_tokens[ini:i])
ini = i+1
self.sentences = sentences
N_sentences = len(sentences)
self.N_sentences = N_sentences
#print('Sentences: ',self.sentences)
N_charac=0
for word in self.text_tokens:
N_charac += len(word)
self.N_charac = N_charac
#print('The number of characters is: ', self.N_charac)
N_syllables = 0
N_syllables3 = 0
for words in self.text_tokens:
count=0
for character in words:
if re.match('a|e|i|o|u|y', character):
N_syllables +=1
count+=1
if count>=3:
N_syllables3 += 1
self.N_syllables = N_syllables
self.N_syllables3 = N_syllables3
#print('The number of syllables is: ',self.N_syllables)
#print('The number of syllables3 is: ', self.N_syllables3)
return self.text_tokens, self.N_text_tokens, self.sentences, self.N_sentences, self.N_charac, self.N_syllables, self.N_syllables3
def punctuationMarks(self):
N_punctuation = 0
letters = []
N_letters = 0
for word in self.text_tokens:
if re.match('[a-zA-Z]|á|ó|í|ú|é', word):
letters.append(word)
N_letters+=len(word)
else:
N_punctuation += 1
self.words = letters
self.N_words = len(letters)
#print('N_words: ', self.N_words)
self.N_letters = N_letters
self.N_punctuation = N_punctuation
if self.N_words == 0:
punctuation_over_words = 0
else:
punctuation_over_words = self.N_punctuation / self.N_words
self.punctuation_over_words = punctuation_over_words
#print('The number of letters is: ', N_letters)
#print('The list of letters is: ', letters)
#print('The number of punctuation marks is: ', self.N_punctuation, '\n')
return self.punctuation_over_words, self.N_punctuation, self.words, self.N_words, self.N_letters
def readability(self):
ARI = 4.71 * self.N_charac / self.N_words + 0.5 * self.N_words / self.N_sentences -21.43
self.ARI = ARI
#print("AUTOMATED READABILITY INDEX (ARI) = ", self.ARI, '\n')
fogreadability = 0.4 * ( self.N_words / self.N_sentences + 100 * self.N_syllables3 / self.N_words)
self.fogreadability = fogreadability
#print("FOG: ", self.fogreadability, "\n")
fleschreadability = 206.835 - 84.6 * (self.N_syllables / self.N_words) - 1.015 * (self.N_words / self.N_sentences)
self.fleschreadability = fleschreadability
#print("Syllables:", self.N_syllables)
#print("Sentences:", self.N_sentences)
#print("FLESCH: ", self.fleschreadability, "\n")
fkincaidreadability = - 15.59 + 11.8 * (self.N_syllables / self.N_words) + 0.39 * (self.N_words / self.N_sentences)
self.fkincaidreadability = fkincaidreadability
#print("FLESCH-KINCAID: ", self.fkincaidreadability, "\n")
pisarekreadability = (self.N_words / self.N_sentences)/3 + self.N_syllables3/3 +1
self.pisarekreadability = pisarekreadability
#print("PISAREK (2007): ", self.pisarekreadability, "\n")
return self.ARI, self.fogreadability, self.fleschreadability, self.fkincaidreadability, self.pisarekreadability
def calcMetrics(self, text):
self.textProcessing(text)
metrics = {}
metricsPo = self.metricsStr
readability = None
for i in range(0, len(metricsPo)):
if self.config == None or self.config[i] and metricsPo[i] == 'AVERAGE PUNCTUATION MARKS':
punctuationmarks = self.punctuationMarks()
metrics['AVERAGE PUNCTUATION MARKS'] = punctuationmarks[0]
if self.config == None or self.config[i] and metricsPo[i] == 'ARI':
readability = self.readability()
metrics['ARI'] = readability[0]
if self.config == None or self.config[i] and metricsPo[i] == 'FOG':
if not readability: readability = self.readability()
metrics['FOG'] = readability[1]
if self.config == None or self.config[i] and metricsPo[i] == 'FLESCH':
if not readability: readability = self.readability()
metrics['FLESCH'] = readability[2]
if self.config == None or self.config[i] and metricsPo[i] == 'FLESCH-KINCAID':
if not readability: readability = self.readability()
metrics['FLESCH-KINCAID'] = readability[3]
if self.config == None or self.config[i] and metricsPo[i] == 'PISAREK':
if not readability: readability = self.readability()
metrics['PISAREK'] = readability[4]
return metrics
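# A minimal usage sketch (hypothetical, not part of the original file). Unlike the other
# classes, ComplexityPolish only depends on nltk and re, so this runs without FreeLing.
if __name__ == "__main__":
    cp = ComplexityPolish()
    print(cp.calcMetrics("To jest pierwsze zdanie. A to jest drugie zdanie."))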
# -*- coding: utf-8 -*-
"""
A baseline authorship attribution method
based on a character n-gram representation
and a linear SVM classifier
for Python 2.7
Questions/comments: stamatatos@aegean.gr
It can be applied to datasets of PAN-18 cross-domain authorship attribution task
See details here: http://pan.webis.de/clef18/pan18-web/author-identification.html
Dependencies:
- Python 2.7 or 3.6 (we recommend the Anaconda Python distribution)
- scikit-learn
Usage from command line:
> python pan18-cdaa-baseline.py -i EVALUATION-DIRECTORY -o OUTPUT-DIRECTORY [-n N-GRAM-ORDER] [-ft FREQUENCY-THRESHOLD] [-c CLASSIFIER]
EVALUATION-DIRECTORY (str) is the main folder of a PAN-18 collection of attribution problems
OUTPUT-DIRECTORY (str) is an existing folder where the predictions are saved in the PAN-18 format
Optional parameters of the model:
N-GRAM-ORDER (int) is the length of character n-grams (default=3)
FREQUENCY-THRESHOLD (int) is the cutoff threshold used to filter out rare n-grams (default=5)
CLASSIFIER (str) is either 'OneVsOne' or 'OneVsRest' version of SVM (default=OneVsRest)
Example:
> python pan18-cdaa-baseline.py -i "mydata/pan18-cdaa-development-corpus" -o "mydata/pan18-answers"
"""
from __future__ import print_function
import os
import glob
import json
import argparse
import time
import codecs
from collections import defaultdict
from sklearn.svm import LinearSVC
from sklearn.multiclass import OneVsOneClassifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.feature_extraction.text import CountVectorizer
from sklearn import preprocessing
def represent_text(text,n):
# Extracts all character 'n'-grams from a 'text'
if n>0:
tokens = [text[i:i+n] for i in range(len(text)-n+1)]
frequency = defaultdict(int)
for token in tokens:
frequency[token] += 1
return frequency
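# For example (illustrative only): represent_text('abab', 2) returns the character
# 2-gram counts {'ab': 2, 'ba': 1}; with n <= 0 the function returns None.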
def read_files(path,label):
# Reads all text files located in the 'path' and assigns them to 'label' class
files = glob.glob(path+os.sep+label+os.sep+'*.txt')
texts=[]
for i,v in enumerate(files):
f=codecs.open(v,'r',encoding='utf-8')
texts.append((f.read(),label))
f.close()
return texts
def extract_vocabulary(texts,n,ft):
# Extracts all character 'n'-grams occurring at least 'ft' times in a set of 'texts'
occurrences=defaultdict(int)
for (text,label) in texts:
text_occurrences=represent_text(text,n)
for ngram in text_occurrences:
if ngram in occurrences:
occurrences[ngram]+=text_occurrences[ngram]
else:
occurrences[ngram]=text_occurrences[ngram]
vocabulary=[]
for i in occurrences.keys():
if occurrences[i]>=ft:
vocabulary.append(i)
return vocabulary
def baseline(path,outpath,n=3,ft=5,classifier='OneVsRest'):
start_time = time.time()
# Reading information about the collection
infocollection = path+os.sep+'collection-info.json'
problems = []
language = []
with open(infocollection, 'r') as f:
for attrib in json.load(f):
problems.append(attrib['problem-name'])
language.append(attrib['language'])
for index,problem in enumerate(problems):
print(problem)
# Reading information about the problem
infoproblem = path+os.sep+problem+os.sep+'problem-info.json'
candidates = []
with open(infoproblem, 'r') as f:
fj = json.load(f)
unk_folder = fj['unknown-folder']
for attrib in fj['candidate-authors']:
candidates.append(attrib['author-name'])
# Building training set
train_docs=[]
for candidate in candidates:
train_docs.extend(read_files(path+os.sep+problem,candidate))
train_texts = [text for i,(text,label) in enumerate(train_docs)]
train_labels = [label for i,(text,label) in enumerate(train_docs)]
vocabulary = extract_vocabulary(train_docs,n,ft)
vectorizer = CountVectorizer(analyzer='char',ngram_range=(n,n),lowercase=False,vocabulary=vocabulary)
train_data = vectorizer.fit_transform(train_texts)
train_data = train_data.astype(float)
for i,v in enumerate(train_texts):
train_data[i]=train_data[i]/len(train_texts[i])
print('\t', 'language: ', language[index])
print('\t', len(candidates), 'candidate authors')
print('\t', len(train_texts), 'known texts')
print('\t', 'vocabulary size:', len(vocabulary))
# Building test set
test_docs=read_files(path+os.sep+problem,unk_folder)
test_texts = [text for i,(text,label) in enumerate(test_docs)]
test_data = vectorizer.transform(test_texts)
test_data = test_data.astype(float)
for i,v in enumerate(test_texts):
test_data[i]=test_data[i]/len(test_texts[i])
print('\t', len(test_texts), 'unknown texts')
# Applying SVM
max_abs_scaler = preprocessing.MaxAbsScaler()
scaled_train_data = max_abs_scaler.fit_transform(train_data)
scaled_test_data = max_abs_scaler.transform(test_data)
if classifier=='OneVsOne':
clf=OneVsOneClassifier(LinearSVC(C=1)).fit(scaled_train_data, train_labels)
else:
clf=OneVsRestClassifier(LinearSVC(C=1)).fit(scaled_train_data, train_labels)
predictions=clf.predict(scaled_test_data)
# Writing output file
out_data=[]
unk_filelist = glob.glob(path+os.sep+problem+os.sep+unk_folder+os.sep+'*.txt')
pathlen=len(path+os.sep+problem+os.sep+unk_folder+os.sep)
for i,v in enumerate(predictions):
out_data.append({'unknown-text': unk_filelist[i][pathlen:], 'predicted-author': v})
with open(outpath+os.sep+'answers-'+problem+'.json', 'w') as f:
json.dump(out_data, f, indent=4)
print('\t', 'answers saved to file','answers-'+problem+'.json')
print('elapsed time:', time.time() - start_time)
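# Each answers file written above is a JSON list of objects of the form
# (hypothetical file/author names):
#   [
#       {"unknown-text": "unknown00001.txt", "predicted-author": "candidate00003"},
#       ...
#   ]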
def main():
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(description='PAN-18 Baseline Authorship Attribution Method')
parser.add_argument('-i', type=str, help='Path to the main folder of a collection of attribution problems')
parser.add_argument('-o', type=str, help='Path to an output folder')
parser.add_argument('-n', type=int, default=3, help='n-gram order (default=3)')
parser.add_argument('-ft', type=int, default=5, help='frequency threshold (default=5)')
parser.add_argument('-c', type=str, default='OneVsRest', help='OneVsRest or OneVsOne (default=OneVsRest)')
args = parser.parse_args()
if not args.i:
print('ERROR: The input folder is required')
parser.exit(1)
if not args.o:
print('ERROR: The output folder is required')
parser.exit(1)
baseline(args.i, args.o, args.n, args.ft, args.c)
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# Evaluation script for the Cross-Domain Authorship Attribution task @PAN2018.
We use the F1 metric (macro-average) as implemented in scikit-learn:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html
We include the following ad hoc rules:
- If authors are predicted which were not seen during training,
these predictions will count as false predictions ('<UNK>' class)
and they will negatively affect performance.
- If texts are left unattributed, they will be assigned to the '<UNK>'
class and they will negatively affect performance.
- The <UNK> class is excluded from the macro-average across classes.
- If multiple test attributions are given for a single unknown document,
only the first one will be taken into consideration.
Dependencies:
- Python 2.7 or 3.6 (we recommend the Anaconda Python distribution)
- scikit-learn
- matplotlib
Usage from the command line:
>>> python pan18-cdaa-evaluator-single.py -g GROUND-TRUTH-FILE -p PREDICTIONS-FILE [-c CONFUSION-MATRIX-FILE]
where
GROUND-TRUTH-FILE is the path to the (json) ground truth file of an attribution problem
PREDICTIONS-FILE is the path to the (json) predictions file of an attribution problem
CONFUSION-MATRIX-FILE is the path to the file where the image of the confusion matrix will be saved (optionally)
Example:
>>> python pan18-cdaa-evaluator-single.py -g "/mydata/pan18-cdaa-development-corpus/problem00001/ground-truth.json" -p "/mydata/pan18-answers/answers-problem00001.json"
# References:
@article{scikit-learn,
title={Scikit-learn: Machine Learning in {P}ython},
author={Pedregosa, F. and Varoquaux, G. and Gramfort, A. and Michel, V.
and Thirion, B. and Grisel, O. and Blondel, M. and Prettenhofer, P.
and Weiss, R. and Dubourg, V. and Vanderplas, J. and Passos, A. and
Cournapeau, D. and Brucher, M. and Perrot, M. and Duchesnay, E.},
journal={Journal of Machine Learning Research},
volume={12},
pages={2825--2830},
year={2011}
}
"""
import argparse
import os
import json
import warnings
from itertools import product
import logging
logging.basicConfig(format='%(levelname)s : %(message)s', level=logging.INFO)
logging.root.level = logging.INFO
import matplotlib
#matplotlib.use('Agg')
import matplotlib.pyplot as plt
plt.style.use("seaborn-deep")
import numpy as np
from sklearn.metrics import f1_score, confusion_matrix
from sklearn.preprocessing import LabelEncoder
def macro_f1(gt, pred, cm_path=None):
"""Compute macro-averaged F1-scores according the ad hoc
rules discussed at the top of this file.
Parameters
----------
gt : dict
Ground truth, where keys indicate text file names
(e.g. `unknown00002.txt`), and values represent
author labels (e.g. `candidate00003`)
pred : dict
Predicted attribution, where keys indicate text file names
(e.g. `unknown00002.txt`), and values represent
author labels (e.g. `candidate00003`)
cm_path : str (default: None)
Path to where to write the confusion matrix image. If `None`,
no confusion matrix is created.
Returns
-------
f1 : float
Macro-averaged F1-score
"""
actual_authors = list(gt.values())
encoder = LabelEncoder().fit(['<UNK>'] + actual_authors)
text_ids, gold_authors, silver_authors = [], [], []
for text_id in sorted(gt):
text_ids.append(text_id)
gold_authors.append(gt[text_id])
try:
silver_authors.append(pred[text_id])
except KeyError:
# missing attributions get <UNK>:
silver_authors.append('<UNK>')
assert len(text_ids) == len(gold_authors)
assert len(text_ids) == len(silver_authors)
# replace non-existent silver authors with '<UNK>':
silver_authors = [a if a in encoder.classes_ else '<UNK>'
for a in silver_authors]
gold_author_ints = encoder.transform(gold_authors)
silver_author_ints = encoder.transform(silver_authors)
# get F1 for individual classes (and suppress warnings):
with warnings.catch_warnings():
warnings.simplefilter('ignore')
f1 = f1_score(gold_author_ints,
silver_author_ints,
labels=list(set(gold_author_ints)),
average='macro')
# save the confusion matrix
if cm_path:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
cm = confusion_matrix(gold_author_ints, silver_author_ints)
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
np.set_printoptions(precision=2)
plt.figure(figsize=(20, 20))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.tick_params(labelsize=12)
plt.title('Confusion matrix')
plt.colorbar()
if len(encoder.classes_)==len(cm):
tick_marks = np.arange(len(encoder.classes_))
plt.xticks(tick_marks, encoder.classes_, rotation=90)
plt.yticks(tick_marks, encoder.classes_)
else:
tick_marks = np.arange(len(encoder.classes_[1:]))
plt.xticks(tick_marks, encoder.classes_[1:], rotation=90)
plt.yticks(tick_marks, encoder.classes_[1:])
thresh = cm.max() / 2.
for i, j in product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, round(cm[i, j], 2),
horizontalalignment='center',
color='white' if cm[i, j] > thresh else 'black')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.tight_layout()
plt.plot()
plt.savefig(cm_path)
return f1
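# A minimal sketch of the expected inputs (hypothetical file and author names):
#   gt   = {'unknown00001.txt': 'candidate00001', 'unknown00002.txt': 'candidate00002'}
#   pred = {'unknown00001.txt': 'candidate00001'}   # unknown00002.txt left unattributed
#   macro_f1(gt, pred)  # the missing attribution is scored as '<UNK>' and lowers the score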
def main():
logging.info('>>> Evaluation Cross-Domain Authorship Attribution @PAN2018 <<<')
parser = argparse.ArgumentParser(description='Evaluation script AA@PAN2018')
parser.add_argument('-g', type=str,
help='Path to ground truth file (json formatted)')
parser.add_argument('-p', type=str,
help='Path to system predictions (json formatted)')
parser.add_argument('-c', type=str,
help='Path to plot confusion matrix (optional)')
args = parser.parse_args()
if not args.g:
print('ERROR: The ground truth file is required')
parser.exit(1)
if not args.p:
print('ERROR: The predictions file is required')
parser.exit(1)
logging.info(args)
gt = {}
with open(args.g, 'r') as f:
for attrib in json.load(f)['ground_truth']:
gt[attrib['unknown-text']] = attrib['true-author']
pred = {}
with open(args.p, 'r') as f:
for attrib in json.load(f):
if attrib['unknown-text'] not in pred:
pred[attrib['unknown-text']] = attrib['predicted-author']
f1 = macro_f1(gt=gt, pred=pred, cm_path=args.c)
logging.info('MACRO-AVERAGED F1: %f',f1)
logging.info('>>> Evaluation done <<<')
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# Evaluation script for the Cross-Domain Authorship Attribution task @PAN2018.
We use the F1 metric (macro-average) as implemented in scikit-learn:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html
We include the following ad hoc rules:
- If authors are predicted which were not seen during training,
these predictions will count as false predictions ('<UNK>' class)
and they will negatively affect performance.
- If texts are left unattributed, they will be assigned to the '<UNK>'
class and they will negatively affect performance.
- The <UNK> class is excluded from the macro-average across classes.
- If multiple test attributions are given for a single unknown document,
only the first one will be taken into consideration.
Dependencies:
- Python 2.7 or 3.6 (we recommend the Anaconda Python distribution)
- scikit-learn
Usage from the command line:
>>> python pan18-cdaa-evaluator.py -i COLLECTION -a ANSWERS -o OUTPUT
where
COLLECTION is the path to the main folder of the evaluation collection
ANSWERS is the path to the answers folder of a submitted method
OUTPUT is the path to the folder where the results of the evaluation will be saved
Example:
>>> python pan18-cdaa-evaluator.py -i "/mydata/pan18-cdaa-development-corpus" -a "/mydata/pan18-answers" -o "/mydata/pan18-evaluation"
# References:
@article{scikit-learn,
title={Scikit-learn: Machine Learning in {P}ython},
author={Pedregosa, F. and Varoquaux, G. and Gramfort, A. and Michel, V.
and Thirion, B. and Grisel, O. and Blondel, M. and Prettenhofer, P.
and Weiss, R. and Dubourg, V. and Vanderplas, J. and Passos, A. and
Cournapeau, D. and Brucher, M. and Perrot, M. and Duchesnay, E.},
journal={Journal of Machine Learning Research},
volume={12},
pages={2825--2830},
year={2011}
}
"""
import argparse
import os
import json
import warnings
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
from sklearn.preprocessing import LabelEncoder
def eval_measures(gt, pred):
"""Compute macro-averaged F1-scores, macro-averaged precision,
macro-averaged recall, and micro-averaged accuracy according to the ad hoc
rules discussed at the top of this file.
Parameters
----------
gt : dict
Ground truth, where keys indicate text file names
(e.g. `unknown00002.txt`), and values represent
author labels (e.g. `candidate00003`)
pred : dict
Predicted attribution, where keys indicate text file names
(e.g. `unknown00002.txt`), and values represent
author labels (e.g. `candidate00003`)
Returns
-------
f1 : float
Macro-averaged F1-score
precision : float
Macro-averaged precision
recall : float
Macro-averaged recall
accuracy : float
Micro-averaged accuracy
"""
actual_authors = list(gt.values())
encoder = LabelEncoder().fit(['<UNK>'] + actual_authors)
text_ids, gold_authors, silver_authors = [], [], []
for text_id in sorted(gt):
text_ids.append(text_id)
gold_authors.append(gt[text_id])
try:
silver_authors.append(pred[text_id])
except KeyError:
# missing attributions get <UNK>:
silver_authors.append('<UNK>')
assert len(text_ids) == len(gold_authors)
assert len(text_ids) == len(silver_authors)
# replace non-existent silver authors with '<UNK>':
silver_authors = [a if a in encoder.classes_ else '<UNK>'
for a in silver_authors]
gold_author_ints = encoder.transform(gold_authors)
silver_author_ints = encoder.transform(silver_authors)
# get F1 for individual classes (and suppress warnings):
with warnings.catch_warnings():
warnings.simplefilter('ignore')
f1 = f1_score(gold_author_ints,
silver_author_ints,
labels=list(set(gold_author_ints)),
average='macro')
precision = precision_score(gold_author_ints,
silver_author_ints,
labels=list(set(gold_author_ints)),
average='macro')
recall = recall_score(gold_author_ints,
silver_author_ints,
labels=list(set(gold_author_ints)),
average='macro')
accuracy = accuracy_score(gold_author_ints,
silver_author_ints)
return f1,precision,recall,accuracy
def evaluate(ground_truth_file,predictions_file):
# Calculates evaluation measures for a single attribution problem
gt = {}
with open(ground_truth_file, 'r') as f:
for attrib in json.load(f)['ground_truth']:
gt[attrib['unknown-text']] = attrib['true-author']
pred = {}
with open(predictions_file, 'r') as f:
for attrib in json.load(f):
if attrib['unknown-text'] not in pred:
pred[attrib['unknown-text']] = attrib['predicted-author']
f1,precision,recall,accuracy = eval_measures(gt,pred)
return f1, precision, recall, accuracy
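# For example (hypothetical paths):
#   f1, precision, recall, accuracy = evaluate(
#       'problem00001/ground-truth.json', 'answers/answers-problem00001.json')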
def evaluate_all(path_collection,path_answers,path_out):
# Calculates evaluation measures for a PAN-18 collection of attribution problems
infocollection = path_collection+os.sep+'collection-info.json'
problems = []
data = []
with open(infocollection, 'r') as f:
for attrib in json.load(f):
problems.append(attrib['problem-name'])
scores=[];
for problem in problems:
f1,precision,recall,accuracy=evaluate(path_collection+os.sep+problem+os.sep+'ground-truth.json',path_answers+os.sep+'answers-'+problem+'.json')
scores.append(f1)
data.append({'problem-name': problem, 'macro-f1': round(f1,3), 'macro-precision': round(precision,3), 'macro-recall': round(recall,3), 'micro-accuracy': round(accuracy,3)})
print(str(problem),'Macro-F1:',round(f1,3))
overall_score=sum(scores)/len(scores)
# Saving data to output files (out.json and evaluation.prototext)
with open(path_out+os.sep+'out.json', 'w') as f:
json.dump({'problems': data, 'overall_score': round(overall_score,3)}, f, indent=4, sort_keys=True)
print('Overall score:', round(overall_score,3))
prototext='measure {\n key: "mean macro-f1"\n value: "'+str(round(overall_score,3))+'"\n}\n'
with open(path_out+os.sep+'evaluation.prototext', 'w') as f:
f.write(prototext)
def main():
parser = argparse.ArgumentParser(description='Evaluation script AA@PAN2018')
parser.add_argument('-i', type=str,
help='Path to evaluation collection')
parser.add_argument('-a', type=str,
help='Path to answers folder')
parser.add_argument('-o', type=str,
help='Path to output files')
args = parser.parse_args()
if not args.i:
print('ERROR: The collection path is required')
parser.exit(1)
if not args.a:
print('ERROR: The answers folder is required')
parser.exit(1)
if not args.o:
print('ERROR: The output path is required')
parser.exit(1)
evaluate_all(args.i,args.a,args.o)
if __name__ == '__main__':
main()