refactor(telemetry): initialization of tracer and logger
logger initialization and tracer usage in Plug struct for consistency
This commit is contained in:
parent
5aeb04951f
commit
8e44948927
@ -9,11 +9,10 @@ import (
|
|||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
||||||
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
||||||
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
|
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
)
|
)
|
||||||
@ -37,36 +36,22 @@ type Plug struct {
|
|||||||
getMessageExecution GetMessageExecution
|
getMessageExecution GetMessageExecution
|
||||||
source models.Source
|
source models.Source
|
||||||
plugName string
|
plugName string
|
||||||
tracer trace.Tracer
|
|
||||||
logger *logrus.Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug")
|
||||||
|
|
||||||
func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug {
|
func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug {
|
||||||
tracer := otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug")
|
|
||||||
ctx, span := tracer.Start(ctx, "NewPlug")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
span.SetAttributes(
|
|
||||||
attribute.String("plug.name", plugName),
|
|
||||||
attribute.String("plug.address", address),
|
|
||||||
attribute.String("plug.port", port),
|
|
||||||
)
|
|
||||||
|
|
||||||
logger := logrus.New()
|
|
||||||
|
|
||||||
return Plug{
|
return Plug{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
address: address,
|
address: address,
|
||||||
port: port,
|
port: port,
|
||||||
source: source,
|
source: source,
|
||||||
plugName: plugName,
|
plugName: plugName,
|
||||||
tracer: tracer,
|
|
||||||
logger: logger,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) Listen() error {
|
func (p *Plug) Listen() error {
|
||||||
ctx, span := p.tracer.Start(p.ctx, "Listen")
|
ctx, span := tracer.Start(p.ctx, "Listen")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@ -77,12 +62,12 @@ func (p *Plug) Listen() error {
|
|||||||
attribute.String("source_domain", string(p.source.Domain)),
|
attribute.String("source_domain", string(p.source.Domain)),
|
||||||
)
|
)
|
||||||
|
|
||||||
sourceFields := logrus.Fields{
|
sourceFields := log.Fields{
|
||||||
"source_display_name": p.source.DisplayName,
|
"source_display_name": p.source.DisplayName,
|
||||||
"source_domain": p.source.Domain,
|
"source_domain": p.source.Domain,
|
||||||
}
|
}
|
||||||
|
|
||||||
serverFields := logrus.Fields{
|
serverFields := log.Fields{
|
||||||
"address": p.address,
|
"address": p.address,
|
||||||
"port": p.port,
|
"port": p.port,
|
||||||
}
|
}
|
||||||
@ -92,23 +77,23 @@ func (p *Plug) Listen() error {
|
|||||||
if err.Error() == "Database error: NoDataFound" {
|
if err.Error() == "Database error: NoDataFound" {
|
||||||
span.AddEvent("No Source found, initializing source")
|
span.AddEvent("No Source found, initializing source")
|
||||||
|
|
||||||
p.logger.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
|
log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
|
||||||
|
|
||||||
source, err = database.CreateSource(ctx, p.source)
|
source, err = database.CreateSource(ctx, p.source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
p.logger.WithError(err).WithFields(sourceFields).Error("Failed to create source")
|
log.WithError(err).WithFields(sourceFields).Error("Failed to create source")
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
span.AddEvent("Source created")
|
span.AddEvent("Source created")
|
||||||
p.logger.WithContext(ctx).WithError(err).WithFields(sourceFields).WithField("source_id", source.ID).Info("Source created!")
|
log.WithContext(ctx).WithError(err).WithFields(sourceFields).WithField("source_id", source.ID).Info("Source created!")
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
p.logger.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to get source by domain")
|
log.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to get source by domain")
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -123,7 +108,7 @@ func (p *Plug) Listen() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
p.logger.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address")
|
log.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,7 +120,7 @@ func (p *Plug) Listen() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
p.logger.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
|
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,7 +128,7 @@ func (p *Plug) Listen() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) GetSource() models.Source {
|
func (p *Plug) GetSource() models.Source {
|
||||||
_, span := p.tracer.Start(p.ctx, "GetSource")
|
_, span := tracer.Start(p.ctx, "GetSource")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
span.SetAttributes(attribute.String("source_domain", string(p.source.Domain)))
|
span.SetAttributes(attribute.String("source_domain", string(p.source.Domain)))
|
||||||
@ -151,7 +136,7 @@ func (p *Plug) GetSource() models.Source {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) TaskExecutionFunction(function TaskExecution) {
|
func (p *Plug) TaskExecutionFunction(function TaskExecution) {
|
||||||
_, span := p.tracer.Start(p.ctx, "TaskExecutionFunction")
|
_, span := tracer.Start(p.ctx, "TaskExecutionFunction")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
p.taskExecutionFunction = function
|
p.taskExecutionFunction = function
|
||||||
@ -159,7 +144,7 @@ func (p *Plug) TaskExecutionFunction(function TaskExecution) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) SendMessageExecution(function SendMessageExecution) {
|
func (p *Plug) SendMessageExecution(function SendMessageExecution) {
|
||||||
_, span := p.tracer.Start(p.ctx, "SendMessageExecution")
|
_, span := tracer.Start(p.ctx, "SendMessageExecution")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
p.sendMessageExecution = function
|
p.sendMessageExecution = function
|
||||||
@ -167,7 +152,7 @@ func (p *Plug) SendMessageExecution(function SendMessageExecution) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
|
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
|
||||||
_, span := p.tracer.Start(p.ctx, "GetMessageExecution")
|
_, span := tracer.Start(p.ctx, "GetMessageExecution")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
p.getMessageExecution = function
|
p.getMessageExecution = function
|
||||||
@ -175,14 +160,14 @@ func (p *Plug) GetMessageExecution(function GetMessageExecution) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p Plug) SetupOpenTelemtry() error {
|
func (p Plug) SetupOpenTelemtry() error {
|
||||||
ctx, span := p.tracer.Start(p.ctx, "SetupOpenTelemtry")
|
ctx, span := tracer.Start(p.ctx, "SetupOpenTelemtry")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
err := telemetry.SetupMeterProvider(p.plugName)
|
err := telemetry.SetupMeterProvider(p.plugName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
p.logger.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
|
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,7 +175,7 @@ func (p Plug) SetupOpenTelemtry() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
p.logger.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
|
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user