Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 112 additions & 2 deletions agent/agentserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
Expand All @@ -38,13 +40,34 @@ import (
mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler"
mockannounceclient "github.com/uber/kraken/mocks/tracker/announceclient"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/memsize"
"github.com/uber/kraken/utils/testutil"

"github.com/go-chi/chi"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
)

// failingResponseWriter always errors on Write, simulating a client that
// disconnected after the response headers were sent.
type failingResponseWriter struct {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a compile-time check after the definition:

var _ http.ResponseWriter = (*failingResponseWriter)(nil)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, thx sir!

header http.Header
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's drop this field and simply return http.Header{} from within Header().

}

// Header lazily allocates and returns the header map for the writer.
func (f *failingResponseWriter) Header() http.Header {
	if f.header != nil {
		return f.header
	}
	f.header = http.Header{}
	return f.header
}

// Write unconditionally fails, emulating a peer that vanished mid-response.
func (f *failingResponseWriter) Write(_ []byte) (int, error) {
	err := errors.New("write failed: client disconnected")
	return 0, err
}

func (f *failingResponseWriter) WriteHeader(int) {}

type serverMocks struct {
cads *store.CADownloadStore
sched *mockscheduler.MockReloadableScheduler
Expand All @@ -53,6 +76,7 @@ type serverMocks struct {
containerdCli *mockcontainerd.MockClient
ac *mockannounceclient.MockClient
containerRuntime *mockcontainerruntime.MockFactory
stats tally.TestScope
cleanup *testutil.Cleanup
}

Expand All @@ -73,19 +97,31 @@ func newServerMocks(t *testing.T) (*serverMocks, func()) {
containerdCli := mockcontainerd.NewMockClient(ctrl)
ac := mockannounceclient.NewMockClient(ctrl)
containerruntime := mockcontainerruntime.NewMockFactory(ctrl)
stats := tally.NewTestScope("", nil)
return &serverMocks{
cads, sched, tags, dockerCli, containerdCli, ac,
containerruntime, &cleanup,
containerruntime, stats, &cleanup,
}, cleanup.Run
}

// startServer creates a Server wired to the mocks (including the test
// metrics scope), serves it on an ephemeral port, and returns the server
// with its address. The listener is torn down during cleanup.
func (m *serverMocks) startServer(c Config) (*Server, string) {
	s := New(c, m.stats, m.cads, m.sched, m.tags, m.ac, m.containerRuntime)
	addr, stop := testutil.StartServer(s.Handler())
	m.cleanup.Add(stop)
	return s, addr
}

// mbServedValue returns the sum of all "mb_served" counter values in the scope.
func mbServedValue(scope tally.TestScope) int64 {
var total int64
for _, c := range scope.Snapshot().Counters() {
if c.Name() == "mb_served" {
total += c.Value()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the metrics not be emitted to the same counter? If so, we can just immediately return c.Value() from within the loop.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, thx sir!

}
}
return total
}

func TestGetTag(t *testing.T) {
require := require.New(t)

Expand Down Expand Up @@ -146,6 +182,80 @@ func TestDownload(t *testing.T) {
require.Equal(string(blob.Content), string(result))
}

// TestDownloadEmitsMBServed verifies that a successful download increments
// the mb_served counter by the blob size in whole mebibytes (integer
// division, so sub-MiB blobs record 0).
func TestDownloadEmitsMBServed(t *testing.T) {
	tests := []struct {
		name string
		size uint64
		want int64
	}{
		{"large blob (2 MiB)", 2 * memsize.MB, 2},
		{"exact 1 MiB blob", memsize.MB, 1},
		{"sub-MiB blob truncates to 0", 256 * memsize.KB, 0},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			require := require.New(t)

			mocks, cleanup := newServerMocks(t)
			defer cleanup()

			namespace := core.TagFixture()
			blob := core.SizedBlobFixture(tt.size, 64)

			mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
				func(namespace string, d core.Digest) error {
					return store.RunDownload(mocks.cads, d, blob.Content)
				})

			_, addr := mocks.startServer(Config{})
			client := agentclient.New(addr)

			r, err := client.Download(namespace, blob.Digest)
			require.NoError(err)
			_, err = io.ReadAll(r)
			require.NoError(err)

			require.Equal(tt.want, mbServedValue(mocks.stats))
		})
	}
}

// TestDownloadEmitsMBServedEvenWhenCopyFails documents that the mb_served
// counter is incremented before io.Copy runs, so on a client disconnect (or
// any other write failure) the counter still records a full serve even though
// no bytes actually reached the client. If a future change moves the
// increment to after a successful copy,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line from the comment?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, thanks!

