Kafka™ is an open-source distributed streaming platform based on the concept of a transaction log: different processes communicate through messages that are published to and processed by a cluster, the core of the service, running on one or more servers. The cluster stores streams of records written by producers, split into "partitions" within categories called "topics", where every record consists of a key, a value and a timestamp. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one or many consumers that subscribe to the data written to it. Furthermore, Kafka uses a custom binary TCP-based protocol to keep communication between clients and brokers efficient and consistent.
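To make the record terminology concrete, here is a minimal sketch of a Java producer sending a single key/value record to a hypothetical "tweets" topic on a local broker; the broker address, topic name and string serializers are assumptions for illustration only:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record carries a key, a value and a timestamp (assigned automatically here)
            producer.send(new ProducerRecord<>("tweets", "user-42", "hello, Kafka"));
        }
    }
}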

Once upon a time…
Written in Scala and Java, Kafka was originally developed as an internal project at LinkedIn to handle continuous streams of data flowing between infrastructure components (see details here).
Kafka's creator Jay Kreps and his co-founders saw a business opportunity in the project: they continued to improve it and released it as an independent open-source project, later adopted by the Apache Software Foundation. Within a few years Kafka became fundamental infrastructure for thousands of companies. The importance of the project led Jay Kreps and his team to focus on it by founding Confluent (see details here), a streaming platform that builds on the Kafka ecosystem and extends it with open connectors, the REST Proxy service and the Schema Registry. The former provides a RESTful interface to a Kafka cluster, while the latter provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages sent in the Avro format. All these functions should now also be available in Confluent Cloud™, a platform that lets you run Kafka and Confluent services on public cloud servers.
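As a quick illustration of the REST Proxy, once it is running you can list the topics of a cluster with a plain HTTP call (the host and port 8082 below are the defaults and may differ in your setup):
curl "http://localhost:8082/topics"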

Build your streaming platform
In this article I'll explain how to get up and running with a very simple streaming platform on a single server (standalone mode), using Kafka Connect and two openly supported connectors: the Twitter source connector and the MongoDB sink connector, to read data from Twitter, process it and store it in a MongoDB database. Source connectors import data from another system into Kafka, while sink connectors export data from Kafka.
A Kafka deployment will not work without ZooKeeper, which plays an important role in the Kafka ecosystem, as described in detail in this Quora question (a quick way to inspect some of this metadata is shown after the list). ZooKeeper is used for:
- Electing a broker as controller to manage the leader/follower relationships between partitions;
- Managing cluster membership – which brokers are alive and part of the cluster;
- Managing the topic configuration information;
- Quotas – how much data each client is allowed to read and write;
- ACLs – which process is allowed to read from and write to which topic.
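For example, once the platform is running (see below) you can peek at the broker metadata held by ZooKeeper using the zookeeper-shell tool shipped with Kafka and Confluent; the ZooKeeper address and the /brokers/ids path below are the defaults:
$CONFLUENT_HOME/bin/zookeeper-shell localhost:2181 ls /brokers/ids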
This test will be executed on Linux, where Confluent is officially supported, with a correctly configured JVM. The following list contains every tool you need:
- Confluent platform (see https://www.confluent.io/download/);
- A registered account in Twitter dev program (see https://dev.twitter.com/);
- MongoDB installed and running with access credentials (see https://www.mongodb.com/);
- Twitter source connector (see GitHub repository);
- MongoDB sink connector (see GitHub repository);
- Maven installed and configured (see the tutorial here);
- Gradle installed and configured (see the tutorial here).




Running Confluent Platform
Start ZooKeeper, the Kafka broker and the Schema Registry. Each service will run in its own terminal (see here for details). Assuming that $CONFLUENT_HOME refers to the root of your Confluent Platform installation:
$CONFLUENT_HOME/bin/zookeeper-server-start \
$CONFLUENT_HOME/etc/kafka/zookeeper.properties &
$CONFLUENT_HOME/bin/kafka-server-start \
$CONFLUENT_HOME/etc/kafka/server.properties &
$CONFLUENT_HOME/bin/schema-registry-start \
$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
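To verify that the broker is up before going further, you can list the topics known to the cluster (assuming ZooKeeper on its default port):
$CONFLUENT_HOME/bin/kafka-topics --list --zookeeper localhost:2181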
Running kafka-connect-twitter
Build the cloned source code, for example with Maven (see here for details):
mvn clean package
Put the JAR file location into your CLASSPATH:
export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar
Visit https://apps.twitter.com/ and create a new app (if you have not already) to obtain the required access keys, then set up the connector properties located in twitter-source.properties.
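Here is a minimal sketch of what twitter-source.properties can look like. The property names below are taken from typical kafka-connect-twitter configurations and may vary with the connector version, so treat them as assumptions and check the connector's README; replace the placeholder credentials with the keys of your Twitter app:
name=twitter-source
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
# Topic the tweets will be written to (consumed later by the MongoDB sink)
topic=tweets
# Credentials from your Twitter app (placeholders)
twitter.consumerkey=YOUR_CONSUMER_KEY
twitter.consumersecret=YOUR_CONSUMER_SECRET
twitter.token=YOUR_ACCESS_TOKEN
twitter.secret=YOUR_ACCESS_TOKEN_SECRET
# Terms to track in the public Twitter stream
track.terms=kafka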
Then start a Kafka Connect source instance. Open a new terminal in the Twitter source connector's root directory:
$CONFLUENT_HOME/bin/connect-standalone connect-source-standalone.properties twitter-source.properties
Verify that everything works by opening a consumer instance to receive the data:
$CONFLUENT_HOME/bin/kafka-avro-console-consumer --topic tweets --zookeeper localhost:2181
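You can also check that an Avro schema has been registered for the topic by querying the Schema Registry (assuming its default port 8081; the subject should follow the usual <topic>-value naming convention):
curl "http://localhost:8081/subjects"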
Running MongoDB-sink-connector
This time use Gradle to build the cloned source code (the exact Gradle task may vary, so check the project's README):
cd MongoDb-Sink-Connector
gradle clean build
Add the JAR location to your CLASSPATH, keeping the previous entry:
export CLASSPATH=$CLASSPATH:/pathto/kafka-connect-mongodb-sink-*.jar
Modify the sink.properties file following this example:
# Kafka consumer configuration
name=kafka-connector-mongodb-sink
# Kafka connector configuration
connector.class=org.radarcns.mongodb.MongoDbSinkConnector
tasks.max=1
# Topics that will be consumed (remember the topic produced from twitter connector)
topics=tweets
# MongoDB server
mongo.host=localhost
mongo.port=27017
# MongoDB configuration
mongo.username=mongodb-username
mongo.password=***
mongo.database=mongodb-database
# Collection name for putting data into the MongoDB database. The {$topic} token will be replaced by the Kafka topic name.
#mongo.collection.format={$topic}
# Factory class to do the actual record conversion
record.converter.class=org.radarcns.serialization.RecordConverterFactory
Then run the MongoDB-Sink-Connector, for example in standalone mode:
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties \
./sink.properties
You can now view the data in MongoDB, stored in a collection with the same name as the chosen topic, along with an additional OFFSETS collection.
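For instance, you can inspect the result from the mongo shell; the database, user and collection names below match the example configuration above, so adjust them to your setup. First connect:
mongo mongodb-database -u mongodb-username -p
then query the collections:
db.tweets.find().limit(5).pretty()
db.getCollection("OFFSETS").find()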