[feature] log worker startup counts (#2958)
* log number of each worker kinds started, and log when stopped * remove worker debug logging * whoops, fix the count of media workers
This commit is contained in:
parent
6ed6824d5d
commit
f17dd62ff5
|
@ -26,7 +26,6 @@ import (
|
||||||
"codeberg.org/gruf/go-structr"
|
"codeberg.org/gruf/go-structr"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
|
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/queue"
|
"github.com/superseriousbusiness/gotosocial/internal/queue"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/util"
|
"github.com/superseriousbusiness/gotosocial/internal/util"
|
||||||
)
|
)
|
||||||
|
@ -181,8 +180,6 @@ func (w *Worker) run(ctx context.Context) {
|
||||||
if w.Client == nil || w.Queue == nil {
|
if w.Client == nil || w.Queue == nil {
|
||||||
panic("not yet initialized")
|
panic("not yet initialized")
|
||||||
}
|
}
|
||||||
log.Debugf(ctx, "%p: starting worker", w)
|
|
||||||
defer log.Debugf(ctx, "%p: stopped worker", w)
|
|
||||||
util.Must(func() { w.process(ctx) })
|
util.Must(func() { w.process(ctx) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -127,8 +127,6 @@ func (w *MsgWorker[T]) run(ctx context.Context) {
|
||||||
if w.Process == nil || w.Queue == nil {
|
if w.Process == nil || w.Queue == nil {
|
||||||
panic("not yet initialized")
|
panic("not yet initialized")
|
||||||
}
|
}
|
||||||
log.Debugf(ctx, "%p: starting worker", w)
|
|
||||||
defer log.Debugf(ctx, "%p: stopped worker", w)
|
|
||||||
util.Must(func() { w.process(ctx) })
|
util.Must(func() { w.process(ctx) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/config"
|
"github.com/superseriousbusiness/gotosocial/internal/config"
|
||||||
|
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/messages"
|
"github.com/superseriousbusiness/gotosocial/internal/messages"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/scheduler"
|
"github.com/superseriousbusiness/gotosocial/internal/scheduler"
|
||||||
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
|
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
|
||||||
|
@ -59,26 +60,54 @@ type Workers struct {
|
||||||
// StartScheduler starts the job scheduler.
|
// StartScheduler starts the job scheduler.
|
||||||
func (w *Workers) StartScheduler() {
|
func (w *Workers) StartScheduler() {
|
||||||
_ = w.Scheduler.Start() // false = already running
|
_ = w.Scheduler.Start() // false = already running
|
||||||
|
log.Info(nil, "started scheduler")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start will start contained worker pools.
|
// Start will start contained worker pools.
|
||||||
func (w *Workers) Start() {
|
func (w *Workers) Start() {
|
||||||
|
var n int
|
||||||
|
|
||||||
maxprocs := runtime.GOMAXPROCS(0)
|
maxprocs := runtime.GOMAXPROCS(0)
|
||||||
w.Delivery.Start(deliveryWorkers(maxprocs))
|
|
||||||
w.Client.Start(4 * maxprocs)
|
n = deliveryWorkers(maxprocs)
|
||||||
w.Federator.Start(4 * maxprocs)
|
w.Delivery.Start(n)
|
||||||
w.Dereference.Start(4 * maxprocs)
|
log.Infof(nil, "started %d delivery workers", n)
|
||||||
w.Media.Start(8 * maxprocs)
|
|
||||||
|
n = 4 * maxprocs
|
||||||
|
w.Client.Start(n)
|
||||||
|
log.Infof(nil, "started %d client workers", n)
|
||||||
|
|
||||||
|
n = 4 * maxprocs
|
||||||
|
w.Federator.Start(n)
|
||||||
|
log.Infof(nil, "started %d federator workers", n)
|
||||||
|
|
||||||
|
n = 4 * maxprocs
|
||||||
|
w.Dereference.Start(n)
|
||||||
|
log.Infof(nil, "started %d dereference workers", n)
|
||||||
|
|
||||||
|
n = 8 * maxprocs
|
||||||
|
w.Media.Start(n)
|
||||||
|
log.Infof(nil, "started %d media workers", n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop will stop all of the contained worker pools (and global scheduler).
|
// Stop will stop all of the contained worker pools (and global scheduler).
|
||||||
func (w *Workers) Stop() {
|
func (w *Workers) Stop() {
|
||||||
_ = w.Scheduler.Stop() // false = not running
|
_ = w.Scheduler.Stop() // false = not running
|
||||||
|
|
||||||
w.Delivery.Stop()
|
w.Delivery.Stop()
|
||||||
|
log.Info(nil, "stopped delivery workers")
|
||||||
|
|
||||||
w.Client.Stop()
|
w.Client.Stop()
|
||||||
|
log.Info(nil, "stopped client workers")
|
||||||
|
|
||||||
w.Federator.Stop()
|
w.Federator.Stop()
|
||||||
|
log.Info(nil, "stopped federator workers")
|
||||||
|
|
||||||
w.Dereference.Stop()
|
w.Dereference.Stop()
|
||||||
|
log.Info(nil, "stopped dereference workers")
|
||||||
|
|
||||||
w.Media.Stop()
|
w.Media.Stop()
|
||||||
|
log.Info(nil, "stopped media workers")
|
||||||
}
|
}
|
||||||
|
|
||||||
// nocopy when embedded will signal linter to
|
// nocopy when embedded will signal linter to
|
||||||
|
|
Loading…
Reference in New Issue