diff --git a/cmd/bb_runner/main.go b/cmd/bb_runner/main.go index 33457491..de366add 100644 --- a/cmd/bb_runner/main.go +++ b/cmd/bb_runner/main.go @@ -70,6 +70,12 @@ func main() { commandCreator, configuration.SetTmpdirEnvironmentVariable, ) + if configuration.SampleCgroupResourceUsage { + r, err = runner.NewCgroupResourceUsageSamplingRunner(r) + if err != nil { + return util.StatusWrap(err, "Failed to create cgroup resource usage sampling runner") + } + } // Let bb_runner replace temporary directories with symbolic // links pointing to the temporary directory set up by diff --git a/pkg/proto/configuration/bb_runner/bb_runner.pb.go b/pkg/proto/configuration/bb_runner/bb_runner.pb.go index 3d180dae..8021269a 100644 --- a/pkg/proto/configuration/bb_runner/bb_runner.pb.go +++ b/pkg/proto/configuration/bb_runner/bb_runner.pb.go @@ -39,6 +39,7 @@ type ApplicationConfiguration struct { SymlinkTemporaryDirectories []string `protobuf:"bytes,12,rep,name=symlink_temporary_directories,json=symlinkTemporaryDirectories,proto3" json:"symlink_temporary_directories,omitempty"` RunCommandCleaner []string `protobuf:"bytes,13,rep,name=run_command_cleaner,json=runCommandCleaner,proto3" json:"run_command_cleaner,omitempty"` AppleXcodeDeveloperDirectories map[string]string `protobuf:"bytes,14,rep,name=apple_xcode_developer_directories,json=appleXcodeDeveloperDirectories,proto3" json:"apple_xcode_developer_directories,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + SampleCgroupResourceUsage bool `protobuf:"varint,15,opt,name=sample_cgroup_resource_usage,json=sampleCgroupResourceUsage,proto3" json:"sample_cgroup_resource_usage,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -164,11 +165,18 @@ func (x *ApplicationConfiguration) GetAppleXcodeDeveloperDirectories() map[strin return nil } +func (x *ApplicationConfiguration) GetSampleCgroupResourceUsage() bool { + if x != nil { + return x.SampleCgroupResourceUsage + } 
+ return false +} + var File_github_com_buildbarn_bb_remote_execution_pkg_proto_configuration_bb_runner_bb_runner_proto protoreflect.FileDescriptor const file_github_com_buildbarn_bb_remote_execution_pkg_proto_configuration_bb_runner_bb_runner_proto_rawDesc = "" + "\n" + - "Zgithub.com/buildbarn/bb-remote-execution/pkg/proto/configuration/bb_runner/bb_runner.proto\x12!buildbarn.configuration.bb_runner\x1a^github.com/buildbarn/bb-remote-execution/pkg/proto/configuration/credentials/credentials.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/global/global.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/grpc/grpc.proto\"\xf3\b\n" + + "Zgithub.com/buildbarn/bb-remote-execution/pkg/proto/configuration/bb_runner/bb_runner.proto\x12!buildbarn.configuration.bb_runner\x1a^github.com/buildbarn/bb-remote-execution/pkg/proto/configuration/credentials/credentials.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/global/global.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/grpc/grpc.proto\"\xb4\t\n" + "\x18ApplicationConfiguration\x120\n" + "\x14build_directory_path\x18\x01 \x01(\tR\x12buildDirectoryPath\x12T\n" + "\fgrpc_servers\x18\x02 \x03(\v21.buildbarn.configuration.grpc.ServerConfigurationR\vgrpcServers\x12>\n" + @@ -183,7 +191,8 @@ const file_github_com_buildbarn_bb_remote_execution_pkg_proto_configuration_bb_r "\x0frun_commands_as\x18\v \x01(\v2A.buildbarn.configuration.credentials.UNIXCredentialsConfigurationR\rrunCommandsAs\x12B\n" + "\x1dsymlink_temporary_directories\x18\f \x03(\tR\x1bsymlinkTemporaryDirectories\x12.\n" + "\x13run_command_cleaner\x18\r \x03(\tR\x11runCommandCleaner\x12\xaa\x01\n" + - "!apple_xcode_developer_directories\x18\x0e \x03(\v2_.buildbarn.configuration.bb_runner.ApplicationConfiguration.AppleXcodeDeveloperDirectoriesEntryR\x1eappleXcodeDeveloperDirectories\x1aQ\n" + + "!apple_xcode_developer_directories\x18\x0e 
\x03(\v2_.buildbarn.configuration.bb_runner.ApplicationConfiguration.AppleXcodeDeveloperDirectoriesEntryR\x1eappleXcodeDeveloperDirectories\x12?\n" + + "\x1csample_cgroup_resource_usage\x18\x0f \x01(\bR\x19sampleCgroupResourceUsage\x1aQ\n" + "#AppleXcodeDeveloperDirectoriesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01J\x04\b\t\x10\n" + diff --git a/pkg/proto/configuration/bb_runner/bb_runner.proto b/pkg/proto/configuration/bb_runner/bb_runner.proto index d378faa8..989b913c 100644 --- a/pkg/proto/configuration/bb_runner/bb_runner.proto +++ b/pkg/proto/configuration/bb_runner/bb_runner.proto @@ -135,4 +135,16 @@ message ApplicationConfiguration { // https://github.com/bazelbuild/bazel/blob/master/src/main/java/com/google/devtools/build/lib/exec/local/XcodeLocalEnvProvider.java // https://www.smileykeith.com/2021/03/08/locking-xcode-in-bazel/ map apple_xcode_developer_directories = 14; + + // Sample cgroup v2 resource usage counters around each action. + // + // This should only be enabled when bb_worker sends at most one action to + // this bb_runner at a time. In particular, setting + // RunnerConfiguration.concurrency > 1 for this runner is incompatible. + // + // The bb_runner process should also have a cgroup whose activity is + // acceptable to include in the reported usage. If multiple runners or + // actions share the cgroup, the sampled counter deltas include unrelated + // work and are misleading. 
+ bool sample_cgroup_resource_usage = 15; } diff --git a/pkg/proto/resourceusage/resourceusage.pb.go b/pkg/proto/resourceusage/resourceusage.pb.go index 69a56188..8de68771 100644 --- a/pkg/proto/resourceusage/resourceusage.pb.go +++ b/pkg/proto/resourceusage/resourceusage.pb.go @@ -374,6 +374,146 @@ func (x *InputRootResourceUsage) GetFilesRead() uint64 { return 0 } +type CgroupResourceUsage struct { + state protoimpl.MessageState `protogen:"open.v1"` + MemoryEventsLow int64 `protobuf:"varint,1,opt,name=memory_events_low,json=memoryEventsLow,proto3" json:"memory_events_low,omitempty"` + MemoryEventsHigh int64 `protobuf:"varint,2,opt,name=memory_events_high,json=memoryEventsHigh,proto3" json:"memory_events_high,omitempty"` + MemoryEventsMax int64 `protobuf:"varint,3,opt,name=memory_events_max,json=memoryEventsMax,proto3" json:"memory_events_max,omitempty"` + MemoryEventsOom int64 `protobuf:"varint,4,opt,name=memory_events_oom,json=memoryEventsOom,proto3" json:"memory_events_oom,omitempty"` + MemoryEventsOomKill int64 `protobuf:"varint,5,opt,name=memory_events_oom_kill,json=memoryEventsOomKill,proto3" json:"memory_events_oom_kill,omitempty"` + MemoryEventsOomGroupKill int64 `protobuf:"varint,6,opt,name=memory_events_oom_group_kill,json=memoryEventsOomGroupKill,proto3" json:"memory_events_oom_group_kill,omitempty"` + MemoryPeak int64 `protobuf:"varint,7,opt,name=memory_peak,json=memoryPeak,proto3" json:"memory_peak,omitempty"` + MemoryPressureSomeTotal *durationpb.Duration `protobuf:"bytes,8,opt,name=memory_pressure_some_total,json=memoryPressureSomeTotal,proto3" json:"memory_pressure_some_total,omitempty"` + MemoryPressureFullTotal *durationpb.Duration `protobuf:"bytes,9,opt,name=memory_pressure_full_total,json=memoryPressureFullTotal,proto3" json:"memory_pressure_full_total,omitempty"` + CpuPressureSomeTotal *durationpb.Duration `protobuf:"bytes,10,opt,name=cpu_pressure_some_total,json=cpuPressureSomeTotal,proto3" json:"cpu_pressure_some_total,omitempty"` + 
CpuPressureFullTotal *durationpb.Duration `protobuf:"bytes,11,opt,name=cpu_pressure_full_total,json=cpuPressureFullTotal,proto3" json:"cpu_pressure_full_total,omitempty"` + IoPressureSomeTotal *durationpb.Duration `protobuf:"bytes,12,opt,name=io_pressure_some_total,json=ioPressureSomeTotal,proto3" json:"io_pressure_some_total,omitempty"` + IoPressureFullTotal *durationpb.Duration `protobuf:"bytes,13,opt,name=io_pressure_full_total,json=ioPressureFullTotal,proto3" json:"io_pressure_full_total,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CgroupResourceUsage) Reset() { + *x = CgroupResourceUsage{} + mi := &file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CgroupResourceUsage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CgroupResourceUsage) ProtoMessage() {} + +func (x *CgroupResourceUsage) ProtoReflect() protoreflect.Message { + mi := &file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CgroupResourceUsage.ProtoReflect.Descriptor instead. 
+func (*CgroupResourceUsage) Descriptor() ([]byte, []int) { + return file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_rawDescGZIP(), []int{4} +} + +func (x *CgroupResourceUsage) GetMemoryEventsLow() int64 { + if x != nil { + return x.MemoryEventsLow + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryEventsHigh() int64 { + if x != nil { + return x.MemoryEventsHigh + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryEventsMax() int64 { + if x != nil { + return x.MemoryEventsMax + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryEventsOom() int64 { + if x != nil { + return x.MemoryEventsOom + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryEventsOomKill() int64 { + if x != nil { + return x.MemoryEventsOomKill + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryEventsOomGroupKill() int64 { + if x != nil { + return x.MemoryEventsOomGroupKill + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryPeak() int64 { + if x != nil { + return x.MemoryPeak + } + return 0 +} + +func (x *CgroupResourceUsage) GetMemoryPressureSomeTotal() *durationpb.Duration { + if x != nil { + return x.MemoryPressureSomeTotal + } + return nil +} + +func (x *CgroupResourceUsage) GetMemoryPressureFullTotal() *durationpb.Duration { + if x != nil { + return x.MemoryPressureFullTotal + } + return nil +} + +func (x *CgroupResourceUsage) GetCpuPressureSomeTotal() *durationpb.Duration { + if x != nil { + return x.CpuPressureSomeTotal + } + return nil +} + +func (x *CgroupResourceUsage) GetCpuPressureFullTotal() *durationpb.Duration { + if x != nil { + return x.CpuPressureFullTotal + } + return nil +} + +func (x *CgroupResourceUsage) GetIoPressureSomeTotal() *durationpb.Duration { + if x != nil { + return x.IoPressureSomeTotal + } + return nil +} + +func (x *CgroupResourceUsage) GetIoPressureFullTotal() *durationpb.Duration { + if x != nil { + return x.IoPressureFullTotal + } + return nil +} + type 
MonetaryResourceUsage_Expense struct { state protoimpl.MessageState `protogen:"open.v1"` Currency string `protobuf:"bytes,1,opt,name=currency,proto3" json:"currency,omitempty"` @@ -384,7 +524,7 @@ type MonetaryResourceUsage_Expense struct { func (x *MonetaryResourceUsage_Expense) Reset() { *x = MonetaryResourceUsage_Expense{} - mi := &file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes[4] + mi := &file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -396,7 +536,7 @@ func (x *MonetaryResourceUsage_Expense) String() string { func (*MonetaryResourceUsage_Expense) ProtoMessage() {} func (x *MonetaryResourceUsage_Expense) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes[4] + mi := &file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -471,7 +611,23 @@ const file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_reso "\x14directories_resolved\x18\x01 \x01(\x04R\x13directoriesResolved\x12)\n" + "\x10directories_read\x18\x02 \x01(\x04R\x0fdirectoriesRead\x12\x1d\n" + "\n" + - "files_read\x18\x03 \x01(\x04R\tfilesReadBBZ@github.com/buildbarn/bb-remote-execution/pkg/proto/resourceusageb\x06proto3" + "files_read\x18\x03 \x01(\x04R\tfilesRead\"\xd1\x06\n" + + "\x13CgroupResourceUsage\x12*\n" + + "\x11memory_events_low\x18\x01 \x01(\x03R\x0fmemoryEventsLow\x12,\n" + + "\x12memory_events_high\x18\x02 \x01(\x03R\x10memoryEventsHigh\x12*\n" + + "\x11memory_events_max\x18\x03 \x01(\x03R\x0fmemoryEventsMax\x12*\n" + + "\x11memory_events_oom\x18\x04 \x01(\x03R\x0fmemoryEventsOom\x123\n" + + "\x16memory_events_oom_kill\x18\x05 
\x01(\x03R\x13memoryEventsOomKill\x12>\n" + + "\x1cmemory_events_oom_group_kill\x18\x06 \x01(\x03R\x18memoryEventsOomGroupKill\x12\x1f\n" + + "\vmemory_peak\x18\a \x01(\x03R\n" + + "memoryPeak\x12V\n" + + "\x1amemory_pressure_some_total\x18\b \x01(\v2\x19.google.protobuf.DurationR\x17memoryPressureSomeTotal\x12V\n" + + "\x1amemory_pressure_full_total\x18\t \x01(\v2\x19.google.protobuf.DurationR\x17memoryPressureFullTotal\x12P\n" + + "\x17cpu_pressure_some_total\x18\n" + + " \x01(\v2\x19.google.protobuf.DurationR\x14cpuPressureSomeTotal\x12P\n" + + "\x17cpu_pressure_full_total\x18\v \x01(\v2\x19.google.protobuf.DurationR\x14cpuPressureFullTotal\x12N\n" + + "\x16io_pressure_some_total\x18\f \x01(\v2\x19.google.protobuf.DurationR\x13ioPressureSomeTotal\x12N\n" + + "\x16io_pressure_full_total\x18\r \x01(\v2\x19.google.protobuf.DurationR\x13ioPressureFullTotalBBZ@github.com/buildbarn/bb-remote-execution/pkg/proto/resourceusageb\x06proto3" var ( file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_rawDescOnce sync.Once @@ -485,26 +641,33 @@ func file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resou return file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_rawDescData } -var file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_goTypes = []any{ (*FilePoolResourceUsage)(nil), // 0: buildbarn.resourceusage.FilePoolResourceUsage (*POSIXResourceUsage)(nil), // 1: buildbarn.resourceusage.POSIXResourceUsage (*MonetaryResourceUsage)(nil), // 2: buildbarn.resourceusage.MonetaryResourceUsage (*InputRootResourceUsage)(nil), // 3: buildbarn.resourceusage.InputRootResourceUsage - 
(*MonetaryResourceUsage_Expense)(nil), // 4: buildbarn.resourceusage.MonetaryResourceUsage.Expense - nil, // 5: buildbarn.resourceusage.MonetaryResourceUsage.ExpensesEntry - (*durationpb.Duration)(nil), // 6: google.protobuf.Duration + (*CgroupResourceUsage)(nil), // 4: buildbarn.resourceusage.CgroupResourceUsage + (*MonetaryResourceUsage_Expense)(nil), // 5: buildbarn.resourceusage.MonetaryResourceUsage.Expense + nil, // 6: buildbarn.resourceusage.MonetaryResourceUsage.ExpensesEntry + (*durationpb.Duration)(nil), // 7: google.protobuf.Duration } var file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_depIdxs = []int32{ - 6, // 0: buildbarn.resourceusage.POSIXResourceUsage.user_time:type_name -> google.protobuf.Duration - 6, // 1: buildbarn.resourceusage.POSIXResourceUsage.system_time:type_name -> google.protobuf.Duration - 5, // 2: buildbarn.resourceusage.MonetaryResourceUsage.expenses:type_name -> buildbarn.resourceusage.MonetaryResourceUsage.ExpensesEntry - 4, // 3: buildbarn.resourceusage.MonetaryResourceUsage.ExpensesEntry.value:type_name -> buildbarn.resourceusage.MonetaryResourceUsage.Expense - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 7, // 0: buildbarn.resourceusage.POSIXResourceUsage.user_time:type_name -> google.protobuf.Duration + 7, // 1: buildbarn.resourceusage.POSIXResourceUsage.system_time:type_name -> google.protobuf.Duration + 6, // 2: buildbarn.resourceusage.MonetaryResourceUsage.expenses:type_name -> buildbarn.resourceusage.MonetaryResourceUsage.ExpensesEntry + 7, // 3: buildbarn.resourceusage.CgroupResourceUsage.memory_pressure_some_total:type_name -> google.protobuf.Duration + 7, // 4: buildbarn.resourceusage.CgroupResourceUsage.memory_pressure_full_total:type_name -> 
google.protobuf.Duration + 7, // 5: buildbarn.resourceusage.CgroupResourceUsage.cpu_pressure_some_total:type_name -> google.protobuf.Duration + 7, // 6: buildbarn.resourceusage.CgroupResourceUsage.cpu_pressure_full_total:type_name -> google.protobuf.Duration + 7, // 7: buildbarn.resourceusage.CgroupResourceUsage.io_pressure_some_total:type_name -> google.protobuf.Duration + 7, // 8: buildbarn.resourceusage.CgroupResourceUsage.io_pressure_full_total:type_name -> google.protobuf.Duration + 5, // 9: buildbarn.resourceusage.MonetaryResourceUsage.ExpensesEntry.value:type_name -> buildbarn.resourceusage.MonetaryResourceUsage.Expense + 10, // [10:10] is the sub-list for method output_type + 10, // [10:10] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { @@ -520,7 +683,7 @@ func file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resou GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_rawDesc), len(file_github_com_buildbarn_bb_remote_execution_pkg_proto_resourceusage_resourceusage_proto_rawDesc)), NumEnums: 0, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/pkg/proto/resourceusage/resourceusage.proto b/pkg/proto/resourceusage/resourceusage.proto index 112b8f10..ee76d8ff 100644 --- a/pkg/proto/resourceusage/resourceusage.proto +++ b/pkg/proto/resourceusage/resourceusage.proto @@ -126,3 +126,74 @@ message InputRootResourceUsage { // Addressable Storage (CAS). uint64 files_read = 3; } + +// Resource usage metrics collected from cgroup v2 files by bb_runner. +// +// Fields represent the difference between samples taken before +// the action started and after it completed, unless noted otherwise. 
+// +// PSI total values are exported by Linux in microseconds, but stored +// here as durations. To express a PSI duration as a percentage, divide +// it by the action's execution duration from +// ActionResult.execution_metadata. For example: +// +// cpu_pressure_some_percent = cpu_pressure_some_total / ( +// execution_completed_timestamp - execution_start_timestamp) * 100 +// +// See: +// https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory-interface-files +// https://www.kernel.org/doc/html/latest/accounting/psi.html#psi +message CgroupResourceUsage { + // memory.events/low: Number of times the cgroup was reclaimed under + // high memory pressure even though it was below memory.low. + int64 memory_events_low = 1; + + // memory.events/high: Number of times cgroup processes were + // throttled and routed to direct reclaim after exceeding memory.high. + int64 memory_events_high = 2; + + // memory.events/max: Number of times the cgroup was about to exceed + // memory.max. + int64 memory_events_max = 3; + + // memory.events/oom: Number of times allocation was about to fail + // because the cgroup reached its memory limit. + int64 memory_events_oom = 4; + + // memory.events/oom_kill: Number of processes in the cgroup killed + // by any OOM killer. + int64 memory_events_oom_kill = 5; + + // memory.events/oom_group_kill: Number of group OOM kills. Zero if + // the running kernel does not expose this memory.events key. + int64 memory_events_oom_group_kill = 6; + + // memory.peak: Peak memory usage in bytes since resetting + // memory.peak at action start. Zero if memory.peak reset/read is + // unavailable. + int64 memory_peak = 7; + + // memory.pressure/some total: Memory PSI stall time with at least + // one task stalled on memory. + google.protobuf.Duration memory_pressure_some_total = 8; + + // memory.pressure/full total: Memory PSI stall time with all + // non-idle tasks stalled on memory. 
+ google.protobuf.Duration memory_pressure_full_total = 9; + + // cpu.pressure/some total: CPU PSI stall time with at least one task + // waiting for CPU. + google.protobuf.Duration cpu_pressure_some_total = 10; + + // cpu.pressure/full total: CPU PSI stall time with all non-idle + // tasks waiting for CPU. + google.protobuf.Duration cpu_pressure_full_total = 11; + + // io.pressure/some total: I/O PSI stall time with at least one task + // stalled on I/O. + google.protobuf.Duration io_pressure_some_total = 12; + + // io.pressure/full total: I/O PSI stall time with all non-idle + // tasks stalled on I/O. + google.protobuf.Duration io_pressure_full_total = 13; +} diff --git a/pkg/runner/BUILD.bazel b/pkg/runner/BUILD.bazel index fb596676..4d786539 100644 --- a/pkg/runner/BUILD.bazel +++ b/pkg/runner/BUILD.bazel @@ -4,6 +4,8 @@ go_library( name = "runner", srcs = [ "apple_xcode_resolving_runner.go", + "cgroup_resource_usage_sampling_runner_linux.go", + "cgroup_resource_usage_sampling_runner_other.go", "clean_runner.go", "local_runner.go", "local_runner_darwin.go", @@ -72,7 +74,15 @@ go_test( "local_runner_test.go", "path_existence_checking_runner_test.go", "temporary_directory_symlinking_runner_test.go", - ], + ] + select({ + "@rules_go//go/platform:android": [ + "cgroup_resource_usage_sampling_runner_test.go", + ], + "@rules_go//go/platform:linux": [ + "cgroup_resource_usage_sampling_runner_test.go", + ], + "//conditions:default": [], + }), deps = [ ":runner", "//internal/mock", diff --git a/pkg/runner/cgroup_resource_usage_sampling_runner_linux.go b/pkg/runner/cgroup_resource_usage_sampling_runner_linux.go new file mode 100644 index 00000000..ba2a2dc4 --- /dev/null +++ b/pkg/runner/cgroup_resource_usage_sampling_runner_linux.go @@ -0,0 +1,417 @@ +//go:build linux +// +build linux + +package runner + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "sync/atomic" + "time" + + 
"github.com/buildbarn/bb-remote-execution/pkg/proto/resourceusage" + runner_pb "github.com/buildbarn/bb-remote-execution/pkg/proto/runner" + "github.com/buildbarn/bb-storage/pkg/util" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" +) + +type cgroupResourceUsageSamplingRunner struct { + runner_pb.RunnerServer + cgroupfsPath string + activeRun atomic.Bool +} + +// NewCgroupResourceUsageSamplingRunner creates a decorator for RunnerServer +// that samples cgroup v2 resource usage counters around actions and appends +// them to successful Run() responses. +// +// Sampled cgroup counters are only meaningful as per-action deltas if +// bb_worker sends at most one action to the runner at a time +// (RunnerConfiguration.concurrency == 1), and if the runner is deployed in a +// cgroup whose other activity is acceptable to include in the reported usage. +func NewCgroupResourceUsageSamplingRunner(base runner_pb.RunnerServer) (runner_pb.RunnerServer, error) { + cgroupfsPath, err := ResolveCurrentCgroupfsPathFromProcFiles("/proc/self/cgroup", "/proc/self/mountinfo") + if err != nil { + return nil, err + } + return NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(base, cgroupfsPath), nil +} + +// NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath creates a decorator for +// RunnerServer that samples cgroup v2 resource usage counters from +// cgroupfsPath. +// +// cgroupfsPath is the cgroup v2 filesystem directory whose counters should be +// sampled. Counter deltas are measured over each Run() request. 
+func NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(base runner_pb.RunnerServer, cgroupfsPath string) runner_pb.RunnerServer { + return &cgroupResourceUsageSamplingRunner{ + RunnerServer: base, + cgroupfsPath: cgroupfsPath, + } +} + +func (r *cgroupResourceUsageSamplingRunner) Run(ctx context.Context, request *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + if !r.activeRun.CompareAndSwap(false, true) { + return nil, status.Error(codes.Internal, "cgroup resource usage sampling requires an exclusive runner cgroup, but concurrent Run() calls were observed") + } + defer r.activeRun.Store(false) + + cgroupResourceUsageReader, err := newCgroupResourceUsageReader(r.cgroupfsPath) + if err != nil { + return nil, util.StatusWrap(err, "Failed to create cgroup resource usage reader") + } + defer func() { + _ = cgroupResourceUsageReader.close() + }() + + response, err := r.RunnerServer.Run(ctx, request) + if err != nil { + return response, err + } + + cgroupUsage, err := cgroupResourceUsageReader.read() + if err != nil { + return response, util.StatusWrap(err, "Failed to read cgroup stats") + } + if cgroupUsage == nil { + return response, nil + } + cgroupAny, err := anypb.New(cgroupUsage) + if err != nil { + return response, util.StatusWrap(err, "Failed to marshal cgroup resource usage") + } + if response != nil { + response.ResourceUsage = append(response.ResourceUsage, cgroupAny) + } + if cgroupUsage.MemoryEventsOomKill > 0 && cgroupUsage.MemoryEventsOom == 0 { + // The cgroup did not reach its memory limit, so the OOM kill likely + // came from system-level memory pressure, such as node memory + // overcommitment. Treat this as retryable infrastructure failure. + return response, status.Error(codes.Unavailable, "An action process was OOM-killed without the action reaching its cgroup memory limit") + } + return response, nil +} + +type cgroupResourceUsageReader struct { + cgroupfsPath string + + // Initial memory.events counter values. 
+ eventsLow int64 + eventsHigh int64 + eventsMax int64 + eventsOOM int64 + eventsOOMKill int64 + eventsOOMGroupKill int64 + + // Initial PSI total stall durations, in microseconds. + psiMemorySomeUS int64 + psiMemoryFullUS int64 + psiCPUSomeUS int64 + psiCPUFullUS int64 + psiIOSomeUS int64 + psiIOFullUS int64 + + memoryPeakFile *os.File +} + +func newCgroupResourceUsageReader(cgroupfsPath string) (*cgroupResourceUsageReader, error) { + events, err := readCgroupKeyValues(filepath.Join(cgroupfsPath, "memory.events")) + if err != nil { + return nil, fmt.Errorf("failed to read memory.events: %w", err) + } + + memorySome, memoryFull, err := parsePSITotals(filepath.Join(cgroupfsPath, "memory.pressure")) + if err != nil { + return nil, fmt.Errorf("failed to read memory.pressure: %w", err) + } + cpuSome, cpuFull, err := parsePSITotals(filepath.Join(cgroupfsPath, "cpu.pressure")) + if err != nil { + return nil, fmt.Errorf("failed to read cpu.pressure: %w", err) + } + ioSome, ioFull, err := parsePSITotals(filepath.Join(cgroupfsPath, "io.pressure")) + if err != nil { + return nil, fmt.Errorf("failed to read io.pressure: %w", err) + } + + // memory.peak is optional. If it is unavailable or cannot be reset/read, + // MemoryPeak remains 0 to indicate that no peak data was collected. 
+ memoryPeakFile := openAndResetCgroupMemoryPeak(filepath.Join(cgroupfsPath, "memory.peak")) + reader := &cgroupResourceUsageReader{ + cgroupfsPath: cgroupfsPath, + + eventsLow: events["low"], + eventsHigh: events["high"], + eventsMax: events["max"], + eventsOOM: events["oom"], + eventsOOMKill: events["oom_kill"], + eventsOOMGroupKill: events["oom_group_kill"], + + psiMemorySomeUS: memorySome, + psiMemoryFullUS: memoryFull, + psiCPUSomeUS: cpuSome, + psiCPUFullUS: cpuFull, + psiIOSomeUS: ioSome, + psiIOFullUS: ioFull, + + memoryPeakFile: memoryPeakFile, + } + return reader, nil +} + +func (r *cgroupResourceUsageReader) close() error { + if r == nil || r.memoryPeakFile == nil { + return nil + } + return r.memoryPeakFile.Close() +} + +func (r *cgroupResourceUsageReader) read() (*resourceusage.CgroupResourceUsage, error) { + events, err := readCgroupKeyValues(filepath.Join(r.cgroupfsPath, "memory.events")) + if err != nil { + return nil, fmt.Errorf("failed to read memory.events: %w", err) + } + + memorySome, memoryFull, err := parsePSITotals(filepath.Join(r.cgroupfsPath, "memory.pressure")) + if err != nil { + return nil, fmt.Errorf("failed to read memory.pressure: %w", err) + } + cpuSome, cpuFull, err := parsePSITotals(filepath.Join(r.cgroupfsPath, "cpu.pressure")) + if err != nil { + return nil, fmt.Errorf("failed to read cpu.pressure: %w", err) + } + ioSome, ioFull, err := parsePSITotals(filepath.Join(r.cgroupfsPath, "io.pressure")) + if err != nil { + return nil, fmt.Errorf("failed to read io.pressure: %w", err) + } + + memoryPeak := readCgroupMemoryPeak(r.memoryPeakFile) + + return &resourceusage.CgroupResourceUsage{ + MemoryEventsLow: events["low"] - r.eventsLow, + MemoryEventsHigh: events["high"] - r.eventsHigh, + MemoryEventsMax: events["max"] - r.eventsMax, + MemoryEventsOom: events["oom"] - r.eventsOOM, + MemoryEventsOomKill: events["oom_kill"] - r.eventsOOMKill, + MemoryEventsOomGroupKill: events["oom_group_kill"] - r.eventsOOMGroupKill, + + MemoryPeak: 
memoryPeak, + + MemoryPressureSomeTotal: microsecondsDuration(memorySome - r.psiMemorySomeUS), + MemoryPressureFullTotal: microsecondsDuration(memoryFull - r.psiMemoryFullUS), + CpuPressureSomeTotal: microsecondsDuration(cpuSome - r.psiCPUSomeUS), + CpuPressureFullTotal: microsecondsDuration(cpuFull - r.psiCPUFullUS), + IoPressureSomeTotal: microsecondsDuration(ioSome - r.psiIOSomeUS), + IoPressureFullTotal: microsecondsDuration(ioFull - r.psiIOFullUS), + }, nil +} + +func microsecondsDuration(microseconds int64) *durationpb.Duration { + return durationpb.New(time.Duration(microseconds) * time.Microsecond) +} + +func openAndResetCgroupMemoryPeak(path string) *os.File { + f, err := os.OpenFile(path, os.O_RDWR, 0) + if err != nil { + return nil + } + // Linux scopes memory.peak reset state to the file descriptor used for + // the write, so keep this descriptor open until the action finishes. + if _, err := f.Write([]byte("1")); err != nil { + f.Close() + return nil + } + return f +} + +func readCgroupMemoryPeak(f *os.File) int64 { + if f == nil { + return 0 + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + return 0 + } + data, err := io.ReadAll(f) + if err != nil { + return 0 + } + value, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64) + if err != nil { + return 0 + } + return value +} + +// readCgroupKeyValues parses a cgroup file with key-value lines +// (e.g., memory.events, memory.stat). Each line has the form "key value". 
func readCgroupKeyValues(path string) (map[string]int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	return parseCgroupKeyValues(f)
}

// parseCgroupKeyValues decodes "key value" lines from r into a map. Lines
// that contain no space are skipped. A value that is not a signed decimal
// integer causes an error that names the offending key.
func parseCgroupKeyValues(r io.Reader) (map[string]int64, error) {
	result := make(map[string]int64)
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		key, rawValue, ok := strings.Cut(scanner.Text(), " ")
		if !ok {
			// Not a "key value" line (e.g., an empty line).
			continue
		}
		value, err := strconv.ParseInt(strings.TrimSpace(rawValue), 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid value for key %q: %w", key, err)
		}
		result[key] = value
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return result, nil
}

// parsePSITotals parses a PSI pressure file and returns the total
// stall microseconds for the "some" and "full" lines. A total is
// reported as zero if the file has no line with that prefix.
// Format: some avg10=0.00 avg60=0.00 avg300=0.00 total=12345
func parsePSITotals(path string) (someUS, fullUS int64, err error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, 0, err
	}
	defer f.Close()
	return scanPSITotals(f)
}

// scanPSITotals extracts the "some" and "full" totals from PSI data read
// from r. Lines with fewer than two fields are skipped. Every other line
// must carry a "total=" field, even if its prefix is not recognized.
func scanPSITotals(r io.Reader) (int64, int64, error) {
	var someUS, fullUS int64
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		fields := strings.Fields(line)
		if len(fields) < 2 {
			continue
		}
		total, err := psiLineTotal(line, fields[1:])
		if err != nil {
			return 0, 0, err
		}
		switch fields[0] {
		case "some":
			someUS = total
		case "full":
			fullUS = total
		}
	}
	if err := scanner.Err(); err != nil {
		return 0, 0, err
	}
	return someUS, fullUS, nil
}

// psiLineTotal returns the value of the first "total=" field in fields.
// The line argument is only used to give errors context.
func psiLineTotal(line string, fields []string) (int64, error) {
	for _, field := range fields {
		if !strings.HasPrefix(field, "total=") {
			continue
		}
		total, err := strconv.ParseInt(strings.TrimPrefix(field, "total="), 10, 64)
		if err != nil {
			return 0, fmt.Errorf("invalid total field in %q: %w", line, err)
		}
		return total, nil
	}
	return 0, fmt.Errorf("missing total field in %q", line)
}

// ResolveCurrentCgroupfsPathFromProcFiles resolves the cgroup v2 filesystem
// directory for the process described by procCgroupPath and procMountInfoPath.
+func ResolveCurrentCgroupfsPathFromProcFiles(procCgroupPath, procMountInfoPath string) (string, error) { + currentCgroupPath, err := readCurrentCgroupRelativePath(procCgroupPath) + if err != nil { + return "", err + } + return resolveCgroupPathFromMountInfo(procMountInfoPath, currentCgroupPath) +} + +func resolveCgroupPathFromMountInfo(path, currentCgroupPath string) (string, error) { + data, err := os.ReadFile(path) + if err != nil { + return "", fmt.Errorf("failed to read cgroup mount information: %w", err) + } + currentCgroupPath = cleanCgroupPath(currentCgroupPath) + var bestMountPoint, bestRoot string + for _, line := range strings.Split(string(data), "\n") { + separator := strings.Index(line, " - ") + if separator < 0 { + continue + } + mountFields := strings.Fields(line[:separator]) + filesystemFields := strings.Fields(line[separator+3:]) + if len(mountFields) < 5 || len(filesystemFields) < 1 || filesystemFields[0] != "cgroup2" { + continue + } + root := cleanCgroupPath(unescapeMountInfoField(mountFields[3])) + if !isCgroupPathPrefix(root, currentCgroupPath) { + continue + } + if len(root) <= len(bestRoot) { + continue + } + bestRoot = root + mountPoint := filepath.Clean(unescapeMountInfoField(mountFields[4])) + relativePath, err := filepath.Rel(root, currentCgroupPath) + if err != nil { + continue + } + bestMountPoint = filepath.Join(mountPoint, relativePath) + } + if bestMountPoint == "" { + return "", fmt.Errorf("cgroup v2 mount containing current cgroup %q not found in %s", currentCgroupPath, path) + } + return bestMountPoint, nil +} + +func cleanCgroupPath(path string) string { + path = filepath.Clean(path) + if filepath.IsAbs(path) { + return path + } + return filepath.Clean(string(filepath.Separator) + path) +} + +func isCgroupPathPrefix(root, path string) bool { + relativePath, err := filepath.Rel(root, path) + return err == nil && + relativePath != ".." 
&& + !strings.HasPrefix(relativePath, ".."+string(filepath.Separator)) && + !filepath.IsAbs(relativePath) +} + +// /proc/self/mountinfo encodes whitespace and backslash in path fields using +// octal escape sequences. +func unescapeMountInfoField(field string) string { + replacer := strings.NewReplacer( + `\011`, "\t", + `\012`, "\n", + `\040`, " ", + `\134`, `\`, + ) + return replacer.Replace(field) +} + +func readCurrentCgroupRelativePath(path string) (string, error) { + data, err := os.ReadFile(path) + if err != nil { + return "", fmt.Errorf("failed to read current cgroup: %w", err) + } + for _, line := range strings.Split(string(data), "\n") { + parts := strings.SplitN(line, ":", 3) + if len(parts) == 3 && parts[0] == "0" && parts[1] == "" { + return filepath.Clean(parts[2]), nil + } + } + return "", fmt.Errorf("cgroup v2 entry not found in %s", path) +} diff --git a/pkg/runner/cgroup_resource_usage_sampling_runner_other.go b/pkg/runner/cgroup_resource_usage_sampling_runner_other.go new file mode 100644 index 00000000..fe6651e3 --- /dev/null +++ b/pkg/runner/cgroup_resource_usage_sampling_runner_other.go @@ -0,0 +1,17 @@ +//go:build !linux +// +build !linux + +package runner + +import ( + runner_pb "github.com/buildbarn/bb-remote-execution/pkg/proto/runner" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// NewCgroupResourceUsageSamplingRunner returns an error, as cgroup resource +// usage sampling is only supported on Linux. 
+func NewCgroupResourceUsageSamplingRunner(base runner_pb.RunnerServer) (runner_pb.RunnerServer, error) { + return nil, status.Error(codes.Unimplemented, "cgroup resource usage sampling is only supported on Linux") +} diff --git a/pkg/runner/cgroup_resource_usage_sampling_runner_test.go b/pkg/runner/cgroup_resource_usage_sampling_runner_test.go new file mode 100644 index 00000000..04ba1f3f --- /dev/null +++ b/pkg/runner/cgroup_resource_usage_sampling_runner_test.go @@ -0,0 +1,421 @@ +//go:build linux +// +build linux + +package runner_test + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/buildbarn/bb-remote-execution/internal/mock" + "github.com/buildbarn/bb-remote-execution/pkg/proto/resourceusage" + runner_pb "github.com/buildbarn/bb-remote-execution/pkg/proto/runner" + "github.com/buildbarn/bb-remote-execution/pkg/runner" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestCgroupResourceUsageSamplingRunnerReportsDeltasFromCgroupFiles(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + writeFile(t, cgroupPath, "memory.events", ` +low 11 +high 22 +max 33 +oom 44 +oom_kill 55 +oom_group_kill 66 +`) + writeFile(t, cgroupPath, "memory.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=160 +full avg10=0.00 avg60=0.00 avg300=0.00 total=290 +`) + writeFile(t, cgroupPath, "cpu.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=345 +full avg10=0.00 avg60=0.00 avg300=0.00 total=410 +`) + writeFile(t, cgroupPath, "io.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=480 +full 
avg10=0.00 avg60=0.00 avg300=0.00 total=610 +`) + writeFile(t, cgroupPath, "memory.peak", "4096\n") + return &runner_pb.RunResponse{ + ExitCode: 7, + }, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + require.NoError(t, err) + require.Equal(t, int64(7), response.ExitCode) + require.Len(t, response.ResourceUsage, 1) + var usage resourceusage.CgroupResourceUsage + require.NoError(t, response.ResourceUsage[0].UnmarshalTo(&usage)) + + require.Equal(t, int64(10), usage.MemoryEventsLow) + require.Equal(t, int64(20), usage.MemoryEventsHigh) + require.Equal(t, int64(30), usage.MemoryEventsMax) + require.Equal(t, int64(40), usage.MemoryEventsOom) + require.Equal(t, int64(50), usage.MemoryEventsOomKill) + require.Equal(t, int64(60), usage.MemoryEventsOomGroupKill) + require.Equal(t, int64(4096), usage.MemoryPeak) + require.Equal(t, 60*time.Microsecond, usage.GetMemoryPressureSomeTotal().AsDuration()) + require.Equal(t, 90*time.Microsecond, usage.GetMemoryPressureFullTotal().AsDuration()) + require.Equal(t, 45*time.Microsecond, usage.GetCpuPressureSomeTotal().AsDuration()) + require.Equal(t, 100*time.Microsecond, usage.GetCpuPressureFullTotal().AsDuration()) + require.Equal(t, 80*time.Microsecond, usage.GetIoPressureSomeTotal().AsDuration()) + require.Equal(t, 110*time.Microsecond, usage.GetIoPressureFullTotal().AsDuration()) +} + +func TestCgroupResourceUsageSamplingRunnerAllowsMissingOomGroupKillCounter(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + writeFile(t, cgroupPath, "memory.events", ` +low 1 +high 2 +max 3 +oom 4 +oom_kill 5 +`) + + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + writeFile(t, cgroupPath, "memory.events", ` 
+low 11 +high 22 +max 33 +oom 44 +oom_kill 55 +`) + return &runner_pb.RunResponse{}, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + require.NoError(t, err) + require.Len(t, response.ResourceUsage, 1) + var usage resourceusage.CgroupResourceUsage + require.NoError(t, response.ResourceUsage[0].UnmarshalTo(&usage)) + + require.Equal(t, int64(50), usage.MemoryEventsOomKill) + require.Equal(t, int64(0), usage.MemoryEventsOomGroupKill) +} + +func TestResolveCurrentCgroupfsPathUsesMatchingCgroup2MountRoot(t *testing.T) { + for _, testCase := range []struct { + name string + mountInfo string + cgroup string + wantPath string + }{ + { + name: "root mount", + mountInfo: ` +36 25 0:31 / %[1]s rw,nosuid,nodev,noexec,relatime - cgroup2 cgroup rw +`, + cgroup: "0::/worker.slice/runner.scope\n", + wantPath: "worker.slice/runner.scope", + }, + { + name: "subtree mount", + mountInfo: ` +36 25 0:31 /unrelated /wrong rw,nosuid,nodev,noexec,relatime - cgroup2 cgroup rw +37 25 0:31 /worker.slice %[1]s rw,nosuid,nodev,noexec,relatime - cgroup2 cgroup rw +`, + cgroup: "0::/worker.slice/runner.scope\n", + wantPath: "runner.scope", + }, + { + name: "most specific mount root", + mountInfo: ` +36 25 0:31 / %[1]s/unrelated rw,nosuid,nodev,noexec,relatime - cgroup2 cgroup rw +37 25 0:31 /worker.slice %[1]s rw,nosuid,nodev,noexec,relatime - cgroup2 cgroup rw +`, + cgroup: "0::/worker.slice/runner.scope\n", + wantPath: "runner.scope", + }, + { + name: "current cgroup is mount root", + mountInfo: ` +36 25 0:31 /worker.slice %[1]s rw,nosuid,nodev,noexec,relatime - cgroup2 cgroup rw +`, + cgroup: "0::/worker.slice\n", + wantPath: ".", + }, + } { + t.Run(testCase.name, func(t *testing.T) { + tempDir := t.TempDir() + cgroupfsPath := filepath.Join(tempDir, "cgroupfs") + resolvedCgroupPath := filepath.Join(cgroupfsPath, testCase.wantPath) + + mountInfoPath := filepath.Join(tempDir, "mountinfo") + procCgroupPath := filepath.Join(tempDir, "cgroup") + 
writeFile(t, tempDir, "mountinfo", fmt.Sprintf(testCase.mountInfo, cgroupfsPath)) + writeFile(t, tempDir, "cgroup", testCase.cgroup) + + gotCgroupfsPath, err := runner.ResolveCurrentCgroupfsPathFromProcFiles(procCgroupPath, mountInfoPath) + require.NoError(t, err) + require.Equal(t, resolvedCgroupPath, gotCgroupfsPath) + }) + } +} + +func TestCgroupResourceUsageSamplingRunnerAppendsResourceUsage(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + request := &runner_pb.RunRequest{} + baseRunner.EXPECT().Run(gomock.Any(), request).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + writeFile(t, cgroupPath, "memory.events", ` +low 1 +high 3 +max 3 +oom 4 +oom_kill 5 +oom_group_kill 6 +`) + writeFile(t, cgroupPath, "cpu.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=423 +full avg10=0.00 avg60=0.00 avg300=0.00 total=355 +`) + writeFile(t, cgroupPath, "memory.peak", "4096\n") + return &runner_pb.RunResponse{ + ExitCode: 7, + }, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), request) + require.NoError(t, err) + require.Equal(t, int64(7), response.ExitCode) + require.Len(t, response.ResourceUsage, 1) + var got resourceusage.CgroupResourceUsage + require.NoError(t, response.ResourceUsage[0].UnmarshalTo(&got)) + require.Equal(t, int64(1), got.GetMemoryEventsHigh()) + require.Equal(t, int64(4096), got.GetMemoryPeak()) + require.Equal(t, 123*time.Microsecond, got.GetCpuPressureSomeTotal().AsDuration()) + require.Equal(t, 45*time.Microsecond, got.GetCpuPressureFullTotal().AsDuration()) +} + +func TestCgroupResourceUsageSamplingRunnerResetsMemoryPeakBeforeRun(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := 
runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + memoryPeak, err := os.ReadFile(filepath.Join(cgroupPath, "memory.peak")) + require.NoError(t, err) + require.Equal(t, "1\n", string(memoryPeak)) + writeFile(t, cgroupPath, "memory.peak", "4096\n") + return &runner_pb.RunResponse{}, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + require.NoError(t, err) + require.Len(t, response.ResourceUsage, 1) + var got resourceusage.CgroupResourceUsage + require.NoError(t, response.ResourceUsage[0].UnmarshalTo(&got)) + require.Equal(t, int64(4096), got.GetMemoryPeak()) +} + +func TestCgroupResourceUsageSamplingRunnerReadErrorIsPropagated(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + request := &runner_pb.RunRequest{} + baseResponse := &runner_pb.RunResponse{ExitCode: 7} + baseRunner.EXPECT().Run(gomock.Any(), request).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + require.NoError(t, os.Remove(filepath.Join(cgroupPath, "memory.events"))) + return baseResponse, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), request) + require.Error(t, err) + require.Contains(t, status.Convert(err).Message(), "Failed to read cgroup stats") + require.Same(t, baseResponse, response) + require.Empty(t, response.ResourceUsage) +} + +func TestCgroupResourceUsageSamplingRunnerRunErrorDoesNotReadCgroupUsage(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := 
runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + baseErr := status.Error(codes.FailedPrecondition, "failed") + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + require.NoError(t, os.Remove(filepath.Join(cgroupPath, "memory.events"))) + return nil, baseErr + }, + ) + + response, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + require.Nil(t, response) + require.Equal(t, baseErr, err) +} + +func TestCgroupResourceUsageSamplingRunnerSystemOOMKillIsUnavailable(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + writeFile(t, cgroupPath, "memory.events", ` +low 1 +high 2 +max 3 +oom 4 +oom_kill 6 +oom_group_kill 6 +`) + return &runner_pb.RunResponse{ + ExitCode: 0, + }, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + require.Equal(t, codes.Unavailable, status.Code(err)) + require.Equal(t, "An action process was OOM-killed without the action reaching its cgroup memory limit", status.Convert(err).Message()) + require.NotNil(t, response) + require.Len(t, response.ResourceUsage, 1) +} + +func TestCgroupResourceUsageSamplingRunnerCgroupOOMKillIsActionResult(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(context.Context, *runner_pb.RunRequest) (*runner_pb.RunResponse, error) 
{ + writeFile(t, cgroupPath, "memory.events", ` +low 1 +high 2 +max 3 +oom 5 +oom_kill 6 +oom_group_kill 6 +`) + return &runner_pb.RunResponse{ + ExitCode: 0, + }, nil + }, + ) + + response, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + require.NoError(t, err) + require.Equal(t, int64(0), response.ExitCode) + require.Len(t, response.ResourceUsage, 1) +} + +func TestCgroupResourceUsageSamplingRunnerRejectsConcurrentRun(t *testing.T) { + ctrl := gomock.NewController(t) + baseRunner := mock.NewMockRunnerServer(ctrl) + cgroupPath := createTestCgroup(t) + wrappedRunner := runner.NewCgroupResourceUsageSamplingRunnerWithCgroupfsPath(baseRunner, cgroupPath) + + started := make(chan struct{}) + release := make(chan struct{}) + baseRunner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *runner_pb.RunRequest) (*runner_pb.RunResponse, error) { + close(started) + <-release + return &runner_pb.RunResponse{}, nil + }, + ) + + firstRunErr := make(chan error, 1) + go func() { + _, err := wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + firstRunErr <- err + }() + + select { + case <-started: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for first Run() to enter wrapped runner") + } + + secondRunDone := make(chan struct{}) + var response *runner_pb.RunResponse + var err error + go func() { + response, err = wrappedRunner.Run(context.Background(), &runner_pb.RunRequest{}) + close(secondRunDone) + }() + select { + case <-secondRunDone: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for concurrent Run() rejection") + } + require.Nil(t, response) + require.Equal(t, codes.Internal, status.Code(err)) + require.Contains(t, status.Convert(err).Message(), "concurrent Run() calls") + + close(release) + select { + case err := <-firstRunErr: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for first Run() to finish") + } +} + 
+func createTestCgroup(t *testing.T) string { + cgroupPath := t.TempDir() + writeFile(t, cgroupPath, "memory.events", ` +low 1 +high 2 +max 3 +oom 4 +oom_kill 5 +oom_group_kill 6 +`) + writeFile(t, cgroupPath, "memory.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=100 +full avg10=0.00 avg60=0.00 avg300=0.00 total=200 +`) + writeFile(t, cgroupPath, "cpu.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=300 +full avg10=0.00 avg60=0.00 avg300=0.00 total=310 +`) + writeFile(t, cgroupPath, "io.pressure", ` +some avg10=0.00 avg60=0.00 avg300=0.00 total=400 +full avg10=0.00 avg60=0.00 avg300=0.00 total=500 +`) + writeFile(t, cgroupPath, "memory.peak", "0\n") + return cgroupPath +} + +func writeFile(t *testing.T, dir, name, contents string) { + require.NoError(t, os.WriteFile(filepath.Join(dir, name), []byte(contents), 0o666)) +}