New and generic scrape algorithm #11

Merged
SoXX merged 20 commits from dev/algorithm into main 2024-11-01 21:39:21 +00:00
8 changed files with 244 additions and 87 deletions

View File

@ -7,7 +7,7 @@ Anthrove Plug SDK is a Golang-based Software Development Kit (SDK) that provides
To install the Anthrove Plug SDK, you will need to have Go installed on your system. You can then use the go get command to fetch the SDK:
```bash
go get git.anthrove.art/Anthrove/plug-sdk/v4
go get git.anthrove.art/Anthrove/plug-sdk/v5
```
## Usage
@ -141,7 +141,7 @@ func main() {
log.Fatal(err)
}
plug.SetTaskExecutionFunction(service.YourTaskFunction)
plug.SetTaskExecutionFunction(service.plugInterface)
plug.SetGetMessageExecutionFunction(service.YourMessageFunction)
err = plug.Listen(ctx, ":8080", source)
if err != nil {

2
go.mod
View File

@ -1,4 +1,4 @@
module git.anthrove.art/Anthrove/plug-sdk/v4
module git.anthrove.art/Anthrove/plug-sdk/v5
go 1.23.0

View File

@ -1,12 +0,0 @@
package otter
import (
"context"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
)
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {
return database.Connect(ctx, config)
}

159
pkg/plug/algorithm.go Normal file
View File

@ -0,0 +1,159 @@
package plug
import (
"context"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"slices"
"time"
)
type User struct {
UserFavoriteCount int64
UserName string
UserID string
}
type Favorites struct {
Posts []models.Post
NextPage string
LastPage string
}
type Plug interface {
// GetFavoritePage
// The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used
GetFavoritePage(ctx context.Context, apiKey string, userSource models.UserSource, pageIdentifier string) (Favorites, error)
// GetUserProfile
// The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used
GetUserProfile(ctx context.Context, apiKey string, userSource models.UserSource) (User, error)
}
func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSource, anthroveUserFavCount int64, deepScrape bool, apiKey string) (TaskSummery, error) {
ctx, span := tracer.Start(ctx, "mainScrapeAlgorithm")
defer span.End()
span.SetAttributes(
attribute.String("user_source_id", string(userSource.ID)),
attribute.String("user_source_user_id", string(userSource.UserID)),
attribute.String("user_source_source_id", string(userSource.SourceID)),
)
basicLoggingInfo := log.Fields{
"user_source_id": userSource.ID,
"user_source_user_id": userSource.UserID,
"user_source_source_id": userSource.SourceID,
}
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Starting mainScrapeAlgorithm")
taskSummery := TaskSummery{
AddedPosts: 0,
DeletedPosts: 0,
}
profile, err := plugInterface.GetUserProfile(ctx, apiKey, userSource)
if err != nil {
return taskSummery, err
}
Alphyron marked this conversation as resolved Outdated

Hier muss eine sonderbedingung gemacht weerden, weil UserFavoriteCount kann ja -1 sein (wenn es nicht ausgelesen werden kann.

Hier muss eine sonderbedingung gemacht weerden, weil UserFavoriteCount kann ja -1 sein (wenn es nicht ausgelesen werden kann.

@SoXX to fix this problem and the Issue one comment bellow I would recommend, to change the algorithm to the following:

@SoXX to fix this problem and the Issue one comment bellow I would recommend, to change the algorithm to the following:
[https://www.plantuml.com/plantuml/uml/RP1FRzH03CNl_XIlj-mDhea4BIgWIXMAgChTk-D9HyoVoEFija9yTtOIYTBIkOtzUy_Flgp6QakAT34hJucnLFaXQk70ySQZPA8LeVwsiCDz5Hsr-105Nal269VfQhmPwFJGQftf8ZiwFw3_AeOlV1nvsk1LFH2mUSbZg1RoXB5KgnlHs7__rv_-vvkdRFqtFPgcYQwSGoxsch62APOzHypdF-AvERo9RsEUSS_7bKPNr8aYL8Gq5pNETe4i9wa67xJQRa1B43owpo_TFk3T3hy80FOg_9E0tslOQ_4X2xx9esr7i8AxW_8i0qbswtM9-liv5cuvywkr1kg_or6qIfk4spLdBYUKw9vpzNyTjZXTi1Thm1v4fPKODN6CSC5xbNmGxCLE8haX2LtYfxtWFLBzkDji7Pj0nHRDk5jIOdtgYQgLcIubkoN5Fm00](https://plantuml.github.io/plantuml-core/raw.html?VLB1RXCn4BtxArulrUu5qMiMWGejg5A5LbJbxcJ76mlRinhF9ktVOsS3j57epPxtvisRPrzCOiTeCMVwIQ9-OyET0oTZibfHixfdE-0na_J2pWU24uxempnUztUTsMzhhN4ij2DfZG6yGsEJReLtL1k_sjqNdfFY0wP5uzsnpEAL5kpoyWtm8zwtq2qbD2epjMK8i1Qolyg9qk1TdRlwHehIsDlnwB8gTHKvB45FFgjF8thEcezxLEwB-ytdC_oYIKVyd4RjgKtNmu34UifHnXXuRGEcSaSeO7UMlepvHtTsTs2ZOHb8u3Dyd5YqX7k1H7igcFEIjUtm9_ZRrsgdoU4_qTsGyudmdPvqoQJOUDY8dQWpXNLuPMDMZrEeoIHE9rjsPlMZVe1CTJ3k3xOJxy5XU67phyBYhl7wQglgLAYMB7Aq8Q3uSq9fGrYJxgdiKIWHJRJFQy8LOlxKBhA3LOAGpfBmCCTevRchdvYdtm00)
fixed an issue @SoXX https://plantuml.github.io/plantuml-core/raw.html?ZL9DJzmm4BtxLpmkABbKzBefL53Q2WbLKH6zUpVZh5N74uqdk_3lQoTBAGkjUeldVSoRoPoCOll1OahqWqJzneOR1ux69BMYPdNBjiDz8cc5dGy49poW3LD_sTuqPhyjjSgnq8waDWRm3fMDkXNUKH5-iRjFF4N51uoBnxj3cSKhBTZfwJ_02vpLq2r5L2eJrRa9i1QoF_CNnSwxqdVrbHHPsDh-aB8uDK-HvdBBIHFEpuyCyHV7UNXwy4bzx0_YPVvdi_bzVTMyvkSiTC3VyYYQ8hhiEaJMOuuo-i1h6p3cDq86cpVfIvjdkhExcOsAfIE1J_33oABvfvmWqfuovhmahHiyRUVpIxUB_rpGDJaV2T_eGPUTv1Xt6x4ZDIQpvkimPdK_XhAPoF5eoiQilfV3ILGQutQE7NaF3jqVCNyxBEagho-cAac5IiaY3JO4v7nEA6u8Qz8zvJaoPK0rsCSruJs6zB5UR3kh126T9E9k6WOhZqqvVXv_0G00
userSource.AccountID = profile.UserID
Alphyron marked this conversation as resolved Outdated

Issue still exists
Anthrove/plug-e621#15

Issue still exists https://git.anthrove.art/Anthrove/plug-e621/issues/15
userSource.AccountUsername = profile.UserName
nextPage := ""
var newPosts []models.Post
var anthroveFaves []models.UserFavorite
outer:
for {
if anthroveUserFavCount >= profile.UserFavoriteCount && profile.UserFavoriteCount > 0 {
break outer
}
select {
case <-ctx.Done():
break outer
default:
span.AddEvent("Executing getFavorites request")
favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage)
span.AddEvent("Finished executing getFavorites request")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page")
return taskSummery, err
}
if len(favorites.Posts) == 0 {
span.AddEvent("No more favorites found")
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
break outer
}
span.AddEvent("Executing BatchPostProcessingWithSummery")
pageNewPosts, pageAnthroveFaves, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts)
anthroveFaves = append(anthroveFaves, pageAnthroveFaves...)
newPosts = append(newPosts, pageNewPosts...)
span.AddEvent("Finished executing BatchPostProcessingWithSummery")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing")
return taskSummery, err
}
if len(pageAnthroveFaves) <= 0 || len(favorites.Posts) != len(pageAnthroveFaves) {
span.AddEvent("No more favorites found to add")
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
break outer
}
nextPage = favorites.NextPage
taskSummery.AddedPosts += len(pageAnthroveFaves)
}
}
if len(newPosts) > 0 {
span.AddEvent("Executing CreatePostInBatch")
err = database.CreatePostInBatch(ctx, newPosts, BatchSize)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch")
return taskSummery, err
}
span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch")
}
if len(anthroveFaves) > 0 {
span.AddEvent("Executing CreateUserFavoriteInBatch")
for i, fav := range anthroveFaves {
fav.CreatedAt = time.Now().Add(time.Millisecond * time.Duration(i) * -1)
}
slices.Reverse(anthroveFaves)
err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch")
return taskSummery, err
}
span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch")
}
span.AddEvent("Completed scraping algorithm")
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Completed scraping algorithm")
return taskSummery, nil
}

