脉冲星功能例子

  

在单机环境下实现字符串追加函数(脉冲星2.4.2版本)

1启动单机脉冲星

美元bin/pulsar-daemon开始独立的

2创建函数

1)准备环境

项目引用编译“org.apache.pulsar: pulsar-functions-api: 2.4.2”

2)创建JAVA函数(此函数用于数据源来的主题模式是字符串,输出的tiopic模式是字符串)

奥龀逍枪δ芾?

导出jar包,放到脉冲星服务器目录下,本例子放在/数据/jar/下

3)使用命令行工具加载函数到脉冲星,

bin/pulsar-admin函数创建\

——名称测试。AppStrFunction \

——jar/数据/jar/pf。jar \

——输入持久://公共/违约/tlstest \

——输出持久://公共/违约/teststr \

——租户公众\

——默认名称空间\

——名字appStrFunction

参数说明:

参数
说明functionscreate创建函数,默认创建成功后启动classname函数类名称,需要加上包名
jarinputs指定函数数据的来源在哪里,支持多个话题作为输入输出如果该函数有输出(有些情况下,函数没有输出),指定函数输出的话题,只能有一个输出租户指定该函数运行的租户名命名空间指定该函数运行的命名空间名称指定该函数运行的名称以下是函数相关其他操作

停止函数

bin/pulsar-admin函数停止\

——租户公众\

——默认名称空间\

——名字appStrFunction

启动函数

bin/pulsar-admin函数开始\

——租户公众\

——默认名称空间\

——名字appStrFunction

删除函数

bin/pulsar-admin函数删除\

——租户公众\

——默认名称空间\

——名字appStrFunction

函数的日志在脉冲星安装目录/日志/功能下

3测试函数

根据前边函数已成功加载启动

1)向tlstest主题发送消息

进口java.util.concurrent.TimeUnit;   进口org.apache.pulsar.client.api.Producer;   进口org.apache.pulsar.client.api.PulsarClient;   进口org.apache.pulsar.client.api.Schema;   公开课SendMsgTest {   公共静态void main (String [] args) {   字符串url=奥龀逍?//192.168.1.48:6650”;   尝试{   PulsarClient客户=PulsarClient.builder ()   .serviceUrl (url)   TimeUnit.SECONDS .connectionTimeout (10)   .build ();   Producer制片人=client.newProducer (Schema.STRING)   .topic (“tlstest”)   TimeUnit.SECONDS .sendTimeout (10)   .producerName (“senduser”)   共创();   生产商。发送(“这是一本书”);   system . out。打印(“发送ok”);   client.close ();   }捕捉(异常e) {   e.printStackTrace ();   }   }   }

2)读取teststr主题消息

进口org.apache.pulsar.client.api.Consumer;   进口org.apache.pulsar.client.api.Message;   进口org.apache.pulsar.client.api.PulsarClient;   进口org.apache.pulsar.client.api.Schema;   进口org.apache.pulsar.client.api.SubscriptionInitialPosition;   进口org.apache.pulsar.client.api.SubscriptionType;   进口org.apache.pulsar.client.impl.schema.JSONSchema;   进口schema.OrderModel;   进口com.alibaba.fastjson.JSON;   公开课RecFunTest {   公共静态void main (String [] args) {   字符串url=" http://192.168.1.48:8080 ";   尝试{   PulsarClient客户=PulsarClient.builder ()   .serviceUrl (url)   .build ();   Consumer消费者=client.newConsumer (Schema.STRING)   .topic (“teststr”)   .subscriptionInitialPosition (SubscriptionInitialPosition.Earliest)   .subscriptionType (SubscriptionType.Exclusive)//订阅模式独家(独占,默认模式)故障转移(灾备)分享(共享)   .subscriptionName (wbq)//订阅者名称   .subscribe ();   而(真){   Messagemondmsg=consumer.receive ();   字符串味精=mondmsg.getValue ();   system . out。println(“接收消息=:”+味精);   }   }捕捉(异常e) {   e.printStackTrace ();   }   }   }


脉冲星功能例子