email/cmd/contacts/contacts.go

207 lines
3.5 KiB
Go

package main
import (
"bytes"
"database/sql"
"flag"
"fmt"
"net/mail"
"time"
"unicode/utf8"
"xinu.tv/types"
"github.com/golang/glog"
_ "github.com/lib/pq"
)
// all selects every message in the original table for extraction. When it is
// false (the default) only messages with no rows in the contact table yet are
// read.
var all = flag.Bool("all", false,
	"extract addresses from all messages. Default is to only extract from unprocessed messages.")

// dryrun makes insertAddresses drain the parsed contacts without writing them
// to the database.
var dryrun = flag.Bool("dryrun", false, "don't write to the DB.")
// contact is a single address parsed from a message header, queued for
// insertion as one row of the contact table.
type contact struct {
	// hash is the hash column of the original row the address came from.
	hash string
	// name is the display name and addr the email address, as parsed by
	// net/mail.
	name, addr string
}
// insertAddresses reads contacts from contactCh until it is closed and inserts
// each one into the contact table, committing after every batch rows in a
// fresh transaction.
//
// Contacts whose name or address is not valid UTF-8 are logged and skipped.
// With -dryrun the channel is drained, nothing is written, and nil is reported.
//
// Exactly one value is sent on errc: nil once contactCh has been closed and the
// final batch committed, or the first error encountered. On error the open
// transaction is rolled back and the function returns without reading the rest
// of contactCh.
func insertAddresses(db *sql.DB, batch int, contactCh chan *contact, errc chan error) {
	if *dryrun {
		for range contactCh {
		}
		errc <- nil
		return
	}

	var (
		txn  *sql.Tx
		stmt *sql.Stmt
	)
	// open starts a transaction and prepares the INSERT statement inside it.
	// If it fails, no transaction is left open.
	open := func() error {
		var err error
		if txn, err = db.Begin(); err != nil {
			return err
		}
		stmt, err = txn.Prepare(`INSERT INTO
contact(hash, name, address)
VALUES (
$1, $2, $3
)`)
		if err != nil {
			// Best effort cleanup; the Prepare error is the one worth reporting.
			_ = txn.Rollback()
			return err
		}
		return nil
	}
	// commit closes the prepared statement and commits the transaction,
	// rolling the transaction back if the statement cannot be closed.
	commit := func() error {
		if err := stmt.Close(); err != nil {
			_ = txn.Rollback()
			return err
		}
		return txn.Commit()
	}

	if err := open(); err != nil {
		errc <- err
		return
	}
	cnt := 0
	for c := range contactCh {
		switch {
		case !utf8.ValidString(c.name):
			glog.Errorf("%s Invalid UTF-8 for name: %q", c.hash, c.name)
			continue
		case !utf8.ValidString(c.addr):
			glog.Errorf("%s Invalid UTF-8 for address: %q", c.hash, c.addr)
			continue
		}
		glog.V(2).Infof("%d Exec %q %q %q", cnt, c.hash, c.name, c.addr)
		if _, err := stmt.Exec(c.hash, c.name, c.addr); err != nil {
			// After a failed statement PostgreSQL rejects everything else in the
			// transaction, so continuing would only produce more errors.
			// Rollback also closes stmt, which was prepared in txn.
			_ = txn.Rollback()
			errc <- err
			return
		}
		cnt++
		if cnt < batch {
			continue
		}
		if err := commit(); err != nil {
			errc <- err
			return
		}
		if err := open(); err != nil {
			errc <- err
			return
		}
		cnt = 0
	}
	if err := commit(); err != nil {
		errc <- err
		return
	}
	errc <- nil
}
// fetchMessages extracts the To, Cc and From addresses of stored messages and
// hands them to insertAddresses for writing to the contact table.
//
// When all is true every row of original is read; otherwise only messages with
// no matching row in contact are read. Messages or headers that fail to parse
// are logged and skipped.
//
// The messages are read in one goroutine and inserted in another. It returns
// the first read (query/Scan/rows.Err) error, or else the insert result. If the
// inserter fails, it returns that error immediately and signals the reader to
// stop. Otherwise it waits for both goroutines to finish.
func fetchMessages(db *sql.DB, all bool) error {
	rows, err := db.Query(messagesQuery(all))
	if err != nil {
		return err
	}

	contactCh := make(chan *contact)
	// Both result channels are buffered so neither goroutine can block forever
	// on a result this function no longer reads.
	insertErrc := make(chan error, 1)
	readErrc := make(chan error, 1)
	// quit is closed on return so the reader stops sending to an inserter
	// that has already given up.
	quit := make(chan struct{})
	defer close(quit)

	go insertAddresses(db, 1000, contactCh, insertErrc)
	go func() {
		// Deferred calls run after the result below has been buffered. Closing
		// contactCh (even after a read error) lets the inserter commit its
		// final batch and report.
		defer close(contactCh)
		defer rows.Close()
		readErrc <- readContacts(rows, contactCh, quit)
	}()

	select {
	case insertErr := <-insertErrc:
		if insertErr != nil {
			return insertErr
		}
		// insertAddresses only reports nil after contactCh is closed, and the
		// reader buffers its result before closing it, so this does not block.
		return <-readErrc
	case readErr := <-readErrc:
		// contactCh is being closed, so the inserter will finish and report.
		insertErr := <-insertErrc
		if readErr != nil {
			return readErr
		}
		return insertErr
	}
}

