博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka部署与代码实例(转)
阅读量:6426 次
发布时间:2019-06-23

本文共 8483 字,大约阅读时间需要 28 分钟。

来自:http://doc.okbase.net/QING____/archive/19447.html

也可参考:

http://blog.csdn.net/21aspnet/article/details/19325373

http://blog.csdn.net/unix21/article/details/18990123

 

kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

clientPort=2181server.0=127.0.0.1:2888:3888server.1=127.0.0.1:2889:3889server.2=127.0.0.1:2890:3890##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

./zkServer.sh start

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

clientPort=2182##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

clientPort=2183##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

broker.id=0port=9092num.network.threads=2num.io.threads=2socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576socket.request.max.bytes=104857600log.dir=./logsnum.partitions=2log.flush.interval.messages=10000log.flush.interval.ms=1000log.retention.hours=168#log.retention.bytes=1073741824log.segment.bytes=536870912log.cleanup.interval.mins=10zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183zookeeper.connection.timeout.ms=1000000kafka.metrics.polling.interval.secs=5kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporterkafka.csv.metrics.dir=/tmp/kafka_metricskafka.csv.metrics.reporter.enabled=false

 

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

> cd kafka-0> ./sbt update> ./sbt package> ./sbt assembly-package-dependency

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

broker.id=1port=9093##其他配置和kafka-0保持一致

    然后和kafka-0一样执行打包命令,然后启动此broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。

4.0.0
com.test
test-kafka
jar
test-kafka
http://maven.apache.org
1.0.0
log4j
log4j
1.2.14
org.apache.kafka
kafka_2.8.0
0.8.0-beta1
log4j
log4j
org.scala-lang
scala-library
2.8.1
com.yammer.metrics
metrics-core
2.2.0
com.101tec
zkclient
0.3
test-kafka-1.0
src/main/resources
true
maven-compiler-plugin
2.3.2
1.5
1.5
gb2312
maven-resources-plugin
2.2
gbk

 

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

#partitioner.class=metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093##,127.0.0.1:9093producer.type=synccompression.codec=0 serializer.class=kafka.serializer.StringEncoder ##在producer.type=async时有效 #batch.num.messages=100

 

    2) LogProducer.java代码样例

package com.test.kafka;import java.util.ArrayList;import java.util.Collection; import java.util.List; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class LogProducer { private Producer
inner; public LogProducer() throws Exception{ Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); ProducerConfig config = new ProducerConfig(properties); inner = new Producer
(config); } public void send(String topicName,String message) { if(topicName == null || message == null){ return; } KeyedMessage
km = new KeyedMessage
(topicName,message); inner.send(km); } public void send(String topicName,Collection
messages) { if(topicName == null || messages == null){ return; } if(messages.isEmpty()){ return; } List
> kms = new ArrayList
>(); for(String entry : messages){ KeyedMessage
km = new KeyedMessage
(topicName,entry); kms.add(km); } inner.send(kms); } public void close(){ inner.close(); } /** * @param args */ public static void main(String[] args) { LogProducer producer = null; try{ producer = new LogProducer(); int i=0; while(true){ producer.send("test-topic", "this is a sample" + i); i++; Thread.sleep(2000); } }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){ producer.close(); } } } }

 

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183##,127.0.0.1:2182,127.0.0.1:2183# timeout in ms for connecting to zookeeperzookeeper.connectiontimeout.ms=1000000#consumer group idgroup.id=test-group #consumer timeout #consumer.timeout.ms=5000

 

    2) LogConsumer.java代码样例

package com.test.kafka;import java.util.HashMap;import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class LogConsumer { private ConsumerConfig config; private String topic; private int partitionsNum; private MessageExecutor executor; private ConsumerConnector connector; private ExecutorService threadPool; public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{ Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties")); config = new ConsumerConfig(properties); this.topic = topic; this.partitionsNum = partitionsNum; this.executor = executor; } public void start() throws Exception{ connector = Consumer.createJavaConsumerConnector(config); Map
topics = new HashMap
(); topics.put(topic, partitionsNum); Map
>> streams = connector.createMessageStreams(topics); List
> partitions = streams.get(topic); threadPool = Executors.newFixedThreadPool(partitionsNum); for(KafkaStream
partition : partitions){ threadPool.execute(new MessageRunner(partition)); } } public void close(){ try{ threadPool.shutdownNow(); }catch(Exception e){ // }finally{ connector.shutdown(); } } class MessageRunner implements Runnable{ private KafkaStream
partition; MessageRunner(KafkaStream
partition) { this.partition = partition; } public void run(){ ConsumerIterator
it = partition.iterator(); while(it.hasNext()){ MessageAndMetadata
item = it.next(); System.out.println("partiton:" + item.partition()); System.out.println("offset:" + item.offset()); executor.execute(new String(item.message()));//UTF-8 } } } interface MessageExecutor { public void execute(String message); } /** * @param args */ public static void main(String[] args) { LogConsumer consumer = null; try{ MessageExecutor executor = new MessageExecutor() { public void execute(String message) { System.out.println(message); } }; consumer = new LogConsumer("test-topic", 2, executor); consumer.start(); }catch(Exception e){ e.printStackTrace(); }finally{ // if(consumer != null){ // consumer.close(); // } } } }

 

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

转载于:https://www.cnblogs.com/lyl693/p/6649870.html

你可能感兴趣的文章
SQL注入神器——sqlmap
查看>>
Unity导航 (寻路系统Nav Mesh Agent)
查看>>
SaltStack配置语法-YAML和Jinja
查看>>
运用免费OA让你有意想不到的效果
查看>>
一些软件设计软则
查看>>
Linux运维基础命令
查看>>
使用PowerShell配置IP地址
查看>>
第十一章 MySQL运算符
查看>>
JAVA常见算法题(十七)
查看>>
GUI鼠标相关设置
查看>>
使用 <Iframe>实现跨域通信
查看>>
闭包--循序学习
查看>>
项目实战之集成邮件开发
查看>>
解决C3P0在Linux下Failed to get local InetAddress for VMID问题
查看>>
1531 山峰 【栈的应用】
查看>>
巧用美女照做微信吸粉,你会做吗?
查看>>
wcf学习总结《上》
查看>>
ERROR (ClientException)
查看>>
Load Balance 产品横向比较
查看>>
Java代理程序实现web方式管理邮件组成员
查看>>