miércoles, 23 de septiembre de 2009

Armando el primer data set

Estos días he trabajado en un script (python) que me permite parsear mi archivo de syslog y subirlo parseado a un DB postgreql, ésta es la primera prueba en la cual pude insertar 3'789.433 líneas en 29 minutos en un equipo modesto, si alguien tiene sugerencias para mejorar el script agradeceré la colaboración


El script normaliza las líneas del syslog quitando espacios innecesarios y separando cada campo en una nueva variable, también parsea el mensaje enviado por el servicio slapd (motivo del análisis), por el momento inserta los datos en una sola tabla, el objetivo es armar algo así como un data warehouse de logs, y otros datos más que nos sirvan.


Aquí les va el script


import re
import datetime
import pg
import sys
import os
import mmap


def abrir_archivo(archivo):
archivo_log = open(archivo,"r")
return archivo_log


def cerrar_archivo(archivo_abierto):
archivo_abierto.close()


def lee_log_file(archivo):
a=abrir_archivo(archivo)
lista=a.readlines();
cerrar_archivo(a)
return lista


def normaliza_linea(linea):
linea = re.sub("\n","",re.sub("\s\s+"," ",linea))
return linea


def parsea_msg_ldap(linea):
if re.search('^conn=(.*)op=',linea):
ldap=linea.split(None,2)
ldap_msg=['op',re.sub("\D","",ldap[0]),re.sub("\D","",ldap[1]),ldap[2].lower()]
elif re.search('^conn=(.*)fd=',linea):
ldap=linea.split(None,2)
ldap_msg=['fd',re.sub("\D","",ldap[0]),re.sub("\D","",ldap[1]),ldap[2].lower()]
else:
ldap_msg=[linea,'-1','-1','-1']
return ldap_msg


def parsear_linea(linea):
mes=carga_meses()
valores=linea.split(None,5)
parseado=[None]*5
parseado[0]=str(datetime.date.today().year)+"-"+mes[valores[0]]+"-"+valores[1]
parseado[1]=valores[2]
parseado[2]=valores[3]
parseado[3]=re.sub("[\d,\[,\],\:]","",valores[4]).lower()


if parseado[3]=='slapd':
parseado[4]=re.sub("[\D]","",valores[4])
parseado+=parsea_msg_ldap(valores[5])
else:
parseado[4]='-1'
parseado.append(valores[5])
parseado.append('-1')
parseado.append('-1')
parseado.append('-1')
return parseado


def carga_meses():
meses={'Jan':'01','Feb':'02','Mar':'03','Apr':'04','May':'05','Jun':'06','Jul':'07','Aug':'08','Sep':'09','Oct':'10','Nov':'11','Dic':'12'}
return meses


def abre_db():
conex=pg.connect(dbname='loguno',user='mike')
accion0=conex.query('BEGIN WORK')
return conex


def cierra_db(conex):
accion0=conex.query('COMMIT')
conex.close()


def insertar_log(linea,tabla,conex):
insertar='INSERT INTO '+tabla+' VALUES (nextval(\'seq_bruta\'),\''+linea[0]+'\',\''+linea[1]+'\',\''+linea[2]+'\',\''+linea[3]+'\','+linea[4]+',\''+linea[5]+'\','+linea[6]+','+linea[7]+',E\''+linea[8]+'\')'
accion=conex.query(insertar)




# Flujo del Script


log_archivo = "/prueba/mi_syslog_de_prueba"
log_tam = os.path.getsize(log_archivo)
tam_arch_map = 524288
pos1 = 0
if (log_tam > tam_arch_map):
pos2 = pos1+tam_arch_map
else:
pos2 = pos1+log_tam


todo = abrir_archivo(log_archivo)
todo.seek(0)
mapped = mmap.mmap(todo.fileno(),0,access=mmap.ACCESS_READ)


while (pos1 < log_tam):
con=abre_db()
mapped.seek(pos1)
pos2 = mapped.rfind('\n',pos1,pos2)
mapped.seek(pos1)
texto = mapped.read(pos2-pos1)
lineas = texto.split('\n')
for logs in lineas:
logs=normaliza_linea(logs)
campos = parsear_linea(logs)
insertar_log(campos,'syslog',con)
cierra_db(con)
pos1 = pos2 + 1
pos2 = pos2+tam_arch_map
if (pos2 >= log_tam):
pos2 = log_tam


mapped.close()
cerrar_archivo(todo)

No hay comentarios: