调用SDK消费消息

<p class="shortdesc">消息发送成功后,需要启动消费者消费消息。本文主要以调用 TCP 协议的Java SDK 为例,说明如何调用SDK消费消息。</p> <section><div class="tasklabel"><h2 class="doc-tairway">操作步骤</h2></div><ol class="ol steps"><li class="li step stepexpand"> <span class="ph cmd">根据以下说明设置相关参数,并运行示例代码消费消息。</span> <div class="itemgroup info"> <pre class="pre codeblock"><code>public static Consumer initConsumer() throws FCException { // 准备参数。 Properties properties = new Properties(); // 业务系统可以从配置文件中取出属性值,demo中写死。 properties.setProperty(FCConstant.NAME_SERVER_ADDRESS,"NAME_SERVER_ADDR-demo");// 必须配置,环境IP地址。 properties.setProperty(FCConstant.CONSUMER_ID, Constant.CID);// 必须配置,在控制台创建。 // 设置ak/sk,从控制台获取。 properties.setProperty(FCConstant.ACCESS_KEY, "ACCESSKEY-demo"); properties.setProperty(FCConstant.SECRET_KEY, "SECRETKEY-demo"); properties.setProperty(FCConstant.INSTANCE_NAME, UUID.randomUUID().toString());// INSTANCE_NAME属性建议设置为主机ip+随机串。 properties.setProperty("CONSUMER_THREAD_NUM", "10"); Consumer consumer = FCFactory.createConsumer(properties); // 设置第一次消费消息的时间点,这里设置从最后的消息开始获取。 // 如果不设置该值,默认从队列的最开始消费消息。 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); return consumer; } public static void subscribe(final Consumer consumer, FCMessageFilter messageFilter, String topic) throws FCException { final SimpleDateFormat smf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss sss"); // 订阅 topic的消息(topic需要在控制台中创建)。 consumer.subscribe(topic, messageFilter, new FCMessageListener() { // PUSH消费模式下,客户端包会启动后台线程不断从MQCP中拉取消息(准实时方式,毫秒级延时)。 // 业务系统收取到消息后,需要实现FCMessageListener,来处理消息。 @Override public FCConsumeStatus pushMessage(List<FCMessage> messageList) { // 监听到有消息抵达后,业务系统需要遍历messageList对象,来获取消息。 // messageList的默认大小为1,即消息是一条一条的推送到客户端的。 for (FCMessage msg : messageList) { // 消息明细 FCMessage对象。 // 建议业务系统将从消息平台拉取消息和处理消息的逻辑解耦。 // 在Consumer监听器中只监听到消息,建议简单地将获取的消息解析存储,然后返回。 // FCConsumeStatus.CONSUME_OK // 后端可以异步来处理接收到的消息。 log.info(consumer+"--------【{}】:{},\ntime :",new Object[]{new String(msg.getConent()),msg.getMsgId(),smf.format(new Date())}); } return FCConsumeStatus.CONSUME_OK; } }); } </code></pre> </div> </li><li class="li step stepexpand"> <span class="ph cmd">您可通过控制台查看消费者状态,若消费者在线,则说明消费者已成功启动。可参考查看<a class="xref" href="https://pinganyun.com/ssr/help/middleware/pamq/manual.common_operations.subscribe_manage.consumer_state" target="_blank">消费者状态</a>。</span> </li></ol></section>
以上内容是否解决了您的问题?
请补全提交信息!
咨询·建议

电话咨询

400-151-8800

邮件咨询

cloud@pingan.com

在线客服

工单支持

解决云产品相关技术问题