feat: listen function now has an own context and implemented an graceful stop
This commit is contained in:
parent
130f6928fc
commit
311de674ba
@ -31,7 +31,6 @@ type GetMessageExecution func(ctx context.Context, userSource models.UserSource)
|
|||||||
type Plug struct {
|
type Plug struct {
|
||||||
address string
|
address string
|
||||||
port string
|
port string
|
||||||
ctx context.Context
|
|
||||||
taskExecutionFunction TaskExecution
|
taskExecutionFunction TaskExecution
|
||||||
sendMessageExecution SendMessageExecution
|
sendMessageExecution SendMessageExecution
|
||||||
getMessageExecution GetMessageExecution
|
getMessageExecution GetMessageExecution
|
||||||
@ -39,9 +38,8 @@ type Plug struct {
|
|||||||
plugName string
|
plugName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug {
|
func NewPlug(plugName string, address string, port string, source models.Source) Plug {
|
||||||
return Plug{
|
return Plug{
|
||||||
ctx: ctx,
|
|
||||||
address: address,
|
address: address,
|
||||||
port: port,
|
port: port,
|
||||||
source: source,
|
source: source,
|
||||||
@ -49,8 +47,8 @@ func NewPlug(ctx context.Context, plugName string, address string, port string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) Listen() error {
|
func (p *Plug) Listen(ctx context.Context) error {
|
||||||
ctx, span := tracer.Start(p.ctx, "Listen")
|
ctx, span := tracer.Start(ctx, "Listen")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@ -115,12 +113,19 @@ func (p *Plug) Listen() error {
|
|||||||
|
|
||||||
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))
|
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))
|
||||||
|
|
||||||
err = grpcServer.Serve(lis)
|
go func() {
|
||||||
if err != nil {
|
err = grpcServer.Serve(lis)
|
||||||
span.RecordError(err)
|
if err != nil {
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.RecordError(err)
|
||||||
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
|
span.SetStatus(codes.Error, err.Error())
|
||||||
return err
|
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.WithContext(ctx).Info("Context stopped! Shutdown Plug")
|
||||||
|
grpcServer.GracefulStop()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user