Apache Spark and Confluent Avro Serialisation With Confluent Schema Registry Made Easy (And Free!)

If you currently work in the data engineering domain it would be hard not to be at least aware of Apache Spark and Apache Kafka, each a big data titan in its own right. For those not familiar, here is a brief explanation:

Apache Spark is currently the most popular open-source large-scale data processing framework around, widely used for both batch and stream processing of unstructured data. The open-source framework spawned the open-core proprietary Databricks platform, which is used by over “five thousand organisations worldwide”.

Apache Kafka is a massively popular open-source distributed event streaming platform used by “More than 80% of all Fortune 100 companies”, and it likewise spawned the open-core proprietary Confluent event streaming (and processing) platform.

Given these two products’ popularity, integration between them is very common and easily achieved when using a basic message serialisation format (such as JSON) on your Kafka topics (queues). When the message serialisation format is Confluent Avro, however, as Confluent recommends, things become a bit more difficult. This is because with Confluent Avro serialisation/deserialisation (serde) the schema of the messages is maintained in a schema registry and is not present in the message itself. This means that to serde the messages we need to query a schema registry for the schema, and so enters another component… the Confluent Schema Registry.

In basic terms, the Confluent Schema Registry is a RESTful web service that holds schema definitions and is queryable by unique id. Each message serialised using the schema registry has the id of its schema prepended to the message but not the schema itself (in contrast to other serde formats such as JSON, where the schema is present and repeated in every message, drastically increasing message size). Serde using the Confluent Schema Registry works as follows.

  1. When a message producer app wants to produce a message onto the topic with a specific Avro schema, it first queries the schema registry with the schema it wants to register.
  2. The schema registry then checks whether that schema already exists in its registry; if it does, it returns the schema’s unique id to the producer app, otherwise it registers the schema and then returns the new unique id.
  3. The producer then takes this id and prepends it to the start of the message, where the rest of the message is made up of the data serialised using Avro.
  4. The producer then produces this message onto the Kafka topic.
  5. Now that the message is on the topic, it needs to be deserialised by any application consuming it. The message consumer app starts by reading the message from the Kafka topic.
  6. It then reads the prepended id that was added to the message by the producer.
  7. The consumer then queries the schema registry using the id it read from the message.
  8. The schema registry then returns the Avro schema that is registered in its registry with that id.
  9. The consumer then uses the returned Avro schema to deserialise the part of the message after the id and continues with whatever additional processing it was written to do.

More on the Confluent Schema Registry can be found here.
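To make the prepended id concrete, here is a minimal sketch (in Scala, not part of the walkthrough project) of how the Confluent wire format could be inspected by hand; a magic byte followed by a four-byte schema id is the layout the Confluent serdes rely on under the hood.

import java.nio.ByteBuffer

// Confluent wire format: byte 0 is a magic byte (always 0), bytes 1-4 are the
// schema id as a big-endian 32-bit integer, and everything after that is the
// Avro-encoded payload.
def readSchemaId(message: Array[Byte]): Int = {
  val buffer = ByteBuffer.wrap(message)
  val magicByte = buffer.get()
  require(magicByte == 0, s"Unexpected magic byte: $magicByte")
  buffer.getInt() // the id a consumer would use to query the schema registry
}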

In our use case it is Apache Spark that needs to perform this serde. Currently, Apache Spark does not provide any out-of-the-box integration with the Confluent Schema Registry, and therefore no support for Confluent Avro serde. Most cloud service providers offer their own big data processing service (AWS EMR, GCP Dataproc) with Apache Spark baked in, so in those cases the integration is not provided either. Databricks do claim to provide this integration with their paid distribution of Spark, but having never used it I cannot confirm its usability. Thankfully there is an open-source library named ABRiS: Avro Bridge for Spark which provides this serde when integrating open-source Apache Spark with Apache Kafka.

Walkthrough

Checkout the project that this guide will be walking through found here.

You will need the following installed on your machine to follow this walkthrough and run the applications (the version I used is given below each but does not need to match exactly):

docker 
Docker version 19.03.13, build 4484c46d9d
docker-compose
docker-compose version 1.27.4, build 40524192
scala
Scala code runner version 2.13.2
sbt
sbt version in this project: 1.4.7
sbt script version: 1.3.12
confluent-platform (needed for command line tools, download here)
confluent-platform version 6.0.0

To get started we need a Kafka (with Zookeeper) instance to produce and consume our messages to and from, and a Confluent Schema Registry instance to facilitate the serde. I’ve used docker-compose with the following docker-compose.yaml definition to achieve this:

version: "3.0"

services:

zookeeper:
image: confluentinc/cp-zookeeper:5.2.1
restart: always
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: "2181"
ZOOKEEPER_TICK_TIME: "2000"
ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
ports:
- "2181:2181"

kafka1:
image: confluentinc/cp-enterprise-kafka:5.2.1
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_BROKER_ID: 1
KAFKA_BROKER_RACK: "r1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8081"
KAFKA_JMX_PORT: 9991


schemaregistry:
image: confluentinc/cp-schema-registry:5.2.1
restart: always
depends_on:
- zookeeper
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
ports:
- 8070:8081

Instantiate this by running the following in the root directory of the project:

docker-compose up 

The project is an sbt project so I’ve defined the build.sbt as follows to get all the required Spark and ABRiS dependencies we need to run the application:

name := "spark-confluent-avro-poc"

version := "0.1"

scalaVersion := "2.12.13"
mainClass in (Compile, run) := Some("poc.ConfluentAvroToStructureStreaming")
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.7"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7"
libraryDependencies += "za.co.absa" %% "abris" % "4.0.1"

dependencyOverrides ++= {
  Seq(
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
    "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
  )
}

We now need to create a topic on the local Kafka instance for our Avro messages. Run this command to do so (you will find these bash executables in the bin directory of the confluent-platform package you downloaded):

~/confluent-6.0.0/bin/kafka-topics --create --topic spark-poc --bootstrap-server localhost:29092 --replication-factor 1 --partitions 2

Now the topic is created, let’s publish a message serialised as Confluent Avro onto the topic using the kafka-avro-console-producer. We start it by running the following command, which defines the Avro schema to use inline via the value.schema property:

~/confluent-6.0.0/bin/kafka-avro-console-producer --broker-list localhost:29092 --topic spark-poc --property schema.registry.url=http://localhost:8070 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Once started we can publish some messages by entering:

{"f1": "foo"}
{"f1": "bar"}

Leave this producer running in this terminal so we can return to it to produce more messages.

Publishing these messages will have automatically registered the schema in the schema registry if it did not already exist. In a separate terminal we can observe that a schema has been created in the registry using the command:

curl http://localhost:8070/subjects

Which should give the result:

["spark-poc-value"]

And to view the schema you can run:

curl http://localhost:8070/subjects/spark-poc-value/versions/latest

Which should give this:

{"subject":"spark-poc-value","version":2,"id":1,"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"}

Now that we have Kafka instantiated, a topic created, and messages on the topic ready to be consumed, let’s review the Spark app that will consume and deserialise these messages using the ABRiS library.

The app is defined in the class ConfluentAvroToStructureStreaming and begins with a config definition for the ABRiS library:

val abrisConfig: FromAvroConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("spark-poc")
  .usingSchemaRegistry("http://localhost:8070")

The thing to note here is that we are using the topic name strategy for schema resolution in the Schema Registry. This just means the ABRiS lib looks up the schema in the registry using the topic name with -value appended, so in our case it looks for spark-poc-value, which we have already observed was created by the kafka-avro-console-producer commands we ran earlier. All other configs here are self explanatory (I think?!).
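If you want to see exactly what the topic name strategy resolves to, a purely illustrative option (not part of the walkthrough project, and assuming the Confluent schema registry client jar that ABRiS should pull in transitively is on the classpath) is to query the spark-poc-value subject directly:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Illustrative only: resolve the subject produced by the topic name strategy
// ("<topic>-value") and print the id and schema that would be downloaded.
val registryClient = new CachedSchemaRegistryClient("http://localhost:8070", 16)
val latest = registryClient.getLatestSchemaMetadata("spark-poc-value")
println(s"id=${latest.getId} version=${latest.getVersion}")
println(latest.getSchema)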

Next we define our consumer Spark Structured Streaming application using the Spark session:

val spark = SparkSession
  .builder
  .appName("Simple Application")
  .master("local[*]")
  .config("spark.driver.bindAddress", "127.0.0.1")
  .getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:29092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "spark-poc")
  .load()

This will provide us with a stream of events in Spark with the fields key and value, where the value field contains the yet-to-be-deserialised Avro we want.
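For reference, the raw Kafka source schema should look roughly like the printSchema() output sketched below (binary key/value plus the Kafka metadata columns); it is only the value column that we need to deserialise.

// Roughly what the Kafka source exposes before any deserialisation:
df.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)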

To deserialise the Avro we then use the from_avro function provided by ABRiS, passing in the value column reference and the abrisConfig we defined earlier, and aliasing the result with the name data. We then select all the nested fields in the data column to bring them to the top level of the schema of our stream:

val deserializedAvro = df.select(from_avro(col("value"), abrisConfig).as("data"))
  .select(col("data.*"))

Finally we write the stream to the console so we can view it:

deserializedAvro.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()

Having reviewed the app, we can now run it in a new terminal using the following command executed in the project root directory:

sbt "runMain poc.ConfluentAvroToStructureStreaming"

The streaming app will start consuming from the topic, and you should see the messages you published earlier printed to the console with the schema we defined:

-------------------------------------------
Batch: 0
-------------------------------------------
+---+
| f1|
+---+
|foo|
|bar|
+---+

If you then publish more messages to the topic using the kafka-avro-console-producer in the terminal from before you will see them being consumed and printed in this terminal also.

Again leave the app running in this terminal so we can observe further messages being consumed from the topic.

Having successfully consumed and deserialised Confluent Avro messages from Kafka using Spark and ABRiS, let’s now review the StructureStreamingToAvroConfluentAvro app, which will produce messages serialised as Confluent Avro to the Kafka topic.

We start by creating a config for ABRiS and instantiating the Spark Session, the same as in the consumer app:

val abrisConfig: ToAvroConfig = AbrisConfig
  .toConfluentAvro
  .downloadSchemaByLatestVersion
  .andTopicNameStrategy("spark-poc")
  .usingSchemaRegistry("http://localhost:8070")

val spark = SparkSession
  .builder
  .appName("Simple Application")
  .master("local[*]")
  .config("spark.driver.bindAddress", "127.0.0.1")
  .getOrCreate()

Next we need to shape our data into a schema that matches the one we already have registered in the Confluent Schema Registry:

val dataframe = Seq(("foo2"), ("bar2")).toDF("f1")
val dataframeWithNullRemoved = setNullableStateForAllColumns(dataframe, false)
val formattedForKafka = dataframeWithNullRemoved.select(struct(col("f1")).as("value_struct"))
val formattedForKafkaDataframeWithNullRemoved = setNullableStateForAllColumns(formattedForKafka, false)
val seriliazedAsAvro = formattedForKafkaDataframeWithNullRemoved.select(to_avro(col("value_struct"), abrisConfig).as("value"))

In the first line here we are creating a Spark dataset from an in-memory Seq collection (as this is a POC, that will suffice). On the second line we are calling a function defined in the class which ensures all fields in the schema have the nullable attribute set to false. By default, dataset schemas created within Spark set String fields as nullable, while we know that in our Avro schema the field is not nullable, so this line ensures the schemas match in that regard.

The third and fourth lines put the f1 field in a struct and again ensure it is not nullable. Finally, the fifth line uses the ABRiS to_avro function, passing in the struct we want to serialise as Avro and the abrisConfig. The result of this serialisation is then aliased as value, as this is the column name the Spark Kafka producer will look for when attempting to produce to Kafka.
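The setNullableStateForAllColumns helper itself lives in the walkthrough project, but for completeness here is a minimal sketch of what such a helper might look like: rebuild the schema with every field (including fields nested inside structs) forced to the given nullability, then re-create the DataFrame against that schema.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructField, StructType}

// Sketch only: the project defines its own version of this helper.
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  def setNullable(dataType: DataType): DataType = dataType match {
    case StructType(fields) =>
      StructType(fields.map(f => f.copy(dataType = setNullable(f.dataType), nullable = nullable)))
    case other => other
  }
  val newSchema = setNullable(df.schema).asInstanceOf[StructType]
  df.sparkSession.createDataFrame(df.rdd, newSchema)
}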

The final step in the app is to write the messages to Kafka using:

seriliazedAsAvro
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:29092")
  .option("topic", "spark-poc")
  .save()

Having reviewed the producer app, we can run it in a separate terminal using the following command:

sbt "runMain poc.StructureStreamingToAvroConfluentAvro"

The Spark app will start producing messages to the topic. These messages should then be consumed by the Spark consumer app we left running in the previous terminal, and you should see the messages published by the Spark producer app printed to the Spark consumer app’s terminal with the same schema:

-------------------------------------------
Batch: 3
-------------------------------------------
+----+
| f1|
+----+
|foo2|
|bar2|
+----+

We have now completed an end-to-end execution of Spark serialising a message to Confluent Avro and publishing that message to a topic, where it has then been consumed by another Spark app which deserialised the Confluent Avro message and printed it to the console.

Although basic, I’ve tried to cover the fundamentals of getting this integration up and running. Hopefully this POC gives those who require this integration a starting point to expand on.

If you have any questions regarding this article notify me by leaving a comment and I will try to get back to you.