New utility to import mail into PostgreSQL.
Minor leak cleanup in mailhash.
This commit is contained in:
parent
f7c0774798
commit
bb8f546fd0
@ -7,7 +7,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"xinu.tv/email"
|
"xinu.tv/email"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -47,14 +46,22 @@ func (m *Messages) hashMail(path string, info os.FileInfo, err error) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
base := filepath.Base(path)
|
||||||
|
if _, ok := m.Skip[base]; ok {
|
||||||
|
glog.Infoln("Skipping", path)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
r, err := os.Open(path)
|
r, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
}
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
h, err := email.Hash(r, headers)
|
h, err := email.Hash(r, headers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("%s not an mail file", path)
|
glog.Errorf("%s not an mail file", path)
|
||||||
glog.Info("%q", err.Error())
|
glog.Infof("%q", err.Error())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
md := email.NewInfo(path)
|
md := email.NewInfo(path)
|
||||||
@ -130,6 +137,7 @@ func (m Messages) Reconcile(maildir string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
}
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
h, err := email.Hash(r, headers)
|
h, err := email.Hash(r, headers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
148
cmd/md2pq/md2pq.go
Normal file
148
cmd/md2pq/md2pq.go
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
// md2pg imports a Maildir format file into a PostgreSQL DB
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"database/sql"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"github.com/lib/pq"
|
||||||
|
|
||||||
|
"xinu.tv/email"
|
||||||
|
"xinu.tv/set"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
maildir = flag.String("maildir", "", "Maildir root")
|
||||||
|
skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log",
|
||||||
|
"comma separated files to skip")
|
||||||
|
|
||||||
|
// Hashed over fields from each message.
|
||||||
|
headers = []string{"to", "from", "cc", "date", "subject"}
|
||||||
|
)
|
||||||
|
|
||||||
|
var CRCR = []byte("\n\n")
|
||||||
|
|
||||||
|
func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
||||||
|
dup := set.NewStrings()
|
||||||
|
start := time.Now()
|
||||||
|
cnt := 0
|
||||||
|
dupCnt := 0
|
||||||
|
defer func() {
|
||||||
|
glog.Infof("%d messages processed in %s", cnt, time.Since(start))
|
||||||
|
glog.Infof("%d dups found", dupCnt)
|
||||||
|
}()
|
||||||
|
|
||||||
|
txn, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := txn.Prepare(pq.CopyIn("original", "hash", "header_size",
|
||||||
|
"total_size", "blob"))
|
||||||
|
if err != nil {
|
||||||
|
if err := txn.Rollback(); err != nil {
|
||||||
|
glog.Errorln("txn.Prepare error rolling back", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b := new(bytes.Buffer)
|
||||||
|
err = filepath.Walk(root,
|
||||||
|
func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
base := filepath.Base(path)
|
||||||
|
if skip.Contains(base) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Reset()
|
||||||
|
r, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
n, err := io.Copy(b, r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
blob := b.Bytes()
|
||||||
|
hdr_size := bytes.Index(blob, CRCR)
|
||||||
|
h, err := email.Hash(bytes.NewReader(blob), headers)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("%s not an mail file", path)
|
||||||
|
glog.Infof("%q", err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
chksum := fmt.Sprintf("%x", h.Sum(nil))
|
||||||
|
if dup.Contains(chksum) {
|
||||||
|
glog.Warningln("Dup email found", chksum, path, len(blob))
|
||||||
|
dupCnt++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
dup.Add(chksum)
|
||||||
|
|
||||||
|
glog.Infoln(chksum, hdr_size, n)
|
||||||
|
if _, err := stmt.Exec(chksum, hdr_size, n, blob); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if _, err := stmt.Exec(); err != nil {
|
||||||
|
if err := txn.Rollback(); err != nil {
|
||||||
|
glog.Errorln("stmt.Exec error rolling back", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
if err := txn.Rollback(); err != nil {
|
||||||
|
glog.Errorln("stmt.Close error rolling back", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return txn.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
defer glog.Flush()
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// TODO(wathiede): make a set of flags.
|
||||||
|
db, err := sql.Open("postgres", "user=gomail dbname=gomail sslmode=disable")
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if *maildir == "" {
|
||||||
|
fmt.Println("Must specify Maildir with -maildir")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
skip := set.NewStrings(strings.Split(*skipFiles, ",")...)
|
||||||
|
glog.Infoln("Skip files", skip)
|
||||||
|
|
||||||
|
if err := Load(db, *maildir, skip); err != nil {
|
||||||
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
7
pq/init.sql
Normal file
7
pq/init.sql
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
DROP TABLE original;
|
||||||
|
CREATE TABLE original (
|
||||||
|
hash char(40) PRIMARY KEY,
|
||||||
|
header_size integer,
|
||||||
|
total_size integer,
|
||||||
|
blob bytea
|
||||||
|
)
|
||||||
Loading…
x
Reference in New Issue
Block a user