Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Task struct {
Features []string `bson:"features" json:"features"`
IsRestart bool `bson:"is_restart" json:"is_restart"`
StorageEndpoint string `bson:"storage_endpoint" json:"storage_endpoint"`
Events *models.Events `bson:"-" json:"events,omitempty"`
}

func (Task) TableName() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,13 @@ type ImageAndServiceModule struct {
type JobTaskFreestyleSpec struct {
Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"`
Steps []*StepTask `bson:"steps" json:"steps" yaml:"steps"`
Events *Events `bson:"events" json:"events" yaml:"events"`
}

type JobTaskPluginSpec struct {
Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"`
Plugin *PluginTemplate `bson:"plugin" json:"plugin" yaml:"plugin"`
Events *Events `bson:"events" json:"events" yaml:"events"`
}

type JobTaskBlueGreenDeploySpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func NewFreestyleJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Wor
if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil {
logger.Error(err)
}
if jobTaskSpec.Events == nil {
jobTaskSpec.Events = &commonmodels.Events{}
}
job.Spec = jobTaskSpec
return &FreestyleJobCtl{
job: job,
Expand Down Expand Up @@ -1164,7 +1167,10 @@ func (c *FreestyleJobCtl) cleanupHelperPod(ctx context.Context, client crClient.
func (c *FreestyleJobCtl) wait(ctx context.Context) {
var err error
taskTimeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger, func(events *commonmodels.Events) {
c.jobTaskSpec.Events = events
c.ack()
})
if err != nil {
c.job.Error = err.Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func NewPluginsJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Workf
if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil {
logger.Error(err)
}
if jobTaskSpec.Events == nil {
jobTaskSpec.Events = &commonmodels.Events{}
}
job.Spec = jobTaskSpec
return &PluginJobCtl{
job: job,
Expand Down Expand Up @@ -150,7 +153,10 @@ func (c *PluginJobCtl) run(ctx context.Context) error {
func (c *PluginJobCtl) wait(ctx context.Context) {
var err error
timeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger, func(events *commonmodels.Events) {
c.jobTaskSpec.Events = events
c.ack()
})
if err != nil {
c.logger.Errorf("wait job start error: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ func int64Ptr(i int64) *int64 { return &i }

func WaitPlainJobEnd(ctx context.Context, taskTimeout int, namespace, jobName string, kubeClient crClient.Client, apiServer crClient.Reader, xl *zap.SugaredLogger) config.Status {
timeout := time.After(time.Duration(taskTimeout) * time.Minute)
status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl)
status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl, nil)
if err != nil {
xl.Errorf("wait job start error: %v", err)
}
Expand Down Expand Up @@ -936,12 +936,67 @@ func waitPlainJobEnd(ctx context.Context, taskTimeout int, timeout <-chan time.T
}
}

func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger) (config.Status, error) {
func appendPendingPodEvents(podName, namespace string, apiReader client.Reader, events *commonmodels.Events, reported map[string]struct{}, onUpdate func(*commonmodels.Events), xl *zap.SugaredLogger) {
if events == nil {
return
}

selector := fields.Set{"involvedObject.name": podName, "involvedObject.kind": setting.Pod}.AsSelector()
kubeEvents, err := getter.ListEvents(namespace, selector, apiReader)
if err != nil {
xl.Errorf("list events failed for pod %s/%s: %s", namespace, podName, err)
return
}

sort.SliceStable(kubeEvents, func(i, j int) bool {
return kubeEvents[i].CreationTimestamp.Unix() < kubeEvents[j].CreationTimestamp.Unix()
})

changed := false
for _, kubeEvent := range kubeEvents {
eventKey := fmt.Sprintf("%s|%s|%s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message)
if _, ok := reported[eventKey]; ok {
continue
}
reported[eventKey] = struct{}{}

eventType := "info"
if kubeEvent.Type == corev1.EventTypeWarning {
eventType = "error"
}

eventTime := kubeEvent.LastTimestamp
if eventTime.IsZero() {
eventTime = kubeEvent.FirstTimestamp
}
if eventTime.IsZero() && !kubeEvent.EventTime.IsZero() {
eventTime = metav1.NewTime(kubeEvent.EventTime.Time)
}
if eventTime.IsZero() {
eventTime = metav1.NewTime(time.Now())
}

*events = append(*events, &commonmodels.Event{
EventType: eventType,
Time: eventTime.Format("2006-01-02 15:04:05"),
Message: fmt.Sprintf("Pod event [%s/%s]: %s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message),
})
changed = true
}

if changed && onUpdate != nil {
onUpdate(events)
}
}

func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger, onEventsUpdate func(*commonmodels.Events)) (config.Status, error) {
xl.Infof("wait job to start: %s/%s", namespace, jobName)
xl.Infof("Timeout of preparing Pod: %s.", 120*time.Second)
waitPodReadyTimeout := time.After(120 * time.Second)

var podReadyTimeout bool
reportedEvents := make(map[string]struct{})
events := &commonmodels.Events{}
for {
select {
case <-ctx.Done():
Expand All @@ -966,6 +1021,8 @@ func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crC
continue
}
for _, pod := range podList {
appendPendingPodEvents(pod.Name, namespace, apiReader, events, reportedEvents, onEventsUpdate, xl)

if pod.Status.Phase == corev1.PodFailed {
msg := ""
for _, condition := range pod.Status.Conditions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type JobTaskPreview struct {
ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"`
ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"`
RetryCount int `bson:"retry_count" yaml:"retry_count" json:"retry_count"`
Events *commonmodels.Events `bson:"events" json:"events"`
// JobInfo contains the fields that make up the job task name, for frontend display
JobInfo interface{} `bson:"job_info" json:"job_info"`
}
Expand Down Expand Up @@ -2479,6 +2480,23 @@ func HandleJobError(workflowName, jobName, userID, username string, taskID int64
return nil
}

func extractRuntimeJobEvents(job *commonmodels.JobTask) *commonmodels.Events {
switch job.JobType {
case string(config.JobFreestyle), string(config.JobZadigBuild), string(config.JobZadigTesting), string(config.JobZadigScanning), string(config.JobZadigDistributeImage):
taskJobSpec := &commonmodels.JobTaskFreestyleSpec{}
if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil {
return taskJobSpec.Events
}
case string(config.JobPlugin):
taskJobSpec := &commonmodels.JobTaskPluginSpec{}
if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil {
return taskJobSpec.Events
}
}

return nil
}

func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, now int64, projectName string) []*JobTaskPreview {
envMap := make(map[string]*commonmodels.Product)
resp := []*JobTaskPreview{}
Expand Down Expand Up @@ -2510,6 +2528,7 @@ func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string,
ErrorHandlerUserID: job.ErrorHandlerUserID,
ErrorHandlerUserName: job.ErrorHandlerUserName,
RetryCount: job.RetryCount,
Events: extractRuntimeJobEvents(job),
}
switch job.JobType {
case string(config.JobFreestyle):
Expand Down
28 changes: 23 additions & 5 deletions pkg/microservice/aslan/core/workflow/testing/service/scanning.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,17 @@ func ListScanningTask(id string, pageNum, pageSize int, log *zap.SugaredLogger)
respList := make([]*ScanningTaskResp, 0)

for _, workflowTask := range workflowTasks {
status := workflowTask.Status
if len(workflowTask.Stages) == 1 && len(workflowTask.Stages[0].Jobs) == 1 && workflowTask.Stages[0].Jobs[0].Status != "" {
status = workflowTask.Stages[0].Jobs[0].Status
}
taskInfo := &ScanningTaskResp{
ScanID: workflowTask.TaskID,
Status: string(workflowTask.Status),
Status: string(status),
Creator: workflowTask.TaskCreator,
CreatedAt: workflowTask.CreateTime,
}
if workflowTask.Status == config.StatusPassed || workflowTask.Status == config.StatusCancelled || workflowTask.Status == config.StatusFailed {
if status == config.StatusPassed || status == config.StatusCancelled || status == config.StatusFailed {
taskInfo.RunTime = workflowTask.EndTime - workflowTask.StartTime
}
respList = append(respList, taskInfo)
Expand Down Expand Up @@ -347,9 +351,9 @@ func GetScanningTaskInfo(scanningID string, taskID int64, log *zap.SugaredLogger
resultAddr := ""

if len(workflowTask.Stages) != 1 || len(workflowTask.Stages[0].Jobs) != 1 {
errMsg := fmt.Sprintf("invalid test task!")
errMsg := "invalid test task!"
log.Errorf(errMsg)
return nil, fmt.Errorf(errMsg)
return nil, fmt.Errorf("%s", errMsg)
}

spec := new(commonmodels.ZadigScanningJobSpec)
Expand Down Expand Up @@ -429,11 +433,25 @@ func GetScanningTaskInfo(scanningID string, taskID int64, log *zap.SugaredLogger
repo.Username = ""
}

status := workflowTask.Status
if workflowTask.Stages[0].Jobs[0].Status != "" {
status = workflowTask.Stages[0].Jobs[0].Status
}
errorMsg := workflowTask.Stages[0].Jobs[0].Error
if errorMsg == "" {
errorMsg = workflowTask.Stages[0].Error
}
if errorMsg == "" {
errorMsg = workflowTask.Error
}

return &ScanningTaskDetail{
Creator: workflowTask.TaskCreator,
Status: string(workflowTask.Status),
Status: string(status),
Error: errorMsg,
CreateTime: workflowTask.CreateTime,
EndTime: workflowTask.EndTime,
Events: jobTaskSpec.Events,
RepoInfo: repoInfo,
SonarMetrics: sonarMetrics,
ResultLink: resultAddr,
Expand Down
16 changes: 14 additions & 2 deletions pkg/microservice/aslan/core/workflow/testing/service/test_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
}

if len(workflowTask.WorkflowArgs.Stages) != 1 || len(workflowTask.WorkflowArgs.Stages[0].Jobs) != 1 {
errMsg := fmt.Sprintf("invalid test task!")
errMsg := "invalid test task!"
log.Errorf(errMsg)
return nil, fmt.Errorf(errMsg)
return nil, fmt.Errorf("%s", errMsg)
}

stages := make([]*commonmodels.Stage, 0)
Expand Down Expand Up @@ -224,10 +224,19 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
}
}

errorMsg := workflowTask.Stages[0].Jobs[0].Error
if errorMsg == "" {
errorMsg = workflowTask.Stages[0].Error
}
if errorMsg == "" {
errorMsg = workflowTask.Error
}

subTaskInfo[testName] = map[string]interface{}{
"start_time": workflowTask.Stages[0].Jobs[0].StartTime,
"end_time": workflowTask.Stages[0].Jobs[0].EndTime,
"status": workflowTask.Stages[0].Jobs[0].Status,
"error": errorMsg,
"job_ctx": struct {
JobName string `json:"job_name"`
IsHasArtifact bool `json:"is_has_artifact"`
Expand All @@ -248,6 +257,7 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
SubTasks: subTaskInfo,
StartTime: workflowTask.Stages[0].StartTime,
EndTime: workflowTask.Stages[0].EndTime,
Error: errorMsg,
TypeName: workflowTask.Stages[0].Name,
})

Expand All @@ -263,9 +273,11 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
CreateTime: workflowTask.CreateTime,
StartTime: workflowTask.StartTime,
EndTime: workflowTask.EndTime,
Error: errorMsg,
Stages: stages,
TestReports: testResultMap,
IsRestart: workflowTask.IsRestart,
Events: jobSpec.Events,
}, nil
}

Expand Down
22 changes: 12 additions & 10 deletions pkg/microservice/aslan/core/workflow/testing/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,18 @@ type ScanningTaskResp struct {
}

type ScanningTaskDetail struct {
Creator string `json:"creator"`
Status string `json:"status"`
CreateTime int64 `json:"create_time"`
EndTime int64 `json:"end_time"`
RepoInfo []*types.Repository `json:"repo_info"`
SonarMetrics *step.SonarMetrics `json:"sonar_metrics"`
ResultLink string `json:"result_link,omitempty"`
IsHasArtifact bool `json:"is_has_artifact"`
JobName string `json:"job_name"`
JobDisplayName string `json:"job_display_name"`
Creator string `json:"creator"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
CreateTime int64 `json:"create_time"`
EndTime int64 `json:"end_time"`
Events *commonmodels.Events `json:"events,omitempty"`
RepoInfo []*types.Repository `json:"repo_info"`
SonarMetrics *step.SonarMetrics `json:"sonar_metrics"`
ResultLink string `json:"result_link,omitempty"`
IsHasArtifact bool `json:"is_has_artifact"`
JobName string `json:"job_name"`
JobDisplayName string `json:"job_display_name"`
}

func ConvertToDBScanningModule(args *Scanning) *commonmodels.Scanning {
Expand Down
Loading