Saturday, 18 May 2019

Kafka Streams

Stateless operators

  • branch
  • filter
  • filterNot (inverse filter)
  • flatMap
  • flatMapValues
  • foreach
  • groupByKey
  • groupBy
  • map
  • mapValues

Stateful operators

  • join
  • aggregate
  • count
  • reduce
  • windowing

Window

  1. Tumbling window
    • Time-based, fixed-size, non-overlapping, gapless windows
    • For example, with window-size=5min and advance-interval=5min, the windows are [0-5min] [5min-10min] [10min-15min] ...
  2. Hopping window
    • Time-based, fixed-size, overlapping windows
    • For example, with window-size=5min and advance-interval=3min, the windows are [0-5min] [3min-8min] [6min-11min] ...
  3. Sliding window
    • Fixed-size, overlapping windows defined by the difference between record timestamps
    • Used only for join operations
  4. Session window
    • Session-based, dynamically sized, non-overlapping, data-driven windows
    • Used to aggregate key-based events into sessions
For more information on windowing, refer to the Apache Kafka documentation.
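
The window arithmetic above can be sketched in plain Java, without any Kafka dependency. This is only an illustration of how a record timestamp maps to tumbling, hopping, and session windows, not the Kafka Streams implementation. The class and method names (WindowSketch, windowsFor, sessionsFor) are made up for this example. It assumes windows are aligned to time 0, treats time windows as half-open [start, end), and starts a new session when two consecutive events are more than the inactivity gap apart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only (not part of Kafka Streams).
final class WindowSketch {

    // Returns every window [start, end) that contains the timestamp, assuming
    // windows start at non-negative multiples of the advance interval.
    // A tumbling window is the special case advance == size.
    static List<String> windowsFor(long timestamp, long size, long advance) {
        List<String> windows = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long start = (timestamp / advance) * advance;
        // Walk back while the window still covers the timestamp.
        while (start >= 0 && start + size > timestamp) {
            windows.add(0, "[" + start + "-" + (start + size) + ")");
            start -= advance;
        }
        return windows;
    }

    // Groups sorted timestamps (of a single key) into sessions. A new session
    // starts when the distance to the previous event is larger than the gap.
    static List<String> sessionsFor(List<Long> sortedTimestamps, long gap) {
        List<String> sessions = new ArrayList<>();
        if (sortedTimestamps.isEmpty()) {
            return sessions;
        }
        long start = sortedTimestamps.get(0);
        long end = start;
        for (long ts : sortedTimestamps.subList(1, sortedTimestamps.size())) {
            if (ts - end > gap) {
                sessions.add("[" + start + "-" + end + "]");
                start = ts;
            }
            end = ts;
        }
        sessions.add("[" + start + "-" + end + "]");
        return sessions;
    }

    public static void main(String[] args) {
        // All values are in minutes.
        System.out.println(String.join(" ", windowsFor(7, 5, 5))); // tumbling: [5-10)
        System.out.println(String.join(" ", windowsFor(7, 5, 3))); // hopping:  [3-8) [6-11)
        System.out.println(String.join(" ", sessionsFor(List.of(0L, 2L, 3L, 10L, 12L), 5))); // [0-3] [10-12]
    }
}
```

With window-size=5min and advance-interval=3min, a record at minute 7 lands in two hopping windows, which is exactly the overlap described in the list above.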

Sunday, 12 May 2019

Confluent Schema Registry


Avro

Primitive Types

  1. null
  2. boolean
  3. int (32 bit)
  4. long (64 bit)
  5. float (32 bit)
  6. double (64 bit)
  7. bytes (sequence of 8-bit unsigned bytes)
  8. string (Unicode character sequence)

Complex Types

  1. record
  2. enum
  3. array
  4. map
  5. union
  6. fixed

Avro Schema Definition

  • namespace (optional, but recommended to qualify the name)
  • type (required) => record, enum, array, map, union, fixed
  • name (required)
  • doc (optional)
  • aliases (optional)
  • fields (required)
    • name (required)
    • type (required)
    • doc (optional)
    • default (optional)
    • order (optional)
    • aliases (optional)
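
As an illustration of the attributes listed above, here is a small record schema (.avsc file). The record name, field names, and doc strings are made up for this example. The namespace simply reuses the package name from the Java examples on this blog.

```json
{
  "namespace": "com.abc.demo",
  "type": "record",
  "name": "Customer",
  "doc": "Hypothetical customer record, for illustration only",
  "aliases": ["Client"],
  "fields": [
    {"name": "id", "type": "long", "doc": "Customer identifier"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null, "doc": "Optional field, null when unknown"},
    {"name": "signup_ts", "type": "long", "order": "descending", "aliases": ["created_at"]}
  ]
}
```

The email field is a union with null as its first branch, which is why its default can be null. A field with a default is what the compatibility rules further down call an optional field.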

Confluent Schema Registry

  • Schema Registry stores all schemas in a Kafka topic set by kafkastore.topic (default: _schemas), which is a single-partition, log-compacted topic.
  • The default response media type is application/vnd.schemaregistry.v1+json. The media types application/vnd.schemaregistry+json and application/json are also supported.
  • Both HTTP and HTTPS client protocols are supported by Schema Registry.
  • The prefix applied to metric names for the default JMX reporter is kafka.schema.registry.
  • The default listener port is 8081.
  • The Confluent Avro serializer supports the primitive types null, Boolean, Integer, Long, Float, Double, String, and byte[], and the complex type IndexedRecord. Sending data of any other type to KafkaAvroSerializer causes a SerializationException.
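
To make the port and media type concrete, here is a minimal sketch that builds a request to list all subjects using the JDK's java.net.http API (Java 11+). It assumes a Schema Registry instance listening on localhost:8081. The class name is made up, and the request is only built and printed, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, for illustration only.
final class SchemaRegistryRequestSketch {

    // Assumption: a local Schema Registry on the default listener port.
    static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
    static final String MEDIA_TYPE = "application/vnd.schemaregistry.v1+json";

    // Builds GET /subjects, asking for the default Schema Registry media type.
    static HttpRequest listSubjectsRequest() {
        return HttpRequest.newBuilder()
                .uri(URI.create(SCHEMA_REGISTRY_URL + "/subjects"))
                .header("Accept", MEDIA_TYPE)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = listSubjectsRequest();
        System.out.println(request.method() + " " + request.uri());
        System.out.println("Accept: " + request.headers().firstValue("Accept").orElse(""));
        // To send it against a running registry:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```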

Schema Compatibility Types

  1. BACKWARD
    • A consumer using schema X can process data produced with schema X or X-1. With BACKWARD_TRANSITIVE, a consumer using schema X can process data produced with all previous schemas: X, X-1, X-2, and so on.
    • Deleting a field without a default value (required field) is allowed. In this case, the consumer ignores the field.
    • Adding a field with a default value (optional field) is allowed. In this case, the consumer assigns the default value.
    • BACKWARD is the default compatibility type in Confluent Schema Registry.
    • There is no assurance that consumers using an older schema can read data produced with the new schema. Therefore, upgrade all consumers before you start producing new events.
  2. FORWARD
    • Data produced using schema X can be read by consumers using schema X or X-1. With FORWARD_TRANSITIVE, data produced using schema X can be read by consumers using any previous schema: X, X-1, X-2, and so on.
    • Adding a field without a default value (required field) is allowed. In this case, the consumer ignores the field.
    • Deleting a field with a default value (optional field) is allowed. In this case, the consumer assigns the default value.
    • There is no assurance that consumers using the new schema can read data produced with an older schema. Therefore, first upgrade all producers to the new schema and make sure the data already produced with the older schema is not available to consumers, then upgrade the consumers.
  3. FULL
    • Backward and forward compatible between schema X and X-1. With FULL_TRANSITIVE, backward and forward compatible between schema X and all previous schemas: X-1, X-2, and so on.
    • Adding or deleting a field with a default value (optional field) is allowed.
    • Consumers using an older schema can read data produced with the new schema, and consumers using the new schema can read data produced with an older schema. Therefore, you can upgrade producers and consumers independently.
  4. NONE
    • This compatibility type means schema compatibility checks are disabled.
    • How to upgrade consumers and producers depends on the change. For example, when a field type is modified from Number to String, you need to upgrade all producers and consumers to the new schema version at the same time.
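
The add/delete rules above follow from one idea: a reader can decode data as long as every field it expects is either present in the data or has a default value. Below is a deliberately simplified sketch of that idea in plain Java. It is not Schema Registry's actual checker. It models a schema as a map from field name to "has a default value", ignores type changes entirely, and all names are made up for this example.

```java
import java.util.Map;

// Hypothetical, simplified model of the field-level compatibility rules.
final class CompatibilitySketch {

    // A reader can decode the writer's data if every reader field is either
    // written by the writer or has a default value. Extra writer fields are ignored.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean missingInData = !writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (missingInData && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // BACKWARD: consumers on the new schema read data written with the old one.
    static boolean isBackward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: consumers on the old schema read data written with the new one.
    static boolean isForward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions hold.
    static boolean isFull(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return isBackward(oldSchema, newSchema) && isForward(oldSchema, newSchema);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = Map.of("id", false, "name", false);
        Map<String, Boolean> addOptional = Map.of("id", false, "name", false, "email", true);
        Map<String, Boolean> addRequired = Map.of("id", false, "name", false, "email", false);
        Map<String, Boolean> dropRequired = Map.of("id", false);

        System.out.println("add optional:  full=" + isFull(v1, addOptional));           // true
        System.out.println("add required:  backward=" + isBackward(v1, addRequired)
                + ", forward=" + isForward(v1, addRequired));                          // false, true
        System.out.println("drop required: backward=" + isBackward(v1, dropRequired)
                + ", forward=" + isForward(v1, dropRequired));                         // true, false
    }
}
```

Adding an optional field passes in both directions, which is why it is the safe change under FULL. Adding a required field only works FORWARD, and dropping a required field only works BACKWARD.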


Saturday, 11 May 2019

Kafka Consumer Using Java

package com.abc.demo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerTest {

 public static void main(String[] args) {
  //Create consumer properties
  String bootstrapServer = "localhost:9092";
  String groupId = "my-first-consumer-group";
  String topicName = "my-first-topic";
  
  Properties properties = new Properties();
  properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  //Start from the earliest offset when the group has no committed offset
  properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  //Disable auto commit; offsets are committed manually after each poll
  properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  
  //Create consumer; try-with-resources closes it if the loop exits with an exception
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
   
   //Subscribe consumer to topic(s)
   consumer.subscribe(Collections.singleton(topicName));
   
   //Poll for new data
   while(true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    
    for(ConsumerRecord<String, String> record: records){
     System.out.println("key=" + record.key() + ", value=" + record.value());
     System.out.println("topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
    }
    
    //Commit consumer offset manually (recommended)
    consumer.commitAsync();
   }
  }
  
 }
}

Kafka Producer Using Java

package com.abc.demo;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {

 public static void main(String[] args) throws InterruptedException, ExecutionException{
  //Create producer property
  String bootstrapServer = "localhost:9092";
  Properties properties = new Properties();
  properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  
  //Create safe producer
  properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
  properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
  properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
  
  //High throughput producer (at the expense of a bit of latency and CPU usage)
  properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
  properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); //20ms wait time
  properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); //32KB batch size
  
  //Create producer
  KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  
  //create a producer record
  ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "firstRecord");
  //create producer record with key
  //new ProducerRecord<>("topicName", "MessageKey", "Message");
  //create producer record with key and partition number
  //new ProducerRecord<>("topicName", 1 /*partition number*/, "MessageKey", "Message");
  
  //send data - asynchronous
  //without callback
  //producer.send(record);
  //with callback
  producer.send(record, (recordMetadata, exception) -> {
   if(exception == null){
    System.out.println(recordMetadata.topic() + "+" + recordMetadata.partition() + "+" + recordMetadata.offset());
   }else{
    System.err.println(exception.getMessage());
   }
  });
  
  //send data - synchronous
  //without callback
  //producer.send(record).get(); //.get() blocks until the send completes, making the call synchronous
  
  //flush data
  producer.flush();
  
  //flush and close producer
  producer.close();
 }
}
