pkg/filestore/filestoresql/root_query.go
package filestoresql
import (
"context"
"errors"
"fmt"
"path/filepath"
"slices"
"strings"
"github.com/direktiv/direktiv/pkg/datastore"
"github.com/direktiv/direktiv/pkg/filestore"
"github.com/google/uuid"
"gorm.io/gorm"
)
func addTrailingSlash(path string) string {
if !strings.HasSuffix(path, "/") {
return path + "/"
}
return path
}
type RootQuery struct {
rootID uuid.UUID
checksumFunc filestore.CalculateChecksumFunc
db *gorm.DB
root *filestore.Root
namespace string
}
func (q *RootQuery) ListAllFiles(ctx context.Context) ([]*filestore.File, error) {
var list []*filestore.File
// check if root exists.
if err := q.checkRootExists(ctx); err != nil {
return nil, err
}
res := q.db.WithContext(ctx).Raw(`
SELECT id, root_id, path, depth, typ, created_at, updated_at, mime_type, length(data) AS size
FROM filesystem_files
WHERE root_id=?
ORDER BY path ASC
`, q.rootID).Find(&list)
if res.Error != nil {
return nil, res.Error
}
return list, nil
}
func (q *RootQuery) ListDirektivFilesWithData(ctx context.Context) ([]*filestore.File, error) {
var list []*filestore.File
// check if root exists.
if err := q.checkRootExists(ctx); err != nil {
return nil, err
}
res := q.db.WithContext(ctx).Raw(`
SELECT *, length(data) AS size
FROM filesystem_files
WHERE root_id=? AND typ <> 'directory' AND typ <> 'file'
ORDER BY path ASC
`, q.rootID).Find(&list)
if res.Error != nil {
return nil, res.Error
}
return list, nil
}
var _ filestore.RootQuery = &RootQuery{} // Ensures RootQuery struct conforms to filestore.RootQuery interface.
func (q *RootQuery) Delete(ctx context.Context) error {
// check if root exists.
if err := q.checkRootExists(ctx); err != nil {
return err
}
res := q.db.WithContext(ctx).Exec(`DELETE FROM filesystem_roots WHERE id = ?`, q.rootID)
if res.Error != nil {
return res.Error
}
if res.RowsAffected != 1 {
return fmt.Errorf("unexpected gorm delete count, got: %d, want: %d", res.RowsAffected, 1)
}
return nil
}
func (q *RootQuery) CreateFile(ctx context.Context, path string, typ filestore.FileType, mimeType string, data []byte) (*filestore.File, error) {
path, err := filestore.SanitizePath(path)
if err != nil {
return nil, fmt.Errorf("%w: %w", filestore.ErrInvalidPathParameter, err)
}
if path == "/" {
return nil, filestore.ErrInvalidPathParameter
}
// check if root exists.
if err = q.checkRootExists(ctx); err != nil {
return nil, err
}
// check if file type is allowed.
if !slices.Contains(filestore.AllFileTypes, typ) {
return nil, fmt.Errorf("%w: %w",
filestore.ErrInvalidPathParameter,
fmt.Errorf("file type: %s is not allowed", typ))
}
count := 0
tx := q.db.WithContext(ctx).Raw("SELECT count(id) FROM filesystem_files WHERE root_id = ? AND path = ?", q.rootID, path).Scan(&count)
if tx.Error != nil {
return nil, tx.Error
}
if count > 0 {
return nil, filestore.ErrPathAlreadyExists
}
parentDir := filepath.Dir(path)
if parentDir != "/" {
count = 0
tx = q.db.WithContext(ctx).Raw("SELECT count(id) FROM filesystem_files WHERE root_id = ? AND typ = ? AND path = ?", q.rootID, filestore.FileTypeDirectory, parentDir).Scan(&count)
if tx.Error != nil {
return nil, tx.Error
}
if count == 0 {
return nil, filestore.ErrNoParentDirectory
}
}
// first, we need to create a file entry for this new file.
f := &filestore.File{
ID: uuid.New(),
Path: path,
Depth: filestore.GetPathDepth(path),
Size: len(data),
Typ: typ,
RootID: q.rootID,
}
if typ != filestore.FileTypeDirectory {
f.Data = data
f.Checksum = string(q.checksumFunc(data))
f.MIMEType = mimeType
}
res := q.db.WithContext(ctx).Exec(`
INSERT INTO
filesystem_files(id, root_id, path, depth, typ, data, checksum, mime_type)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);
`, f.ID, f.RootID, f.Path, f.Depth, f.Typ, f.Data, f.Checksum, f.MIMEType)
if res.Error != nil && strings.Contains(res.Error.Error(), "duplicate key") {
return nil, datastore.ErrDuplicatedNamespaceName
}
if res.Error != nil {
return nil, res.Error
}
if res.RowsAffected != 1 {
return nil, fmt.Errorf("unexpected gorm create count, got: %d, want: %d", res.RowsAffected, 1)
}
// set updated_at for all parent dirs.
res = q.db.WithContext(ctx).Exec(`
UPDATE filesystem_files
SET updated_at=CURRENT_TIMESTAMP WHERE ? LIKE path || '%' ;
`, path)
if res.Error != nil {
return nil, res.Error
}
return q.GetFile(ctx, f.Path)
}
func (q *RootQuery) GetFile(ctx context.Context, path string) (*filestore.File, error) {
path, err := filestore.SanitizePath(path)
if err != nil {
return nil, fmt.Errorf("%w: %w", filestore.ErrInvalidPathParameter, err)
}
// check if root exists.
if err = q.checkRootExists(ctx); err != nil {
return nil, err
}
if path == "/" {
return &filestore.File{
Path: "/",
Typ: filestore.FileTypeDirectory,
CreatedAt: q.root.CreatedAt,
UpdatedAt: q.root.UpdatedAt,
}, nil
}
f := &filestore.File{}
path = filepath.Clean(path)
res := q.db.WithContext(ctx).Raw(`
SELECT id, root_id, path, depth, typ, created_at, updated_at, mime_type, length(data) AS size
FROM filesystem_files
WHERE root_id=? AND path=?`, q.rootID, path).
First(f)
if res.Error != nil {
if errors.Is(res.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("file '%s': %w", path, filestore.ErrNotFound)
}
return nil, res.Error
}
return f, nil
}
func (q *RootQuery) ReadDirectory(ctx context.Context, path string) ([]*filestore.File, error) {
var list []*filestore.File
path, err := filestore.SanitizePath(path)
if err != nil {
return nil, fmt.Errorf("%w: %w", filestore.ErrInvalidPathParameter, err)
}
// check if root exists.
if err = q.checkRootExists(ctx); err != nil {
return nil, err
}
// check if path is a directory and exists.
if path != "/" {
count := 0
tx := q.db.WithContext(ctx).Raw("SELECT count(id) FROM filesystem_files WHERE root_id = ? AND typ = ? AND path = ?", q.rootID, filestore.FileTypeDirectory, path).Scan(&count)
if tx.Error != nil {
return nil, err
}
if count != 1 {
return nil, filestore.ErrNotFound
}
}
res := q.db.WithContext(ctx).Raw(`
SELECT id, path, depth, typ, root_id, created_at, updated_at, mime_type, length(data) AS size
FROM filesystem_files
WHERE root_id=? AND depth=? AND path LIKE ?
ORDER BY path ASC`,
q.rootID, filestore.GetPathDepth(path)+1, addTrailingSlash(path)+"%").
Find(&list)
if res.Error != nil {
return nil, res.Error
}
return list, nil
}
func (q *RootQuery) checkRootExists(ctx context.Context) error {
zeroUUID := (uuid.UUID{}).String()
if zeroUUID == q.rootID.String() {
n := &filestore.Root{}
res := q.db.WithContext(ctx).Table("filesystem_roots").Where("namespace", q.namespace).First(n)
if errors.Is(res.Error, gorm.ErrRecordNotFound) {
return fmt.Errorf("root not found, ns: '%s', err: %w", q.namespace, filestore.ErrNotFound)
}
if res.Error != nil {
return res.Error
}
q.root = n
q.rootID = n.ID
return nil
}
n := &filestore.Root{}
res := q.db.WithContext(ctx).Table("filesystem_roots").Where("id", q.rootID).First(n)
if errors.Is(res.Error, gorm.ErrRecordNotFound) {
return fmt.Errorf("root not found, id: '%s', err: %w", q.rootID, filestore.ErrNotFound)
}
if res.Error != nil {
return res.Error
}
q.root = n
return nil
}
func (q *RootQuery) SetNamespace(ctx context.Context, namespace string) error {
res := q.db.WithContext(ctx).Exec(`UPDATE filesystem_roots
SET namespace = ?
WHERE id = ?`,
namespace,
q.rootID,
)
if res.Error != nil {
return res.Error
}
if res.RowsAffected == 0 {
return filestore.ErrNotFound
}
return nil
}