mirror of
1
Fork 0

Prevent deadlocks in persistable channel pause test (#18410)

* Prevent deadlocks in persistable channel pause test

Because of reuse of the old paused/resumed channels in this test there
was a potential for deadlock. This PR ensures that the channels are always
reobtained.

It further adds some control code to detect hangs in future - and it
ensures that the pausing warning is not shown on shutdown.

Signed-off-by: Andrew Thornton <art27@cantab.net>

* do not warn but do pause

Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
zeripath 2022-01-25 23:09:57 +00:00 committed by GitHub
parent b53fd5ff90
commit 713985b1a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 6 deletions

View File

@ -287,11 +287,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2) assert.Nil(t, result2)
pausable.Resume() pausable.Resume()
paused, resumed = pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should be resumed")
return
case <-resumed: case <-resumed:
default: default:
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
return
} }
select { select {
@ -345,16 +350,22 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
pausable.Resume() pausable.Resume()
paused, resumed = pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed: case <-resumed:
default: default:
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
return
} }
select { select {
case result1 = <-handleChan: case result1 = <-handleChan:
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
assert.Fail(t, "handler chan should contain test1") assert.Fail(t, "handler chan should contain test1")
return
} }
assert.Equal(t, test1.TestString, result1.TestString) assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt) assert.Equal(t, test1.TestInt, result1.TestInt)
@ -369,7 +380,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
} }
// Wait til it is closed // Wait til it is closed
<-queue.(*PersistableChannelQueue).closed select {
case <-queue.(*PersistableChannelQueue).closed:
case <-time.After(5 * time.Second):
assert.Fail(t, "queue should close")
return
}
err = queue.Push(&test1) err = queue.Push(&test1)
assert.NoError(t, err) assert.NoError(t, err)
@ -378,6 +394,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select { select {
case <-handleChan: case <-handleChan:
assert.Fail(t, "Handler processing should have stopped") assert.Fail(t, "Handler processing should have stopped")
return
default: default:
} }
@ -393,6 +410,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select { select {
case <-handleChan: case <-handleChan:
assert.Fail(t, "Handler processing should have stopped") assert.Fail(t, "Handler processing should have stopped")
return
default: default:
} }
@ -431,6 +449,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select { select {
case <-handleChan: case <-handleChan:
assert.Fail(t, "Handler processing should have stopped") assert.Fail(t, "Handler processing should have stopped")
return
case <-paused: case <-paused:
} }
@ -449,13 +468,36 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select { select {
case <-handleChan: case <-handleChan:
assert.Fail(t, "Handler processing should have stopped") assert.Fail(t, "Handler processing should have stopped")
return
default: default:
} }
pausable.Resume() pausable.Resume()
paused, resumed = pausable.IsPausedIsResumed()
select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed:
default:
assert.Fail(t, "Queue should be resumed")
return
}
result3 := <-handleChan var result3, result4 *testData
result4 := <-handleChan
select {
case result3 = <-handleChan:
case <-time.After(1 * time.Second):
assert.Fail(t, "Handler processing should have resumed")
return
}
select {
case result4 = <-handleChan:
case <-time.After(1 * time.Second):
assert.Fail(t, "Handler processing should have resumed")
return
}
if result4.TestString == test1.TestString { if result4.TestString == test1.TestString {
result3, result4 = result4, result3 result3, result4 = result4, result3
} }

View File

@ -301,9 +301,14 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
cancel() cancel()
} }
if p.hasNoWorkerScaling() { if p.hasNoWorkerScaling() {
log.Warn( select {
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ case <-p.baseCtx.Done():
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) // Don't warn if the baseCtx is shutdown
default:
log.Warn(
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
}
p.pause() p.pause()
} }
p.lock.Unlock() p.lock.Unlock()