diff --git a/pkg/cli/initconfig/cmd/init.go b/pkg/cli/initconfig/cmd/init.go index 8ab70ad404..9ff9851e49 100644 --- a/pkg/cli/initconfig/cmd/init.go +++ b/pkg/cli/initconfig/cmd/init.go @@ -200,6 +200,8 @@ func createOrUpdateMongodbIndex(ctx context.Context) { commonrepo.NewEnvInfoColl(), commonrepo.NewApprovalTicketColl(), commonrepo.NewWorkflowTaskRevertColl(), + commonrepo.NewTerminalSessionColl(), + commonrepo.NewTerminalCommandColl(), // msg queue commonrepo.NewMsgQueueCommonColl(), @@ -304,7 +306,7 @@ func createBuiltinApplicationFieldDefinitions() error { {Key: "update_time", Name: "更新时间", Type: aslanconfig.ApplicationCustomFieldTypeDatetime, ShowInList: true, Source: aslanconfig.ApplicationFieldSourceBuiltin, Description: "业务服务的更新时间"}, } - // Upsert per key to be idempotent. Keep user-changed attributes for custom fields; for built-ins we only enforce Source="builtin" and Type. + // Upsert per key to be idempotent. Keep user-changed attributes for custom fields; for built-ins we only enforce Source="builtin", Type, Name, Description, and Required. for i := range builtin { b := builtin[i] existing, err := coll.GetByKey(ctx, b.Key) diff --git a/pkg/microservice/aslan/core/common/repository/models/terminal_audit.go b/pkg/microservice/aslan/core/common/repository/models/terminal_audit.go new file mode 100644 index 0000000000..092650f599 --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/models/terminal_audit.go @@ -0,0 +1,116 @@ +package models + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type TerminalSessionType string + +const ( + TerminalSessionTypeSSH TerminalSessionType = "ssh" + TerminalSessionTypePodExec TerminalSessionType = "podexec" + TerminalSessionTypeWorkflowDebug TerminalSessionType = "workflow_debug" +) + +type TerminalSessionStatus string + +const ( + TerminalSessionStatusRunning TerminalSessionStatus = "running" + TerminalSessionStatusFinished TerminalSessionStatus = "finished" + TerminalSessionStatusAborted TerminalSessionStatus = "aborted" + TerminalSessionStatusFailed TerminalSessionStatus = "failed" +) + +type TerminalSession struct { + ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` + SessionID string `bson:"session_id" json:"session_id"` + SessionType TerminalSessionType `bson:"session_type" json:"session_type"` + Status TerminalSessionStatus `bson:"status" json:"status"` + UserID string `bson:"user_id" json:"user_id"` + Username string `bson:"username" json:"username"` + Account string `bson:"account" json:"account"` + ProjectName string `bson:"project_name" json:"project_name"` + EnvName string `bson:"env_name" json:"env_name"` + ServiceName string `bson:"service_name" json:"service_name"` + WorkflowName string `bson:"workflow_name" json:"workflow_name"` + JobName string `bson:"job_name" json:"job_name"` + TaskID int64 `bson:"task_id" json:"task_id"` + TargetName string `bson:"target_name" json:"target_name"` + Protocol string `bson:"protocol" json:"protocol"` + RemoteAddr string `bson:"remote_addr" json:"remote_addr"` + LoginAccount string `bson:"login_account" json:"login_account"` + HostID string `bson:"host_id" json:"host_id"` + HostName string `bson:"host_name" json:"host_name"` + HostIP string `bson:"host_ip" json:"host_ip"` + ClusterID string `bson:"cluster_id" json:"cluster_id"` + Namespace string `bson:"namespace" json:"namespace"` + PodName string `bson:"pod_name" json:"pod_name"` + ContainerName string `bson:"container_name" json:"container_name"` + ClientIP string `bson:"client_ip" json:"client_ip"` + UserAgent string `bson:"user_agent" json:"user_agent"` + StartedAt int64 `bson:"started_at" json:"started_at"` + EndedAt int64 `bson:"ended_at" json:"ended_at"` + DurationSeconds int64 `bson:"duration_seconds" json:"duration_seconds"` + LastActivityAt int64 `bson:"last_activity_at" json:"last_activity_at"` + CommandCount int64 `bson:"command_count" json:"command_count"` + StorageID string `bson:"storage_id" json:"storage_id"` + Bucket string `bson:"bucket" json:"bucket"` + ObjectKey string `bson:"object_key" json:"object_key"` + FileSize int64 `bson:"file_size" json:"file_size"` + ErrorMessage string `bson:"error_message" json:"error_message"` + CreatedAt int64 `bson:"created_at" json:"created_at"` + UpdatedAt int64 `bson:"updated_at" json:"updated_at"` +} + +func (TerminalSession) TableName() string { + return "terminal_session" +} + +type TerminalCommand struct { + ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` + SessionID string `bson:"session_id" json:"session_id"` + Seq int64 `bson:"seq" json:"seq"` + Command string `bson:"command" json:"command"` + RiskLevel string `bson:"risk_level" json:"risk_level"` + UserID string `bson:"user_id" json:"user_id"` + Username string `bson:"username" json:"username"` + Account string `bson:"account" json:"account"` + ProjectName string `bson:"project_name" json:"project_name"` + EnvName string `bson:"env_name" json:"env_name"` + TargetName string `bson:"target_name" json:"target_name"` + Protocol string `bson:"protocol" json:"protocol"` + RemoteAddr string `bson:"remote_addr" json:"remote_addr"` + LoginAccount string `bson:"login_account" json:"login_account"` + TimeOffsetMS int64 `bson:"time_offset_ms" json:"time_offset_ms"` + CreatedAt int64 `bson:"created_at" json:"created_at"` +} + +func (TerminalCommand) TableName() string { + return "terminal_command" +} + +type TerminalSessionListArgs struct { + Status string `form:"status" json:"status"` + SessionType string `form:"sessionType" json:"sessionType"` + ProjectName string `form:"projectName" json:"projectName"` + EnvName string `form:"envName" json:"envName"` + ServiceName string `form:"serviceName" json:"serviceName"` + Username string `form:"username" json:"username"` + TargetName string `form:"targetName" json:"targetName"` + RemoteAddr string `form:"remoteAddr" json:"remoteAddr"` + StartTime int64 `form:"startTime" json:"startTime"` + EndTime int64 `form:"endTime" json:"endTime"` + PageNum int64 `form:"pageNum" json:"pageNum"` + PageSize int64 `form:"pageSize" json:"pageSize"` +} + +type TerminalCommandListArgs struct { + SessionID string `form:"sessionID" json:"sessionID"` + ProjectName string `form:"projectName" json:"projectName"` + Username string `form:"username" json:"username"` + TargetName string `form:"targetName" json:"targetName"` + RemoteAddr string `form:"remoteAddr" json:"remoteAddr"` + Command string `form:"command" json:"command"` + StartTime int64 `form:"startTime" json:"startTime"` + EndTime int64 `form:"endTime" json:"endTime"` + PageNum int64 `form:"pageNum" json:"pageNum"` + PageSize int64 `form:"pageSize" json:"pageSize"` +} diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/terminal_command.go b/pkg/microservice/aslan/core/common/repository/mongodb/terminal_command.go new file mode 100644 index 0000000000..76c89f2a6f --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/mongodb/terminal_command.go @@ -0,0 +1,131 @@ +package mongodb + +import ( + "context" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo" +) + +type TerminalCommandColl struct { + *mongo.Collection + + coll string +} + +func NewTerminalCommandColl() *TerminalCommandColl { + name := models.TerminalCommand{}.TableName() + return &TerminalCommandColl{ + Collection: mongotool.Database(config.MongoDatabase()).Collection(name), + coll: name, + } +} + +func (c *TerminalCommandColl) GetCollectionName() string { + return c.coll +} + +func (c *TerminalCommandColl) EnsureIndex(ctx context.Context) error { + indexes := []mongo.IndexModel{ + { + Keys: bson.D{{Key: "session_id", Value: 1}, {Key: "seq", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{{Key: "project_name", Value: 1}, {Key: "created_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + { + Keys: bson.D{{Key: "username", Value: 1}, {Key: "created_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + { + Keys: bson.D{{Key: "command", Value: "text"}}, + Options: options.Index().SetUnique(false), + }, + } + _, err := c.Indexes().CreateMany(ctx, indexes, mongotool.CreateIndexOptions(ctx)) + return err +} + +func (c *TerminalCommandColl) Create(command *models.TerminalCommand) error { + if command == nil { + return nil + } + _, err := c.InsertOne(context.TODO(), command) + return err +} + +func (c *TerminalCommandColl) CreateMany(commands []*models.TerminalCommand) error { + if len(commands) == 0 { + return nil + } + docs := make([]interface{}, 0, len(commands)) + for _, command := range commands { + if command == nil { + continue + } + docs = append(docs, command) + } + if len(docs) == 0 { + return nil + } + _, err := c.InsertMany(context.TODO(), docs) + return err +} + +func (c *TerminalCommandColl) List(args *models.TerminalCommandListArgs) ([]*models.TerminalCommand, int64, error) { + resp := make([]*models.TerminalCommand, 0) + query := bson.M{} + if args != nil { + if args.SessionID != "" { + query["session_id"] = args.SessionID + } + if args.ProjectName != "" { + query["project_name"] = buildRegexQuery(args.ProjectName) + } + if args.Username != "" { + query["username"] = buildRegexQuery(args.Username) + } + if args.TargetName != "" { + query["target_name"] = buildRegexQuery(args.TargetName) + } + if args.RemoteAddr != "" { + query["remote_addr"] = buildRegexQuery(args.RemoteAddr) + } + if args.Command != "" { + query["command"] = buildRegexQuery(args.Command) + } + if args.StartTime > 0 || args.EndTime > 0 { + timeQuery := bson.M{} + if args.StartTime > 0 { + timeQuery["$gte"] = args.StartTime + } + if args.EndTime > 0 { + timeQuery["$lte"] = args.EndTime + } + query["created_at"] = timeQuery + } + } + + opts := options.Find().SetSort(bson.D{{Key: "created_at", Value: -1}, {Key: "seq", Value: -1}}) + if args != nil && args.PageNum > 0 && args.PageSize > 0 { + opts.SetSkip((args.PageNum - 1) * args.PageSize).SetLimit(args.PageSize) + } + cursor, err := c.Find(context.TODO(), query, opts) + if err != nil { + return nil, 0, err + } + defer cursor.Close(context.TODO()) + + if err := cursor.All(context.TODO(), &resp); err != nil { + return nil, 0, err + } + total, err := c.CountDocuments(context.TODO(), query) + return resp, total, err +} diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/terminal_query.go b/pkg/microservice/aslan/core/common/repository/mongodb/terminal_query.go new file mode 100644 index 0000000000..a63e408587 --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/mongodb/terminal_query.go @@ -0,0 +1,11 @@ +package mongodb + +import ( + "regexp" + + "go.mongodb.org/mongo-driver/bson" +) + +func buildRegexQuery(value string) bson.M { + return bson.M{"$regex": regexp.QuoteMeta(value)} +} diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/terminal_session.go b/pkg/microservice/aslan/core/common/repository/mongodb/terminal_session.go new file mode 100644 index 0000000000..8c0112276a --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/mongodb/terminal_session.go @@ -0,0 +1,196 @@ +package mongodb + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo" +) + +type TerminalSessionColl struct { + *mongo.Collection + + coll string +} + +type CloseSessionArgs struct { + SessionID string + Status models.TerminalSessionStatus + EndedAt int64 + DurationSeconds int64 + StorageID string + Bucket string + ObjectKey string + FileSize int64 + ErrorMessage string +} + +func NewTerminalSessionColl() *TerminalSessionColl { + name := models.TerminalSession{}.TableName() + return &TerminalSessionColl{ + Collection: mongotool.Database(config.MongoDatabase()).Collection(name), + coll: name, + } +} + +func (c *TerminalSessionColl) GetCollectionName() string { + return c.coll +} + +func (c *TerminalSessionColl) EnsureIndex(ctx context.Context) error { + indexes := []mongo.IndexModel{ + { + Keys: bson.D{{Key: "session_id", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{{Key: "status", Value: 1}, {Key: "started_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + { + Keys: bson.D{{Key: "project_name", Value: 1}, {Key: "env_name", Value: 1}, {Key: "started_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + { + Keys: bson.D{{Key: "username", Value: 1}, {Key: "started_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + { + Keys: bson.D{{Key: "session_type", Value: 1}, {Key: "started_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + { + Keys: bson.D{{Key: "target_name", Value: 1}, {Key: "started_at", Value: -1}}, + Options: options.Index().SetUnique(false), + }, + } + + _, err := c.Indexes().CreateMany(ctx, indexes, mongotool.CreateIndexOptions(ctx)) + return err +} + +func (c *TerminalSessionColl) Create(session *models.TerminalSession) error { + if session == nil { + return nil + } + now := time.Now().Unix() + if session.CreatedAt == 0 { + session.CreatedAt = now + } + if session.UpdatedAt == 0 { + session.UpdatedAt = now + } + if session.LastActivityAt == 0 { + session.LastActivityAt = session.StartedAt + } + _, err := c.InsertOne(context.TODO(), session) + return err +} + +func (c *TerminalSessionColl) FindBySessionID(sessionID string) (*models.TerminalSession, error) { + resp := new(models.TerminalSession) + err := c.FindOne(context.TODO(), bson.M{"session_id": sessionID}).Decode(resp) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *TerminalSessionColl) UpdateActivity(sessionID string, commandCountDelta int64, lastActivityAt int64) error { + update := bson.M{ + "$set": bson.M{ + "last_activity_at": lastActivityAt, + "updated_at": time.Now().Unix(), + }, + } + if commandCountDelta != 0 { + update["$inc"] = bson.M{"command_count": commandCountDelta} + } + _, err := c.UpdateOne(context.TODO(), bson.M{"session_id": sessionID}, update) + return err +} + +func (c *TerminalSessionColl) CloseSession(args *CloseSessionArgs) error { + if args == nil { + return nil + } + update := bson.M{ + "$set": bson.M{ + "status": args.Status, + "ended_at": args.EndedAt, + "duration_seconds": args.DurationSeconds, + "last_activity_at": args.EndedAt, + "storage_id": args.StorageID, + "bucket": args.Bucket, + "object_key": args.ObjectKey, + "file_size": args.FileSize, + "error_message": args.ErrorMessage, + "updated_at": time.Now().Unix(), + }, + } + _, err := c.UpdateOne(context.TODO(), bson.M{"session_id": args.SessionID}, update) + return err +} + +func (c *TerminalSessionColl) List(args *models.TerminalSessionListArgs) ([]*models.TerminalSession, int64, error) { + resp := make([]*models.TerminalSession, 0) + query := bson.M{} + if args != nil { + if args.Status != "" { + query["status"] = args.Status + } + if args.SessionType != "" { + query["session_type"] = args.SessionType + } + if args.ProjectName != "" { + query["project_name"] = buildRegexQuery(args.ProjectName) + } + if args.EnvName != "" { + query["env_name"] = buildRegexQuery(args.EnvName) + } + if args.ServiceName != "" { + query["service_name"] = buildRegexQuery(args.ServiceName) + } + if args.Username != "" { + query["username"] = buildRegexQuery(args.Username) + } + if args.TargetName != "" { + query["target_name"] = buildRegexQuery(args.TargetName) + } + if args.RemoteAddr != "" { + query["remote_addr"] = buildRegexQuery(args.RemoteAddr) + } + if args.StartTime > 0 || args.EndTime > 0 { + timeQuery := bson.M{} + if args.StartTime > 0 { + timeQuery["$gte"] = args.StartTime + } + if args.EndTime > 0 { + timeQuery["$lte"] = args.EndTime + } + query["started_at"] = timeQuery + } + } + + opts := options.Find().SetSort(bson.D{{Key: "started_at", Value: -1}}) + if args != nil && args.PageNum > 0 && args.PageSize > 0 { + opts.SetSkip((args.PageNum - 1) * args.PageSize).SetLimit(args.PageSize) + } + cursor, err := c.Find(context.TODO(), query, opts) + if err != nil { + return nil, 0, err + } + defer cursor.Close(context.TODO()) + + if err := cursor.All(context.TODO(), &resp); err != nil { + return nil, 0, err + } + total, err := c.CountDocuments(context.TODO(), query) + return resp, total, err +} diff --git a/pkg/microservice/aslan/core/environment/handler/pm_exec.go b/pkg/microservice/aslan/core/environment/handler/pm_exec.go index 1d0c477725..8d3effc504 100644 --- a/pkg/microservice/aslan/core/environment/handler/pm_exec.go +++ b/pkg/microservice/aslan/core/environment/handler/pm_exec.go @@ -72,7 +72,7 @@ func ConnectSshPmExec(c *gin.Context) { } } - ctx.RespErr = service.ConnectSshPmExec(c, ctx.UserName, name, projectKey, ip, hostId, cols, rows, ctx.Logger) + ctx.RespErr = service.ConnectSshPmExec(c, ctx.UserName, ctx.UserID, ctx.Account, name, projectKey, c.Param("serviceName"), ip, hostId, cols, rows, ctx.Logger) } // @summary Exec VM Service Command diff --git a/pkg/microservice/aslan/core/environment/service/pm_exec.go b/pkg/microservice/aslan/core/environment/service/pm_exec.go index d69d482fc7..e9233be654 100644 --- a/pkg/microservice/aslan/core/environment/service/pm_exec.go +++ b/pkg/microservice/aslan/core/environment/service/pm_exec.go @@ -26,6 +26,8 @@ import ( "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + terminalaudit "github.com/koderover/zadig/v2/pkg/shared/terminalaudit" "go.uber.org/zap" "golang.org/x/crypto/ssh" @@ -45,7 +47,7 @@ var upgrader = websocket.Upgrader{ }, } -func ConnectSshPmExec(c *gin.Context, username, envName, productName, ip, hostId string, cols, rows int, log *zap.SugaredLogger) error { +func ConnectSshPmExec(c *gin.Context, username, userID, account, envName, productName, serviceName, ip, hostId string, cols, rows int, log *zap.SugaredLogger) error { ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Errorf("ws upgrade err:%s", err) @@ -95,11 +97,50 @@ func ConnectSshPmExec(c *gin.Context, username, envName, productName, ip, hostId } defer sshCli.Close() - sshConn, err := wsconn.NewSshConn(cols, rows, sshCli) + finalStatus := commonmodels.TerminalSessionStatusFinished + var audit *terminalaudit.AuditSession + meta := &terminalaudit.SessionMeta{ + SessionType: commonmodels.TerminalSessionTypeSSH, + Protocol: "ssh", + Username: username, + ProjectName: productName, + EnvName: envName, + ServiceName: serviceName, + TargetName: resolveHostTargetName(resp), + RemoteAddr: resp.IP, + LoginAccount: resp.UserName, + HostID: hostId, + HostName: resolveHostName(resp), + HostIP: resp.IP, + ClientIP: c.ClientIP(), + UserAgent: c.Request.UserAgent(), + InitialCols: cols, + InitialRows: rows, + UserID: userID, + Account: account, + } + audit, err = terminalaudit.NewAuditSession(meta, func() { + sshCli.Close() + _ = ws.Close() + }) + if err != nil { + log.Errorf("create ssh terminal audit recorder failed: %v", err) + } + defer func() { + if err := audit.Close(finalStatus); err != nil { + log.Errorf("close ssh terminal audit recorder failed: %v", err) + } + }() + + sshConn, err := wsconn.NewSshConn(cols, rows, sshCli, &wsconn.SshConnOption{ + Recorder: audit.Recorder, + Sanitizer: audit.Sanitizer, + }) if err != nil { log.Errorf("NewSshConn err:%s", err) e.ErrLoginPm.AddErr(err) ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, e.ErrLoginPm.Error())) + finalStatus = commonmodels.TerminalSessionStatusFailed return e.ErrLoginPm } defer sshConn.Close() @@ -113,6 +154,26 @@ func ConnectSshPmExec(c *gin.Context, username, envName, productName, ip, hostId return nil } +func resolveHostTargetName(resp *commonmodels.PrivateKey) string { + if resp == nil { + return "" + } + if resp.Name != "" { + return resp.Name + } + if resp.VMInfo != nil && resp.VMInfo.HostName != "" { + return resp.VMInfo.HostName + } + return resp.IP +} + +func resolveHostName(resp *commonmodels.PrivateKey) string { + if resp == nil || resp.VMInfo == nil { + return "" + } + return resp.VMInfo.HostName +} + type VmServiceCommandType string const ( diff --git a/pkg/microservice/aslan/core/system/handler/router.go b/pkg/microservice/aslan/core/system/handler/router.go index 2bd3aa14cc..71bc5fbc57 100644 --- a/pkg/microservice/aslan/core/system/handler/router.go +++ b/pkg/microservice/aslan/core/system/handler/router.go @@ -84,6 +84,15 @@ func (*Router) Inject(router *gin.RouterGroup) { s3storage.GET("/project", ListS3StorageByProject) } + terminalAudit := router.Group("terminalAudit") + { + terminalAudit.GET("/sessions", ListTerminalSessions) + terminalAudit.GET("/sessions/:sessionID", GetTerminalSession) + terminalAudit.GET("/sessions/:sessionID/cast", GetTerminalCast) + terminalAudit.POST("/sessions/:sessionID/terminate", TerminateTerminalSession) + terminalAudit.GET("/commands", ListTerminalCommands) + } + //系统清理缓存 cleanCache := router.Group("cleanCache") { diff --git a/pkg/microservice/aslan/core/system/handler/terminal_audit.go b/pkg/microservice/aslan/core/system/handler/terminal_audit.go new file mode 100644 index 0000000000..281b5e9b6a --- /dev/null +++ b/pkg/microservice/aslan/core/system/handler/terminal_audit.go @@ -0,0 +1,145 @@ +package handler + +import ( + "fmt" + "io" + "strconv" + + "github.com/gin-gonic/gin" + + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + internalhandler "github.com/koderover/zadig/v2/pkg/shared/handler" + terminalaudit "github.com/koderover/zadig/v2/pkg/shared/terminalaudit" +) + +func ListTerminalSessions(c *gin.Context) { + ctx, err := internalhandler.NewContextWithAuthorization(c) + defer func() { internalhandler.JSONResponse(c, ctx) }() + if err != nil { + ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) + ctx.UnAuthorized = true + return + } + if !ctx.Resources.IsSystemAdmin { + ctx.UnAuthorized = true + return + } + + args := &commonmodels.TerminalSessionListArgs{ + Status: c.Query("status"), + SessionType: c.Query("sessionType"), + ProjectName: c.Query("projectName"), + EnvName: c.Query("envName"), + ServiceName: c.Query("serviceName"), + Username: c.Query("username"), + TargetName: c.Query("targetName"), + RemoteAddr: c.Query("remoteAddr"), + StartTime: parseInt64Query(c, "startTime"), + EndTime: parseInt64Query(c, "endTime"), + PageNum: parseInt64WithDefault(c, "pageNum", 1), + PageSize: parseInt64WithDefault(c, "pageSize", 20), + } + ctx.Resp, ctx.RespErr = terminalaudit.ListSessions(args) +} + +func GetTerminalSession(c *gin.Context) { + ctx, err := internalhandler.NewContextWithAuthorization(c) + defer func() { internalhandler.JSONResponse(c, ctx) }() + if err != nil { + ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) + ctx.UnAuthorized = true + return + } + if !ctx.Resources.IsSystemAdmin { + ctx.UnAuthorized = true + return + } + ctx.Resp, ctx.RespErr = terminalaudit.GetSession(c.Param("sessionID")) +} + +func GetTerminalCast(c *gin.Context) { + ctx, err := internalhandler.NewContextWithAuthorization(c) + defer func() { internalhandler.JSONResponse(c, ctx) }() + if err != nil { + ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) + ctx.UnAuthorized = true + return + } + if !ctx.Resources.IsSystemAdmin { + ctx.UnAuthorized = true + return + } + + stream, err := terminalaudit.GetCastStream(c.Param("sessionID")) + if err != nil { + ctx.RespErr = err + return + } + defer stream.Body.Close() + + c.Header("Content-Type", "application/octet-stream") + if stream.FileSize > 0 { + c.Header("Content-Length", strconv.FormatInt(stream.FileSize, 10)) + } + c.Status(200) + _, ctx.RespErr = io.Copy(c.Writer, stream.Body) +} + +func ListTerminalCommands(c *gin.Context) { + ctx, err := internalhandler.NewContextWithAuthorization(c) + defer func() { internalhandler.JSONResponse(c, ctx) }() + if err != nil { + ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) + ctx.UnAuthorized = true + return + } + if !ctx.Resources.IsSystemAdmin { + ctx.UnAuthorized = true + return + } + + args := &commonmodels.TerminalCommandListArgs{ + SessionID: c.Query("sessionID"), + ProjectName: c.Query("projectName"), + Username: c.Query("username"), + TargetName: c.Query("targetName"), + RemoteAddr: c.Query("remoteAddr"), + Command: c.Query("command"), + StartTime: parseInt64Query(c, "startTime"), + EndTime: parseInt64Query(c, "endTime"), + PageNum: parseInt64WithDefault(c, "pageNum", 1), + PageSize: parseInt64WithDefault(c, "pageSize", 20), + } + ctx.Resp, ctx.RespErr = terminalaudit.ListCommands(args) +} + +func TerminateTerminalSession(c *gin.Context) { + ctx, err := internalhandler.NewContextWithAuthorization(c) + defer func() { internalhandler.JSONResponse(c, ctx) }() + if err != nil { + ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err) + ctx.UnAuthorized = true + return + } + if !ctx.Resources.IsSystemAdmin { + ctx.UnAuthorized = true + return + } + ctx.RespErr = terminalaudit.TerminateSession(c.Param("sessionID")) +} + +func parseInt64Query(c *gin.Context, key string) int64 { + return parseInt64WithDefault(c, key, 0) +} + +func parseInt64WithDefault(c *gin.Context, key string, defaultValue int64) int64 { + raw := c.Query(key) + if raw == "" { + return defaultValue + } + value, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + return defaultValue + } + return value +} diff --git a/pkg/microservice/aslan/server/server.go b/pkg/microservice/aslan/server/server.go index f88fd2d2f8..675d55730c 100644 --- a/pkg/microservice/aslan/server/server.go +++ b/pkg/microservice/aslan/server/server.go @@ -26,11 +26,15 @@ import ( "github.com/gorilla/mux" "github.com/koderover/zadig/v2/pkg/microservice/aslan/core" "github.com/koderover/zadig/v2/pkg/microservice/aslan/server/rest" + terminalaudit "github.com/koderover/zadig/v2/pkg/shared/terminalaudit" "github.com/koderover/zadig/v2/pkg/tool/kube/client" "github.com/koderover/zadig/v2/pkg/tool/log" ) func Serve(ctx context.Context) error { + terminalaudit.SetProcessContext(ctx) + defer terminalaudit.SetProcessContext(context.Background()) + go func() { if err := client.Start(ctx); err != nil { panic(err) diff --git a/pkg/microservice/podexec/core/service/pod_server_ws.go b/pkg/microservice/podexec/core/service/pod_server_ws.go index 1293eacd11..fbf5c7c406 100644 --- a/pkg/microservice/podexec/core/service/pod_server_ws.go +++ b/pkg/microservice/podexec/core/service/pod_server_ws.go @@ -17,18 +17,26 @@ limitations under the License. package service import ( + "context" + "errors" "fmt" + "io" "strconv" "strings" "github.com/gin-gonic/gin" + terminalaudit "github.com/koderover/zadig/v2/pkg/shared/terminalaudit" "github.com/koderover/zadig/v2/pkg/tool/clientmanager" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + "github.com/koderover/zadig/v2/pkg/setting" internalhandler "github.com/koderover/zadig/v2/pkg/shared/handler" e "github.com/koderover/zadig/v2/pkg/tool/errors" "github.com/koderover/zadig/v2/pkg/tool/kube/getter" @@ -55,6 +63,9 @@ func ServeWs(c *gin.Context) { log.Infof("exec containerName: %s, pod: %s", containerName, podName) productName := c.Query("projectName") + if productName == "" { + productName = c.Param("productName") + } envName := c.Param("envName") productInfo, err := commonrepo.NewProductColl().Find(&commonrepo.ProductFindOptions{Name: productName, EnvName: envName}) if err != nil { @@ -70,16 +81,26 @@ func ServeWs(c *gin.Context) { return } defer func() { - log.Info("close session.") + log.Infof("serve ws defer close terminal session, sessionID=%s", pty.SessionID) _ = pty.Close() }() + initialCols, initialRows := readTerminalSizeFromQuery(c) + finalStatus := commonmodels.TerminalSessionStatusFinished + var audit *terminalaudit.AuditSession + defer func() { + if audit != nil { + log.Infof("serve ws defer close audit session, sessionID=%s finalStatus=%s", audit.SessionID, finalStatus) + } + if err := audit.Close(finalStatus); err != nil { + log.Errorf("close terminal audit recorder failed: %v", err) + } + }() kubeCli, err := clientmanager.NewKubeClientManager().GetKubernetesClientSet(clusterID) if err != nil { msg := fmt.Sprintf("get kubecli err :%v", err) log.Errorf(msg) _, _ = pty.Write([]byte(msg)) - pty.Done() ctx.RespErr = e.ErrInternalError.AddDesc(fmt.Sprintf("get kubecli err :%v", err)) return @@ -90,22 +111,68 @@ func ServeWs(c *gin.Context) { msg := fmt.Sprintf("Validate pod error! err: %v", err) log.Errorf(msg) _, _ = pty.Write([]byte(msg)) - pty.Done() ctx.RespErr = e.ErrInternalError.AddDesc(fmt.Sprintf("Validate pod error! err: %v", err)) return } + pod, err := kubeCli.CoreV1().Pods(namespace).Get(c.Request.Context(), podName, metav1.GetOptions{}) + if err != nil { + log.Warnf("failed to get pod %s/%s for terminal audit metadata: %v", namespace, podName, err) + } + secrets, err := collectContainerSecretValues(c.Request.Context(), kubeCli, pod, namespace, containerName) + if err != nil { + log.Warnf("failed to collect pod secret values for terminal audit masking: %v", err) + } - err = ExecPod(clusterID, []string{"/bin/sh"}, pty, namespace, podName, containerName) + meta := &terminalaudit.SessionMeta{ + SessionType: commonmodels.TerminalSessionTypePodExec, + Protocol: "k8s-exec", + UserID: ctx.UserID, + Username: ctx.UserName, + Account: ctx.Account, + ProjectName: productName, + EnvName: envName, + ServiceName: resolvePodServiceName(pod), + TargetName: fmt.Sprintf("%s/%s", podName, containerName), + RemoteAddr: func() string { + if pod != nil { + return pod.Status.PodIP + } + return "" + }(), + ClusterID: clusterID, + Namespace: namespace, + PodName: podName, + ContainerName: containerName, + ClientIP: c.ClientIP(), + UserAgent: c.Request.UserAgent(), + InitialCols: initialCols, + InitialRows: initialRows, + Secrets: secrets, + } + audit, err = terminalaudit.NewAuditSession(meta, func() { + _ = pty.Close() + }) if err != nil { - msg := fmt.Sprintf("Exec to pod error! err: %v", err) - log.Errorf(msg) - _, _ = pty.Write([]byte(msg)) - pty.Done() + log.Errorf("create podexec terminal audit recorder failed: %v", err) + } else { + log.Infof("created podexec terminal audit session, sessionID=%s project=%s env=%s pod=%s container=%s", audit.SessionID, productName, envName, podName, containerName) + } + pty.SetupAudit(audit) - ctx.RespErr = e.ErrInternalError.AddDesc(fmt.Sprintf("Exec to pod error! err: %v", err)) + log.Infof("start pod exec stream, sessionID=%s clusterID=%s namespace=%s pod=%s container=%s", pty.SessionID, clusterID, namespace, podName, containerName) + err = ExecPod(clusterID, []string{"/bin/sh"}, pty, namespace, podName, containerName) + log.Infof("finish pod exec stream, sessionID=%s err=%v", pty.SessionID, err) + if err == nil || isExpectedTerminalClose(err) { return } + finalStatus = commonmodels.TerminalSessionStatusFailed + msg := fmt.Sprintf("Exec to pod error! err: %v", err) + log.Errorf(msg) + _, _ = pty.Write([]byte(msg)) + + ctx.RespErr = e.ErrInternalError.AddDesc(fmt.Sprintf("Exec to pod error! err: %v", err)) + return } func DebugWorkflow(c *gin.Context) { @@ -118,11 +185,11 @@ func DebugWorkflow(c *gin.Context) { return } - ctx.RespErr = debugWorkflow(c, c.Param("workflowName"), c.Param("jobName"), taskID, logger) + ctx.RespErr = debugWorkflow(c, ctx, c.Param("workflowName"), c.Param("jobName"), taskID, logger) return } -func debugWorkflow(c *gin.Context, workflowName, jobName string, taskID int64, logger *zap.SugaredLogger) error { +func debugWorkflow(c *gin.Context, ctx *internalhandler.Context, workflowName, jobName string, taskID int64, logger *zap.SugaredLogger) error { workflowTask, err := commonrepo.NewworkflowTaskv4Coll().Find(workflowName, taskID) if err != nil { return e.ErrStopDebugShell.AddDesc(fmt.Sprintf("failed to find task: %s", err)) @@ -153,22 +220,27 @@ FOR: return e.ErrGetDebugShell.AddDesc("启动调试终端意外失败") } - pty, err := NewTerminalSession(c.Writer, c.Request, nil, &TerminalSessionOption{ - SecretEnvs: func() (secrets []string) { - for _, v := range jobTaskSpec.Properties.Envs { - if v.IsCredential { - secrets = append(secrets, v.Value) - } + credValues := func() (secrets []string) { + for _, v := range jobTaskSpec.Properties.Envs { + if v.IsCredential { + secrets = append(secrets, v.Value) } - return secrets - }(), - Type: Workflow, - }) + } + return secrets + }() + + pty, err := NewTerminalSession(c.Writer, c.Request, nil) if err != nil { log.Errorf("get pty failed: %v", err) return e.ErrGetDebugShell.AddDesc(fmt.Sprintf("get pty failed: %v", err)) } + initialCols, initialRows := readTerminalSizeFromQuery(c) + finalStatus := commonmodels.TerminalSessionStatusFinished + var audit *terminalaudit.AuditSession defer func() { + if err := audit.Close(finalStatus); err != nil { + log.Errorf("close workflow terminal audit recorder failed: %v", err) + } log.Info("close session.") _ = pty.Close() }() @@ -208,14 +280,157 @@ FOR: } script += "bash\n" - err = ExecPod(jobTaskSpec.Properties.ClusterID, []string{"/bin/sh", "-c", script}, pty, jobTaskSpec.Properties.Namespace, pod.Name, pod.Spec.Containers[0].Name) + meta := &terminalaudit.SessionMeta{ + SessionType: commonmodels.TerminalSessionTypeWorkflowDebug, + Protocol: "k8s-exec", + UserID: ctx.UserID, + Username: ctx.UserName, + Account: ctx.Account, + ProjectName: workflowTask.ProjectName, + WorkflowName: workflowName, + JobName: jobName, + TaskID: taskID, + TargetName: fmt.Sprintf("%s/%s", pod.Name, pod.Spec.Containers[0].Name), + RemoteAddr: pod.Status.PodIP, + ClusterID: jobTaskSpec.Properties.ClusterID, + Namespace: jobTaskSpec.Properties.Namespace, + PodName: pod.Name, + ContainerName: pod.Spec.Containers[0].Name, + ClientIP: c.ClientIP(), + UserAgent: c.Request.UserAgent(), + InitialCols: initialCols, + InitialRows: initialRows, + Secrets: credValues, + } + audit, err = terminalaudit.NewAuditSession(meta, func() { + _ = pty.Close() + }) if err != nil { - msg := fmt.Sprintf("Exec to pod error! err: %v", err) - log.Errorf(msg) - _, _ = pty.Write([]byte(msg)) - pty.Done() + log.Errorf("create workflow terminal audit recorder failed: %v", err) + } + pty.SetupAudit(audit) + + err = ExecPod(jobTaskSpec.Properties.ClusterID, []string{"/bin/sh", "-c", script}, pty, jobTaskSpec.Properties.Namespace, pod.Name, pod.Spec.Containers[0].Name) + if err == nil || isExpectedTerminalClose(err) { + return nil + } + finalStatus = commonmodels.TerminalSessionStatusFailed + msg := fmt.Sprintf("Exec to pod error! err: %v", err) + log.Errorf(msg) + _, _ = pty.Write([]byte(msg)) + + return e.ErrGetDebugShell.AddDesc(fmt.Sprintf("Exec to pod error! err: %v", err)) +} + +func readTerminalSizeFromQuery(c *gin.Context) (int, int) { + cols := 135 + rows := 40 + if value, err := strconv.Atoi(c.Query("cols")); err == nil && value > 0 { + cols = value + } + if value, err := strconv.Atoi(c.Query("rows")); err == nil && value > 0 { + rows = value + } + return cols, rows +} + +func isExpectedTerminalClose(err error) bool { + if errors.Is(err, io.EOF) { + return true + } + errText := strings.ToLower(err.Error()) + return strings.Contains(errText, "websocket: close") || + strings.Contains(errText, "close sent") || + strings.Contains(errText, "use of closed network connection") || + strings.Contains(errText, "next reader") || + strings.Contains(errText, "eof") +} + +func collectContainerSecretValues(ctx context.Context, kubeCli kubernetes.Interface, pod *corev1.Pod, namespace, containerName string) ([]string, error) { + if kubeCli == nil || pod == nil { + return nil, nil + } + envFrom, envs, found := findContainerSecretRefs(pod, containerName) + if !found { + return nil, nil + } + + secretValues := make([]string, 0) + var collectErr error + secretNames := make(map[string]bool) + for _, envFromSource := range envFrom { + if envFromSource.SecretRef != nil && envFromSource.SecretRef.Name != "" { + optional := optionalBool(envFromSource.SecretRef.Optional) + if existedOptional, ok := secretNames[envFromSource.SecretRef.Name]; !ok || existedOptional { + secretNames[envFromSource.SecretRef.Name] = optional + } + } + } + for secretName, optional := range secretNames { + secret, err := kubeCli.CoreV1().Secrets(namespace).Get(ctx, secretName, metav1.GetOptions{}) + if err != nil { + if optional && apierrors.IsNotFound(err) { + continue + } + if collectErr == nil { + collectErr = err + } + continue + } + for _, value := range secret.Data { + if len(value) > 0 { + secretValues = append(secretValues, string(value)) + } + } + } + + for _, envVar := range envs { + if envVar.ValueFrom == nil || envVar.ValueFrom.SecretKeyRef == nil { + continue + } + ref := envVar.ValueFrom.SecretKeyRef + if ref.Name == "" || ref.Key == "" { + continue + } + secret, err := kubeCli.CoreV1().Secrets(namespace).Get(ctx, ref.Name, metav1.GetOptions{}) + if err != nil { + if optionalBool(ref.Optional) && apierrors.IsNotFound(err) { + continue + } + if collectErr == nil { + collectErr = err + } + continue + } + value := secret.Data[ref.Key] + if len(value) > 0 { + secretValues = append(secretValues, string(value)) + } + } + return secretValues, collectErr +} + +func findContainerSecretRefs(pod *corev1.Pod, containerName string) ([]corev1.EnvFromSource, []corev1.EnvVar, bool) { + for i := range pod.Spec.Containers { + if pod.Spec.Containers[i].Name == containerName { + return pod.Spec.Containers[i].EnvFrom, pod.Spec.Containers[i].Env, true + } + } + for i := range pod.Spec.EphemeralContainers { + if pod.Spec.EphemeralContainers[i].Name == containerName { + return pod.Spec.EphemeralContainers[i].EnvFrom, pod.Spec.EphemeralContainers[i].Env, true + } + } + return nil, nil, false +} + +func optionalBool(value *bool) bool { + return value != nil && *value +} - return e.ErrGetDebugShell.AddDesc(fmt.Sprintf("Exec to pod error! err: %v", err)) +func resolvePodServiceName(pod *corev1.Pod) string { + if pod == nil || len(pod.Labels) == 0 { + return "" } - return nil + return pod.Labels[setting.ServiceLabel] } diff --git a/pkg/microservice/podexec/core/service/ws_terminal.go b/pkg/microservice/podexec/core/service/ws_terminal.go index b2e0120f3f..27f2915ec8 100644 --- a/pkg/microservice/podexec/core/service/ws_terminal.go +++ b/pkg/microservice/podexec/core/service/ws_terminal.go @@ -17,15 +17,18 @@ limitations under the License. package service import ( - "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" + "sync" "time" "github.com/gorilla/websocket" + terminalaudit "github.com/koderover/zadig/v2/pkg/shared/terminalaudit" + "github.com/koderover/zadig/v2/pkg/shared/terminalio" "github.com/koderover/zadig/v2/pkg/tool/clientmanager" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,10 +49,6 @@ var upgrader = websocket.Upgrader{ }, } -const ( - EndOfTransmission = "\u0004" -) - // TerminalMessage is the messaging protocol between ShellController and TerminalSession. type TerminalMessage struct { Operation string `json:"operation"` @@ -65,48 +64,42 @@ type PtyHandler interface { Done() chan struct{} } -type TerminalSessionType string - -const ( - // Environment is the debug terminal session type for environment - Environment TerminalSessionType = "env" - // Workflow is the debug terminal session type for workflow, which need musk secret envs - Workflow TerminalSessionType = "workflow" -) - // TerminalSession implements PtyHandler type TerminalSession struct { - wsConn *websocket.Conn - sizeChan chan remotecommand.TerminalSize - doneChan chan struct{} - // SecretEnvs is a list of environment variables that should be hidden from the client. - SecretEnvs []string - Type TerminalSessionType + wsConn *websocket.Conn + sizeChan chan remotecommand.TerminalSize + doneChan chan struct{} + closeOnce sync.Once + closeErr error + SessionID string + Recorder terminalio.Recorder + Sanitizer terminalio.Sanitizer } -type TerminalSessionOption struct { - SecretEnvs []string - Type TerminalSessionType -} - -func NewTerminalSession(w http.ResponseWriter, r *http.Request, responseHeader http.Header, opt ...*TerminalSessionOption) (*TerminalSession, error) { +func NewTerminalSession(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*TerminalSession, error) { conn, err := upgrader.Upgrade(w, r, responseHeader) if err != nil { return nil, err } session := &TerminalSession{ - wsConn: conn, - sizeChan: make(chan remotecommand.TerminalSize), - doneChan: make(chan struct{}), - Type: Environment, - } - if len(opt) > 0 { - session.SecretEnvs = opt[0].SecretEnvs - session.Type = opt[0].Type + wsConn: conn, + sizeChan: make(chan remotecommand.TerminalSize), + doneChan: make(chan struct{}), + Sanitizer: terminalaudit.NewSanitizer(nil, nil), } return session, nil } +func (t *TerminalSession) SetupAudit(audit *terminalaudit.AuditSession) { + if audit == nil { + return + } + t.SessionID = audit.SessionID + t.Sanitizer = audit.Sanitizer + t.Recorder = audit.Recorder + log.Infof("terminal session audit attached, sessionID=%s", t.SessionID) +} + // Done done func (t *TerminalSession) Done() chan struct{} { return t.doneChan @@ -126,43 +119,49 @@ func (t *TerminalSession) Next() *remotecommand.TerminalSize { func (t *TerminalSession) Read(p []byte) (int, error) { _, message, err := t.wsConn.ReadMessage() if err != nil { - log.Errorf("read message err: %v", err) - return copy(p, EndOfTransmission), err + log.Errorf("read message err: sessionID=%s err=%v", t.SessionID, err) + _ = t.Close() + return 0, io.EOF } var msg TerminalMessage if err := json.Unmarshal(message, &msg); err != nil { - log.Errorf("read parse message err: %v", err) - return copy(p, EndOfTransmission), err + log.Errorf("read parse message err: sessionID=%s err=%v", t.SessionID, err) + _ = t.Close() + return 0, err } switch msg.Operation { case "stdin": + if t.Recorder != nil { + t.Recorder.RecordInput(msg.Data) + } return copy(p, msg.Data), nil case "resize": + if t.Recorder != nil { + t.Recorder.RecordResize(msg.Cols, msg.Rows) + } t.sizeChan <- remotecommand.TerminalSize{Width: msg.Cols, Height: msg.Rows} return 0, nil default: - log.Errorf("unknown message type '%s'", msg.Operation) - return copy(p, EndOfTransmission), fmt.Errorf("unknown message type '%s'", msg.Operation) + log.Errorf("unknown message type '%s', sessionID=%s", msg.Operation, t.SessionID) + _ = t.Close() + return 0, fmt.Errorf("unknown message type '%s'", msg.Operation) } } // Write called from remotecommand whenever there is any output func (t *TerminalSession) Write(p []byte) (int, error) { + output := terminalio.ProcessOutput(string(p), t.Recorder, t.Sanitizer) msg, err := json.Marshal(TerminalMessage{ Operation: "stdout", - Data: string(p), + Data: output, }) if err != nil { log.Errorf("write parse message err: %v", err) return 0, err } - if t.Type == Workflow { - for _, secretEnv := range t.SecretEnvs { - msg = bytes.ReplaceAll(msg, []byte(secretEnv), []byte("********")) - } - } if err := t.wsConn.WriteMessage(websocket.TextMessage, msg); err != nil { - log.Errorf("write message err: %v", err) + log.Errorf("write message err: sessionID=%s err=%v", t.SessionID, err) + _ = t.Close() return 0, err } return len(p), nil @@ -170,7 +169,14 @@ func (t *TerminalSession) Write(p []byte) (int, error) { // Close close session func (t *TerminalSession) Close() error { - return t.wsConn.Close() + t.closeOnce.Do(func() { + log.Infof("terminal session close start, sessionID=%s", t.SessionID) + log.Infof("terminal session close doneChan, sessionID=%s", t.SessionID) + close(t.doneChan) + t.closeErr = t.wsConn.Close() + log.Infof("terminal session close finish, sessionID=%s err=%v", t.SessionID, t.closeErr) + }) + return t.closeErr } // 验证是否存在 @@ -228,13 +234,29 @@ func ExecPod(clusterID string, cmd []string, ptyHandler PtyHandler, namespace, p return err } - err = executor.Stream(remotecommand.StreamOptions{ + streamCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + select { + case <-ptyHandler.Done(): + log.Infof("pod exec stream context canceled by terminal close, namespace=%s pod=%s container=%s", namespace, podName, containerName) + cancel() + case <-streamCtx.Done(): + } + }() + + err = executor.StreamWithContext(streamCtx, remotecommand.StreamOptions{ Stdin: ptyHandler, Stdout: ptyHandler, Stderr: ptyHandler, TerminalSizeQueue: ptyHandler, Tty: true, }) + if errors.Is(err, context.Canceled) { + log.Infof("pod exec stream canceled by terminal close, namespace=%s pod=%s container=%s", namespace, podName, containerName) + return nil + } + log.Infof("pod exec stream completed, namespace=%s pod=%s container=%s err=%v", namespace, podName, containerName, err) if err != nil { log.Errorf("Stream err: %v", err) return err diff --git a/pkg/shared/terminalaudit/audit_session.go b/pkg/shared/terminalaudit/audit_session.go new file mode 100644 index 0000000000..ee4d67642c --- /dev/null +++ b/pkg/shared/terminalaudit/audit_session.go @@ -0,0 +1,39 @@ +package terminalaudit + +import ( + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + "github.com/koderover/zadig/v2/pkg/tool/log" +) + +type AuditSession struct { + Sanitizer Sanitizer + Recorder TerminalRecorder + SessionID string +} + +func NewAuditSession(meta *SessionMeta, terminate func()) (*AuditSession, error) { + audit := &AuditSession{ + Sanitizer: NewSanitizer(meta.Secrets, meta.SecretEnvs), + } + recorder, err := NewRecorder(meta) + if err != nil { + return audit, err + } + audit.Recorder = recorder + audit.SessionID = recorder.SessionID() + RegisterActiveSession(audit.SessionID, terminate) + log.Infof("register terminal audit session, sessionID=%s type=%s target=%s", audit.SessionID, meta.SessionType, meta.TargetName) + return audit, nil +} + +func (a *AuditSession) Close(finalStatus models.TerminalSessionStatus) error { + if a == nil || a.Recorder == nil || a.SessionID == "" { + return nil + } + resolvedStatus := ResolveSessionStatus(a.SessionID, finalStatus) + log.Infof("close terminal audit session start, sessionID=%s finalStatus=%s resolvedStatus=%s", a.SessionID, finalStatus, resolvedStatus) + err := a.Recorder.Close(resolvedStatus) + UnregisterActiveSession(a.SessionID) + log.Infof("close terminal audit session finish, sessionID=%s err=%v", a.SessionID, err) + return err +} diff --git a/pkg/shared/terminalaudit/command_extractor.go b/pkg/shared/terminalaudit/command_extractor.go new file mode 100644 index 0000000000..6bf8e49a61 --- /dev/null +++ b/pkg/shared/terminalaudit/command_extractor.go @@ -0,0 +1,275 @@ +package terminalaudit + +import ( + "bytes" + "strings" + "time" +) + +var ( + bracketedPasteStart = []byte{0x1b, '[', '2', '0', '0', '~'} + bracketedPasteEnd = []byte{0x1b, '[', '2', '0', '1', '~'} + interactiveEnterSeq = []string{"\x1b[?1049h", "\x1b[?1047h", "\x1b[?47h"} + interactiveExitSeq = []string{"\x1b[?1049l", "\x1b[?1047l", "\x1b[?47l"} + interactiveRejectHints = []string{"not found", "command not found", "No such file or directory"} +) + +type ExtractedCommand struct { + Seq int64 + Command string + TimeOffsetMS int64 +} + +type deferredInputChunk struct { + data string + offset time.Duration +} + +type CommandExtractor struct { + buffer []byte + seq int64 + inEscape bool + escapeBuffer []byte + inBracketedPaste bool + pasteEscapeBuffer []byte + pendingInteractive bool + interactiveMode bool + pendingInputs []deferredInputChunk + outputTail string +} + +func NewCommandExtractor() *CommandExtractor { + return &CommandExtractor{} +} + +func (e *CommandExtractor) Consume(data string, offset time.Duration) []ExtractedCommand { + if e.interactiveMode { + return nil + } + if e.pendingInteractive { + if data != "" { + e.pendingInputs = append(e.pendingInputs, deferredInputChunk{data: data, offset: offset}) + } + return nil + } + commands := make([]ExtractedCommand, 0) + for i := 0; i < len(data); i++ { + ch := data[i] + if e.inBracketedPaste { + commands = e.consumeBracketedPasteByte(ch, offset, commands) + continue + } + + if e.inEscape { + commands = e.consumeEscapeByte(ch, offset, commands) + continue + } + + commands = e.consumePlainByte(ch, offset, commands) + } + return commands +} + +func (e *CommandExtractor) ObserveOutput(data string) []ExtractedCommand { + if data == "" { + return nil + } + e.appendOutputTail(data) + if e.pendingInteractive && containsAny(e.outputTail, interactiveEnterSeq) { + e.pendingInteractive = false + e.pendingInputs = nil + e.interactiveMode = true + return nil + } + if e.pendingInteractive && (containsAny(e.outputTail, interactiveRejectHints) || looksLikeShellPrompt(e.outputTail)) { + pendingInputs := e.pendingInputs + e.pendingInteractive = false + e.pendingInputs = nil + e.outputTail = "" + return e.replayDeferredInputs(pendingInputs) + } + if e.interactiveMode && containsAny(e.outputTail, interactiveExitSeq) { + e.interactiveMode = false + } + return nil +} + +func (e *CommandExtractor) consumePlainByte(ch byte, offset time.Duration, commands []ExtractedCommand) []ExtractedCommand { + switch ch { + case 0x1b: + e.inEscape = true + e.escapeBuffer = append(e.escapeBuffer[:0], ch) + case '\r', '\n': + commands = e.flushCommand(offset, commands) + case 0x08, 0x7f: + if len(e.buffer) > 0 { + e.buffer = e.buffer[:len(e.buffer)-1] + } + default: + if ch >= 0x20 || ch == '\t' { + e.buffer = append(e.buffer, ch) + } + } + return commands +} + +func (e *CommandExtractor) consumeEscapeByte(ch byte, offset time.Duration, commands []ExtractedCommand) []ExtractedCommand { + e.escapeBuffer = append(e.escapeBuffer, ch) + if len(e.escapeBuffer) < 2 { + return commands + } + + second := e.escapeBuffer[1] + if second != '[' && second != ']' && second != 'O' && second != 'P' { + e.resetEscape() + return commands + } + if len(e.escapeBuffer) == 2 { + return commands + } + + if !isEscapeTerminator(ch) { + return commands + } + + if bytes.Equal(e.escapeBuffer, bracketedPasteStart) { + e.inBracketedPaste = true + e.pasteEscapeBuffer = e.pasteEscapeBuffer[:0] + } + e.resetEscape() + return commands +} + +func (e *CommandExtractor) consumeBracketedPasteByte(ch byte, offset time.Duration, commands []ExtractedCommand) []ExtractedCommand { + if len(e.pasteEscapeBuffer) > 0 { + return e.consumePasteEscapeByte(ch, offset, commands) + } + if ch == 0x1b { + e.pasteEscapeBuffer = append(e.pasteEscapeBuffer[:0], ch) + return commands + } + return e.consumePastedByte(ch, offset, commands) +} + +func (e *CommandExtractor) consumePasteEscapeByte(ch byte, offset time.Duration, commands []ExtractedCommand) []ExtractedCommand { + e.pasteEscapeBuffer = append(e.pasteEscapeBuffer, ch) + if bytes.Equal(e.pasteEscapeBuffer, bracketedPasteEnd) { + e.inBracketedPaste = false + e.pasteEscapeBuffer = e.pasteEscapeBuffer[:0] + return commands + } + if bytes.HasPrefix(bracketedPasteEnd, e.pasteEscapeBuffer) { + return commands + } + for _, pasteCh := range e.pasteEscapeBuffer { + commands = e.consumePastedByte(pasteCh, offset, commands) + } + e.pasteEscapeBuffer = e.pasteEscapeBuffer[:0] + return commands +} + +func (e *CommandExtractor) consumePastedByte(ch byte, offset time.Duration, commands []ExtractedCommand) []ExtractedCommand { + switch ch { + case 0x1b: + e.buffer = append(e.buffer, ch) + case '\r', '\n': + commands = e.flushCommand(offset, commands) + case 0x08, 0x7f: + if len(e.buffer) > 0 { + e.buffer = e.buffer[:len(e.buffer)-1] + } + default: + if ch >= 0x20 || ch == '\t' { + e.buffer = append(e.buffer, ch) + } + } + return commands +} + +func (e *CommandExtractor) flushCommand(offset time.Duration, commands []ExtractedCommand) []ExtractedCommand { + command := strings.TrimSpace(string(e.buffer)) + e.buffer = e.buffer[:0] + if command == "" { + return commands + } + e.pendingInteractive = isInteractiveCommand(command) + if e.pendingInteractive { + e.pendingInputs = nil + e.outputTail = "" + } + e.seq++ + return append(commands, ExtractedCommand{ + Seq: e.seq, + Command: command, + TimeOffsetMS: offset.Milliseconds(), + }) +} + +func (e *CommandExtractor) resetEscape() { + e.inEscape = false + e.escapeBuffer = e.escapeBuffer[:0] +} + +func isEscapeTerminator(ch byte) bool { + return ch >= 0x40 && ch <= 0x7e +} + +func containsAny(data string, targets []string) bool { + for _, target := range targets { + if strings.Contains(data, target) { + return true + } + } + return false +} + +func looksLikeShellPrompt(data string) bool { + line := data + if idx := strings.LastIndex(line, "\n"); idx >= 0 { + line = line[idx+1:] + } + line = strings.TrimSuffix(line, "\x1b[6n") + line = strings.TrimSpace(line) + if line == "" { + return false + } + return strings.HasSuffix(line, "$") || + strings.HasSuffix(line, "#") || + strings.HasSuffix(line, ">") || + strings.HasSuffix(line, "%") +} + +func (e *CommandExtractor) appendOutputTail(data string) { + const maxTailLen = 256 + e.outputTail += data + if len(e.outputTail) > maxTailLen { + e.outputTail = e.outputTail[len(e.outputTail)-maxTailLen:] + } +} + +func (e *CommandExtractor) replayDeferredInputs(chunks []deferredInputChunk) []ExtractedCommand { + commands := make([]ExtractedCommand, 0) + for _, chunk := range chunks { + commands = append(commands, e.Consume(chunk.data, chunk.offset)...) + } + return commands +} + +func isInteractiveCommand(command string) bool { + fields := strings.Fields(command) + if len(fields) == 0 { + return false + } + // 这里只覆盖已知会切换全屏/交互界面的常见命令,用于避免命令列表被编辑器或 TUI 内部输入污染。 + // 不在名单内的交互程序仍按输入流提取命令,后续如果需要再按真实场景补充。 + switch fields[0] { + case "vi", "vim", "nvim", "view", "vimdiff", + "nano", "pico", "emacs", + "less", "more", "most", "pg", "man", + "top", "htop", "btop", "atop", "iftop", "iotop", "glances", "nload", "nvtop", "watch", + "tig", "lazygit", "k9s", "ranger", "mc", "nnn": + return true + default: + return false + } +} diff --git a/pkg/shared/terminalaudit/lifecycle.go b/pkg/shared/terminalaudit/lifecycle.go new file mode 100644 index 0000000000..5b95ad6a19 --- /dev/null +++ b/pkg/shared/terminalaudit/lifecycle.go @@ -0,0 +1,26 @@ +package terminalaudit + +import ( + "context" + "sync" +) + +var ( + processContextMu sync.RWMutex + processContext = context.Background() +) + +func SetProcessContext(ctx context.Context) { + if ctx == nil { + ctx = context.Background() + } + processContextMu.Lock() + processContext = ctx + processContextMu.Unlock() +} + +func ProcessContext() context.Context { + processContextMu.RLock() + defer processContextMu.RUnlock() + return processContext +} diff --git a/pkg/shared/terminalaudit/recorder.go b/pkg/shared/terminalaudit/recorder.go new file mode 100644 index 0000000000..55e48f014b --- /dev/null +++ b/pkg/shared/terminalaudit/recorder.go @@ -0,0 +1,368 @@ +package terminalaudit + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "math" + "path" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + s3service "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/s3" + "github.com/koderover/zadig/v2/pkg/shared/terminalio" + "github.com/koderover/zadig/v2/pkg/tool/log" + s3tool "github.com/koderover/zadig/v2/pkg/tool/s3" + "github.com/koderover/zadig/v2/pkg/util" +) + +type TerminalRecorder interface { + terminalio.Recorder + SessionID() string + Close(status models.TerminalSessionStatus) error +} + +const internalStorageID = "__internal_default__" + +type asciicastRecorder struct { + mu sync.Mutex + errMu sync.Mutex + persistWG sync.WaitGroup + session *models.TerminalSession + startedAt time.Time + sanitizer Sanitizer + extractor *CommandExtractor + writer *bufio.Writer + encoder *json.Encoder + pipeWriter *io.PipeWriter + uploadDone chan error + storageID string + bucket string + objectKey string + fileSize atomic.Int64 + recordErr error + closeOnce sync.Once + sessionColl *commonrepo.TerminalSessionColl + commandColl *commonrepo.TerminalCommandColl +} + +type castHeader struct { + Version int `json:"version"` + Width int `json:"width"` + Height int `json:"height"` + Timestamp int64 `json:"timestamp"` + Env map[string]string `json:"env,omitempty"` + Title string `json:"title,omitempty"` +} + +func NewRecorder(meta *SessionMeta) (TerminalRecorder, error) { + if meta == nil { + return nil, fmt.Errorf("terminal session meta is nil") + } + startedAt := time.Now() + storage, err := s3service.FindDefaultS3() + if err != nil { + return nil, err + } + sessionID := util.UUID() + storageID := resolveStorageID(storage) + objectKey := storage.GetObjectPath(buildObjectKey(meta.SessionType, startedAt, sessionID)) + session := &models.TerminalSession{ + SessionID: sessionID, + SessionType: meta.SessionType, + Status: models.TerminalSessionStatusRunning, + UserID: meta.UserID, + Username: meta.Username, + Account: meta.Account, + ProjectName: meta.ProjectName, + EnvName: meta.EnvName, + ServiceName: meta.ServiceName, + WorkflowName: meta.WorkflowName, + JobName: meta.JobName, + TaskID: meta.TaskID, + TargetName: meta.TargetName, + Protocol: meta.Protocol, + RemoteAddr: meta.RemoteAddr, + LoginAccount: meta.LoginAccount, + HostID: meta.HostID, + HostName: meta.HostName, + HostIP: meta.HostIP, + ClusterID: meta.ClusterID, + Namespace: meta.Namespace, + PodName: meta.PodName, + ContainerName: meta.ContainerName, + ClientIP: meta.ClientIP, + UserAgent: meta.UserAgent, + StartedAt: startedAt.Unix(), + LastActivityAt: startedAt.Unix(), + CreatedAt: startedAt.Unix(), + UpdatedAt: startedAt.Unix(), + CommandCount: 0, + DurationSeconds: 0, + StorageID: storageID, + Bucket: storage.Bucket, + ObjectKey: objectKey, + } + sessionColl := commonrepo.NewTerminalSessionColl() + if err := sessionColl.Create(session); err != nil { + return nil, err + } + client, err := s3tool.NewClient(storage.Endpoint, storage.Ak, storage.Sk, storage.Region, storage.Insecure, storage.Provider) + if err != nil { + _ = sessionColl.CloseSession(&commonrepo.CloseSessionArgs{ + SessionID: session.SessionID, + Status: models.TerminalSessionStatusFailed, + EndedAt: time.Now().Unix(), + DurationSeconds: 0, + StorageID: storageID, + Bucket: storage.Bucket, + ObjectKey: session.ObjectKey, + FileSize: 0, + ErrorMessage: err.Error(), + }) + return nil, err + } + pipeReader, pipeWriter := io.Pipe() + uploadDone := make(chan error, 1) + + recorder := &asciicastRecorder{ + session: session, + startedAt: startedAt, + sanitizer: NewSanitizer(meta.Secrets, meta.SecretEnvs), + extractor: NewCommandExtractor(), + pipeWriter: pipeWriter, + uploadDone: uploadDone, + storageID: storageID, + bucket: storage.Bucket, + objectKey: session.ObjectKey, + sessionColl: sessionColl, + commandColl: commonrepo.NewTerminalCommandColl(), + } + recorder.writer = bufio.NewWriter(&countingWriter{ + writer: pipeWriter, + size: &recorder.fileSize, + }) + recorder.encoder = json.NewEncoder(recorder.writer) + go func() { + uploadDone <- client.UploadReader(storage.Bucket, pipeReader, session.ObjectKey, "application/octet-stream") + close(uploadDone) + }() + if err := recorder.writeHeader(normalizeDimension(meta.InitialCols, defaultCols), normalizeDimension(meta.InitialRows, defaultRows)); err != nil { + _ = recorder.Close(models.TerminalSessionStatusFailed) + return nil, err + } + log.Infof("create terminal audit recorder success, sessionID=%s storageID=%s bucket=%s objectKey=%s", session.SessionID, storageID, storage.Bucket, session.ObjectKey) + return recorder, nil +} + +func (r *asciicastRecorder) SessionID() string { + return r.session.SessionID +} + +func (r *asciicastRecorder) RecordInput(data string) { + sanitized := r.sanitizer.Mask(data) + r.mu.Lock() + if sanitized != "" { + r.writeEvent("i", sanitized) + } + commands := r.extractor.Consume(sanitized, time.Since(r.startedAt)) + r.mu.Unlock() + r.persistCommands(commands) +} + +func (r *asciicastRecorder) RecordOutput(data string) { + r.mu.Lock() + if data == "" { + r.mu.Unlock() + return + } + commands := r.extractor.ObserveOutput(data) + r.writeEvent("o", data) + r.mu.Unlock() + r.persistCommands(commands) +} + +func (r *asciicastRecorder) RecordResize(cols, rows uint16) { + if cols == 0 || rows == 0 { + return + } + r.mu.Lock() + defer r.mu.Unlock() + r.writeEvent("r", fmt.Sprintf("%dx%d", cols, rows)) +} + +func (r *asciicastRecorder) persistCommands(commands []ExtractedCommand) { + if len(commands) == 0 { + return + } + now := time.Now().Unix() + commandModels := make([]*models.TerminalCommand, 0, len(commands)) + for _, command := range commands { + commandModels = append(commandModels, &models.TerminalCommand{ + SessionID: r.session.SessionID, + Seq: command.Seq, + Command: command.Command, + RiskLevel: CommandRiskLevelAccepted, + UserID: r.session.UserID, + Username: r.session.Username, + Account: r.session.Account, + ProjectName: r.session.ProjectName, + EnvName: r.session.EnvName, + TargetName: r.session.TargetName, + Protocol: r.session.Protocol, + RemoteAddr: r.session.RemoteAddr, + LoginAccount: r.session.LoginAccount, + TimeOffsetMS: command.TimeOffsetMS, + CreatedAt: now, + }) + } + r.persistWG.Add(1) + go func(commands []*models.TerminalCommand, commandCount int64, activityAt int64) { + defer r.persistWG.Done() + if err := r.commandColl.CreateMany(commands); err != nil { + r.setRecordErr(err) + } + if err := r.sessionColl.UpdateActivity(r.session.SessionID, commandCount, activityAt); err != nil { + r.setRecordErr(err) + } + }(commandModels, int64(len(commands)), now) +} + +func (r *asciicastRecorder) Close(status models.TerminalSessionStatus) error { + var closeErr error + r.closeOnce.Do(func() { + log.Infof("terminal audit recorder close start, sessionID=%s status=%s", r.session.SessionID, status) + r.mu.Lock() + if r.writer != nil { + if err := r.writer.Flush(); err != nil { + r.setRecordErr(err) + } + } + if r.pipeWriter != nil { + if err := r.pipeWriter.Close(); err != nil { + r.setRecordErr(err) + } + } + r.mu.Unlock() + log.Infof("terminal audit recorder close flushed stream, sessionID=%s", r.session.SessionID) + r.persistWG.Wait() + log.Infof("terminal audit recorder close persist done, sessionID=%s", r.session.SessionID) + + endedAt := time.Now().Unix() + durationSeconds := int64(time.Since(r.startedAt).Seconds()) + recordErr := r.getRecordErr() + errorMessages := make([]string, 0) + if recordErr != nil { + errorMessages = append(errorMessages, recordErr.Error()) + } + if r.uploadDone != nil { + log.Infof("terminal audit recorder close wait upload, sessionID=%s", r.session.SessionID) + if err := <-r.uploadDone; err != nil { + errorMessages = append(errorMessages, err.Error()) + } + log.Infof("terminal audit recorder close upload done, sessionID=%s fileSize=%d errors=%v", r.session.SessionID, r.fileSize.Load(), errorMessages) + } + + finalStatus := status + if len(errorMessages) > 0 && finalStatus == models.TerminalSessionStatusFinished { + finalStatus = models.TerminalSessionStatusFailed + } + log.Infof("terminal audit recorder close update session, sessionID=%s finalStatus=%s endedAt=%d duration=%d fileSize=%d", r.session.SessionID, finalStatus, endedAt, durationSeconds, r.fileSize.Load()) + closeErr = r.sessionColl.CloseSession(&commonrepo.CloseSessionArgs{ + SessionID: r.session.SessionID, + Status: finalStatus, + EndedAt: endedAt, + DurationSeconds: durationSeconds, + StorageID: r.storageID, + Bucket: r.bucket, + ObjectKey: r.objectKey, + FileSize: r.fileSize.Load(), + ErrorMessage: strings.Join(errorMessages, "; "), + }) + log.Infof("terminal audit recorder close finish, sessionID=%s err=%v", r.session.SessionID, closeErr) + }) + return closeErr +} + +func (r *asciicastRecorder) writeHeader(cols, rows int) error { + header := castHeader{ + Version: 2, + Width: cols, + Height: rows, + Timestamp: r.startedAt.Unix(), + Env: map[string]string{ + "TERM": "xterm-256color", + }, + Title: r.session.TargetName, + } + return r.encoder.Encode(header) +} + +func (r *asciicastRecorder) writeEvent(code, data string) { + offset := math.Round(time.Since(r.startedAt).Seconds()*1000) / 1000 + if err := r.encoder.Encode([]interface{}{offset, code, data}); err != nil { + r.setRecordErr(err) + } +} + +func (r *asciicastRecorder) setRecordErr(err error) { + if err == nil { + return + } + r.errMu.Lock() + defer r.errMu.Unlock() + if r.recordErr == nil { + r.recordErr = err + return + } + r.recordErr = fmt.Errorf("%v; %w", r.recordErr, err) +} + +func (r *asciicastRecorder) getRecordErr() error { + r.errMu.Lock() + defer r.errMu.Unlock() + return r.recordErr +} + +func normalizeDimension(value, fallback int) int { + if value > 0 { + return value + } + return fallback +} + +type countingWriter struct { + writer io.Writer + size *atomic.Int64 +} + +func (w *countingWriter) Write(p []byte) (int, error) { + n, err := w.writer.Write(p) + if n > 0 { + w.size.Add(int64(n)) + } + return n, err +} + +func buildObjectKey(sessionType models.TerminalSessionType, startedAt time.Time, sessionID string) string { + return path.Join( + "terminal-cast", + string(sessionType), + startedAt.Format("2006"), + startedAt.Format("01"), + startedAt.Format("02"), + sessionID+".cast", + ) +} + +func resolveStorageID(storage *s3service.S3) string { + if storage == nil || storage.ID.IsZero() { + return internalStorageID + } + return storage.ID.Hex() +} diff --git a/pkg/shared/terminalaudit/registry.go b/pkg/shared/terminalaudit/registry.go new file mode 100644 index 0000000000..9fb9dc0628 --- /dev/null +++ b/pkg/shared/terminalaudit/registry.go @@ -0,0 +1,95 @@ +package terminalaudit + +import ( + "fmt" + "sync" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" +) + +type activeSession struct { + mu sync.Mutex + finalStatus models.TerminalSessionStatus + terminate func() + terminateOnce sync.Once + done chan struct{} + doneOnce sync.Once +} + +type activeSessionRegistry struct { + sessions sync.Map +} + +var registry = &activeSessionRegistry{} + +func RegisterActiveSession(sessionID string, terminate func()) { + session := &activeSession{ + terminate: terminate, + done: make(chan struct{}), + } + registry.sessions.Store(sessionID, session) + + go func() { + select { + case <-ProcessContext().Done(): + session.terminateWithStatus(models.TerminalSessionStatusAborted) + case <-session.done: + } + }() +} + +func UnregisterActiveSession(sessionID string) { + if session, ok := registry.load(sessionID); ok { + session.signalDone() + } + registry.sessions.Delete(sessionID) +} + +func ResolveSessionStatus(sessionID string, defaultStatus models.TerminalSessionStatus) models.TerminalSessionStatus { + session, ok := registry.load(sessionID) + if !ok { + return defaultStatus + } + session.mu.Lock() + defer session.mu.Unlock() + if session.finalStatus != "" { + return session.finalStatus + } + return defaultStatus +} + +func TerminateActiveSession(sessionID string) error { + session, ok := registry.load(sessionID) + if !ok { + return fmt.Errorf("terminal session %s is not active", sessionID) + } + session.terminateWithStatus(models.TerminalSessionStatusAborted) + return nil +} + +func (s *activeSession) terminateWithStatus(status models.TerminalSessionStatus) { + s.mu.Lock() + s.finalStatus = status + terminate := s.terminate + s.mu.Unlock() + s.terminateOnce.Do(func() { + if terminate != nil { + terminate() + } + }) +} + +func (s *activeSession) signalDone() { + s.doneOnce.Do(func() { + close(s.done) + }) +} + +func (r *activeSessionRegistry) load(sessionID string) (*activeSession, bool) { + value, ok := r.sessions.Load(sessionID) + if !ok { + return nil, false + } + session, ok := value.(*activeSession) + return session, ok +} diff --git a/pkg/shared/terminalaudit/sanitizer.go b/pkg/shared/terminalaudit/sanitizer.go new file mode 100644 index 0000000000..e6ed91ef37 --- /dev/null +++ b/pkg/shared/terminalaudit/sanitizer.go @@ -0,0 +1,37 @@ +package terminalaudit + +import ( + "github.com/koderover/zadig/v2/pkg/shared/terminalio" + "github.com/koderover/zadig/v2/pkg/util" +) + +type Sanitizer = terminalio.Sanitizer + +type noopSanitizer struct{} + +func (n noopSanitizer) Mask(data string) string { + return data +} + +type secretSanitizer struct { + secrets []string + secretEnvs []string +} + +func NewSanitizer(secrets, secretEnvs []string) Sanitizer { + if len(secrets) == 0 && len(secretEnvs) == 0 { + return noopSanitizer{} + } + return &secretSanitizer{secrets: secrets, secretEnvs: secretEnvs} +} + +func (s *secretSanitizer) Mask(data string) string { + masked := data + if len(s.secretEnvs) > 0 { + masked = util.MaskSecretEnvs(masked, s.secretEnvs) + } + if len(s.secrets) > 0 { + masked = util.MaskSecret(s.secrets, masked) + } + return masked +} diff --git a/pkg/shared/terminalaudit/service.go b/pkg/shared/terminalaudit/service.go new file mode 100644 index 0000000000..23cc8700f9 --- /dev/null +++ b/pkg/shared/terminalaudit/service.go @@ -0,0 +1,103 @@ +package terminalaudit + +import ( + "fmt" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + s3service "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/s3" + e "github.com/koderover/zadig/v2/pkg/tool/errors" + s3tool "github.com/koderover/zadig/v2/pkg/tool/s3" +) + +func ListSessions(args *models.TerminalSessionListArgs) (*SessionListResponse, error) { + if args == nil { + args = &models.TerminalSessionListArgs{} + } + normalizePagination(&args.PageNum, &args.PageSize) + sessions, total, err := commonrepo.NewTerminalSessionColl().List(args) + if err != nil { + return nil, err + } + return &SessionListResponse{Total: total, Sessions: sessions}, nil +} + +func GetSession(sessionID string) (*models.TerminalSession, error) { + return commonrepo.NewTerminalSessionColl().FindBySessionID(sessionID) +} + +func ListCommands(args *models.TerminalCommandListArgs) (*CommandListResponse, error) { + if args == nil { + args = &models.TerminalCommandListArgs{} + } + normalizePagination(&args.PageNum, &args.PageSize) + commands, total, err := commonrepo.NewTerminalCommandColl().List(args) + if err != nil { + return nil, err + } + return &CommandListResponse{Total: total, Commands: commands}, nil +} + +func GetCastStream(sessionID string) (*CastFileStream, error) { + session, err := GetSession(sessionID) + if err != nil { + return nil, err + } + if session.ObjectKey == "" { + return nil, e.ErrNotFound.AddDesc("cast file is not available") + } + + store, err := getSessionStorage(session) + if err != nil { + return nil, err + } + client, err := s3tool.NewClient(store.Endpoint, store.Ak, store.Sk, store.Region, store.Insecure, store.Provider) + if err != nil { + return nil, err + } + bucket := session.Bucket + if bucket == "" { + bucket = store.Bucket + } + object, err := client.GetFile(bucket, session.ObjectKey, &s3tool.DownloadOption{IgnoreNotExistError: false, RetryNum: 2}) + if err != nil { + return nil, err + } + if object == nil { + return nil, e.ErrNotFound.AddDesc("cast file not found") + } + return &CastFileStream{Body: object.Body, FileSize: session.FileSize}, nil +} + +func TerminateSession(sessionID string) error { + session, err := GetSession(sessionID) + if err != nil { + return err + } + if session.Status != models.TerminalSessionStatusRunning { + return fmt.Errorf("terminal session %s is not running", sessionID) + } + return TerminateActiveSession(sessionID) +} + +func normalizePagination(pageNum, pageSize *int64) { + if pageNum == nil || pageSize == nil { + return + } + if *pageNum <= 0 { + *pageNum = 1 + } + if *pageSize <= 0 { + *pageSize = 20 + } +} + +func getSessionStorage(session *models.TerminalSession) (*s3service.S3, error) { + if session.StorageID == internalStorageID { + return s3service.FindInternalS3(), nil + } + if session.StorageID != "" { + return s3service.FindS3ById(session.StorageID) + } + return s3service.FindDefaultS3() +} diff --git a/pkg/shared/terminalaudit/types.go b/pkg/shared/terminalaudit/types.go new file mode 100644 index 0000000000..95786d1a89 --- /dev/null +++ b/pkg/shared/terminalaudit/types.go @@ -0,0 +1,60 @@ +package terminalaudit + +import ( + "io" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" +) + +const ( + CommandRiskLevelAccepted = "accepted" + defaultCols = 135 + defaultRows = 40 +) + +type SessionMeta struct { + SessionType models.TerminalSessionType + Protocol string + UserID string + Username string + Account string + ProjectName string + EnvName string + ServiceName string + WorkflowName string + JobName string + TaskID int64 + TargetName string + RemoteAddr string + LoginAccount string + HostID string + HostName string + HostIP string + ClusterID string + Namespace string + PodName string + ContainerName string + ClientIP string + UserAgent string + InitialCols int + InitialRows int + // Secrets stores raw secret values and is masked via util.MaskSecret. + Secrets []string + // SecretEnvs stores KEY=VALUE pairs and is masked via util.MaskSecretEnvs. + SecretEnvs []string +} + +type SessionListResponse struct { + Total int64 `json:"total"` + Sessions []*models.TerminalSession `json:"sessions"` +} + +type CommandListResponse struct { + Total int64 `json:"total"` + Commands []*models.TerminalCommand `json:"commands"` +} + +type CastFileStream struct { + Body io.ReadCloser + FileSize int64 +} diff --git a/pkg/shared/terminalio/terminalio.go b/pkg/shared/terminalio/terminalio.go new file mode 100644 index 0000000000..063c0830de --- /dev/null +++ b/pkg/shared/terminalio/terminalio.go @@ -0,0 +1,38 @@ +/* +Copyright 2026 The KodeRover Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package terminalio + +type Recorder interface { + RecordInput(data string) + RecordOutput(data string) + RecordResize(cols, rows uint16) +} + +type Sanitizer interface { + Mask(data string) string +} + +func ProcessOutput(raw string, recorder Recorder, sanitizer Sanitizer) string { + sanitized := raw + if sanitizer != nil { + sanitized = sanitizer.Mask(raw) + } + if recorder != nil { + recorder.RecordOutput(sanitized) + } + return sanitized +} diff --git a/pkg/tool/s3/client.go b/pkg/tool/s3/client.go index 4c69dd97ee..3e8e11ed30 100644 --- a/pkg/tool/s3/client.go +++ b/pkg/tool/s3/client.go @@ -18,6 +18,7 @@ package s3 import ( "fmt" + "io" "io/fs" "mime" "os" @@ -30,6 +31,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/koderover/zadig/v2/pkg/setting" "github.com/koderover/zadig/v2/pkg/tool/log" @@ -253,6 +255,20 @@ func (c *Client) Upload(bucketName, src string, objectKey string) error { return err } +func (c *Client) UploadReader(bucketName string, body io.Reader, objectKey string, contentType string) error { + uploader := s3manager.NewUploaderWithClient(c.S3) + input := &s3manager.UploadInput{ + Body: body, + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + } + if contentType != "" { + input.ContentType = aws.String(contentType) + } + _, err := uploader.Upload(input) + return err +} + // Upload upload all files in a directory to a S3 path recursively func (c *Client) UploadDir(bucketName, srcdir string, s3dir string) error { err := fs.WalkDir(os.DirFS(srcdir), ".", func(p string, d fs.DirEntry, e error) error { diff --git a/pkg/tool/wsconn/wsconn.go b/pkg/tool/wsconn/wsconn.go index 648c1d1bfc..f594c19707 100644 --- a/pkg/tool/wsconn/wsconn.go +++ b/pkg/tool/wsconn/wsconn.go @@ -24,6 +24,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/koderover/zadig/v2/pkg/shared/terminalio" "golang.org/x/crypto/ssh" "github.com/koderover/zadig/v2/pkg/tool/log" @@ -45,14 +46,31 @@ type wsMessage struct { } type wsBufferWriter struct { - buffer bytes.Buffer - mu sync.Mutex + buffer bytes.Buffer + mu sync.Mutex + recorder terminalio.Recorder + sanitizer terminalio.Sanitizer } func (w *wsBufferWriter) Write(p []byte) (int, error) { w.mu.Lock() defer w.mu.Unlock() - return w.buffer.Write(p) + output := terminalio.ProcessOutput(string(p), w.recorder, w.sanitizer) + return w.buffer.Write([]byte(output)) +} + +func (w *wsBufferWriter) RecordInput(data string) { + if w == nil || w.recorder == nil { + return + } + w.recorder.RecordInput(data) +} + +func (w *wsBufferWriter) RecordResize(cols, rows uint16) { + if w == nil || w.recorder == nil { + return + } + w.recorder.RecordResize(cols, rows) } type SshConn struct { @@ -61,7 +79,12 @@ type SshConn struct { SshSession *ssh.Session } -func NewSshConn(cols, rows int, sshClient *ssh.Client) (*SshConn, error) { +type SshConnOption struct { + Recorder terminalio.Recorder + Sanitizer terminalio.Sanitizer +} + +func NewSshConn(cols, rows int, sshClient *ssh.Client, opt ...*SshConnOption) (*SshConn, error) { sshSession, err := sshClient.NewSession() if err != nil { return nil, err @@ -73,6 +96,10 @@ func NewSshConn(cols, rows int, sshClient *ssh.Client) (*SshConn, error) { } wsWriter := new(wsBufferWriter) + if len(opt) > 0 { + wsWriter.recorder = opt[0].Recorder + wsWriter.sanitizer = opt[0].Sanitizer + } sshSession.Stdout = wsWriter sshSession.Stderr = wsWriter @@ -111,12 +138,14 @@ func (ssConn *SshConn) ReadWsMessage(wsConn *websocket.Conn, stopCh chan bool) { switch wsMsgObj.Operation { case wsMsgResize: + ssConn.WsWriter.RecordResize(uint16(wsMsgObj.Cols), uint16(wsMsgObj.Rows)) if wsMsgObj.Cols > 0 && wsMsgObj.Rows > 0 { if err := ssConn.SshSession.WindowChange(wsMsgObj.Rows, wsMsgObj.Cols); err != nil { log.Error("resize windows err:", err) } } case wsMsgStdin: + ssConn.WsWriter.RecordInput(wsMsgObj.Data) decodeBytes := []byte(wsMsgObj.Data) if _, err := ssConn.Stdin.Write(decodeBytes); err != nil { log.Error("ws stdin write to ssh.stdin err:", err)