plug-sdk/pkg/plug/server.go

92 lines
2.3 KiB
Go
Raw Permalink Normal View History

package plug
import (
"context"
"errors"
"fmt"
"log"
"net"
"git.anthrove.art/anthrove/otter-space-sdk/v2/pkg/database"
otterError "git.anthrove.art/anthrove/otter-space-sdk/v2/pkg/error"
"git.anthrove.art/anthrove/otter-space-sdk/v2/pkg/models"
"github.com/golang/protobuf/ptypes/timestamp"
pb "git.anthrove.art/anthrove/plug-sdk/v2/pkg/grpc"
"google.golang.org/grpc"
)
// Message is one message handled by a plug. A slice of Message is what a
// GetMessageExecution callback returns.
type Message struct {
	// Title and Body hold the textual parts of the message.
	Title, Body string

	// CreatedAt is a protobuf timestamp attached to the message
	// (presumably the time the message was created — confirm with the
	// plug implementation that fills it in).
	CreatedAt *timestamp.Timestamp
}
// Callback signatures a plug implementation supplies. The Plug stores one
// value of each type and hands them to the gRPC server when Listen is called.
type (
	// TaskExecution is the callback signature for running a task. It
	// receives the database handle, the username on the source, the
	// Anthrove user, a deep-scrape flag, an API key and a cancel function.
	// How the arguments are interpreted is up to the implementation.
	TaskExecution func(
		ctx context.Context,
		database database.OtterSpace,
		userSourceUsername string,
		anthroveUser models.User,
		deepScrape bool,
		apiKey string,
		cancelFunction func(),
	) error

	// SendMessageExecution is the callback signature for sending message
	// to the user identified by userSourceID / userSourceUsername.
	SendMessageExecution func(
		ctx context.Context,
		userSourceID, userSourceUsername string,
		message string,
	) error

	// GetMessageExecution is the callback signature for fetching the
	// messages of the user identified by userSourceID / userSourceUsername.
	GetMessageExecution func(
		ctx context.Context,
		userSourceID, userSourceUsername string,
	) ([]Message, error)
)
// Plug holds everything needed to register a source and serve it over gRPC:
// the listen endpoint, the database handle, the source description and the
// callbacks that are forwarded to the gRPC server in Listen.
type Plug struct {
	// address and port are combined as "address:port" to form the TCP
	// endpoint Listen binds to.
	address, port string

	// ctx is the context given to NewPlug; Listen passes it to the
	// database when creating the source.
	ctx context.Context

	// database is assigned through WithOtterSpace and is not set by NewPlug.
	database database.OtterSpace

	// Callbacks assigned through the TaskExecutionFunction,
	// SendMessageExecution and GetMessageExecution methods.
	taskExecutionFunction TaskExecution
	sendMessageExecution  SendMessageExecution
	getMessageExecution   GetMessageExecution

	// source is the source that Listen registers in the database.
	source models.Source
}
// NewPlug returns a Plug configured with the given context, listen address,
// port and source. The database handle and the callbacks are left at their
// zero values; set them with WithOtterSpace, TaskExecutionFunction,
// SendMessageExecution and GetMessageExecution before calling Listen.
//
// The Plug is returned by value while its methods use pointer receivers, so
// callers need an addressable variable to invoke them.
func NewPlug(ctx context.Context, address string, port string, source models.Source) Plug {
	var plug Plug

	plug.ctx = ctx
	plug.address = address
	plug.port = port
	plug.source = source

	return plug
}
// Listen registers the plug's source in the database and then serves the
// PlugConnector gRPC service on "address:port".
//
// The source is created first. A NoDataWritten error from CreateSource is
// tolerated (presumably it means the source already exists — confirm against
// otter-space-sdk); any other error is returned to the caller, wrapped with
// context, instead of panicking.
//
// Listen blocks while the gRPC server is serving. It returns the error from
// net.Listen if the endpoint cannot be bound, otherwise whatever
// grpc.Server.Serve returns.
func (p *Plug) Listen() error {
	log.Printf("initilazing source!")

	// NOTE(review): errors.Is against a freshly allocated *NoDataWritten only
	// matches if that type defines an Is method or the pointers happen to
	// compare equal; if it does not, errors.As would be the reliable check.
	// Verify in otter-space-sdk.
	if err := p.database.CreateSource(p.ctx, &p.source); err != nil && !errors.Is(err, &otterError.NoDataWritten{}) {
		return fmt.Errorf("creating source: %w", err)
	}

	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port))
	if err != nil {
		return err
	}

	grpcServer := grpc.NewServer()
	pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.database, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))

	// Serve takes ownership of lis and returns nil or a non-nil error, so its
	// result can be returned directly.
	return grpcServer.Serve(lis)
}
// WithOtterSpace sets the database handle the plug uses. Listen calls
// CreateSource on this handle and forwards it to the gRPC server, so it has
// to be set before Listen is called.
func (p *Plug) WithOtterSpace(graph database.OtterSpace) {
	p.database = graph
}
// TaskExecutionFunction stores the task callback that Listen forwards to the
// gRPC server. A later call replaces the previously stored callback.
func (p *Plug) TaskExecutionFunction(function TaskExecution) {
	p.taskExecutionFunction = function
}
// SendMessageExecution stores the send-message callback that Listen forwards
// to the gRPC server. A later call replaces the previously stored callback.
func (p *Plug) SendMessageExecution(function SendMessageExecution) {
	p.sendMessageExecution = function
}
// GetMessageExecution stores the get-messages callback that Listen forwards
// to the gRPC server. A later call replaces the previously stored callback.
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
	p.getMessageExecution = function
}