Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions cmd/upgrader/docs/upgrader.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,20 @@ type Step interface {
- `mongosh`
- `instancesets` / `configurations` CRD

11. `fix_redis_and_wait`
11. `patch_dbprovider_componentversions_rbac`
- 检查 `cluster-version-reader` ClusterRole 是否存在
- 如果不存在,说明当前集群没有 dbprovider 前端,跳过
- 如果存在但缺少 `apps.kubeblocks.io/componentversions` 的 `get/list/watch` 权限,则自动补齐

12. `fix_redis_and_wait`
- `Check()`:检查 Redis CR 是否已经完成转换
- `Run()`:
- `fix_redis_check_sentinel.sh`
- `fix_redis_cluster.sh`
- `restart_redis.sh`
- `waitDBReady(redis)`

12. `fix_mysql_cv_and_wait`
13. `fix_mysql_cv_and_wait`
- `Check()`:检查 MySQL 版本映射是否已经修好
- 支持的版本映射(`mysqlVersionMap`):
- `ac-mysql-8.0.31` → `ac-mysql-8.0.30`
Expand All @@ -307,19 +312,19 @@ type Step interface {
- 如有需要,顺手执行一次 `fix_mysql_lowercase.sh`
- `waitDBReady(mysql)`

13. `fix_mysql_lowercase_and_wait`
14. `fix_mysql_lowercase_and_wait`
- `Check()`:检查所有需要 `lower_case_table_names=1` 的 MySQL ConfigMap 是否都已补齐
- `Run()`:
- `fix_mysql_lowercase.sh`
- `waitDBReady(mysql)`

14. `verify_pg`
15. `verify_pg`

15. `verify_mysql`
16. `verify_mysql`

16. `verify_redis`
17. `verify_redis`

17. `verify_mongo`
18. `verify_mongo`


---
Expand Down Expand Up @@ -394,4 +399,3 @@ type Step interface {
5. `scripts/` 目录
- 默认都应重审,必要时重写


151 changes: 151 additions & 0 deletions cmd/upgrader/steps/dbprovider_rbac.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package steps

import (
"encoding/json"
"fmt"
"strings"
)

const (
dbProviderClusterRoleName = "cluster-version-reader"
componentVersionsAPIGroup = "apps.kubeblocks.io"
componentVersionsResource = "componentversions"
componentVersionsRequiredVerbs = "get,list,watch"
)

type clusterRolePolicyRule struct {
APIGroups []string `json:"apiGroups"`
Resources []string `json:"resources"`
Verbs []string `json:"verbs"`
}

type clusterRole struct {
Rules []clusterRolePolicyRule `json:"rules"`
}

type PatchDBProviderComponentVersionsRBAC struct{}

func (s *PatchDBProviderComponentVersionsRBAC) Name() string {
return "patch_dbprovider_componentversions_rbac"
}

func (s *PatchDBProviderComponentVersionsRBAC) Description() string {
return "补 dbprovider 前端读取 ComponentVersion 的 RBAC 权限"
}

func (s *PatchDBProviderComponentVersionsRBAC) Check(opts RunOptions) (bool, error) {
role, exists, err := loadDBProviderClusterRole(opts)
if err != nil {
return false, err
}
if !exists {
return true, nil
}
return clusterRoleHasComponentVersionsPermission(role), nil
}

func (s *PatchDBProviderComponentVersionsRBAC) Run(opts RunOptions) error {
role, exists, err := loadDBProviderClusterRole(opts)
if err != nil {
return err
}
if !exists {
logInfo("未发现 ClusterRole %s,跳过 dbprovider 前端 RBAC 修复", dbProviderClusterRoleName)
return nil
}
if clusterRoleHasComponentVersionsPermission(role) {
logInfo("ClusterRole %s 已具备 componentversions %s 权限", dbProviderClusterRoleName, componentVersionsRequiredVerbs)
return nil
}

patch, err := componentVersionsRBACPatch(role)
if err != nil {
return err
}
if _, err := kubectl(opts, "patch", "clusterrole", dbProviderClusterRoleName, "--type=json", "-p", patch); err != nil {
return fmt.Errorf("补 ClusterRole %s 的 componentversions 权限失败: %w", dbProviderClusterRoleName, err)
}
logOK("已补 ClusterRole %s 的 componentversions %s 权限", dbProviderClusterRoleName, componentVersionsRequiredVerbs)
return nil
}

func loadDBProviderClusterRole(opts RunOptions) (*clusterRole, bool, error) {
out, err := kubectl(opts, "get", "clusterrole", dbProviderClusterRoleName, "--ignore-not-found", "-o", "json")
if err != nil {
return nil, false, err
}
if strings.TrimSpace(out) == "" {
return nil, false, nil
}

role := &clusterRole{}
if err := json.Unmarshal([]byte(out), role); err != nil {
return nil, false, fmt.Errorf("解析 ClusterRole %s 失败: %w", dbProviderClusterRoleName, err)
}
return role, true, nil
}

func clusterRoleHasComponentVersionsPermission(role *clusterRole) bool {
if role == nil {
return false
}
for _, verb := range []string{"get", "list", "watch"} {
allowed := false
for _, rule := range role.Rules {
if ruleAllowsComponentVersionsVerb(rule, verb) {
allowed = true
break
}
}
if !allowed {
return false
}
}
return true
}

func ruleAllowsComponentVersionsVerb(rule clusterRolePolicyRule, verb string) bool {
return containsPolicyValue(rule.APIGroups, componentVersionsAPIGroup) &&
containsPolicyValue(rule.Resources, componentVersionsResource) &&
containsPolicyValue(rule.Verbs, verb)
}

func containsPolicyValue(values []string, target string) bool {
for _, value := range values {
if value == target || value == "*" {
return true
}
}
return false
}

func componentVersionsRBACPatch(role *clusterRole) (string, error) {
rule := componentVersionsPolicyRule()
path := "/rules/-"
value := interface{}(rule)
if role == nil || len(role.Rules) == 0 {
path = "/rules"
value = []clusterRolePolicyRule{rule}
}

patch := []map[string]interface{}{
{
"op": "add",
"path": path,
"value": value,
},
}
data, err := json.Marshal(patch)
if err != nil {
return "", err
}
return string(data), nil
}

func componentVersionsPolicyRule() clusterRolePolicyRule {
return clusterRolePolicyRule{
APIGroups: []string{componentVersionsAPIGroup},
Resources: []string{componentVersionsResource},
Verbs: strings.Split(componentVersionsRequiredVerbs, ","),
}
}
4 changes: 2 additions & 2 deletions cmd/upgrader/steps/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func RegisterAll(modules map[string]bool) []Step {
mysqlLcSnap := &ResourceSnapshot{}
all = append(all,
&PreflightDBFix{},
&PatchDBProviderComponentVersionsRBAC{},
&FixRedisAndWait{Snapshot: redisSnap},
&FixMySQLCVAndWait{Snapshot: mysqlSnap},
&FixMySQLLowercaseAndWait{Snapshot: mysqlLcSnap},
Expand Down Expand Up @@ -281,7 +282,7 @@ func (s *WaitKBReady) Check(opts RunOptions) (bool, error) {
!checkDeploymentReady(opts, "kb-system", "kubeblocks-dataprotection") {
return false, nil
}
return checkHelmTrackedAddonsSettled(opts), nil
return checkHelmTrackedAddonsSettled(opts)
}

func (s *WaitKBReady) Run(opts RunOptions) error {
Expand Down Expand Up @@ -701,4 +702,3 @@ func waitDBReady(opts RunOptions, dbType string, snap *ResourceSnapshot) error {
}
return watchFromSnapshot(opts.Ctx, snap, []string{"cluster", "-A"}, clusterTerminalPhases)
}

66 changes: 38 additions & 28 deletions cmd/upgrader/steps/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ResourceSnapshot struct {

var (
clusterTerminalPhases = map[string]bool{"Running": true, "Stopped": true}
addonTerminalPhases = map[string]bool{"Enabled": true, "Disabled": true, "Failed": true}
addonTerminalPhases = map[string]bool{"Enabled": true, "Disabled": true}
)

const helmAddonReleasePrefix = "kb-addon-"
Expand Down Expand Up @@ -227,30 +227,51 @@ func loadHelmTrackedAddonRuntime(opts RunOptions, tracked map[string]struct{}, s
return result, nil
}

func checkHelmTrackedAddonsSettled(opts RunOptions) bool {
tracked, err := trackedAddonNames(opts, nil)
if err != nil || len(tracked) == 0 {
return err == nil
func addonFailureError(states []helmTrackedAddonRuntime) error {
failed := make([]string, 0)
for _, state := range states {
if state.Phase == "Failed" {
failed = append(failed, state.Name)
}
}
states, err := loadHelmTrackedAddonRuntime(opts, tracked, time.Time{})
if err != nil {
return false
if len(failed) == 0 {
return nil
}
sort.Strings(failed)
return fmt.Errorf("addon 进入 Failed 状态: %s", strings.Join(failed, ", "))
}

func helmTrackedAddonsSettled(states []helmTrackedAddonRuntime) (bool, error) {
if err := addonFailureError(states); err != nil {
return false, err
}
for _, state := range states {
if state.Generation != 0 && state.Generation != state.ObservedGeneration {
return false
return false, nil
}
if !addonTerminalPhases[state.Phase] {
return false
return false, nil
}
if state.JobState == "Active" || state.JobState == "Pending" || state.JobState == "Deleting" {
return false
return false, nil
}
if len(state.ActivePodPhases) > 0 {
return false
return false, nil
}
}
return true
return true, nil
}

func checkHelmTrackedAddonsSettled(opts RunOptions) (bool, error) {
tracked, err := trackedAddonNames(opts, nil)
if err != nil || len(tracked) == 0 {
return err == nil, err
}
states, err := loadHelmTrackedAddonRuntime(opts, tracked, time.Time{})
if err != nil {
return false, err
}
return helmTrackedAddonsSettled(states)
}

func waitHelmTrackedAddonsSettled(ctx context.Context, opts RunOptions, snap *ResourceSnapshot, startedAt time.Time) error {
Expand All @@ -275,7 +296,6 @@ func waitHelmTrackedAddonsSettled(ctx context.Context, opts RunOptions, snap *Re
}

summaryParts := make([]string, 0, len(states))
allSettled := true
for _, state := range states {
podSummary := "-"
if len(state.ActivePodPhases) > 0 {
Expand All @@ -288,20 +308,6 @@ func waitHelmTrackedAddonsSettled(ctx context.Context, opts RunOptions, snap *Re
summaryParts = append(summaryParts,
fmt.Sprintf("%s[phase=%s gen=%d/%d job=%s pods=%s]",
state.Name, phase, state.ObservedGeneration, state.Generation, state.JobState, podSummary))

if state.Generation != 0 && state.Generation != state.ObservedGeneration {
allSettled = false
}
if !addonTerminalPhases[state.Phase] {
allSettled = false
}
switch state.JobState {
case "Active", "Pending", "Deleting":
allSettled = false
}
if len(state.ActivePodPhases) > 0 {
allSettled = false
}
}

summary := strings.Join(summaryParts, " ")
Expand All @@ -310,6 +316,10 @@ func waitHelmTrackedAddonsSettled(ctx context.Context, opts RunOptions, snap *Re
lastSummary = summary
}

allSettled, err := helmTrackedAddonsSettled(states)
if err != nil {
return err
}
if allSettled {
logInfo("所有 Helm Addon install job 已完成,Addon 已收敛到目标 generation 且 phase 回到终态")
return nil
Expand Down
Loading