From 1b8afaa73fd6c7eb4b90b0cf00b6b27d35455c25 Mon Sep 17 00:00:00 2001 From: Bill Thiede Date: Thu, 20 Mar 2014 22:27:50 -0700 Subject: [PATCH] Add person table. Import mail associated with -user specified on commandline. --- cmd/md2pq/md2pq.go | 89 ++++++++++++++++++++++++++++++++++------------ pq/init.sql | 27 +++++++++++--- 2 files changed, 89 insertions(+), 27 deletions(-) diff --git a/cmd/md2pq/md2pq.go b/cmd/md2pq/md2pq.go index 1035114..2af8299 100644 --- a/cmd/md2pq/md2pq.go +++ b/cmd/md2pq/md2pq.go @@ -6,31 +6,33 @@ import ( "database/sql" "flag" "fmt" - "io" + "io/ioutil" "os" "path/filepath" "strings" "time" "github.com/golang/glog" - "github.com/lib/pq" + _ "github.com/lib/pq" "xinu.tv/email" "xinu.tv/set" + "xinu.tv/types" ) var ( maildir = flag.String("maildir", "", "Maildir root") + username = flag.String("user", "wathiede", "username for email we're importing") skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log", "comma separated files to skip") // Hashed over fields from each message. - headers = []string{"to", "from", "cc", "date", "subject"} + headers = []string{"to", "from", "cc", "date", "subject", "message-id"} ) var CRCR = []byte("\n\n") -func Load(db *sql.DB, root string, skip *set.StringSet) error { +func Load(db *sql.DB, uid int, root string, skip *set.StringSet) error { dup := set.NewStrings() start := time.Now() cnt := 0 @@ -45,16 +47,25 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error { return err } - stmt, err := txn.Prepare(pq.CopyIn("original", "hash", "header_size", - "total_size", "blob")) + stmt, err := txn.Prepare("INSERT INTO original (uid, hash, header_size, total_size, blob) VALUES ($1, $2, $3, $4, $5);") + //stmt, err := txn.Prepare(pq.CopyIn("original", "uid", "hash", "total_size", "header", "body")) if err != nil { if err := txn.Rollback(); err != nil { - glog.Errorln("txn.Prepare error rolling back", err) + glog.Errorln("txn.Prepare stmt error rolling back", err) } return err } - b := new(bytes.Buffer) + //hstmt, err := txn.Prepare(pq.CopyIn("files", "hash", "path")) + hstmt, err := txn.Prepare("INSERT INTO files (hash, path) VALUES ($1, $2);") + if err != nil { + if err := txn.Rollback(); err != nil { + glog.Errorln("txn.Prepare hstmt error rolling back", err) + } + return err + } + + var total int err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { @@ -69,21 +80,19 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error { return nil } - b.Reset() r, err := os.Open(path) if err != nil { return err } defer r.Close() - n, err := io.Copy(b, r) + b, err := ioutil.ReadAll(r) if err != nil { return err } - blob := b.Bytes() - hdr_size := bytes.Index(blob, CRCR) - h, err := email.Hash(bytes.NewReader(blob), headers) + hdr_size := bytes.Index(b, CRCR) + h, err := email.Hash(bytes.NewReader(b), headers) if err != nil { glog.Errorf("%s not an mail file", path) glog.Infof("%q", err.Error()) @@ -91,15 +100,26 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error { } chksum := fmt.Sprintf("%x", h.Sum(nil)) + if _, err := hstmt.Exec(chksum, path); err != nil { + return err + } + if dup.Contains(chksum) { - glog.Warningln("Dup email found", chksum, path, len(blob)) + glog.Warningln("Dup ", chksum, path[len(root)+1:], len(b)) dupCnt++ return nil } dup.Add(chksum) - glog.Infoln(chksum, hdr_size, n) - if _, err := stmt.Exec(chksum, hdr_size, n, blob); err != nil { + n := len(b) + total += n + delta := time.Since(start) + if cnt%1000 == 0 { + glog.Infof("%d messages processed in %s: %.2f msg/s %s/s", cnt, + delta, float64(cnt)/delta.Seconds(), + types.Base2Size(float64(total)/delta.Seconds())) + } + if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil { return err } @@ -107,12 +127,14 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error { return nil }) - if _, err := stmt.Exec(); err != nil { - if err := txn.Rollback(); err != nil { - glog.Errorln("stmt.Exec error rolling back", err) + /* + if _, err := stmt.Exec(); err != nil { + if err := txn.Rollback(); err != nil { + glog.Errorln("stmt.Exec error rolling back", err) + } + return err } - return err - } + */ if err := stmt.Close(); err != nil { if err := txn.Rollback(); err != nil { @@ -121,9 +143,26 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error { return err } + if err := hstmt.Close(); err != nil { + if err := txn.Rollback(); err != nil { + glog.Errorln("stmt.Close error rolling back", err) + } + return err + } + return txn.Commit() } +func GetUid(db *sql.DB, username string) (int, error) { + var uid int + err := db.QueryRow("SELECT uid FROM person WHERE username=$1", username).Scan(&uid) + switch { + case err == sql.ErrNoRows, err != nil: + return -1, err + } + return uid, nil +} + func main() { defer glog.Flush() flag.Parse() @@ -134,6 +173,12 @@ func main() { glog.Fatal(err) } + uid, err := GetUid(db, *username) + if err != nil { + glog.Fatal(err) + } + + glog.Infoln("Using uid", uid, "for", *username) if *maildir == "" { fmt.Println("Must specify Maildir with -maildir") os.Exit(1) @@ -142,7 +187,7 @@ func main() { skip := set.NewStrings(strings.Split(*skipFiles, ",")...) glog.Infoln("Skip files", skip) - if err := Load(db, *maildir, skip); err != nil { + if err := Load(db, uid, *maildir, skip); err != nil { glog.Fatal(err) } } diff --git a/pq/init.sql b/pq/init.sql index f1cd3db..aa27cc6 100644 --- a/pq/init.sql +++ b/pq/init.sql @@ -1,7 +1,24 @@ +DROP TABLE files; DROP TABLE original; +DROP TABLE person; + +CREATE TABLE files ( + hash CHAR(40), + path TEXT +); + +CREATE TABLE person ( + uid SERIAL UNIQUE, + username TEXT, + name TEXT +); + +INSERT INTO person (username, name) VALUES ('wathiede', 'Bill Thiede'); + CREATE TABLE original ( - hash char(40) PRIMARY KEY, - header_size integer, - total_size integer, - blob bytea -) + uid INTEGER REFERENCES person (uid), + hash CHAR(40) PRIMARY KEY, + header_size INTEGER, + total_size INTEGER, + blob BYTEA +);