luodw.cc - 阅读全文 »









Search Preview

kafka源码分析之概述 | 罗道文的私房菜

luodw.cc
最近帮实验室写了一个Spark+kafka实时处理日志的监控系统案例,大概流程为: 用户购物日志发送给Kafka; Spark实时从Kafka接收购物日志,利用Spark Streaming实时处理,最后将结果发送给Kafka; 用Flask构建一个web程序接收Kafka处理后的数据,用Flask-SocketIO实时将每秒的数据发送给客户端浏览器; 浏览器利用socket.io.js实时接收
.cc > luodw.cc

SEO audit: Content analysis

Language Error! No language localisation is found.
Title kafka源码分析之概述 | 罗道文的私房菜
Text / HTML ratio 53 %
Frame Excellent! The website does not use iFrame solutions.
Flash Excellent! The website does not have any flash contents.
Keywords cloud = import key offset topic=mytopic partition=0 World kafka public Properties String> consumer true Python操作Kafka propsputbootstrapservers coding utf8 records producer Created
Keywords consistency
Keyword Content Title Description Headings
= 26
import 12
key 6
offset 6
topic=mytopic 5
partition=0 5
Headings
H1 H2 H3 H4 H5 H6
5 0 0 0 0 0
Images We found 4 images on this web page.

SEO Keywords (Single)

Keyword Occurrence Density
= 26 1.30 %
import 12 0.60 %
key 6 0.30 %
offset 6 0.30 %
topic=mytopic 5 0.25 %
partition=0 5 0.25 %
World 5 0.25 %
kafka 4 0.20 %
public 4 0.20 %
Properties 4 0.20 %
String> 3 0.15 %
consumer 3 0.15 %
true 2 0.10 %
Python操作Kafka 2 0.10 %
propsputbootstrapservers 2 0.10 %
coding 2 0.10 %
utf8 2 0.10 %
records 2 0.10 %
producer 2 0.10 %
Created 2 0.10 %

SEO Keywords (Two Word)

Keyword Occurrence Density
offset = 6 0.30 %
1 2 6 0.30 %
2 3 6 0.30 %
3 4 6 0.30 %
4 5 6 0.30 %
value = 6 0.30 %
key = 6 0.30 %
Hello World 5 0.25 %
5 6 5 0.25 %
topic=mytopic partition=0 5 0.25 %
partition=0 offset 5 0.25 %
World Hello 4 0.20 %
= new 4 0.20 %
6 7 4 0.20 %
7 8 3 0.15 %
public static 2 0.10 %
mainString args 2 0.10 %
10 11 2 0.10 %
9 10 2 0.10 %
8 9 2 0.10 %

SEO Keywords (Three Word)

Keyword Occurrence Density Possible Spam
2 3 4 6 0.30 % No
3 4 5 6 0.30 % No
1 2 3 6 0.30 % No
4 5 6 5 0.25 % No
partition=0 offset = 5 0.25 % No
topic=mytopic partition=0 offset 5 0.25 % No
5 6 7 4 0.20 % No
World Hello World 4 0.20 % No
Hello World Hello 4 0.20 % No
6 7 8 3 0.15 % No
17429 public class 2 0.10 % No
static void mainString 2 0.10 % No
void mainString args 2 0.10 % No
mainString args Properties 2 0.10 % No
args Properties props 2 0.10 % No
Properties props = 2 0.10 % No
public static void 2 0.10 % No
20 21 22 2 0.10 % No
26 27 28 2 0.10 % No
javautilProperties Created by 2 0.10 % No

SEO Keywords (Four Word)

Keyword Occurrence Density Possible Spam
1 2 3 4 6 0.30 % No
2 3 4 5 6 0.30 % No
topic=mytopic partition=0 offset = 5 0.25 % No
3 4 5 6 5 0.25 % No
Hello World Hello World 4 0.20 % No
4 5 6 7 4 0.20 % No
5 6 7 8 3 0.15 % No
World Hello World Hello 3 0.15 % No
static void mainString args 2 0.10 % No
public static void mainString 2 0.10 % No
void mainString args Properties 2 0.10 % No
on 17429 public class 2 0.10 % No
mainString args Properties props 2 0.10 % No
args Properties props = 2 0.10 % No
Properties props = new 2 0.10 % No
18 19 20 21 2 0.10 % No
19 20 21 22 2 0.10 % No
26 27 28 29 2 0.10 % No
javautilProperties Created by charles 2 0.10 % No
charles on 17429 public 2 0.10 % No

Internal links in - luodw.cc

