Search Preview
kafka源码分析之概述 | 罗道文的私房菜
luodw.cc最近帮实验室写了一个Spark+kafka实时处理日志的监控系统案例,大概流程为: 用户购物日志发送给Kafka; Spark实时从Kafka接收购物日志,利用Spark Streaming实时处理,最后将结果发送给Kafka; 用Flask构建一个web程序接收Kafka处理后的数据,用Flask-SocketIO实时将每秒的数据发送给客户端浏览器; 浏览器利用socket.io.js实时接收
.cc > luodw.cc
SEO audit: Content analysis
Language | Error! No language localisation is found. | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Title | kafka源码分析之概述 | 罗道文的私房菜 | ||||||||||||||||||||||||||||||||||||
Text / HTML ratio | 53 % | ||||||||||||||||||||||||||||||||||||
Frame | Excellent! The website does not use iFrame solutions. | ||||||||||||||||||||||||||||||||||||
Flash | Excellent! The website does not have any flash contents. | ||||||||||||||||||||||||||||||||||||
Keywords cloud | = import key offset topic=mytopic partition=0 World kafka public Properties String> consumer true Python操作Kafka propsputbootstrapservers coding utf8 records producer Created | ||||||||||||||||||||||||||||||||||||
Keywords consistency |
|
||||||||||||||||||||||||||||||||||||
Headings |
|
||||||||||||||||||||||||||||||||||||
Images | We found 4 images on this web page. |
SEO Keywords (Single)
Keyword | Occurrence | Density |
---|---|---|
= | 26 | 1.30 % |
import | 12 | 0.60 % |
key | 6 | 0.30 % |
offset | 6 | 0.30 % |
topic=mytopic | 5 | 0.25 % |
partition=0 | 5 | 0.25 % |
World | 5 | 0.25 % |
kafka | 4 | 0.20 % |
public | 4 | 0.20 % |
Properties | 4 | 0.20 % |
String> | 3 | 0.15 % |
consumer | 3 | 0.15 % |
true | 2 | 0.10 % |
Python操作Kafka | 2 | 0.10 % |
propsputbootstrapservers | 2 | 0.10 % |
coding | 2 | 0.10 % |
utf8 | 2 | 0.10 % |
records | 2 | 0.10 % |
producer | 2 | 0.10 % |
Created | 2 | 0.10 % |
SEO Keywords (Two Word)
Keyword | Occurrence | Density |
---|---|---|
offset = | 6 | 0.30 % |
1 2 | 6 | 0.30 % |
2 3 | 6 | 0.30 % |
3 4 | 6 | 0.30 % |
4 5 | 6 | 0.30 % |
value = | 6 | 0.30 % |
key = | 6 | 0.30 % |
Hello World | 5 | 0.25 % |
5 6 | 5 | 0.25 % |
topic=mytopic partition=0 | 5 | 0.25 % |
partition=0 offset | 5 | 0.25 % |
World Hello | 4 | 0.20 % |
= new | 4 | 0.20 % |
6 7 | 4 | 0.20 % |
7 8 | 3 | 0.15 % |
public static | 2 | 0.10 % |
mainString args | 2 | 0.10 % |
10 11 | 2 | 0.10 % |
9 10 | 2 | 0.10 % |
8 9 | 2 | 0.10 % |
SEO Keywords (Three Word)
Keyword | Occurrence | Density | Possible Spam |
---|---|---|---|
2 3 4 | 6 | 0.30 % | No |
3 4 5 | 6 | 0.30 % | No |
1 2 3 | 6 | 0.30 % | No |
4 5 6 | 5 | 0.25 % | No |
partition=0 offset = | 5 | 0.25 % | No |
topic=mytopic partition=0 offset | 5 | 0.25 % | No |
5 6 7 | 4 | 0.20 % | No |
World Hello World | 4 | 0.20 % | No |
Hello World Hello | 4 | 0.20 % | No |
6 7 8 | 3 | 0.15 % | No |
17429 public class | 2 | 0.10 % | No |
static void mainString | 2 | 0.10 % | No |
void mainString args | 2 | 0.10 % | No |
mainString args Properties | 2 | 0.10 % | No |
args Properties props | 2 | 0.10 % | No |
Properties props = | 2 | 0.10 % | No |
public static void | 2 | 0.10 % | No |
20 21 22 | 2 | 0.10 % | No |
26 27 28 | 2 | 0.10 % | No |
javautilProperties Created by | 2 | 0.10 % | No |
SEO Keywords (Four Word)
Keyword | Occurrence | Density | Possible Spam |
---|---|---|---|
1 2 3 4 | 6 | 0.30 % | No |
2 3 4 5 | 6 | 0.30 % | No |
topic=mytopic partition=0 offset = | 5 | 0.25 % | No |
3 4 5 6 | 5 | 0.25 % | No |
Hello World Hello World | 4 | 0.20 % | No |
4 5 6 7 | 4 | 0.20 % | No |
5 6 7 8 | 3 | 0.15 % | No |
World Hello World Hello | 3 | 0.15 % | No |
static void mainString args | 2 | 0.10 % | No |
public static void mainString | 2 | 0.10 % | No |
void mainString args Properties | 2 | 0.10 % | No |
on 17429 public class | 2 | 0.10 % | No |
mainString args Properties props | 2 | 0.10 % | No |
args Properties props = | 2 | 0.10 % | No |
Properties props = new | 2 | 0.10 % | No |
18 19 20 21 | 2 | 0.10 % | No |
19 20 21 22 | 2 | 0.10 % | No |
26 27 28 29 | 2 | 0.10 % | No |
javautilProperties Created by charles | 2 | 0.10 % | No |
charles on 17429 public | 2 | 0.10 % | No |
Internal links in - luodw.cc
归档 | 罗道文的私房菜
index | 罗道文的私房菜
关于我 | 罗道文的私房菜
分类: kafka | 罗道文的私房菜
kafka源码分析之Producer | 罗道文的私房菜
kafka源码分析之概述 | 罗道文的私房菜
分类: nsq | 罗道文的私房菜
nsqd执行解析 | 罗道文的私房菜
分类: lua | 罗道文的私房菜
Lua原来这么好用 | 罗道文的私房菜
罗道文的私房菜
罗道文的私房菜
罗道文的私房菜
Luodw.cc Spined HTML
kafka源码分析之概述 | 罗道文的私房菜 罗道文的私房菜 分享知识,分享快乐 首页 归档 阅读 书签 关于 搜索 kafka源码分析之概述 发表于 2017-04-24 | 分类于 kafka | 阅读次数 最近帮实验室写了一个Spark+kafka实时处理日志的监控系统案例,大概流程为: 用户购物日志发送给Kafka; Spark实时从Kafka接收购物日志,利用Spark Streaming实时处理,最后将结果发送给Kafka; 用Flask构建一个web程序接收Kafka处理后的数据,用Flask-SocketIO实时将每秒的数据发送给客户端浏览器; 浏览器利用socket.io.js实时接收web发送来的数据,利用highcharts.js展示出来。 这个案例很好的展示了利用Spark+Kafka实时处理数据的开发模式。Spark在实时处理和批量处理都有很高的性能,Kafka消息队列在异步解耦,冗余处理和削峰等方面有很高的性能。 Kafka在互联网各大公司都有很广泛的应用,主要在于Kakfa性能出众,又有很好的扩展性和稳定性。而之前看过NSQ消息队列,对消息队列的分布式架构都有一定的了解,所以想最近这段时间看看kafka源码,熟悉下Kafka的整体架构,以及学习Scala和Java是如何写基础组件的。 这篇文章先介绍下Kafka的整体架构,在通过一个简单的实例展示Python是如何操作Kafka消息队列,如下: Kafka整体架构; Python操作Kafka; Java操作Kafka; 总结; 【版权声明】博客内容由罗道文的私房菜拥有版权,允许转载,但请标明原文链接http://luodw.cc/2017/04/24/kafka01/#more Kafka整体架构 Kafka相对于NSQ架构更加的复杂,但也提供更丰富的功能,下面根据我的理解列出二者的不同点: NSQ消费者采用的是push模式,而Kafka消费者采用的是pull模式; NSQ消息被消费之后,即被删除,而Kafka消费数据之后,并不删除数据,所以Kafka也可以看成是一个存储系统; NSQ没提供消息副本功能,而Kafka提供分区多副本,当leader宕机之后,可以重新选组提供服务,具有高可用性; NSQ各个nsqd之间不进行通信,而Kafka Server之间进行通信,毕竟要进行副本传输; Kafka消费提供组的概念,不同组的消费者可以消费同一个topic下所有的数据;而对于同组消费者,各个消费者按某种算法一起消费同一个topic下不同分区的消息。虽然NSQ并没有挺供消费者组的概念,但是NSQ的channel则提供了相同的功能;不同的channel相当于不同消费者组,都能收到topic的所有消息,然后同一个channel所对应的所有消费者相当于Kafka同一个消费者组内的消费者。二者在这方面实现的功能是一样的。 因此,我把NSQ看成是轻量级的消息队列,如果不需要消息副本,不需要提供消息冗余,只是简单消息的投递和消费我觉得可以使用NSQ,毕竟轻量,部署简单,也更容易深入理解源码。当然,如果需要很好的消息可靠性或者其他Kakfa其他特性,还是推荐Kafka。 下面先介绍下Kafka当中的专业术语: broker 一个Kafka集群有多个服务器,其中一台即称为一个broker; Topic 我们可以把topic看成是消息的种类,我们发送的每条消息都属于某个topic; Partition Partition是物理的概念,一个topic下面可以有多个Partition,这些Partition拥有等同的地位,主要是为了实现负载均衡; Producer 复杂发布消息到Kafka Broker; Consumer 向Kafka读取消息的客户端; Consumer Group 消费者组,每个consumer都属于一个ConsumerGroup,我们可以指定组的名字,如果不指定,则属于默认的消费者组; 下面同过一个简单的图示说明Kafka的拓扑结构以及与Producer和Consumer的关系 这里展示的是只是一个zookeeper(当然可以有多个)以及一个消费者组中的一个消费者(当然可以有多个消费者组和多个消费者),主要是为了简化分析。每个Kafka Server在启动时,都需要向zookeeper注册broker信息,路径为/brokers,可以通过zookeeper的ls /命令查看。等三个Kafka Server都启动之后,Producer与Consumer就可以连接投递和消费消息。这里假设有一个Topic,三个分区,每个分区只有一个副本,刚好对应图中的三个broker。对于多副本,等后续分析Kafka Server时再分析。 Producer向某个topic发送消息时,需要先连接上与这个topic相关的一台或者多台broker,因为broker之间会相互通信,最后通过一台broker,就可以找到所有与该topic相关的broker。当Producer在发布消息时,根据消息提供的key进行分区(最简单的方式就是哈稀求余),因此一条消息并将属于一个分区;如果分区函数设计得当,所有消息将会被均衡的发送到所有分区,实现负载均衡。在早期版本的Producer,有同步和异步的方式,而在最新的版本中只提供异步的方式,即将所有消息先存入一个队列,然后开启一个后台线不断从队列读取消息并发送给Kafka Sever。 Consumer消费消息时,需要先连接zookeeper获取与topic相关的broker,然后再连接;等Consumer连上Kafka Server之后,然后发送FetchRequest请求,带上topic,partition和offset从相应的分区读取消息; 下面通过官网提供的图,简单说明下消费者组模式 如图,假设有一个topic,四个partition;两个Kafka Server,每个Kafka Server分别有两个分区;有两个消费者组A和B。由图可知,ConsumerGroup A和ConsumerGroup B都可以消费同一个topic下的所有消息,而同一个消费者组内的消费者则消费topic下的不同分区;例如,ConsumerGroup A的C1消费分区0和3,而C2消费分区1和2。ConsumerGroup B下的四个Consumer则分别消费一个分区。 消费者组主要是为了实现同一个topic下的消息的消息实现不同的处理,例如同一个topic下的消息,即可用于hadoop进行批处理,也可以用于Spark流计算,还可以直接进行持久化到磁盘等等。 消费者组内的多个消费者主要就是为了实现负载均衡。 Python操作Kafka 这里用Python操作Kafka作为演示,只需要几行代码就可以实现生产者和消费者。生产者代码如下: 12345678# coding: utf-8import timefrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')while True: producer.send('test', "Hello World!".encode('utf8')) time.sleep(3) 然后消费者代码如下: 123456# coding: utf-8from kafka import KafkaConsumerconsumer = KafkaConsumer('test')for msg in consumer: print((msg.value).decode('utf8')) 我的测试环境只有一台Zookeeper和一台Kafka,生产者每隔3秒向test这个topic发送消息,消息内容为"Hello World!"。而消费者不断消费test的消息,PyCharm在consumer控制台下可以看到如下输出 1234567/home/charles/Envs/env1/bin/python /home/charles/PycharmProjects/kafka/consumer.pyHello World!Hello World!Hello World!Hello World!Hello World!... Java操作Kafka 在后端开发写业务代码中,经常使用的当属Python和Java,因此下面再给出Java操作Kafka的简单示例,代码取自Kafka源码,生产者代码如下: 123456789101112131415161718192021222324252627282930313233import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/** * Created by charles on 17-4-29. */public matriculation ProducerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0; i<=100; i++) { producer.send(new ProducerRecord<>("mytopic", Integer.toString(i), Integer.toString(i))); try { Thread.sleep(3000); } reservation (InterruptedException e) { e.printStackTrace(); } } producer.close(); }} 消费者代码如下: 1234567891011121314151617181920212223242526272829import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;/** * Created by charles on 17-4-29. */public matriculation ConsumerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("mytopic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("topic=%s, partition=%s, offset = %d, key = %s, value = %s\n", record.topic(),record.partition(),record.offset(), record.key(), record.value()); } }} 在Intellij idea的ConsumerDemo窗口,可以得到如下输出 12345topic=mytopic, partition=0, offset = 21, key = 0, value = 0topic=mytopic, partition=0, offset = 22, key = 1, value = 1topic=mytopic, partition=0, offset = 23, key = 2, value = 2topic=mytopic, partition=0, offset = 24, key = 3, value = 3topic=mytopic, partition=0, offset = 25, key = 4, value = 4 当然Kafka的Producer和Consumer都有很多配置,例如ack,是否自动commmitOffset等等,这也是我后续想看源码的原因,因为看了源码,可以更好的理解这些参数是什么意思,怎么做优化。 总结 这篇文章简单的介绍了下Kafka,也算对Kafka有个较为深入的认识,也为后续深入看源码打下基础。Kafka代码量好多,需要耐心慢慢啃,我有大概看了下Kafka的代码,有很多优秀的设计可以学习,包括NIO,Selector,Java并发包等等。先到这吧。 #kafka nsqd执行解析 kafka源码分析之Producer 文章目录 站点概览 罗道文 分享知识,分享快乐 106 日志 33 分类 116 标签 RSS github weibo zhihu Links 赖明星的博客地址 蔡珉星的博客地址 阮榕城的博客地址 陈友兵的博客地址 郑江龙的博客地址 厦门大学数据库实验室 1. Kafka整体架构2. Python操作Kafka3. Java操作Kafka4. 总结 © 2017 罗道文 由 Hexo 强力驱动 本站总访问量次 | 主题 - NexT.Pisces