package plug import ( "context" "fmt" "net" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" ) var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug") type Message struct { Title string Body string CreatedAt *timestamppb.Timestamp } type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) type Plug struct { address string port string taskExecutionFunction TaskExecution sendMessageExecution SendMessageExecution getMessageExecution GetMessageExecution source models.Source plugName string } func NewPlug(plugName string, address string, port string, source models.Source) Plug { return Plug{ address: address, port: port, source: source, plugName: plugName, } } func (p *Plug) Listen(ctx context.Context) error { ctx, span := tracer.Start(ctx, "Listen") defer span.End() var err error var source models.Source span.SetAttributes( attribute.String("source_display_name", string(p.source.DisplayName)), attribute.String("source_domain", string(p.source.Domain)), ) sourceFields := log.Fields{ "source_display_name": p.source.DisplayName, "source_domain": p.source.Domain, } serverFields := log.Fields{ "address": p.address, "port": p.port, } source, err = database.GetSourceByDomain(ctx, p.source.Domain) if err != nil { if err.Error() == "Database error: NoDataFound" { span.AddEvent("No Source found, initializing source") log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!") source, err = database.CreateSource(ctx, p.source) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithError(err).WithFields(sourceFields).Error("Failed to create source") panic(err) } span.AddEvent("Source created") log.WithContext(ctx).WithError(err).WithFields(sourceFields).WithField("source_id", source.ID).Info("Source created!") } else { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to get source by domain") panic(err) } } p.source = source span.SetAttributes( attribute.String("source_id", string(p.source.ID)), ) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port)) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address") return err } grpcServer := grpc.NewServer() pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution)) go func() { err = grpcServer.Serve(lis) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server") } }() select { case <-ctx.Done(): log.WithContext(ctx).Info("Context stopped! Shutdown Plug") grpcServer.GracefulStop() } return nil } func (p *Plug) GetSource() models.Source { return p.source } func (p *Plug) TaskExecutionFunction(function TaskExecution) { p.taskExecutionFunction = function } func (p *Plug) SendMessageExecution(function SendMessageExecution) { p.sendMessageExecution = function } func (p *Plug) GetMessageExecution(function GetMessageExecution) { p.getMessageExecution = function }