归档
归档 | 罗道文的私房菜
书签
index | 罗道文的私房菜
关于
关于我 | 罗道文的私房菜
kafka
分类: kafka | 罗道文的私房菜
阅读全文 »
kafka源码分析之Producer | 罗道文的私房菜
阅读全文 »
kafka源码分析之概述 | 罗道文的私房菜
nsq
分类: nsq | 罗道文的私房菜
阅读全文 »
nsqd执行解析 | 罗道文的私房菜
lua
分类: lua | 罗道文的私房菜
阅读全文 »
Lua原来这么好用 | 罗道文的私房菜
2
罗道文的私房菜
22
罗道文的私房菜
RSS
罗道文的私房菜

Luodw.cc Spined HTML


kafka源码分析之概述 | 罗道文的私房菜 罗道文的私房菜 分享知识,分享快乐 首页 归档 阅读 书签 关于 搜索 kafka源码分析之概述 发表于 2017-04-24   |   分类于 kafka   |   阅读次数 最近帮实验室写了一个Spark+kafka实时处理日志的监控系统案例,大概流程为: 用户购物日志发送给Kafka; Spark实时从Kafka接收购物日志,利用Spark Streaming实时处理,最后将结果发送给Kafka; 用Flask构建一个web程序接收Kafka处理后的数据,用Flask-SocketIO实时将每秒的数据发送给客户端浏览器; 浏览器利用socket.io.js实时接收web发送来的数据,利用highcharts.js展示出来。 这个案例很好的展示了利用Spark+Kafka实时处理数据的开发模式。Spark在实时处理和批量处理都有很高的性能,Kafka消息队列在异步解耦,冗余处理和削峰等方面有很高的性能。 Kafka在互联网各大公司都有很广泛的应用,主要在于Kakfa性能出众,又有很好的扩展性和稳定性。而之前看过NSQ消息队列,对消息队列的分布式架构都有一定的了解,所以想最近这段时间看看kafka源码,熟悉下Kafka的整体架构,以及学习Scala和Java是如何写基础组件的。 这篇文章先介绍下Kafka的整体架构,在通过一个简单的实例展示Python是如何操作Kafka消息队列,如下: Kafka整体架构; Python操作Kafka; Java操作Kafka; 总结; 【版权声明】博客内容由罗道文的私房菜拥有版权,允许转载,但请标明原文链接http://luodw.cc/2017/04/24/kafka01/#more Kafka整体架构 Kafka相对于NSQ架构更加的复杂,但也提供更丰富的功能,下面根据我的理解列出二者的不同点: NSQ消费者采用的是push模式,而Kafka消费者采用的是pull模式; NSQ消息被消费之后,即被删除,而Kafka消费数据之后,并不删除数据,所以Kafka也可以看成是一个存储系统; NSQ没提供消息副本功能,而Kafka提供分区多副本,当leader宕机之后,可以重新选组提供服务,具有高可用性; NSQ各个nsqd之间不进行通信,而Kafka Server之间进行通信,毕竟要进行副本传输; Kafka消费提供组的概念,不同组的消费者可以消费同一个topic下所有的数据;而对于同组消费者,各个消费者按某种算法一起消费同一个topic下不同分区的消息。虽然NSQ并没有挺供消费者组的概念,但是NSQ的channel则提供了相同的功能;不同的channel相当于不同消费者组,都能收到topic的所有消息,然后同一个channel所对应的所有消费者相当于Kafka同一个消费者组内的消费者。二者在这方面实现的功能是一样的。 因此,我把NSQ看成是轻量级的消息队列,如果不需要消息副本,不需要提供消息冗余,只是简单消息的投递和消费我觉得可以使用NSQ,毕竟轻量,部署简单,也更容易深入理解源码。当然,如果需要很好的消息可靠性或者其他Kakfa其他特性,还是推荐Kafka。 下面先介绍下Kafka当中的专业术语: broker 一个Kafka集群有多个服务器,其中一台即称为一个broker; Topic 我们可以把topic看成是消息的种类,我们发送的每条消息都属于某个topic; Partition Partition是物理的概念,一个topic下面可以有多个Partition,这些Partition拥有等同的地位,主要是为了实现负载均衡; Producer 复杂发布消息到Kafka Broker; Consumer 向Kafka读取消息的客户端; Consumer Group 消费者组,每个consumer都属于一个ConsumerGroup,我们可以指定组的名字,如果不指定,则属于默认的消费者组; 下面同过一个简单的图示说明Kafka的拓扑结构以及与Producer和Consumer的关系 这里展示的是只是一个zookeeper(当然可以有多个)以及一个消费者组中的一个消费者(当然可以有多个消费者组和多个消费者),主要是为了简化分析。每个Kafka Server在启动时,都需要向zookeeper注册broker信息,路径为/brokers,可以通过zookeeper的ls /命令查看。等三个Kafka Server都启动之后,Producer与Consumer就可以连接投递和消费消息。这里假设有一个Topic,三个分区,每个分区只有一个副本,刚好对应图中的三个broker。对于多副本,等后续分析Kafka Server时再分析。 Producer向某个topic发送消息时,需要先连接上与这个topic相关的一台或者多台broker,因为broker之间会相互通信,最后通过一台broker,就可以找到所有与该topic相关的broker。当Producer在发布消息时,根据消息提供的key进行分区(最简单的方式就是哈稀求余),因此一条消息并将属于一个分区;如果分区函数设计得当,所有消息将会被均衡的发送到所有分区,实现负载均衡。在早期版本的Producer,有同步和异步的方式,而在最新的版本中只提供异步的方式,即将所有消息先存入一个队列,然后开启一个后台线不断从队列读取消息并发送给Kafka Sever。 Consumer消费消息时,需要先连接zookeeper获取与topic相关的broker,然后再连接;等Consumer连上Kafka Server之后,然后发送FetchRequest请求,带上topic,partition和offset从相应的分区读取消息; 下面通过官网提供的图,简单说明下消费者组模式 如图,假设有一个topic,四个partition;两个Kafka Server,每个Kafka Server分别有两个分区;有两个消费者组A和B。由图可知,ConsumerGroup A和ConsumerGroup B都可以消费同一个topic下的所有消息,而同一个消费者组内的消费者则消费topic下的不同分区;例如,ConsumerGroup A的C1消费分区0和3,而C2消费分区1和2。ConsumerGroup B下的四个Consumer则分别消费一个分区。 消费者组主要是为了实现同一个topic下的消息的消息实现不同的处理,例如同一个topic下的消息,即可用于hadoop进行批处理,也可以用于Spark流计算,还可以直接进行持久化到磁盘等等。 消费者组内的多个消费者主要就是为了实现负载均衡。 Python操作Kafka 这里用Python操作Kafka作为演示,只需要几行代码就可以实现生产者和消费者。生产者代码如下: 12345678# coding: utf-8import timefrom kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')while True: producer.send('test', "Hello World!".encode('utf8')) time.sleep(3) 然后消费者代码如下: 123456# coding: utf-8from kafka import KafkaConsumerconsumer = KafkaConsumer('test')for msg in consumer: print((msg.value).decode('utf8')) 我的测试环境只有一台Zookeeper和一台Kafka,生产者每隔3秒向test这个topic发送消息,消息内容为"Hello World!"。而消费者不断消费test的消息,PyCharm在consumer控制台下可以看到如下输出 1234567/home/charles/Envs/env1/bin/python /home/charles/PycharmProjects/kafka/consumer.pyHello World!Hello World!Hello World!Hello World!Hello World!... Java操作Kafka 在后端开发写业务代码中,经常使用的当属Python和Java,因此下面再给出Java操作Kafka的简单示例,代码取自Kafka源码,生产者代码如下: 123456789101112131415161718192021222324252627282930313233import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/** * Created by charles on 17-4-29. */public matriculation ProducerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0; i<=100; i++) { producer.send(new ProducerRecord<>("mytopic", Integer.toString(i), Integer.toString(i))); try { Thread.sleep(3000); } reservation (InterruptedException e) { e.printStackTrace(); } } producer.close(); }} 消费者代码如下: 1234567891011121314151617181920212223242526272829import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;/** * Created by charles on 17-4-29. */public matriculation ConsumerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("mytopic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("topic=%s, partition=%s, offset = %d, key = %s, value = %s\n", record.topic(),record.partition(),record.offset(), record.key(), record.value()); } }} 在Intellij idea的ConsumerDemo窗口,可以得到如下输出 12345topic=mytopic, partition=0, offset = 21, key = 0, value = 0topic=mytopic, partition=0, offset = 22, key = 1, value = 1topic=mytopic, partition=0, offset = 23, key = 2, value = 2topic=mytopic, partition=0, offset = 24, key = 3, value = 3topic=mytopic, partition=0, offset = 25, key = 4, value = 4 当然Kafka的Producer和Consumer都有很多配置,例如ack,是否自动commmitOffset等等,这也是我后续想看源码的原因,因为看了源码,可以更好的理解这些参数是什么意思,怎么做优化。 总结 这篇文章简单的介绍了下Kafka,也算对Kafka有个较为深入的认识,也为后续深入看源码打下基础。Kafka代码量好多,需要耐心慢慢啃,我有大概看了下Kafka的代码,有很多优秀的设计可以学习,包括NIO,Selector,Java并发包等等。先到这吧。 #kafka nsqd执行解析 kafka源码分析之Producer 文章目录 站点概览 罗道文 分享知识,分享快乐 106 日志 33 分类 116 标签 RSS github weibo zhihu Links 赖明星的博客地址 蔡珉星的博客地址 阮榕城的博客地址 陈友兵的博客地址 郑江龙的博客地址 厦门大学数据库实验室 1. Kafka整体架构2. Python操作Kafka3. Java操作Kafka4. 总结 © 2017 罗道文 由 Hexo 强力驱动 本站总访问量次 | 主题 - NexT.Pisces