利用Logstash实时同步mysql数据至elasticsearch

  • 利用Logstash实时同步mysql数据至elasticsearch已关闭评论
  • 73 views
  • A+
所属分类:elasticsearch MySQL

1.安装elasticsearch,具体安装办法 点此链接

2.正确安装mysql。并建立新表 abc (我的mysql测试地址为 192.168.1.108 )

往测试的新建表 member,表结构如下:

利用Logstash实时同步mysql数据至elasticsearch

加入测试数据

利用Logstash实时同步mysql数据至elasticsearch

3.下载logstash-6.3.2.tar.gz 上传至服务器,我的测试目录为( /elsearch/ )

4.解压 logstash

cd /elsearch/

tar zxvf logstash-6.3.2.tar.gz

在 logstash 目录中新建 etc 目录,准备存放配置文件

(服务器具体目录为 /elsearch/logstash-6.3.2/etc/ )

5.下载 mysql-connector-java-6.0.6.jar 并上传到 /elsearch/logstash-6.3.2/etc/

6.在 etc 目录中新建mysql.conf配置文件,上体配置内容为:

input {

jdbc {

# mysql 数据库链接,shop为数据库名

jdbc_connection_string => "jdbc:mysql://192.168.1.108:3306/abc"

# 用户名和密码

jdbc_user => "root"

jdbc_password => "12345678"

# 驱动

jdbc_driver_library => "/elsearch/logstash-6.3.2/etc/mysql-connector-java-6.0.6.jar"

# 驱动类名

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "100"

# 执行的sql

statement =>"SELECT * FROM member"

# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

schedule => "* * * * *"

# 索引类型

type => "abcmember"

}

}

output {

elasticsearch {

hosts => ["127.0.0.1:9200"]

index => "abc"

document_id => "%{id}"

}

stdout {

codec => json_lines

}

}

7.启动 logstash

cd /elsearch/logstash-6.3.2/

bin/logstash -f etc/mysql.conf

执行命令后,控制台会有以下输出:

利用Logstash实时同步mysql数据至elasticsearch

再用head插件来,观察数据是否已经在elsearch中

利用Logstash实时同步mysql数据至elasticsearch
利用Logstash实时同步mysql数据至elasticsearch

到此,mysql表中的数据已经顺利导入到了 elsearch中。

以上只是数据库中单表的数据同步到了elsearch中,如果要同步多张表的数据,只需要修改mysql.conf配置文件,根据 type 来判断就可以。可以修改为:

input {

jdbc {

# mysql 数据库链接,shop为数据库名

jdbc_connection_string => "jdbc:mysql://192.168.1.108:3306/abc"

# 用户名和密码

jdbc_user => "root"

jdbc_password => "12345678"

# 驱动

jdbc_driver_library => "/elsearch/logstash-6.3.2/etc/mysql-connector-java-6.0.6.jar"

# 驱动类名

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "100"

# 执行的sql

statement =>"SELECT * FROM member"

# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

schedule => "* * * * *"

# 索引类型

type => "abcmember"

}

jdbc {

# mysql 数据库链接,shop为数据库名

jdbc_connection_string => "jdbc:mysql://192.168.1.108:3306/abc"

# 用户名和密码

jdbc_user => "root"

jdbc_password => "12345678"

# 驱动

jdbc_driver_library => "/elsearch/logstash-6.3.2/etc/mysql-connector-java-6.0.6.jar"

# 驱动类名

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "100"

# 执行的sql

statement =>"SELECT * FROM aabbcc"

# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

schedule => "* * * * *"

# 索引类型

type => "aabbcc"

}

}

output {

if [type]=="abcmember" {

elasticsearch {

hosts => ["127.0.0.1:9200"]

index => "abc"

document_id => "%{id}"

}

}

if [type]=="aabbcc" {

elasticsearch {

hosts => ["127.0.0.1:9200"]

index => "aabbcc"

document_id => "%{id}"

}

}

stdout {

codec => json_lines

}

}

重启logstash,到head插件中查看就可以看到会有两个index,并且数据也会顺利同步到elsearch中。

  • 我的微信
  • 微信扫一扫
  • weinxin
  • 微信公众号
  • 微信公众号扫一扫
  • weinxin
avatar