diff --git a/pkg/microservice/aslan/core/common/repository/models/task/model.go b/pkg/microservice/aslan/core/common/repository/models/task/model.go index 17039c70ef..a8ae7de451 100644 --- a/pkg/microservice/aslan/core/common/repository/models/task/model.go +++ b/pkg/microservice/aslan/core/common/repository/models/task/model.go @@ -85,6 +85,7 @@ type Task struct { Features []string `bson:"features" json:"features"` IsRestart bool `bson:"is_restart" json:"is_restart"` StorageEndpoint string `bson:"storage_endpoint" json:"storage_endpoint"` + Events *models.Events `bson:"-" json:"events,omitempty"` } func (Task) TableName() string { diff --git a/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go b/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go index a1544c0a53..0346414570 100644 --- a/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go +++ b/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go @@ -345,11 +345,13 @@ type ImageAndServiceModule struct { type JobTaskFreestyleSpec struct { Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"` Steps []*StepTask `bson:"steps" json:"steps" yaml:"steps"` + Events *Events `bson:"events" json:"events" yaml:"events"` } type JobTaskPluginSpec struct { Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"` Plugin *PluginTemplate `bson:"plugin" json:"plugin" yaml:"plugin"` + Events *Events `bson:"events" json:"events" yaml:"events"` } type JobTaskBlueGreenDeploySpec struct { diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go index 73c5526221..17ab31a2f6 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go @@ -97,6 +97,9 @@ func NewFreestyleJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Wor if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil { logger.Error(err) } + if jobTaskSpec.Events == nil { + jobTaskSpec.Events = &commonmodels.Events{} + } job.Spec = jobTaskSpec return &FreestyleJobCtl{ job: job, @@ -1164,7 +1167,10 @@ func (c *FreestyleJobCtl) cleanupHelperPod(ctx context.Context, client crClient. func (c *FreestyleJobCtl) wait(ctx context.Context) { var err error taskTimeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute) - c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger) + c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger, func(events *commonmodels.Events) { + c.jobTaskSpec.Events = events + c.ack() + }) if err != nil { c.job.Error = err.Error() } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go index 3e5bcf2ff1..4ac3db2018 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go @@ -48,6 +48,9 @@ func NewPluginsJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Workf if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil { logger.Error(err) } + if jobTaskSpec.Events == nil { + jobTaskSpec.Events = &commonmodels.Events{} + } job.Spec = jobTaskSpec return &PluginJobCtl{ job: job, @@ -150,7 +153,10 @@ func (c *PluginJobCtl) run(ctx context.Context) error { func (c *PluginJobCtl) wait(ctx context.Context) { var err error timeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute) - c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger) + c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger, func(events *commonmodels.Events) { + c.jobTaskSpec.Events = events + c.ack() + }) if err != nil { c.logger.Errorf("wait job start error: %v", err) } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go index c18fd03d27..3677e56439 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go @@ -896,7 +896,7 @@ func int64Ptr(i int64) *int64 { return &i } func WaitPlainJobEnd(ctx context.Context, taskTimeout int, namespace, jobName string, kubeClient crClient.Client, apiServer crClient.Reader, xl *zap.SugaredLogger) config.Status { timeout := time.After(time.Duration(taskTimeout) * time.Minute) - status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl) + status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl, nil) if err != nil { xl.Errorf("wait job start error: %v", err) } @@ -936,12 +936,67 @@ func waitPlainJobEnd(ctx context.Context, taskTimeout int, timeout <-chan time.T } } -func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger) (config.Status, error) { +func appendPendingPodEvents(podName, namespace string, apiReader client.Reader, events *commonmodels.Events, reported map[string]struct{}, onUpdate func(*commonmodels.Events), xl *zap.SugaredLogger) { + if events == nil { + return + } + + selector := fields.Set{"involvedObject.name": podName, "involvedObject.kind": setting.Pod}.AsSelector() + kubeEvents, err := getter.ListEvents(namespace, selector, apiReader) + if err != nil { + xl.Errorf("list events failed for pod %s/%s: %s", namespace, podName, err) + return + } + + sort.SliceStable(kubeEvents, func(i, j int) bool { + return kubeEvents[i].CreationTimestamp.Unix() < kubeEvents[j].CreationTimestamp.Unix() + }) + + changed := false + for _, kubeEvent := range kubeEvents { + eventKey := fmt.Sprintf("%s|%s|%s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message) + if _, ok := reported[eventKey]; ok { + continue + } + reported[eventKey] = struct{}{} + + eventType := "info" + if kubeEvent.Type == corev1.EventTypeWarning { + eventType = "error" + } + + eventTime := kubeEvent.LastTimestamp + if eventTime.IsZero() { + eventTime = kubeEvent.FirstTimestamp + } + if eventTime.IsZero() && !kubeEvent.EventTime.IsZero() { + eventTime = metav1.NewTime(kubeEvent.EventTime.Time) + } + if eventTime.IsZero() { + eventTime = metav1.NewTime(time.Now()) + } + + *events = append(*events, &commonmodels.Event{ + EventType: eventType, + Time: eventTime.Format("2006-01-02 15:04:05"), + Message: fmt.Sprintf("Pod event [%s/%s]: %s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message), + }) + changed = true + } + + if changed && onUpdate != nil { + onUpdate(events) + } +} + +func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger, onEventsUpdate func(*commonmodels.Events)) (config.Status, error) { xl.Infof("wait job to start: %s/%s", namespace, jobName) xl.Infof("Timeout of preparing Pod: %s.", 120*time.Second) waitPodReadyTimeout := time.After(120 * time.Second) var podReadyTimeout bool + reportedEvents := make(map[string]struct{}) + events := &commonmodels.Events{} for { select { case <-ctx.Done(): @@ -966,6 +1021,8 @@ func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crC continue } for _, pod := range podList { + appendPendingPodEvents(pod.Name, namespace, apiReader, events, reportedEvents, onEventsUpdate, xl) + if pod.Status.Phase == corev1.PodFailed { msg := "" for _, condition := range pod.Status.Conditions { diff --git a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go index 5e95e09336..cdbc92eb9c 100644 --- a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go +++ b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go @@ -135,6 +135,7 @@ type JobTaskPreview struct { ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"` ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"` RetryCount int `bson:"retry_count" yaml:"retry_count" json:"retry_count"` + Events *commonmodels.Events `bson:"events" json:"events"` // JobInfo contains the fields that make up the job task name, for frontend display JobInfo interface{} `bson:"job_info" json:"job_info"` } @@ -2479,6 +2480,23 @@ func HandleJobError(workflowName, jobName, userID, username string, taskID int64 return nil } +func extractRuntimeJobEvents(job *commonmodels.JobTask) *commonmodels.Events { + switch job.JobType { + case string(config.JobFreestyle), string(config.JobZadigBuild), string(config.JobZadigTesting), string(config.JobZadigScanning), string(config.JobZadigDistributeImage): + taskJobSpec := &commonmodels.JobTaskFreestyleSpec{} + if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil { + return taskJobSpec.Events + } + case string(config.JobPlugin): + taskJobSpec := &commonmodels.JobTaskPluginSpec{} + if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil { + return taskJobSpec.Events + } + } + + return nil +} + func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, now int64, projectName string) []*JobTaskPreview { envMap := make(map[string]*commonmodels.Product) resp := []*JobTaskPreview{} @@ -2510,6 +2528,7 @@ func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, ErrorHandlerUserID: job.ErrorHandlerUserID, ErrorHandlerUserName: job.ErrorHandlerUserName, RetryCount: job.RetryCount, + Events: extractRuntimeJobEvents(job), } switch job.JobType { case string(config.JobFreestyle): diff --git a/pkg/microservice/aslan/core/workflow/testing/service/scanning.go b/pkg/microservice/aslan/core/workflow/testing/service/scanning.go index 4475ce3ceb..6b3b655bc8 100644 --- a/pkg/microservice/aslan/core/workflow/testing/service/scanning.go +++ b/pkg/microservice/aslan/core/workflow/testing/service/scanning.go @@ -307,13 +307,17 @@ func ListScanningTask(id string, pageNum, pageSize int, log *zap.SugaredLogger) respList := make([]*ScanningTaskResp, 0) for _, workflowTask := range workflowTasks { + status := workflowTask.Status + if len(workflowTask.Stages) == 1 && len(workflowTask.Stages[0].Jobs) == 1 && workflowTask.Stages[0].Jobs[0].Status != "" { + status = workflowTask.Stages[0].Jobs[0].Status + } taskInfo := &ScanningTaskResp{ ScanID: workflowTask.TaskID, - Status: string(workflowTask.Status), + Status: string(status), Creator: workflowTask.TaskCreator, CreatedAt: workflowTask.CreateTime, } - if workflowTask.Status == config.StatusPassed || workflowTask.Status == config.StatusCancelled || workflowTask.Status == config.StatusFailed { + if status == config.StatusPassed || status == config.StatusCancelled || status == config.StatusFailed { taskInfo.RunTime = workflowTask.EndTime - workflowTask.StartTime } respList = append(respList, taskInfo) @@ -347,9 +351,9 @@ func GetScanningTaskInfo(scanningID string, taskID int64, log *zap.SugaredLogger resultAddr := "" if len(workflowTask.Stages) != 1 || len(workflowTask.Stages[0].Jobs) != 1 { - errMsg := fmt.Sprintf("invalid test task!") + errMsg := "invalid test task!" log.Errorf(errMsg) - return nil, fmt.Errorf(errMsg) + return nil, fmt.Errorf("%s", errMsg) } spec := new(commonmodels.ZadigScanningJobSpec) @@ -429,11 +433,25 @@ func GetScanningTaskInfo(scanningID string, taskID int64, log *zap.SugaredLogger repo.Username = "" } + status := workflowTask.Status + if workflowTask.Stages[0].Jobs[0].Status != "" { + status = workflowTask.Stages[0].Jobs[0].Status + } + errorMsg := workflowTask.Stages[0].Jobs[0].Error + if errorMsg == "" { + errorMsg = workflowTask.Stages[0].Error + } + if errorMsg == "" { + errorMsg = workflowTask.Error + } + return &ScanningTaskDetail{ Creator: workflowTask.TaskCreator, - Status: string(workflowTask.Status), + Status: string(status), + Error: errorMsg, CreateTime: workflowTask.CreateTime, EndTime: workflowTask.EndTime, + Events: jobTaskSpec.Events, RepoInfo: repoInfo, SonarMetrics: sonarMetrics, ResultLink: resultAddr, diff --git a/pkg/microservice/aslan/core/workflow/testing/service/test_task.go b/pkg/microservice/aslan/core/workflow/testing/service/test_task.go index c2e84b1b27..4fc1365bf1 100644 --- a/pkg/microservice/aslan/core/workflow/testing/service/test_task.go +++ b/pkg/microservice/aslan/core/workflow/testing/service/test_task.go @@ -166,9 +166,9 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar } if len(workflowTask.WorkflowArgs.Stages) != 1 || len(workflowTask.WorkflowArgs.Stages[0].Jobs) != 1 { - errMsg := fmt.Sprintf("invalid test task!") + errMsg := "invalid test task!" log.Errorf(errMsg) - return nil, fmt.Errorf(errMsg) + return nil, fmt.Errorf("%s", errMsg) } stages := make([]*commonmodels.Stage, 0) @@ -224,10 +224,19 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar } } + errorMsg := workflowTask.Stages[0].Jobs[0].Error + if errorMsg == "" { + errorMsg = workflowTask.Stages[0].Error + } + if errorMsg == "" { + errorMsg = workflowTask.Error + } + subTaskInfo[testName] = map[string]interface{}{ "start_time": workflowTask.Stages[0].Jobs[0].StartTime, "end_time": workflowTask.Stages[0].Jobs[0].EndTime, "status": workflowTask.Stages[0].Jobs[0].Status, + "error": errorMsg, "job_ctx": struct { JobName string `json:"job_name"` IsHasArtifact bool `json:"is_has_artifact"` @@ -248,6 +257,7 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar SubTasks: subTaskInfo, StartTime: workflowTask.Stages[0].StartTime, EndTime: workflowTask.Stages[0].EndTime, + Error: errorMsg, TypeName: workflowTask.Stages[0].Name, }) @@ -263,9 +273,11 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar CreateTime: workflowTask.CreateTime, StartTime: workflowTask.StartTime, EndTime: workflowTask.EndTime, + Error: errorMsg, Stages: stages, TestReports: testResultMap, IsRestart: workflowTask.IsRestart, + Events: jobSpec.Events, }, nil } diff --git a/pkg/microservice/aslan/core/workflow/testing/service/types.go b/pkg/microservice/aslan/core/workflow/testing/service/types.go index f7ae941edb..abfcb25981 100644 --- a/pkg/microservice/aslan/core/workflow/testing/service/types.go +++ b/pkg/microservice/aslan/core/workflow/testing/service/types.go @@ -188,16 +188,18 @@ type ScanningTaskResp struct { } type ScanningTaskDetail struct { - Creator string `json:"creator"` - Status string `json:"status"` - CreateTime int64 `json:"create_time"` - EndTime int64 `json:"end_time"` - RepoInfo []*types.Repository `json:"repo_info"` - SonarMetrics *step.SonarMetrics `json:"sonar_metrics"` - ResultLink string `json:"result_link,omitempty"` - IsHasArtifact bool `json:"is_has_artifact"` - JobName string `json:"job_name"` - JobDisplayName string `json:"job_display_name"` + Creator string `json:"creator"` + Status string `json:"status"` + Error string `json:"error,omitempty"` + CreateTime int64 `json:"create_time"` + EndTime int64 `json:"end_time"` + Events *commonmodels.Events `json:"events,omitempty"` + RepoInfo []*types.Repository `json:"repo_info"` + SonarMetrics *step.SonarMetrics `json:"sonar_metrics"` + ResultLink string `json:"result_link,omitempty"` + IsHasArtifact bool `json:"is_has_artifact"` + JobName string `json:"job_name"` + JobDisplayName string `json:"job_display_name"` } func ConvertToDBScanningModule(args *Scanning) *commonmodels.Scanning {