Add person table.
Import mail associated with -user specified on commandline.
This commit is contained in:
parent
bb8f546fd0
commit
1b8afaa73f
@ -6,31 +6,33 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
|
||||||
"xinu.tv/email"
|
"xinu.tv/email"
|
||||||
"xinu.tv/set"
|
"xinu.tv/set"
|
||||||
|
"xinu.tv/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
maildir = flag.String("maildir", "", "Maildir root")
|
maildir = flag.String("maildir", "", "Maildir root")
|
||||||
|
username = flag.String("user", "wathiede", "username for email we're importing")
|
||||||
skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log",
|
skipFiles = flag.String("skip", "maildirfolder,log,msgid.cache,razor-agent.log",
|
||||||
"comma separated files to skip")
|
"comma separated files to skip")
|
||||||
|
|
||||||
// Hashed over fields from each message.
|
// Hashed over fields from each message.
|
||||||
headers = []string{"to", "from", "cc", "date", "subject"}
|
headers = []string{"to", "from", "cc", "date", "subject", "message-id"}
|
||||||
)
|
)
|
||||||
|
|
||||||
var CRCR = []byte("\n\n")
|
var CRCR = []byte("\n\n")
|
||||||
|
|
||||||
func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
func Load(db *sql.DB, uid int, root string, skip *set.StringSet) error {
|
||||||
dup := set.NewStrings()
|
dup := set.NewStrings()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
cnt := 0
|
cnt := 0
|
||||||
@ -45,16 +47,25 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
stmt, err := txn.Prepare(pq.CopyIn("original", "hash", "header_size",
|
stmt, err := txn.Prepare("INSERT INTO original (uid, hash, header_size, total_size, blob) VALUES ($1, $2, $3, $4, $5);")
|
||||||
"total_size", "blob"))
|
//stmt, err := txn.Prepare(pq.CopyIn("original", "uid", "hash", "total_size", "header", "body"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err := txn.Rollback(); err != nil {
|
if err := txn.Rollback(); err != nil {
|
||||||
glog.Errorln("txn.Prepare error rolling back", err)
|
glog.Errorln("txn.Prepare stmt error rolling back", err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
b := new(bytes.Buffer)
|
//hstmt, err := txn.Prepare(pq.CopyIn("files", "hash", "path"))
|
||||||
|
hstmt, err := txn.Prepare("INSERT INTO files (hash, path) VALUES ($1, $2);")
|
||||||
|
if err != nil {
|
||||||
|
if err := txn.Rollback(); err != nil {
|
||||||
|
glog.Errorln("txn.Prepare hstmt error rolling back", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var total int
|
||||||
err = filepath.Walk(root,
|
err = filepath.Walk(root,
|
||||||
func(path string, info os.FileInfo, err error) error {
|
func(path string, info os.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -69,21 +80,19 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
b.Reset()
|
|
||||||
r, err := os.Open(path)
|
r, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
n, err := io.Copy(b, r)
|
b, err := ioutil.ReadAll(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
blob := b.Bytes()
|
hdr_size := bytes.Index(b, CRCR)
|
||||||
hdr_size := bytes.Index(blob, CRCR)
|
h, err := email.Hash(bytes.NewReader(b), headers)
|
||||||
h, err := email.Hash(bytes.NewReader(blob), headers)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("%s not an mail file", path)
|
glog.Errorf("%s not an mail file", path)
|
||||||
glog.Infof("%q", err.Error())
|
glog.Infof("%q", err.Error())
|
||||||
@ -91,15 +100,26 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
chksum := fmt.Sprintf("%x", h.Sum(nil))
|
chksum := fmt.Sprintf("%x", h.Sum(nil))
|
||||||
|
if _, err := hstmt.Exec(chksum, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if dup.Contains(chksum) {
|
if dup.Contains(chksum) {
|
||||||
glog.Warningln("Dup email found", chksum, path, len(blob))
|
glog.Warningln("Dup ", chksum, path[len(root)+1:], len(b))
|
||||||
dupCnt++
|
dupCnt++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
dup.Add(chksum)
|
dup.Add(chksum)
|
||||||
|
|
||||||
glog.Infoln(chksum, hdr_size, n)
|
n := len(b)
|
||||||
if _, err := stmt.Exec(chksum, hdr_size, n, blob); err != nil {
|
total += n
|
||||||
|
delta := time.Since(start)
|
||||||
|
if cnt%1000 == 0 {
|
||||||
|
glog.Infof("%d messages processed in %s: %.2f msg/s %s/s", cnt,
|
||||||
|
delta, float64(cnt)/delta.Seconds(),
|
||||||
|
types.Base2Size(float64(total)/delta.Seconds()))
|
||||||
|
}
|
||||||
|
if _, err := stmt.Exec(uid, chksum, hdr_size, n, b); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,12 +127,14 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if _, err := stmt.Exec(); err != nil {
|
/*
|
||||||
if err := txn.Rollback(); err != nil {
|
if _, err := stmt.Exec(); err != nil {
|
||||||
glog.Errorln("stmt.Exec error rolling back", err)
|
if err := txn.Rollback(); err != nil {
|
||||||
|
glog.Errorln("stmt.Exec error rolling back", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return err
|
*/
|
||||||
}
|
|
||||||
|
|
||||||
if err := stmt.Close(); err != nil {
|
if err := stmt.Close(); err != nil {
|
||||||
if err := txn.Rollback(); err != nil {
|
if err := txn.Rollback(); err != nil {
|
||||||
@ -121,9 +143,26 @@ func Load(db *sql.DB, root string, skip *set.StringSet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := hstmt.Close(); err != nil {
|
||||||
|
if err := txn.Rollback(); err != nil {
|
||||||
|
glog.Errorln("stmt.Close error rolling back", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return txn.Commit()
|
return txn.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetUid(db *sql.DB, username string) (int, error) {
|
||||||
|
var uid int
|
||||||
|
err := db.QueryRow("SELECT uid FROM person WHERE username=$1", username).Scan(&uid)
|
||||||
|
switch {
|
||||||
|
case err == sql.ErrNoRows, err != nil:
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
return uid, nil
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defer glog.Flush()
|
defer glog.Flush()
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
@ -134,6 +173,12 @@ func main() {
|
|||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uid, err := GetUid(db, *username)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infoln("Using uid", uid, "for", *username)
|
||||||
if *maildir == "" {
|
if *maildir == "" {
|
||||||
fmt.Println("Must specify Maildir with -maildir")
|
fmt.Println("Must specify Maildir with -maildir")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@ -142,7 +187,7 @@ func main() {
|
|||||||
skip := set.NewStrings(strings.Split(*skipFiles, ",")...)
|
skip := set.NewStrings(strings.Split(*skipFiles, ",")...)
|
||||||
glog.Infoln("Skip files", skip)
|
glog.Infoln("Skip files", skip)
|
||||||
|
|
||||||
if err := Load(db, *maildir, skip); err != nil {
|
if err := Load(db, uid, *maildir, skip); err != nil {
|
||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
27
pq/init.sql
27
pq/init.sql
@ -1,7 +1,24 @@
|
|||||||
|
DROP TABLE files;
|
||||||
DROP TABLE original;
|
DROP TABLE original;
|
||||||
|
DROP TABLE person;
|
||||||
|
|
||||||
|
CREATE TABLE files (
|
||||||
|
hash CHAR(40),
|
||||||
|
path TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE person (
|
||||||
|
uid SERIAL UNIQUE,
|
||||||
|
username TEXT,
|
||||||
|
name TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO person (username, name) VALUES ('wathiede', 'Bill Thiede');
|
||||||
|
|
||||||
CREATE TABLE original (
|
CREATE TABLE original (
|
||||||
hash char(40) PRIMARY KEY,
|
uid INTEGER REFERENCES person (uid),
|
||||||
header_size integer,
|
hash CHAR(40) PRIMARY KEY,
|
||||||
total_size integer,
|
header_size INTEGER,
|
||||||
blob bytea
|
total_size INTEGER,
|
||||||
)
|
blob BYTEA
|
||||||
|
);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user