refactor(source): function header

changed function header to use the UserSources as information source
This commit is contained in:
SoXX 2024-08-17 20:23:11 +02:00
parent b6974c4a9f
commit 7d4ce9f359
4 changed files with 36 additions and 24 deletions

8
go.mod
View File

@ -6,7 +6,7 @@ require (
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0 git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0
github.com/golang/protobuf v1.5.4 github.com/golang/protobuf v1.5.4
github.com/matoous/go-nanoid/v2 v2.1.0 github.com/matoous/go-nanoid/v2 v2.1.0
google.golang.org/grpc v1.61.1 google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2 google.golang.org/protobuf v1.34.2
) )
@ -27,12 +27,12 @@ require (
go.opentelemetry.io/otel/log v0.4.0 // indirect go.opentelemetry.io/otel/log v0.4.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/crypto v0.22.0 // indirect golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.21.0 // indirect golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.8.0 // indirect golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.21.0 // indirect golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.17.0 // indirect golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gorm.io/driver/postgres v1.5.9 // indirect gorm.io/driver/postgres v1.5.9 // indirect
gorm.io/gorm v1.25.11 // indirect gorm.io/gorm v1.25.11 // indirect
) )

16
go.sum
View File

@ -139,10 +139,10 @@ go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6b
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -150,10 +150,10 @@ golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -2,10 +2,10 @@ package plug
import ( import (
"context" "context"
"log" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"log"
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
gonanoid "github.com/matoous/go-nanoid/v2" gonanoid "github.com/matoous/go-nanoid/v2"
@ -31,9 +31,7 @@ func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, se
} }
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) { func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
var user models.User
var plugTaskState gRPC.PlugTaskStatus var plugTaskState gRPC.PlugTaskStatus
var err error
id, err := gonanoid.New() id, err := gonanoid.New()
if err != nil { if err != nil {
@ -43,7 +41,10 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
plugTaskState.TaskId = id plugTaskState.TaskId = id
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
user.ID = models.UserID(creation.UserId) userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId))
if err != nil {
return nil, err
}
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting // gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
// by using this method we assign a new context to each new request we get. // by using this method we assign a new context to each new request we get.
@ -53,7 +54,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
go func() { go func() {
// FIXME: better implement this methode, works for now but needs refactoring // FIXME: better implement this methode, works for now but needs refactoring
err := s.taskExecutionFunction(ctx, s.source, creation.UserSourceName, user, creation.DeepScrape, creation.ApiKey, func() { err := s.taskExecutionFunction(ctx, userSource, creation.DeepScrape, creation.ApiKey, func() {
s.removeTask(id) s.removeTask(id)
}) })
if err != nil { if err != nil {
@ -109,7 +110,12 @@ func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongR
func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) { func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
messageResponse := gRPC.SendMessageResponse{Success: true} messageResponse := gRPC.SendMessageResponse{Success: true}
err := s.sendMessageExecution(ctx, s.source, request.UserSourceId, request.UserSourceName, request.Message) userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId))
if err != nil {
return nil, err
}
err = s.sendMessageExecution(ctx, userSource, request.Message)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -120,7 +126,12 @@ func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageReque
func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
messageResponse := gRPC.GetMessagesResponse{} messageResponse := gRPC.GetMessagesResponse{}
messages, err := s.getMessageExecution(ctx, s.source, request.UserSourceId, request.UserSourceName) userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId))
if err != nil {
return nil, err
}
messages, err := s.getMessageExecution(ctx, userSource)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -22,9 +22,9 @@ type Message struct {
CreatedAt *timestamp.Timestamp CreatedAt *timestamp.Timestamp
} }
type TaskExecution func(ctx context.Context, source models.Source, userSourceUsername string, anthroveUser models.User, deepScrape bool, apiKey string, cancelFunction func()) error type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error
type SendMessageExecution func(ctx context.Context, source models.Source, userSourceID string, userSourceUsername string, message string) error type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
type GetMessageExecution func(ctx context.Context, source models.Source, userSourceID string, userSourceUsername string) ([]Message, error) type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
type Plug struct { type Plug struct {
address string address string
@ -49,7 +49,6 @@ func (p *Plug) Listen() error {
var err error var err error
var source models.Source var source models.Source
log.Print("Check if source exists")
source, err = database.GetSourceByDomain(p.ctx, p.source.Domain) source, err = database.GetSourceByDomain(p.ctx, p.source.Domain)
if err != nil { if err != nil {
if err.Error() == otterError.NoDataFound { if err.Error() == otterError.NoDataFound {
@ -66,6 +65,8 @@ func (p *Plug) Listen() error {
log.Print("Source exists") log.Print("Source exists")
// Start the Server
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port)) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port))
if err != nil { if err != nil {
return err return err