inklabs/rangedb

View on GitHub
provider/postgresstore/postgres_store.go

Summary

Maintainability
D
2 days
Test Coverage
A
91%
File `postgres_store.go` has 671 lines of code (exceeds 500 allowed). Consider refactoring.
package postgresstore
 
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"log"
"strings"
"time"
 
"github.com/lib/pq"
 
"github.com/inklabs/rangedb"
"github.com/inklabs/rangedb/pkg/broadcast"
"github.com/inklabs/rangedb/pkg/clock"
"github.com/inklabs/rangedb/pkg/clock/provider/systemclock"
"github.com/inklabs/rangedb/pkg/rangedberror"
"github.com/inklabs/rangedb/pkg/recordsubscriber"
"github.com/inklabs/rangedb/pkg/shortuuid"
"github.com/inklabs/rangedb/provider/jsonrecordserializer"
)
 
const (
broadcastRecordBuffSize = 100
streamAdvisoryLockTimeout = 2 * time.Second
)
 
type batchSQLRecord struct {
StreamName string
AggregateType string
AggregateID string
StreamSequenceNumber uint64
EventID string
EventType string
Data string
Metadata string
InsertTimestamp uint64
}
 
// JsonSerializer defines the interface to bind events and identify event types.
type JsonSerializer interface {
rangedb.EventBinder
rangedb.EventTypeIdentifier
}
 
`postgresStore` has 24 methods (exceeds 20 allowed). Consider refactoring.
type postgresStore struct {
config *Config
db *sql.DB
clock clock.Clock
uuidGenerator shortuuid.Generator
serializer JsonSerializer
broadcaster broadcast.Broadcaster
pgNotifyIsEnabled bool
}
 
// Option defines functional option parameters for postgresStore.
type Option func(*postgresStore)
 
// WithClock is a functional option to inject a clock.Clock.
func WithClock(clock clock.Clock) Option {
return func(store *postgresStore) {
store.clock = clock
}
}
 
// WithUUIDGenerator is a functional option to inject a shortuuid.Generator.
func WithUUIDGenerator(uuidGenerator shortuuid.Generator) Option {
return func(store *postgresStore) {
store.uuidGenerator = uuidGenerator
}
}
 
// WithPgNotify enables pg_notify() for notifying subscribers.
func WithPgNotify() Option {
return func(store *postgresStore) {
store.pgNotifyIsEnabled = true
}
}
 
// New constructs an postgresStore.
func New(config *Config, options ...Option) (*postgresStore, error) {
s := &postgresStore{
config: config,
clock: systemclock.New(),
serializer: jsonrecordserializer.New(),
broadcaster: broadcast.New(broadcastRecordBuffSize, broadcast.DefaultTimeout),
}
 
for _, option := range options {
option(s)
}
 
err := s.connectToDB()
if err != nil {
return nil, err
}
 
if s.pgNotifyIsEnabled {
err = s.startPQListener()
if err != nil {
return nil, err
}
}
 
return s, nil
}
 
// CloseDB closes the postgres DB connection.
func (s *postgresStore) CloseDB() error {
return s.db.Close()
}
 
func (s *postgresStore) Bind(events ...rangedb.Event) {
s.serializer.Bind(events...)
}
 
func (s *postgresStore) Events(ctx context.Context, globalSequenceNumber uint64) rangedb.RecordIterator {
resultRecords := make(chan rangedb.ResultRecord)
 
go func() {
defer close(resultRecords)
 
query := `SELECT
StreamName,
AggregateType,
AggregateID,
GlobalSequenceNumber,
StreamSequenceNumber,
InsertTimestamp,
EventID,
EventType,
Data,
Metadata
FROM record
WHERE GlobalSequenceNumber >= $1
ORDER BY GlobalSequenceNumber`
rows, err := s.db.QueryContext(ctx, query,
globalSequenceNumber)
if err != nil {
resultRecords <- rangedb.ResultRecord{Err: err}
return
}
defer ignoreClose(rows)
_, _ = s.readResultRecords(ctx, rows, resultRecords)
}()
 
return rangedb.NewRecordIterator(resultRecords)
}
 