View File

@ -3,11 +3,12 @@ package plug
import (
"context"
"errors"
"gorm.io/gorm"
"time"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
gRPC "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/grpc"
gRPC "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/grpc"
gonanoid "github.com/matoous/go-nanoid/v2"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
@ -17,20 +18,20 @@ import (
type server struct {
gRPC.UnimplementedPlugConnectorServer
ctx map[string]context.CancelFunc
taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
source models.Source
ctx map[string]context.CancelFunc
plugInterface Plug
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
source models.Source
}
func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
func NewGrpcServer(source models.Source, plugAPIInterface Plug, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
return &server{
ctx: make(map[string]context.CancelFunc),
taskExecutionFunction: taskExecutionFunction,
sendMessageExecution: sendMessageExecution,
getMessageExecution: getMessageExecution,
source: source,
ctx: make(map[string]context.CancelFunc),
plugInterface: plugAPIInterface,
sendMessageExecution: sendMessageExecution,
getMessageExecution: getMessageExecution,
source: source,
}
}
@ -114,9 +115,38 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
"user_source_id": creation.UserSourceId,
}).Debug("Starting task")
db, err := database.GetGorm(taskCtx)
if err != nil {
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
dberr := database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{
ScrapeTaskID: models.ScrapeTaskID(id),
UserSourceID: userSource.ID,
FinishedAt: time.Now(),
Error: errorString(err),
})
return &plugTaskState, errors.Join(err, dberr)
}
anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(taskCtx, db, userSource.UserID, userSource.ID)
if err != nil {
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
dberr := database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{
ScrapeTaskID: models.ScrapeTaskID(id),
UserSourceID: userSource.ID,
FinishedAt: time.Now(),
Error: errorString(err),
})
return &plugTaskState, errors.Join(err, dberr)
}
go func() {
var err error
taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey)
taskSummery, err := algorithm(taskCtx, s.plugInterface, userSource, anthroveUserFavCount, creation.DeepScrape, creation.ApiKey)
if err != nil {
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
span.RecordError(err)
@ -301,3 +331,15 @@ func errorString(err error) string {
}
return ""
}
// getUserFavoriteCountFromDatabase
func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) {
var count int64
err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? AND user_source_id = ?", userID, userSourceID).Count(&count).Error
if err != nil {
return count, err
}
return count, nil
}

