Mise en oeuvre de logstash

Publié par Edouard le

Présentation

Logstash est un moteur de découpage de logs. Il est déjà riche de nombreux plugins et expressions régulières pour comprendre une très large variété de logs (system, apache, postfix, etc). Il est de plus très souple pour configurer les règles et tries des logs qu’on lui envoit.

Le logiciel logstash possède de nombreux avantages, entres autres:

  • Il est compatible avec les protocoles courants comme syslog, idéal pour s’insérer dans des architectures existantes
  • il est extensible dans le sens où tout est configuré en utilisant des expressions régulières, dont beaucoup sont déjà fournies
  • il est très scalable car il repose sur une base de données elasticsearch (qui peut fonctionner en cluster), des services de « cache » (broker)

Le projet existe déjà depuis quelques années et possède une bonne communauté, avec de nombreux projets orbitant autour de cette solution.

Il supporte aussi très bien la mise à l’échelle, via des éventuelles clients pour pre-parser les logs, un gestionnaire de cache ou broker (par exemple redis), et une base de données modernes types noSQL (elasticsearch).

Mise en oeuvre

Pour la mise en oeuvre au sein d’une architecture simple, logstash se positionne sur un serveur central faisant office de collecteur de logs. Sur ce serveur logstash est configuré avec une entrée de type syslog sur le port 5544 (par ex).

Les clients envoyent leurs logs via rsyslog, exactement comme il le ferait vers un collecteur syslog classique.

Configuration des dépôts yum

Il y a un dépôt yum pour elasticsearch, et un pour logstash.

Fichier de configuration du dépôt pour elasticsearch dans /etc/yum.repos.d/elasticsearch.repo:

[elasticsearch-1.1]
name=Elasticsearch repository for 1.1.x packages
baseurl=http://packages.elasticsearch.org/elasticsearch/1.1/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1

Fichier de configuration du dépôt pour elasticsearch dans/etc/yum.repos.d/logstash.repo:

[logstash-1.4]name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1

Installation des paquets

L’installation de la base de données se fait avec la commande suivante:

yum install elasticsearch

​L’installation de logstash se fait avec la commande suivante:

yum install logstash

Configuration

Voici un fichier de configuration d’exemple, avec des entrées de type syslog, un filtre pour comprendre les logs rsyslog, et un stockage en sortie dans elasticsearch.

Ouverture en entrée d'un port d'écoute utilisant le protocol syslog
input {
  tcp {
    port => 5544 
    type => syslog 
  }
  udp {
    port => 5544 
    type => syslog 
  }
}
filter {
  # Traitement type syslog, le type étant marqué sur les données entrant par nos ports de type syslog
  if [type] == "syslog" {
    grok {
      # Si on ne veut pas garder le message non traité
      overwrite => "message"
      match => {
        # rsyslong envoi des messages de type : Ligne Syslog avec le message
        "message" => "^(?:<%{NONNEGINT:syslog_pri}>)? %{SYSLOGBASE2} %{GREEDYDATA:message}" 
      }
      # on ajoute des tags perso, pratique pour filtrer dans l'interface kibana
      add_tag => [ "syslog", "grokked" ] 
    }
    # On ignore le champ syslog_pri
    syslog_pri { }
# on crée une colonne hostip avec le contenu de la variable hostmutate {  add_field => [ "hostip", "%{host}" ] }# on resoud l'ip présent dans la colonne hostdns {  reverse => [ "host" ]   action => "replace" }
}
}
on stock dans elasticsearch
output {
elasticsearch {
host => "localhost" 
}
}

Explication

La partie « input » est suffisament explicite. Le seul intérêt est qu’on marque ce qui entre par les ports indiqués comme étant de type « syslog ». Cela permet de faire des traitements conditionnels plus loin si besoin en fonction du type de logs.