func ignoreClose(closer io.Closer) {
_ = closer.Close()
}
 
func (s *postgresStore) EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) rangedb.RecordIterator {
resultRecords := make(chan rangedb.ResultRecord)
 
go func() {
defer close(resultRecords)
query := `SELECT
StreamName,
AggregateType,
AggregateID,
GlobalSequenceNumber,
StreamSequenceNumber,
InsertTimestamp,
EventID,
EventType,
Data,
Metadata
FROM record
WHERE AggregateType = ANY($1) AND GlobalSequenceNumber >= $2
ORDER BY GlobalSequenceNumber`
rows, err := s.db.QueryContext(ctx, query,
pq.Array(aggregateTypes), globalSequenceNumber)
if err != nil {
resultRecords <- rangedb.ResultRecord{Err: err}
return
}
defer ignoreClose(rows)
_, _ = s.readResultRecords(ctx, rows, resultRecords)
}()
 
return rangedb.NewRecordIterator(resultRecords)
}
 
func (s *postgresStore) EventsByStream(ctx context.Context, streamSequenceNumber uint64, streamName string) rangedb.RecordIterator {
resultRecords := make(chan rangedb.ResultRecord)
 
go func() {
defer close(resultRecords)
query := `SELECT
StreamName,
AggregateType,
AggregateID,
GlobalSequenceNumber,
StreamSequenceNumber,
InsertTimestamp,
EventID,
EventType,
Data,
Metadata
FROM record
WHERE StreamName = $1 AND StreamSequenceNumber >= $2
ORDER BY GlobalSequenceNumber`
rows, err := s.db.QueryContext(ctx, query,
streamName, streamSequenceNumber)
if err != nil {
resultRecords <- rangedb.ResultRecord{Err: err}
return
}
defer ignoreClose(rows)
 
recordsRead, err := s.readResultRecords(ctx, rows, resultRecords)
if err == nil && recordsRead == 0 {
resultRecords <- rangedb.ResultRecord{Err: rangedb.ErrStreamNotFound}
}
}()
 
return rangedb.NewRecordIterator(resultRecords)
}
 
Method `postgresStore.OptimisticDeleteStream` has 7 return statements (exceeds 4 allowed).
func (s *postgresStore) OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error {
transaction, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
 
streamSequenceNumber, err := s.getStreamSequenceNumber(ctx, transaction, streamName)
if err != nil {
return err
}
 
if streamSequenceNumber == 0 {
return rangedb.ErrStreamNotFound
}
 
if streamSequenceNumber != expectedStreamSequenceNumber {
return &rangedberror.UnexpectedSequenceNumber{
Expected: expectedStreamSequenceNumber,
ActualSequenceNumber: streamSequenceNumber,
}
}
 
_, err = transaction.ExecContext(ctx, `DELETE FROM record WHERE StreamName = $1`, streamName)
if err != nil {
_ = transaction.Rollback()
return err
}
 
err = transaction.Commit()
if err != nil {
return err
}
 
return nil
}
 
func (s *postgresStore) OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
return s.transactionalSaveEvents(ctx, &expectedStreamSequenceNumber, streamName, eventRecords...)
}
 
func (s *postgresStore) Save(ctx context.Context, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
return s.transactionalSaveEvents(ctx, nil, streamName, eventRecords...)
}
 
type saveResults struct {
LastStreamSequenceNumber uint64
GlobalSequenceNumbers []uint64
BatchRecords []*batchSQLRecord
}
 
Method `postgresStore.transactionalSaveEvents` has 5 return statements (exceeds 4 allowed).
func (s *postgresStore) transactionalSaveEvents(ctx context.Context, expectedStreamSequenceNumber *uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
transaction, err := s.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
 
saveResult, err := s.saveEvents(ctx, transaction, expectedStreamSequenceNumber, streamName, eventRecords)
if err != nil {
_ = transaction.Rollback()
return 0, err
}
 
err = transaction.Commit()
if err != nil {
return 0, err
}
 
if !s.pgNotifyIsEnabled {
err = s.batchNotifySubscribers(saveResult)
if err != nil {
return 0, err
}
}
 
return saveResult.LastStreamSequenceNumber, nil
}
 