View File

@ -3,7 +3,7 @@ package plug
import (
"context"
"git.anthrove.art/Anthrove/plug-sdk/v4/pkg/telemetry"
"git.anthrove.art/Anthrove/plug-sdk/v5/pkg/telemetry"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/codes"
)

View File

@ -2,8 +2,6 @@ package plug
import (
"context"
"slices"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
log "github.com/sirupsen/logrus"
@ -11,6 +9,7 @@ import (
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"
"slices"
)
var BatchSize = 50
@ -21,12 +20,7 @@ type BatchSummery struct {
AddedFavorites int64
}
func BatchPostProcessing(ctx context.Context, userSource models.UserSource, posts []models.Post) error {
_, err := BatchPostProcessingWithSummery(ctx, userSource, posts)
return err
}
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) (BatchSummery, error) {
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, error) {
ctx, span := tracer.Start(ctx, "BatchPostProcessing")
defer span.End()
@ -49,7 +43,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB")
return BatchSummery{}, err
return nil, nil, err
}
postIDs := make([]string, 0, len(posts))
@ -59,18 +53,18 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.AddEvent("Collected post IDs", trace.WithAttributes(attribute.Int("post_count", len(postIDs))))
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("post_count", len(postIDs)).Info("Collected post IDs")
existingPosts, err := getAnthrovePost(ctx, db, userSource.SourceID, postIDs)
existingPostsReferences, err := getAnthrovePostReferences(ctx, db, userSource.SourceID, postIDs)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts")
return BatchSummery{}, err
return nil, nil, err
}
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts))))
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts")
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPostsReferences))))
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPostsReferences)).Info("Fetched existing posts")
var existingPostIDs []models.PostID
for _, post := range existingPosts {
for _, post := range existingPostsReferences {
existingPostIDs = append(existingPostIDs, models.PostID(post.PostID))
}
@ -80,15 +74,15 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts")
return BatchSummery{}, err
return nil, nil, err
}
span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs))))
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts")
anthroveFaves := make([]models.UserFavorite, 0, len(existingPosts))
newPosts := make([]models.Post, 0, len(existingPosts))
anthroveFaves := make([]models.UserFavorite, 0, len(existingPostsReferences))
newPosts := make([]models.Post, 0, len(existingPostsReferences))
for _, post := range posts {
if !slices.ContainsFunc(existingPosts, func(reference models.PostReference) bool {
if !slices.ContainsFunc(existingPostsReferences, func(reference models.PostReference) bool {
found := reference.SourcePostID == post.References[0].SourcePostID
if found {
if !slices.Contains(existingFavPostIDs, models.PostID(reference.PostID)) {
@ -105,6 +99,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
UserID: userSource.UserID,
PostID: post.ID,
UserSourceID: userSource.ID,
UserSource: models.UserSource{},
})
newPosts = append(newPosts, post)
}
@ -112,41 +107,14 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", len(anthroveFaves))))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts")
if len(newPosts) > 0 {
err = database.CreatePostInBatch(ctx, newPosts, BatchSize)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch")
return BatchSummery{}, err
}
span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch")
}
if len(anthroveFaves) > 0 {
err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch")
return BatchSummery{}, err
}
span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize)))
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch")
}
return BatchSummery{
AddedPosts: int64(len(newPosts)),
AddedFavorites: int64(len(anthroveFaves)),
}, nil
return newPosts, anthroveFaves, nil
}
func getAnthrovePost(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {
ctx, span := tracer.Start(ctx, "getAnthrovePost")
func getAnthrovePostReferences(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {
ctx, span := tracer.Start(ctx, "getAnthrovePostReferences")
defer span.End()
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePost")
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePostReferences")
var existingPosts []models.PostReference
err := gorm.WithContext(ctx).Model(models.PostReference{}).Find(&existingPosts, "source_id = ? AND source_post_id IN ?", id, postIDs).Error

View File

@ -5,7 +5,7 @@ import (
"net"
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
pb "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/grpc"
pb "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/grpc"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
@ -15,7 +15,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v4/pkg/plug")
var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v5/pkg/plug")
type Message struct {
Title string
@ -28,14 +28,13 @@ type TaskSummery struct {
DeletedPosts int
}
type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error)
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
var (
taskExecutionFunction TaskExecution
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
sendMessageExecution SendMessageExecution
getMessageExecution GetMessageExecution
plugAPIInterface Plug
)
func Listen(ctx context.Context, listenAddr string, source models.Source) error {
@ -78,7 +77,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, plugAPIInterface, sendMessageExecution, getMessageExecution))
go func() {
err = grpcServer.Serve(lis)
@ -98,8 +97,9 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
return nil
}
func SetTaskExecutionFunction(function TaskExecution) {
taskExecutionFunction = function
// RegisterPlugInterface sets the provided Plug interface implementation for task execution.
Alphyron marked this conversation as resolved Outdated

irgendwie passt der Name der funktion nicht zu dem was gemacht wird. Es wird weder eine Function gesetzt, noch wird ein execution stuff gesetzt.
Hier ist es ja nur ein Interface als Schnittstelle zu der PlugAPI

irgendwie passt der Name der funktion nicht zu dem was gemacht wird. Es wird weder eine Function gesetzt, noch wird ein execution stuff gesetzt. Hier ist es ja nur ein Interface als Schnittstelle zu der PlugAPI
func RegisterPlugInterface(plugInterface Plug) {
plugAPIInterface = plugInterface
}
func SetSendMessageExecutionFunction(function SendMessageExecution) {