Compare commits

..

4 Commits

Author SHA1 Message Date
Lennard Brinkhaus
6b7f570027 refactor: move object functions to own ones 2024-08-27 07:45:30 +02:00
Lennard Brinkhaus
b58206fca0 refactor: remove telemetry from getter and setter functions 2024-08-27 07:38:24 +02:00
Lennard Brinkhaus
74f3c934a7 feat: listen function now has an own context and implemented an graceful stop 2024-08-27 07:37:19 +02:00
Lennard Brinkhaus
fdca511587 refactor: move otlp to custom file out of the scope from plug instance 2024-08-27 07:32:24 +02:00
3 changed files with 64 additions and 91 deletions

View File

@ -61,7 +61,6 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
// by using this method we assign a new context to each new request we get. // by using this method we assign a new context to each new request we get.
// This can be used for example to close the context with the given id // This can be used for example to close the context with the given id
ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()})) ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()}))
taskCtx, cancel := context.WithCancel(ctx) taskCtx, cancel := context.WithCancel(ctx)
s.ctx[id] = cancel s.ctx[id] = cancel
span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id))) span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))

31
pkg/plug/otlp.go Normal file
View File

@ -0,0 +1,31 @@
package plug
import (
"context"
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/codes"
)
func SetupOpenTelemtry(ctx context.Context, serviceName string) error {
ctx, span := tracer.Start(ctx, "SetupOpenTelemtry")
defer span.End()
err := telemetry.SetupMeterProvider(serviceName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
return err
}
err = telemetry.SetupTraceProvider(ctx, serviceName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
return err
}
return nil
}

View File

@ -2,13 +2,11 @@ package plug
import ( import (
"context" "context"
"fmt"
"net" "net"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -29,57 +27,40 @@ type TaskExecution func(ctx context.Context, userSource models.UserSource, deepS
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
type Plug struct { var (
address string
port string
ctx context.Context
taskExecutionFunction TaskExecution taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution getMessageExecution GetMessageExecution
source models.Source )
plugName string
}
func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug { func Listen(ctx context.Context, listenAddr string, source models.Source) error {
return Plug{ ctx, span := tracer.Start(ctx, "Listen")
ctx: ctx,
address: address,
port: port,
source: source,
plugName: plugName,
}
}
func (p *Plug) Listen() error {
ctx, span := tracer.Start(p.ctx, "Listen")
defer span.End() defer span.End()
var err error var err error
var source models.Source
span.SetAttributes( span.SetAttributes(
attribute.String("source_display_name", string(p.source.DisplayName)), attribute.String("source_display_name", string(source.DisplayName)),
attribute.String("source_domain", string(p.source.Domain)), attribute.String("source_domain", string(source.Domain)),
) )
sourceFields := log.Fields{ sourceFields := log.Fields{
"source_display_name": p.source.DisplayName, "source_display_name": source.DisplayName,
"source_domain": p.source.Domain, "source_domain": source.Domain,
} }
serverFields := log.Fields{ serverFields := log.Fields{
"address": p.address, "address": listenAddr,
"port": p.port,
} }
source, err = database.GetSourceByDomain(ctx, p.source.Domain) source, err = database.GetSourceByDomain(ctx, source.Domain)
if err != nil { if err != nil {
if err.Error() == "Database error: NoDataFound" { if err.Error() == "Database error: NoDataFound" {
span.AddEvent("No Source found, initializing source") span.AddEvent("No Source found, initializing source")
log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!") log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
source, err = database.CreateSource(ctx, p.source) source, err = database.CreateSource(ctx, source)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
@ -98,13 +79,11 @@ func (p *Plug) Listen() error {
} }
} }
p.source = source
span.SetAttributes( span.SetAttributes(
attribute.String("source_id", string(p.source.ID)), attribute.String("source_id", string(source.ID)),
) )
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port)) lis, err := net.Listen("tcp", listenAddr)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
@ -114,70 +93,34 @@ func (p *Plug) Listen() error {
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution)) pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
go func() {
err = grpcServer.Serve(lis) err = grpcServer.Serve(lis)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
return err }
}()
select {
case <-ctx.Done():
log.WithContext(ctx).Info("Context stopped! Shutdown Plug")
grpcServer.GracefulStop()
} }
return nil return nil
} }
func (p *Plug) GetSource() models.Source { func SetTaskExecutionFunction(function TaskExecution) {
_, span := tracer.Start(p.ctx, "GetSource") taskExecutionFunction = function
defer span.End()
span.SetAttributes(attribute.String("source_domain", string(p.source.Domain)))
return p.source
} }
func (p *Plug) TaskExecutionFunction(function TaskExecution) { func SetSendMessageExecutionFunction(function SendMessageExecution) {
_, span := tracer.Start(p.ctx, "TaskExecutionFunction") sendMessageExecution = function
defer span.End()
p.taskExecutionFunction = function
span.AddEvent("Task execution function set")
} }
func (p *Plug) SendMessageExecution(function SendMessageExecution) { func SetGetMessageExecutionFunction(function GetMessageExecution) {
_, span := tracer.Start(p.ctx, "SendMessageExecution") getMessageExecution = function
defer span.End()
p.sendMessageExecution = function
span.AddEvent("Send message execution function set")
}
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
_, span := tracer.Start(p.ctx, "GetMessageExecution")
defer span.End()
p.getMessageExecution = function
span.AddEvent("Get message execution function set")
}
func (p Plug) SetupOpenTelemtry() error {
ctx, span := tracer.Start(p.ctx, "SetupOpenTelemtry")
defer span.End()
err := telemetry.SetupMeterProvider(p.plugName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
return err
}
err = telemetry.SetupTraceProvider(ctx, p.plugName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
return err
}
return nil
} }