Process and normalize some headers that will be used in quickly recalling messages.
226 lines
4.3 KiB
Go
226 lines
4.3 KiB
Go
package db
|
|
|
|
import (
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"net/mail"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/golang/glog"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
var (
	// user is the PostgreSQL role used when NewConn is given an empty DSN.
	user = flag.String("dbuser", "gomail", "PostgreSQL user name")
	// name is the PostgreSQL database used when NewConn is given an empty DSN.
	name = flag.String("dbname", "gomail", "PostgreSQL DB name")
	// sslmode is the sslmode setting used when NewConn is given an empty DSN.
	sslmode = flag.String("dbsslmode", "disable", "PostgreSQL sslmode setting")
)

// Conn is a handle to the mail database. It embeds *sql.DB, so the
// database/sql query and transaction methods are available on it directly.
type Conn struct {
	*sql.DB
}

// NewConn opens a handle to the PostgreSQL database described by dsn using
// the "postgres" driver. If dsn is empty, a key/value connection string is
// built from the -dbuser, -dbname and -dbsslmode flags.
//
// sql.Open may only validate its arguments without contacting the server, so
// connection problems can surface on first use rather than here. On error the
// returned *Conn is nil.
func NewConn(dsn string) (*Conn, error) {
	if dsn == "" {
		dsn = flagDSN()
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	return &Conn{DB: db}, nil
}

// flagDSN builds a key/value connection string from the -db* flags. Every
// value is single-quoted so that an empty value, or one containing spaces, is
// not misparsed as the start of the next keyword.
func flagDSN() string {
	return fmt.Sprintf("user=%s dbname=%s sslmode=%s",
		quoteConnValue(*user), quoteConnValue(*name), quoteConnValue(*sslmode))
}

// quoteConnValue wraps v in single quotes for a key/value connection string,
// backslash-escaping embedded single quotes and backslashes.
func quoteConnValue(v string) string {
	b := make([]byte, 0, len(v)+2)
	b = append(b, '\'')
	for i := 0; i < len(v); i++ {
		if v[i] == '\'' || v[i] == '\\' {
			b = append(b, '\\')
		}
		b = append(b, v[i])
	}
	return string(append(b, '\''))
}
|
|
|
|
// Original is one row of the original table, as scanned by the Originals
// family of methods (columns uid, hash, header_size, total_size, blob).
type Original struct {
	// UID is the uid column.
	UID int64
	// Hash is the hash column; other tables refer to an original by it.
	Hash string
	// HeaderSize and TotalSize are the header_size and total_size columns
	// (presumably byte counts — TODO confirm against the schema).
	HeaderSize, TotalSize int64
	// Blob is the blob column (presumably the raw stored message — confirm).
	Blob []byte
}
|
|
|
|
func (c *Conn) Originals(oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
|
query := `
|
|
SELECT
|
|
uid,
|
|
hash,
|
|
header_size,
|
|
total_size,
|
|
blob
|
|
FROM
|
|
original
|
|
`
|
|
c.originals(query, oCh, errc, donec)
|
|
}
|
|
|
|
func (c *Conn) OriginalsNotInTable(table string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
|
query := fmt.Sprintf(`
|
|
SELECT
|
|
original.uid,
|
|
original.hash,
|
|
original.header_size,
|
|
original.total_size,
|
|
original.blob
|
|
FROM
|
|
original
|
|
LEFT
|
|
JOIN %s ON original.hash = %s.hash
|
|
WHERE
|
|
%s.hash IS NULL;
|
|
`, table, table, table)
|
|
c.originals(query, oCh, errc, donec)
|
|
}
|
|
|
|
func (c *Conn) originals(query string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
|
|
|
defer close(oCh)
|
|
defer close(errc)
|
|
rows, err := c.Query(query)
|
|
if err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
for rows.Next() {
|
|
var o Original
|
|
if err := rows.Scan(&o.UID, &o.Hash, &o.HeaderSize, &o.TotalSize, &o.Blob); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
oCh <- o
|
|
select {
|
|
case <-donec:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
errc <- err
|
|
}
|
|
}
|
|
|
|
func (c *Conn) OriginalBlobByHash(hash string, blob *[]byte) error {
|
|
row := c.QueryRow(`
|
|
SELECT
|
|
blob
|
|
FROM
|
|
original
|
|
WHERE
|
|
hash = $1
|
|
`, hash)
|
|
|
|
return row.Scan(blob)
|
|
}
|
|
|
|
func (c *Conn) InsertHeaders(hash string, hdrs mail.Header) (err error) {
|
|
var tx *sql.Tx
|
|
tx, err = c.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
if err := tx.Rollback(); err != nil {
|
|
glog.Error(err)
|
|
}
|
|
return
|
|
}
|
|
err = tx.Commit()
|
|
}()
|
|
|
|
stmt, err := tx.Prepare(`
|
|
INSERT INTO
|
|
header (hash, name, value)
|
|
VALUES
|
|
($1, $2, $3)
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for k, vs := range hdrs {
|
|
for _, v := range vs {
|
|
if !utf8.ValidString(v) {
|
|
glog.Warningf("%s: value for %q invalid UTF-8 %q", hash, k, v)
|
|
continue
|
|
}
|
|
if _, err := stmt.Exec(hash, k, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) InsertSearchHeaders(hash string, hdrs mail.Header) (err error) {
|
|
var tx *sql.Tx
|
|
tx, err = c.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
if err := tx.Rollback(); err != nil {
|
|
glog.Error(err)
|
|
}
|
|
return
|
|
}
|
|
err = tx.Commit()
|
|
}()
|
|
|
|
stmt, err := tx.Prepare(`
|
|
INSERT INTO
|
|
search_header (hash, name, value)
|
|
VALUES
|
|
($1, $2, $3)
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Save the following headers to the database untouched.
|
|
otherHdrs := []string{"Subject", "Message-Id", "In-Reply-To", "References"}
|
|
for _, k := range otherHdrs {
|
|
for _, v := range hdrs[k] {
|
|
if !utf8.ValidString(v) {
|
|
glog.Warningf("%s: value for %q invalid UTF-8 %q", hash, k, v)
|
|
continue
|
|
}
|
|
if _, err := stmt.Exec(hash, k, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Parse the following headers as addresses, and save each email address
|
|
// found to the database independently.
|
|
addrHdrs := []string{"To", "From", "Cc"}
|
|
for _, k := range addrHdrs {
|
|
for _, value := range hdrs[k] {
|
|
addrs, err := mail.ParseAddressList(value)
|
|
if err != nil {
|
|
glog.Warningf("%s: error parsing address list for %q: %v", hash, k, err)
|
|
continue
|
|
}
|
|
|
|
for _, v := range addrs {
|
|
if _, err := stmt.Exec(hash, k, v.Address); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Normalize date field.
|
|
t, err := hdrs.Date()
|
|
if err != nil {
|
|
glog.Warningf("%s: failed to parse date header: %v", hash, err)
|
|
return nil
|
|
}
|
|
v := t.UTC().Format(time.RFC3339Nano)
|
|
if _, err := stmt.Exec(hash, "Date", v); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|