// this test will need to be updated.
func TestDownloadEmitsMBServedEvenWhenCopyFails(t *testing.T) {
	require := require.New(t)

	mocks, cleanup := newServerMocks(t)
	defer cleanup()

	namespace := core.TagFixture()
	blob := core.SizedBlobFixture(2*memsize.MB, 64)

	mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
		func(namespace string, d core.Digest) error {
			return store.RunDownload(mocks.cads, d, blob.Content)
		})

	srv := New(
		Config{}, mocks.stats, mocks.cads, mocks.sched, mocks.tags,
		mocks.ac, mocks.containerRuntime)

	// Build a request that carries chi route params so the handler can
	// resolve namespace and digest without going through the router.
	routeCtx := chi.NewRouteContext()
	routeCtx.URLParams.Add("namespace", namespace)
	routeCtx.URLParams.Add("digest", blob.Digest.String())
	ctx := context.WithValue(context.Background(), chi.RouteCtxKey, routeCtx)
	req := httptest.NewRequest(http.MethodGet, "/", nil).WithContext(ctx)

	err := srv.downloadBlobHandler(&failingResponseWriter{}, req)
	require.Error(err)
	require.Contains(err.Error(), "copy file")

	// The counter was bumped before io.Copy, so the full 2 MiB is recorded
	// even though the client received nothing.
	require.Equal(int64(2), mbServedValue(mocks.stats))
}

func TestDownloadNotFound(t *testing.T) {
require := require.New(t)

Expand Down
84 changes: 82 additions & 2 deletions lib/dockerregistry/transfer/ro_transferer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/kraken/lib/store"
mocktagclient "github.com/uber/kraken/mocks/build-index/tagclient"
mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler"
"github.com/uber/kraken/utils/memsize"
"github.com/uber/kraken/utils/testutil"

"github.com/golang/mock/gomock"
Expand All @@ -37,6 +38,7 @@ type agentTransfererMocks struct {
cads *store.CADownloadStore
tags *mocktagclient.MockClient
sched *mockscheduler.MockScheduler
stats tally.TestScope
}

func newReadOnlyTransfererMocks(t *testing.T) (*agentTransfererMocks, func()) {
Expand All @@ -52,11 +54,24 @@ func newReadOnlyTransfererMocks(t *testing.T) (*agentTransfererMocks, func()) {

sched := mockscheduler.NewMockScheduler(ctrl)

return &agentTransfererMocks{cads, tags, sched}, cleanup.Run
stats := tally.NewTestScope("", nil)

return &agentTransfererMocks{cads, tags, sched, stats}, cleanup.Run
}

// new constructs a ReadOnlyTransferer wired to the mock dependencies and
// the test metrics scope so emitted counters can be inspected.
func (m *agentTransfererMocks) new() *ReadOnlyTransferer {
	return NewReadOnlyTransferer(m.stats, m.cads, m.tags, m.sched)
}

// mbServedValue returns the sum of all "mb_served" counter values in the scope.
func mbServedValue(scope tally.TestScope) int64 {
	total := int64(0)
	for _, counter := range scope.Snapshot().Counters() {
		if counter.Name() != "mb_served" {
			continue
		}
		total += counter.Value()
	}
	return total
}

func TestReadOnlyTransfererDownloadCachesBlob(t *testing.T) {
Expand Down Expand Up @@ -86,6 +101,71 @@ func TestReadOnlyTransfererDownloadCachesBlob(t *testing.T) {
}
}

// TestReadOnlyTransfererDownloadEmitsMBServed verifies that Download
// increments the mb_served counter by the blob size in whole mebibytes
// (integer division, so sub-MiB blobs record 0).
func TestReadOnlyTransfererDownloadEmitsMBServed(t *testing.T) {
	tests := []struct {
		name string
		size uint64
		want int64
	}{
		{"large blob (2 MiB)", 2 * memsize.MB, 2},
		{"exact 1 MiB blob", memsize.MB, 1},
		{"sub-MiB blob truncates to 0", 256 * memsize.KB, 0},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			require := require.New(t)

			mocks, cleanup := newReadOnlyTransfererMocks(t)
			defer cleanup()

			transferer := mocks.new()

			namespace := "docker/repo-bar:latest"
			blob := core.SizedBlobFixture(tt.size, 64)

			mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
				func(namespace string, d core.Digest) error {
					return store.RunDownload(mocks.cads, d, blob.Content)
				})

			r, err := transferer.Download(namespace, blob.Digest)
			require.NoError(err)
			_, err = io.ReadAll(r)
			require.NoError(err)

			require.Equal(tt.want, mbServedValue(mocks.stats))
		})
	}
}

// TestReadOnlyTransfererDownloadEmitsMBServedOnCacheHit documents that the
// mb_served counter increments on every Download call, including cached reads
// where no blob was actually fetched over the network.
func TestReadOnlyTransfererDownloadEmitsMBServedOnCacheHit(t *testing.T) {
	require := require.New(t)

	mocks, cleanup := newReadOnlyTransfererMocks(t)
	defer cleanup()

	transferer := mocks.new()

	namespace := "docker/repo-bar:latest"
	blob := core.SizedBlobFixture(2*memsize.MB, 64)

	// Only the first Download triggers a scheduler fetch; the rest hit the
	// local cache.
	mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
		func(namespace string, d core.Digest) error {
			return store.RunDownload(mocks.cads, d, blob.Content)
		})

	const reads = 3
	for i := 0; i < reads; i++ {
		r, err := transferer.Download(namespace, blob.Digest)
		require.NoError(err)
		_, err = io.ReadAll(r)
		require.NoError(err)
	}

	// 3 reads of a 2 MiB blob record 6 MiB served, despite a single fetch.
	require.Equal(int64(6), mbServedValue(mocks.stats))
}

func TestReadOnlyTransfererStat(t *testing.T) {
require := require.New(t)

Expand Down
Loading