Kafka-服务端-网络层-源码流程

news/2024/7/8 7:42:10 标签: kafka, linq, 分布式

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {
       def startup() {
       	socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startupProcessors = false)
                /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, ...)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)
          }
     }

SocketServer

  def startup(startupProcessors: Boolean = true) {
    this.synchronized {
      ...
      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
      if (startupProcessors) {
        startProcessors()
      }
    }
      
  private def createAcceptorAndProcessors(processorsPerListener: Int,
                                          endpoints: Seq[EndPoint]): Unit = synchronized {
	...
    endpoints.foreach { endpoint =>
	  ...
      val acceptor = new Acceptor(endpoint, ...)
      addProcessors(acceptor, endpoint, processorsPerListener)
      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
      acceptor.awaitStartup()
      acceptors.put(endpoint, acceptor)
    }
  }

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  val processor = synchronized {
                    currentProcessor = currentProcessor % processors.size
                    processors(currentProcessor)
                  }
                  accept(key, processor)
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread, mod(numProcessors) will be done later
                currentProcessor = currentProcessor + 1
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
      shutdownComplete()
    }
  }

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // 在新分到的客户端连接上注册OP_READ事件
          configureNewConnections()
          // 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送
          processNewResponses()
          // selector轮询各种事件,读取请求或者发送响应
          poll()
          // 封装selector.completedReceives中的请求,放入requestQueue
          processCompletedReceives()
          // 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)
          processCompletedSends()
          processDisconnected()
        } catch {
          ...
        }
      }
    } finally {
      ...
    }
  }

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {
  ...
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(..., requestChannel, apis, time)
    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
  }
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {
  ...
  def run() {
    while (!stopped) {

      val req = requestChannel.receiveRequest(300)

      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            apis.handle(request)
          } catch {
            case e: FatalExitError =>
              shutdownComplete.countDown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }

}

http://www.niftyadmin.cn/n/5536823.html

相关文章

MMSC物料库位扩充

MMSC物料库位扩充 输入事务码MMSC&#xff1a; 回车后添加新的库位即可&#xff1a; 代码实现&#xff0c;使用BDC *&------------------------------------------------* *&BDC的定义 *&------------------------------------------------* DATA gt_bdcdata T…

关于echarts中使用到的图例、颜色设置、设置tooltip换行显示等问题

最近使用echarts中用到图例随机生成&#xff0c;颜色多不好设置的问题&#xff0c;图例多展示出现不全&#xff0c;不能根据颜色判断图例和数据的问题等总结一下 原始代码&#xff1a; that_ge.charts echarts.init(document.getElementById(paramenterEcharts));that_ge.al…

ONLYOFFICE8.1版本桌面编辑器——功能测评

一、编辑DOCX 相信大家都有写word文档的经历&#xff0c;不知道大家是不是跟我一样&#xff0c;感觉做一个word不难&#xff0c;但想做好一个word却很麻烦&#xff0c;功能太多&#xff0c;看的人眼花缭乱&#xff0c;有时候一个功能要找很久&#xff0c;甚至有的功能用一辈子都…

大数据面试题之Spark(5)

Spark SQL与DataFrame的使用? Sparksql自定义函数?怎么创建DataFrame? HashPartitioner和RangePartitioner的实现 Spark的水塘抽样 DAGScheduler、TaskScheduler、SchedulerBackend实现原理 介绍下Sparkclient提交application后&#xff0c;接下来的流程? Spark的几种…

【matlab】回归预测——智能优化算法支持向量机

目录 引言 原理 应用 优势 总结 SVR安装 灰狼优化算法 代码实现 引言 原理 核心思想&#xff1a; SVR的目标是找到一个函数&#xff0c;该函数能够最小化预测误差&#xff0c;并在拟合过程中保持一定的间隔&#xff0c;使得大部分数据点都落在这个间隔之内。与SVM类似…

34.哀家要长脑子了!--归并排序

1.787. 归并排序 - AcWing题库 ① 确定分界点 mid l r >> 1 ② 递归排序 左边右边 ③ 合并有序数组 模板&#xff1a; void merge_sort(int q[], int l, int r){if(l > r) return 0;int mid l r >> 1;merge_sort(q, l ,mid), merge_sort(q, mid1, r…

数据分析三剑客-Matplotlib

数据分析三剑客 数据分析三剑客通常指的是在Python数据分析领域中&#xff0c;三个非常重要的工具和库&#xff1a;Pandas、NumPy和Matplotlib。Pandas主要负责数据处理和分析&#xff0c;NumPy专注于数值计算和数学运算&#xff0c;而Matplotlib则负责数据可视化。这三个库相…

[2024]docker-compose实战 (1)前言

前言 本文用来记录使用docker-compose来实战搭建一个多项目的测试环境. 环境中包含nodejs, php, html, redis, MongoDB, mysql. 在本次部署流程中, 尽量保证原镜像的"干净简洁", 尽量不会往镜像中加入各种软件和插件, 所有的配置尽可能的在宿主机映射进去. 项目…