package plug import ( "context" "net" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" log "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" ) var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug") type Message struct { Title string Body string CreatedAt *timestamppb.Timestamp } type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) var ( taskExecutionFunction TaskExecution sendMessageExecution SendMessageExecution getMessageExecution GetMessageExecution ) func Listen(ctx context.Context, listenAddr string, source models.Source) error { ctx, span := tracer.Start(ctx, "Listen") defer span.End() var err error span.SetAttributes( attribute.String("source_display_name", source.DisplayName), attribute.String("source_domain", string(source.Domain)), ) serverFields := log.Fields{ "address": listenAddr, } source, err = upsertSource(ctx, source) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to upsert Source") return err } span.SetAttributes( attribute.String("source_id", string(source.ID)), ) lis, err := net.Listen("tcp", listenAddr) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address") return err } grpcServer := grpc.NewServer( grpc.StatsHandler(otelgrpc.NewServerHandler()), ) pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) go func() { err = grpcServer.Serve(lis) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") } }() select { case <-ctx.Done(): log.WithContext(ctx).Info("Context stopped! Shutdown Plug") grpcServer.GracefulStop() } return nil } func SetTaskExecutionFunction(function TaskExecution) { taskExecutionFunction = function } func SetSendMessageExecutionFunction(function SendMessageExecution) { sendMessageExecution = function } func SetGetMessageExecutionFunction(function GetMessageExecution) { getMessageExecution = function }