Extending kube-scheduler with the scheduler framework

Overview

In my earlier article on a kube-scheduler secondary-development example, I mentioned that you can build directly on the kube-scheduler source code: add the plugin code you need, then recompile and rebuild the image, and the scheduler is ready to use. Besides modifying the source directly, there is another route for customizing the scheduler: modify the code of the Kubernetes community's scheduler-plugins project to introduce the scheduling plugins you need; here too, recompiling and rebuilding the image yields a scheduler that can be used for real scheduling.

scheduler-plugins

scheduler-plugins is a project built on kube-scheduler's scheduler framework; developing a plugin on top of scheduler-plugins only requires adding and modifying code in its source tree. In fact, whatever the framework, the net effect of any scheduler is to set the Pod's nodeName field. So even without the scheduler framework architecture, a simple ListAndWatch loop that assigns nodeName to newly created Pods, whether the choice is random or driven by an algorithm, still gives every Pod a scheduling result. You could even do without a scheduler altogether: given the means, you could use kubebuilder to implement a webhook that directly mutates nodeName on every created Pod, and those Pods would still be "scheduled" successfully.
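
To make that concrete, here is a minimal sketch of what that final assignment boils down to: creating a Binding against the Pod's pods/binding subresource, which is also how kube-scheduler's default binder writes nodeName. The helper name, package name, and clientset wiring below are illustrative, not part of scheduler-plugins.

// A hypothetical helper, not part of scheduler-plugins: it "schedules" a pending
// Pod by writing the chosen node through the pods/binding subresource.
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func bindPodToNode(ctx context.Context, client kubernetes.Interface, namespace, podName, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: podName},
		// Target names the node; the API server then sets spec.nodeName on the Pod.
		Target: v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	return client.CoreV1().Pods(namespace).Bind(ctx, binding, metav1.CreateOptions{})
}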

Development

Developing on scheduler-plugins is much the same as developing directly on kube-scheduler: the scheduler is implemented by calling into kube-scheduler's own code, so in essence it is still a kube-scheduler. To add a new plugin called diskAvailable, simply create a new directory and code file under pkg, as shown below.

// pkg/diskAvailable/diskAvailable.go
package diskAvailable

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
	Name          = "DiskAvailable"  // plugin name registered with the scheduler framework
	RequiredLabel = "use-disk-score" // fixed label key a Pod must carry for this plugin to score it
)

type DiskAvailable struct {
	handle framework.Handle
}

var _ framework.ScorePlugin = &DiskAvailable{}

// New initializes a plugin instance
func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
	d := &DiskAvailable{
		handle: h,
	}
	return d, nil
}

// Name returns the plugin name
func (d *DiskAvailable) Name() string {
	return Name
}

// Score computes the score of the given node for the Pod
func (d *DiskAvailable) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	// Check whether the Pod carries the required label
	if _, exists := pod.Labels[RequiredLabel]; !exists {
		// If the label is missing, return the default score of 0
		return 0, framework.NewStatus(framework.Success, "Pod does not have the required label")
	}

	// Fetch the available disk space for this node from Prometheus
	diskSize, err := fetchDiskSizeFromPrometheus(nodeName)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Failed to fetch disk size for node %s: %v", nodeName, err))
	}

	// Assume the available disk space ranges over [0, 250] GB and map it linearly to a score in [0, 100]
	score := int64((diskSize / 250.0) * 100)
	if score > 100 {
		score = 100
	}

	return score, nil
}

// ScoreExtensions returns nil; Score already produces values in [0, 100], so no normalization is needed
func (d *DiskAvailable) ScoreExtensions() framework.ScoreExtensions {
	return nil
}

// fetchDiskSizeFromPrometheus queries Prometheus for the available disk space (in GB) on the given node
func fetchDiskSizeFromPrometheus(nodeName string) (float64, error) {
	// Map the node name to its host IP (hardcoded for this example cluster) and build the Prometheus query URL
	var hostIP string
	if nodeName == "node2" {
		hostIP = "192.168.1.202"
	} else if nodeName == "node3" {
		hostIP = "192.168.1.203"
	}
	prometheusQueryURL := fmt.Sprintf("http://kube-prometheus-stack-prometheus.kube-prometheus-stack.svc.cluster.local:9090/api/v1/query?query=node_filesystem_avail_bytes{instance='%s:9100',mountpoint='/'}", hostIP)

	resp, err := http.Get(prometheusQueryURL)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, err
	}

	// Parse Prometheus response
	var result struct {
		Data struct {
			Result []struct {
				Value []interface{} `json:"value"`
			} `json:"result"`
		} `json:"data"`
	}
	if err := json.Unmarshal(body, &result); err != nil {
		return 0, err
	}

	if len(result.Data.Result) == 0 {
		return 0, fmt.Errorf("no data found for node %s", nodeName)
	}

	// Extract disk size (in bytes) from the result
	diskSizeBytes, ok := result.Data.Result[0].Value[1].(string)
	if !ok {
		return 0, fmt.Errorf("invalid data format for node %s", nodeName)
	}

	// Convert to GB
	diskSizeGB := parseDiskSize(diskSizeBytes) / (1024 * 1024 * 1024)
	return diskSizeGB, nil
}

// parseDiskSize converts the string value returned by Prometheus into bytes as a float64
func parseDiskSize(sizeStr string) float64 {
	size, _ := strconv.ParseFloat(sizeStr, 64) // parse errors are ignored; an invalid value yields 0
	return size
}
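
The plugin above returns nil from ScoreExtensions because Score already clamps its result into the framework's [0, 100] range. If the raw scores were unbounded (for example, raw bytes), the plugin would instead return itself from ScoreExtensions and implement NormalizeScore. The following is only a sketch of that alternative, not part of the plugin above:

// pkg/diskAvailable/normalize.go (hypothetical alternative; not used by the plugin above)
package diskAvailable

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// NormalizeScore rescales the raw scores of all candidate nodes into
// [framework.MinNodeScore, framework.MaxNodeScore] after Score has run for every node.
func (d *DiskAvailable) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	var maxScore int64
	for _, nodeScore := range scores {
		if nodeScore.Score > maxScore {
			maxScore = nodeScore.Score
		}
	}
	if maxScore == 0 {
		// All nodes scored zero; nothing to rescale.
		return nil
	}
	for i := range scores {
		scores[i].Score = scores[i].Score * framework.MaxNodeScore / maxScore
	}
	return nil
}

With NormalizeScore in place, ScoreExtensions() would return d instead of nil.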

Once the plugin code is in place, the next step is registering the plugin. Registration works much as it does in kube-scheduler itself: register the plugin in cmd/scheduler/main.go, as shown in the code below.

// cmd/scheduler/main.go
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"os"
	"sigs.k8s.io/scheduler-plugins/pkg/diskAvailable"

	"k8s.io/component-base/cli"
	_ "k8s.io/component-base/metrics/prometheus/clientgo" // for rest client metric registration
	_ "k8s.io/component-base/metrics/prometheus/version"  // for version metric registration
	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
	"sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
	"sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
	"sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesources"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology"
	"sigs.k8s.io/scheduler-plugins/pkg/podstate"
	"sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration"
	"sigs.k8s.io/scheduler-plugins/pkg/qos"
	"sigs.k8s.io/scheduler-plugins/pkg/sysched"
	"sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
	"sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment"
	"sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"

	// Ensure scheme package is initialized.
	_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
)

func main() {
	// Register custom plugins to the scheduler framework.
	// Later they can consist of scheduler profile(s) and hence
	// used by various kinds of workloads.
	command := app.NewSchedulerCommand(
		app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
		app.WithPlugin(coscheduling.Name, coscheduling.New),
		app.WithPlugin(loadvariationriskbalancing.Name, loadvariationriskbalancing.New),
		app.WithPlugin(networkoverhead.Name, networkoverhead.New),
		app.WithPlugin(topologicalsort.Name, topologicalsort.New),
		app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
		app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
		app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
		app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
		app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
		app.WithPlugin(sysched.Name, sysched.New),
		// Sample plugins below.
		// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
		app.WithPlugin(podstate.Name, podstate.New),
		app.WithPlugin(qos.Name, qos.New),
		app.WithPlugin(diskAvailable.Name, diskAvailable.New),
	)

	code := cli.Run(command)
	os.Exit(code)
}

Build and compile

Build the scheduler binary with make build-scheduler, then use the Dockerfile to package the binary into an image; after that you can follow Configure Multiple Schedulers to run it. For deployment, see my earlier kube-scheduler secondary-development example: you can deploy a scheduler alongside the default-scheduler, and enabling the custom scheduling plugin then only requires adjusting the scheduler configuration, as shown below.

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
  - schedulerName: my-scheduler
    plugins:
      score:
        enabled:
          - name: DiskAvailable
leaderElection:
  leaderElect: false
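
With this configuration, a Pod opts into the custom scheduler via schedulerName and carries the use-disk-score label so that the DiskAvailable score plugin actually takes effect. A minimal example (the Pod name and image are illustrative):

apiVersion: v1
kind: Pod
metadata:
  name: disk-score-demo
  labels:
    use-disk-score: "true"    # any value works; the plugin only checks for the key
spec:
  schedulerName: my-scheduler # must match the profile name above
  containers:
    - name: app
      image: nginx:latest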

References

  1. Configure Multiple Schedulers
  2. scheduler-plugins