Compare commits

..

No commits in common. "6b7f57002725ddad667b5ac110fc23fa2fa928f9" and "117c15e3719fe07c2b586371d8016a9324568a96" have entirely different histories.

3 changed files with 91 additions and 64 deletions

View File

@ -61,6 +61,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
// by using this method we assign a new context to each new request we get. // by using this method we assign a new context to each new request we get.
// This can be used for example to close the context with the given id // This can be used for example to close the context with the given id
ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()})) ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()}))
taskCtx, cancel := context.WithCancel(ctx) taskCtx, cancel := context.WithCancel(ctx)
s.ctx[id] = cancel s.ctx[id] = cancel
span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id))) span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))

View File

@ -1,31 +0,0 @@
package plug
import (
"context"
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/codes"
)
func SetupOpenTelemtry(ctx context.Context, serviceName string) error {
ctx, span := tracer.Start(ctx, "SetupOpenTelemtry")
defer span.End()
err := telemetry.SetupMeterProvider(serviceName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
return err
}
err = telemetry.SetupTraceProvider(ctx, serviceName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
return err
}
return nil
}

View File

@ -2,11 +2,13 @@ package plug
import ( import (
"context" "context"
"fmt"
"net" "net"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -27,40 +29,57 @@ type TaskExecution func(ctx context.Context, userSource models.UserSource, deepS
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
var ( type Plug struct {
address string
port string
ctx context.Context
taskExecutionFunction TaskExecution taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution getMessageExecution GetMessageExecution
) source models.Source
plugName string
}
func Listen(ctx context.Context, listenAddr string, source models.Source) error { func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug {
ctx, span := tracer.Start(ctx, "Listen") return Plug{
ctx: ctx,
address: address,
port: port,
source: source,
plugName: plugName,
}
}
func (p *Plug) Listen() error {
ctx, span := tracer.Start(p.ctx, "Listen")
defer span.End() defer span.End()
var err error var err error
var source models.Source
span.SetAttributes( span.SetAttributes(
attribute.String("source_display_name", string(source.DisplayName)), attribute.String("source_display_name", string(p.source.DisplayName)),
attribute.String("source_domain", string(source.Domain)), attribute.String("source_domain", string(p.source.Domain)),
) )
sourceFields := log.Fields{ sourceFields := log.Fields{
"source_display_name": source.DisplayName, "source_display_name": p.source.DisplayName,
"source_domain": source.Domain, "source_domain": p.source.Domain,
} }
serverFields := log.Fields{ serverFields := log.Fields{
"address": listenAddr, "address": p.address,
"port": p.port,
} }
source, err = database.GetSourceByDomain(ctx, source.Domain) source, err = database.GetSourceByDomain(ctx, p.source.Domain)
if err != nil { if err != nil {
if err.Error() == "Database error: NoDataFound" { if err.Error() == "Database error: NoDataFound" {
span.AddEvent("No Source found, initializing source") span.AddEvent("No Source found, initializing source")
log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!") log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
source, err = database.CreateSource(ctx, source) source, err = database.CreateSource(ctx, p.source)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
@ -79,11 +98,13 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
} }
} }
p.source = source
span.SetAttributes( span.SetAttributes(
attribute.String("source_id", string(source.ID)), attribute.String("source_id", string(p.source.ID)),
) )
lis, err := net.Listen("tcp", listenAddr) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port))
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
@ -93,34 +114,70 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))
go func() {
err = grpcServer.Serve(lis) err = grpcServer.Serve(lis)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
} return err
}()
select {
case <-ctx.Done():
log.WithContext(ctx).Info("Context stopped! Shutdown Plug")
grpcServer.GracefulStop()
} }
return nil return nil
} }
func SetTaskExecutionFunction(function TaskExecution) { func (p *Plug) GetSource() models.Source {
taskExecutionFunction = function _, span := tracer.Start(p.ctx, "GetSource")
defer span.End()
span.SetAttributes(attribute.String("source_domain", string(p.source.Domain)))
return p.source
} }
func SetSendMessageExecutionFunction(function SendMessageExecution) { func (p *Plug) TaskExecutionFunction(function TaskExecution) {
sendMessageExecution = function _, span := tracer.Start(p.ctx, "TaskExecutionFunction")
defer span.End()
p.taskExecutionFunction = function
span.AddEvent("Task execution function set")
} }
func SetGetMessageExecutionFunction(function GetMessageExecution) { func (p *Plug) SendMessageExecution(function SendMessageExecution) {
getMessageExecution = function _, span := tracer.Start(p.ctx, "SendMessageExecution")
defer span.End()
p.sendMessageExecution = function
span.AddEvent("Send message execution function set")
}
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
_, span := tracer.Start(p.ctx, "GetMessageExecution")
defer span.End()
p.getMessageExecution = function
span.AddEvent("Get message execution function set")
}
func (p Plug) SetupOpenTelemtry() error {
ctx, span := tracer.Start(p.ctx, "SetupOpenTelemtry")
defer span.End()
err := telemetry.SetupMeterProvider(p.plugName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
return err
}
err = telemetry.SetupTraceProvider(ctx, p.plugName)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
return err
}
return nil
} }