package plug import ( "context" "net" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" ) var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug") type Message struct { Title string Body string CreatedAt *timestamppb.Timestamp } type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) var ( taskExecutionFunction TaskExecution sendMessageExecution SendMessageExecution getMessageExecution GetMessageExecution ) func Listen(ctx context.Context, listenAddr string, source models.Source) error { ctx, span := tracer.Start(ctx, "Listen") defer span.End() var err error span.SetAttributes( attribute.String("source_display_name", string(source.DisplayName)), attribute.String("source_domain", string(source.Domain)), ) sourceFields := log.Fields{ "source_display_name": source.DisplayName, "source_domain": source.Domain, } serverFields := log.Fields{ "address": listenAddr, } source, err = database.GetSourceByDomain(ctx, source.Domain) if err != nil { if err.Error() == "Database error: NoDataFound" { span.AddEvent("No Source found, initializing source") log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!") source, err = database.CreateSource(ctx, source) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithError(err).WithFields(sourceFields).Error("Failed to create source") panic(err) } span.AddEvent("Source created") log.WithContext(ctx).WithError(err).WithFields(sourceFields).WithField("source_id", source.ID).Info("Source created!") } else { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to get source by domain") panic(err) } } span.SetAttributes( attribute.String("source_id", string(source.ID)), ) lis, err := net.Listen("tcp", listenAddr) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address") return err } grpcServer := grpc.NewServer() pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) go func() { err = grpcServer.Serve(lis) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") } }() select { case <-ctx.Done(): log.WithContext(ctx).Info("Context stopped! Shutdown Plug") grpcServer.GracefulStop() } return nil } func SetTaskExecutionFunction(function TaskExecution) { taskExecutionFunction = function } func SetSendMessageExecutionFunction(function SendMessageExecution) { sendMessageExecution = function } func SetGetMessageExecutionFunction(function GetMessageExecution) { getMessageExecution = function }