Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions scraper/scraperhelper/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhe

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -112,7 +113,9 @@ func NewLogsController(cfg *ControllerConfig,
scrapers = append(scrapers, s)
}
return controller.NewController[scraper.Logs](
cfg, rSet, scrapers, func(c *controller.Controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh)
cfg, rSet, scrapers, func(ctx context.Context, c *controller.Controller[scraper.Logs]) error {
return scrapeLogs(ctx, c, nextConsumer)
}, co.tickerCh)
}

// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics.
Expand All @@ -136,18 +139,21 @@ func NewMetricsController(cfg *ControllerConfig,
scrapers = append(scrapers, s)
}
return controller.NewController[scraper.Metrics](
cfg, rSet, scrapers, func(c *controller.Controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh)
cfg, rSet, scrapers, func(ctx context.Context, c *controller.Controller[scraper.Metrics]) error {
return scrapeMetrics(ctx, c, nextConsumer)
}, co.tickerCh)
}

func scrapeLogs(c *controller.Controller[scraper.Logs], nextConsumer consumer.Logs) {
ctx, done := controller.WithScrapeContext(c.Timeout)
defer done()

func scrapeLogs(ctx context.Context, c *controller.Controller[scraper.Logs], nextConsumer consumer.Logs) error {
var errs []error
logs := plog.NewLogs()
for i := range c.Scrapers {
md, err := c.Scrapers[i].ScrapeLogs(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
if err != nil {
errs = append(errs, err)
if !scrapererror.IsPartialScrapeError(err) {
continue
}
}
md.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
}
Expand All @@ -156,17 +162,19 @@ func scrapeLogs(c *controller.Controller[scraper.Logs], nextConsumer consumer.Lo
ctx = c.Obsrecv.StartLogsOp(ctx)
err := nextConsumer.ConsumeLogs(ctx, logs)
c.Obsrecv.EndLogsOp(ctx, "", logRecordCount, err)
return errors.Join(append(errs, err)...)
}

func scrapeMetrics(c *controller.Controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := controller.WithScrapeContext(c.Timeout)
defer done()

func scrapeMetrics(ctx context.Context, c *controller.Controller[scraper.Metrics], nextConsumer consumer.Metrics) error {
var errs []error
metrics := pmetric.NewMetrics()
for i := range c.Scrapers {
md, err := c.Scrapers[i].ScrapeMetrics(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
if err != nil {
errs = append(errs, err)
if !scrapererror.IsPartialScrapeError(err) {
continue
}
}
md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
}
Expand All @@ -175,6 +183,7 @@ func scrapeMetrics(c *controller.Controller[scraper.Metrics], nextConsumer consu
ctx = c.Obsrecv.StartMetricsOp(ctx)
err := nextConsumer.ConsumeMetrics(ctx, metrics)
c.Obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
return errors.Join(append(errs, err)...)
}

func getOptions(options []ControllerOption) controllerOptions {
Expand Down
27 changes: 13 additions & 14 deletions scraper/scraperhelper/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Controller[T component.Component] struct {
Timeout time.Duration

Scrapers []T
scrapeFunc func(*Controller[T])
scrapeFunc func(context.Context, *Controller[T]) error
tickerCh <-chan time.Time

done chan struct{}
Expand All @@ -38,7 +38,7 @@ func NewController[T component.Component](
cfg *ControllerConfig,
rSet receiver.Settings,
scrapers []T,
scrapeFunc func(*Controller[T]),
scrapeFunc func(context.Context, *Controller[T]) error,
tickerCh <-chan time.Time,
) (*Controller[T], error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
Expand All @@ -61,6 +61,15 @@ func NewController[T component.Component](
Obsrecv: obsrecv,
}

if cfg.Timeout > 0 {
timeout := cfg.Timeout
cs.scrapeFunc = func(ctx context.Context, c *Controller[T]) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
Comment thread
axw marked this conversation as resolved.
Outdated
defer cancel()
return scrapeFunc(ctx, c)
}
}

return cs, nil
}

Expand Down Expand Up @@ -110,11 +119,11 @@ func (sc *Controller[T]) startScraping() {
// Call scrape method during initialization to ensure
// that scrapers start from when the component starts
// instead of waiting for the full duration to start.
sc.scrapeFunc(sc)
_ = sc.scrapeFunc(context.Background(), sc)
for {
select {
case <-sc.tickerCh:
sc.scrapeFunc(sc)
_ = sc.scrapeFunc(context.Background(), sc)
case <-sc.done:
return
}
Expand All @@ -132,13 +141,3 @@ func GetSettings(sType component.Type, rSet receiver.Settings) scraper.Settings
BuildInfo: rSet.BuildInfo,
}
}

// WithScrapeContext will return a context that has no deadline if timeout is 0
// which implies no explicit timeout had occurred, otherwise, a context
// with a deadline of the provided timeout is returned.
func WithScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout == 0 {
return context.WithCancel(context.Background())
}
return context.WithTimeout(context.Background(), timeout)
}
134 changes: 110 additions & 24 deletions scraper/scraperhelper/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newTestController(t *testing.T, cfg *ControllerConfig, scrapers []component
cfg,
receivertest.NewNopSettings(receivertest.NopType),
scrapers,
func(*Controller[component.Component]) {},
func(context.Context, *Controller[component.Component]) error { return nil },
tickerCh,
)
require.NoError(t, err)
Expand All @@ -40,7 +40,7 @@ func newTestController(t *testing.T, cfg *ControllerConfig, scrapers []component
func TestNewController(t *testing.T) {
t.Parallel()

scrapeFunc := func(*Controller[component.Component]) {}
scrapeFunc := func(context.Context, *Controller[component.Component]) error { return nil }

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -145,8 +145,9 @@ func TestStartScrapingWithNilTickerCh(t *testing.T) {
t.Parallel()

var scrapeCount atomic.Int32
scrapeFunc := func(*Controller[component.Component]) {
scrapeFunc := func(context.Context, *Controller[component.Component]) error {
scrapeCount.Add(1)
return nil
}

cfg := &ControllerConfig{
Expand Down Expand Up @@ -226,8 +227,9 @@ func TestStartScraping(t *testing.T) {

tickerCh := make(chan time.Time)
var scrapeCount atomic.Int32
scrapeFunc := func(*Controller[component.Component]) {
scrapeFunc := func(context.Context, *Controller[component.Component]) error {
scrapeCount.Add(1)
return nil
}

cfg := &ControllerConfig{
Expand Down Expand Up @@ -263,8 +265,9 @@ func TestStartScrapingWithInitialDelay(t *testing.T) {

tickerCh := make(chan time.Time)
var scrapeCount atomic.Int32
scrapeFunc := func(*Controller[component.Component]) {
scrapeFunc := func(context.Context, *Controller[component.Component]) error {
scrapeCount.Add(1)
return nil
}

cfg := &ControllerConfig{
Expand Down Expand Up @@ -294,8 +297,9 @@ func TestStartScrapingShutdownDuringInitialDelay(t *testing.T) {
t.Parallel()

var scraped atomic.Bool
scrapeFunc := func(*Controller[component.Component]) {
scrapeFunc := func(context.Context, *Controller[component.Component]) error {
scraped.Store(true)
return nil
}

cfg := &ControllerConfig{
Expand Down Expand Up @@ -330,25 +334,107 @@ func TestGetSettings(t *testing.T) {
assert.Equal(t, rSet.BuildInfo, sSet.BuildInfo)
}

func TestWithScrapeContext(t *testing.T) {
func TestScrapeFuncAppliesTimeout(t *testing.T) {
t.Parallel()

t.Run("zero timeout returns context without deadline", func(t *testing.T) {
t.Parallel()
ctx, cancel := WithScrapeContext(0)
defer cancel()
_, hasDeadline := ctx.Deadline()
assert.False(t, hasDeadline)
})
timeout := 5 * time.Second
var deadline time.Time
var hasDeadline bool
scrapeFunc := func(ctx context.Context, _ *Controller[component.Component]) error {
deadline, hasDeadline = ctx.Deadline()
return nil
}

t.Run("positive timeout returns context with deadline", func(t *testing.T) {
t.Parallel()
timeout := 5 * time.Second
ctx, cancel := WithScrapeContext(timeout)
defer cancel()
deadline, hasDeadline := ctx.Deadline()
assert.True(t, hasDeadline)
// The deadline should be approximately now + timeout.
assert.WithinDuration(t, time.Now().Add(timeout), deadline, time.Second)
})
cfg := &ControllerConfig{
CollectionInterval: time.Minute,
Timeout: timeout,
}
ctrl, err := NewController(
cfg,
receivertest.NewNopSettings(receivertest.NopType),
[]component.Component{},
scrapeFunc,
nil,
)
require.NoError(t, err)

require.NoError(t, ctrl.scrapeFunc(context.Background(), ctrl))
assert.True(t, hasDeadline)
assert.WithinDuration(t, time.Now().Add(timeout), deadline, time.Second)
}

func TestScrapeFuncNoTimeout(t *testing.T) {
t.Parallel()

var hasDeadline bool
scrapeFunc := func(ctx context.Context, _ *Controller[component.Component]) error {
_, hasDeadline = ctx.Deadline()
return nil
}

cfg := &ControllerConfig{
CollectionInterval: time.Minute,
}
ctrl, err := NewController(
cfg,
receivertest.NewNopSettings(receivertest.NopType),
[]component.Component{},
scrapeFunc,
nil,
)
require.NoError(t, err)

require.NoError(t, ctrl.scrapeFunc(context.Background(), ctrl))
assert.False(t, hasDeadline)
}

func TestScrapeFuncPropagatesParentCancellation(t *testing.T) {
t.Parallel()

var gotErr error
scrapeFunc := func(ctx context.Context, _ *Controller[component.Component]) error {
gotErr = ctx.Err()
return nil
}

cfg := &ControllerConfig{
CollectionInterval: time.Minute,
Timeout: time.Hour,
}
ctrl, err := NewController(
cfg,
receivertest.NewNopSettings(receivertest.NopType),
[]component.Component{},
scrapeFunc,
nil,
)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()
require.NoError(t, ctrl.scrapeFunc(ctx, ctrl))
assert.ErrorIs(t, gotErr, context.Canceled)
}

func TestScrapeFuncReturnsError(t *testing.T) {
t.Parallel()

scrapeErr := errors.New("scrape failed")
scrapeFunc := func(context.Context, *Controller[component.Component]) error {
return scrapeErr
}

cfg := &ControllerConfig{
CollectionInterval: time.Minute,
}
ctrl, err := NewController(
cfg,
receivertest.NewNopSettings(receivertest.NopType),
[]component.Component{},
scrapeFunc,
nil,
)
require.NoError(t, err)

assert.ErrorIs(t, ctrl.scrapeFunc(context.Background(), ctrl), scrapeErr)
}
20 changes: 12 additions & 8 deletions scraper/scraperhelper/xscraperhelper/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package xscraperhelper // import "go.opentelemetry.io/collector/scraper/scraperh

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -106,7 +107,9 @@ func NewProfilesController(cfg *scraperhelper.ControllerConfig,
scrapers = append(scrapers, s)
}
return controller.NewController[xscraper.Profiles](
cfg, rSet, scrapers, func(c *controller.Controller[xscraper.Profiles]) { scrapeProfiles(c, nextConsumer) }, co.tickerCh)
cfg, rSet, scrapers, func(ctx context.Context, c *controller.Controller[xscraper.Profiles]) error {
return scrapeProfiles(ctx, c, nextConsumer)
}, co.tickerCh)
}

func getOptions(options []ControllerOption) controllerOptions {
Expand All @@ -117,20 +120,21 @@ func getOptions(options []ControllerOption) controllerOptions {
return co
}

func scrapeProfiles(c *controller.Controller[xscraper.Profiles], nextConsumer xconsumer.Profiles) {
ctx, done := controller.WithScrapeContext(c.Timeout)
defer done()

func scrapeProfiles(ctx context.Context, c *controller.Controller[xscraper.Profiles], nextConsumer xconsumer.Profiles) error {
var errs []error
profiles := pprofile.NewProfiles()
for i := range c.Scrapers {
md, err := c.Scrapers[i].ScrapeProfiles(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
if err != nil {
errs = append(errs, err)
if !scrapererror.IsPartialScrapeError(err) {
continue
}
}
md.ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
}

// TODO: Add proper receiver observability for profiles when receiverhelper supports it
// For now, we skip the obs report and just consume the profiles directly
_ = nextConsumer.ConsumeProfiles(ctx, profiles)
return errors.Join(append(errs, nextConsumer.ConsumeProfiles(ctx, profiles))...)
}
Loading