plug-sdk/pkg/plug/server.go

149 lines
4.0 KiB
Go
Raw Normal View History

package plug
import (
"context"
"fmt"
"net"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
// tracer is the OpenTelemetry tracer used to start spans in this package.
var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v3/pkg/plug")
// Message is a single message as returned by a GetMessageExecution callback.
type Message struct {
// Title is the message title.
Title string
// Body is the message text.
Body string
// CreatedAt is a protobuf timestamp; presumably the creation time of the
// message — confirm against the plug implementations that fill it.
CreatedAt *timestamppb.Timestamp
}
// TaskExecution is the callback signature a plug registers through
// Plug.TaskExecutionFunction; Listen hands it to the gRPC server.
// NOTE(review): the exact meaning of deepScrape, apiKey and cancelFunction is
// defined by the gRPC server implementation, which is not visible here.
type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error
// SendMessageExecution is the callback signature a plug registers through
// Plug.SendMessageExecution; Listen hands it to the gRPC server. It receives
// the user source and the message text and reports failure through its error.
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
// GetMessageExecution is the callback signature a plug registers through
// Plug.GetMessageExecution; Listen hands it to the gRPC server. It returns the
// messages for the given user source, or an error.
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
// Plug bundles the listen address, the source description and the callbacks
// that Listen needs to run the plug's gRPC server.
type Plug struct {
// address and port are joined as "address:port" for the TCP listener.
address string
port string
// Callbacks handed to the gRPC server in Listen. They stay nil until the
// matching setter method has been called.
taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
// source is the source given to NewPlug; Listen replaces it with the record
// loaded from (or created in) the database.
source models.Source
// plugName is the name given to NewPlug.
plugName string
}
// NewPlug builds a Plug from its name, the address and port to listen on and
// the source it represents. The callbacks are left unset; register them with
// TaskExecutionFunction, SendMessageExecution and GetMessageExecution.
func NewPlug(plugName string, address string, port string, source models.Source) Plug {
	var created Plug

	created.plugName = plugName
	created.address = address
	created.port = port
	created.source = source

	return created
}
// Listen makes sure the plug's source exists in the database, starts the gRPC
// server on address:port and blocks until ctx is cancelled or the server stops.
//
// The source is looked up by its domain; when the lookup reports that no data
// was found, the source is created from the values given to NewPlug. The
// stored record then replaces p.source.
//
// Listen returns nil after a shutdown triggered by ctx. It returns a non-nil
// error when the source cannot be loaded or created, when the TCP listener
// cannot be opened, or when the gRPC server stops serving with an error.
func (p *Plug) Listen(ctx context.Context) error {
	ctx, span := tracer.Start(ctx, "Listen")
	defer span.End()

	span.SetAttributes(
		attribute.String("source_display_name", string(p.source.DisplayName)),
		attribute.String("source_domain", string(p.source.Domain)),
	)

	sourceFields := log.Fields{
		"source_display_name": p.source.DisplayName,
		"source_domain":       p.source.Domain,
	}
	serverFields := log.Fields{
		"address": p.address,
		"port":    p.port,
	}

	source, err := database.GetSourceByDomain(ctx, p.source.Domain)
	if err != nil {
		// NOTE(review): matching on the error text is brittle; switch to
		// errors.Is / errors.As if the database package exposes a sentinel or
		// typed error for "no data found".
		if err.Error() != "Database error: NoDataFound" {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			log.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to get source by domain")
			// Report the failure to the caller instead of panicking.
			return err
		}

		span.AddEvent("No Source found, initializing source")
		log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")

		source, err = database.CreateSource(ctx, p.source)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			log.WithContext(ctx).WithError(err).WithFields(sourceFields).Error("Failed to create source")
			return err
		}

		span.AddEvent("Source created")
		log.WithContext(ctx).WithFields(sourceFields).WithField("source_id", source.ID).Info("Source created!")
	}

	p.source = source
	span.SetAttributes(
		attribute.String("source_id", string(p.source.ID)),
	)

	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port))
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		log.WithContext(ctx).WithError(err).WithFields(serverFields).Error("Failed to listen on address")
		return err
	}

	grpcServer := grpc.NewServer()
	pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))

	// The channel is buffered so the serving goroutine can always deliver its
	// result and exit, even when Listen has already returned through the
	// ctx.Done() branch.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- grpcServer.Serve(lis)
	}()

	select {
	case <-ctx.Done():
		log.WithContext(ctx).Info("Context stopped! Shutdown Plug")
		grpcServer.GracefulStop()
		return nil
	case err := <-serveErr:
		// The server stopped without a shutdown request: surface the error
		// instead of blocking until ctx is cancelled.
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
		}
		return err
	}
}
// GetSource returns the plug's current source. After Listen has loaded or
// created the source, this is the record returned by the database.
func (p *Plug) GetSource() models.Source {
return p.source
}
// TaskExecutionFunction registers the task callback that Listen passes to the
// gRPC server. Listen reads the field when it runs, so call this first.
func (p *Plug) TaskExecutionFunction(function TaskExecution) {
p.taskExecutionFunction = function
}
// SendMessageExecution registers the send-message callback that Listen passes
// to the gRPC server. Listen reads the field when it runs, so call this first.
func (p *Plug) SendMessageExecution(function SendMessageExecution) {
p.sendMessageExecution = function
}
// GetMessageExecution registers the get-messages callback that Listen passes
// to the gRPC server. Listen reads the field when it runs, so call this first.
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
p.getMessageExecution = function
}