脉冲星消费者例子

  
 <代码>
  进口org.apache.pulsar.client.api.Consumer;
  进口org.apache.pulsar.client.api.Message;
  进口org.apache.pulsar.client.api.PulsarClient;
  进口org.apache.pulsar.client.api.SubscriptionInitialPosition;
  进口org.apache.pulsar.client.api.SubscriptionType;
  进口org.apache.pulsar.client.impl.schema.JSONSchema;
  公开课ReceiveMsgTest {
  公共静态void main (String [] args) {
  字符串url=" http://192.168.1.48:8080 ";
  尝试{
  PulsarClient客户=PulsarClient.builder ()
  .serviceUrl (url)
  .build ();
  Consumer消费者=client.newConsumer (JSONSchema.of (UserModel.class))
  .topic(“我的租客/my-namespace testschema-topic”)
  .subscriptionType (SubscriptionType.Exclusive)//订阅模式独家(独占,默认模式)故障转移(灾备)分享(共享)
  .subscriptionName (wbq_1)//订阅者名称
  .subscribe ();
  而(真){
  MessageuserModelmsg=consumer.receive ();
  UserModel UserModel=userModelmsg.getValue ();
  system . out。println(“接收信息:”+ userModel.getName () + "=" + userModel.getAge ());
  consumer.acknowledge (userModelmsg.getMessageId());//应答后此订阅者不会在收到此消息
  }
  }捕捉(异常e) {
  e.printStackTrace ();
  }
  }
  } 
  
 <代码>
  公开课UserModel {
  
  私人字符串名称;
  
  私人int年龄;
  
  公共字符串getName () {
  返回名称;
  }
  
  公共空间setName(字符串名称){
  this.name=名称;
  }
  
  公共int getAge () {
  返回年龄;
  }
  
  公共空间setAge (int年龄){
  这一点。年龄=年龄;
  }
  } 

脉冲星消费者例子