Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pkg/api/internal/rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"net/http"
"net/url"
"path"
Expand Down Expand Up @@ -129,9 +130,7 @@ func (r *Request) WithURI(uri string) *Request {
if r.params == nil {
r.params = make(url.Values)
}
for k, v := range vals {
r.params[k] = v
}
maps.Copy(r.params, vals)
}
return r
}
Expand Down Expand Up @@ -210,7 +209,7 @@ func (r *Request) WithMaxRetries(maxRetries uint64) *Request {
// only supports two types now:
// 1. io.Reader
// 2. type which can be json marshalled
func (r *Request) WithBody(obj interface{}) *Request {
func (r *Request) WithBody(obj any) *Request {
if r.err != nil {
return r
}
Expand Down Expand Up @@ -406,7 +405,7 @@ func (r Result) Error() error {
}

// Into stores the http response body into obj.
func (r Result) Into(obj interface{}) error {
func (r Result) Into(obj any) error {
if r.err != nil {
return r.err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/api/internal/rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ func TestRequestDoContextTimeout(t *testing.T) {
}))
defer testServer.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

c, err := CDCRESTClientFromConfig(&Config{
Host: testServer.URL,
Expand Down
9 changes: 3 additions & 6 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func (br *MockReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs ui
}

func TestApply(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

checkpointTs := uint64(1000)
resolvedTs := uint64(2000)
Expand Down Expand Up @@ -303,8 +302,7 @@ func TestApply(t *testing.T) {
}

func TestApplyBigTxn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

checkpointTs := uint64(1000)
resolvedTs := uint64(2000)
Expand Down Expand Up @@ -479,8 +477,7 @@ func TestApplyBigTxn(t *testing.T) {
}

func TestApplyMeetSinkError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

port, err := freeport.GetFreePort()
require.Nil(t, err)
Expand Down
6 changes: 2 additions & 4 deletions pkg/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,9 @@ func NewConflictDetector[Txn txnEvent](
ret.resolvedTxnCaches[i] = newTxnCache[Txn](opt)
}

ret.wg.Add(1)
go func() {
defer ret.wg.Done()
ret.wg.Go(func() {
ret.runBackgroundTasks()
}()
})

return ret
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/causality/internal/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Slots struct {
// NewSlots creates a new Slots.
func NewSlots(numSlots uint64) *Slots {
slots := make([]slot, numSlots)
for i := uint64(0); i < numSlots; i++ {
for i := range numSlots {
slots[i].nodes = make(map[uint64]*Node, 8)
}
return &Slots{
Expand Down
18 changes: 7 additions & 11 deletions pkg/causality/internal/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ func TestSlotsTrivial(t *testing.T) {
slots := NewSlots(8)
nodes := make([]*Node, 0, 1000)

for i := 0; i < count; i++ {
for range count {
node := newNodeForTest(1, 2, 3, 4, 5)
slots.Add(node)
nodes = append(nodes, node)
}

for i := 0; i < count; i++ {
for i := range count {
slots.Remove(nodes[i])
}

Expand All @@ -53,7 +53,7 @@ func TestSlotsConcurrentOps(t *testing.T) {
slots := NewSlots(8)
freeNodeChan := make(chan *Node, N)
inuseNodeChan := make(chan *Node, N)
for i := 0; i < N; i++ {
for range N {
freeNodeChan <- newNodeForTest(1, 9, 17, 25, 33)
}

Expand All @@ -62,9 +62,7 @@ func TestSlotsConcurrentOps(t *testing.T) {

// test concurrent add and remove won't panic
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for {
select {
case <-ctx.Done():
Expand All @@ -75,11 +73,9 @@ func TestSlotsConcurrentOps(t *testing.T) {
inuseNodeChan <- node
}
}
}()
})

wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for {
select {
case <-ctx.Done():
Expand All @@ -90,7 +86,7 @@ func TestSlotsConcurrentOps(t *testing.T) {
freeNodeChan <- newNodeForTest(1, 9, 17, 25, 33)
}
}
}()
})

wg.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/causality/tests/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newConflictTestDriver(
})

workers := make([]*workerForTest, 0, numWorkers)
for i := 0; i < numWorkers; i++ {
for i := range numWorkers {
id := int64(i)
workers = append(workers, newWorkerForTest(detector.GetOutChByCacheID(id)))
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/causality/tests/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ func newWorkerForTest(txnCh <-chan causality.TxnWithNotifier[*txnForTest]) *work
closeCh: make(chan struct{}),
}

ret.wg.Add(1)
go func() {
defer ret.wg.Done()
ret.wg.Go(func() {
ret.run(txnCh)
}()
})

return ret
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/causality/txn_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestBoundedWorker(t *testing.T) {
Size: size,
BlockStrategy: BlockStrategyWaitAvailable,
})
for i := 0; i < size; i++ {
for range size {
// Add 10 events to the worker.
ok := worker.add(TxnWithNotifier[txnEvent]{
TxnEvent: mockTxnEvent{},
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestBoundedWorkerWithBlock(t *testing.T) {
Size: size,
BlockStrategy: BlockStrategyWaitEmpty,
})
for i := 0; i < size; i++ {
for range size {
// Add 10 events to the worker.
ok := worker.add(TxnWithNotifier[txnEvent]{
TxnEvent: mockTxnEvent{},
Expand Down
Loading
Loading