如何查看kafka中的数据_查看kafka数据保存时间
- 游戏资讯
- 2025-03-21 10:21
- 1
kafka 怎样查看kafka状态
##查看topic分布情况 kafka-list-topic.sh
如何查看kafka中的数据_查看kafka数据保存时间
如何查看kafka中的数据_查看kafka数据保存时间
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分区情况)
其实kafka-list-topic.sh里面就一句
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
实际是通过
kafka-run-class.sh脚本执行的包kafka.admin下面的类
linux 怎样查看kafka的某topic数据?
基于0.8.0版本。
##查看topic分布情况kafka-list-topic.sh
bin/kafka-list-topic.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
(列出所有topic的分区情况)
bin/kafka-list-topic.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--topic
test
(查看test的分区情况)
其实kafka-list-topic.sh里面就一句
exec
$(dirname
$0)/kafka-run-class.sh
kafka.admin.listtopiccommand
$@
实际是通过
kafka-run-class.sh脚本执行的包kafka.admin下面的类
##创建topic
kafka-create-topic.sh
bin/kafka-create-topic.sh
--replica
2--partition
8--topic
test
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
创建名为test的topic,
8个分区分别存放数据,数据备份总共2份
bin/kafka-create-topic.sh
--replica
1--partition
1--topic
test2
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
结果
topic:
test2
partition:
leader:
170
replicas:
170
isr:
170
##重新分配分区kafka-reassign-partitions.sh
这个命令可以分区指定到想要的--broker-list上
bin/kafka-reassign-partitions.sh
--topics-to-move-json-file
topics-to-move.json
--broker-list
"171"
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--execute
cat
topic-to-move.json
{"topics":
[{"topic":
"test2"}],
"version":1
}##为topic增加
partition数目kafka-add-partitions.sh
bin/kafka-add-partitions.sh
--topic
test
--partition
2--zookeeper
192.168.197.170:2181,192.168.197.171:2181
(为topic
test增加2个分区)
##控制台接收消息
bin/kafka-console-consumer.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--from-beginning
--topic
test
##控制台发送消息
bin/kafka-console-producer.sh
--broker-list
192.168.197.170:9092,192.168.197.171:
9092
--topic
test
##手动均衡topic,
kafka-preferred-replica-election.sh
bin/kafka-preferred-replica-election.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--path-to-json-file
preferred-click.json
cat
preferred-click.json
{"partitions":
[{"topic":
"click",
"partition":
0},
{"topic":
"click",
"partition":
1},
{"topic":
"click",
"partition":
2},
{"topic":
"click",
"partition":
3},
{"topic":
"click",
"partition":
4},
{"topic":
"click",
"partition":
5},
{"topic":
"click",
"partition":
6},
{"topic":
"click",
"partition":
7},
{"topic":
"play",
"partition":
0},
{"topic":
"play",
"partition":
1},
{"topic":
"play",
"partition":
2},
{"topic":
"play",
"partition":
3},
{"topic":
"play",
"partition":
4},
{"topic":
"play",
"partition":
5},
{"topic":
"play",
"partition":
6},
{"topic":
"play",
"partition":
7}
]}
##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
bin/kafka-run-class.sh
kafka.admin.deletetopiccommand
--topic
test666
--zookeeper
192.168.197.170:2181
,192.168.197.171:2181
Kafka中的索引机制
在kafka中,每个日志分段文件都对应了两个索引文件—— 偏移量索引文件和时间戳索引文件 (还有其它的诸如事务日志索引文件就不细表了),主要用来 提高查找消息的效率 。
偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。
每当写入一定量 (由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096,即 4KB) 的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes 的值,对应地可以缩小或增加索引项的密度。
稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的偏移量。
时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示。
每个索引项占用 8 个字节,分为两个部分:
(1) relativeOffset : 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值。
(2) ition : 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
消息的偏移量(offset)占用 8 个字节,也可以称为偏移量。
索引项中没有直接使用偏移量而改为只占用 4 个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间。
举个例子,一个日志分段的 baseOffset 为 32,那么其文件名就是 00000000000000000032.log,offset 为 35 的消息在索引文件中的 relativeOffset 的值为 35-32=3。
如果我们要查找偏移量为 23 的消息,那么应该怎么做呢?首先通过二分法在偏移量索引文件中找到不大于 23 的索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。
以上是简单的一种情况。参考上图,如果要查找偏移量为 268 的消息,那么应该怎么办呢?
首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset = 268 - 251 = 17,之后再在对应的索引文件中找到不大于 17 的索引项,后根据索引项中的 ition 定位到具体的日志分段文件位置开始查找目标消息。
那么又是如何查找 baseOffset 为 251 的日志分段的呢?
这里并不是顺序查找,而是用了跳跃表的结构。
Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。
在Kafka中要定位一条消息,那么首先根据 offset 从 ConcurrentSkipListMap 中来查找到到对应(baseOffset)日志分段的索引文件,然后读取偏移量索引索引文件,之后使用二分法在偏移量索引文件中找到不大于 offset - baseOffset z的索引项,接着再读取日志分段文件并且从日志分段文件中顺序查找relativeOffset对应的消息。
Kafka中通过offset查询消息内容的整个流程我们可以简化成下图:
Kafka中消息的offset可以类比成InnoDB中的主键,前者是通过offset检索出整条Record的数据,后者是通过主键检索出整条Record的数据。
InnoDB中通过主键查询数据内容的整个流程建议简化成下图(下半部分)。
Kafka中通过时间戳索引文件去检索消息的方式可以类比于InnoDB中的辅助索引的检索方式:
前者是通过timestamp去找offset,后者是通过索引去找主键,后面两者的过程就和上面的陈述相同。
Kafka中当有新的索引文件建立的时候ConcurrentSkipListMap才会更新,而不是每次有数据写入时就会更新,这块的维护量基本可以忽略
B+树中数据有插入、更新、删除的时候都需要更新索引,还会引来“页分裂”等相对耗时的作。Kafka中的索引文件也是顺序追加文件的作,和B+树比起来工作量要小很多。
说到底还是应用场景不同所决定的。MySQL中需要频繁地执行CRUD的作,CRUD是MySQL的主要工作内容,而为了支撑这个作需要使用维护量大很多的B+树去支撑。
Kafka中的消息一般都是顺序写入磁盘,再到从磁盘顺序读出(不深入探讨page cache等),他的主要工作内容就是:写入+读取,很少有检索查询的作
换句话说, 检索查询只是Kafka的一个辅助功能,不需要为了这个功能而去花费特别太的代价去维护一个高ll的索引。
前面也说过,Kafka中的这种方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
Flink是如何从kafka中拉取数据的
首先来看一下 FlinkKafkaConsumerBase.run方法,相当于是Flink 从kafka中拉取数据的入口方法:
createFetcher方法
返回了一个 KafkaFetcher对象,我们点进去看一下
KafkaFetcher的构造器里面创建了一个 KafkaConsumerThread对象
至此为止createFetch就介绍完了,也可以看作是拉取数据的准备工作,接下来看一下kafkaFetcher.runFetchLoop();
KafkaFetch中的runFetchLoop方法,正式开始从kafka中拉取message
既然consumerThread.start()开始了实际的kafka consumer,我们一起来看一下consumerThread中的方法
至此如何从kafka中拉取数据,已经介绍完了
kafka获取数据的几种方式
一、基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层出现了失败,也可以使用预写日志中的数据进行恢复。
如何进行Kafka数据源连接
1、在men添加依赖
2、scala代码
val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" -> "zookeeper1:2181","group.id" -> "spark-streaming-test","zookeeper.connection.timeout.ms" -> "1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLl.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParalleli = 1 // You'd probably pick a higher value than 1 in production.unifiedStream.repartition(sparkProcessingParalleli)}
需要注意的要点
1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLl.MEMORY_AND_DISK_SER。
二、基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
这种方式有如下优点:
1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被了两份,Kafka自己本身就有高可靠的机制,会对数据一份,而这里又会一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
scala连接代码
val topics = Set("teststreaming")val brokers = "" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val nts = kafkaStream.flatMap(line => {Some(line.toString())})
三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的方式。
[Kafka] kafka 协议分析 (一) 基础篇
kafka 协议分析 (一) 基础篇
kafka 协议分析 (二) Produce API
kafka 协议分析 (三) Fetch API
Kafka作为分布式消息组件被广泛使用,也提供了非常易用的各语言版本的client。由于系统的异构性,我们使用了不同语言的client生产和消费消息。实际使用的过程中,发现有生产了消息却消费不到等情况。由于client屏蔽了很多底层细节,所以通过网络抓包去分析数据才是解决问题的关键。要分析网络数据包就必须了解 KAFKA PROTOCOL 。(PS:本文是定你对Kafka的 基本概念 已经了解,特别是 topic,partition,broker 和 leader )
协议:为了实现一个目的,怎样 构建请求 以及如何 解析响应 。
要想分析和使用kafka协议,我们必须要事先弄清楚协议的基础理论。本文主要介绍这些基础概念,后面文章会陆续介绍kafka的常用api(主要介绍low ll api和high ll api)。
协议中的请求和响应都由下面3种类型的数据组成。
请求和响应的前4个字节都是一个表示请求体或响应体多大的数字:
注意:后面介绍中将不会再专门列出message_size域,默认在头部包含该域值
所有请求都具有相同格式的请求头
所有响应都具有相同格式的响应头
版权声明:本文内容由互联网用户自发贡献。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 836084111@qq.com,本站将立刻删除。
下一篇