diff --git a/cmd/contacts/contracts.go b/cmd/contacts/contracts.go new file mode 100644 index 0000000..0d0f601 --- /dev/null +++ b/cmd/contacts/contracts.go @@ -0,0 +1,175 @@ +package main + +import ( + "bytes" + "database/sql" + "flag" + "fmt" + "net/mail" + "time" + "unicode/utf8" + "xinu.tv/types" + + "github.com/golang/glog" + _ "github.com/lib/pq" +) + +type contact struct { + hash string + name string + addr string +} + +func insertAddresses(db *sql.DB, batch int, contactCh chan *contact, errc chan error) { + var ( + txn *sql.Tx + stmt *sql.Stmt + err error + ) + open := func() error { + txn, err = db.Begin() + if err != nil { + return err + } + + stmt, err = txn.Prepare(`INSERT INTO + contact(hash, name, address) +VALUES ( + $1, $2, $3 +)`) + if err != nil { + return err + } + return nil + } + + commit := func() error { + if err := stmt.Close(); err != nil { + return err + } + + if err := txn.Commit(); err != nil { + return err + } + return nil + } + + if err := open(); err != nil { + errc <- err + return + } + cnt := 0 + for c := range contactCh { + switch { + case !utf8.ValidString(c.name): + glog.Errorf("Invalid UTF-8 for name: %q", c.name) + continue + case !utf8.ValidString(c.addr): + glog.Errorf("Invalid UTF-8 for name: %q", c.addr) + continue + } + glog.V(2).Infof("%d Exec %q %q %q", cnt, c.hash, c.name, c.addr) + if _, err = stmt.Exec(c.hash, c.name, c.addr); err != nil { + errc <- err + } + cnt++ + if cnt >= batch { + if err := commit(); err != nil { + errc <- err + return + } + if err := open(); err != nil { + errc <- err + return + } + cnt = 0 + } + } + if err := commit(); err != nil { + errc <- err + } + errc <- nil +} + +func fetchMessages(db *sql.DB) error { + rows, err := db.Query(` +SELECT + hash, blob +FROM + original +`) + if err != nil { + return err + } + var ( + h string + b []byte + + errc = make(chan error) + contactCh = make(chan *contact) + ) + + go insertAddresses(db, 1000, contactCh, errc) + + go func() { + cnt, bCnt := 0, 0 + start := time.Now() + for rows.Next() { + if err := rows.Scan(&h, &b); err != nil { + errc <- err + return + } + cnt++ + bCnt += len(b) + if cnt%10000 == 0 { + delta := time.Since(start) + fmt.Printf("%.2f msg/s %s/s\n", + float64(cnt)/delta.Seconds(), + types.Base2Size(float64(bCnt)/delta.Seconds())) + cnt, bCnt = 0, 0 + start = time.Now() + } + + r := bytes.NewReader(b) + msg, err := mail.ReadMessage(r) + if err != nil { + glog.Errorln(h, err) + continue + } + + for _, hdr := range []string{"to", "cc", "from"} { + addrs, err := msg.Header.AddressList(hdr) + if err != nil && err != mail.ErrHeaderNotPresent { + glog.Errorf("%s %q header: %v", h, hdr, err) + continue + } + for _, addr := range addrs { + contactCh <- &contact{ + hash: h, + name: addr.Name, + addr: addr.Address, + } + } + } + } + close(contactCh) + if err := rows.Err(); err != nil { + errc <- err + } + }() + return <-errc +} + +func main() { + defer glog.Flush() + flag.Parse() + + // TODO(wathiede): make a set of flags. + db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable") + if err != nil { + glog.Fatal(err) + } + if err := fetchMessages(db); err != nil { + glog.Fatal(err) + } +} diff --git a/pg/dup.sql b/pg/dup-original.sql similarity index 100% rename from pg/dup.sql rename to pg/dup-original.sql diff --git a/pg/fill-abook.sql b/pg/fill-abook.sql new file mode 100644 index 0000000..eaba449 --- /dev/null +++ b/pg/fill-abook.sql @@ -0,0 +1,31 @@ +DROP TABLE abook; + +CREATE TABLE + abook (count, name, address) +AS SELECT + DISTINCT ON(lower(address)) + named.count, + named.name || lower(unnamed.address), + lower(unnamed.address) +FROM + contact unnamed +LEFT OUTER JOIN ( + SELECT + DISTINCT ON (lower(address)) + count(name), + name, + lower(address) AS laddr + FROM + contact + WHERE + name != '' + GROUP BY + lower(address), name + ORDER BY + lower(address), + count DESC, + name +) named +ON + named.laddr = lower(unnamed.address) +; diff --git a/pg/init-contact.sql b/pg/init-contact.sql new file mode 100644 index 0000000..4a29bd4 --- /dev/null +++ b/pg/init-contact.sql @@ -0,0 +1,7 @@ +DROP TABLE contact; + +CREATE TABLE contact ( + hash CHAR(40) REFERENCES original (hash), + name TEXT, + address TEXT +); diff --git a/pg/init.sql b/pg/init-original.sql similarity index 100% rename from pg/init.sql rename to pg/init-original.sql