251 lines
5.1 KiB
Go
251 lines
5.1 KiB
Go
// md2pg imports email messages into a PostgreSQL database, either from a
// Maildir directory tree (-maildir) or from individual files or stdin.
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"database/sql"
|
|
"expvar"
|
|
"flag"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
_ "github.com/lib/pq"
|
|
|
|
"xinu.tv/email"
|
|
"xinu.tv/set"
|
|
"xinu.tv/types"
|
|
)
|
|
|
|
// Command-line flags.
var (
	// maildir is the root of the Maildir tree to import. When empty, main
	// reads messages from the file arguments or from stdin instead.
	maildir = flag.String("maildir", "", "Maildir root")

	// username names the person row whose uid owns the imported messages.
	username = flag.String("user", "wathiede", "username for email we're importing")

	// skipFiles is a comma-separated list of base names Load never treats
	// as messages.
	skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log", "comma separated files to skip")
)

// Progress counters, published through expvar.
var (
	total  = expvar.NewInt("bytes-parsed")     // bytes of non-duplicate messages read by Load
	cnt    = expvar.NewInt("messages-parsed")  // messages inserted by Load
	dupCnt = expvar.NewInt("duplicates-found") // files Load skipped as duplicates
)
|
|
|
|
var (
	// CRCR is the separator searched for to find the end of a message
	// header. Despite the name, it is two LF bytes ("\n\n"), not two
	// carriage returns.
	CRCR = []byte{'\n', '\n'}

	// maxMessageSize caps how many bytes of a single message are read
	// (50 MiB).
	maxMessageSize = int64(50 << 20)

	// insertOriginalQuery stores one raw message. Its parameters, in
	// order, are uid, hash, header size, total size and the message bytes.
	insertOriginalQuery = `INSERT INTO
original (uid, hash, header_size, total_size, blob)
VALUES (
$1, $2, $3, $4, $5
);`
)
|
|
|
|
func LoadReader(db *sql.DB, uid int, r io.Reader) error {
|
|
stmt, err := db.Prepare(insertOriginalQuery)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: maxMessageSize})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hdr_size := bytes.Index(b, CRCR)
|
|
chksum, err := email.HashReader(bytes.NewReader(b))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n := len(b)
|
|
if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func Load(db *sql.DB, uid int, root string, skip *set.StringSet) error {
|
|
dup := set.NewStrings()
|
|
start := time.Now()
|
|
defer func() {
|
|
glog.Infof("%s messages processed in %s", cnt, time.Since(start))
|
|
glog.Infof("%s dups found", dupCnt)
|
|
}()
|
|
|
|
txn, err := db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stmt, err := txn.Prepare(insertOriginalQuery)
|
|
if err != nil {
|
|
if err := txn.Rollback(); err != nil {
|
|
glog.Errorln("txn.Prepare stmt error rolling back", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
hstmt, err := txn.Prepare("INSERT INTO files (hash, path) VALUES ($1, $2);")
|
|
if err != nil {
|
|
if err := txn.Rollback(); err != nil {
|
|
glog.Errorln("txn.Prepare hstmt error rolling back", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
err = filepath.Walk(root,
|
|
func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if info.IsDir() {
|
|
return nil
|
|
}
|
|
base := filepath.Base(path)
|
|
if skip.Contains(base) {
|
|
return nil
|
|
}
|
|
|
|
r, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer r.Close()
|
|
|
|
b, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: maxMessageSize})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hdr_size := bytes.Index(b, CRCR)
|
|
chksum, err := email.HashReader(bytes.NewReader(b))
|
|
if err != nil {
|
|
glog.Errorf("%s not an mail file", path)
|
|
glog.Infof("%q", err.Error())
|
|
return nil
|
|
}
|
|
|
|
if _, err := hstmt.Exec(chksum, path); err != nil {
|
|
return err
|
|
}
|
|
|
|
if dup.Contains(chksum) {
|
|
glog.Warningln("Dup ", chksum, path[len(root)+1:], len(b))
|
|
dupCnt.Add(1)
|
|
return nil
|
|
}
|
|
dup.Add(chksum)
|
|
|
|
n := len(b)
|
|
total.Add(int64(n))
|
|
delta := time.Since(start)
|
|
i, err := strconv.Atoi(cnt.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if i%1000 == 0 {
|
|
t, err := strconv.Atoi(total.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
glog.Infof("%d messages processed in %s: %.2f msg/s %s/s", i,
|
|
delta, float64(i)/delta.Seconds(),
|
|
types.Base2Size(float64(t)/delta.Seconds()))
|
|
}
|
|
if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil {
|
|
return err
|
|
}
|
|
|
|
cnt.Add(1)
|
|
return nil
|
|
})
|
|
|
|
/*
|
|
if _, err := stmt.Exec(); err != nil {
|
|
if err := txn.Rollback(); err != nil {
|
|
glog.Errorln("stmt.Exec error rolling back", err)
|
|
}
|
|
return err
|
|
}
|
|
*/
|
|
|
|
if err := stmt.Close(); err != nil {
|
|
if err := txn.Rollback(); err != nil {
|
|
glog.Errorln("stmt.Close error rolling back", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
if err := hstmt.Close(); err != nil {
|
|
if err := txn.Rollback(); err != nil {
|
|
glog.Errorln("stmt.Close error rolling back", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
return txn.Commit()
|
|
}
|
|
|
|
// GetUid looks up the uid of the person whose username matches.
//
// On any failure it returns -1 together with the query error. This
// includes sql.ErrNoRows when no such person exists.
func GetUid(db *sql.DB, username string) (int, error) {
	const lookup = "SELECT uid FROM person WHERE username=$1"

	var id int
	if err := db.QueryRow(lookup, username).Scan(&id); err != nil {
		return -1, err
	}
	return id, nil
}
|
|
|
|
func main() {
|
|
defer glog.Flush()
|
|
flag.Parse()
|
|
|
|
// TODO(wathiede): make a set of flags.
|
|
db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable")
|
|
if err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
|
|
uid, err := GetUid(db, *username)
|
|
if err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
|
|
glog.Infoln("Using uid", uid, "for", *username)
|
|
if *maildir == "" || flag.NArg() > 0 {
|
|
if flag.NArg() == 0 {
|
|
if err := LoadReader(db, uid, os.Stdin); err != nil {
|
|
glog.Errorln(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
for _, fn := range flag.Args() {
|
|
r, err := os.Open(fn)
|
|
if err != nil {
|
|
glog.Error(err)
|
|
continue
|
|
}
|
|
if err := LoadReader(db, uid, r); err != nil {
|
|
glog.Errorln(fn, err)
|
|
}
|
|
r.Close()
|
|
}
|
|
return
|
|
}
|
|
|
|
// Load all files in maildir
|
|
skip := set.NewStrings(strings.Split(*skipFiles, ",")...)
|
|
glog.Infoln("Skip files", skip)
|
|
|
|
if err := Load(db, uid, *maildir, skip); err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
}
|