说明:我们这里每个表对应一个配置文件,当然也可以使用多个表使用一个配置文件(多个jdbc进行配置)

1.准备配置文件表1:

[root@host135 config]# more sync_mysql2es.conf

#logstash输入配置

input {

#jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期

jdbc {

jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"

jdbc_user => root

jdbc_password => mysql

# 是否开启分页

jdbc_paging_enabled => true

# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件

use_column_value => true

#用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名

tracking_column => "unix_ts_in_secs"

# tracking_column 对应字段的类型

tracking_column_type => "numeric"

# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次

schedule => "*/5 * * * * *"

#同步数据的查询sql语句

statement => "select id,name,f_int,f_dou,f_flo,date_format(create_time,'%Y-%m-%d %H:%i:%S') as create_time,date_fo

rmat(update_time,'%Y-%m-%d %H:%i:%S') as update_time, unix_timestamp(update_time) as unix_ts_in_secs from tb_es where

(unix_timestamp(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"

last_run_metadata_path =>"/opt/logstash-6.8.5/logs/11.txt"

}

}

#logstash输入数据的字段匹配和数据过滤

filter {

mutate {

copy => { "id" => "[@metadata][_id]"}

remove_field => ["@version","@timestamp", "unix_ts_in_secs"]

}

}

#logstash输出配置

output {

#采用stdout可以将同步数据输出到控制台,主要是调试阶段使用

stdout { codec => "rubydebug"}

#指定输出到ES的具体索引

elasticsearch {

hosts => ["http://192.168.1.134:19200"]

user => "elastic"

password => "elastic"

index => "index_tb_es01"

document_id => "%{[@metadata][_id]}"

}

}

表2:

[root@host135 config]# more sync_mysql2es_notime.conf

#logstash输入配置

input {

#jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期

jdbc {

jdbc_driver_library => "/soft/mysql-connector-java-5.1.49.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_connection_string => "jdbc:mysql://192.168.1.14:13306/db_hxl"

jdbc_user => root

jdbc_password => mysql

# 是否开启分页

jdbc_paging_enabled => true

# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件

use_column_value => true

#用来控制增量更新的字段,一般是自增id或者创建,更新时间,注意这里要采用sql语句中select采用的字段别名

tracking_column => "id"

# tracking_column 对应字段的类型

tracking_column_type => "numeric"

# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次

schedule => "*/5 * * * * *"

#同步数据的查询sql语句

statement => "select * from tb_es_notime where id>:sql_last_value"

last_run_metadata_path =>"/opt/logstash-6.8.5/logs/22.txt"

}

}

#logstash输入数据的字段匹配和数据过滤

filter {

mutate {

copy => { "id" => "[@metadata][_id]"}

remove_field => ["@version","@timestamp"]

}

}

#logstash输出配置

output {

#采用stdout可以将同步数据输出到控制台,主要是调试阶段使用

stdout { codec => "rubydebug"}

#指定输出到ES的具体索引

elasticsearch {

hosts => ["http://192.168.1.134:19200"]

user => "elastic"

password => "elastic"

index => "index_tb_es_notime"

document_id => "%{[@metadata][_id]}"

}

}

2.编辑pipelines.yml文件在最后面添加如下配置

- pipeline.id: table1

path.config: "/opt/logstash-6.8.5/config/sync_mysql2es.conf"

- pipeline.id: table2

path.config: "/opt/logstash-6.8.5/config/sync_mysql2es_notime.conf"

3.运行[root@host135 bin]# cd /opt/logstash-6.8.5/bin[root@host135 bin]#./logstash