1,778 Matching Annotations
  1. Aug 2022
    1. 建议消费者在 visibilityTimeout 时间内消费成功后需要调用(batch)DeleteMessage 接口删除该消息,否则该消息将会重新变成为 active 状态,此消息又可被消费者重新消费,保证消息至少消费一次,但是不能保证幂等性, 业务侧需要有去重逻辑。

      消费者需要delete,不然可能会出现重新消费

    2. 当其被取走后在 VisibilityTimeout 的时间内状态为 Inactive,若超过 VisibilityTimeout 时间后消息还未被删除,消息会重新变成 Active 状态

      相当于可以从unack自动变成ready状态

    1. 您可以将消费过的去重 key 缓存(如 KV 等),然后每次消费时检查去重 key 是否已消费过。去重 key 缓存可以根据消息最大有效时间来淘汰。CMQ 提供了队列当前最小未消费消息的时间(min_msg_time),您可以使用该时间和业务生产消息最大重试时间来确定缓存淘汰时间。存在多个消费者时,去重 key 缓存就需要是分布式的

      分布式缓存的最佳实践

    2. 采取 Pull 的方式问题就简单了许多,由于 Consumer 是主动到服务端拉取数据,此时只需要降低自己访问频率即可。举例:如前端是 flume 等日志收集业务,不断向 CMQ 生产消息,CMQ 向后端投递,后端业务如数据分析等业务,效率可能低于生产者。

      pull适用于日志分析等实时性不高的、吞吐量大的场景

    1. 五次重试后,消费处于一个未被确认的状态。因为需要你手动 ack!下次服务重启的时候,会继续消费这条消息。
      1. manul模式下,如果没有手工ack/reject,服务器治不会将消息从unack状态变为其他状态(比如抛出异常)。
      2. auto模式正常情况下(即包括抛出异常情况下,除非出现故障)会将消息设置为ack/reject状态。
    2. 仅仅是消费者内部进行了重试,换句话说就是重试跟mq没有任何关系。上述消费者代码不能添加try{}catch(){},一旦捕获了异常,在自动 ack 模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的。

      如果没有异常,消费者也不会进行重试。只有抛出异常,消费者才会进行重试

    1. 则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置

      在开启重试的情况下,默认变成ack。(即不会重新入列),设置不同的recoverer,会有不同的表现:比如 1. RejectAndDontRequeueRecoverer即nack,且requue为false。 1. ImmediateRequeueMessageRecoverer即nack,且requue为true。

    2. 如果某个服务忘记确认 ACK 了,则 RabbitMQ 不会再发送此消息数据给它,只要程序还在运行,没确认的消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。

      即unack的消息是blocked状态 如果connect断开,unack消息会被释放,则消息会被服务器重新投递

    3. 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息

      NONE模式是自动模式,非auto是自动

    4. 默认情况下消息消费者是NONE模式,默认所有消息消费成功,会不断的向消费者推送消息。 因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险

      这里是真正的auto模式

    1. 交换器 source 根据路由键找到与其匹配的另一个交换机 destination,并把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue中

      备份交换机

    1. 消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

      直连模式,routingkey名等于queue名,系统会找到默认的交换机

    1. 设置message为persistent并不能完全保证消息不丢失,在RabbitMQ将消息保存到disk之前仍可能丢失。

      标记message安全存储于exchange中

    1. 会触发: channel.basicNack(tag, false, true);, 这样会告诉rabbitmq该消息消费失败, 需要重新入队

      重试次数跟max-attempts的配置有关,并且因为网络等原因,会导致重试次数高于设定次数, 例子:图中设置为2次,实际执行4次

    1. 这个需要利用Spring事务的一个特性TransactionSynchronization,注册一个同步钩子,自动把相关代码放到事务完成之后执行,我们使用拦截器拦截rabbitTemplate.convertAndSend方法,实现不用修改现有代码自动把发送MQ消息逻辑移到事务之外

      spring事务拦截机制

    1. 在需要使用消息的return机制时候,mandatory参数必须设置为true
      • //常用的三个配置如下
      • //1---设置手动应答(acknowledge-mode: manual)
      • // 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调

      // publisher-confirm-type: correlated

      // #保证交换机能把消息推送到队列中

      // publisher-returns: true

      // template:

      // #以下是rabbitmqTemplate配置

      // mandatory: true) * // 3---设置重试

    1. 使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@Input对应SubscribableChannel

      使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@Input对应SubscribableChannel

    1. 通过消费组的设置,虽然能保证同一消息只被一个消费者进行接收和处理,但是对于特殊业务情况,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能被同一个实例消费,这个就可以使用 Spring Cloud Stream 提供的消息分区功能。修改配置。

      todo

    1. 主要是因为这种一统江湖的趋势让不同的消息中间件厂商都开发了自己的绑定器 Binder 提供给 SpringCloud Stream

      可以指明或者直接引入相应的binder器

    1. 再来一个新软件——ExTab官网|多标签文件管理器——应该是一个轻便的工具,官网没讲明系统支持情况。时至今日(2021-05-23)这软件还是有一些bug的,如果想要反馈,可以去它的百度贴吧或QQ群发言。

      windows

    1. THREAD(线程隔离):使用该方式,HystrixCommand将会在单独的线程上执行,并发请求受线程池中线程数量的限制。SEMAPHORE(信号量隔离):使用该方式,HystrixCommand将会在调用线程上执行,开销相对较小,并发请求受信号量的个数的限制。
    1. UnicastingDispatcher,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler
    1. 创建一个消费者,绑定消费队列及死信交换机,交换机默认为direct模型,死信交换机也是,arguments绑定死信交换机和key。(注解支持的具体参数文末会附上)

      同一个queue也可以绑定不同的routingKey

    1. 比如不需要交互的离线大规模计算,又比如多数 Web 资讯类网站、小程序、公共 API 服务、移动应用服务端等,都跟无服务架构擅长的短链接、无状态、适合事件驱动的交互形式很契合。

      无服务适用范围

    2. 一个良好设计的服务,应该是能够报废的,而不是期望得到长久的发展。如果一个系统中出现不可更改、无可替代的服务,这并不能说明这个服务有多么重要,反而是系统设计上脆弱的表现。微服务带来的独立、自治,也是在反对这种脆弱性。

      可迁移性

    3. 另外,尽管在分布式中,我们要想处理好一致性的问题也很困难,很多时候都没法使用传统的事务处理来保证不出现一致性问题。但是两害相权取其轻,一致性问题这些必要的代价是值得付出的。

      分布式事务没有

    4. 在单体服务中,通常一个系统的各个功能模块会使用同一个数据库,虽然这种中心化的存储确实天生就更容易避免一致性的问题,但是,同一个数据实体在不同服务的视角里,它的抽象形态往往也是不同的。

      营销甚至想访问券库

    5. 这一点在真正实践的时候,其实多少都会留点儿宽松的处理余地。因为大多数的公司都不会在某一个服务用 Java,另一个用 Python,下一个用 Golang,而是通常都会统一主流语言,甚至会有统一的技术栈或专有的技术平台。

      大数据那边可能有

    6. 如果本应该归属同一个产品内的功能,被划分在了不同的团队当中,那就必然会产生大量的跨团队沟通协作,而跨越团队边界,无论是在管理、沟通,还是在工作安排上,都会产生更高的成本。高效的团队,自然会针对这个情况进行改进,而当团队和产品磨合调节稳定了之后,就会拥有一致的结构

      我们的问题应该是相反的:折上折前期电商、券微服务区分度不够。

    7. 潜在的观念是希望系统的每一个部件,甚至每一处代码都尽量可靠,不出、少出错误,致力于构筑一个 7×24 小时不间断的可靠系统

      性能打折扣倒是其次

    8. 可以基于意图去使用各种协调分布式系统的工具,而不用深入具体工具的实现细节去研究怎么解决的分布式难题

      sicp作者也提过,现代开发是探针式的开发方式,不需要了解那么多细节,只需要知道点api即可

    9. 远程的服务在哪里(服务发现)、有多少个(负载均衡)、网络出现分区、超时或者服务出错了怎么办(熔断、隔离、降级)、方法的参数与返回结果如何表示(序列化协议)、如何传输(传输协议)、服务权限如何管理(认证、授权)、如何保证通信安全(网络安全层)、如何令调用不同机器的服务能返回相同的结果(分布式数据一致性)等一系列问题,就需要设计者耗费大量的心思

      分布式设计的核心概念

    1. 编译器进行代码流程的推断。比如说,当一个表达式的返回值是 Nothing 的时候,就往往意味着它后面的语句不再有机会被执行

      关键点

    2. 任何类型,当它被“?”修饰,变成可空类型以后,它就变成原本类型的父类了。所以,从某种程度上讲,我们可以认为“Any?”是所有 Kotlin 类型的根类型
    1. val annotations = method.annotations for (annotation in annotations) { // ④ if (annotation is GET) { // ⑤ val url = baseUrl + annotation.value // ⑥ return@newProxyInstance invoke(url, method, args!!) } }

      方法注解是一个列表

    2. val parameterAnnotations = method.parameterAnnotations for (i in parameterAnnotations.indices) { for (parameterAnnotation in parameterAnnotations[i]) { // ② if (parameterAnnotation is Field) { val key = parameterAnnotation.value val value = args[i].toString() if (!url.contains("?")) { // ③ url += "?$key=$value" } else { // ④ url += "&$key=$value" } } }

      为什么是这样的一个结构

    1. 改为var后,编译器就会立马报错

      fun main() { // 找到一家肯德基 // ↓ val kfc = Restaurant<KFC>() // 需要普通饭店,传入了肯德基,编译器报错 orderFood(kfc) val success:Result.Success<Cat> = Result.Success<Cat>(Cat()) println(success.data) var s:Result.Success<Animal> = success s.data = Dog() println(success.data as Cat) }

      sealed class Result<out R> { // 协变 ① // ↓ ↓ data class Success<out T:Animal>(var data: @UnsafeVariance T, val message: String = "") : Result<T>()

      data class Error(val exception: Exception) : Result<Nothing>()
      
      data class Loading(val time: Long = System.currentTimeMillis()) : Result<Nothing>()
      

      }

  2. Jul 2022