Class BaseKafkaConsumer<T,R>

java.lang.Object
it.gov.pagopa.common.reactive.kafka.consumer.BaseKafkaConsumer<T,R>
Type Parameters:
T - The type of the message to read and deserialize
R - The type of the message resulted

public abstract class BaseKafkaConsumer<T,R> extends Object
Base class to extend in order to configure a timed commit behavior when using KafkaBinder. In addition to extending this class, you should:
  1. Turn off auto-commit (spring.cloud.stream.kafka.bindings.BINDINGNAME.consumer.autoCommitOffset=false)
  2. Set the ackMode to MANUAL_IMMEDIATE (spring.cloud.stream.kafka.bindings.BINDINGNAME.consumer.ackMode=MANUAL_IMMEDIATE)
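As a configuration sketch, the two settings above could look as follows in an application.properties file. The binding name myConsumer-in-0 is a hypothetical placeholder for BINDINGNAME; replace it with the name of your own binding.

```properties
# Hypothetical binding name "myConsumer-in-0": replace it with your own BINDINGNAME
spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.autoCommitOffset=false
spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode=MANUAL_IMMEDIATE
```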
  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    protected static final String
    CONTEXT_KEY_MSG_ID
    Key used inside the Context to store a message identifier used for logging purposes
    protected static final String
    CONTEXT_KEY_START_TIME
    Key used inside the Context to store the startTime
  • Constructor Summary

    Constructors
    Modifier
    Constructor
    Description
    protected
    BaseKafkaConsumer(String applicationName)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    protected T
    deserializeMessage(org.springframework.messaging.Message<String> message)
    Reads and deserializes Message.getPayload() using the ObjectReader returned by getObjectReader()
    protected void
    doFinally(org.springframework.messaging.Message<String> message, Map<String,Object> ctx)
    Hook for performing some operation at the end of the business logic execution, that is, before waiting for the commit.
    protected reactor.core.publisher.Mono<R>
    execute(org.springframework.messaging.Message<String> message, Map<String,Object> ctx)
    It will deserialize the message and then call the execute(Object, Message, Map) method
    final void
    execute(reactor.core.publisher.Flux<org.springframework.messaging.Message<String>> messagesFlux)
    It will ask the superclass to handle the messages, then sequentially it will acknowledge them
    protected abstract reactor.core.publisher.Mono<R>
    execute(T payload, org.springframework.messaging.Message<String> message, Map<String,Object> ctx)
    The function invoked in order to process the current message
    protected abstract Duration
    getCommitDelay()
    The Duration to wait before committing processed messages
    String
    getFlowName()
    Name used for logging purposes
    protected abstract com.fasterxml.jackson.databind.ObjectReader
    getObjectReader()
    The ObjectReader to use in order to deserialize the input message
    protected abstract Consumer<Throwable>
    onDeserializationError(org.springframework.messaging.Message<String> message)
    The action to take if deserialization throws an error
    protected abstract void
    subscribeAfterCommits(reactor.core.publisher.Flux<List<R>> afterCommits2subscribe)
    The Flux to subscribe to in order to start its execution and, optionally, apply some logic to the results

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Field Details

    • CONTEXT_KEY_START_TIME

      protected static final String CONTEXT_KEY_START_TIME
      Key used inside the Context to store the startTime
    • CONTEXT_KEY_MSG_ID

      protected static final String CONTEXT_KEY_MSG_ID
      Key used inside the Context to store a message identifier used for logging purposes
  • Constructor Details

    • BaseKafkaConsumer

      protected BaseKafkaConsumer(String applicationName)
  • Method Details

    • execute

      public final void execute(reactor.core.publisher.Flux<org.springframework.messaging.Message<String>> messagesFlux)
      It will ask the superclass to handle the messages, then sequentially it will acknowledge them
    • getCommitDelay

      protected abstract Duration getCommitDelay()
      The Duration to wait before committing processed messages
    • subscribeAfterCommits

      protected abstract void subscribeAfterCommits(reactor.core.publisher.Flux<List<R>> afterCommits2subscribe)
      The Flux to subscribe to in order to start its execution and, optionally, apply some logic to the results
    • doFinally

      protected void doFinally(org.springframework.messaging.Message<String> message, Map<String,Object> ctx)
      Hook for performing some operation at the end of the business logic execution, that is, before waiting for the commit. By default, it logs the processing time at INFO level.
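
      The following is a minimal, stdlib-only sketch of how a context Map could carry the start time and message identifier so that a doFinally-style hook can report the processing time. It is not the BaseKafkaConsumer source: the class ContextTimingSketch, its methods, and the key values "START_TIME" and "MSG_ID" are assumptions made for illustration, since the actual constant values are not shown on this page.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only illustration; this is NOT the BaseKafkaConsumer source.
class ContextTimingSketch {
    // Hypothetical key values: the real constants' values are not shown on this page.
    static final String CONTEXT_KEY_START_TIME = "START_TIME";
    static final String CONTEXT_KEY_MSG_ID = "MSG_ID";

    // Builds the per-message context before the business logic runs.
    static Map<String, Object> newContext(String msgId, long startTimeMillis) {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put(CONTEXT_KEY_MSG_ID, msgId);
        ctx.put(CONTEXT_KEY_START_TIME, startTimeMillis);
        return ctx;
    }

    // Computes the elapsed time from the start time stored in the context.
    static long elapsedMillis(Map<String, Object> ctx, long nowMillis) {
        return nowMillis - (Long) ctx.get(CONTEXT_KEY_START_TIME);
    }

    // Formats the line that a doFinally-style hook could log.
    static String describe(String flowName, Map<String, Object> ctx, long nowMillis) {
        return String.format("[%s] message %s processed in %d ms",
                flowName, ctx.get(CONTEXT_KEY_MSG_ID), elapsedMillis(ctx, nowMillis));
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = newContext("topic-0@42", System.currentTimeMillis());
        System.out.println(describe("MY_FLOW", ctx, System.currentTimeMillis()));
    }
}
```

      A subclass overriding doFinally could apply the same idea to add its own metrics, reading the values through CONTEXT_KEY_START_TIME and CONTEXT_KEY_MSG_ID.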
    • getFlowName

      public String getFlowName()
      Name used for logging purposes
    • execute

      protected reactor.core.publisher.Mono<R> execute(org.springframework.messaging.Message<String> message, Map<String,Object> ctx)
      It will deserialize the message and then call the execute(Object, Message, Map) method
    • getObjectReader

      protected abstract com.fasterxml.jackson.databind.ObjectReader getObjectReader()
      The ObjectReader to use in order to deserialize the input message
    • onDeserializationError

      protected abstract Consumer<Throwable> onDeserializationError(org.springframework.messaging.Message<String> message)
      The action to take if deserialization throws an error
    • execute

      protected abstract reactor.core.publisher.Mono<R> execute(T payload, org.springframework.messaging.Message<String> message, Map<String,Object> ctx)
      The function invoked in order to process the current message
    • deserializeMessage

      protected T deserializeMessage(org.springframework.messaging.Message<String> message)
      Reads and deserializes Message.getPayload() using the ObjectReader returned by getObjectReader()
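
      The interplay between deserializeMessage, getObjectReader() and onDeserializationError(Message) can be sketched with the standard library alone. This is a simplified stand-in, not the class's implementation: a Function<String,T> replaces the Jackson ObjectReader, an Optional replaces the reactive types, and the class DeserializationSketch is hypothetical. How the real class proceeds after the error handler runs is not documented on this page; returning an empty result here is an assumption.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Stdlib-only illustration; this is NOT the BaseKafkaConsumer source.
class DeserializationSketch {
    // "reader" stands in for the ObjectReader; it may throw on malformed input.
    static <T> Optional<T> deserialize(String payload,
                                       Function<String, T> reader,
                                       Consumer<Throwable> onError) {
        try {
            return Optional.ofNullable(reader.apply(payload));
        } catch (RuntimeException e) {
            // Delegate the failure to the caller-supplied handler (assumed behavior),
            // then report "no payload" instead of propagating the exception.
            onError.accept(e);
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Consumer<Throwable> logError = e -> System.err.println("Cannot deserialize: " + e);
        Optional<Integer> ok = deserialize("42", Integer::parseInt, logError);
        Optional<Integer> bad = deserialize("not-a-number", Integer::parseInt, logError);
        System.out.println(ok.isPresent() + " " + bad.isPresent());
    }
}
```

      Keeping the error handler as a Consumer<Throwable> supplied per message lets a subclass decide what to do with a malformed payload, for example logging it or forwarding it to an error topic, without changing the deserialization step itself.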