Kafka Connect FileSystem is a Source Connector that reads data from any file system implementing the org.apache.hadoop.fs.FileSystem class from Hadoop-Common and writes it to Kafka.
- Confluent 3.1.1
- Java 8
mvn clean package
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data,hdfs://localhost:9001/data
topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
file.reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file.regexps=^[0-9]*\.txt$
The kafka-connect-fs.properties file defines:
- The connector name.
- The class containing the connector.
- The number of tasks the connector is allowed to start.
- Comma-separated URIs of the FS(s). They can be URIs pointing directly to a file in the FS.
- Topic to copy the data to.
- Policy class to apply.
- Flag to enable recursive traversal of subdirectories when listing files.
- File reader class to read files from the FS.
- Regular expression to filter files from the FS.
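To illustrate the last item, the sample file.regexps value can be checked against a few file names. This is only a Python sketch: the connector itself is written in Java, and whether it matches the bare file name or the full path, and with which matching mode, is an assumption here. The accepts helper and the candidate names are hypothetical.

```python
import re

# Pattern copied from the sample configuration (file.regexps).
FILE_REGEXP = re.compile(r"^[0-9]*\.txt$")


def accepts(file_name: str) -> bool:
    """Return True if this hypothetical filter would keep the file name."""
    return FILE_REGEXP.search(file_name) is not None


# Hypothetical file names, used only for illustration.
candidates = ["2017.txt", "0001.txt", ".txt", "report.txt", "2017.csv", "2017.txt.bak"]
accepted = [name for name in candidates if accepts(name)]
print(accepted)
```

Because `[0-9]*` also matches zero digits, a file named `.txt` passes this filter; using `[0-9]+` instead would require at least one digit.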
The simple policy (SimplePolicy) just lists the files included in the corresponding URI.
A simple policy with a custom sleep on each execution.
policy.custom.sleep=200000
policy.custom.sleep.fraction=100
policy.custom.max.executions=-1
- Maximum time (in ms) to sleep before looking for files in the FS again.
- Sleep fraction by which the sleep time is divided, so that the policy can be interrupted.
- Maximum number of sleep cycles allowed (negative to disable).
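One way to read these three settings is sketched below in Python. This is not the connector's code: interruptible_sleep and may_run_again are hypothetical helpers, and the assumption is that the total sleep time is split into equal chunks, with an interruption check before each one.

```python
import threading
import time


def interruptible_sleep(sleep_ms, fraction, interrupted, sleep_fn=time.sleep):
    """Sleep sleep_ms in total, split into `fraction` equal chunks.

    Stops early if `interrupted` is set; returns the number of chunks slept.
    """
    chunk_seconds = (sleep_ms / fraction) / 1000.0
    chunks_slept = 0
    for _ in range(fraction):
        if interrupted.is_set():
            break
        sleep_fn(chunk_seconds)
        chunks_slept += 1
    return chunks_slept


def may_run_again(executions_done, max_executions):
    """A negative max_executions disables the limit (assumed semantics)."""
    return max_executions < 0 or executions_done < max_executions


# Demo with the sample values; a fake sleep records durations instead of waiting.
recorded = []
stop = threading.Event()
chunks = interruptible_sleep(200000, 100, stop, sleep_fn=recorded.append)
print(chunks, recorded[0])
```

With the sample values, 200000 ms divided by a fraction of 100 gives chunks of 2000 ms, so under this reading an interruption would be noticed within about two seconds rather than after the full sleep.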
This policy uses Hadoop notification events (available since Hadoop 2.6.0); all create/append/close events are reported as new files to be ingested.
Use it only when your URIs start with hdfs://
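A quick way to check that restriction before selecting this policy is sketched below in Python. The all_hdfs helper is hypothetical and not part of the connector; it only inspects the comma-separated fs.uris value as plain text.

```python
def all_hdfs(fs_uris: str) -> bool:
    """Return True if every URI in a comma-separated fs.uris value starts with hdfs://."""
    uris = [uri.strip() for uri in fs_uris.split(",") if uri.strip()]
    return bool(uris) and all(uri.startswith("hdfs://") for uri in uris)


# The sample configuration mixes a local URI with an HDFS one.
mixed = all_hdfs("file:///data,hdfs://localhost:9001/data")
only_hdfs = all_hdfs("hdfs://localhost:9001/data")
print(mixed, only_hdfs)
```

The sample fs.uris value shown earlier would fail this check because of its file:// entry.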
Read files with Avro format.
Read files with Parquet format.
Read Sequence files.
Text file reader using custom tokens to distinguish different columns on each line.
file.reader.delimited.header=true
file.reader.delimited.token=,
- Whether the file contains a header (default false).
- The token delimiter for columns.
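The effect of these two settings can be sketched in Python as follows. This is an illustration and not the reader's actual implementation: read_delimited is a hypothetical helper, and the column_N names used when there is no header are an assumption.

```python
def read_delimited(lines, token=",", header=False):
    """Split each line on `token` and return one dict per record."""
    rows = iter(lines)
    names = None
    if header:
        first = next(rows, None)
        if first is None:
            return []
        # The first line provides the column names.
        names = first.split(token)
    records = []
    for line in rows:
        values = line.split(token)
        if names is None:
            # Assumed fallback naming when the file has no header.
            names = [f"column_{i + 1}" for i in range(len(values))]
        records.append(dict(zip(names, values)))
    return records


# Hypothetical file content, used only for illustration.
with_header = read_delimited(["id,name", "1,alice", "2,bob"], token=",", header=True)
without_header = read_delimited(["1,alice"], token=",", header=False)
print(with_header)
print(without_header)
```

With header=true the first line is consumed as column names and is not emitted as a record; with the default, every line is treated as data.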
Read plain text files. Each line represents one record.
mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties config/kafka-connect-fs.properties
- Add more file readers.
- Add more policies.
- Manage FS blocks.
- Improve documentation.
- Include an FS Sink Connector.
If you would like to add or fix something in this connector, you are welcome to do so!
Released under the Apache License, version 2.0.