// md2pg imports a Maildir format file into a PostgreSQL DB package main import ( "bytes" "database/sql" "expvar" "flag" "io" "io/ioutil" "os" "path/filepath" "strconv" "strings" "time" "github.com/golang/glog" _ "github.com/lib/pq" "xinu.tv/email" "xinu.tv/set" "xinu.tv/types" ) var ( maildir = flag.String("maildir", "", "Maildir root") username = flag.String("user", "wathiede", "username for email we're importing") skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log", "comma separated files to skip") total = expvar.NewInt("bytes-parsed") cnt = expvar.NewInt("messages-parsed") dupCnt = expvar.NewInt("duplicates-found") ) var ( CRCR = []byte("\n\n") // Maximum bytes we parse when decoding // email message, 50MB. maxMessageSize int64 = 50 << 20 insertOriginalQuery = `INSERT INTO original (uid, hash, header_size, total_size, blob) VALUES ( $1, $2, $3, $4, $5 );` ) func LoadReader(db *sql.DB, uid int, r io.Reader) error { stmt, err := db.Prepare(insertOriginalQuery) if err != nil { return err } b, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: maxMessageSize}) if err != nil { return err } hdr_size := bytes.Index(b, CRCR) chksum, err := email.HashReader(bytes.NewReader(b)) if err != nil { return err } n := len(b) if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil { return err } return nil } func Load(db *sql.DB, uid int, root string, skip *set.StringSet) error { dup := set.NewStrings() start := time.Now() defer func() { glog.Infof("%s messages processed in %s", cnt, time.Since(start)) glog.Infof("%s dups found", dupCnt) }() txn, err := db.Begin() if err != nil { return err } stmt, err := txn.Prepare(insertOriginalQuery) if err != nil { if err := txn.Rollback(); err != nil { glog.Errorln("txn.Prepare stmt error rolling back", err) } return err } hstmt, err := txn.Prepare("INSERT INTO files (hash, path) VALUES ($1, $2);") if err != nil { if err := txn.Rollback(); err != nil { glog.Errorln("txn.Prepare hstmt error rolling back", err) } return err } err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } base := filepath.Base(path) if skip.Contains(base) { return nil } r, err := os.Open(path) if err != nil { return err } defer r.Close() b, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: maxMessageSize}) if err != nil { return err } hdr_size := bytes.Index(b, CRCR) chksum, err := email.HashReader(bytes.NewReader(b)) if err != nil { glog.Errorf("%s not an mail file", path) glog.Infof("%q", err.Error()) return nil } if _, err := hstmt.Exec(chksum, path); err != nil { return err } if dup.Contains(chksum) { glog.Warningln("Dup ", chksum, path[len(root)+1:], len(b)) dupCnt.Add(1) return nil } dup.Add(chksum) n := len(b) total.Add(int64(n)) delta := time.Since(start) i, err := strconv.Atoi(cnt.String()) if err != nil { return err } if i%1000 == 0 { t, err := strconv.Atoi(total.String()) if err != nil { return err } glog.Infof("%d messages processed in %s: %.2f msg/s %s/s", i, delta, float64(i)/delta.Seconds(), types.Base2Size(float64(t)/delta.Seconds())) } if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil { return err } cnt.Add(1) return nil }) /* if _, err := stmt.Exec(); err != nil { if err := txn.Rollback(); err != nil { glog.Errorln("stmt.Exec error rolling back", err) } return err } */ if err := stmt.Close(); err != nil { if err := txn.Rollback(); err != nil { glog.Errorln("stmt.Close error rolling back", err) } return err } if err := hstmt.Close(); err != nil { if err := txn.Rollback(); err != nil { glog.Errorln("stmt.Close error rolling back", err) } return err } return txn.Commit() } func GetUid(db *sql.DB, username string) (int, error) { var uid int err := db.QueryRow("SELECT uid FROM person WHERE username=$1", username).Scan(&uid) switch { case err == sql.ErrNoRows, err != nil: return -1, err } return uid, nil } func main() { defer glog.Flush() flag.Parse() // TODO(wathiede): make a set of flags. db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable") if err != nil { glog.Fatal(err) } uid, err := GetUid(db, *username) if err != nil { glog.Fatal(err) } glog.Infoln("Using uid", uid, "for", *username) if *maildir == "" { if err := LoadReader(db, uid, os.Stdin); err != nil { glog.Fatal(err) } return } // Load all files in maildir skip := set.NewStrings(strings.Split(*skipFiles, ",")...) glog.Infoln("Skip files", skip) if err := Load(db, uid, *maildir, skip); err != nil { glog.Fatal(err) } }