Add ability to dump headers or mimetypes for original messages.
Add ability to dump all messages, instead of just hashes listed.
Add new command cmd/headers to import all headers from messages in table
original. Utility can work incrementally or do fully table imports.
Moved 'fetch all original messages' code to db/util.go. Added conditional
version given table name that will only return original messages that do not
have matching hashes in the specified table.
153 lines
2.7 KiB
Go
153 lines
2.7 KiB
Go
package db
|
|
|
|
import (
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"net/mail"
|
|
"unicode/utf8"
|
|
|
|
"github.com/golang/glog"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
var (
|
|
user = flag.String("dbuser", "gomail", "PostgreSQL user name")
|
|
name = flag.String("dbname", "gomail", "PostgreSQL DB name")
|
|
sslmode = flag.String("dbsslmode", "disable", "PostgreSQL sslmode setting")
|
|
)
|
|
|
|
type Conn struct {
|
|
*sql.DB
|
|
}
|
|
|
|
// NewDB creates connection to PostgreSQL DB. If dsn is empty, the flags
|
|
// -dbuser, -dbname, -dbsslmode are used.
|
|
func NewConn(dsn string) (*Conn, error) {
|
|
if dsn == "" {
|
|
dsn = fmt.Sprintf("user=%s dbname=%s sslmode=%s", *user, *name,
|
|
*sslmode)
|
|
}
|
|
db, err := sql.Open("postgres", dsn)
|
|
return &Conn{DB: db}, err
|
|
}
|
|
|
|
type Original struct {
|
|
UID int64
|
|
Hash string
|
|
HeaderSize int64
|
|
TotalSize int64
|
|
Blob []byte
|
|
}
|
|
|
|
func (c *Conn) Originals(oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
|
query := `
|
|
SELECT
|
|
uid,
|
|
hash,
|
|
header_size,
|
|
total_size,
|
|
blob
|
|
FROM
|
|
original
|
|
`
|
|
c.originals(query, oCh, errc, donec)
|
|
}
|
|
|
|
func (c *Conn) OriginalsNotInTable(table string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
|
query := fmt.Sprintf(`
|
|
SELECT
|
|
original.uid,
|
|
original.hash,
|
|
original.header_size,
|
|
original.total_size,
|
|
original.blob
|
|
FROM
|
|
original
|
|
LEFT
|
|
JOIN %s ON original.hash = %s.hash
|
|
WHERE
|
|
%s.hash IS NULL;
|
|
`, table, table, table)
|
|
c.originals(query, oCh, errc, donec)
|
|
}
|
|
|
|
func (c *Conn) originals(query string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
|
|
|
defer close(oCh)
|
|
defer close(errc)
|
|
rows, err := c.Query(query)
|
|
if err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
for rows.Next() {
|
|
var o Original
|
|
if err := rows.Scan(&o.UID, &o.Hash, &o.HeaderSize, &o.TotalSize, &o.Blob); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
oCh <- o
|
|
select {
|
|
case <-donec:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
errc <- err
|
|
}
|
|
}
|
|
|
|
func (c *Conn) OriginalBlobByHash(hash string, blob *[]byte) error {
|
|
row := c.QueryRow(`
|
|
SELECT
|
|
blob
|
|
FROM
|
|
original
|
|
WHERE
|
|
hash = $1
|
|
`, hash)
|
|
|
|
return row.Scan(blob)
|
|
}
|
|
|
|
func (c *Conn) InsertHeaders(hash string, hdrs mail.Header) (err error) {
|
|
var tx *sql.Tx
|
|
tx, err = c.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
if err := tx.Rollback(); err != nil {
|
|
glog.Error(err)
|
|
}
|
|
return
|
|
}
|
|
err = tx.Commit()
|
|
}()
|
|
|
|
stmt, err := tx.Prepare(`
|
|
INSERT INTO
|
|
header (hash, name, value)
|
|
VALUES
|
|
($1, $2, $3)
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for k, vs := range hdrs {
|
|
for _, v := range vs {
|
|
if !utf8.ValidString(v) {
|
|
glog.Infof("%s: value for %q invalid UTF-8 %q", hash, k, v)
|
|
continue
|
|
}
|
|
if _, err := stmt.Exec(hash, k, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|