Code Implementation

<p><strong><span style="font-size:18px">Before You Start</span></strong></p>
<p>Authentication: in line with public cloud security standards, all communication between the client and the server must be authenticated with an OAC AccessKey/SecretKey (AK/SK) pair. Kafka uses a SASL + JAAS plug-in authentication mechanism to pass the AK and the signature information to the server.</p>
<p><strong><span style="font-size:18px">Environment Requirements</span></strong></p>
<ol>
<li>Java 1.7 U51 or later; Java 1.8 is recommended.</li>
<li>The Kafka server currently runs version 1.1.0.</li>
<li>Kafka client 1.1.0 is recommended. The client must be version 0.11 or later and must be compatible with a 1.1.0 server.</li>
</ol>
<p><strong><span style="font-size:18px">Integrating Kafka</span></strong></p>
<ol>
<li>Add kafka-clients.jar and kafka-clients-security-1.1.0.jar to your project, together with their dependencies. The Kafka client JAR can be the one downloaded from the official website, provided it is version 0.11 or later and supports a Kafka 1.1.0 server.
<p>Note: to obtain the service access point, see <a href="https://pinganyun.com/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.topic_accesspoint" target="_blank">Obtaining Access Point Information</a>.</p>
</li>
<li>Create the kafka_jaas.conf file.
<pre>
KafkaClient {
    org.apache.kafka.common.security.oac.OacLoginModule required
    AccessKey=""
    SecretKey="";
};</pre>
<p>Note: log in to the public cloud console to obtain the AccessKey and SecretKey.</p>
</li>
<li>Set the security.protocol and sasl.mechanism properties.
<pre>
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN</pre>
</li>
</ol>
<p><strong><span style="font-size:18px">Dependencies</span></strong></p>
<p>Add the following dependencies:</p>
<pre>
&lt;dependencies&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
        &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
        &lt;version&gt;1.1.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
        &lt;artifactId&gt;kafka-clients-security&lt;/artifactId&gt;
        &lt;version&gt;1.1.0&lt;/version&gt;
    &lt;/dependency&gt;
&lt;/dependencies&gt;</pre>
<p><strong><span style="font-size:18px">Sending Messages</span></strong></p>
<p>Sample code for sending messages:</p>
<pre>
<strong>import</strong> java.util.Properties;
<strong>import</strong> org.apache.kafka.clients.producer.KafkaProducer;
<strong>import</strong> org.apache.kafka.clients.producer.Producer;
<strong>import</strong> org.apache.kafka.clients.producer.ProducerRecord;

<strong>public</strong> <strong>void</strong> testProducer() {
    Properties props = <strong>new</strong> Properties();
    String topic = "test2";
    <em>// Replace with your access point address; see <a href="https://pinganyun.com/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.topic_accesspoint" target="_blank">Obtaining Access Point Information</a></em>
    props.put("bootstrap.servers", "10.42.8.222:9092");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    <em>// try-with-resources closes the producer, which waits for pending sends to complete</em>
    <strong>try</strong> (Producer&lt;String, String&gt; producer = <strong>new</strong> KafkaProducer&lt;&gt;(props)) {
        <strong>for</strong> (<strong>int</strong> i = 0; i &lt; 50; i++) {
            producer.send(<strong>new</strong> ProducerRecord&lt;&gt;(topic, topic + "-K-" + i, topic + "-V-" + i));
        }
    }
}</pre>
<p><strong><span style="font-size:18px">Consuming Messages</span></strong></p>
<p>Sample code for consuming messages:</p>
<pre>
<strong>import</strong> java.util.Arrays;
<strong>import</strong> java.util.Properties;
<strong>import</strong> org.apache.kafka.clients.consumer.ConsumerRecord;
<strong>import</strong> org.apache.kafka.clients.consumer.ConsumerRecords;
<strong>import</strong> org.apache.kafka.clients.consumer.KafkaConsumer;

<strong>public</strong> <strong>static</strong> <strong>void</strong> testConsumer() {
    Properties props = <strong>new</strong> Properties();
    String topic = "test2";
    <em>// Replace with your access point address; see <a href="https://pinganyun.com/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.topic_accesspoint" target="_blank">Obtaining Access Point Information</a></em>
    props.put("bootstrap.servers", "localhost:9092");
    <em>// Both SASL properties are required, as described under Integrating Kafka</em>
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("group.id", "szx_test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer&lt;String, String&gt; consumer = <strong>new</strong> KafkaConsumer&lt;&gt;(props);
    consumer.subscribe(Arrays.asList(topic));
    <strong>while</strong> (true) {
        <em>// Wait up to 100 ms for new records</em>
        ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
        <strong>for</strong> (ConsumerRecord&lt;String, String&gt; record : records) {
            <em>// log is assumed to be a logger defined in the enclosing class</em>
            log.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
        }
    }
}</pre>
<p><strong><span style="font-size:18px">DEMO</span></strong></p>
<p>Kafka messaging in Java - <a href="https://obs-cn-shanghai.yun.pingan.com/pcp-portal/kafka-demo-pcd.zip?response-content-disposition=attachment%3Bfilename%3Dkafka-demo-pcd.zip" target="_blank">Download demo package</a></p>
<p>Kafka messaging with Spring Boot - <a href="https://obs-cn-shanghai.yun.pingan.com/pcp-portal/20200904111509-12d721eb935e.zip" target="_blank">Download demo package</a></p>
<p>Kafka messaging with Kafka Streams - <a href="https://obs-cn-shanghai.yun.pingan.com/pcp-portal/20200904091743-1de0842c9799.zip" target="_blank">Download demo package</a></p>
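<p>The page does not say how the client locates kafka_jaas.conf. The sketch below shows one common approach, under the assumption that the OAC login module is resolved through the standard JAAS system property <code>java.security.auth.login.config</code>, as is usual for Kafka clients. The class name <code>KafkaClientConfigSketch</code>, its helper methods and the placeholder credentials are hypothetical and not part of the product. It uses only the JDK, so it can run without the Kafka JARs; the resulting <code>Properties</code> would still need the serializer or deserializer settings before being passed to a producer or consumer.</p>

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class KafkaClientConfigSketch {

    /** Writes a kafka_jaas.conf with the same layout as the file described in the integration steps. */
    static Path writeJaasConfig(Path dir, String accessKey, String secretKey) throws IOException {
        String content = "KafkaClient {\n"
                + "    org.apache.kafka.common.security.oac.OacLoginModule required\n"
                + "    AccessKey=\"" + accessKey + "\"\n"
                + "    SecretKey=\"" + secretKey + "\";\n"
                + "};\n";
        Path file = dir.resolve("kafka_jaas.conf");
        Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    /** Builds the connection and SASL properties shared by producers and consumers. */
    static Properties commonClientProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder credentials: replace with the AK/SK from the public cloud console.
        Path dir = Files.createTempDirectory("kafka-jaas");
        Path jaas = writeJaasConfig(dir, "your-access-key", "your-secret-key");

        // Assumption: the login module is found through the standard JAAS property.
        // This must be set before the first producer or consumer is created.
        System.setProperty("java.security.auth.login.config", jaas.toAbsolutePath().toString());

        // Replace with your access point address.
        Properties props = commonClientProperties("10.42.8.222:9092");
        System.out.println("JAAS config: " + System.getProperty("java.security.auth.login.config"));
        System.out.println("Client properties: " + props);
    }
}
```

<p>The same effect can usually be achieved without code by starting the JVM with <code>-Djava.security.auth.login.config=/path/to/kafka_jaas.conf</code>, which keeps the credentials file out of the application logic.</p>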