Estos días he trabajado en un script (python) que me permite parsear mi archivo de syslog y subirlo parseado a un DB postgreql, ésta es la primera prueba en la cual pude insertar 3'789.433 líneas en 29 minutos en un equipo modesto, si alguien tiene sugerencias para mejorar el script agradeceré la colaboración
El script normaliza las líneas del syslog quitando espacios innecesarios y separando cada campo en una nueva variable, también parsea el mensaje enviado por el servicio slapd (motivo del análisis), por el momento inserta los datos en una sola tabla, el objetivo es armar algo así como un data warehouse de logs, y otros datos más que nos sirvan.
Aquí les va el script
import re
import datetime
import pg
import sys
import os
import mmap
def abrir_archivo(archivo):
archivo_log = open(archivo,"r")
return archivo_log
def cerrar_archivo(archivo_abierto):
archivo_abierto.close()
def lee_log_file(archivo):
a=abrir_archivo(archivo)
lista=a.readlines();
cerrar_archivo(a)
return lista
def normaliza_linea(linea):
linea = re.sub("\n","",re.sub("\s\s+"," ",linea))
return linea
def parsea_msg_ldap(linea):
if re.search('^conn=(.*)op=',linea):
ldap=linea.split(None,2)
ldap_msg=['op',re.sub("\D","",ldap[0]),re.sub("\D","",ldap[1]),ldap[2].lower()]
elif re.search('^conn=(.*)fd=',linea):
ldap=linea.split(None,2)
ldap_msg=['fd',re.sub("\D","",ldap[0]),re.sub("\D","",ldap[1]),ldap[2].lower()]
else:
ldap_msg=[linea,'-1','-1','-1']
return ldap_msg
def parsear_linea(linea):
mes=carga_meses()
valores=linea.split(None,5)
parseado=[None]*5
parseado[0]=str(datetime.date.today().year)+"-"+mes[valores[0]]+"-"+valores[1]
parseado[1]=valores[2]
parseado[2]=valores[3]
parseado[3]=re.sub("[\d,\[,\],\:]","",valores[4]).lower()
if parseado[3]=='slapd':
parseado[4]=re.sub("[\D]","",valores[4])
parseado+=parsea_msg_ldap(valores[5])
else:
parseado[4]='-1'
parseado.append(valores[5])
parseado.append('-1')
parseado.append('-1')
parseado.append('-1')
return parseado
def carga_meses():
meses={'Jan':'01','Feb':'02','Mar':'03','Apr':'04','May':'05','Jun':'06','Jul':'07','Aug':'08','Sep':'09','Oct':'10','Nov':'11','Dic':'12'}
return meses
def abre_db():
conex=pg.connect(dbname='loguno',user='mike')
accion0=conex.query('BEGIN WORK')
return conex
def cierra_db(conex):
accion0=conex.query('COMMIT')
conex.close()
def insertar_log(linea,tabla,conex):
insertar='INSERT INTO '+tabla+' VALUES (nextval(\'seq_bruta\'),\''+linea[0]+'\',\''+linea[1]+'\',\''+linea[2]+'\',\''+linea[3]+'\','+linea[4]+',\''+linea[5]+'\','+linea[6]+','+linea[7]+',E\''+linea[8]+'\')'
accion=conex.query(insertar)
# Flujo del Script
log_archivo = "/prueba/mi_syslog_de_prueba"
log_tam = os.path.getsize(log_archivo)
tam_arch_map = 524288
pos1 = 0
if (log_tam > tam_arch_map):
pos2 = pos1+tam_arch_map
else:
pos2 = pos1+log_tam
todo = abrir_archivo(log_archivo)
todo.seek(0)
mapped = mmap.mmap(todo.fileno(),0,access=mmap.ACCESS_READ)
while (pos1 < log_tam):
con=abre_db()
mapped.seek(pos1)
pos2 = mapped.rfind('\n',pos1,pos2)
mapped.seek(pos1)
texto = mapped.read(pos2-pos1)
lineas = texto.split('\n')
for logs in lineas:
logs=normaliza_linea(logs)
campos = parsear_linea(logs)
insertar_log(campos,'syslog',con)
cierra_db(con)
pos1 = pos2 + 1
pos2 = pos2+tam_arch_map
if (pos2 >= log_tam):
pos2 = log_tam
mapped.close()
cerrar_archivo(todo)
No hay comentarios:
Publicar un comentario