Class BaseKafkaConsumer<T,R>
java.lang.Object
it.gov.pagopa.common.reactive.kafka.consumer.BaseKafkaConsumer<T,R>
- Type Parameters:
T- The type of the message to read and deserializeR- The type of the message resulted
Base class to extend in order to configure a timed commit behavior when using KafkaBinder.
Other than extend this class, you should:
- Turn off the autoCommit (spring.cloud.stream.kafka.bindings.BINDINGNAME.consumer.autoCommitOffset=false)
- Set the ackMode to MANUAL_IMMEDIATE (spring.cloud.stream.kafka.bindings.BINDINGNAME.consumer.ackMode=MANUAL_IMMEDIATE)
-
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected TdeserializeMessage(org.springframework.messaging.Message<String> message) It will read and deserializeMessage.getPayload()using the givengetObjectReader()protected voidto perform some operation at the end of business logic execution, thus before to wait for commit.protected reactor.core.publisher.Mono<R> It will deserialize the message and then call theexecute(Object, Message, Map)methodfinal voidIt will ask the superclass to handle the messages, then sequentially it will acknowledge themprotected abstract reactor.core.publisher.Mono<R> The function invoked in order to process the current messageprotected abstract DurationTheDurationto wait before to commit processed messagesName used for logging purposeprotected abstract com.fasterxml.jackson.databind.ObjectReaderTheObjectReaderto use in order to deserialize the input messageonDeserializationError(org.springframework.messaging.Message<String> message) The action to take if the deserialization will throw an errorprotected abstract voidsubscribeAfterCommits(reactor.core.publisher.Flux<List<R>> afterCommits2subscribe) Fluxto which subscribe in order to start its execution and eventually perform some logic on results
-
Field Details
-
CONTEXT_KEY_START_TIME
Key used inside theContextto store the startTime- See Also:
-
CONTEXT_KEY_MSG_ID
Key used inside theContextto store a msg identifier used for logging purpose- See Also:
-
-
Constructor Details
-
BaseKafkaConsumer
-
-
Method Details
-
execute
public final void execute(reactor.core.publisher.Flux<org.springframework.messaging.Message<String>> messagesFlux) It will ask the superclass to handle the messages, then sequentially it will acknowledge them -
getCommitDelay
TheDurationto wait before to commit processed messages -
subscribeAfterCommits
protected abstract void subscribeAfterCommits(reactor.core.publisher.Flux<List<R>> afterCommits2subscribe) Fluxto which subscribe in order to start its execution and eventually perform some logic on results -
doFinally
protected void doFinally(org.springframework.messaging.Message<String> message, Map<String, Object> ctx) to perform some operation at the end of business logic execution, thus before to wait for commit. As default, it will perform an INFO logging with performance time -
getFlowName
Name used for logging purpose -
execute
protected reactor.core.publisher.Mono<R> execute(org.springframework.messaging.Message<String> message, Map<String, Object> ctx) It will deserialize the message and then call theexecute(Object, Message, Map)method -
getObjectReader
protected abstract com.fasterxml.jackson.databind.ObjectReader getObjectReader()TheObjectReaderto use in order to deserialize the input message -
onDeserializationError
protected abstract Consumer<Throwable> onDeserializationError(org.springframework.messaging.Message<String> message) The action to take if the deserialization will throw an error -
execute
protected abstract reactor.core.publisher.Mono<R> execute(T payload, org.springframework.messaging.Message<String> message, Map<String, Object> ctx) The function invoked in order to process the current message -
deserializeMessage
It will read and deserializeMessage.getPayload()using the givengetObjectReader()
-