diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index d8f15bb..ae434f1 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -61,7 +61,6 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) // by using this method we assign a new context to each new request we get. // This can be used for example to close the context with the given id ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()})) - taskCtx, cancel := context.WithCancel(ctx) s.ctx[id] = cancel span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id))) diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 11df7a2..0e73e62 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -2,7 +2,6 @@ package plug import ( "context" - "fmt" "net" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" @@ -28,55 +27,40 @@ type TaskExecution func(ctx context.Context, userSource models.UserSource, deepS type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) -type Plug struct { - address string - port string +var ( taskExecutionFunction TaskExecution sendMessageExecution SendMessageExecution getMessageExecution GetMessageExecution - source models.Source - plugName string -} +) -func NewPlug(plugName string, address string, port string, source models.Source) Plug { - return Plug{ - address: address, - port: port, - source: source, - plugName: plugName, - } -} - -func (p *Plug) Listen(ctx context.Context) error { +func Listen(ctx context.Context, listenAddr string, source models.Source) error { ctx, span := tracer.Start(ctx, "Listen") defer span.End() var err error - var source models.Source span.SetAttributes( - attribute.String("source_display_name", string(p.source.DisplayName)), - attribute.String("source_domain", string(p.source.Domain)), + attribute.String("source_display_name", string(source.DisplayName)), + attribute.String("source_domain", string(source.Domain)), ) sourceFields := log.Fields{ - "source_display_name": p.source.DisplayName, - "source_domain": p.source.Domain, + "source_display_name": source.DisplayName, + "source_domain": source.Domain, } serverFields := log.Fields{ - "address": p.address, - "port": p.port, + "address": listenAddr, } - source, err = database.GetSourceByDomain(ctx, p.source.Domain) + source, err = database.GetSourceByDomain(ctx, source.Domain) if err != nil { if err.Error() == "Database error: NoDataFound" { span.AddEvent("No Source found, initializing source") log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!") - source, err = database.CreateSource(ctx, p.source) + source, err = database.CreateSource(ctx, source) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -95,13 +79,11 @@ func (p *Plug) Listen(ctx context.Context) error { } } - p.source = source - span.SetAttributes( - attribute.String("source_id", string(p.source.ID)), + attribute.String("source_id", string(source.ID)), ) - lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port)) + lis, err := net.Listen("tcp", listenAddr) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -111,7 +93,7 @@ func (p *Plug) Listen(ctx context.Context) error { grpcServer := grpc.NewServer() - pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution)) + pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) go func() { err = grpcServer.Serve(lis) @@ -131,18 +113,14 @@ func (p *Plug) Listen(ctx context.Context) error { return nil } -func (p *Plug) GetSource() models.Source { - return p.source +func SetTaskExecutionFunction(function TaskExecution) { + taskExecutionFunction = function } -func (p *Plug) TaskExecutionFunction(function TaskExecution) { - p.taskExecutionFunction = function +func SetSendMessageExecutionFunction(function SendMessageExecution) { + sendMessageExecution = function } -func (p *Plug) SendMessageExecution(function SendMessageExecution) { - p.sendMessageExecution = function -} - -func (p *Plug) GetMessageExecution(function GetMessageExecution) { - p.getMessageExecution = function +func SetGetMessageExecutionFunction(function GetMessageExecution) { + getMessageExecution = function }