package main import ( "bytes" "database/sql" "flag" "fmt" "time" "unicode/utf8" "xinu.tv/types" "github.com/golang/glog" _ "github.com/lib/pq" "xinu.tv/std/net/mail" ) var ( all = flag.Bool("all", false, "extract addresses from all messages. Default is to only extract from unprocessed messages.") dryrun = flag.Bool("dryrun", false, "don't write to the DB.") ) type contact struct { hash string name string addr string } func insertAddresses(db *sql.DB, batch int, contactCh chan *contact, errc chan error) { if *dryrun { for _ = range contactCh { } errc <- nil return } var ( txn *sql.Tx stmt *sql.Stmt err error ) open := func() error { txn, err = db.Begin() if err != nil { return err } stmt, err = txn.Prepare(`INSERT INTO contact(hash, name, address) VALUES ( $1, $2, $3 )`) if err != nil { return err } return nil } commit := func() error { if err := stmt.Close(); err != nil { return err } if err := txn.Commit(); err != nil { return err } return nil } if err := open(); err != nil { errc <- err return } cnt := 0 for c := range contactCh { switch { case !utf8.ValidString(c.name): glog.Errorf("Invalid UTF-8 for name: %q", c.name) continue case !utf8.ValidString(c.addr): glog.Errorf("Invalid UTF-8 for name: %q", c.addr) continue } glog.V(2).Infof("%d Exec %q %q %q", cnt, c.hash, c.name, c.addr) if _, err = stmt.Exec(c.hash, c.name, c.addr); err != nil { errc <- err } cnt++ if cnt >= batch { if err := commit(); err != nil { errc <- err return } if err := open(); err != nil { errc <- err return } cnt = 0 } } if err := commit(); err != nil { errc <- err } errc <- nil } func fetchMessages(db *sql.DB, all bool) error { var query string if all { query = ` SELECT hash, blob FROM original ` } else { query = ` SELECT original.hash AS hash, original.blob AS blob FROM original LEFT JOIN contact ON original.hash = contact.hash WHERE contact.hash IS NULL; ` } rows, err := db.Query(query) if err != nil { return err } var ( h string b []byte errc = make(chan error) contactCh = make(chan *contact) ) go insertAddresses(db, 1000, contactCh, errc) go func() { cnt, bCnt := 0, 0 start := time.Now() for rows.Next() { if err := rows.Scan(&h, &b); err != nil { errc <- err return } cnt++ bCnt += len(b) if cnt%10000 == 0 { delta := time.Since(start) fmt.Printf("%.2f msg/s %s/s\n", float64(cnt)/delta.Seconds(), types.Base2Size(float64(bCnt)/delta.Seconds())) cnt, bCnt = 0, 0 start = time.Now() } r := bytes.NewReader(b) msg, err := mail.ReadMessage(r) if err != nil { glog.Errorln(h, err) continue } for _, hdr := range []string{"to", "cc", "from"} { addrs, err := msg.Header.AddressList(hdr) if err != nil && err != mail.ErrHeaderNotPresent { glog.Errorf("%s %q header: %v: %q", h, hdr, err, msg.Header.Get(hdr)) continue } for _, addr := range addrs { contactCh <- &contact{ hash: h, name: addr.Name, addr: addr.Address, } } } } close(contactCh) if err := rows.Err(); err != nil { errc <- err } }() return <-errc } func main() { defer glog.Flush() flag.Parse() // TODO(wathiede): make a set of flags. db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable") if err != nil { glog.Fatal(err) } if err := fetchMessages(db, *all); err != nil { glog.Fatal(err) } }