224 Matching Annotations
  1. May 2025
    1. Seata会维护全局锁,如果一个全局事务需要进行本地提交,需要先获取全局锁。而对于select for update也会申请全局锁,用于保障事务隔离级别为read commit

  2. Apr 2025
    1. 可见,副本读取状态有截断中和获取中两个:当副本执行截断操作时,副本状态被设置成 Truncating;当副本被读取时,副本状态被设置成 Fetching。

      就是当前副本的状态,是否可以去Fetch,如果当前在截断中,就不能Fetch,而如果被delay了同理

    2. highWatermark

      各副本已经同步的消息offset, HW leader节点从所有的副本中获取LEO,取最小的LEO做HW,HW为所有已经成功写入全部副本的消息的最新位置。Follower的HW则是他的LEO和leader的HW的较小值

    1. 这是社区为了规避因多线程访问产生锁争用导致线程阻塞,从而引发请求超时问题而做的努力

      解决的问题

      当多线程同时执行该方法进行检查的时候,拿到锁的线程complete失败,而没拿到锁的线程直接跳过,那么如果没有其他线程再去处理的话,就永远也不会complete。

      如何解决?

      如果第一个线程成功将retry设置为false,那么第二个线程就会进行重试,而如果本身retry就是true,那么说明被其他线程先一步设置了,该线程不重试。但是如果多个线程顺序执行,都完成不了,那不是一样?

    2. bucket.flush(reinsert)

      相当于重新添加进时间轮,但是如果已经到了时间,那么就会执行。而且由于时间轮向后滚动,如果是最下面一级的bucket则所有元素都会过期,而上一级的bucket则会往下一级插入.

      由于DelayQueue的存在,时间复杂度和直接使用DelayQueue有什么不同?

      首先是以bucket为单位写入DelayQueue,bucket通过分层,使得每一层只有20个bucket,那么n层就只有n*20个bucket,大大减少了DelayQueue元素的数量,减小时间复杂度。

    1. triggerOnlineStateChangeForPartitions

      我的回答: 在Controller启动、Broker启动、Broker下线等状态下,为NewPartition、OfflinePartition选举Leader

    2. NewPartition 是未初始化状态,处于该状态下的分区尚不具备选举 Leader 的资格。

      有误导性,对于NewPartition可以转变成OnlinePartition,在这个过程中进行初始化

    3. 选择存活副本列表的第一个副本作为 Leader;选择存活副本列表作为 ISR

      NewPartition状态的分区,进行初始化(因为是初始化,所以数据都为空),会将存活副本设置为ISR,而将ISR第一个设置为Leader

    4. 应该可以看到名为 Replicas 的一列数据

      Topic: xxc TopicId: HPo2q0gHRSC261VzemHlcw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: xxc Partition: 0 Leader: 1 Replicas: 1 Isr: 1

    5. 实际上消息队列都是以分区为单位组织的。而一个topic拥有多个分区,客户端可以根据一定的规则(Partitioner)往这多个分区中写入数据,而消费者则通过一定规则或者随机分配到部分partition,从partition中顺序进行消费

    1. 代码会给所有符合状态转换的副本所在的 Broker,发送 StopReplicaRequest 请求,显式地告诉这些 Broker 停掉其上的对应副本。Kafka 的副本管理器组件(ReplicaManager)负责处理这个逻辑。后面我们会用两节课的时间专门讨论 ReplicaManager 的实现,这里你只需要了解,StopReplica 请求被发送出去之后,这些 Broker 上对应的副本就停止工作了。

      Kafka如何保证请求发送后能够按预期执行?

      通过重试兜底保证最终一致性?同时如果主从切换的话,会进行检查然后重新进行状态同步

    2. Controller给Broker发送请求是否需要保证Broker变更成功

      不需要,Controller发送请求后会异步的等待broker心跳中包含的响应。同时由于MQ本身的设计能够进行容错,即旧的状态如果不对,那么会进行重试,或者刷新缓存元数据

    3. 源码会获取对应分区的详细数据,然后向该副本对象所在的 Broker 发送 LeaderAndIsrRequest 请求,令其同步获知,并保存该分区数据。

      为什么不是NewReplica状态就可以直接让其他broker同步?

      因为最前面判定了validReplicas,所以这些状态的变更都是允许的

    1. 但我这样说你可能会觉得,inplace跟Online是不是就是一个意思?其实不是的,只是在重建表这个逻辑中刚好是这样而已。

      具体看是否支持Online DDL

    1. 他的主机磁盘用的是SSD,但是innodb_io_capacity的值设置的是300。于是,InnoDB认为这个系统的能力就这么差,所以刷脏页刷得特别慢,甚至比脏页生成的速度还慢,这样就造成了脏页累积,影响了查询和更新性能

      错误的io能力设置,导致mysql错估磁盘io,影响脏页的刷新效率

    1. 如何解决选错索引?

      无法从根上解决。 缓解的方法(仍然可能选错索引): - analyze table更新统计信息 - force index(索引可能变更,不灵活) - 删除错误的索引(很多时候,其他业务需要用到该索引) - 通过修改sql引导mysql优化器,让其觉得错的索引成本高(不通用)

    1. redo log 主要节省的是随机写磁盘的IO消耗(转成顺序写),而change buffer主要节省的则是随机读磁盘的IO消耗。

      可是实际上是对随机写攒批呀。如果没有change buffer则会在页缺失的时候加载,当然change buffer延迟了加载的时间,只在需要的时候加载,但是如何保证数据存在呢?比如更新的时候需要返回影响行数的,那是否只能用于没有唯一索引的表插入?

    1. 索引是如何实现mvcc呢?

      索引没有undolog,因此索引需要回表,通过回表得到数据的undolog,然后判定索引条目是否有效。但是如果索引本身不带事务id,那么如何知道其属于哪一个版本呢?如果每个索引条目都需要回表才能确定是否有效,那么全索引扫描不就是全表扫描了吗?如果主键变更索引又要如何处理?

      https://dev.mysql.com/doc/refman/8.0/en/innodb-multi-versioning.html?open_in_browser=true

      似乎是,二级索引的数据也是一样不删除,而是标记为删除,但是仍然存留在二级索引的b+树上,每个索引页会记录最近更新的事务id。而如何确定数据是否有效呢,回表,回表,回表,重要的事情说三遍,只有回表访问版本链才能知道readview是否能够看到这条记录的这个版本

      If a secondary index record is marked for deletion or the secondary index page is updated by a newer transaction, the covering index technique is not used. Instead of returning values from the index structure, InnoDB looks up the record in the clustered index.

    1. 大量插入数据时,Auto-INC锁会阻塞其他事务的插入,如何解决?

      使得innodb_autoinc_lock_mode设置为1或者2,默认0,使得在插入时申请到自增值后立即释放auto-inc锁。在不同值下的行为: - 若是1,则insert ... select from这种仍然会执行完成后释放锁 - 若是2,则获取到了自增值就立即释放,但会引起主从不一致,因此需要将binlog设置为mixed或者row格式。

    2. 如果更新的时候走的全表扫描,则会锁住扫描的所有数据,如何避免?

      • 删除时查询出来通过id删除
      • 开启sql_safe_updates,其要求sql能够击中索引或者通过limit限制了更新的行数
    3. 死锁检测以及相应开销

      由于事务之间可能产生死锁,因此要么设置最大等待超时时间,要么设置死锁检测。

      死锁检测

      1. 维护等待图(Wait-for Graph)

      InnoDB 内部维护一个有向图,图中的节点表示事务(Transaction),边表示锁的等待关系。

      例如:事务A持有行X的锁,事务B请求行X的锁并被阻塞,则图中有一条边从B指向A(B→A,表示B在等待A释放锁)。

      当某个事务请求锁时,如果锁已被其他事务持有,InnoDB 会更新等待图,添加一条新的边。

      2. 检测环(Cycle)

      每次有事务请求锁失败(进入等待状态)时,InnoDB 会触发死锁检测。

      深度优先搜索(DFS):InnoDB 通过DFS遍历等待图,检查是否存在环。如果发现环,则判定为死锁。

      优化:为了减少性能开销,InnoDB 不会每次都全图遍历,而是从新加入的边出发,仅检查可能形成环的路径。

      3. 选择牺牲者(Victim)

      如果检测到死锁,InnoDB 会选择一个事务作为牺牲者(通常选择回滚成本更低的事务,例如修改数据量较小的事务),强制回滚该事务,并释放其持有的锁。

      回滚后,等待图中对应的边被移除,其他被阻塞的事务可以继续执行。

      死锁的开销

      问题

      因此由于事务的不断加入,图会变得越来越大,进行环检测是O(n)的操作,若n很大则cpu大量消耗。

      解决

      可以通过中间件对操作相同key的事务限流,这样事务虽然也在等待,但是没有增加死锁检测的负担。

    1. 如果有不合适的,为什么,更好的方法是什么?

      重建主键

      删除主键相当于重建整个表,mysql内部会使用一个隐式的自增字段存储记录。而创建主键则再次进行更改,将其从隐式转成显示的字段。因此如果需要重建主键索引则可以直接使用alter table T engine=InnoDB。

      删除自增主键

      如果要删除自增主键,那么主键列必须将自增删除掉,因为隐式的主键也是自增的,一个表只能有一个自增字段

    2. 显然,主键长度越小,普通索引的叶子节点就越小,普通索引占用的空间也就越小。

      尽量使用自增主键,能够避免页分裂,同时减少普通索引叶子节点的占用空间

    1. 在一天一备的模式里,最坏情况下需要应用一天的binlog。比如,你每天0点做一次全量备份,而要恢复出一个到昨天晚上23点的备份。一周一备最坏情况就要应用一周的binlog了。

      物理备份(热备)

      Percona XtraBackup 8.0.30-23引入了--register-redo-log-consumer参数。--register-redo-log-consumer参数默认为禁用状态。当启用时,此参数允许Percona XtraBackup在备份开始时注册为一个重做日志消费者。服务器不会移除Percona XtraBackup(消费者)尚未复制的重做日志。消费者读取重做日志并手动提升日志序列号(LSN)。在此过程中,服务器阻止写入操作。基于重做日志的消耗,服务器决定何时可以清理日志。

      为什么需要定期全量备份?

      因为备库会很快同步主库的数据,如果发生误删数据,备库无法保证数据的恢复。而全量备份则可以留存定期的快照,当出现问题时能够回滚

    2. 我给你留一个问题吧。你现在知道了系统里面应该避免长事务,如果你是业务开发负责人同时也是数据库负责人,你会有什么方案来避免出现或者处理这种情况呢?

      预防和发现

      1. 通过innodb_schema追踪innodb的事务状态,发现长事务,予以告警
      2. 连接初始时需要显示的设置autocommit或者显式启动连接,关闭或归还连接池的时候应显式重置连接(目前spring、mybatis会自动做)
    3. 就是当系统里没有比这个回滚日志更早的read-view的时候。

      由于当前系统中没有比undolog更早的readview才能删除undolog。因此应当避免长事务来防止undolog大量占用磁盘和buffer pool。

    4. 在MySQL中,实际上每条记录在更新的时候都会同时记录一条回滚操作。记录上的最新值,通过回滚操作,都可以得到前一个状态的值。

      undolog使得记录能够往回回滚,使得mvcc读取到历史版本的数据

    1. redolog是innodb引入保证crash-safe的。而binlog则是mysql的归档日志,可以提供备份、从节点复制等功能。为了保证redolog与binlog的一致性,通常需要两阶段提交,从而防止mysql的数据与binlog数据不一致。主要在于需要保证事务的redolog状态至少为prepare状态,且binlog完整(如果当前redolog处于提交状态,则事务直接是有效的)

    2. redolog是循环写的,只需要读取需要修改的数据(页)然后根据页的状态生成物理修改日志。使用redolog则无需直接去随机写修改磁盘页,而转变为了“顺序写redolog”

    3. 正常情况下的WAL或者说日志基本都是逻辑日志,而在mysql中redolog是物理日志,mysql如何知道要将数据写到物理磁盘上的那一个位置呢? Buffer Pool中为物理页的映射,由此知道如何在物理上进行数据的修改。

    1. 对于有索引的表,执行的逻辑也差不多。第一次调用的是“取满足条件的第一行”这个接口,之后循环取“满足条件的下一行”这个接口,这些接口都是引擎中已经定义好的。你会在数据库的慢查询日志中看到一个rows_examined的字段,表示这个语句执行过程中扫描了多少行。这个值就是在执行器每次调用引擎获取数据行的时候累加的

      row_examined表示在执行器扫描的数据行数,不代表执行引擎扫描的数据行数

    2. 有些时候MySQL占用内存涨得特别快,这是因为MySQL在执行过程中临时使用的内存是管理在连接对象里面的

      连接使用的一些内存资源只有在连接断开时释放。因此对于大查询或者长期使用的连接可以定期释放或者在5.7+执行mysql_reset_connection

    1. 日志是如何清理的呢?

      1. deletion策略,正常情况下,根据日志的配置,自动进行最大留存时间、最大留存大小的清理,以segment为单位,具体看kafka-log-retention任务
      2. compaction策略,topic可以设置compaction策略为compaction、delete + compaction,这样的话LogCleaner会定时进行compaction。优化策略,segments氛围cleaned和uncleaned,从uncleaned中遍历消息(在逻辑中可以根据需求读取需要的大小以节省内存)构建offsetMap来存储这一段中最新的key的offset,而从头遍历segments,清理掉offsetMap中已有的已有的消息(也就是最新消息)
    2. (validBytes - lastIndexEntry > indexIntervalBytes)

      recover过程中会重置索引,然后遍历每个batch来添加索引项,这样就能避免掉原本写入消息时,一次性写入的多batch的消息最多只写入一次索引导致查询效率低的问题

    3. 因为目前 Broker 端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘 I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。

      配置topic的segment.jitter.ms或者segment.ms让segment根据时间自动滚动时错峰,避免io压力过大

      即使segment的数据量不大,超过固定时间(默认一周)没写入数据,也会滚动segment,那如果很多segment都没写入的话,那么就会同时创建segment导致大量的io占用 rollJitterMs则是一个随机的扰动值,来源于{@link LogConfig#randomSegmentJitter()}, 通过这个值可以将segment滚动的时间错开,缓解物理磁盘的 I/O 负载瓶颈

    4. indexIntervalBytes 值其实就是 Broker 端参数 log.index.interval.bytes 值,它控制了日志段对象新增索引项的频率

      indexIntervalBytes控制时间和offset索引写入的频率,减少日志大小

    5. 每个日志段都有一个起始位移值(Base Offset),而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大

      segment的区间是左闭右开

  3. Mar 2025
    1. 收缩是指,把 ISR 副本集合中那些与 Leader 差距过大的副本移除的过程。所谓的差距过大,就是 ISR 中 Follower 副本滞后 Leader 副本的时间,超过了 Broker 端参数 replica.lag.time.max.ms 值的 1.5 倍。

      由于定时检查是配置的一半时间,因此lag超过1.5倍配置的isr仍然可能存在

    1. // 3. 已累积到足够多的数据

      如果没有累积到一定数据,那么这部分数据被读取后就被丢弃了,这就比较浪费,能不能先计算一下是否能够到达阈值呢

    1. 那为什么 Log Start Offset 值也可能发生变化呢?这是因为 Leader 的 Log Start Offset 可能发生变化,比如用户手动执行了删除消息的操作等。Follower 副本的日志需要和 Leader 保持严格的一致,因此,如果 Leader 的该值发生变化,Follower 自然也要发生变化,以保持一致。

      用户还可能手动删除消息?这种情况kafka也考虑了。。。

    2. 由于 Leader Epoch 机制属于比较高阶的知识内容,这里我们的重点是理解高水位值在截断操作中的应用,我就不再和你详细讲解 Leader Epoch 机制了。如果你希望深入理解这个机制,你可以研读一下 LeaderEpochFileCache 类的源码。

      TODO

    3. 但是为什么 AbstractFetcherThread 线程总要不断尝试去做截断呢?这是因为,分区的 Leader 可能会随时发生变化。每当有新 Leader 产生时,Follower 副本就必须主动执行截断操作,将自己的本地日志裁剪成与 Leader 一模一样的消息序列,甚至,Leader 副本本身也需要执行截断操作,将 LEO 调整到分区高水位处。

      当leader变更时,follower需要truncate自己的日志来保证与leader一致,而leader切换的时候,自身也需要truncate到HW来保持数据的一致性,但是这样会丢消息,当然如果没有到HW,那么需要所有节点同步的消息应该收不到回复

    1. 请简单描述一下 handlePartitionsWithErrors 方法的实现原理

      对于出错的partition,将其丢到LinkedHashMap的尾部,并且更新PartitionFetchState的delay信息,等到delay的时间再重新fetch

    2. private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();

      同步数据是通过一个线程批量从一个broker获取的,如果多个partiton的leader都在一个broker上,会一起拉取

  4. Feb 2025
    1. test_snapshot_20233

      看起来似乎是application id为空字符,也就是没拿到application id,然后尝试kill,获取job的application id是通过bash命令获取的,那意味着可能job会执行完毕

  5. Jan 2025
    1. 许多人都是刚刚听到别人要求做的一个功能,就开始脑补接下来的一切。导致的结果,就是付出的努力毫无意义。

      太形象了

    1. 如果一个 RDD 的一个分区,只会影响到下游的一个节点,那么我们就称这样的上下游依赖关系为窄依赖。而如果一个 RDD 的一个分区,会影响到下游的多个节点,那么我们就称这样的上下游关系为宽依赖。

      如果宽依赖前面的节点数据丢失或者故障,那么后面所有执行都会受到影响

    1. 则是叶子节点本身不负责存储,而是采用一个共享的存储层,比如 GFS。Dremel 从 2009 年开始,就逐步把存储层全部迁移到了 GFS 上。

      存算分离,动态调度

    1. 由于数据格式可能存在嵌套和 repeat,基于列存,那么某行的该列的值是哪些很难判定,因此通过 dremel 的 repetition level 和 definition level 的方式进行编码后就能重组数据

    2. 对于很多取值为 NULL 的字段来说,我们并不知道它为空,是因为自己作为一个 Optional 字段,所以没有值,还是由于上一层的 Optional 字段的整个对象为空。

      当前字段本身可能是 null,也可能由于上一级是 null 导致当前字段是 null,所以需要 definition level

    3. 而这个解决方案我们之前也已经用过了,那就是像 Bigtable 一样对数据进行分区。只要行键在同一区间的列存储的数据,存储在相同服务器的硬盘上,我们也就不需要通过网络传输,来把基于列存储的数据,组装成一行行的数据了

      同一区间的行的多个列存放在同一个服务器上

    4. 更合理的解决方案是行列混合存储。因为,我们需要把一批行相同的数据,放在同一个服务器上。

      可能希望一行的一些列存在一起,避免网络传输

    5. 而在这种大量、小文件的场景下,是发挥不出 MapReduce 进行顺序文件读写的吞吐量的优势的。

      拆分多个文件之后,就不再是顺序读取了

    1. 数据存储里,Hive 的数据是直接放在 HDFS 上,通过 HDFS 上不同的目录和文件来做到分区和分桶。分区和分桶,使得 Hive 下很多的 MapReduce 任务可以减少需要扫描的数据,提升了整个系统的性能。

      分区可以减少数据的扫描量,而分桶则能够便于进行采样,以及数据负载均衡 https://www.51cto.com/article/753462.html

    1. 科研人员注重在特定领域的深入了解,追求深入,想要寻求新的想法、突破,找到新的研究方向。而作为工程师,更多是学习理解,并和现有的工程系统加以联系,看看是否可以把整个领域最新的进展和自己的工作结合起来。

      看论文的关注点

    1. **如果raft 的客户端网络波动没收到成功响应而重试岂不是会多次执行操作? **

      对于这个问题,Raft协议中有一种机制管理这种网络不稳定的情况。

      在Raft协议中,客户端请求是通过集群的领导者节点来处理的。当一个客户端提交一个请求,领导者节点将这个请求作为一个新的日志条目添加到自己的日志中。然后领导者向其他节点复制这个条目。当这个条目被大多数节点保存到日志中之后,然后领导者就执行这个请求并向客户端返回结果。

      为了解决客户端因网络问题接收不到结果而重试的问题,Raft设计了client interaction的机制。也就是说,客户端在发送请求时会附加一个唯一的请求ID。这样,即使一个请求由于网络问题重复发送,由于请求ID相同,领导者能识别出这个重复的请求并且忽略它。

      在这个机制下,即使客户端因为网络问题重试请求,由于有请求ID作为标识,整个操作仍然只会执行一次。另外,即使返回结果因网络问题没有送达客户端,领导者也能根据请求ID重新发送应答而不会重复执行操作。

      因此,虽然在网络不稳定的情况下客户端可能会重试,但是通过Raft协议的设计,这个操作不会被重复执行多次。

    2. accept 阶段之后,并没有所有节点都同步数据,那么这些数据将在什么时候进行同步呢?如果新的请求到来,那么有旧数据的节点能否接受请求呢?<br /> 似乎数据持久化是在其他节点上,比如 learner 节点上,当获得了大多数的 accept 就相当于拿到了整个集群锁,然后往 learner 中写数据

    1. 在最后的提交执行阶段,三阶段提交为了提升系统的可用性也做了一点小小的改造。在进入最后的提交执行阶段的时候,如果参与者等待协调者超时了,那么参与者不会一直在那里死等,而是会把已经答应的事务执行完成。

      不确定是否是正确的,正确的做法不应该是 rollback 吗?

    2. 两阶段提交也没法保证一致性,一旦在决定提交事务时部分节点宕机那么只有一部分节点成功 commit,就出现数据的不一致

    3. 三阶段提交相比两阶段提交会先检查各节点资源是否可用<br /> 同时在提交执行阶段,如果参与者等待协调者超时了,那么参与者会继续完成事务的执行 (回滚或者提交),因此不同节点数据会存在不一致<br /> 更多请看这里

    1. 所以,TCompactProtocol 对于所有的整数类型的值,都采用了可变长数值(VQL,Variable-length quantity)的表示方式,通过 1~N 个 byte 来表示整数。

      变长编码存储数据

    2. 而且通常两个字段的编号是连续的。所以这个协议在存储编号的时候,存储的不是编号的值,而是存储编号和上一个编号的差。

      通过存储编号间的差值Delta Encoding,减少存储编号需要的空间。只需要一个字节即可存储编号和类型

    3. 我们可以废弃字段,并且这些废弃的字段不会占用存储空间。我们会随着需求变更不断新加字段,数十乃至上百个字段的 struct 在真实的工程场景中是非常正常的。而在这个过程中,很多字段都会被逐步废弃不再使用。如果这些字段仍然要占用存储空间的话,也会是一大浪费。

      如果需要废弃字段,那么需要保证不再需要向前兼容了,当前所有的相关程序都已经切换到了不需要这个字段的版本

    1. 第二个地方,是在我们读取数据的时候。在读取数据的时候,我们其实是读取 MemTable 加上多个 SSTable 文件合并在一起的一个视图。也就是说,我们从 MemTable 和所有的 SSTable 中,拿到了对应的行键的数据之后,会在内存中合并数据,并根据时间戳或者墓碑标记,来对数据进行“修改”和“删除”,并将数据返回给到客户端。

      是否可以理解为内存中是最新的数据,而磁盘上的日志则是整个修改过程,如果用户需要获取历史版本,那么就从磁盘数据上恢复得到

    2. 最近被读取到的数据,会存放到缓存(Cache)里,而不存在的行键,也会以一个在内存里的布隆过滤器(BloomFilter)进行快速过滤,尽一切可能减少真正需要随机访问硬盘的次数

      布隆过滤器无法支持删除操作,同时每次启动都需要加载,或者需要持久化

    3. 因为 Bigtable 支持单行事务,所以这两个修改,要么都完成了,要么都没有完成,不会存在一个成功,一个失败的情况。

      同一行的数据连续存储,所以单行事务还是好实现的,并没有分布式问题

    1. 对于列族,更合理的解读是,它是一张“物理表”,同一个列族下的数据会在物理上存储在一起。而整个表,是一张“逻辑表”

      由于一张表会存在大量的列,但是可能会是不同的业务的数据,因此通过列族进行标识切分。<br /> 不同的列族物理上也是连续的,只是是否是一个整体而已,比如压缩缓存的时候。同一列族的数据连续存储

    2. 在Google Cloud Bigtable中,列族有以下几个主要作用:

      数据组织:列族被用来组织、分组一系列有相关性的列。这样的结构不仅提供了更高效的查询方式,也使得对于业务上有关联的数据能够更好地进行管理。 数据存储和压缩优化:同一列族中的数据在物理存储上相互靠近,这样可以确保相同类型的数据一起被读取、写入或者压缩,从而提高效率。 读写性能:通过将相关的数据存储在同一个列族中,可以提高读写性能,因为只需要访问一个列族而不是多个列。 安全性和权限控制:列族能够为权限管理提供边界,比如可以对不同的列族设置不同的访问权限。 灵活性和扩展性:Bigtable的列族模型允许动态地向每一行添加或删除列,使得Bigtable非常适应变化多端的需求。 垃圾回收:在Bigtable中,基于时间的垃圾回收策略是按列族来设定的,可以指定保存特定列族中某一时间段之内的数据版本。 总的来说,列族在Bigtable中发挥着重要的角色,它在数据管理、性能优化、安全控制以及灵活性等多个方面都起着关键的作用。

    3. Each random read involves the transfer of a 64 KB SSTable block over the network from GFS to a tablet server, out of which only a single 1000-byte value is used…… Random and sequential writes perform better than random reads since each tablet server appends all incoming writes to a single commit log and uses group commit to stream these writes efficiently to GFS.

      随机写和顺序写的性能旗鼓相当,不仅在单台tablet server上表现类似,在tablet server集群上表现也几乎一样。原因是它们都会先写commit log,同步修改内存中的数据,异步修改SSTable中的数据。随机写和顺序写的差异在于SSTable的变更,由于这个操作被异步执行,所以它们的性能没有差异。 如果SSTable的不缓存在tablet server上,随机读的性能非常差,比写入操作几乎慢一个数量级。原因是不管要实际需要多小的数据,都需要从GFS上加载64k的SSTable块数据。它的瓶颈不在于tablet server的多少,而在于从GFS上读取数据。

    4. 而且 Master 可以根据每个 Tablet Server 的负载进行动态的调度,也就是 Master 还能起到负载均衡(load balance)的作用

      Master 负载均衡,动态重分区

    5. Bigtable 里,数据存储和在线服务的职责是完全分离的。我们调度 Tablet 的时候,只是调度在线服务的负载,并不需要把数据也一并搬运走。

      存算分离的鼻祖

    6. 在 Bigtable 里,我们就采用了另外一种分区方式,也就是动态区间分区。我们不再是一开始就定义好需要多少个机器,应该怎么分区,而是采用了一种自动去“分裂”(split)的方式来动态地进行分区。

      类似于一致性 hash ,只是这里是直接对按 key 排好序的数据进行均匀切分,如果插入或删除导致不均衡则进行合并或分裂

    7. 架构师的工作不是作出决策,而是尽可能久地推迟决策,在现在不作出重大决策的情况下构建程序,以便以后有足够信息时再作出决策

      如果前期不合理考虑,岂不是给后面增加技术债?

    1. 既然只是对访问次数计数,我们自然就可以通过一个 Combiner,把 1 万条相同域名的访问记录做个化简。把它们变成 Key 还是域名,Value 就是有多少次访问的数值这样的记录就好了。而这样一化简,reduce 所在的 worker 需要抓取的数据,就从 1 万条变成了 1 条。

      map 函数并不能减少数据量,因此通过 combiner 可以在同一个 map 节点进行 reduce 合并数据来减少传输数据

    1. 5400 转,或者 7200 转的机械硬盘。如果你读过我的《深入浅出计算机组成原理》,你一定还记得机械硬盘的 IOPS 也就是在 70 左右

      5400转或者 7200 转的硬盘只有 70iops

    2. 而对于数据可能重复写入多次的问题,你也可以对每一条要写入的数据生成一个唯一的 ID,并且在里面带上当时的时间戳

      追加写时,在写入数据的时候带上时间戳,数据最终的顺序按照时间戳排序去重能够一定程度上保证数据的正确性

    3. GFS 的客户端里面自带了对写入的数据去添加校验和(checksum),并在读取的时候计算来验证数据完整性的功能。

      数据写入可能存在部分写入的脏数据,通过校验和排除

    4. 这个“至少一次”的机制,其实很适合 Google 的应用场景。你想像一下,如果你是一个搜索引擎,不断抓取网页然后存到 GFS 上。其实你并不会太在意这个网页信息是不是被重复存了两次,你也不太会在意不同的两个网页存储的顺序

      gfs 的至少一次保证并不适用于常规应用,只是对于谷歌搜索引擎的业务比较适合

    5. 第二种因素是随机的数据写入极有可能要跨越多个 chunk。

      考虑两个客户端 A、B往一个 hdfs 文件上随机写入,分别写 chunk1 和 chunk2,但 chunk1 主副本定序为A、B,而 chunk2 的主副本定序为 B、A,那么 chunk1 中的数据是 B 覆盖 A,而 chunk2 中是 A 覆盖 B,因此不能得到一份完整的 A或者 B

    1. 独特的 Snapshot 操作

      当需要复制一份数据进行追加写的时候性能好,数据复制不需要经过网络,而是通过控制指令直接在 chunserver 上进行

    2. 等到所有次副本都接收完数据后,客户端就会发送一个写请求给到主副本。我在上节课一开始就说过,GFS 面对的是几百个并发的客户端,所以主副本可能会收到很多个客户端的写入请求。主副本自己会给这些请求排一个顺序,确保所有的数据写入是有一个固定顺序的。接下来,主副本就开始按照这个顺序,把刚才 LRU 的缓冲区里的数据写到实际的 chunk 里去。

      对于同一个 chunk 来说,主、次副本是固定的,当多个客户端并发写chunk 时,需要主副本确定写入的顺序,从而保持各个副本之间的一致性

    1. 首先是 Storm 的作者南森·马茨(Nathan Marz)的“Big Data”,现在也有中译本叫做《大数据系统构建》。对于人为错误的容错问题的思考,为我们带来了著名的 Lambda 架构。在我看来,即使到今天 Lambda 架构也并不过时。其次是俗称 DDIA 的这本《数据密集型应用系统设计》,这本书梳理了整个大数据领域的核心技术脉络,是一本非常合适的架构入门书。第三本是专注于流式处理的《Streaming System》,不过目前还没有中译本上市。如果你更喜欢通过视频课程学习,那么去看一看来自 MIT 的课程 6.824 的 Distributed System 绝对错不了。我在这里放上了Youtube和B 站的视频链接。最后是一份很容易被人忽视的资料,就是 2009 年 Jeff Dean 在 Cornell 大学的一个讲座“Designs, Lessons and Advice from Building Large Distributed Systems”的 PPT,我也推荐你去看一看,对于理解大数据系统的真实应用场景很有帮助。

      论文资料

    2. 多做交叉阅读和扩展阅读。论文本身往往只有 10 来页,非常精炼,对于很多知识点,往往就只有一个小片段,甚至只有一两句话,所以交叉阅读和扩展阅读少不了。根据你需要深入了解的知识点,你可能要回顾之前已经解读过的论文,也可能需要去阅读一些开源项目的代码,或者是一些计算机经典书籍中相关的章节,帮你彻底理解对应的问题

      扩展阅读和交叉阅读

    3. 一篇篇的大数据论文,并不是教科书里的一个章节或者一个知识点,而是对于一个重要的系统问题的解决方案。在读论文之前,先尝试自己去思考和解决对应的问题,有助于你更深刻地理解问题和解决方案的重点。

      思考自己如何做

  6. Dec 2024
    1. 倾斜分区判定完毕之后,下一步,就是根据 advisoryPartitionSizeInBytes 参数指定的目标尺寸,对大分区进行拆分

      将倾斜分区拆分成多个分区

    2. Spark SQL 必须要仰仗运行时的执行状态,而 Shuffle 中间文件,则是这些状态的唯一来源。

      AQE需要获取数据的统计信息来决定是否将数据进行广播

    1. Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。对于已经排好序的两张表,SMJ 的复杂度是 O(M + N),这样的执行效率与 HJ 的 O(M) 可以说是不相上下。再者,SMJ 在执行稳定性方面,远胜于 HJ,在内存受限的情况下,SMJ 可以充分利用磁盘来顺利地完成关联计算。因此,考虑到 Shuffle SMJ 的诸多优势,Shuffle HJ 就像是关公后面的周仓,Spark SQL 向来对之视而不见,所以对于 HJ 你大概知道它的作用就行。

      由于 Shuffle 会对分区内的数据进行排序,因此 SMJ性能和 HJ 一样,那 Shuffle HJ 就没有优势了

    2. 在任何情况下,不论数据的体量是大是小、不管内存是否足够,Shuffle Join 在功能上都能够“不辱使命”,成功地完成数据关联的计算。

      那如果是非等值条件关联查询呢?

    3. 与前两者相比,Nested Loop Join 看上去有些多余,嵌套的双层 for 循环带来的计算复杂度最高:O(M * N)。不过,尺有所短寸有所长,执行高效的 HJ 和 SMJ 只能用于等值关联,也就是说关联条件必须是等式,像 salaries(“id”) < employees(“id”) 这样的关联条件,HJ 和 SMJ 是无能为力的

      非等值的连表关联还得使用 NLJ

    1. 考虑到 SMJ 对于排序的苛刻要求,后来又有人推出了 HJ 算法。HJ 的设计初衷是以空间换时间,力图将基表扫描的计算复杂度降低至 O(1)。

      使用 Hash算法做关联似乎在大数据领域比较好用

    1. 我们知道,使用 Java Object 来存储数据会引入大量额外的存储开销。为此,Tungsten 设计并实现了一种叫做 Unsafe Row 的二进制数据结构。Unsafe Row 本质上是字节数组,它以极其紧凑的格式来存储 DataFrame 的每一条数据记录,大幅削减存储开销,从而提升数据的存储与访问效率。

      使用 Java对象会存在大量的存储开销,因此使用本地内存加 byte数组进行存储

    2. 而使用 DataFrame API 开发的应用,则会先过一遍 Spark SQL,由 Spark SQL 优化过后再交由 Spark Core 去做执行

      DataFrame 使用的 Spark SQL 进行解析执行,因此其也存在逻辑计划、物理计划

    1. Spark 存储系统负责维护所有暂存在内存与磁盘中的数据,这些数据包括 Shuffle 中间文件、RDD Cache 以及广播变量。

      spark存储服务的对象

    1. Executor是进行干活的工人,而task是spark计算的最小粒度,工人不断地执行task来完成每一阶段的task然后转向下一阶段,直到所有的阶段执行完毕

    2. 如果使用广播变量,那么数据只会在driver和executor上都存一份,task在executor上执行,如果使用到广播变量则直接从executor上获取即可

    1. coalesce 会降低同一个 stage 计算的并行度,导致 cpu 利用率不高,任务执行时间变长。我们目前有一个实现是需要将最终的结果写成单个 avro 文件,前面的转换过程可能是各种各样的,我们在最后阶段加上 repartition(1).write().format('avro').mode('overwrite').save('path')。最近发现有时前面的转换过程中有排序时,使用 repartition(1) 有时写得单文件顺序不对,使用 coalesce(1) 顺序是对的,但 coalesce(1) 有性能问题。目前想到可以 collect 到 d

      https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce

    2. collect 算子有两处性能隐患,一个是拉取数据过程中引入的网络开销,另一个 Driver 的 OOM(内存溢出,Out of Memory)

      收集数据会导致Driver的内存占用

    3. 其中 spark.executor.memory 是绝对值,它指定了 Executor 进程的 JVM Heap 总大小。另外两个配置项,spark.memory.fraction 和 spark.memory.storageFraction 都是比例值,它们指定了划定不同区域的空间占比。spark.memory.fraction 用于标记 Spark 处理分布式数据集的内存总大小,这部分内存包括 Execution Memory 和 Storage Memory 两部分,也就是图中绿色的矩形区域。(M – 300)* (1 – mf)刚好就是 User Memory 的区域大小,也就是图中蓝色区域的部分。spark.memory.storageFraction 则用来进一步区分 Execution Memory 和 Storage Memory 的初始大小。我们之前说过,Reserved Memory 固定为 300MB。(M – 300)* mf * sf 是 Storage Memory 的初始大小,相应地,(M – 300)* mf * (1 – sf)就是 Execution Memory 的初始大小。

      内存划分配置,Reserved Memory为300M,总内存量为M,剩余内存M - 300,然后划分为User Memory(1 - mf)和共享内存[Execution Memory+ Storage Memory(mf * sf)]

    4. 当一个 RDD 在代码中的引用次数大于 1 时,你可以考虑通过给 RDD 加 Cache 来提升作业性能

      Cache并不是自动的,而是人手动添加的,将RDD物化到内存中

    5. 对于 Storage Memory 抢占的 Execution Memory 部分,当分布式任务有计算需要时,Storage Memory 必须立即归还抢占的内存,涉及的缓存数据要么落盘、要么清除;对于 Execution Memory 抢占的 Storage Memory 部分,即便 Storage Memory 有收回内存的需要,也必须要等到分布式任务执行完毕才能释放。

      Execution Memory在共享内存中的优先级更高,如果Storage Memory抢占了EM的内存且EM的内存需要则必须归还

    6. 在 1.6 版本之后,Spark 推出了统一内存管理模式,在这种模式下,Execution Memory 和 Storage Memory 之间可以相互转化

      内存共享

    7. aggregateByKey

      在Map端聚合后,在Reduce端基于Map端的聚合结果再进行聚合,从而实现在Map端减少传输的数据量,而在Reduce端又能实现目的

    8. 从图中你可以看出来,尽管 reduceByKey 也会引入 Shuffle,但相比 groupByKey 以全量原始数据记录的方式消耗磁盘与网络,reduceByKey 在落盘与分发之前,会先在 Shuffle 的 Map 阶段做初步的聚合计算。比如,在数据分区 0 的处理中,在 Map 阶段,reduceByKey 把 Key 同为 Streaming 的两条数据记录聚合为一条,聚合逻辑就是由函数 f 定义的、取两者之间 Value 较大的数据记录,这个过程我们称之为“Map 端聚合”。相应地,数据经由网络分发之后,在 Reduce 阶段完成的计算,我们称之为“Reduce 端聚合”。

      Map端聚合能够减少数据的分发数量(原本需要传输原数据,而聚合后则可以将多条数据合并为聚合数据)

    9. 对于所有 Map Task 生成的中间文件,Reduce Task 需要通过网络从不同节点的硬盘中下载并拉取属于自己的数据内容。不同的 Reduce Task 正是根据 index 文件中的起始索引来确定哪些数据内容是“属于自己的”。Reduce 阶段不同于 Reduce Task 拉取数据的过程,往往也被叫做 Shuffle Read。

      shuffle过程并不是Map阶段的节点将数据发送给Reduce阶段的节点,而是Reduce节点主动去Map阶段的节点上拉取数据,根据index文件来获取自己需要的数据

    10. 当 Map 结构被灌满之后,Spark 根据主键对 Map 中的数据记录做排序,然后把所有内容溢出到磁盘中的临时文件

      基于溢出机制,在受限内存中处理大量数据,将处理过程中的部分结果进行排序写入临时文件,最终将所有文件进行归并排序得到任务最终的data文件和index文件

    11. Map 阶段与 Reduce 阶段的计算过程相对清晰明了,二者都是利用 reduce 运算完成局部聚合与全局聚合。在 reduceByKey 的计算过程中,Shuffle 才是关键。

      在Shuffle的时候,Map阶段可以进行局部聚合,而局部聚合后可以确定第一轮数据分发的节点,基于hash或者什么算法进行分区后,再一次进行全局聚合,从而将相同key的数据进行聚合

    12. 在不同的工地上有不同类型的砖块,需要将相同类型的砖块分发给对应的节点,因此需要砖头在集群范围内跨节点、跨进程的数据分发

      数据在不同节点上处理之后,需要基于key进行分发

    13. SchedulerBackend 与集群内所有 Executors 中的 ExecutorBackend 保持周期性通信,双方通过 LaunchedExecutor、RemoveExecutor、StatusUpdate 等消息来互通有无、变更可用计算资源

      SchedulerBackend通过Executor上的agent ExecutorBackend获取机器上的计算资源信息.ExecutorBackend还负责执行代码

    14. 像上面这种定向到计算节点粒度的本地性倾向,Spark 中的术语叫做 NODE_LOCAL。除了定向到节点,Task 还可以定向到进程(Executor)、机架、任意地址,它们对应的术语分别是 PROCESS_LOCAL、RACK_LOCAL 和 ANY。

      本地倾向性,包括机架感知、节点感知、进程感知,进程感知最高

    15. 以 Actions 算子为起点,从后向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages

      构建DAG时,会将shuffle之间的一系列RDD操作划分为一个TaskSets,他们之间是有依赖关系的,但是并不需要进行shuffle

    16. DAGScheduler 是任务调度的发起者,DAGScheduler 以 TaskSet 为粒度,向 TaskScheduler 提交任务调度请求

      TaskScheduler对任务进行调度和分配资源,资源信息WorkOffer来源于SchedulerBackend,任务信息TaskSets来源于TaskScheduler

    1. 现在换一种方式思考这个关系,父节点应该拥有其子节点:如果父节点被丢弃了,其子节点也应该被丢弃。然而子节点不应该拥有其父节点:如果丢弃子节点,其父节点应该依然存在。这正是弱引用的例子!

      如果父节点没人引用了,而子节点还有人引用,这时候可能父节点就会直接销毁掉,看起来挺奇怪的

    1. 在任意给定时刻,只能拥有一个可变引用或任意数量的不可变引用 之一(而不是两者)。
      • 要么一个可变引用
      • 要么0个可变引用 + 1或多个不可变引用
    1. 所以 Message 值需要的最大空间是存储其最大成员所需的空间大小。

      对于枚举其实和union类似,取需要最大空间的一个类型的空间即可

    2. 当有大量数据并希望在确保数据不被拷贝的情况下转移所有权的时候

      转移所有权并不会被拷贝,拷贝也不会转移所有权,所以这里是一大堆的数据相互关联,然后通过转移或者拷贝栈上的引用来转移所有权?

    3. 当有一个在编译时未知大小的类型,而又想要在需要确切大小的上下文中使用这个类型值的时候

      由于栈上的数据是需要知道确切大小的,因此如果大小不确定则只能放在堆上,而栈上使用引用