Spring Cloud Stream Kafka 实例
maven
生产者配置
# Producer application configuration (application.yml).
# The HTTP endpoint used to trigger a message listens on this port.
server:
  port: 8081
spring:
  application:
    name: output-demo
  cloud:
    # NOTE(review): Spring Cloud Stream documents these two settings as
    # spring.cloud.stream.instance-count / instance-index; they are kept at
    # the position the article lists them (before `stream:`) - confirm.
    instance-count: 1
    instance-index: 0
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2182
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        # `output` is the channel name that the producer code publishes to.
        output:
          destination: event-demo
          content-type: text/plain
          producer:
            partitionCount: 1
java代码
@EnableBinding()
publicclassSendService{
@Autowired
privateSourcesource;
publicvoidndMessage(Stringmsg){
try{
().nd(yload(msg).build());
}catch(Exceptione){
tackTrace();
}
}
}
@RestController
publicclassProducerController{
@Autowired
privateSendServicervice;
@RequestMapping(value="/nd/{msg}",method=)
publicvoidnd(@PathVariable("msg")Stringmsg){
ssage(msg);
}
}
消费者
# Consumer application configuration (application.yml).
spring:
  application:
    name: input-demo
  cloud:
    # NOTE(review): Spring Cloud Stream documents these two settings as
    # spring.cloud.stream.instance-count / instance-index; they are kept at
    # the position the article lists them (before `stream:`) - confirm.
    instance-count: 1
    instance-index: 0
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2182
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        # `input` is the channel name that the consumer code listens on.
        input:
          destination: event-demo
          group: s1
          consumer:
            # NOTE(review): autoCommitOffset is documented as a Kafka-binder
            # consumer property (spring.cloud.stream.kafka.bindings.input.consumer);
            # verify that it takes effect at this location.
            autoCommitOffset: false
            concurrency: 1
            partitioned: false
java代码
@EnableBinding()
publicclassMsgSink{
@StreamListener()
publicvoidprocess(Message<?>message){
n(load());
Acknowledgmentacknowledgment=ders().get(LEDGMENT,);
if(acknowledgment!=null){
n("Acknowledgmentprovided");
ledge();
}
}
}
运行
先运行生产者，再运行消费者
curl -i localhost:8081/send/hello1
doc
本文发布于:2022-11-25 00:51:17,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/90/15313.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
| 留言与评论(共有 0 条评论) |