Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions agent/agentserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
Expand All @@ -38,13 +40,29 @@ import (
mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler"
mockannounceclient "github.com/uber/kraken/mocks/tracker/announceclient"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/memsize"
"github.com/uber/kraken/utils/testutil"

"github.com/go-chi/chi"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
)

// failingResponseWriter always errors on Write, simulating a client that
// disconnected after the response headers were sent.
type failingResponseWriter struct{}

func (failingResponseWriter) Header() http.Header { return http.Header{} }

func (failingResponseWriter) Write([]byte) (int, error) {
return 0, errors.New("write failed: client disconnected")
}

func (failingResponseWriter) WriteHeader(int) {}

var _ http.ResponseWriter = failingResponseWriter{}

type serverMocks struct {
cads *store.CADownloadStore
sched *mockscheduler.MockReloadableScheduler
Expand All @@ -53,6 +71,7 @@ type serverMocks struct {
containerdCli *mockcontainerd.MockClient
ac *mockannounceclient.MockClient
containerRuntime *mockcontainerruntime.MockFactory
stats tally.TestScope
cleanup *testutil.Cleanup
}

Expand All @@ -73,19 +92,30 @@ func newServerMocks(t *testing.T) (*serverMocks, func()) {
containerdCli := mockcontainerd.NewMockClient(ctrl)
ac := mockannounceclient.NewMockClient(ctrl)
containerruntime := mockcontainerruntime.NewMockFactory(ctrl)
stats := tally.NewTestScope("", nil)
return &serverMocks{
cads, sched, tags, dockerCli, containerdCli, ac,
containerruntime, &cleanup,
containerruntime, stats, &cleanup,
}, cleanup.Run
}

func (m *serverMocks) startServer(c Config) (*Server, string) {
s := New(c, tally.NoopScope, m.cads, m.sched, m.tags, m.ac, m.containerRuntime)
s := New(c, m.stats, m.cads, m.sched, m.tags, m.ac, m.containerRuntime)
addr, stop := testutil.StartServer(s.Handler())
m.cleanup.Add(stop)
return s, addr
}

// mbServedValue returns the "mb_served" counter value from the scope.
func mbServedValue(scope tally.TestScope) int64 {
for _, c := range scope.Snapshot().Counters() {
if c.Name() == "mb_served" {
return c.Value()
}
}
return 0
}

func TestGetTag(t *testing.T) {
require := require.New(t)

Expand Down Expand Up @@ -146,6 +176,74 @@ func TestDownload(t *testing.T) {
require.Equal(string(blob.Content), string(result))
}

func TestDownloadEmitsMBServed(t *testing.T) {
for _, tc := range []struct {
desc string
blobSize uint64
wantMB int64
}{
{"large blob (2 MiB)", 2 * memsize.MB, 2},
{"exact 1 MiB blob", memsize.MB, 1},
{"sub-MiB blob truncates to 0", 256 * memsize.KB, 0},
} {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)

mocks, cleanup := newServerMocks(t)
defer cleanup()

namespace := core.TagFixture()
blob := core.SizedBlobFixture(tc.blobSize, 64)

mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
func(namespace string, d core.Digest) error {
return store.RunDownload(mocks.cads, d, blob.Content)
})

_, addr := mocks.startServer(Config{})
c := agentclient.New(addr)

r, err := c.Download(namespace, blob.Digest)
require.NoError(err)
_, err = io.ReadAll(r)
require.NoError(err)

require.Equal(tc.wantMB, mbServedValue(mocks.stats))
})
}
}

func TestDownloadEmitsMBServedEvenWhenCopyFails(t *testing.T) {
require := require.New(t)

mocks, cleanup := newServerMocks(t)
defer cleanup()

namespace := core.TagFixture()
blob := core.SizedBlobFixture(2*memsize.MB, 64)

mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
func(namespace string, d core.Digest) error {
return store.RunDownload(mocks.cads, d, blob.Content)
})

s := New(
Config{}, mocks.stats, mocks.cads, mocks.sched, mocks.tags,
mocks.ac, mocks.containerRuntime)

rctx := chi.NewRouteContext()
rctx.URLParams.Add("namespace", namespace)
rctx.URLParams.Add("digest", blob.Digest.String())
req := httptest.NewRequest(http.MethodGet, "/", nil).
WithContext(context.WithValue(context.Background(), chi.RouteCtxKey, rctx))

