Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
38cecba455 | |||
b497348d2a | |||
2c8d6bd682 | |||
601ce9eae7 | |||
42954c7cf1 | |||
909e84fe1c | |||
76174e3247 | |||
ea41bef942 | |||
caef31f48f |
5
go.mod
5
go.mod
@ -3,9 +3,10 @@ module git.anthrove.art/Anthrove/plug-sdk/v3
|
|||||||
go 1.22.0
|
go 1.22.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0
|
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0
|
||||||
github.com/matoous/go-nanoid/v2 v2.1.0
|
github.com/matoous/go-nanoid/v2 v2.1.0
|
||||||
github.com/sirupsen/logrus v1.9.3
|
github.com/sirupsen/logrus v1.9.3
|
||||||
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0
|
||||||
go.opentelemetry.io/otel v1.29.0
|
go.opentelemetry.io/otel v1.29.0
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0
|
||||||
go.opentelemetry.io/otel/exporters/prometheus v0.51.0
|
go.opentelemetry.io/otel/exporters/prometheus v0.51.0
|
||||||
@ -14,6 +15,7 @@ require (
|
|||||||
go.opentelemetry.io/otel/trace v1.29.0
|
go.opentelemetry.io/otel/trace v1.29.0
|
||||||
google.golang.org/grpc v1.65.0
|
google.golang.org/grpc v1.65.0
|
||||||
google.golang.org/protobuf v1.34.2
|
google.golang.org/protobuf v1.34.2
|
||||||
|
gorm.io/gorm v1.25.11
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@ -50,5 +52,4 @@ require (
|
|||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
|
||||||
gorm.io/driver/postgres v1.5.9 // indirect
|
gorm.io/driver/postgres v1.5.9 // indirect
|
||||||
gorm.io/gorm v1.25.11 // indirect
|
|
||||||
)
|
)
|
||||||
|
6
go.sum
6
go.sum
@ -1,7 +1,7 @@
|
|||||||
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
|
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
|
||||||
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||||
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0 h1:sx/XEd61RWHvitqUqoLmxA8uxqvWoRtaOTBlXl+q6AM=
|
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0 h1:LYkdMoRhidEzZGwIupB5K6u6+9m20oJSszqsFsjvaeA=
|
||||||
git.anthrove.art/Anthrove/otter-space-sdk/v3 v3.0.0/go.mod h1:iQiORzbTupfz/C2M3rZMK1hegF1cxTYQ+6cJtDw9qlk=
|
git.anthrove.art/Anthrove/otter-space-sdk/v4 v4.0.0/go.mod h1:TckER0W5rc8PwV3wF+LChcsP1fKy4SD/6AQOMt0W7H4=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
||||||
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||||
@ -145,6 +145,8 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi
|
|||||||
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||||
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU=
|
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0 h1:QHEj9AK6bEiEA9S5OdDUE9KAx4xp6pRkYMnybHDmjZU=
|
||||||
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM=
|
go.opentelemetry.io/contrib/bridges/otellogrus v0.3.0/go.mod h1:HRlW/1YWrBrbzB6FvHU7jUuz33F74PEvQVBL+b+wUhM=
|
||||||
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc=
|
||||||
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI=
|
||||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
|
||||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
|
||||||
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
|
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
|
||||||
|
@ -3,8 +3,8 @@ package otter
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {
|
func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error {
|
||||||
|
118
pkg/plug/grpc.go
118
pkg/plug/grpc.go
@ -2,9 +2,10 @@ package plug
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
||||||
gonanoid "github.com/matoous/go-nanoid/v2"
|
gonanoid "github.com/matoous/go-nanoid/v2"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@ -57,6 +58,18 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
|||||||
}
|
}
|
||||||
span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId)))
|
span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId)))
|
||||||
|
|
||||||
|
if !userSource.AccountValidate {
|
||||||
|
err = errors.New("user is not validated")
|
||||||
|
|
||||||
|
log.WithContext(ctx).WithError(err).WithField("task_id", id).Error("Task execution failed")
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
|
||||||
|
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
|
||||||
|
|
||||||
|
return &plugTaskState, err
|
||||||
|
}
|
||||||
|
|
||||||
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
|
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
|
||||||
// by using this method we assign a new context to each new request we get.
|
// by using this method we assign a new context to each new request we get.
|
||||||
// This can be used for example to close the context with the given id
|
// This can be used for example to close the context with the given id
|
||||||
@ -68,7 +81,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
|||||||
log.WithContext(taskCtx).WithFields(log.Fields{
|
log.WithContext(taskCtx).WithFields(log.Fields{
|
||||||
"task_id": id,
|
"task_id": id,
|
||||||
"user_source_id": creation.UserSourceId,
|
"user_source_id": creation.UserSourceId,
|
||||||
}).Info("Starting task")
|
}).Debug("Starting task")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
|
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
|
||||||
@ -79,7 +92,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
|||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully")
|
log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully")
|
||||||
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
|
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -107,7 +120,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
|
|||||||
log.WithContext(ctx).WithFields(log.Fields{
|
log.WithContext(ctx).WithFields(log.Fields{
|
||||||
"task_id": task.TaskId,
|
"task_id": task.TaskId,
|
||||||
"task_state": plugTaskState.TaskState,
|
"task_state": plugTaskState.TaskState,
|
||||||
}).Info("Task status requested")
|
}).Debug("Task status requested")
|
||||||
|
|
||||||
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
||||||
return &plugTaskState, nil
|
return &plugTaskState, nil
|
||||||
@ -128,7 +141,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
|
|||||||
log.WithContext(ctx).WithFields(log.Fields{
|
log.WithContext(ctx).WithFields(log.Fields{
|
||||||
"task_id": task.TaskId,
|
"task_id": task.TaskId,
|
||||||
"task_state": plugTaskState.TaskState,
|
"task_state": plugTaskState.TaskState,
|
||||||
}).Info("Task cancellation requested")
|
}).Debug("Task cancellation requested")
|
||||||
|
|
||||||
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
||||||
return &plugTaskState, nil
|
return &plugTaskState, nil
|
||||||
@ -142,3 +155,96 @@ func (s *server) removeTask(taskID string) {
|
|||||||
fn()
|
fn()
|
||||||
delete(s.ctx, taskID)
|
delete(s.ctx, taskID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "GetUserMessages")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
userSourceID := models.UserSourceID(message.UserSourceId)
|
||||||
|
|
||||||
|
userSource, err := database.GetUserSourceByID(ctx, userSourceID)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithError(err).Error("Getting userSource")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
messages, err := s.getMessageExecution(ctx, userSource)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithError(err).Error("Execution function")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response gRPC.GetMessagesResponse
|
||||||
|
for _, message := range messages {
|
||||||
|
response.Messages = append(response.Messages, &gRPC.Message{
|
||||||
|
FromUserSourceId: string(userSource.ID),
|
||||||
|
CreatedAt: message.CreatedAt,
|
||||||
|
Body: message.Body,
|
||||||
|
Title: message.Title,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
span.SetAttributes(
|
||||||
|
attribute.String("user_source_id", string(userSource.ID)),
|
||||||
|
attribute.String("user_id", string(userSource.UserID)),
|
||||||
|
attribute.String("source_id", string(userSource.SourceID)),
|
||||||
|
)
|
||||||
|
|
||||||
|
fields := log.Fields{
|
||||||
|
"user_source_id": userSource.ID,
|
||||||
|
"user_id": userSource.UserID,
|
||||||
|
"source_id": userSource.SourceID,
|
||||||
|
"len_messages": len(messages),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).WithFields(fields).Debug("Got User messages")
|
||||||
|
|
||||||
|
return &response, err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "SendMessage")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
response := &gRPC.SendMessageResponse{
|
||||||
|
Success: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceID := models.UserSourceID(message.UserSourceId)
|
||||||
|
userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}}
|
||||||
|
|
||||||
|
err := s.sendMessageExecution(ctx, userSource, message.Message)
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).WithError(err).Error("Sending message execution")
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
|
||||||
|
return response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
response.Success = true
|
||||||
|
return response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "Ping")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var pong gRPC.PongResponse
|
||||||
|
pong.Message = ping.Message
|
||||||
|
pong.Timestamp = ping.Timestamp
|
||||||
|
|
||||||
|
fields := log.Fields{
|
||||||
|
"messsage": ping.Message,
|
||||||
|
"timestamp": ping.Timestamp,
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).WithFields(fields).Trace("Got pinged")
|
||||||
|
|
||||||
|
return &pong, nil
|
||||||
|
}
|
||||||
|
@ -4,8 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"slices"
|
"slices"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
@ -4,9 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
@ -68,7 +69,9 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer(
|
||||||
|
grpc.StatsHandler(otelgrpc.NewServerHandler()),
|
||||||
|
)
|
||||||
|
|
||||||
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
|
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
|
||||||
|
|
||||||
|
@ -3,8 +3,8 @@ package plug
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user