Logstash Getting Started Tutorial

Author: じ☆ve宝贝

Published: 2017-07-13T15:48:15

Installation

Prerequisites:

Install the JDK:
yum install java-1.8.0-openjdk
export JAVA_HOME=/usr/java
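
Before moving on you can sanity-check the JDK install; note that yum usually places OpenJDK under /usr/lib/jvm, so verify the real path before exporting JAVA_HOME:

	java -version                  # should print openjdk version "1.8.0_..."
	readlink -f "$(which java)"    # shows the actual install location of the java binary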

1. Download and install the public signing key

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

2. Configure the yum repository

Yum repository definitions live under /etc/yum.repos.d/ and end with .repo, for example logstash.repo:

vi /etc/yum.repos.d/logstash.repo
Paste the following content:
	[logstash-5.x]
	name=Elastic repository for 5.x packages
	baseurl=https://artifacts.elastic.co/packages/5.x/yum
	gpgcheck=1
	gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
	enabled=1
	autorefresh=1
	type=rpm-md
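
You can confirm that yum picked up the new repository before installing:

	yum repolist | grep logstash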

3. Run the installation

	yum clean all
	yum install logstash
Hello World example:
/usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Run with a configuration file (a sample config is sketched after the result below):
/usr/share/logstash/bin/logstash -f /usr/share/logstash/logstash.conf
Result:

	{
	    "@timestamp" => 2017-07-12T02:12:28.516Z,
	      "@version" => "1",
	          "host" => "localhost",
	       "message" => "Hello world"
	}
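
The logstash.conf passed to -f can be any pipeline definition; a minimal sketch equivalent to the -e one-liner above, reading from stdin and printing events with the rubydebug codec, looks like this:

	input {
		stdin { }
	}
	output {
		stdout {
			codec => rubydebug
		}
	}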

4. Plugin installation

List the installed plugins (plugin source code: https://github.com/logstash-plugins/):

	bin/logstash-plugin list

Install a plugin:

	bin/logstash-plugin install logstash-output-webhdfs

Upgrade a plugin:

	bin/logstash-plugin update logstash-output-webhdfs

Install a plugin from a local .gem file:

	bin/logstash-plugin install /data/logstash/logstash-filter-crash.gem
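
The list subcommand also takes a --verbose flag that prints versions, which is handy for checking whether a particular plugin is present:

	bin/logstash-plugin list --verbose | grep webhdfs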

5. Running in the background

Run with nohup:
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/logstash.conf  &>/dev/null &
Use SCREEN, which is not terminated when the user logs out:
https://www.studyjava.cn/post/206
Use a daemontools-style process supervisor (supervisord below)
Install:
yum -y install supervisord --enablerepo=epel

Add the following to /etc/supervisord.conf to define the programs you want to start:

[program:logstash_1]
environment=LS_HEAP_SIZE=5000m
directory=/usr/share/logstash/
command=/usr/share/logstash/bin/logstash -f /etc/logstash/logstash.conf -w 10 -l /var/log/logstash/logstash.log
[program:logstash_2]
environment=LS_HEAP_SIZE=5000m
directory=/usr/share/logstash/
command=/usr/share/logstash/bin/logstash -f /etc/logstash/logstash2.conf -w 10 -l /var/log/logstash/logstash2.log
Start:
service supervisord start

Stop a single program:
supervisorctl stop logstash_2
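
Other routine supervisorctl commands work the same way, for example:

	supervisorctl status                            # state of every managed program
	supervisorctl restart logstash_1                # restart a single instance
	supervisorctl reread && supervisorctl update    # apply changes made to supervisord.conf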

6. Demo

1. Log file format:
	2017-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||

Logstash configuration:

		# input source
		input {
			file {
				path => ["/data/logs/logstash-1.log"]
				start_position => "beginning"   # read from the beginning of the file
				type => "system"
		   }
		}
		# filter: split the line into fields
		filter {
			# define the data format
			grok {
				match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
			}
			# define the timestamp format
			date {
				match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
				locale => "cn"
			  }
		}

		output {
			stdout {
				codec => rubydebug
			}
		}
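
Feeding the sample line from above through this pipeline, the rubydebug output should contain roughly the following fields (a sketch; the exact @timestamp value depends on the server's timezone, since the date filter above does not set one):

	{
	    "@timestamp" => 2017-05-07T16:03:04.000Z,
	      "serverIp" => "10.4.29.158",
	      "clientIp" => "120.131.74.116",
	     "logSource" => "WEB",
	        "userId" => "11299073",
	        "reqUri" => "/shareApp",
	    "duringTime" => 98,
	           ...
	}

Note that duringTime comes out as a number rather than a string because of the :int suffix in the grok pattern.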

2. Parsing custom log4j logs

Log file format:
		2017-07-05 09:45:41 [cv.studyjava.spider.job.jobs.ChoiceReportJob:175]-[main]-[execute]-[WARN] 报告: 温迪数字:2015年度报告(更正公告) 已经存在,路过本次抓取
		2017-07-05 09:45:11 [cv.studyjava.spider.job.jobs.ChoiceReportJob:272]-[main]-[execute]-[FATAL] Choice 报告数据抓取异常
		java.io.IOException: Connection(zbus:80) timeout
				at org.zbus.net.tcp.TcpClient.sendMessage(TcpClient.java:235)
				at org.zbus.net.tcp.TcpClient.invokeSync(TcpClient.java:371)
				at org.zbus.broker.SingleBroker.invokeSync(SingleBroker.java:98)
				at org.zbus.broker.SingleBroker.invokeSync(SingleBroker.java:38)
				at org.zbus.broker.ZbusBroker.invokeSync(ZbusBroker.java:81)
				at org.zbus.broker.ZbusBroker.invokeSync(ZbusBroker.java:21)
				at org.zbus.mq.MqAdmin.invokeSync(MqAdmin.java:63)
				at org.zbus.mq.MqAdmin.createMQ(MqAdmin.java:94)
				at cv.studyjava.spider.job.jobs.ChoiceReportJob.execute(ChoiceReportJob.java:86)
				at cv.studyjava.spider.job.jobs.ChoiceReportJob.main(ChoiceReportJob.java:52)
		2017-07-05 09:45:05 [com.alibaba.druid.pool.DruidDataSource:715]-[main]-[init]-[INFO] {dataSource-1} inited
		2017-07-05 09:45:11 [org.zbus.net.tcp.TcpClient:213]-[main]-[connectSync]-[WARN] Connection(zbus:80) timeout
Logstash conf configuration:
input {
		file {
			path => ["/data/logs/logstash-2.log"]
			start_position => "beginning" # read from the beginning of the file
			sincedb_path => "/dev/null"  # do not record the read position; force reading from the start on every run
			type => "system"
			codec => multiline {
				pattern => "^[1-9]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1])\s*(20|21|22|23|[0-1]\d):[0-5]\d:[0-5]\d\s+\["
				negate => true
				what => "previous"
			}
		}
	}

	filter {
		grok {
			match => { "message" => "%{GREEDYDATA:timestamp} \[%{DATA:class}\]\-\[%{DATA:threadName}\]\-\[%{DATA:method}\]\-\[%{DATA:logLevel}\] %{GREEDYDATA:content}"}
		}
		date {
			match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
			locale => "cn"
			timezone => "+00:00"
		}
	}

	output {
		stdout {
			codec => rubydebug
		}
	}
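
Because negate => true and what => "previous", any line that does not start with a timestamp (such as the java.io.IOException line and the "at ..." stack frames) is folded into the preceding event, so the whole exception arrives as one event; the multiline codec also tags merged events. Roughly:

	{
	       "tags" => [ "multiline" ],
	    "message" => "2017-07-05 09:45:11 [...]-[main]-[execute]-[FATAL] Choice 报告数据抓取异常\njava.io.IOException: Connection(zbus:80) timeout\n\tat org.zbus.net.tcp.TcpClient.sendMessage(TcpClient.java:235)\n\t...",
	         ...
	}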

Output to Elasticsearch

Add the following inside the output block:
	elasticsearch {
		hosts => ["test41:9200","test42:9200","test43:9200"]
		index => "%{hostabc}"
		document_type => "%{hostabc}"
		#protocol: "http"
		flush_size => 100
		idle_flush_time => 10
		user => "elastic"
		password => "baoshan"
	}
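
Once events are flowing, a quick way to confirm the index was created is the Elasticsearch _cat API (using the hosts and credentials from the snippet above):

	curl -u elastic:baoshan 'http://test41:9200/_cat/indices?v'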

Notes

1. By default Logstash does not process files older than one day, but this can be changed in the configuration

	Configure this with the ignore_older option of the file input, e.g. ignore_older => 604800  # skip files older than one week; the value is in seconds.
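
In context, the option sits next to the other file-input settings; a sketch based on the first demo above:

	input {
		file {
			path => ["/data/logs/logstash-1.log"]
			start_position => "beginning"
			ignore_older => 604800   # ignore files not modified within the last week (seconds)
		}
	}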

2. Making Logstash read files from the beginning every time

	The key is the line sincedb_path => "/dev/null". This option specifies the sincedb file; by pointing it at /dev/null, the special empty file on Linux, Logstash reads nothing back from sincedb when the process restarts, concludes there is no record of a previous run, and therefore starts reading from the initial position again.
	(In a real production environment it is best not to do this: logs are large, and re-reading them from the start every time wastes resources and serves no purpose. This setting is only suitable for testing.)
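
If you are unsure where the read position is being recorded on your installation, you can search the usual locations for sincedb files (the default path differs between Logstash versions, so this is just a convenience check):

	find /var/lib/logstash $HOME -name '.sincedb*' 2>/dev/null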

3. Note

Some people create a .txt file on Windows for testing, rename it to .log, and upload it to Linux; this can leave the file unreadable to Logstash. Instead, create the file with vim on Linux and paste the test log lines into it.
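
If you do need to reuse a file created on Windows, the usual culprit is CRLF line endings; you can check for and strip them before pointing Logstash at the file:

	file /data/logs/logstash-1.log              # reports "with CRLF line terminators" if affected
	sed -i 's/\r$//' /data/logs/logstash-1.log  # convert CRLF to LF in place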

References

ELKstack 中文指南 (The ELK Stack Guide in Chinese)