// Package db stores mail originals and their parsed headers in PostgreSQL
// (tables original, header and search_header) and reads them back.
package db

import (
	"database/sql"
	"flag"
	"fmt"
	"net/mail"
	"time"
	"unicode/utf8"

	"github.com/golang/glog"
	_ "github.com/lib/pq"
)

// Connection settings used by NewConn when it is given an empty DSN.
var (
	user    = flag.String("dbuser", "gomail", "PostgreSQL user name")
	name    = flag.String("dbname", "gomail", "PostgreSQL DB name")
	sslmode = flag.String("dbsslmode", "disable", "PostgreSQL sslmode setting")
)

// Conn wraps a *sql.DB opened with the "postgres" driver.
type Conn struct {
	*sql.DB
}

// NewConn opens a PostgreSQL database handle. If dsn is empty, the flags
// -dbuser, -dbname and -dbsslmode are used to build one.
//
// sql.Open may only validate its arguments without connecting, so
// connectivity problems can surface on first use rather than here.
func NewConn(dsn string) (*Conn, error) {
	if dsn == "" {
		dsn = fmt.Sprintf("user=%s dbname=%s sslmode=%s", *user, *name, *sslmode)
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		// Don't hand back a Conn wrapping a nil *sql.DB.
		return nil, err
	}
	return &Conn{DB: db}, nil
}

// Original is one row of the original table.
type Original struct {
	UID        int64
	Hash       string
	HeaderSize int64 // NOTE(review): presumably the header section size in bytes — confirm
	TotalSize  int64 // NOTE(review): presumably the full message size in bytes — confirm
	Blob       []byte
}

// Originals streams every row of the original table on oCh.
//
// Both oCh and errc are closed when streaming ends, and at most one error
// is sent on errc. Closing donec asks the stream to stop early. The method
// blocks on channel sends, so callers typically run it in its own goroutine.
func (c *Conn) Originals(oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
	const query = `
SELECT uid, hash, header_size, total_size, blob
FROM original
`
	c.originals(query, oCh, errc, donec)
}

// OriginalsNotInTable streams the rows of original whose hash has no
// matching row in table. The channel protocol is the same as Originals.
//
// table is spliced into the SQL text (identifiers cannot be query
// parameters), so it must be a plain identifier. Otherwise an error is sent
// on errc and both channels are closed.
func (c *Conn) OriginalsNotInTable(table string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
	if !isTableName(table) {
		defer close(oCh)
		defer close(errc)
		sendErr(errc, donec, fmt.Errorf("invalid table name %q", table))
		return
	}
	query := fmt.Sprintf(`
SELECT original.uid, original.hash, original.header_size, original.total_size, original.blob
FROM original
LEFT JOIN %[1]s ON original.hash = %[1]s.hash
WHERE %[1]s.hash IS NULL;
`, table)
	c.originals(query, oCh, errc, donec)
}

// originals runs query, which must select the Original columns in field
// order, and streams each row on oCh. It closes oCh and errc on return and
// stops early once donec is closed.
func (c *Conn) originals(query string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
	defer close(oCh)
	defer close(errc)

	rows, err := c.Query(query)
	if err != nil {
		sendErr(errc, donec, err)
		return
	}
	// Release the underlying connection even when we stop early.
	defer rows.Close()

	for rows.Next() {
		var o Original
		if err := rows.Scan(&o.UID, &o.Hash, &o.HeaderSize, &o.TotalSize, &o.Blob); err != nil {
			sendErr(errc, donec, err)
			return
		}
		// Watch donec while sending so an abandoned consumer can't wedge us.
		select {
		case oCh <- o:
		case <-donec:
			return
		}
	}
	if err := rows.Err(); err != nil {
		sendErr(errc, donec, err)
	}
}

// sendErr delivers err on errc unless donec is closed first.
func sendErr(errc chan<- error, donec <-chan struct{}, err error) {
	select {
	case errc <- err:
	case <-donec:
	}
}

// isTableName reports whether s is a plain, optionally schema-qualified,
// SQL identifier. It must start with a letter or underscore, followed by
// letters, digits, underscores, '$' or '.'.
func isTableName(s string) bool {
	if s == "" {
		return false
	}
	for i, r := range s {
		switch {
		case r == '_', 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z':
		case i > 0 && (r == '.' || r == '$' || '0' <= r && r <= '9'):
		default:
			return false
		}
	}
	return true
}

// OriginalBlobByHash scans the blob of the original row with the given hash
// into *blob. When no row matches, the error is sql.ErrNoRows.
func (c *Conn) OriginalBlobByHash(hash string, blob *[]byte) error {
	row := c.QueryRow(`
SELECT blob
FROM original
WHERE hash = $1
`, hash)
	return row.Scan(blob)
}

// inTx runs fn inside a transaction. It commits when fn returns nil and
// rolls back otherwise, including when fn panics. Rollback failures are
// logged, and fn's own error is returned.
func (c *Conn) inTx(fn func(tx *sql.Tx) error) error {
	tx, err := c.Begin()
	if err != nil {
		return err
	}
	done := false
	defer func() {
		if done {
			return
		}
		// Reached on error or panic: undo whatever fn did.
		if rbErr := tx.Rollback(); rbErr != nil {
			glog.Error(rbErr)
		}
	}()
	if err := fn(tx); err != nil {
		return err
	}
	done = true
	return tx.Commit()
}