Method `postgresStore.saveEvents` has 6 return statements (exceeds 4 allowed).
func (s *postgresStore) saveEvents(
Method `postgresStore.saveEvents` has 5 arguments (exceeds 4 allowed). Consider refactoring.
ctx context.Context,
transaction *sql.Tx,
expectedStreamSequenceNumber *uint64,
streamName string,
eventRecords []*rangedb.EventRecord,
) (*saveResults, error) {
if len(eventRecords) < 1 {
return nil, fmt.Errorf("missing events")
}
 
err := s.lockStream(ctx, transaction, streamName)
if err != nil {
return nil, err
}
 
streamSequenceNumber, err := s.validateStreamSequenceNumber(ctx, transaction, expectedStreamSequenceNumber, streamName)
if err != nil {
return nil, err
}
 
batchRecords, err := s.eventRecordsToBatchRecords(eventRecords, streamName, streamSequenceNumber)
if err != nil {
return nil, err
}
 
globalSequenceNumbers, lastStreamSequenceNumber, err := s.batchInsert(ctx, transaction, batchRecords)
if err != nil {
return nil, err
}
 
return &saveResults{
LastStreamSequenceNumber: lastStreamSequenceNumber,
GlobalSequenceNumbers: globalSequenceNumbers,
BatchRecords: batchRecords,
}, nil
}
 
Similar blocks of code found in 5 locations. Consider refactoring.
func (s *postgresStore) AllEventsSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber) rangedb.RecordSubscription {
return recordsubscriber.New(
recordsubscriber.AllEventsConfig(ctx, s, s.broadcaster, bufferSize,
func(record *rangedb.Record) error {
subscriber.Accept(record)
return nil
},
))
}
 
Similar blocks of code found in 5 locations. Consider refactoring.
func (s *postgresStore) AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber, aggregateTypes ...string) rangedb.RecordSubscription {
return recordsubscriber.New(
recordsubscriber.AggregateTypesConfig(ctx, s, s.broadcaster, bufferSize,
aggregateTypes,
func(record *rangedb.Record) error {
subscriber.Accept(record)
return nil
},
))
}
 
func (s *postgresStore) TotalEventsInStream(ctx context.Context, streamName string) (uint64, error) {
return s.getStreamSequenceNumber(ctx, s.db, streamName)
}
 
func (s *postgresStore) validateStreamSequenceNumber(ctx context.Context, queryable dbRowQueryable, expectedStreamSequenceNumber *uint64, streamName string) (uint64, error) {
streamSequenceNumber, err := s.getStreamSequenceNumber(ctx, queryable, streamName)
if err != nil {
return 0, err
}
 
if expectedStreamSequenceNumber != nil && streamSequenceNumber != *expectedStreamSequenceNumber {
return 0, &rangedberror.UnexpectedSequenceNumber{
Expected: *expectedStreamSequenceNumber,
ActualSequenceNumber: streamSequenceNumber,
}
}
 
return streamSequenceNumber, nil
}
 
func (s *postgresStore) batchNotifySubscribers(saveResult *saveResults) error {
for i, batchRecord := range saveResult.BatchRecords {
record := &rangedb.Record{
StreamName: batchRecord.StreamName,
AggregateType: batchRecord.AggregateType,
AggregateID: batchRecord.AggregateID,
GlobalSequenceNumber: saveResult.GlobalSequenceNumbers[i],
StreamSequenceNumber: batchRecord.StreamSequenceNumber,
EventType: batchRecord.EventType,
EventID: batchRecord.EventID,
InsertTimestamp: batchRecord.InsertTimestamp,
Data: batchRecord.Data,
Metadata: batchRecord.Metadata,
}
 
deSerializedData, err := jsonrecordserializer.DecodeJsonData(
batchRecord.EventType,
strings.NewReader(batchRecord.Data),
s.serializer,
)
if err != nil {
return fmt.Errorf("unable to decode data: %v", err)
}
 
var metadata interface{}
if batchRecord.Metadata != "null" {
err = json.Unmarshal([]byte(batchRecord.Metadata), metadata)
if err != nil {
return fmt.Errorf("unable to unmarshal metadata: %v", err)
}
}
 
record.Data = deSerializedData
record.Metadata = metadata
s.broadcaster.Accept(record)
}
 
return nil
}
 
