diff --git a/TODO b/TODO new file mode 100644 index 0000000..d248dd1 --- /dev/null +++ b/TODO @@ -0,0 +1,4 @@ +- Add 'fast_header' table that is just the values we'd want to search on (i.e. + the headers we hash on, plus whatever is needed for jwz threading). +- Investigate stored procedures that update 'fast_header' automatically when + we we add bulk headers to table 'header' diff --git a/cmd/dumporiginal/dumporiginal.go b/cmd/dumporiginal/dumporiginal.go index 9c21fb0..4cec615 100644 --- a/cmd/dumporiginal/dumporiginal.go +++ b/cmd/dumporiginal/dumporiginal.go @@ -1,14 +1,107 @@ package main import ( + "bytes" "flag" "fmt" - "os" + "io" + "mime" + "net/mail" + "sort" "xinu.tv/email/db" "github.com/golang/glog" ) +var ( + dumpMime = flag.Bool("mime", false, "show multipart mime types for message") + headers = flag.Bool("headers", false, "show only headers") +) + +func printMultipartMime(hash string, r io.Reader) error { + msg, err := mail.ReadMessage(r) + if err != nil { + return err + } + + mediatype, params, err := mime.ParseMediaType(msg.Header.Get("Content-Type")) + if err != nil { + return err + } + glog.Infof("%s mt: %s p: %v", hash, mediatype, params) + + return nil +} + +func printHeaders(hash string, r io.Reader) error { + msg, err := mail.ReadMessage(r) + if err != nil { + return err + } + hdrs := make([]string, 0, len(msg.Header)) + for k := range msg.Header { + hdrs = append(hdrs, k) + } + fmt.Println(hash) + sort.Strings(hdrs) + for _, k := range hdrs { + vs := msg.Header[k] + fmt.Printf("%s:", k) + for _, v := range vs { + fmt.Printf(" %s", v) + } + fmt.Println() + } + return nil +} + +func printOneBlob(hash string, blob []byte) error { + r := bytes.NewReader(blob) + switch { + case *dumpMime: + if err := printMultipartMime(hash, r); err != nil { + // err != nil expected when mime type not found, so we log and + // squelch the error. + glog.Warningf("%s: %v", hash, err) + return nil + } + case *headers: + if err := printHeaders(hash, r); err != nil { + return err + } + default: + fmt.Println(string(blob)) + } + return nil +} + +type hashError struct { + hash string + error +} + +func (he hashError) Error() string { + return fmt.Sprintf("%s: %v", he.hash, he.error) +} + +func printAllBlobs(c *db.Conn) error { + oCh := make(chan db.Original) + errc := make(chan error) + donec := make(chan struct{}) + defer close(donec) + go c.Originals(oCh, errc, donec) + for { + select { + case o := <-oCh: + if err := printOneBlob(o.Hash, o.Blob); err != nil { + return hashError{hash: o.Hash, error: err} + } + case err := <-errc: + return err + } + } +} + func main() { defer glog.Flush() flag.Parse() @@ -19,8 +112,10 @@ func main() { } if flag.NArg() == 0 { - fmt.Println("Must specify message hashes to print") - os.Exit(1) + if err := printAllBlobs(c); err != nil { + glog.Fatal(err) + } + return } var blob []byte @@ -28,6 +123,8 @@ func main() { if err := c.OriginalBlobByHash(hash, &blob); err != nil { glog.Fatal(err) } - fmt.Println(string(blob)) + if err := printOneBlob(hash, blob); err != nil { + glog.Fatal(err) + } } } diff --git a/cmd/headers/headers.go b/cmd/headers/headers.go new file mode 100644 index 0000000..d2af4aa --- /dev/null +++ b/cmd/headers/headers.go @@ -0,0 +1,63 @@ +package main + +import ( + "bytes" + "flag" + "io" + "net/mail" + "xinu.tv/email/db" + + "github.com/golang/glog" +) + +var ( + all = flag.Bool("all", false, + "extract headers from all messages. Default is to only extract from unprocessed messages.") +) + +func insertHeaders(c *db.Conn, hash string, r io.Reader) error { + msg, err := mail.ReadMessage(r) + if err != nil { + return err + } + glog.Infoln("Insert", hash) + return c.InsertHeaders(hash, msg.Header) +} + +func insertAllHeaders(c *db.Conn, all bool) error { + oCh := make(chan db.Original) + errc := make(chan error) + donec := make(chan struct{}) + defer close(donec) + if all { + go c.Originals(oCh, errc, donec) + } else { + go c.OriginalsNotInTable("header", oCh, errc, donec) + } + for { + select { + case o := <-oCh: + r := bytes.NewReader(o.Blob) + if err := insertHeaders(c, o.Hash, r); err != nil { + return err + } + case err := <-errc: + return err + } + } +} + +func main() { + defer glog.Flush() + flag.Parse() + + c, err := db.NewConn("") + if err != nil { + glog.Fatal(err) + } + + if err := insertAllHeaders(c, *all); err != nil { + glog.Fatal(err) + } + +} diff --git a/db/util.go b/db/util.go index 25c5a93..a0bc2e6 100644 --- a/db/util.go +++ b/db/util.go @@ -4,7 +4,10 @@ import ( "database/sql" "flag" "fmt" + "net/mail" + "unicode/utf8" + "github.com/golang/glog" _ "github.com/lib/pq" ) @@ -29,6 +32,73 @@ func NewConn(dsn string) (*Conn, error) { return &Conn{DB: db}, err } +type Original struct { + UID int64 + Hash string + HeaderSize int64 + TotalSize int64 + Blob []byte +} + +func (c *Conn) Originals(oCh chan<- Original, errc chan<- error, donec <-chan struct{}) { + query := ` +SELECT + uid, + hash, + header_size, + total_size, + blob +FROM + original +` + c.originals(query, oCh, errc, donec) +} + +func (c *Conn) OriginalsNotInTable(table string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) { + query := fmt.Sprintf(` +SELECT + original.uid, + original.hash, + original.header_size, + original.total_size, + original.blob +FROM + original +LEFT + JOIN %s ON original.hash = %s.hash +WHERE + %s.hash IS NULL; +`, table, table, table) + c.originals(query, oCh, errc, donec) +} + +func (c *Conn) originals(query string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) { + + defer close(oCh) + defer close(errc) + rows, err := c.Query(query) + if err != nil { + errc <- err + return + } + for rows.Next() { + var o Original + if err := rows.Scan(&o.UID, &o.Hash, &o.HeaderSize, &o.TotalSize, &o.Blob); err != nil { + errc <- err + return + } + oCh <- o + select { + case <-donec: + return + default: + } + } + if err := rows.Err(); err != nil { + errc <- err + } +} + func (c *Conn) OriginalBlobByHash(hash string, blob *[]byte) error { row := c.QueryRow(` SELECT @@ -41,3 +111,42 @@ WHERE return row.Scan(blob) } + +func (c *Conn) InsertHeaders(hash string, hdrs mail.Header) (err error) { + var tx *sql.Tx + tx, err = c.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + if err := tx.Rollback(); err != nil { + glog.Error(err) + } + return + } + err = tx.Commit() + }() + + stmt, err := tx.Prepare(` +INSERT INTO + header (hash, name, value) +VALUES + ($1, $2, $3) +`) + if err != nil { + return err + } + for k, vs := range hdrs { + for _, v := range vs { + if !utf8.ValidString(v) { + glog.Infof("%s: value for %q invalid UTF-8 %q", hash, k, v) + continue + } + if _, err := stmt.Exec(hash, k, v); err != nil { + return err + } + } + } + return nil +} diff --git a/pg/init-header.sql b/pg/init-header.sql new file mode 100644 index 0000000..b1a2344 --- /dev/null +++ b/pg/init-header.sql @@ -0,0 +1,7 @@ +DROP TABLE header; + +CREATE TABLE header ( + hash CHAR(40) REFERENCES original (hash), + name TEXT, + value TEXT +);