// insertValid executes stmt with (hash, name, value). Values that are not
// valid UTF-8 are skipped with a warning instead of aborting the
// transaction (presumably because the database encoding is UTF-8).
func insertValid(stmt *sql.Stmt, hash, name, value string) error {
	if !utf8.ValidString(value) {
		glog.Warningf("%s: value for %q invalid UTF-8 %q", hash, name, value)
		return nil
	}
	_, err := stmt.Exec(hash, name, value)
	return err
}

// InsertHeaders stores every header value in hdrs as a (hash, name, value)
// row of the header table, all in one transaction.
func (c *Conn) InsertHeaders(hash string, hdrs mail.Header) error {
	return c.inTx(func(tx *sql.Tx) error {
		stmt, err := tx.Prepare(`
INSERT INTO header (hash, name, value)
VALUES ($1, $2, $3)
`)
		if err != nil {
			return err
		}
		defer stmt.Close()

		for k, vs := range hdrs {
			for _, v := range vs {
				if err := insertValid(stmt, hash, k, v); err != nil {
					return err
				}
			}
		}
		return nil
	})
}

// InsertSearchHeaders stores a searchable subset of hdrs in the
// search_header table, all in one transaction:
//   - Subject, Message-Id, In-Reply-To and References verbatim;
//   - each address parsed out of To, From and Cc as its own row;
//   - Date normalized to UTC RFC 3339 (skipped with a warning if unparsable).
func (c *Conn) InsertSearchHeaders(hash string, hdrs mail.Header) error {
	return c.inTx(func(tx *sql.Tx) error {
		stmt, err := tx.Prepare(`
INSERT INTO search_header (hash, name, value)
VALUES ($1, $2, $3)
`)
		if err != nil {
			return err
		}
		defer stmt.Close()

		// Save the following headers to the database untouched.
		otherHdrs := []string{"Subject", "Message-Id", "In-Reply-To", "References"}
		for _, k := range otherHdrs {
			for _, v := range hdrs[k] {
				if err := insertValid(stmt, hash, k, v); err != nil {
					return err
				}
			}
		}

		// Parse the following headers as addresses, and save each email
		// address found to the database independently.
		addrHdrs := []string{"To", "From", "Cc"}
		for _, k := range addrHdrs {
			for _, value := range hdrs[k] {
				addrs, err := mail.ParseAddressList(value)
				if err != nil {
					glog.Warningf("%s: error parsing address list for %q: %v", hash, k, err)
					continue
				}
				for _, a := range addrs {
					if err := insertValid(stmt, hash, k, a.Address); err != nil {
						return err
					}
				}
			}
		}

		// Normalize date field. A missing or malformed Date is not fatal.
		t, err := hdrs.Date()
		if err != nil {
			glog.Warningf("%s: failed to parse date header: %v", hash, err)
			return nil
		}
		return insertValid(stmt, hash, "Date", t.UTC().Format(time.RFC3339Nano))
	})
}

// MagicAllLabel is the Paginator label that selects all messages.
const MagicAllLabel = "[all]"

// Paginator describes which page of messages Index should return.
// NOTE: Index does not consult it yet (see the TODO there).
type Paginator struct {
	Label  string
	Offset int
	Count  int
}

// Address is a mail.Address in this package's namespace.
type Address mail.Address

// MessageHeader is the per-message metadata assembled by Index from
// search_header rows.
type MessageHeader struct {
	Hash    string
	From    *Address
	To      []*Address
	Cc      []*Address
	Subject string
	Date    time.Time
	Seen    bool // not populated by Index yet
	// TODO Summary, ListId string
}

// indexLimit caps the number of search_header rows Index reads per call.
const indexLimit = 100

// Index returns ranges of metadata for messages as defined by paginator.
//
// Rows are grouped by hash into MessageHeaders. If the row limit is hit,
// the last message may have been cut off midstream and is dropped.
// Otherwise every row was read and the last message is included. A single
// message with more than indexLimit rows therefore yields no results.
func (c *Conn) Index(p *Paginator) ([]*MessageHeader, error) {
	// TODO: honor p. Paginate all messages when p.Label == MagicAllLabel,
	// otherwise filter by label, and apply p.Offset / p.Count.
	const query = `
SELECT hash, name, value
FROM search_header
WHERE name IN ('Date', 'Subject', 'To', 'From', 'Cc')
ORDER BY hash
LIMIT $1 -- TODO add offset based on Paginator
;
`
	rows, err := c.Query(query, indexLimit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var (
		res []*MessageHeader
		mh  *MessageHeader // message currently being assembled
		n   int            // rows read, to detect whether LIMIT truncated
	)
	for rows.Next() {
		n++
		var hash, name, value string
		if err := rows.Scan(&hash, &name, &value); err != nil {
			return nil, err
		}
		// ORDER BY hash keeps each message's rows contiguous.
		if mh == nil || hash != mh.Hash {
			if mh != nil {
				res = append(res, mh)
			}
			mh = &MessageHeader{Hash: hash}
		}
		switch name {
		case "Date":
			t, err := time.Parse(time.RFC3339Nano, value)
			if err != nil {
				return nil, err
			}
			mh.Date = t
		case "Subject":
			mh.Subject = value
		case "To":
			mh.To = append(mh.To, &Address{Address: value})
		case "From":
			mh.From = &Address{Address: value}
		case "Cc":
			mh.Cc = append(mh.Cc, &Address{Address: value})
		}
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	// Only a result set that hit LIMIT can end mid-message.
	if mh != nil && n < indexLimit {
		res = append(res, mh)
	}
	return res, nil
}