Method `postgresStore.eventRecordsToBatchRecords` has 5 return statements (exceeds 4 allowed).
func (s *postgresStore) eventRecordsToBatchRecords(eventRecords []*rangedb.EventRecord, streamName string, streamSequenceNumber uint64) ([]*batchSQLRecord, error) {
aggregateType := eventRecords[0].Event.AggregateType()
aggregateID := eventRecords[0].Event.AggregateID()
 
var batchEvents []*batchSQLRecord
 
for _, eventRecord := range eventRecords {
// TODO: Allow mixing aggregate types?
if aggregateType != eventRecord.Event.AggregateType() {
return nil, fmt.Errorf("unmatched aggregate type")
}
 
// TODO: Allow mixing aggregate IDs?
if aggregateID != eventRecord.Event.AggregateID() {
return nil, fmt.Errorf("unmatched aggregate ID")
}
 
jsonData, err := json.Marshal(eventRecord.Event)
if err != nil {
return nil, err
}
 
jsonMetadata, err := json.Marshal(eventRecord.Metadata)
if err != nil {
return nil, err
}
 
streamSequenceNumber++
batchEvents = append(batchEvents, &batchSQLRecord{
StreamName: streamName,
AggregateType: eventRecord.Event.AggregateType(),
AggregateID: eventRecord.Event.AggregateID(),
StreamSequenceNumber: streamSequenceNumber,
EventID: s.uuidGenerator.New(),
EventType: eventRecord.Event.EventType(),
Data: string(jsonData),
Metadata: string(jsonMetadata),
InsertTimestamp: uint64(s.clock.Now().Unix()),
})
}
 
return batchEvents, nil
}
 
func (s *postgresStore) batchInsert(ctx context.Context, transaction dbQueryable, batchRecords []*batchSQLRecord) ([]uint64, uint64, error) {
valueStrings := make([]string, 0, len(batchRecords))
valueArgs := make([]interface{}, 0, len(batchRecords)*8)
var lastStreamSequenceNumber uint64
 
i := 0
for _, batchRecord := range batchRecords {
valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", i*9+1, i*9+2, i*9+3, i*9+4, i*9+5, i*9+6, i*9+7, i*9+8, i*9+9))
valueArgs = append(valueArgs,
batchRecord.StreamName,
batchRecord.AggregateType,
batchRecord.AggregateID,
batchRecord.StreamSequenceNumber,
batchRecord.InsertTimestamp,
batchRecord.EventID,
batchRecord.EventType,
batchRecord.Data,
batchRecord.Metadata,
)
i++
 
lastStreamSequenceNumber = batchRecord.StreamSequenceNumber
}
 
sqlStatement := fmt.Sprintf(
"INSERT INTO record (StreamName,AggregateType,AggregateID,StreamSequenceNumber,InsertTimestamp,EventID,EventType,Data,Metadata) VALUES %s",
strings.Join(valueStrings, ","))
 
if !s.pgNotifyIsEnabled {
sqlStatement += " RETURNING GlobalSequenceNumber"
}
 
rows, err := transaction.QueryContext(ctx, sqlStatement, valueArgs...)
if err != nil {
return nil, 0, fmt.Errorf("unable to insert: %v", err)
}
defer ignoreClose(rows)
 
var globalSequenceNumbers []uint64
if !s.pgNotifyIsEnabled {
for rows.Next() {
var globalSequenceNumber uint64
err := rows.Scan(&globalSequenceNumber)
if err != nil {
return nil, 0, fmt.Errorf("unable to get global sequence number: %v", err)
}
 
globalSequenceNumbers = append(globalSequenceNumbers, globalSequenceNumber)
}
}
 
return globalSequenceNumbers, lastStreamSequenceNumber, nil
}
 
