207 lines
3.5 KiB
Go
207 lines
3.5 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"net/mail"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"xinu.tv/types"
|
|
|
|
"github.com/golang/glog"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
// all selects which messages are scanned. When false (the default), only
// messages that have no rows in the contact table yet are processed.
var all = flag.Bool("all", false,
	"extract addresses from all messages. Default is to only extract from unprocessed messages.")

// dryrun makes insertAddresses discard every extracted contact instead of
// inserting it into the database.
var dryrun = flag.Bool("dryrun", false, "don't write to the DB.")
|
|
|
|
// contact is one address found in a message header, ready to be written as a
// row of the contact table.
type contact struct {
	hash string // hash of the original message the address came from

	// name and addr are the display name and the address of a parsed
	// mail.Address; name may be empty.
	name, addr string
}
|
|
|
|
func insertAddresses(db *sql.DB, batch int, contactCh chan *contact, errc chan error) {
|
|
if *dryrun {
|
|
for _ = range contactCh {
|
|
}
|
|
errc <- nil
|
|
return
|
|
}
|
|
var (
|
|
txn *sql.Tx
|
|
stmt *sql.Stmt
|
|
err error
|
|
)
|
|
open := func() error {
|
|
txn, err = db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stmt, err = txn.Prepare(`INSERT INTO
|
|
contact(hash, name, address)
|
|
VALUES (
|
|
$1, $2, $3
|
|
)`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
commit := func() error {
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := txn.Commit(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := open(); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
cnt := 0
|
|
for c := range contactCh {
|
|
switch {
|
|
case !utf8.ValidString(c.name):
|
|
glog.Errorf("%s Invalid UTF-8 for name: %q", c.hash, c.name)
|
|
continue
|
|
case !utf8.ValidString(c.addr):
|
|
glog.Errorf("%s Invalid UTF-8 for name: %q", c.hash, c.addr)
|
|
continue
|
|
}
|
|
glog.V(2).Infof("%d Exec %q %q %q", cnt, c.hash, c.name, c.addr)
|
|
if _, err = stmt.Exec(c.hash, c.name, c.addr); err != nil {
|
|
errc <- err
|
|
}
|
|
cnt++
|
|
if cnt >= batch {
|
|
if err := commit(); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
if err := open(); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
cnt = 0
|
|
}
|
|
}
|
|
if err := commit(); err != nil {
|
|
errc <- err
|
|
}
|
|
errc <- nil
|
|
}
|
|
|
|
func fetchMessages(db *sql.DB, all bool) error {
|
|
var query string
|
|
if all {
|
|
query = `
|
|
SELECT
|
|
hash, blob
|
|
FROM
|
|
original
|
|
`
|
|
} else {
|
|
query = `
|
|
SELECT
|
|
original.hash AS hash,
|
|
original.blob AS blob
|
|
FROM
|
|
original
|
|
LEFT
|
|
JOIN contact ON original.hash = contact.hash
|
|
WHERE
|
|
contact.hash IS NULL;
|
|
`
|
|
}
|
|
rows, err := db.Query(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var (
|
|
h string
|
|
b []byte
|
|
|
|
errc = make(chan error)
|
|
contactCh = make(chan *contact)
|
|
)
|
|
|
|
go insertAddresses(db, 1000, contactCh, errc)
|
|
|
|
go func() {
|
|
cnt, bCnt := 0, 0
|
|
start := time.Now()
|
|
for rows.Next() {
|
|
if err := rows.Scan(&h, &b); err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
cnt++
|
|
bCnt += len(b)
|
|
if cnt%10000 == 0 {
|
|
delta := time.Since(start)
|
|
fmt.Printf("%.2f msg/s %s/s\n",
|
|
float64(cnt)/delta.Seconds(),
|
|
types.Base2Size(float64(bCnt)/delta.Seconds()))
|
|
cnt, bCnt = 0, 0
|
|
start = time.Now()
|
|
}
|
|
|
|
r := bytes.NewReader(b)
|
|
msg, err := mail.ReadMessage(r)
|
|
if err != nil {
|
|
glog.Errorln(h, err)
|
|
continue
|
|
}
|
|
|
|
for _, hdr := range []string{"to", "cc", "from"} {
|
|
addrs, err := msg.Header.AddressList(hdr)
|
|
if err != nil && err != mail.ErrHeaderNotPresent {
|
|
glog.Errorf("%s %q header: %v: %q", h, hdr, err,
|
|
msg.Header.Get(hdr))
|
|
continue
|
|
}
|
|
for _, addr := range addrs {
|
|
contactCh <- &contact{
|
|
hash: h,
|
|
name: addr.Name,
|
|
addr: addr.Address,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
close(contactCh)
|
|
if err := rows.Err(); err != nil {
|
|
errc <- err
|
|
}
|
|
}()
|
|
return <-errc
|
|
}
|
|
|
|
func main() {
|
|
defer glog.Flush()
|
|
flag.Parse()
|
|
|
|
// TODO(wathiede): make a set of flags.
|
|
db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable")
|
|
if err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
if err := fetchMessages(db, *all); err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
}
|