<代码>
进口org.apache.pulsar.client.api.Consumer;
进口org.apache.pulsar.client.api.Message;
进口org.apache.pulsar.client.api.PulsarClient;
进口org.apache.pulsar.client.api.SubscriptionInitialPosition;
进口org.apache.pulsar.client.api.SubscriptionType;
进口org.apache.pulsar.client.impl.schema.JSONSchema;
公开课ReceiveMsgTest {
公共静态void main (String [] args) {
字符串url=" http://192.168.1.48:8080 ";
尝试{
PulsarClient客户=PulsarClient.builder ()
.serviceUrl (url)
.build ();
Consumer消费者=client.newConsumer (JSONSchema.of (UserModel.class))
.topic(“我的租客/my-namespace testschema-topic”)
.subscriptionType (SubscriptionType.Exclusive)//订阅模式独家(独占,默认模式)故障转移(灾备)分享(共享)
.subscriptionName (wbq_1)//订阅者名称
.subscribe ();
而(真){
MessageuserModelmsg=consumer.receive ();
UserModel UserModel=userModelmsg.getValue ();
system . out。println(“接收信息:”+ userModel.getName () + "=" + userModel.getAge ());
consumer.acknowledge (userModelmsg.getMessageId());//应答后此订阅者不会在收到此消息
}
}捕捉(异常e) {
e.printStackTrace ();
}
}
}代码>
<代码>
公开课UserModel {
私人字符串名称;
私人int年龄;
公共字符串getName () {
返回名称;
}
公共空间setName(字符串名称){
this.name=名称;
}
公共int getAge () {
返回年龄;
}
公共空间setAge (int年龄){
这一点。年龄=年龄;
}
}代码>