func (s *postgresStore) lockStream(ctx context.Context, transaction dbExecAble, streamName string) error {
lockCtx, cancel := context.WithTimeout(ctx, streamAdvisoryLockTimeout)
defer cancel()
 
h := fnv.New32a()
_, _ = h.Write([]byte(streamName))
lockKey := h.Sum32()
 
_, err := transaction.ExecContext(lockCtx, `SELECT pg_advisory_xact_lock($1)`, lockKey)
if err != nil {
return fmt.Errorf("unable to obtain lock: %v", err)
}
return nil
}
 
func (s *postgresStore) InitDB() error {
sqlStatements := []string{
`CREATE TABLE IF NOT EXISTS record (
StreamName TEXT,
AggregateType TEXT,
AggregateID TEXT,
GlobalSequenceNumber SERIAL PRIMARY KEY,
StreamSequenceNumber BIGINT,
InsertTimestamp BIGINT,
EventID TEXT,
EventType TEXT,
Data TEXT,
Metadata TEXT
);`,
`CREATE INDEX IF NOT EXISTS record_idx_stream_name ON record USING HASH (
StreamName
);`,
`CREATE INDEX IF NOT EXISTS record_idx_aggregate_type ON record USING HASH (
AggregateType
);`,
}
 
if s.pgNotifyIsEnabled {
sqlStatements = append(sqlStatements,
`CREATE OR REPLACE FUNCTION rangedb_notify_record() RETURNS TRIGGER AS
$$
BEGIN
PERFORM pg_notify('records', row_to_json(NEW)::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;`,
`DROP TRIGGER IF EXISTS rangedb_trigger_notify_record ON record;`,
`CREATE TRIGGER rangedb_trigger_notify_record AFTER INSERT ON record FOR EACH ROW EXECUTE PROCEDURE rangedb_notify_record();`,
)
}
 
for _, statement := range sqlStatements {
_, err := s.db.Exec(statement)
if err != nil {
return err
}
}
 
return nil
}
 
func (s *postgresStore) getStreamSequenceNumber(ctx context.Context, queryable dbRowQueryable, streamName string) (uint64, error) {
var lastStreamSequenceNumber uint64
 
query := `SELECT MAX(StreamSequenceNumber)
FROM record
WHERE StreamName = $1
GROUP BY StreamName`
err := queryable.QueryRowContext(ctx, query,
streamName,
).Scan(&lastStreamSequenceNumber)
 
if err != nil {
if err == sql.ErrNoRows {
return 0, nil
}
 
return 0, err
}
 
return lastStreamSequenceNumber, nil
}
 
Method `postgresStore.readResultRecords` has 72 lines of code (exceeds 50 allowed). Consider refactoring.
Method `postgresStore.readResultRecords` has 6 return statements (exceeds 4 allowed).
func (s *postgresStore) readResultRecords(ctx context.Context, rows *sql.Rows, resultRecords chan rangedb.ResultRecord) (int, error) {
recordsRead := 0
for rows.Next() {
var (
streamName string
aggregateType string
aggregateID string
globalSequenceNumber uint64
streamSequenceNumber uint64
insertTimestamp uint64
eventID string
eventType string
serializedData string
serializedMetadata string
)
err := rows.Scan(
&streamName,
&aggregateType,
&aggregateID,
&globalSequenceNumber,
&streamSequenceNumber,
&insertTimestamp,
&eventID,
&eventType,
&serializedData,
&serializedMetadata,
)
if err != nil {
resultRecords <- rangedb.ResultRecord{Err: err}
return 0, err
}
 
data, err := jsonrecordserializer.DecodeJsonData(
eventType,
strings.NewReader(serializedData),
s.serializer,
)
if err != nil {
decodeErr := fmt.Errorf("unable to decode data: %v", err)
resultRecords <- rangedb.ResultRecord{Err: decodeErr}
return 0, decodeErr
}
 
var metadata interface{}
if serializedMetadata != "null" {
err = json.Unmarshal([]byte(serializedMetadata), metadata)
if err != nil {
unmarshalErr := fmt.Errorf("unable to unmarshal metadata: %v", err)
resultRecords <- rangedb.ResultRecord{Err: unmarshalErr}
return 0, unmarshalErr
}
}
 
record := &rangedb.Record{
StreamName: streamName,
AggregateType: aggregateType,
AggregateID: aggregateID,
GlobalSequenceNumber: globalSequenceNumber,
StreamSequenceNumber: streamSequenceNumber,
InsertTimestamp: insertTimestamp,
EventID: eventID,
EventType: eventType,
Data: data,
Metadata: metadata,
}
 
recordsRead++
 
if !rangedb.PublishRecordOrCancel(ctx, resultRecords, record, time.Second) {
return 0, fmt.Errorf("publish record was canceled")
}
}
 
err := rows.Err()
if err != nil {
resultRecords <- rangedb.ResultRecord{Err: err}
return 0, err
}
 
return recordsRead, nil
}
 
