NAV Navbar
shell go
  • Introduction
  • Topics
  • Producing a message
  • Consuming
  • Introduction

    Welcome to the Sandglass API documentation.

    Topics

    Creating a topic

    To create a topic:

    sandctl topics create emails --num_partitions 6 --replication_factor 3
    
    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        return err
    }
    
    err = client.CreateTopic(context.Background(), &sgproto.CreateTopicParams{
      Name: "emails",
      NumPartitions: 6,
      ReplicationFactor: 3,
    })
    if err != nil {
        return err
    }
    

    Make sure to replace :7170 with the address of at least one node in the cluster

    A topic has a name, a number of partitions and a replication factor. Here is a table describing the required/optional params:

    Parameter Required Description
    name true The name of the topic, must not start with an underscore _ and only be composed of alphanumeric characters
    num partitions true The number of partitions
    replication factor true The number of nodes to replicate each partition to
    kind false The topic kind. Default: Timer
    storage driver false The storage engine to use (RocksDB or Badger). Default: RocksDB

    Producing a message

    Now

    sandctl produce emails '{"dest" : "[email protected]"}'
    
    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        return err
    }
    
    err = client.Produce(context.Background(), "emails", "", &sgproto.Message{
        Value: []byte(`{"dest" : "[email protected]"}`),
    })
    if err != nil {
        return err
    }
    

    When producing a new message to a topic we need to specify to which partition the message should be produced in.

    However, an empty partition means choose a random one. This is for convenience and when there is not need to partition in a particular partition.

    In the future

    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        return err
    }
    
    // Now we produce a new message that will be consumed in one hour
    inOneHour := time.Now().Add(1 * time.Hour)
    gen := sandflake.NewFixedTimeGenerator(inOneHour)
    
    msg := &sgproto.Message{
        Offset: gen.Next(),
        Value:  []byte("Hello"),
    }
    
    err := client.ProduceMessage(context.Background(), "emails", "", msg)
    if err != nil {
        return err
    }
    

    The way to produce a message in future, is by setting a custom sandflake id with the time that you want.

    Consuming

    sandctl consume emails
    
    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()
    
    mux := sg.NewMux()
    mux.SubscribeFunc("emails", func(msg *sgproto.Message) error {
        // handle message
        log.Printf("received: %s\n", string(msg.Value))
        return nil
    })
    
    m := &sg.MuxManager{
        Client:               c,
        Mux:                  mux,
        ReFetchSleepDuration: dur,
    }
    
    err = m.Start()
    if err != nil {
        log.Fatal(err)
    }
    

    In Go, when using Handlers and Subscribe(Func) methods, each message received will launch a new goroutine.