err := s.downloadBlobHandler(failingResponseWriter{}, req)
require.Error(err)
require.Contains(err.Error(), "copy file")

require.Equal(int64(2), mbServedValue(mocks.stats))
}

func TestDownloadNotFound(t *testing.T) {
require := require.New(t)

Expand Down
83 changes: 81 additions & 2 deletions lib/dockerregistry/transfer/ro_transferer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/kraken/lib/store"
mocktagclient "github.com/uber/kraken/mocks/build-index/tagclient"
mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler"
"github.com/uber/kraken/utils/memsize"
"github.com/uber/kraken/utils/testutil"

"github.com/golang/mock/gomock"
Expand All @@ -37,6 +38,7 @@ type agentTransfererMocks struct {
cads *store.CADownloadStore
tags *mocktagclient.MockClient
sched *mockscheduler.MockScheduler
stats tally.TestScope
}

func newReadOnlyTransfererMocks(t *testing.T) (*agentTransfererMocks, func()) {
Expand All @@ -52,11 +54,23 @@ func newReadOnlyTransfererMocks(t *testing.T) (*agentTransfererMocks, func()) {

sched := mockscheduler.NewMockScheduler(ctrl)

return &agentTransfererMocks{cads, tags, sched}, cleanup.Run
stats := tally.NewTestScope("", nil)

return &agentTransfererMocks{cads, tags, sched, stats}, cleanup.Run
}

func (m *agentTransfererMocks) new() *ReadOnlyTransferer {
return NewReadOnlyTransferer(tally.NoopScope, m.cads, m.tags, m.sched)
return NewReadOnlyTransferer(m.stats, m.cads, m.tags, m.sched)
}

// mbServedValue returns the "mb_served" counter value from the scope.
func mbServedValue(scope tally.TestScope) int64 {
for _, c := range scope.Snapshot().Counters() {
if c.Name() == "mb_served" {
return c.Value()
}
}
return 0
}

func TestReadOnlyTransfererDownloadCachesBlob(t *testing.T) {
Expand Down Expand Up @@ -86,6 +100,71 @@ func TestReadOnlyTransfererDownloadCachesBlob(t *testing.T) {
}
}

func TestReadOnlyTransfererDownloadEmitsMBServed(t *testing.T) {
for _, tc := range []struct {
desc string
blobSize uint64
wantMB int64
}{
{"large blob (2 MiB)", 2 * memsize.MB, 2},
{"exact 1 MiB blob", memsize.MB, 1},
{"sub-MiB blob truncates to 0", 256 * memsize.KB, 0},
} {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)

mocks, cleanup := newReadOnlyTransfererMocks(t)
defer cleanup()

transferer := mocks.new()

namespace := "docker/repo-bar:latest"
blob := core.SizedBlobFixture(tc.blobSize, 64)

mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
func(namespace string, d core.Digest) error {
return store.RunDownload(mocks.cads, d, blob.Content)
})

result, err := transferer.Download(namespace, blob.Digest)
require.NoError(err)
_, err = io.ReadAll(result)
require.NoError(err)

require.Equal(tc.wantMB, mbServedValue(mocks.stats))
})
}
}

// TestReadOnlyTransfererDownloadEmitsMBServedOnCacheHit documents that the
// mb_served counter increments on every Download call, including cached reads
// where no blob was actually fetched over the network.
func TestReadOnlyTransfererDownloadEmitsMBServedOnCacheHit(t *testing.T) {
require := require.New(t)

mocks, cleanup := newReadOnlyTransfererMocks(t)
defer cleanup()

transferer := mocks.new()

namespace := "docker/repo-bar:latest"
blob := core.SizedBlobFixture(2*memsize.MB, 64)

mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
func(namespace string, d core.Digest) error {
return store.RunDownload(mocks.cads, d, blob.Content)
})

for i := 0; i < 3; i++ {
result, err := transferer.Download(namespace, blob.Digest)
require.NoError(err)
_, err = io.ReadAll(result)
require.NoError(err)
}

require.Equal(int64(6), mbServedValue(mocks.stats))
}

func TestReadOnlyTransfererStat(t *testing.T) {
require := require.New(t)

Expand Down
Loading