Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
226 views
in Technique[技术] by (71.8m points)

Processing incoming payloads as batch not working as expected in spring-cloud-streams

I say 'not working as expected' but actually is more like 'I don't really know if I'm doing the proper work in here', I feel like I'm mixing stuff from different approaches and doesn't really correlate.

Right now I've been using Spring Cloud Streams to process String-type messages from a PubSub subscription and so far so good, message in message out without much of a hassle.

What I'm trying to achieve now is to gather, let's say, 1000 messages, process them and send them altogether to another PubSub Topic. Still unsure about sending them as a List or individually like now, but all at the same time (this shouldn't be related to this question though).

Now I just discovered the following property.

spring.cloud.stream.bindings.input.consumer.batch-mode=true

Together with the following ones more specific to the GCP stuff.

spring.cloud.gcp.pubsub.publisher.batching.enabled=true
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds=300
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold=100

So first question is... Are they linked by any means? Must I have the first one together with the other three?

What happened after I added the previous properties to my application.properties file is actually no change at all. Messages keep arriving and leaving the application without any issue and with no batch approach whatsoever.

Currently using the functional features the following way.

    @Bean
    public Function<Message<String>, String> sampleFunction() {

       ... // Stream processing in here

       return processedString;

    }

I was expecting this to crash with some message since the method only receives a String, not a list of String. Since it didn't crash, I modified the method above to receive a list of String (maybe Spring does some magic behind the scenes to still receive messages as String but collect them in a list for the method to process afterwards?).

    @Bean
    public Function<Message<List<String>>, String> sampleFunction() {

       ... // Stream processing in here

       return processedString;

    }

But this just crashes since it's trying to parse a single String message as a List of String.

How could I prepare the code to batch all those String messages into a List? Is there any example on this?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

...batch-mode only works with binders that support it (e.g. Kafka, RabbitMQ). It doesn't look like the GCP binder supports it (I see no references to the property).

https://github.com/spring-cloud/spring-cloud-gcp/blob/master/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_batch_consumers

Publisher batching is not related to consumer batching.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
...