// messagesQuery returns the SELECT producing (hash, blob) rows from original:
// every message when all is true, otherwise only messages without contact rows.
func messagesQuery(all bool) string {
	if all {
		return `
SELECT
hash, blob
FROM
original
`
	}
	return `
SELECT
original.hash AS hash,
original.blob AS blob
FROM
original
LEFT
JOIN contact ON original.hash = contact.hash
WHERE
contact.hash IS NULL;
`
}

// readContacts scans (hash, blob) rows, parses each blob with mail.ReadMessage
// and sends one contact per address in its To, Cc and From headers on
// contactCh. Every 10000 messages it prints the message and byte throughput.
// It returns nil early if quit is closed. Otherwise it returns the first Scan
// error, or rows.Err() once the rows are exhausted.
func readContacts(rows *sql.Rows, contactCh chan<- *contact, quit <-chan struct{}) error {
	var (
		h string
		b []byte
	)
	cnt, bCnt := 0, 0
	start := time.Now()
	for rows.Next() {
		if err := rows.Scan(&h, &b); err != nil {
			return err
		}
		cnt++
		bCnt += len(b)
		if cnt%10000 == 0 {
			delta := time.Since(start)
			fmt.Printf("%.2f msg/s %s/s\n",
				float64(cnt)/delta.Seconds(),
				types.Base2Size(float64(bCnt)/delta.Seconds()))
			cnt, bCnt = 0, 0
			start = time.Now()
		}
		msg, err := mail.ReadMessage(bytes.NewReader(b))
		if err != nil {
			glog.Errorln(h, err)
			continue
		}
		for _, hdr := range []string{"to", "cc", "from"} {
			addrs, err := msg.Header.AddressList(hdr)
			if err != nil && err != mail.ErrHeaderNotPresent {
				glog.Errorf("%s %q header: %v: %q", h, hdr, err,
					msg.Header.Get(hdr))
				continue
			}
			for _, addr := range addrs {
				select {
				case contactCh <- &contact{hash: h, name: addr.Name, addr: addr.Address}:
				case <-quit:
					return nil
				}
			}
		}
	}
	return rows.Err()
}
// main connects to PostgreSQL and extracts contact addresses from the stored
// messages. In addition to -all and -dryrun, -dsn selects the connection
// string; its default is the previously hard-coded local gomail database.
func main() {
	dsn := flag.String("dsn", "user=gomail dbname=gomail sslmode=disable",
		"PostgreSQL connection string passed to sql.Open.")
	defer glog.Flush()
	flag.Parse()
	db, err := sql.Open("postgres", *dsn)
	if err != nil {
		glog.Fatal(err)
	}
	// Only runs on the success path; glog.Fatal exits without running defers.
	defer db.Close()
	if err := fetchMessages(db, *all); err != nil {
		glog.Fatal(err)
	}
}