feat(telemetry): initial
added first draft of tracing and logging, may need to be improved
This commit is contained in:
parent
8e44948927
commit
e070fb8dcf
109
pkg/plug/grpc.go
109
pkg/plug/grpc.go
@ -2,14 +2,15 @@ package plug
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
|
||||||
|
|
||||||
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
||||||
gonanoid "github.com/matoous/go-nanoid/v2"
|
gonanoid "github.com/matoous/go-nanoid/v2"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
type server struct {
|
type server struct {
|
||||||
@ -32,41 +33,64 @@ func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, se
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
|
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "TaskStart")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
var plugTaskState gRPC.PlugTaskStatus
|
var plugTaskState gRPC.PlugTaskStatus
|
||||||
|
|
||||||
id, err := gonanoid.New()
|
id, err := gonanoid.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
span.AddEvent("Generated task ID", trace.WithAttributes(attribute.String("task_id", id)))
|
||||||
|
|
||||||
plugTaskState.TaskId = id
|
plugTaskState.TaskId = id
|
||||||
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
|
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
|
||||||
|
|
||||||
userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId))
|
userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId)))
|
||||||
|
|
||||||
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
|
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
|
||||||
// by using this method we assign a new context to each new request we get.
|
// by using this method we assign a new context to each new request we get.
|
||||||
// This can be used for example to close the context with the given id
|
// This can be used for example to close the context with the given id
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
taskCtx, cancel := context.WithCancel(context.Background())
|
||||||
s.ctx[id] = cancel
|
s.ctx[id] = cancel
|
||||||
|
span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))
|
||||||
|
|
||||||
|
log.WithContext(ctx).WithFields(log.Fields{
|
||||||
|
"task_id": id,
|
||||||
|
"user_source_id": creation.UserSourceId,
|
||||||
|
}).Info("Starting task")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// FIXME: better implement this methode, works for now but needs refactoring
|
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
|
||||||
err := s.taskExecutionFunction(ctx, userSource, creation.DeepScrape, creation.ApiKey, func() {
|
|
||||||
s.removeTask(id)
|
s.removeTask(id)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
} else {
|
||||||
|
log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully")
|
||||||
|
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("task_id", id))
|
||||||
return &plugTaskState, nil
|
return &plugTaskState, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
|
func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "TaskStatus")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
var plugTaskState gRPC.PlugTaskStatus
|
var plugTaskState gRPC.PlugTaskStatus
|
||||||
|
|
||||||
_, found := s.ctx[task.TaskId]
|
_, found := s.ctx[task.TaskId]
|
||||||
@ -77,17 +101,35 @@ func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugT
|
|||||||
if !found {
|
if !found {
|
||||||
plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN
|
plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN
|
||||||
}
|
}
|
||||||
|
span.AddEvent("Determined task state", trace.WithAttributes(attribute.String("task_id", task.TaskId), attribute.String("task_state", plugTaskState.TaskState.String())))
|
||||||
|
|
||||||
|
log.WithContext(ctx).WithFields(log.Fields{
|
||||||
|
"task_id": task.TaskId,
|
||||||
|
"task_state": plugTaskState.TaskState,
|
||||||
|
}).Info("Task status requested")
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
||||||
return &plugTaskState, nil
|
return &plugTaskState, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
|
func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
|
||||||
|
ctx, span := tracer.Start(ctx, "TaskCancel")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
var plugTaskState gRPC.PlugTaskStatus
|
var plugTaskState gRPC.PlugTaskStatus
|
||||||
|
|
||||||
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
|
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
|
||||||
plugTaskState.TaskId = task.TaskId
|
plugTaskState.TaskId = task.TaskId
|
||||||
|
|
||||||
s.removeTask(task.TaskId)
|
s.removeTask(task.TaskId)
|
||||||
|
span.AddEvent("Removed task", trace.WithAttributes(attribute.String("task_id", task.TaskId)))
|
||||||
|
|
||||||
|
log.WithContext(ctx).WithFields(log.Fields{
|
||||||
|
"task_id": task.TaskId,
|
||||||
|
"task_state": plugTaskState.TaskState,
|
||||||
|
}).Info("Task cancellation requested")
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
||||||
return &plugTaskState, nil
|
return &plugTaskState, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,52 +141,3 @@ func (s *server) removeTask(taskID string) {
|
|||||||
fn()
|
fn()
|
||||||
delete(s.ctx, taskID)
|
delete(s.ctx, taskID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) {
|
|
||||||
response := gRPC.PongResponse{
|
|
||||||
Message: request.Message,
|
|
||||||
Timestamp: timestamppb.Now(),
|
|
||||||
}
|
|
||||||
return &response, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
|
|
||||||
messageResponse := gRPC.SendMessageResponse{Success: true}
|
|
||||||
|
|
||||||
userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.sendMessageExecution(ctx, userSource, request.Message)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &messageResponse, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
|
|
||||||
messageResponse := gRPC.GetMessagesResponse{}
|
|
||||||
|
|
||||||
userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
messages, err := s.getMessageExecution(ctx, userSource)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, message := range messages {
|
|
||||||
messageResponse.Messages = append(messageResponse.Messages, &gRPC.Message{
|
|
||||||
FromUserSourceId: request.UserSourceId,
|
|
||||||
CreatedAt: message.CreatedAt,
|
|
||||||
Body: message.Body,
|
|
||||||
Title: message.Title,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return &messageResponse, nil
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user