From 32e570abfd72ff8fdb44088f136b956a37f569b2 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Wed, 29 May 2024 11:21:04 +0000 Subject: [PATCH] [chore] improved startup / shutdown (#2925) * improved server shutdown with more precise shutdown of modules + deferring of ALL of it * make the same changes to the testrig server * use testrig specific func * update variable name to fix nilptr * fix removal of setting db on state --- cmd/gotosocial/action/server/server.go | 145 +++++++++++++++-------- cmd/gotosocial/action/testrig/testrig.go | 122 +++++++++++++------ internal/gotosocial/gotosocial.go | 66 ----------- internal/router/router.go | 20 ++-- 4 files changed, 194 insertions(+), 159 deletions(-) delete mode 100644 internal/gotosocial/gotosocial.go diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 87184f5b1..9fd3f66fe 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -48,7 +48,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/email" "github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/federation/federatingdb" - "github.com/superseriousbusiness/gotosocial/internal/gotosocial" "github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/media" @@ -69,59 +68,107 @@ import ( // Start creates and starts a gotosocial server var Start action.GTSAction = func(ctx context.Context) error { if _, err := maxprocs.Set(maxprocs.Logger(nil)); err != nil { - log.Infof(ctx, "could not set CPU limits from cgroup: %s", err) + log.Warnf(ctx, "could not set CPU limits from cgroup: %s", err) } - var state state.State + var ( + // Define necessary core variables + // before anything so we can prepare + // defer function for safe shutdown + // depending on what services were + // managed to be started. - // Initialize caches - state.Caches.Init() - state.Caches.Start() - defer state.Caches.Stop() + state state.State + route *router.Router + ) - // Initialize Tracing + defer func() { + // Stop caches with + // background tasks. + state.Caches.Stop() + + if route != nil { + // We reached a point where the API router + // was created + setup. Ensure it gets stopped + // first to stop processing new information. + if err := route.Stop(); err != nil { + log.Errorf(ctx, "error stopping router: %v", err) + } + } + + // Stop any currently running + // worker processes / scheduled + // tasks from being executed. + state.Workers.Stop() + + if state.Timelines.Home != nil { + // Home timeline mgr was setup, ensure it gets stopped. + if err := state.Timelines.Home.Stop(); err != nil { + log.Errorf(ctx, "error stopping home timeline: %v", err) + } + } + + if state.Timelines.List != nil { + // List timeline mgr was setup, ensure it gets stopped. + if err := state.Timelines.List.Stop(); err != nil { + log.Errorf(ctx, "error stopping list timeline: %v", err) + } + } + + if state.DB != nil { + // Lastly, if database service was started, + // ensure it gets closed now all else stopped. + if err := state.DB.Close(); err != nil { + log.Errorf(ctx, "error stopping database: %v", err) + } + } + + // Finally reached end of shutdown. + log.Info(ctx, "done! exiting...") + }() + + // Initialize tracing (noop if not enabled). if err := tracing.Initialize(); err != nil { return fmt.Errorf("error initializing tracing: %w", err) } - // Open connection to the database + // Initialize caches + state.Caches.Init() + state.Caches.Start() + + // Open connection to the database now caches started. dbService, err := bundb.NewBunDBService(ctx, &state) if err != nil { return fmt.Errorf("error creating dbservice: %s", err) } - // Set the state DB connection + // Set DB on state. state.DB = dbService + // Ensure necessary database instance prerequisites exist. if err := dbService.CreateInstanceAccount(ctx); err != nil { return fmt.Errorf("error creating instance account: %s", err) } - if err := dbService.CreateInstanceInstance(ctx); err != nil { return fmt.Errorf("error creating instance instance: %s", err) } - if err := dbService.CreateInstanceApplication(ctx); err != nil { return fmt.Errorf("error creating instance application: %s", err) } - // Get the instance account - // (we'll need this later). + // Get the instance account (we'll need this later). instanceAccount, err := dbService.GetInstanceAccount(ctx, "") if err != nil { return fmt.Errorf("error retrieving instance account: %w", err) } - // Open the storage backend - storage, err := gtsstorage.AutoConfig() + // Open the storage backend according to config. + state.Storage, err = gtsstorage.AutoConfig() if err != nil { - return fmt.Errorf("error creating storage backend: %w", err) + return fmt.Errorf("error opening storage backend: %w", err) } - // Set the state storage driver - state.Storage = storage - - // Build HTTP client + // Prepare wrapped httpclient with config. client := httpclient.New(httpclient.Config{ AllowRanges: config.MustParseIPPrefixes(config.GetHTTPClientAllowIPs()), BlockRanges: config.MustParseIPPrefixes(config.GetHTTPClientBlockIPs()), @@ -156,7 +203,7 @@ var Start action.GTSAction = func(ctx context.Context) error { } } - // Initialize timelines. + // Initialize both home / list timelines. state.Timelines.Home = timeline.NewManager( tlprocessor.HomeTimelineGrab(&state), tlprocessor.HomeTimelineFilter(&state, visFilter), @@ -166,7 +213,6 @@ var Start action.GTSAction = func(ctx context.Context) error { if err := state.Timelines.Home.Start(); err != nil { return fmt.Errorf("error starting home timeline: %s", err) } - state.Timelines.List = timeline.NewManager( tlprocessor.ListTimelineGrab(&state), tlprocessor.ListTimelineFilter(&state, visFilter), @@ -196,6 +242,11 @@ var Start action.GTSAction = func(ctx context.Context) error { // Create background cleaner. cleaner := cleaner.New(&state) + // Now schedule background cleaning tasks. + if err := cleaner.ScheduleJobs(); err != nil { + return fmt.Errorf("error scheduling cleaner jobs: %w", err) + } + // Create the processor using all the // other services we've created so far. processor := processing.NewProcessor( @@ -208,18 +259,17 @@ var Start action.GTSAction = func(ctx context.Context) error { emailSender, ) - // Initialize the specialized workers. + // Initialize the specialized workers pools. state.Workers.Client.Init(messages.ClientMsgIndices()) state.Workers.Federator.Init(messages.FederatorMsgIndices()) state.Workers.Delivery.Init(client) state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI - // Initialize workers. + // Now start workers! state.Workers.Start() - defer state.Workers.Stop() - // Schedule tasks for all existing poll expiries. + // Schedule notif tasks for all existing poll expiries. if err := processor.Polls().ScheduleAll(ctx); err != nil { return fmt.Errorf("error scheduling poll expiries: %w", err) } @@ -233,7 +283,7 @@ var Start action.GTSAction = func(ctx context.Context) error { HTTP router initialization */ - router, err := router.New(ctx) + route, err = router.New(ctx) if err != nil { return fmt.Errorf("error creating router: %s", err) } @@ -288,10 +338,10 @@ var Start action.GTSAction = func(ctx context.Context) error { middlewares = append(middlewares, middleware.ContentSecurityPolicy(cspExtraURIs...)) // attach global middlewares which are used for every request - router.AttachGlobalMiddleware(middlewares...) + route.AttachGlobalMiddleware(middlewares...) // attach global no route / 404 handler to the router - router.AttachNoRouteHandler(func(c *gin.Context) { + route.AttachNoRouteHandler(func(c *gin.Context) { apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), processor.InstanceGetV1) }) @@ -347,22 +397,21 @@ var Start action.GTSAction = func(ctx context.Context) error { // these should be routed in order; // apply throttling *after* rate limiting - authModule.Route(router, clLimit, clThrottle, gzip) - clientModule.Route(router, clLimit, clThrottle, gzip) - metricsModule.Route(router, clLimit, clThrottle, gzip) - healthModule.Route(router, clLimit, clThrottle) - fileserverModule.Route(router, fsMainLimit, fsThrottle) - fileserverModule.RouteEmojis(router, instanceAccount.ID, fsEmojiLimit, fsThrottle) - wellKnownModule.Route(router, gzip, s2sLimit, s2sThrottle) - nodeInfoModule.Route(router, s2sLimit, s2sThrottle, gzip) - activityPubModule.Route(router, s2sLimit, s2sThrottle, gzip) - activityPubModule.RoutePublicKey(router, s2sLimit, pkThrottle, gzip) - webModule.Route(router, fsMainLimit, fsThrottle, gzip) + authModule.Route(route, clLimit, clThrottle, gzip) + clientModule.Route(route, clLimit, clThrottle, gzip) + metricsModule.Route(route, clLimit, clThrottle, gzip) + healthModule.Route(route, clLimit, clThrottle) + fileserverModule.Route(route, fsMainLimit, fsThrottle) + fileserverModule.RouteEmojis(route, instanceAccount.ID, fsEmojiLimit, fsThrottle) + wellKnownModule.Route(route, gzip, s2sLimit, s2sThrottle) + nodeInfoModule.Route(route, s2sLimit, s2sThrottle, gzip) + activityPubModule.Route(route, s2sLimit, s2sThrottle, gzip) + activityPubModule.RoutePublicKey(route, s2sLimit, pkThrottle, gzip) + webModule.Route(route, fsMainLimit, fsThrottle, gzip) - // Start the GoToSocial server. - server := gotosocial.NewServer(dbService, router, cleaner) - if err := server.Start(ctx); err != nil { - return fmt.Errorf("error starting gotosocial service: %s", err) + // Finally start the main http server! + if err := route.Start(); err != nil { + return fmt.Errorf("error starting router: %w", err) } // catch shutdown signals from the operating system @@ -371,11 +420,5 @@ var Start action.GTSAction = func(ctx context.Context) error { sig := <-sigs // block until signal received log.Infof(ctx, "received signal %s, shutting down", sig) - // close down all running services in order - if err := server.Stop(ctx); err != nil { - return fmt.Errorf("error closing gotosocial service: %s", err) - } - - log.Info(ctx, "done! exiting...") return nil } diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go index 0c1341d4c..7a3ddde26 100644 --- a/cmd/gotosocial/action/testrig/testrig.go +++ b/cmd/gotosocial/action/testrig/testrig.go @@ -35,7 +35,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/cleaner" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" - "github.com/superseriousbusiness/gotosocial/internal/gotosocial" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/language" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -43,6 +42,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/middleware" "github.com/superseriousbusiness/gotosocial/internal/oidc" tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" + "github.com/superseriousbusiness/gotosocial/internal/router" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/storage" "github.com/superseriousbusiness/gotosocial/internal/timeline" @@ -54,11 +54,71 @@ import ( // Start creates and starts a gotosocial testrig server var Start action.GTSAction = func(ctx context.Context) error { - var state state.State - testrig.InitTestConfig() testrig.InitTestLog() + var ( + // Define necessary core variables + // before anything so we can prepare + // defer function for safe shutdown + // depending on what services were + // managed to be started. + + state state.State + route *router.Router + ) + + defer func() { + // Stop caches with + // background tasks. + state.Caches.Stop() + + if route != nil { + // We reached a point where the API router + // was created + setup. Ensure it gets stopped + // first to stop processing new information. + if err := route.Stop(); err != nil { + log.Errorf(ctx, "error stopping router: %v", err) + } + } + + // Stop any currently running + // worker processes / scheduled + // tasks from being executed. + testrig.StopWorkers(&state) + + if state.Timelines.Home != nil { + // Home timeline mgr was setup, ensure it gets stopped. + if err := state.Timelines.Home.Stop(); err != nil { + log.Errorf(ctx, "error stopping home timeline: %v", err) + } + } + + if state.Timelines.List != nil { + // List timeline mgr was setup, ensure it gets stopped. + if err := state.Timelines.List.Stop(); err != nil { + log.Errorf(ctx, "error stopping list timeline: %v", err) + } + } + + if state.Storage != nil { + // If storage was created, ensure torn down. + testrig.StandardStorageTeardown(state.Storage) + } + + if state.DB != nil { + // Lastly, if database service was started, + // ensure it gets closed now all else stopped. + testrig.StandardDBTeardown(state.DB) + if err := state.DB.Close(); err != nil { + log.Errorf(ctx, "error stopping database: %v", err) + } + } + + // Finally reached end of shutdown. + log.Info(ctx, "done! exiting...") + }() + parsedLangs, err := language.InitLangs(config.GetInstanceLanguages().TagStrs()) if err != nil { return fmt.Errorf("error initializing languages: %w", err) @@ -75,12 +135,10 @@ var Start action.GTSAction = func(ctx context.Context) error { // New test db inits caches so we don't need to do // that twice, we can just start the initialized caches. state.Caches.Start() - defer state.Caches.Stop() testrig.StandardDBSetup(state.DB, nil) - // Get the instance account - // (we'll need this later). + // Get the instance account (we'll need this later). instanceAccount, err := state.DB.GetInstanceAccount(ctx, "") if err != nil { return fmt.Errorf("error retrieving instance account: %w", err) @@ -119,7 +177,7 @@ var Start action.GTSAction = func(ctx context.Context) error { typeConverter := typeutils.NewConverter(&state) filter := visibility.NewFilter(&state) - // Initialize timelines. + // Initialize both home / list timelines. state.Timelines.Home = timeline.NewManager( tlprocessor.HomeTimelineGrab(&state), tlprocessor.HomeTimelineFilter(&state, filter), @@ -129,7 +187,6 @@ var Start action.GTSAction = func(ctx context.Context) error { if err := state.Timelines.Home.Start(); err != nil { return fmt.Errorf("error starting home timeline: %s", err) } - state.Timelines.List = timeline.NewManager( tlprocessor.ListTimelineGrab(&state), tlprocessor.ListTimelineFilter(&state, filter), @@ -151,7 +208,7 @@ var Start action.GTSAction = func(ctx context.Context) error { HTTP router initialization */ - router := testrig.NewTestRouter(state.DB) + route = testrig.NewTestRouter(state.DB) middlewares := []gin.HandlerFunc{ middleware.AddRequestID(config.GetRequestIDHeader()), // requestID middleware must run before tracing } @@ -195,10 +252,10 @@ var Start action.GTSAction = func(ctx context.Context) error { middlewares = append(middlewares, middleware.ContentSecurityPolicy(cspExtraURIs...)) // attach global middlewares which are used for every request - router.AttachGlobalMiddleware(middlewares...) + route.AttachGlobalMiddleware(middlewares...) // attach global no route / 404 handler to the router - router.AttachNoRouteHandler(func(c *gin.Context) { + route.AttachNoRouteHandler(func(c *gin.Context) { apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), processor.InstanceGetV1) }) @@ -234,23 +291,29 @@ var Start action.GTSAction = func(ctx context.Context) error { ) // these should be routed in order - authModule.Route(router) - clientModule.Route(router) - metricsModule.Route(router) - healthModule.Route(router) - fileserverModule.Route(router) - fileserverModule.RouteEmojis(router, instanceAccount.ID) - wellKnownModule.Route(router) - nodeInfoModule.Route(router) - activityPubModule.Route(router) - activityPubModule.RoutePublicKey(router) - webModule.Route(router) + authModule.Route(route) + clientModule.Route(route) + metricsModule.Route(route) + healthModule.Route(route) + fileserverModule.Route(route) + fileserverModule.RouteEmojis(route, instanceAccount.ID) + wellKnownModule.Route(route) + nodeInfoModule.Route(route) + activityPubModule.Route(route) + activityPubModule.RoutePublicKey(route) + webModule.Route(route) + // Create background cleaner. cleaner := cleaner.New(&state) - gts := gotosocial.NewServer(state.DB, router, cleaner) - if err := gts.Start(ctx); err != nil { - return fmt.Errorf("error starting gotosocial service: %s", err) + // Now schedule background cleaning tasks. + if err := cleaner.ScheduleJobs(); err != nil { + return fmt.Errorf("error scheduling cleaner jobs: %w", err) + } + + // Finally start the main http server! + if err := route.Start(); err != nil { + return fmt.Errorf("error starting router: %w", err) } // catch shutdown signals from the operating system @@ -259,14 +322,5 @@ var Start action.GTSAction = func(ctx context.Context) error { sig := <-sigs log.Infof(ctx, "received signal %s, shutting down", sig) - testrig.StandardDBTeardown(state.DB) - testrig.StandardStorageTeardown(state.Storage) - - // close down all running services in order - if err := gts.Stop(ctx); err != nil { - return fmt.Errorf("error closing gotosocial service: %s", err) - } - - log.Info(ctx, "done! exiting...") return nil } diff --git a/internal/gotosocial/gotosocial.go b/internal/gotosocial/gotosocial.go deleted file mode 100644 index e6a3934be..000000000 --- a/internal/gotosocial/gotosocial.go +++ /dev/null @@ -1,66 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package gotosocial - -import ( - "context" - - "github.com/superseriousbusiness/gotosocial/internal/cleaner" - "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/router" -) - -// Server represents a long-running -// GoToSocial server instance. -type Server struct { - db db.DB - apiRouter *router.Router - cleaner *cleaner.Cleaner -} - -// NewServer returns a new -// GoToSocial server instance. -func NewServer( - db db.DB, - apiRouter *router.Router, - cleaner *cleaner.Cleaner, -) *Server { - return &Server{ - db: db, - apiRouter: apiRouter, - cleaner: cleaner, - } -} - -// Start starts up the GoToSocial server by starting the router, -// then the cleaner. If something goes wrong while starting the -// server, then an error will be returned. -func (s *Server) Start(ctx context.Context) error { - s.apiRouter.Start() - return s.cleaner.ScheduleJobs() -} - -// Stop closes down the GoToSocial server, first closing the cleaner, -// then the router, then the database. If something goes wrong while -// stopping, an error will be returned. -func (s *Server) Stop(ctx context.Context) error { - if err := s.apiRouter.Stop(ctx); err != nil { - return err - } - return s.db.Close() -} diff --git a/internal/router/router.go b/internal/router/router.go index b2fb7418e..3a790dec9 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -122,7 +122,7 @@ func New(ctx context.Context) (*Router, error) { // // It will serve two handlers if letsencrypt is enabled, // and only the web/API handler if letsencrypt is not enabled. -func (r *Router) Start() { +func (r *Router) Start() error { var ( // listen is the server start function. // By default this points to a regular @@ -143,10 +143,16 @@ func (r *Router) Start() { // that either both or neither of Chain and Key // are set, so we can forego checking again here. listen, err = r.customTLS(certFile, keyFile) + if err != nil { + return err + } // TLS with letsencrypt. case leEnabled: listen, err = r.letsEncryptTLS() + if err != nil { + return err + } // Default listen. TLS must // be handled by reverse proxy. @@ -154,10 +160,6 @@ func (r *Router) Start() { listen = r.srv.ListenAndServe } - if err != nil { - log.Fatal(nil, err) - } - // Pass the server handler through a debug pprof middleware handler. // For standard production builds this will be a no-op, but when the // "debug" or "debugenv" build-tag is set pprof stats will be served @@ -177,12 +179,14 @@ func (r *Router) Start() { log.Fatalf(nil, "listen: %s", err) } }() + + return nil } -// Stop shuts down the router nicely -func (r *Router) Stop(ctx context.Context) error { +// Stop shuts down the router nicely. +func (r *Router) Stop() error { log.Infof(nil, "shutting down http router with %s grace period", shutdownTimeout) - timeout, cancel := context.WithTimeout(ctx, shutdownTimeout) + timeout, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() if err := r.srv.Shutdown(timeout); err != nil {