La partie « filter » est toujours la plus complexe. C’est dans cette partie que le traitement de la ligne de log reçue ce fait. Le plugin grok ici utilisé connait déjà beaucoup d’expressions régulières pour nous aider à découper nos lignes. C’est ce que fait en gros le paramètre « match » qui indique que le champ « message » (qui correspond à la ligne qu’on vient de recevoir), va être découpé selon l’expression suivante:

"^(?:<%{NONNEGINT:syslog_pri}>)?%{SYSLOGBASE2} %{GREEDYDATA:message}"

Une ligne type syslog ressemble à ça:

<Numero>Apr 5 09:36:33 syslog_facility.priorité client programme: message

Notre regexp précédente cherche bien un entier non negatif entre < >, qui sera stocké dans la colonne/variable syslog_pri.

La suite de notre regexp utilise des clefs proposées par grok, déjà écrites. Elles sont définies dans /opt/logstash/patterns. Par ex SYSLOGBASE2 correspond à:

(?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

SYSLOGTIMESTAMP et TIMESTAMP_ISO8601 sont aussi des clefs fournies par logstash et correspondent à des formats date type syslog. Exactement ce qui arrive après notre <Numero> dans notre ligne de log. La date ainsi trouvée sera stockée dans la variable timestamp pour une date au format classique, ou timestamp8601 si elle est au format iso8601.

Le reste de la ligne va récupérer le syslog_facility grâce au pattern SYSLOGFACILITY qui correspond à <%{NONNEGINT:facility}.%{NONNEGINT:priority}>. Ce qui va stocker la syslog_facility dans facility, et la priorité dans priority.

Le nom du client envoyant le log sera quand à lui stocké dans logsource.

Enfin le nom du programme est récupéré, suivi de : puis le message qui correspond au reste de la ligne (avec plein de caractères donc de type GREEDYDATA).

Voilà. Pour résumer notre ligne si elle correspond à l’expression que l’on cherche, est découpée et certaines parties (car tout n’est pas toujours à garder) sont ainsi stockées dans des variables. Comme au final nous stockons tout dans une base elasticsearch, qui est de type noSQL, nos variables seront en fait des colonnes dans la base. Et comme c’est du noSQL, chaque ligne de notre base peut avoir des colonnes différentes.

Ensuite dans notre filtre, nous avons préciser que nous voulions écraser la variable message, avec ce que nous découperons dans notre ligne. Car la ligne d’origine est déjà stockée dans la variable message. Si vous ne faites pas ça, la variable message que nous créons dans notre regexp sera ajoutée à la variable message déjà existante et qui deviendra du coup un tableau (message[0] ligne d’origine, message[1] notre variable traitée). C’est utilie éventuellement pour debuguer, mais au final c’est pas lisible, mieux vaut l’écraser.

Et enfin, notre plugin grok, après avoir trouvé nos variable, va ajouter des tags. C’est surtout utile pour retravailler certaines choses plus loin, ou tout simplemet pour filtrer les messages ensuite dans l’interface web. Ici nous ajoutons les tags syslog et grokked. Si le filtre echou (par ex la ligne ne ressemble pas au match indiqué), le tag _grokparsefailure est automatiquement ajouté.

Configuration des clients

Ici rien de particulier, nous pouvons directement utiliser rsyslog pour envoyer les logs sur les entrées logstash. Voici un exemple de configuration pour rsyslog, à mettre par exemple dans /etc/rsyslog.d/logstash.conf:

$template LOGSTASH,"<%PRI%>%TIMESTAMP% %HOSTNAME% %syslogtag%%msg%"
$ActionForwardDefaultTemplate LOGSTASH
*.*	@192.168.0.1:5544

En supposant que notre serveur de collecte possède l’adresse 192.168.0.1. Sans oublier:

service rsyslog restart

Restera plus qu’à configurer l’interface web kibana sur le collecteur pour apprécier tout ça. Enfin des logs facilement exploitable, toujours intéressant ce qu’on y découvre…