Compare commits

...

4 Commits

Author SHA1 Message Date
Lennard Brinkhaus
6b7f570027 refactor: move object functions to own ones 2024-08-27 07:45:30 +02:00
Lennard Brinkhaus
b58206fca0 refactor: remove telemetry from getter and setter functions 2024-08-27 07:38:24 +02:00
Lennard Brinkhaus
74f3c934a7 feat: listen function now has an own context and implemented an graceful stop 2024-08-27 07:37:19 +02:00
Lennard Brinkhaus
fdca511587 refactor: move otlp to custom file out of the scope from plug instance 2024-08-27 07:32:24 +02:00
3 changed files with 64 additions and 91 deletions

View File

@ -61,7 +61,6 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
// by using this method we assign a new context to each new request we get.
// This can be used for example to close the context with the given id
ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()}))
taskCtx, cancel := context.WithCancel(ctx)
s.ctx[id] = cancel
span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))

31
pkg/plug/otlp.go Normal file
View File

@ -0,0 +1,31 @@
package plug
import (
"context"
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/codes"
)
func SetupOpenTelemtry(ctx context.Context, serviceName string) error {
ctx, span := tracer.Start(ctx, "SetupOpenTelemtry")
defer span.End()
err := telemetry.SetupMeterProvider(serviceName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
return err
}
err = telemetry.SetupTraceProvider(ctx, serviceName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
return err
}
return nil
}

View File

@ -2,13 +2,11 @@ package plug
import (
"context"
"fmt"
"net"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@ -29,57 +27,40 @@ type TaskExecution func(ctx context.Context, userSource models.UserSource, deepS
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
type Plug struct {
address string
port string
ctx context.Context
var (
taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
source models.Source
plugName string
}
)
func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug {
return Plug{
ctx: ctx,
address: address,
port: port,
source: source,
plugName: plugName,
}
}
func (p *Plug) Listen() error {
ctx, span := tracer.Start(p.ctx, "Listen")
func Listen(ctx context.Context, listenAddr string, source models.Source) error {
ctx, span := tracer.Start(ctx, "Listen")
defer span.End()
var err error
var source models.Source
span.SetAttributes(
attribute.String("source_display_name", string(p.source.DisplayName)),
attribute.String("source_domain", string(p.source.Domain)),
attribute.String("source_display_name", string(source.DisplayName)),
attribute.String("source_domain", string(source.Domain)),
)
sourceFields := log.Fields{
"source_display_name": p.source.DisplayName,
"source_domain": p.source.Domain,
"source_display_name": source.DisplayName,
"source_domain": source.Domain,
}
serverFields := log.Fields{
"address": p.address,
"port": p.port,
"address": listenAddr,
}
source, err = database.GetSourceByDomain(ctx, p.source.Domain)
source, err = database.GetSourceByDomain(ctx, source.Domain)
if err != nil {
if err.Error() == "Database error: NoDataFound" {
span.AddEvent("No Source found, initializing source")
log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
source, err = database.CreateSource(ctx, p.source)
source, err = database.CreateSource(ctx, source)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
@ -98,13 +79,11 @@ func (p *Plug) Listen() error {
}
}
p.source = source
span.SetAttributes(
attribute.String("source_id", string(p.source.ID)),
attribute.String("source_id", string(source.ID)),
)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port))
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
@ -114,70 +93,34 @@ func (p *Plug) Listen() error {
grpcServer := grpc.NewServer()
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
go func() {
err = grpcServer.Serve(lis)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
return err
}
}()
select {
case <-ctx.Done():
log.WithContext(ctx).Info("Context stopped! Shutdown Plug")
grpcServer.GracefulStop()
}
return nil
}
func (p *Plug) GetSource() models.Source {
_, span := tracer.Start(p.ctx, "GetSource")
defer span.End()
span.SetAttributes(attribute.String("source_domain", string(p.source.Domain)))
return p.source
func SetTaskExecutionFunction(function TaskExecution) {
taskExecutionFunction = function
}
func (p *Plug) TaskExecutionFunction(function TaskExecution) {
_, span := tracer.Start(p.ctx, "TaskExecutionFunction")
defer span.End()
p.taskExecutionFunction = function
span.AddEvent("Task execution function set")
func SetSendMessageExecutionFunction(function SendMessageExecution) {
sendMessageExecution = function
}
func (p *Plug) SendMessageExecution(function SendMessageExecution) {
_, span := tracer.Start(p.ctx, "SendMessageExecution")
defer span.End()
p.sendMessageExecution = function
span.AddEvent("Send message execution function set")
}
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
_, span := tracer.Start(p.ctx, "GetMessageExecution")
defer span.End()
p.getMessageExecution = function
span.AddEvent("Get message execution function set")
}
func (p Plug) SetupOpenTelemtry() error {
ctx, span := tracer.Start(p.ctx, "SetupOpenTelemtry")
defer span.End()
err := telemetry.SetupMeterProvider(p.plugName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
return err
}
err = telemetry.SetupTraceProvider(ctx, p.plugName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
return err
}
return nil
func SetGetMessageExecutionFunction(function GetMessageExecution) {
getMessageExecution = function
}