You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
6.5 KiB
Go

package main
import (
"bufio"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/cloudwego/eino/schema"
)
// SessionMetadata is the per-session summary returned by SessionStore.List.
// It carries the json tags used when the summary is serialized.
type SessionMetadata struct {
// ID is the session identifier; List derives it from the session file
// name with the ".jsonl" suffix removed.
ID string `json:"id"`
// Title is the display title computed by Session.Title.
Title string `json:"title"`
// CreatedAt is the creation time taken from the session.
CreatedAt time.Time `json:"created_at"`
// MessageCount is the number of messages held by the session.
MessageCount int `json:"message_count"`
}
// SessionHeader is the first line of a JSONL session file. createSession
// writes it when the file is created and loadSession decodes it to recover
// the session's ID and creation time.
type SessionHeader struct {
// Type is set to "session" by createSession. loadSession does not check it.
Type string `json:"type"`
// ID is the session identifier.
ID string `json:"id"`
// CreatedAt is the creation timestamp; createSession records it in UTC.
CreatedAt time.Time `json:"created_at"`
}
// Session holds the in-memory state for a single chat conversation together
// with the path of the JSONL file that backs it.
type Session struct {
// ID is the session identifier.
ID string
// CreatedAt is the creation time recorded in the session header.
CreatedAt time.Time
// FilePath is the backing JSONL file that Append writes to.
FilePath string
// mu is locked by Append, GetMessages and Title while they touch Messages.
mu sync.Mutex
// Messages is the conversation history in append order.
Messages []*schema.Message
}
// Append persists msg to the session file and then records it in memory.
//
// The message is JSON-encoded and written as one line at the end of
// s.FilePath. It is added to s.Messages only after the write and the close
// have both succeeded, so a failed call leaves the in-memory history
// unchanged and can be retried without duplicating the message there.
//
// The file is opened without O_CREATE, so it must already exist. Any
// marshal, open, write or close error is returned unchanged.
func (s *Session) Append(msg *schema.Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Encode first: a message that cannot be serialized must not end up in
	// memory either.
	data, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	f, err := os.OpenFile(s.FilePath, os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}

	// Record and trailing newline go out in a single write.
	if _, err := f.Write(append(data, '\n')); err != nil {
		_ = f.Close() // the write error is the one worth reporting
		return err
	}
	// A failed close on a written file can mean the data was not stored, so
	// it is reported instead of being dropped by a deferred Close.
	if err := f.Close(); err != nil {
		return err
	}

	s.Messages = append(s.Messages, msg)
	return nil
}
// GetMessages returns a copy of the session's message slice taken while
// holding the session lock. The returned slice is always non-nil and can be
// appended to or reordered without affecting the session; the *schema.Message
// values it points to are shared with the session.
func (s *Session) GetMessages() []*schema.Message {
	s.mu.Lock()
	defer s.mu.Unlock()

	out := make([]*schema.Message, 0, len(s.Messages))
	out = append(out, s.Messages...)
	return out
}
// Title produces a display title from the first user message whose content
// is non-empty.
//
// Content of at most 60 runes is returned as is. Longer content is cut to its
// first 60 runes and suffixed with "...". The cut is made on the rune slice,
// not on the byte string, so multi-byte characters are never split and the
// result stays valid UTF-8. When the session has no such message the
// placeholder "New Session..." is returned.
func (s *Session) Title() string {
	const maxTitleRunes = 60

	s.mu.Lock()
	defer s.mu.Unlock()

	for _, msg := range s.Messages {
		if msg.Role != schema.User || msg.Content == "" {
			continue
		}
		runes := []rune(msg.Content)
		if len(runes) > maxTitleRunes {
			return string(runes[:maxTitleRunes]) + "..."
		}
		return msg.Content
	}
	return "New Session..."
}
// SessionStore manages persisted sessions backed by JSONL files, one file
// named "<id>.jsonl" per session inside dir.
//
// The first line of a session file is a SessionHeader line.
// All subsequent lines are schema.Message contents, one JSON object per line.
//
// File format example:
//
//	{"type":"session","id":"...","created_at":"..."}
//	{"role":"user","content":"..."}
type SessionStore struct {
// dir is the directory that holds the session files.
dir string
// mu is held for the whole of GetOrCreate, List and Delete.
mu sync.Mutex
// cache maps a session ID to its loaded Session. GetOrCreate fills it and
// Delete removes from it; List reads it but does not add to it.
cache map[string]*Session
}
// GetOrCreate returns the session with the given id.
//
// A cached session is returned directly. Otherwise the file "<id>.jsonl" in
// the store directory is consulted: if os.Stat reports that it does not
// exist, a new session file is created via createSession; in every other
// case the file is read via loadSession. A successfully created or loaded
// session is cached before it is returned; on failure the error is returned
// and nothing is cached.
func (ss *SessionStore) GetOrCreate(id string) (*Session, error) {
	ss.mu.Lock()
	defer ss.mu.Unlock()

	if cached, found := ss.cache[id]; found {
		return cached, nil
	}

	path := filepath.Join(ss.dir, id+".jsonl")

	var (
		sess *Session
		err  error
	)
	_, statErr := os.Stat(path)
	switch {
	case os.IsNotExist(statErr):
		sess, err = createSession(id, path)
	default:
		sess, err = loadSession(path)
	}
	if err != nil {
		return nil, err
	}

	ss.cache[id] = sess
	return sess, nil
}
// List returns the metadata for every session file in the store directory.
//
// Each regular entry whose name ends in ".jsonl" is treated as a session
// whose ID is the file name without that suffix. Sessions already in the
// cache are summarized from memory; the others are loaded from disk for the
// summary only and are not added to the cache. Files that fail to load are
// skipped so that one damaged file does not hide the rest.
//
// The message count is read while holding the session's own lock, because a
// cached session may be receiving Append calls concurrently.
//
// The only error returned is a failure to read the directory. The result is
// nil when no session could be listed.
func (ss *SessionStore) List() ([]SessionMetadata, error) {
	ss.mu.Lock()
	defer ss.mu.Unlock()

	entries, err := os.ReadDir(ss.dir)
	if err != nil {
		return nil, err
	}

	var meta []SessionMetadata
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") {
			continue
		}
		id := strings.TrimSuffix(e.Name(), ".jsonl")

		session, ok := ss.cache[id]
		if !ok {
			loaded, loadErr := loadSession(filepath.Join(ss.dir, e.Name()))
			if loadErr != nil {
				// Best effort: an unreadable file is left out of the listing.
				continue
			}
			session = loaded
		}

		// Title takes session.mu itself, so the count is read in a separate
		// critical section rather than while calling it.
		session.mu.Lock()
		count := len(session.Messages)
		session.mu.Unlock()

		meta = append(meta, SessionMetadata{
			ID:           id,
			Title:        session.Title(),
			CreatedAt:    session.CreatedAt,
			MessageCount: count,
		})
	}
	return meta, nil
}
// Delete removes a session from the SessionStore cache and removes its
// backing file from disk.
//
// A cached session is deleted through its recorded FilePath. A session that
// is not cached (for example one that has only been seen through List) is
// deleted through the conventional "<id>.jsonl" path inside the store
// directory. An uncached id that would resolve to a file outside that
// directory cannot name a stored session and is treated as "not found".
//
// Deleting a session that does not exist is not an error, and a backing file
// that is already gone does not prevent the cache entry from being dropped.
// Any other removal error is returned and the cache is left unchanged.
func (ss *SessionStore) Delete(id string) error {
	ss.mu.Lock()
	defer ss.mu.Unlock()

	var path string
	if session, ok := ss.cache[id]; ok {
		path = session.FilePath
	} else {
		path = filepath.Join(ss.dir, id+".jsonl")
		// Only files directly inside the store directory are session files;
		// this keeps an id such as "../x" from removing anything elsewhere.
		if filepath.Dir(path) != filepath.Clean(ss.dir) {
			return nil
		}
	}

	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return err
	}

	delete(ss.cache, id)
	return nil
}
// func createSessionMap(dir string) map[string]*Session {
// // Find all the jsonl files, parse them and add them to the map.
// // If a file fails to parse it should simply be logged as a warning
// m := make(map[string]*Session)
// os.DirFS()
// }
// NewSessionStore returns a SessionStore rooted at dir with an empty cache.
//
// If dir does not exist it is created, including missing parents, with mode
// 0755. If dir cannot be inspected, cannot be created, or exists but is not a
// directory, an error is returned instead. Existing session files are not
// read here.
func NewSessionStore(dir string) (*SessionStore, error) {
	info, statErr := os.Stat(dir)
	switch {
	case os.IsNotExist(statErr):
		if mkErr := os.MkdirAll(dir, os.FileMode(0755)); mkErr != nil {
			return nil, mkErr
		}
	case statErr != nil:
		// Anything other than "not found", e.g. a permission problem.
		return nil, statErr
	case !info.IsDir():
		// Error strings stay lowercase and unpunctuated per Go convention.
		return nil, fmt.Errorf("path %s is a file, not a directory", dir)
	}
	return &SessionStore{dir: dir, cache: make(map[string]*Session)}, nil
}
// createSession writes a new session file at filePath and returns the
// matching in-memory Session.
//
// The file receives a single line: the JSON-encoded SessionHeader with Type
// "session", the given id and the current UTC time. os.WriteFile creates the
// file with mode 0644 and truncates it if it already exists. The returned
// Session has an empty, non-nil Messages slice.
//
// Marshal and write errors are returned unchanged.
//
// The path parameter is named filePath, as in loadSession, so that it does
// not shadow the imported path/filepath package.
func createSession(id, filePath string) (*Session, error) {
	header := SessionHeader{
		Type:      "session",
		ID:        id,
		CreatedAt: time.Now().UTC(),
	}

	data, err := json.Marshal(header)
	if err != nil {
		return nil, err
	}

	// The header is the first line of the JSONL file.
	if err := os.WriteFile(filePath, append(data, '\n'), 0o644); err != nil {
		return nil, err
	}

	return &Session{
		ID:        id,
		CreatedAt: header.CreatedAt,
		FilePath:  filePath,
		Messages:  []*schema.Message{},
	}, nil
}
// loadSession reads the JSONL session file at filePath into a new Session.
//
// The first line must decode as a SessionHeader; it supplies the session's
// ID and creation time. Each following non-blank line is decoded as a
// schema.Message. Lines that are not valid JSON for a message are skipped so
// that one damaged record does not make the whole session unreadable.
//
// Lines may be up to 16 MiB long; bufio.Scanner's default limit of 64 KiB is
// too small for long chat messages. A read failure or an over-long line is
// returned as an error rather than silently yielding a truncated session.
//
// Errors: the os.Open error unchanged, "empty session file" when the file has
// no lines, a wrapped decode error for a bad header, and a wrapped scanner
// error for read failures.
func loadSession(filePath string) (*Session, error) {
	const (
		initialBufBytes = 64 * 1024
		maxLineBytes    = 16 * 1024 * 1024
	)

	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, initialBufBytes), maxLineBytes)

	// The header line comes first. Scan returning false means either a read
	// error or a file with no content; tell the two apart.
	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			return nil, fmt.Errorf("reading session header in %s: %w", filePath, err)
		}
		return nil, fmt.Errorf("empty session file: %s", filePath)
	}
	var header SessionHeader
	if err := json.Unmarshal(scanner.Bytes(), &header); err != nil {
		return nil, fmt.Errorf("bad session header in %s: %w", filePath, err)
	}

	session := &Session{
		ID:        header.ID,
		CreatedAt: header.CreatedAt,
		FilePath:  filePath,
		Messages:  make([]*schema.Message, 0),
	}

	// Every remaining line is one message.
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		var msg schema.Message
		if err := json.Unmarshal([]byte(line), &msg); err != nil {
			// Deliberately best effort: skip lines that do not decode.
			continue
		}
		session.Messages = append(session.Messages, &msg)
	}
	// Scan also stops on I/O errors and over-long lines; without this check
	// the session would come back missing its tail with no indication.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("reading session %s: %w", filePath, err)
	}
	return session, nil
}