Hi, I have a question.
I'm running:
flume-release-1.7.0 + flume-ng-sql-source-1.5.0 + Hadoop 2.7.6 + MySQL 5.7.17
All components were built from source, packaged, and deployed separately.
My flume.conf is as follows:
agent1.channels = ch1
agent1.sinks = h1
agent1.sources = ss1
agent1.channels.ch1.type = memory
agent1.sources.ss1.channels = ch1
agent1.sources.ss1.type = org.keedio.flume.source.SQLSource
agent1.sources.ss1.hibernate.connection.url = jdbc:mysql://129.1.99.1:3306/test
agent1.sources.ss1.hibernate.connection.user = root
agent1.sources.ss1.hibernate.connection.password = root
agent1.sources.ss1.table = user
agent1.sources.ss1.start.from = 0
agent1.sources.ss1.custom.query = select * from user where id > $@$ order by id asc
agent1.sources.ss1.batch.size = 1000
agent1.sources.ss1.max.rows = 1000
agent1.sources.ss1.run.query.delay=15000
agent1.sources.ss1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.ss1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.ss1.connection.autocommit = true
agent1.sources.ss1.status.file.path = /opt/flume/status
agent1.sources.ss1.status.file.name = ss1.status
agent1.sinks.h1.channel = ch1
agent1.sinks.h1.type = hdfs
agent1.sinks.h1.hdfs.path = hdfs://129.1.99.5:9000/flume/mysql/%y-%m-%d/%H%M/%S
agent1.sinks.h1.hdfs.writeFormat = Text
agent1.sinks.h1.hdfs.rollSize = 268435456
agent1.sinks.h1.hdfs.rollInterval = 0
agent1.sinks.h1.hdfs.rollCount = 0
agent1.sinks.h1.hdfs.fileType = DataStream
agent1.sinks.h1.hdfs.threadsPoolSize = 10
agent1.sinks.h1.hdfs.filePrefix = evt-
agent1.sinks.h1.hdfs.useLocalTimeStamp = true
agent1.sinks.h1.hdfs.round = true
agent1.sinks.h1.hdfs.roundValue = 10
agent1.sinks.h1.hdfs.roundUnit = minute
agent1.sinks.h1.hdfs.retryInterval = 60
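For context, here is my understanding of how the source builds its incremental query (a minimal sketch, not the actual flume-ng-sql-source code; the exact substitution and LIMIT behavior are assumptions based on the docs and the log output below):

```python
# Sketch of how I understand the incremental query is built.
# NOT the plugin's real code: the $@$ substitution and the appended
# LIMIT (presumably derived from max.rows via Hibernate's
# setMaxResults) are assumptions on my part.

QUERY_TEMPLATE = "select * from user where id > $@$ order by id asc"

def build_query(last_id: int, max_rows: int) -> str:
    # The stored offset replaces the $@$ placeholder, and a LIMIT
    # clause is appended, which would explain the extra "limit ?"
    # in the Hibernate statement log.
    return QUERY_TEMPLATE.replace("$@$", str(last_id)) + f" limit {max_rows}"

print(build_query(89, 1000))
# select * from user where id > 89 order by id asc limit 1000
```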
After startup, it eventually prints the following at regular intervals:
2018-05-16 17:37:17,588 (PollableSourceRunner-SQLSource-ss1) [DEBUG - org.hibernate.engine.jdbc.spi.SqlStatementLogger.logStatement(SqlStatementLogger.java:109)] select * from user where id > 89 order by id asc limit ?
Note the extra "limit ?" appended to the SQL.
The existing data is read and written to HDFS correctly, but new rows inserted into MySQL while Flume is running do not show up in HDFS in near real time.
Where might the problem be?
Thanks!