Data streams between different apps with Confluent & Apache Kafka

Kafka™ is an open-source distributed streaming platform based on the concept of a transaction log: different processes communicate through messages published to and processed by a cluster, the core of the service, which runs on one or more servers. The cluster stores streams of records written by producers; records are split into “partitions” within different categories called “topics”, and every record consists of a key, a value and a timestamp. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one or many consumers that subscribe to the data written to it. Furthermore, Kafka uses a custom binary TCP-based protocol to guarantee efficient and consistent communication.
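As a quick illustration of this model, once a broker is running (see “Running Confluent Platform” below) you can publish keyed records with the console producer and read them back with the console consumer. This is a sketch assuming a default local installation, with $CONFLUENT_HOME pointing at the Confluent Platform root and the broker listening on localhost:9092:

# Produce records as key:value pairs to the topic "test"
$CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9092 \
  --topic test --property parse.key=true --property key.separator=:

# In another terminal, read every record back, printing the key as well
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic test --from-beginning --property print.key=true

Because topics are multi-subscriber, any number of consumer instances like the second one can read the same stream independently.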

https://altereos.anteprimaprogetto.it/wp-content/uploads/2023/01/kafka_apis-640x538.png

Once upon a time…

Written in Scala and Java, Kafka was originally developed as an internal project at LinkedIn to solve the problem of moving continuous streams of data between infrastructures (see details here).
Kafka's creator Jay Kreps and his co-founders are considered pioneers: they kept improving the project and realized there was a business opportunity in commercializing Kafka as an independent project, which was adopted by the Apache Software Foundation. Within a few years Kafka became a fundamental piece of infrastructure for thousands of companies. The importance of the project led Jay Kreps and his team to focus on it by founding Confluent (see details here), a streaming platform that builds on the Kafka ecosystem and extends it with open connectors, the REST Proxy service and the Schema Registry. The former provides a RESTful interface to a Kafka cluster, while the latter provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages sent in the Avro format. All of these services should now be available in Confluent Cloud™: the new platform lets you run Kafka and the Confluent services on public cloud servers.
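To give an idea of what the REST Proxy and the Schema Registry expose, here are two requests against their default local ports (8082 and 8081 respectively); the ports are assumptions about your installation and both services must be running:

# List the topics of the cluster through the REST Proxy
curl http://localhost:8082/topics

# List the Avro schema subjects registered in the Schema Registry
curl http://localhost:8081/subjects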

https://altereos.anteprimaprogetto.it/wp-content/uploads/2023/01/ConfluentCloud_FlagImage_forweb-640x475.webp

Build your streaming platform

In this article I'll explain how to get up and running with a very simple streaming platform on a single server (standalone mode) using Kafka Connect and two open, supported connectors: the Twitter source connector and the MongoDB sink connector, which read data from Twitter, process it and store it in a MongoDB database. Source connectors import data from another system into Kafka, while sink connectors export data from Kafka to another system.

A Kafka deployment will not work without ZooKeeper, which plays an important role in the Kafka ecosystem, as described in detail in this Quora question. It is used for the following (a quick sanity check is sketched right after the list):

  1. Electing a broker as controller to manage leader/follower relationship between partitions;
  2. Managing the partitions as parts of the cluster;
  3. Managing the topic configuration information;
  4. Quotas – how much data each client is allowed to read and write;
  5. ACLs – which process is allowed to read from and write to which topic.
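For example, once the platform from the next sections is running, you can ask ZooKeeper which brokers have registered themselves using the zookeeper-shell tool shipped with Confluent; host and port below are the defaults assumed throughout this article:

# Lists the ids of the brokers currently registered in ZooKeeper
$CONFLUENT_HOME/bin/zookeeper-shell localhost:2181 ls /brokers/ids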

This test will be executed on a Linux OS, where Confluent is officially supported, with the JVM correctly configured. The following list contains every tool you need:

  1. Apache Kafka (bundled with Confluent Platform);
  2. Confluent Platform;
  3. MongoDB;
  4. Twitter (a developer account, to create an app and obtain API keys).

Running Confluent Platform

Start ZooKeeper, the Kafka broker and the Schema Registry; each service will run in its own terminal or in the background (see here for details). Assuming that $CONFLUENT_HOME refers to the root of your Confluent Platform installation:

$CONFLUENT_HOME/bin/zookeeper-server-start \
$CONFLUENT_HOME/etc/kafka/zookeeper.properties &
$CONFLUENT_HOME/bin/kafka-server-start \
$CONFLUENT_HOME/etc/kafka/server.properties &
$CONFLUENT_HOME/bin/schema-registry-start \
$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
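The Twitter source connector configured below writes to a topic named tweets. Topic auto-creation is usually enabled by default on the broker, but if it is disabled in your server.properties you can create the topic up front; a sketch using the same defaults as the rest of this article:

$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic tweets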

Running kafka-connect-twitter

Build the cloned source code, for example with Maven (see here for details):

mvn clean package

Put the JAR file location into your CLASSPATH

export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

Visit https://apps.twitter.com/ and create a new app (if not already done) to obtain the required access keys, then set up the connector properties located in twitter-source.properties.

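Here is a minimal sketch of what twitter-source.properties could look like. The connector class and property names below follow the kafka-connect-twitter project's documentation as a reference, but they are assumptions about the version you cloned, so double-check them against its README; the placeholder credentials must be replaced with the keys generated for your Twitter app:

name=twitter-source
# Connector class as published by the kafka-connect-twitter project (verify against your clone)
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
# Topic the tweets will be written to (consumed later by the MongoDB sink)
topic=tweets
# Credentials of the app created on https://apps.twitter.com/
twitter.consumerkey=<YOUR_CONSUMER_KEY>
twitter.consumersecret=<YOUR_CONSUMER_SECRET>
twitter.token=<YOUR_ACCESS_TOKEN>
twitter.secret=<YOUR_ACCESS_TOKEN_SECRET>
# Terms to track on the public stream
track.terms=kafka,confluent
# Structured (Avro) output plays well with the Schema Registry
output.format=structured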

Then start a Kafka Connect source instance in a new terminal, from the Twitter source root directory:

$CONFLUENT_HOME/bin/connect-standalone connect-source-standalone.properties twitter-source.properties
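The worker file connect-source-standalone.properties is not part of the stock Confluent distribution, so it has to be provided. A minimal sketch of a standalone worker configured for Avro and the local Schema Registry (broker address, port and offsets path are assumptions) could be:

# Kafka broker(s) the Connect worker talks to
bootstrap.servers=localhost:9092
# Serialize record keys and values as Avro through the Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Internal converters used by the worker itself (JSON without embedded schemas)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Where standalone mode persists source offsets between restarts
offset.storage.file.filename=/tmp/connect-twitter.offsets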

Verify that everything works by opening a consumer instance to receive data:

$CONFLUENT_HOME/bin/kafka-avro-console-consumer --topic tweets --zookeeper localhost:2181

Running MongoDB-sink-connector

This time, use Gradle to build the cloned source code:

cd MongoDb-Sink-Connector
./gradlew clean build

Export the JAR location in CLASSPATH, keeping the one exported earlier:

export CLASSPATH=$CLASSPATH:/pathto/kafka-connect-mongodb-sink-*.jar

Modify the sink.properties file following this example:

# Kafka consumer configuration
name=kafka-connector-mongodb-sink
# Kafka connector configuration
connector.class=org.radarcns.mongodb.MongoDbSinkConnector
tasks.max=1
# Topics that will be consumed (remember the topic produced from twitter connector)
topics=tweets
# MongoDB server
mongo.host=localhost
mongo.port=27017
# MongoDB configuration
mongo.username=mongodb-username
mongo.password=***
mongo.database=mongodb-database
# Collection name for putting data into the MongoDB database. The {$topic} token will be replaced by the Kafka topic name.
#mongo.collection.format={$topic}
# Factory class to do the actual record conversion
record.converter.class=org.radarcns.serialization.RecordConverterFactory

Then run the MongoDB-Sink-Connector, for example in standalone mode:

$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties \
./sink.properties

You can now view the data in MongoDB, stored in a collection with the same name as the chosen topic, plus an additional OFFSETS collection.
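To take a quick look at the stored documents you can use the mongo shell; a sketch assuming the database and credentials configured in sink.properties above (the shell will prompt for the password):

# Print one document from the collection named after the topic
mongo mongodb-database -u mongodb-username -p --eval 'db.tweets.findOne()'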
