diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 5b4f379..f071b41 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -3,15 +3,19 @@ package plug import ( "context" "fmt" - "log" "net" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" - "google.golang.org/protobuf/types/known/timestamppb" - pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" + "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry" + "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" ) type Message struct { @@ -32,42 +36,94 @@ type Plug struct { sendMessageExecution SendMessageExecution getMessageExecution GetMessageExecution source models.Source + plugName string + tracer trace.Tracer + logger *logrus.Logger } -func NewPlug(ctx context.Context, address string, port string, source models.Source) Plug { +func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug { + tracer := otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug") + ctx, span := tracer.Start(ctx, "NewPlug") + defer span.End() + + span.SetAttributes( + attribute.String("plug.name", plugName), + attribute.String("plug.address", address), + attribute.String("plug.port", port), + ) + + logger := logrus.New() + return Plug{ - ctx: ctx, - address: address, - port: port, - source: source, + ctx: ctx, + address: address, + port: port, + source: source, + plugName: plugName, + tracer: tracer, + logger: logger, } } func (p *Plug) Listen() error { + ctx, span := p.tracer.Start(p.ctx, "Listen") + defer span.End() + var err error var source models.Source - source, err = database.GetSourceByDomain(p.ctx, p.source.Domain) + span.SetAttributes( + attribute.String("source_display_name", string(p.source.DisplayName)), + attribute.String("source_domain", string(p.source.Domain)), + ) + + sourceFields := logrus.Fields{ + "source_display_name": p.source.DisplayName, + "source_domain": p.source.Domain, + } + + serverFields := logrus.Fields{ + "address": p.address, + "port": p.port, + } + + source, err = database.GetSourceByDomain(ctx, p.source.Domain) if err != nil { if err.Error() == "Database error: NoDataFound" { - log.Printf("Initalizing source!") - source, err = database.CreateSource(p.ctx, p.source) + span.AddEvent("No Source found, initializing source") + + p.logger.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!") + + source, err = database.CreateSource(ctx, p.source) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + p.logger.WithError(err).WithFields(sourceFields).Error("Failed to create source") panic(err) } + + span.AddEvent("Source created") + p.logger.WithContext(ctx).WithError(err).WithFields(sourceFields).WithField("source_id", source.ID).Info("Source created!") + } else { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + p.logger.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to get source by domain") panic(err) } } p.source = source - log.Print("Source exists") - - // Start the Server + span.SetAttributes( + attribute.String("source_id", string(p.source.ID)), + ) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port)) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + p.logger.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address") return err } @@ -77,6 +133,9 @@ func (p *Plug) Listen() error { err = grpcServer.Serve(lis) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + p.logger.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") return err } @@ -84,17 +143,56 @@ func (p *Plug) Listen() error { } func (p *Plug) GetSource() models.Source { + _, span := p.tracer.Start(p.ctx, "GetSource") + defer span.End() + + span.SetAttributes(attribute.String("source_domain", string(p.source.Domain))) return p.source } func (p *Plug) TaskExecutionFunction(function TaskExecution) { + _, span := p.tracer.Start(p.ctx, "TaskExecutionFunction") + defer span.End() + p.taskExecutionFunction = function + span.AddEvent("Task execution function set") } func (p *Plug) SendMessageExecution(function SendMessageExecution) { + _, span := p.tracer.Start(p.ctx, "SendMessageExecution") + defer span.End() + p.sendMessageExecution = function + span.AddEvent("Send message execution function set") } func (p *Plug) GetMessageExecution(function GetMessageExecution) { + _, span := p.tracer.Start(p.ctx, "GetMessageExecution") + defer span.End() + p.getMessageExecution = function + span.AddEvent("Get message execution function set") +} + +func (p Plug) SetupOpenTelemtry() error { + ctx, span := p.tracer.Start(p.ctx, "SetupOpenTelemtry") + defer span.End() + + err := telemetry.SetupMeterProvider(p.plugName) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + p.logger.WithContext(ctx).WithError(err).Error("Failed to setup meter provider") + return err + } + + err = telemetry.SetupTraceProvider(ctx, p.plugName) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + p.logger.WithContext(ctx).WithError(err).Error("Failed to setup trace provider") + return err + } + + return nil }