常见问题

<p class="shortdesc"></p> <section class="section" id="faqs__section_mpt_vvg_2mb"><h2 class="doc-tairway">如何判断消息已成功发送?</h2> <p class="p">客户端 Producer 调用 send 消息方法,只要不提示异常,即代表发送成功。但发送成功会有多个状态,在 sendResult 里定义。</p> <table class="table" id="faqs__table_dfl_jyg_2mb"><caption></caption><colgroup><col><col></colgroup><thead class="thead"> <tr class="row"> <th class="entry" id="faqs__table_dfl_jyg_2mb__entry__1">返回状态</th> <th class="entry" id="faqs__table_dfl_jyg_2mb__entry__2">状态释义</th> </tr> </thead><tbody class="tbody"> <tr class="row"> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__1 "><code class="ph codeph">SEND_OK</code></td> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__2 "> <p class="p">消息发送成功。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__1 "><code class="ph codeph">FLUSH_DISK_TIMEOUT</code></td> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__2 "> <p class="p">消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时MASTER服务器宕机,消息才会丢失。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__1 "><code class="ph codeph">FLUSH_SLAVE_TIMEOUT</code></td> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__2 "> <p class="p">消息发送成功,但是服务器同步到 SLAVE 时超时,消息已经进入服务器队列,只有此时 SLAVE 服务器宕机,消息才会丢失。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__1 "><code class="ph codeph">SLAVE_NOT_AVAILABLE</code></td> <td class="entry" headers="faqs__table_dfl_jyg_2mb__entry__2 "> <p class="p">消息发送成功,但是此时 SLAVE 不可用,消息已经进入服务器队列,只有此时 SLAVE 服务器宕机,消息才会丢失。</p> </td> </tr> </tbody></table> <p class="p">目前PAMQ采用两主两从共 4 台 Broker 机器,针对大部分业务系统来讲,只要 PAMQ 没有提示异常,可以默认消息已成功发送。建议业务系统针对发送消息后所有非 SEND_OK 状态的消息,打印 Warning 日志,并在运营端设置对应的监控规则,及时发邮件提醒。</p> </section> <section class="section" id="faqs__section_rkf_tyg_2mb"><h2 class="doc-tairway">如何判断已成功消费消息?</h2> <p class="p">客户端 Consumer 在 FCMessageListener 中实现 pushMessage(),遍历并处理消息后会返回给 PAMQ 端消费的状态,只有消费成功或者消费失败两种状态。</p> <table class="table" id="faqs__table_ufy_vyg_2mb"><caption></caption><colgroup><col><col></colgroup><thead class="thead"> <tr class="row"> <th class="entry" id="faqs__table_ufy_vyg_2mb__entry__1">消费状态</th> <th class="entry" id="faqs__table_ufy_vyg_2mb__entry__2">状态释义</th> </tr> </thead><tbody class="tbody"> <tr class="row"> <td class="entry" headers="faqs__table_ufy_vyg_2mb__entry__1 "><code class="ph codeph">CONSUME_OK</code></td> <td class="entry" headers="faqs__table_ufy_vyg_2mb__entry__2 "> <p class="p">消费成功</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_ufy_vyg_2mb__entry__1 "><code class="ph codeph">CONSUME_FAIL</code></td> <td class="entry" headers="faqs__table_ufy_vyg_2mb__entry__2 "> <p class="p">消费失败</p> </td> </tr> </tbody></table> </section> <section class="section" id="faqs__section_wbk_zyg_2mb"><h2 class="doc-tairway">消费端如何实现定时消费?</h2> <p class="p">在某些业务场景下,消费端希望在业务低峰 (例如半夜 12 点后) 时开始从 PAMQ 拉取消息,在业务高峰期前关闭消费功能,以此降低系统负载。这种类似场景涉及到如何在不停止业务服务的场景下,多次开启和关闭 PAMQ 消费服务。</p> <p class="p">PAMQ 的消费者本身是可以多实例初始化的,每个实例的消费者服务开启和关闭也是独立的,可方便地实现定时消费的场景。</p> <p class="p">如果业务系统有类似需求,我们建议:</p> <ol class="ol" id="faqs__ol_a4z_zyg_2mb"> <li class="li">业务系统本身需要添加功能开关,支持配置化的方式来开启或关闭消费服务。其实现本身比较简单,即调用 PAMQ 消费者的<code class="ph codeph">start || shutdown</code> 方法。必要时需要对方法添加上层逻辑封装,来实现定制化的需求。</li> <li class="li">调用完 Consumer 对象的 shutdown 方法后,不要立即初始化下一个 Consumer 对象并启用服务,建议至少延迟几秒种,等相关的资源回收完毕。</li> <li class="li">完善业务端开启或关闭消息服务的日志,方便后续运维处理问题。</li> </ol> </section> <section class="section" id="faqs__section_hh1_mzg_2mb"><h2 class="doc-tairway">生产端如何实现定时发送消息?</h2> <p class="p">生产者客户端是单实例实现的,相关资源只能初始化一次,即便是在调用了 Producer 的 shutdown() 方法之后,多次初始化仍然会报错。发送消息的定时控制,相对于 Consumer 来说实现比较简单,在需要的时候直接调用 Producer 的 send() 方法发送消息即可。</p> </section> <section class="section" id="faqs__section_mrj_11h_2mb"><h2 class="doc-tairway">消费端没有接收到消息,如何定位问题?</h2> <p class="p">正常情况下生产者发送消息到 PAMQ,消息被投递到消费端的延时应该在毫秒级。如果消费端迟迟没有收到消息,建议采用下面的步骤来排查问题:</p> <p class="p">获取消息的Message Id 或Message Key,在平安云消息队列MQ的消息查询模块,根据自己的消费者 ID 找到其对应的消费状态,常见的投递状态有:</p> <table class="table" id="faqs__table_ftp_h1h_2mb"><caption></caption><colgroup><col><col></colgroup><thead class="thead"> <tr class="row"> <th class="entry" id="faqs__table_ftp_h1h_2mb__entry__1">投递状态</th> <th class="entry" id="faqs__table_ftp_h1h_2mb__entry__2">状态释义</th> </tr> </thead><tbody class="tbody"> <tr class="row"> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__1 "><code class="ph codeph">SUBSCRIBED_AND_CONSUMED</code></td> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__2 "> <p class="p">已订阅,且已被消费(Offset已越过)。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__1 "><code class="ph codeph">SUBSCRIBED_BUT_FILTERD</code></td> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__2 "> <p class="p">已订阅,但已被过滤。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__1 "><code class="ph codeph">the consumer group[***] not online</code></td> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__2 "> <p class="p">已订阅,但消费者未启动。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__1 "><code class="ph codeph">SUBSCRIBED_AND_NOT_CONSUME_YET</code></td> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__2 "> <p class="p">已订阅,但还没有被消费。</p> </td> </tr> <tr class="row"> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__1 "><code class="ph codeph">UNKNOW_EXCEPTION</code></td> <td class="entry" headers="faqs__table_ftp_h1h_2mb__entry__2 "> <p class="p">未知异常。</p> </td> </tr> </tbody></table> <div class="note note note_note"><span class="note__title">说明:</span> <ul class="ul" id="faqs__ul_pvq_x1h_2mb"> <li class="li">SUBSCRIBED_AND_CONSUMED状态,表示消息已被正常消费,如果此时有异常,需要业务系统检查日志,分析是否因为解析消息时出现异常,导致消息未被正确处理。</li> <li class="li">SUBSCRIBED_BUT_FILTERD状态,需要业务系统检查初始化 Consumer 对象时传入的 TagList 是否和生产者定义的 Tag 匹配。</li> <li class="li">SUBSCRIBED_AND_NOT_CONSUME_YET 状态,可能的原因是由于消息有积压,消息还未被取走,可以稍等几十秒,再重新查询状态。</li> <li class="li">the consumer group[***] not online 状态,表示对应 CID 的消费者还未正确启动。业务系统需要检查消费者是否已启动,如果已启动请检查是否启动时有报错。可能是相关的配置项配置错误,导致 Consumer 启动时校验失败。</li> <li class="li">UNKNOW_EXCEPTION 状态,表示消息平台有异常。</li> </ul> </div> </section> <section class="section" id="faqs__section_mw5_4bh_2mb"><h2 class="doc-tairway">如何避免接收到的消息出现乱码?</h2> <p class="p">对于生产者来说,建议将消息 body 转为 byte 数组时显示指定为 UTF-8 编码。对于消费者来说,建议在接收到消息后将 byte 数组转为 String 时指定为 UTF-8 编码。这样可以避免因为消息 body 中有中文或者特殊字符,消费端解析时乱码,从而造成消息解析失败。</p> </section> <section class="section" id="faqs__section_dv4_sbh_2mb"><h2 class="doc-tairway">如何使用消息标签(Tag)?</h2> <p class="p">在消息中间件实际的使用场景中,消费者可能只需要接收消息队列中的部分消息,其余消息希望默认不被接收,直接丢弃。针对类似场景,PAMQ 提供通过合理使用消息标签 (Tag) 的方式来实现消费端灵活过滤队列中消息,实现方式如下:</p> <ol class="ol" id="faqs__ol_b5q_5bh_2mb"> <li class="li">生产者和消费者双方约定消息标签具体的设置值及其代表的含义。</li> <li class="li">生产者在发送消息时,组装消息对象的时候,需要给对应消息设置正确的消息标签。<pre class="pre codeblock"><code>FCMessage msg = new FCMessage(); //设置过滤标签---大小写敏感 msg.setTag("SystemTag");</code></pre></li> <li class="li">消费者在组装消费者对象时,需要正确设置消息过滤的过滤器。<pre class="pre codeblock"><code>FCMessageFilter FCFilter = new FCMessageFilter (); List<String> list = new ArrayList<String>(); FCFilter.setTags(list); // 需要注意的是同一CID下的多个应用实例需要设置相同的Tag列表来过滤消息,以确保不会出现消息被过滤取走但未被业务系统处理list.add("SystemTag")的情况。</code></pre></li> </ol> </section>
以上内容是否解决了您的问题?
请补全提交信息!
咨询·建议

电话咨询

400-151-8800

邮件咨询

cloud@pingan.com

在线客服

工单支持

解决云产品相关技术问题