email/cmd/md2pq/md2pq.go

251 lines
5.1 KiB
Go

// md2pq imports email messages into a PostgreSQL database, either from a Maildir tree (-maildir) or from individual files / stdin.
package main
import (
"bytes"
"database/sql"
"expvar"
"flag"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/golang/glog"
_ "github.com/lib/pq"
"xinu.tv/email"
"xinu.tv/set"
"xinu.tv/types"
)
// Command-line flags.
var maildir = flag.String("maildir", "", "Maildir root")
var username = flag.String("user", "wathiede", "username for email we're importing")
var skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log",
	"comma separated files to skip")

// Progress counters, published through expvar under the given names.
var (
	total  = expvar.NewInt("bytes-parsed")     // bytes of message data imported
	cnt    = expvar.NewInt("messages-parsed")  // messages inserted
	dupCnt = expvar.NewInt("duplicates-found") // messages skipped as duplicates
)
// CRCR is the blank-line marker used to find the end of a message header.
// Despite its name it is two LF characters, so a header terminated by
// "\r\n\r\n" is not matched.
var CRCR = []byte("\n\n")

// maxMessageSize is the most bytes read from any single message
// (50 MiB); anything past it is never read.
var maxMessageSize int64 = 50 << 20

// insertOriginalQuery stores one raw message; parameters are
// uid, hash, header size, total size and the message bytes.
var insertOriginalQuery = `INSERT INTO
original (uid, hash, header_size, total_size, blob)
VALUES (
$1, $2, $3, $4, $5
);`
func LoadReader(db *sql.DB, uid int, r io.Reader) error {
stmt, err := db.Prepare(insertOriginalQuery)
if err != nil {
return err
}
b, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: maxMessageSize})
if err != nil {
return err
}
hdr_size := bytes.Index(b, CRCR)
chksum, err := email.HashReader(bytes.NewReader(b))
if err != nil {
return err
}
n := len(b)
if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil {
return err
}
return nil
}
// Load walks the directory tree rooted at root and imports every regular
// file whose base name is not in skip, all inside one transaction.
//
// For each file that email.HashReader accepts, a (hash, path) row is written
// to the files table. The first file seen with a given hash is also inserted
// into the original table for uid; later files with the same hash are
// counted as duplicates. Files the hasher rejects are logged and skipped.
// Messages longer than maxMessageSize are truncated to that size.
//
// Any other error stops the walk, rolls the transaction back and is
// returned, so a partial import is never committed.
func Load(db *sql.DB, uid int, root string, skip *set.StringSet) error {
	dup := set.NewStrings()
	start := time.Now()
	defer func() {
		glog.Infof("%s messages processed in %s", cnt, time.Since(start))
		glog.Infof("%s dups found", dupCnt)
	}()

	txn, err := db.Begin()
	if err != nil {
		return err
	}
	// abort rolls the transaction back and returns cause. A rollback failure
	// is only logged so that the original error reaches the caller.
	abort := func(what string, cause error) error {
		if err := txn.Rollback(); err != nil {
			glog.Errorln(what+" error rolling back", err)
		}
		return cause
	}

	stmt, err := txn.Prepare(insertOriginalQuery)
	if err != nil {
		return abort("txn.Prepare stmt", err)
	}
	hstmt, err := txn.Prepare("INSERT INTO files (hash, path) VALUES ($1, $2);")
	if err != nil {
		return abort("txn.Prepare hstmt", err)
	}

	walkErr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || skip.Contains(filepath.Base(path)) {
			return nil
		}
		r, err := os.Open(path)
		if err != nil {
			return err
		}
		// Runs when this callback returns, i.e. once per file.
		defer r.Close()
		b, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: maxMessageSize})
		if err != nil {
			return err
		}
		hdrSize := bytes.Index(b, CRCR)
		chksum, err := email.HashReader(bytes.NewReader(b))
		if err != nil {
			glog.Errorf("%s not a mail file", path)
			glog.Infof("%q", err.Error())
			return nil
		}
		// Every path is recorded, duplicates included.
		if _, err := hstmt.Exec(chksum, path); err != nil {
			return err
		}
		if dup.Contains(chksum) {
			// filepath.Rel copes with roots like ".", "./dir" or "dir/",
			// which the old path[len(root)+1:] slicing mangled or panicked on.
			rel, relErr := filepath.Rel(root, path)
			if relErr != nil {
				rel = path
			}
			glog.Warningln("Dup ", chksum, rel, len(b))
			dupCnt.Add(1)
			return nil
		}
		dup.Add(chksum)
		n := len(b)
		total.Add(int64(n))
		i, err := strconv.Atoi(cnt.String())
		if err != nil {
			return err
		}
		// Report throughput every 1000 inserted messages (not before the first).
		if i > 0 && i%1000 == 0 {
			t, err := strconv.Atoi(total.String())
			if err != nil {
				return err
			}
			delta := time.Since(start)
			glog.Infof("%d messages processed in %s: %.2f msg/s %s/s", i,
				delta, float64(i)/delta.Seconds(),
				types.Base2Size(float64(t)/delta.Seconds()))
		}
		if _, err := stmt.Exec(uid, chksum, hdrSize, n, b); err != nil {
			return err
		}
		cnt.Add(1)
		return nil
	})
	// Previously this error was silently discarded and the transaction
	// committed anyway.
	if walkErr != nil {
		return abort("filepath.Walk", walkErr)
	}
	if err := stmt.Close(); err != nil {
		return abort("stmt.Close", err)
	}
	if err := hstmt.Close(); err != nil {
		return abort("hstmt.Close", err)
	}
	return txn.Commit()
}
// GetUid returns the uid stored in the person table for username.
// On any failure, including sql.ErrNoRows when no such person exists, it
// returns -1 together with the error from the query.
func GetUid(db *sql.DB, username string) (int, error) {
	row := db.QueryRow("SELECT uid FROM person WHERE username=$1", username)
	var id int
	if err := row.Scan(&id); err != nil {
		return -1, err
	}
	return id, nil
}
// main connects to PostgreSQL, resolves -user to a uid and imports mail:
//   - with -maildir set and no file arguments, every file under the Maildir
//     tree is loaded in a single transaction (see Load);
//   - otherwise each file argument is loaded on its own, or stdin when there
//     are no arguments (see LoadReader).
//
// Setup failures and Load failures are fatal. When an individual message
// fails to load, the remaining ones are still attempted and the process
// exits with status 1.
func main() {
	// -dsn replaces the previously hard-coded connection string; its default
	// keeps the old behavior.
	dsn := flag.String("dsn", "user=gomail dbname=gomail sslmode=disable",
		"PostgreSQL connection string")
	flag.Parse()

	exitCode := 0
	// Registered first so it runs last: flush logs, then report failure.
	defer func() {
		glog.Flush()
		if exitCode != 0 {
			os.Exit(exitCode)
		}
	}()

	db, err := sql.Open("postgres", *dsn)
	if err != nil {
		glog.Fatal(err)
	}
	defer db.Close()

	uid, err := GetUid(db, *username)
	if err != nil {
		glog.Fatal(err)
	}
	glog.Infoln("Using uid", uid, "for", *username)

	if *maildir == "" || flag.NArg() > 0 {
		if flag.NArg() == 0 {
			if err := LoadReader(db, uid, os.Stdin); err != nil {
				glog.Errorln(err)
				exitCode = 1
			}
			return
		}
		for _, fn := range flag.Args() {
			r, err := os.Open(fn)
			if err != nil {
				glog.Error(err)
				exitCode = 1
				continue
			}
			if err := LoadReader(db, uid, r); err != nil {
				glog.Errorln(fn, err)
				exitCode = 1
			}
			// Read-only file: a Close error carries no data-loss risk.
			r.Close()
		}
		return
	}

	// Load all files in maildir.
	skip := set.NewStrings(strings.Split(*skipFiles, ",")...)
	glog.Infoln("Skip files", skip)
	if err := Load(db, uid, *maildir, skip); err != nil {
		glog.Fatal(err)
	}
}