diff --git a/pkg/plug/server.go b/pkg/plug/server.go index e813f71..f338446 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -31,7 +31,6 @@ type GetMessageExecution func(ctx context.Context, userSource models.UserSource) type Plug struct { address string port string - ctx context.Context taskExecutionFunction TaskExecution sendMessageExecution SendMessageExecution getMessageExecution GetMessageExecution @@ -39,9 +38,8 @@ type Plug struct { plugName string } -func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug { +func NewPlug(plugName string, address string, port string, source models.Source) Plug { return Plug{ - ctx: ctx, address: address, port: port, source: source, @@ -49,8 +47,8 @@ func NewPlug(ctx context.Context, plugName string, address string, port string, } } -func (p *Plug) Listen() error { - ctx, span := tracer.Start(p.ctx, "Listen") +func (p *Plug) Listen(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Listen") defer span.End() var err error @@ -115,12 +113,19 @@ func (p *Plug) Listen() error { pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution)) - err = grpcServer.Serve(lis) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") - return err + go func() { + err = grpcServer.Serve(lis) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") + } + }() + + select { + case <-ctx.Done(): + log.WithContext(ctx).Info("Context stopped! Shutdown Plug") + grpcServer.GracefulStop() } return nil