From 1e70c13cf964f52bd9cfc1d535b2bf8e55dff790 Mon Sep 17 00:00:00 2001
From: Bill Thiede
Date: Thu, 27 Mar 2014 21:38:00 -0700
Subject: [PATCH] Harvest email addresses from messages.

Go tool for reading original messages from DB and filling out contact table.
SQL scripts for building address book table from most commonly referenced names per unique email address.
---
 cmd/contacts/contracts.go          | 203 +++++++++++++++++++++++++++++
 pg/{dup.sql => dup-original.sql}   |   0
 pg/fill-abook.sql                  |  40 ++++++
 pg/init-contact.sql                |   9 ++
 pg/{init.sql => init-original.sql} |   0
 5 files changed, 252 insertions(+)
 create mode 100644 cmd/contacts/contracts.go
 rename pg/{dup.sql => dup-original.sql} (100%)
 create mode 100644 pg/fill-abook.sql
 create mode 100644 pg/init-contact.sql
 rename pg/{init.sql => init-original.sql} (100%)

diff --git a/cmd/contacts/contracts.go b/cmd/contacts/contracts.go
new file mode 100644
index 0000000..0d0f601
--- /dev/null
+++ b/cmd/contacts/contracts.go
@@ -0,0 +1,203 @@
+// Command contacts harvests e-mail addresses from the messages stored in the
+// original table and records them in the contact table.
+package main
+
+import (
+	"bytes"
+	"database/sql"
+	"flag"
+	"fmt"
+	"net/mail"
+	"time"
+	"unicode/utf8"
+	"xinu.tv/types"
+
+	"github.com/golang/glog"
+	_ "github.com/lib/pq"
+)
+
+// contact is one address found in the headers of one message.
+type contact struct {
+	hash string // hash of the message the address was found in
+	name string // display name from the header, possibly empty
+	addr string // bare e-mail address
+}
+
+// insertAddresses reads contacts from contactCh until it is closed and inserts
+// them into the contact table, committing a transaction every batch rows.
+// Contacts whose name or address is not valid UTF-8 are logged and skipped.
+//
+// Exactly one value is sent on errc before insertAddresses returns: nil once
+// contactCh has been closed and the last batch committed, or the first error
+// encountered, after which contactCh is no longer read.
+func insertAddresses(db *sql.DB, batch int, contactCh chan *contact, errc chan error) {
+	errc <- insertBatches(db, batch, contactCh)
+}
+
+// insertBatches does the work of insertAddresses and returns its result.
+func insertBatches(db *sql.DB, batch int, contactCh <-chan *contact) error {
+	var (
+		txn  *sql.Tx
+		stmt *sql.Stmt
+	)
+	// open starts a transaction and prepares the insert statement inside it.
+	open := func() error {
+		var err error
+		if txn, err = db.Begin(); err != nil {
+			return err
+		}
+		stmt, err = txn.Prepare(`INSERT INTO
+	contact(hash, name, address)
+VALUES (
+	$1, $2, $3
+)`)
+		if err != nil {
+			// Best effort: the Prepare error is the one worth reporting.
+			txn.Rollback()
+		}
+		return err
+	}
+	// commit closes the statement and commits the current transaction.
+	commit := func() error {
+		if err := stmt.Close(); err != nil {
+			// Best effort: the Close error is the one worth reporting.
+			txn.Rollback()
+			return err
+		}
+		return txn.Commit()
+	}
+
+	if err := open(); err != nil {
+		return err
+	}
+	cnt := 0
+	for c := range contactCh {
+		switch {
+		case !utf8.ValidString(c.name):
+			glog.Errorf("Invalid UTF-8 for name: %q", c.name)
+			continue
+		case !utf8.ValidString(c.addr):
+			glog.Errorf("Invalid UTF-8 for address: %q", c.addr)
+			continue
+		}
+		glog.V(2).Infof("%d Exec %q %q %q", cnt, c.hash, c.name, c.addr)
+		if _, err := stmt.Exec(c.hash, c.name, c.addr); err != nil {
+			// PostgreSQL rejects every later statement in a transaction
+			// once one has failed, so abandon the batch and report.
+			txn.Rollback()
+			return err
+		}
+		cnt++
+		if cnt >= batch {
+			if err := commit(); err != nil {
+				return err
+			}
+			if err := open(); err != nil {
+				return err
+			}
+			cnt = 0
+		}
+	}
+	return commit()
+}
+
+// fetchMessages reads every row of the original table, parses the blob as a
+// mail message and passes each address in its To, Cc and From headers to a
+// goroutine running insertAddresses. Messages and headers that fail to parse
+// are logged and skipped. It returns once every row has been read and the
+// last batch committed, or as soon as the query, a scan or an insert fails.
+func fetchMessages(db *sql.DB) error {
+	rows, err := db.Query(`
+SELECT
+	hash, blob
+FROM
+	original
+`)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	var (
+		h string
+		b []byte
+
+		errc      = make(chan error)
+		contactCh = make(chan *contact)
+	)
+
+	go insertAddresses(db, 1000, contactCh, errc)
+	// finish tells the inserter no more contacts are coming and waits for it
+	// to commit its last batch.
+	finish := func() error {
+		close(contactCh)
+		return <-errc
+	}
+
+	cnt, bCnt := 0, 0
+	start := time.Now()
+	for rows.Next() {
+		if err := rows.Scan(&h, &b); err != nil {
+			if ierr := finish(); ierr != nil {
+				glog.Errorln(ierr)
+			}
+			return err
+		}
+		cnt++
+		bCnt += len(b)
+		if cnt%10000 == 0 {
+			// Report throughput for the last 10000 messages, then start a
+			// fresh measurement window.
+			delta := time.Since(start)
+			fmt.Printf("%.2f msg/s %s/s\n",
+				float64(cnt)/delta.Seconds(),
+				types.Base2Size(float64(bCnt)/delta.Seconds()))
+			cnt, bCnt = 0, 0
+			start = time.Now()
+		}
+
+		msg, err := mail.ReadMessage(bytes.NewReader(b))
+		if err != nil {
+			glog.Errorln(h, err)
+			continue
+		}
+
+		for _, hdr := range []string{"to", "cc", "from"} {
+			addrs, err := msg.Header.AddressList(hdr)
+			if err != nil && err != mail.ErrHeaderNotPresent {
+				glog.Errorf("%s %q header: %v", h, hdr, err)
+				continue
+			}
+			for _, addr := range addrs {
+				c := &contact{hash: h, name: addr.Name, addr: addr.Address}
+				select {
+				case contactCh <- c:
+				case err := <-errc:
+					// The inserter only reports while contactCh is still
+					// open when it has failed and stopped reading.
+					return err
+				}
+			}
+		}
+	}
+	if err := finish(); err != nil {
+		return err
+	}
+	return rows.Err()
+}
+
+// main connects to the gomail database and harvests contacts from it.
+func main() {
+	defer glog.Flush()
+	flag.Parse()
+
+	// TODO(wathiede): make a set of flags.
+	db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable")
+	if err != nil {
+		glog.Fatal(err)
+	}
+	defer db.Close()
+	if err := fetchMessages(db); err != nil {
+		glog.Fatal(err)
+	}
+}
diff --git a/pg/dup.sql b/pg/dup-original.sql
similarity index 100%
rename from pg/dup.sql
rename to pg/dup-original.sql
diff --git a/pg/fill-abook.sql b/pg/fill-abook.sql
new file mode 100644
index 0000000..eaba449
--- /dev/null
+++ b/pg/fill-abook.sql
@@ -0,0 +1,40 @@
+-- Rebuild the abook table: one row per distinct lower-cased address in
+-- contact, along with the name most often used with that address and the
+-- number of times that name was used.
+DROP TABLE IF EXISTS abook;
+
+CREATE TABLE
+	abook (count, name, address)
+AS SELECT
+	DISTINCT ON(lower(address))
+	named.count,
+	-- Addresses never seen with a name have no row in named; COALESCE keeps
+	-- the NULL from the outer join from turning the whole name into NULL.
+	-- NOTE(review): name and address are joined with no separator; confirm
+	-- that is what consumers of abook expect.
+	COALESCE(named.name, '') || lower(unnamed.address),
+	lower(unnamed.address)
+FROM
+	contact unnamed
+LEFT OUTER JOIN (
+	-- The most frequent non-empty name for each address; ties go to the
+	-- name that sorts first.
+	SELECT
+		DISTINCT ON (lower(address))
+		count(name),
+		name,
+		lower(address) AS laddr
+	FROM
+		contact
+	WHERE
+		name != ''
+	GROUP BY
+		lower(address), name
+	ORDER BY
+		lower(address),
+		count DESC,
+		name
+) named
+ON
+	named.laddr = lower(unnamed.address)
+;
diff --git a/pg/init-contact.sql b/pg/init-contact.sql
new file mode 100644
index 0000000..4a29bd4
--- /dev/null
+++ b/pg/init-contact.sql
@@ -0,0 +1,9 @@
+-- Recreate the contact table: one row per address found in a message
+-- header, keyed to the message in original it was harvested from.
+DROP TABLE IF EXISTS contact;
+
+CREATE TABLE contact (
+	hash CHAR(40) REFERENCES original (hash),
+	name TEXT,
+	address TEXT
+);
diff --git a/pg/init.sql b/pg/init-original.sql
similarity index 100%
rename from pg/init.sql
rename to pg/init-original.sql