Go tool that reads the original messages from the DB and fills out the contact table.
SQL scripts that build the address book table from the most commonly referenced
name for each unique email address.
176 lines
3.0 KiB
Go
176 lines
3.0 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"net/mail"
|
|
"time"
|
|
"unicode/utf8"
|
|
"xinu.tv/types"
|
|
|
|
"github.com/golang/glog"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
// contact is a single name/address pair taken from a message header, tagged
// with the hash column value of the original row it was parsed from.
type contact struct {
	hash, name, addr string
}
|
|
|
|
func insertAddresses(db *sql.DB, batch int, contactCh chan *contact, errc chan error) {
|
|
var (
|
|
txn *sql.Tx
|
|
stmt *sql.Stmt
|
|
err error
|
|
)
|
|
open := func() error {
|
|
txn, err = db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stmt, err = txn.Prepare(`INSERT INTO
|
|
contact(hash, name, address)
|
|
VALUES (
|
|
$1, $2, $3
|
|
)`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
commit := func() error {
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := txn.Commit(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := open(); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
cnt := 0
|
|
for c := range contactCh {
|
|
switch {
|
|
case !utf8.ValidString(c.name):
|
|
glog.Errorf("Invalid UTF-8 for name: %q", c.name)
|
|
continue
|
|
case !utf8.ValidString(c.addr):
|
|
glog.Errorf("Invalid UTF-8 for name: %q", c.addr)
|
|
continue
|
|
}
|
|
glog.V(2).Infof("%d Exec %q %q %q", cnt, c.hash, c.name, c.addr)
|
|
if _, err = stmt.Exec(c.hash, c.name, c.addr); err != nil {
|
|
errc <- err
|
|
}
|
|
cnt++
|
|
if cnt >= batch {
|
|
if err := commit(); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
if err := open(); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
cnt = 0
|
|
}
|
|
}
|
|
if err := commit(); err != nil {
|
|
errc <- err
|
|
}
|
|
errc <- nil
|
|
}
|
|
|
|
func fetchMessages(db *sql.DB) error {
|
|
rows, err := db.Query(`
|
|
SELECT
|
|
hash, blob
|
|
FROM
|
|
original
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var (
|
|
h string
|
|
b []byte
|
|
|
|
errc = make(chan error)
|
|
contactCh = make(chan *contact)
|
|
)
|
|
|
|
go insertAddresses(db, 1000, contactCh, errc)
|
|
|
|
go func() {
|
|
cnt, bCnt := 0, 0
|
|
start := time.Now()
|
|
for rows.Next() {
|
|
if err := rows.Scan(&h, &b); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
cnt++
|
|
bCnt += len(b)
|
|
if cnt%10000 == 0 {
|
|
delta := time.Since(start)
|
|
fmt.Printf("%.2f msg/s %s/s\n",
|
|
float64(cnt)/delta.Seconds(),
|
|
types.Base2Size(float64(bCnt)/delta.Seconds()))
|
|
cnt, bCnt = 0, 0
|
|
start = time.Now()
|
|
}
|
|
|
|
r := bytes.NewReader(b)
|
|
msg, err := mail.ReadMessage(r)
|
|
if err != nil {
|
|
glog.Errorln(h, err)
|
|
continue
|
|
}
|
|
|
|
for _, hdr := range []string{"to", "cc", "from"} {
|
|
addrs, err := msg.Header.AddressList(hdr)
|
|
if err != nil && err != mail.ErrHeaderNotPresent {
|
|
glog.Errorf("%s %q header: %v", h, hdr, err)
|
|
continue
|
|
}
|
|
for _, addr := range addrs {
|
|
contactCh <- &contact{
|
|
hash: h,
|
|
name: addr.Name,
|
|
addr: addr.Address,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
close(contactCh)
|
|
if err := rows.Err(); err != nil {
|
|
errc <- err
|
|
}
|
|
}()
|
|
return <-errc
|
|
}
|
|
|
|
func main() {
|
|
defer glog.Flush()
|
|
flag.Parse()
|
|
|
|
// TODO(wathiede): make a set of flags.
|
|
db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable")
|
|
if err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
if err := fetchMessages(db); err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
}
|