K - outgoing record key type
V - outgoing record value type

public interface Sender<K,V>
Modifier and Type | Method and Description |
---|---|
void | close() Closes this sender and the underlying Kafka producer and releases all resources allocated to it. |
static <K,V> Sender<K,V> | create(SenderOptions<K,V> options) Creates a Kafka sender that appends records to Kafka topic partitions. |
<T> Mono<T> | doOnProducer(Function<Producer<K,V>,? extends T> function) Invokes the specified function on the Kafka Producer associated with this Sender. |
Mono<Void> | send(Publisher<ProducerRecord<K,V>> records) Sends a sequence of producer records to Kafka. |
<T> Flux<SenderResult<T>> | send(Publisher<SenderRecord<K,V,T>> records, boolean delayError) Sends a sequence of records to Kafka and returns a Flux of response record metadata including partition and offset of each send request. |

static <K,V> Sender<K,V> create(SenderOptions<K,V> options)
Creates a Kafka sender that appends records to Kafka topic partitions.
options - Configuration options of this sender. Changes made to the options after the sender is created will not be used by the sender.

<T> Flux<SenderResult<T>> send(Publisher<SenderRecord<K,V,T>> records, boolean delayError)
Sends a sequence of records to Kafka and returns a Flux of response record metadata including partition and offset of each send request. Responses are ordered within each partition, but responses from different partitions may be interleaved in a different order from the requests. The SenderRecord may carry additional correlation metadata, which is not sent to Kafka but is included in the response Flux so that responses can be matched to requests.
Results are published when the send is acknowledged, according to the acknowledgement mode configured using the option ProducerConfig.ACKS_CONFIG. If acks=0, records are acknowledged as soon as the requests are buffered, without waiting for any server acknowledgement. In this case the requests are not retried and the offset returned in SenderResult will be -1. For other ack modes, requests are retried up to the configured ProducerConfig.RETRIES_CONFIG times. If the request does not succeed after these attempts, the request fails and an exception indicating the reason for failure is returned in SenderResult.exception().
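How correlation metadata lets a caller pair interleaved responses with their requests can be pictured with a small model that does not depend on this library. It is only a sketch: `CorrelationSketch`, `Response` and `match` are hypothetical names that do not exist in the Sender API, and the integer ids stand in for whatever correlation metadata the caller attaches to each SenderRecord.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CorrelationSketch {
    /** Hypothetical response: the partition that acknowledged the send and the caller's correlation id. */
    public static final class Response {
        public final int partition;
        public final int correlationId;

        public Response(int partition, int correlationId) {
            this.partition = partition;
            this.correlationId = correlationId;
        }
    }

    /** Pairs each response with the payload of the pending request that carried the same correlation id. */
    public static List<String> match(Map<Integer, String> pendingById, List<Response> responses) {
        List<String> matched = new ArrayList<>();
        for (Response r : responses) {
            // The correlation id, not the arrival position, identifies the request.
            String payload = pendingById.remove(r.correlationId);
            matched.add("p" + r.partition + ":" + payload);
        }
        return matched;
    }

    public static void main(String[] args) {
        // Requests were issued in the order 1, 2, 3.
        Map<Integer, String> pending = new HashMap<>();
        pending.put(1, "a"); // goes to partition 0
        pending.put(2, "b"); // goes to partition 1
        pending.put(3, "c"); // goes to partition 0

        // Partition 1 answers first; within partition 0 the order 1, 3 is preserved.
        List<Response> responses = Arrays.asList(
                new Response(1, 2), new Response(0, 1), new Response(0, 3));

        System.out.println(match(pending, responses)); // prints [p1:b, p0:a, p0:c]
    }
}
```

Because the lookup is keyed on the correlation id, the result is the same whichever way the partitions interleave.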
Example usage:
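    // key(i) and message(i) are helpers supplied by the caller.
    source = Flux.range(1, count)
                 .map(i -> SenderRecord.create(new ProducerRecord<>(topic, key(i), message(i)), i));
    sender.send(source, true)
          .doOnNext(r -> System.out.println("Message #" + r.correlationMetadata() + " metadata=" + r.recordMetadata()))
          .subscribe(); // nothing is sent until the Flux is subscribed to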
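The effect of the delayError flag passed to send can be illustrated with a library-free model. This is a sketch under stated assumptions, not the Sender implementation: `DelayErrorSketch`, `Result` and `sendAll` are hypothetical names, a record counts as failed when the supplied predicate says so, and the model reports the failure as an ordinary result rather than showing how the real Flux signals it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class DelayErrorSketch {
    /** Hypothetical stand-in for SenderResult: correlation metadata plus an optional failure. */
    public static final class Result<T> {
        public final T correlationMetadata;
        public final Exception exception; // null when the send succeeded

        Result(T correlationMetadata, Exception exception) {
            this.correlationMetadata = correlationMetadata;
            this.exception = exception;
        }
    }

    /**
     * Attempts the records in order. With delayError=false the loop stops at the first
     * failed response; with delayError=true every record is attempted.
     */
    public static <T> List<Result<T>> sendAll(List<T> records, Predicate<T> fails, boolean delayError) {
        List<Result<T>> results = new ArrayList<>();
        for (T record : records) {
            boolean failed = fails.test(record);
            Exception error = failed ? new RuntimeException("send failed: " + record) : null;
            results.add(new Result<>(record, error));
            if (failed && !delayError) {
                break; // terminate on the first failure
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4);
        // Record 2 fails in both runs.
        System.out.println(sendAll(ids, i -> i == 2, false).size()); // prints 2
        System.out.println(sendAll(ids, i -> i == 2, true).size());  // prints 4
    }
}
```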
records - Outbound records along with additional correlation metadata to be included in the response
delayError - If false, send terminates when a response indicates failure; otherwise send is attempted for all records

Mono<Void> send(Publisher<ProducerRecord<K,V>> records)
Sends a sequence of producer records to Kafka. The returned Mono is failed immediately if a record cannot be delivered to Kafka after the configured number of retries in ProducerConfig.RETRIES_CONFIG.
records - Outbound producer records

<T> Mono<T> doOnProducer(Function<Producer<K,V>,? extends T> function)
Invokes the specified function on the Kafka Producer associated with this Sender. The function is invoked when the returned Mono is subscribed to.
Example usage:
    sender.doOnProducer(producer -> producer.partitionsFor(topic))
          .doOnSuccess(partitions -> System.out.println("Partitions " + partitions))
          .subscribe(); // the function runs only once the Mono is subscribed to
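The deferred invocation described above (the function runs on subscription, not when doOnProducer is called) can be modelled without Reactor. The following is only a sketch: `DeferredCallSketch` and `Deferred` are hypothetical stand-ins for the Sender and the returned Mono, and a plain String plays the role of the Producer.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class DeferredCallSketch {
    /** Hypothetical stand-in for the returned Mono: holds the work and runs it only on subscribe. */
    public static final class Deferred<T> {
        private final Supplier<T> work;

        Deferred(Supplier<T> work) {
            this.work = work;
        }

        public void subscribe(Consumer<? super T> onSuccess) {
            onSuccess.accept(work.get());
        }
    }

    /** Wraps the call instead of performing it, so nothing touches the producer yet. */
    public static <P, T> Deferred<T> doOnProducer(P producer, Function<P, ? extends T> function) {
        return new Deferred<T>(() -> function.apply(producer));
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Deferred<Integer> deferred = doOnProducer("producer", p -> {
            calls.incrementAndGet();
            return p.length();
        });

        System.out.println(calls.get()); // prints 0: the function has not run yet
        deferred.subscribe(len -> System.out.println(len)); // prints 8
        System.out.println(calls.get()); // prints 1
    }
}
```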
Methods that the reactive Sender interface supports directly (e.g. send) should not be invoked from within function. The methods supported by doOnProducer are:
function - A function that takes Kafka Producer as parameter

void close()
Closes this sender and the underlying Kafka producer and releases all resources allocated to it.