Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions splitio/proxy/caching/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (c *CacheAwareSplitSynchronizer) SynchronizeSplits(till *int64) (*split.Upd
previousRB, _ := c.rbStorage.ChangeNumber()

result, err := c.wrapped.SynchronizeSplits(till)
if err != nil {
return nil, err
}
current, _ := c.splitStorage.ChangeNumber()
currentRB, _ := c.rbStorage.ChangeNumber()
if current > previous || (previous != -1 && current == -1) || currentRB > previousRB || (previousRB != -1 && currentRB == -1) {
Expand All @@ -71,6 +74,9 @@ func (c *CacheAwareSplitSynchronizer) SynchronizeFeatureFlags(ffChange *dtos.Spl
previousRB, _ := c.rbStorage.ChangeNumber()

result, err := c.wrapped.SynchronizeFeatureFlags(ffChange)
if err != nil {
return nil, err
}
current, _ := c.splitStorage.ChangeNumber()
currentRB, _ := c.rbStorage.ChangeNumber()
if current > previous || (previous != -1 && current == -1) || currentRB > previousRB || (previousRB != -1 && currentRB == -1) {
Expand Down Expand Up @@ -111,6 +117,9 @@ func NewCacheAwareSegmentSync(
func (c *CacheAwareSegmentSynchronizer) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) {
previous, _ := c.segmentStorage.ChangeNumber(name)
result, err := c.wrapped.SynchronizeSegment(name, till)
if err != nil {
return nil, err
}
if current := result.NewChangeNumber; current > previous || (previous != -1 && current == -1) {
c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(name))
c.cacheFlusher.EvictBySurrogate(MembershipsSurrogate)
Expand Down Expand Up @@ -139,6 +148,9 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegments() (map[string]segmen
}

results, err := c.wrapped.SynchronizeSegments()
if err != nil {
return nil, err
}
for segmentName := range results {
result := results[segmentName]
ccn := result.NewChangeNumber
Expand Down Expand Up @@ -197,6 +209,9 @@ func NewCacheAwareLargeSegmentSync(
func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string, till *int64) (*int64, error) {
previous := c.largeSegmentStorage.ChangeNumber(name)
newCN, err := c.wrapped.SynchronizeLargeSegment(name, till)
if err != nil {
return nil, err
}

c.evictByLargeSegmentSurrogate(previous, *newCN)

Expand All @@ -214,6 +229,9 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[str
}

results, err := c.wrapped.SynchronizeLargeSegments()
if err != nil {
return nil, err
}
for name, currentCN := range results {
c.evictByLargeSegmentSurrogate(previousCNs[name], *currentCN)
}
Expand All @@ -228,6 +246,9 @@ func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool {
func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) {
previous := c.largeSegmentStorage.ChangeNumber(lsRFDResponseDTO.Name)
newCN, err := c.wrapped.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO)
if err != nil {
return nil, err
}

c.evictByLargeSegmentSurrogate(previous, *newCN)

Expand Down
205 changes: 205 additions & 0 deletions splitio/proxy/caching/workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,35 @@ func TestCacheAwareSplitSyncChanges(t *testing.T) {
storageMock.AssertExpectations(t)
}

func TestCacheAwareSplitSyncError(t *testing.T) {
var splitSyncMock mocks.SplitUpdaterMock
expectedErr := assert.AnError
splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), expectedErr).Once()

var rbsStorage commons.MockRuleBasedSegmentStorage
rbsStorage.On("ChangeNumber").Return(int64(-1), error(nil))

var cacheFlusherMock mocks.CacheFlusherMock

var storageMock mocks.SplitStorageMock
storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once()

css := CacheAwareSplitSynchronizer{
splitStorage: &storageMock,
wrapped: &splitSyncMock,
rbStorage: &rbsStorage,
cacheFlusher: &cacheFlusherMock,
}

res, err := css.SynchronizeSplits(nil)
assert.Equal(t, expectedErr, err)
assert.Nil(t, res)

splitSyncMock.AssertExpectations(t)
cacheFlusherMock.AssertExpectations(t)
storageMock.AssertExpectations(t)
}

func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) {

// This test is used to test the new method. Eventually commons should be cleaned in order to have a single method for split-synchronization.
Expand Down Expand Up @@ -119,6 +148,35 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) {
storageMock.AssertExpectations(t)
}

func TestCacheAwareSplitSyncFeatureFlagsError(t *testing.T) {
var splitSyncMock mocks.SplitUpdaterMock
expectedErr := assert.AnError
splitSyncMock.On("SynchronizeFeatureFlags", (*dtos.SplitChangeUpdate)(nil)).Return((*split.UpdateResult)(nil), expectedErr).Once()

var rbsStorage commons.MockRuleBasedSegmentStorage
rbsStorage.On("ChangeNumber").Return(int64(-1), error(nil))

var cacheFlusherMock mocks.CacheFlusherMock

var storageMock mocks.SplitStorageMock
storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once()

css := CacheAwareSplitSynchronizer{
splitStorage: &storageMock,
rbStorage: &rbsStorage,
wrapped: &splitSyncMock,
cacheFlusher: &cacheFlusherMock,
}

res, err := css.SynchronizeFeatureFlags(nil)
assert.Equal(t, expectedErr, err)
assert.Nil(t, res)

splitSyncMock.AssertExpectations(t)
cacheFlusherMock.AssertExpectations(t)
storageMock.AssertExpectations(t)
}

func TestCacheAwareSegmentSyncNoChanges(t *testing.T) {
var segmentUpdater mocks.SegmentUpdaterMock
segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{}, nil).Once()
Expand Down Expand Up @@ -190,6 +248,33 @@ func TestCacheAwareSegmentSyncSingle(t *testing.T) {
cacheFlusher.AssertExpectations(t)
}

func TestCacheAwareSegmentSyncSingleError(t *testing.T) {
var segmentUpdater mocks.SegmentUpdaterMock
expectedErr := assert.AnError
segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return((*segment.UpdateResult)(nil), expectedErr).Once()

var splitStorage mocks.SplitStorageMock
var cacheFlusher mocks.CacheFlusherMock
var segmentStorage mocks.SegmentStorageMock
segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once()

css := CacheAwareSegmentSynchronizer{
splitStorage: &splitStorage,
segmentStorage: &segmentStorage,
wrapped: &segmentUpdater,
cacheFlusher: &cacheFlusher,
}

res, err := css.SynchronizeSegment("segment1", nil)
assert.Equal(t, expectedErr, err)
assert.Nil(t, res)

segmentUpdater.AssertExpectations(t)
segmentStorage.AssertExpectations(t)
splitStorage.AssertExpectations(t)
cacheFlusher.AssertExpectations(t)
}

func TestCacheAwareSegmentSyncAllSegments(t *testing.T) {
var segmentUpdater mocks.SegmentUpdaterMock
segmentUpdater.On("SynchronizeSegments").Return(map[string]segment.UpdateResult{"segment2": {
Expand Down Expand Up @@ -250,6 +335,36 @@ func TestCacheAwareSegmentSyncAllSegments(t *testing.T) {
cacheFlusher.AssertExpectations(t)
}

func TestCacheAwareSegmentSyncAllSegmentsError(t *testing.T) {
var segmentUpdater mocks.SegmentUpdaterMock
expectedErr := assert.AnError
segmentUpdater.On("SynchronizeSegments").Return((map[string]segment.UpdateResult)(nil), expectedErr).Once()

var splitStorage mocks.SplitStorageMock
splitStorage.On("SegmentNames").Return(set.NewSet("segment2")).Once()

var cacheFlusher mocks.CacheFlusherMock

var segmentStorage mocks.SegmentStorageMock
segmentStorage.On("ChangeNumber", "segment2").Return(int64(0), nil).Once()

css := CacheAwareSegmentSynchronizer{
splitStorage: &splitStorage,
segmentStorage: &segmentStorage,
wrapped: &segmentUpdater,
cacheFlusher: &cacheFlusher,
}

res, err := css.SynchronizeSegments()
assert.Equal(t, expectedErr, err)
assert.Nil(t, res)

segmentUpdater.AssertExpectations(t)
segmentStorage.AssertExpectations(t)
splitStorage.AssertExpectations(t)
cacheFlusher.AssertExpectations(t)
}

// CacheAwareLargeSegmentSynchronizer
func TestSynchronizeLargeSegment(t *testing.T) {
lsName := "largeSegment1"
Expand Down Expand Up @@ -366,3 +481,93 @@ func TestSynchronizeLargeSegments(t *testing.T) {
largeSegmentStorage.AssertExpectations(t)
lsUpdater.AssertExpectations(t)
}

func TestSynchronizeLargeSegmentError(t *testing.T) {
lsName := "largeSegment1"

var splitStorage mocks.SplitStorageMock
var cacheFlusher mocks.CacheFlusherMock

var largeSegmentStorage mocks.LargeSegmentStorageMock
largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(-1)).Once()

var lsUpdater mocks.LargeSegmentUpdaterMock
expectedErr := assert.AnError
lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return((*int64)(nil), expectedErr).Once()

clsSync := CacheAwareLargeSegmentSynchronizer{
wrapped: &lsUpdater,
cacheFlusher: &cacheFlusher,
largeSegmentStorage: &largeSegmentStorage,
splitStorage: &splitStorage,
}

cn, err := clsSync.SynchronizeLargeSegment(lsName, nil)
assert.Equal(t, expectedErr, err)
assert.Nil(t, cn)

cacheFlusher.AssertExpectations(t)
largeSegmentStorage.AssertExpectations(t)
lsUpdater.AssertExpectations(t)
}

func TestSynchronizeLargeSegmentsError(t *testing.T) {
var splitStorage mocks.SplitStorageMock
splitStorage.On("LargeSegmentNames").Return(set.NewSet("ls1", "ls2"))

var cacheFlusher mocks.CacheFlusherMock

var largeSegmentStorage mocks.LargeSegmentStorageMock
largeSegmentStorage.On("ChangeNumber", "ls1").Return(int64(50)).Once()
largeSegmentStorage.On("ChangeNumber", "ls2").Return(int64(150)).Once()

var lsUpdater mocks.LargeSegmentUpdaterMock
expectedErr := assert.AnError
lsUpdater.On("SynchronizeLargeSegments").Return((map[string]*int64)(nil), expectedErr).Once()

clsSync := CacheAwareLargeSegmentSynchronizer{
wrapped: &lsUpdater,
cacheFlusher: &cacheFlusher,
largeSegmentStorage: &largeSegmentStorage,
splitStorage: &splitStorage,
}

cn, err := clsSync.SynchronizeLargeSegments()
assert.Equal(t, expectedErr, err)
assert.Nil(t, cn)

splitStorage.AssertExpectations(t)
cacheFlusher.AssertExpectations(t)
largeSegmentStorage.AssertExpectations(t)
lsUpdater.AssertExpectations(t)
}

func TestSynchronizeLargeSegmentUpdateError(t *testing.T) {
lsName := "largeSegment1"
lsRFDResponseDTO := &dtos.LargeSegmentRFDResponseDTO{Name: lsName}

var splitStorage mocks.SplitStorageMock
var cacheFlusher mocks.CacheFlusherMock

var largeSegmentStorage mocks.LargeSegmentStorageMock
largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(-1)).Once()

var lsUpdater mocks.LargeSegmentUpdaterMock
expectedErr := assert.AnError
lsUpdater.On("SynchronizeLargeSegmentUpdate", lsRFDResponseDTO).Return((*int64)(nil), expectedErr).Once()

clsSync := CacheAwareLargeSegmentSynchronizer{
wrapped: &lsUpdater,
cacheFlusher: &cacheFlusher,
largeSegmentStorage: &largeSegmentStorage,
splitStorage: &splitStorage,
}

cn, err := clsSync.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO)
assert.Equal(t, expectedErr, err)
assert.Nil(t, cn)

cacheFlusher.AssertExpectations(t)
largeSegmentStorage.AssertExpectations(t)
lsUpdater.AssertExpectations(t)
}
17 changes: 9 additions & 8 deletions splitio/proxy/controllers/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ func (c *SdkServerController) MySegments(ctx *gin.Context) {
func (c *SdkServerController) fetchRulesSince(since int64, rbsince int64, sets []string) (*dtos.RuleChangesDTO, error) {
splits, err := c.proxySplitStorage.ChangesSince(since, sets)
rbs, rbsErr := c.proxyRBSegmentStorage.ChangesSince(rbsince)
if err != nil && !errors.Is(err, storage.ErrSinceParamTooOld) {
return nil, fmt.Errorf("unexpected error fetching feature flag changes from storage: %w", err)
}

if rbsErr != nil && !errors.Is(rbsErr, storage.ErrSinceParamTooOld) {
return nil, fmt.Errorf("unexpected error fetching rule-based segments changes from storage: %w", rbsErr)
}

if err == nil && rbsErr == nil {
return &dtos.RuleChangesDTO{
FeatureFlags: dtos.FeatureFlagsDTO{
Expand All @@ -216,14 +224,7 @@ func (c *SdkServerController) fetchRulesSince(since int64, rbsince int64, sets [
Since: splits.Since,
},
RuleBasedSegments: *rbs,
}, err
}
if err != nil && !errors.Is(err, storage.ErrSinceParamTooOld) {
return nil, fmt.Errorf("unexpected error fetching feature flag changes from storage: %w", err)
}

if rbsErr != nil && !errors.Is(rbsErr, storage.ErrSinceParamTooOld) {
return nil, fmt.Errorf("unexpected error fetching rule-based segments changes from storage: %w", rbsErr)
}, nil
}

// perform a fetch to the BE using the supplied `since`, have the storage process it's response &, retry
Expand Down
13 changes: 12 additions & 1 deletion splitio/proxy/storage/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,18 @@ func (s *ProxySegmentStorageImpl) Update(name string, toAdd *set.ThreadUnsafeSet
return nil
}

return fmt.Errorf("errors updating cache: %s || errors updating db: %s", errCache.Error(), errDB.Error())
var cacheErrMsg, dbErrMsg string
if errCache != nil {
cacheErrMsg = errCache.Error()
} else {
cacheErrMsg = "nil"
}
if errDB != nil {
dbErrMsg = errDB.Error()
} else {
dbErrMsg = "nil"
}
return fmt.Errorf("errors updating cache: %s || errors updating db: %s", cacheErrMsg, dbErrMsg)
}

// CountRemovedKeys method
Expand Down
Loading