Similar blocks of code found in 2 locations. Consider refactoring.
func (s *postgresStore) connectToDB() error {
db, err := sql.Open("postgres", s.config.DataSourceName())
if err != nil {
return fmt.Errorf("unable to open DB connection: %v", err)
}
 
err = db.Ping()
if err != nil {
return fmt.Errorf("unable to connect to DB: %v", err)
}
 
s.db = db
 
return nil
}
 
func (s *postgresStore) startPQListener() error {
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
 
listener := pq.NewListener(s.config.DataSourceName(), 10*time.Second, time.Minute, reportProblem)
err := listener.Listen("records")
if err != nil {
return err
}
 
go s.listen(listener)
 
return nil
}
 
// PostgresJsonRecord holds a JSON record sent via pg_notify.
type PostgresJsonRecord struct {
StreamName string `json:"streamname"`
AggregateType string `json:"aggregatetype"`
AggregateID string `json:"aggregateid"`
GlobalSequenceNumber uint64 `json:"globalsequencenumber"`
StreamSequenceNumber uint64 `json:"streamsequencenumber"`
InsertTimestamp uint64 `json:"inserttimestamp"`
EventID string `json:"eventid"`
EventType string `json:"eventtype"`
Data string `json:"data"`
Metadata string `json:"metadata"`
}
 
func (s *postgresStore) listen(listener *pq.Listener) {
for {
select {
case n := <-listener.Notify:
var jsonRecord PostgresJsonRecord
err := json.Unmarshal([]byte(n.Extra), &jsonRecord)
if err != nil {
log.Printf("invalid json request body: %v", err)
return
}
 
data, err := jsonrecordserializer.DecodeJsonData(
jsonRecord.EventType,
strings.NewReader(jsonRecord.Data),
s.serializer,
)
if err != nil {
log.Printf("unable to decode data: %v", err)
return
}
 
var metadata interface{}
if jsonRecord.Metadata != "null" {
err = json.Unmarshal([]byte(jsonRecord.Metadata), metadata)
if err != nil {
log.Printf("unable to unmarshal metadata: %v", err)
return
}
}
 
record := &rangedb.Record{
StreamName: jsonRecord.StreamName,
AggregateType: jsonRecord.AggregateType,
AggregateID: jsonRecord.AggregateID,
GlobalSequenceNumber: jsonRecord.GlobalSequenceNumber,
StreamSequenceNumber: jsonRecord.StreamSequenceNumber,
InsertTimestamp: jsonRecord.InsertTimestamp,
EventID: jsonRecord.EventID,
EventType: jsonRecord.EventType,
Data: data,
Metadata: metadata,
}
 
s.broadcaster.Accept(record)
 
case <-time.After(90 * time.Second):
go func() {
err := listener.Ping()
if err != nil {
log.Print(err)
}
}()
}
}
}
 
type dbQueryable interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
 
type dbRowQueryable interface {
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
 
type dbExecAble interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}