nsqd

Version: v0.0.0-...-94081c8
Published: Jul 29, 2019 License: MIT Imports: 50 Imported by: 0

README ¶

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.

Documentation ¶

Overview ¶

Package skiplist implements skip list based maps and sets.

Skip lists are a data structure that can be used in place of balanced trees. Skip lists use probabilistic balancing rather than strictly enforced balancing and as a result the algorithms for insertion and deletion in skip lists are much simpler and significantly faster than equivalent algorithms for balanced trees.

Skip lists were first described in Pugh, William (June 1990). "Skip lists: a probabilistic alternative to balanced trees". Communications of the ACM 33 (6): 668–676

Constants ¶

const (
	LOG_DEBUG = lg.DEBUG
	LOG_INFO  = lg.INFO
	LOG_WARN  = lg.WARN
	LOG_ERROR = lg.ERROR
	LOG_FATAL = lg.FATAL
)
const (
	UNKONW_STATUS = MessageDtType(0)
	PRE_STATUS    = MessageDtType(1)
	CANCEL_STATUS = MessageDtType(2)
	COMMIT_STATUS = MessageDtType(3)
)
const (
	TLSNotRequired = iota
	TLSRequiredExceptHTTP
	TLSRequired
)
const (
	FrameTypeResponse int32 = 0
	FrameTypeError    int32 = 1
	FrameTypeMessage  int32 = 2
	FrameTypeUnknown  int32 = -1
)

frame types

const DefaultMaxLevel = 32
const (
	MAX_POSSIBLE_MSG_SIZE = 1 << 28
)
const (
	MAX_QUEUE_OFFSET_META_DATA_KEEP = 100
)
const (
	MsgIDLength = 16
)

Variables ¶

var (
	ErrInvalidOffset     = errors.New("invalid offset")
	ErrNeedFixQueueStart = errors.New("init queue start should be fixed")
)
var DataInconsistentError = errors.New("DataInconsistentError")
var EndOfMsgVirOffsetError = errors.New("EndOfMsgVirOffsetError")
var ErrIDBackwards = errors.New("ID went backward")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrTimeBackwards = errors.New("time has gone backwards")
var FlagInconsistentError = errors.New("FlagInconsistentError")
var MagicDT = []byte("  DT")

MagicDT is the initial identifier sent when connecting for DT clients

var MagicV1 = []byte("  V1")

MagicV1 is the initial identifier sent when connecting for V1 clients

var MagicV2 = []byte("  V2")

MagicV2 is the initial identifier sent when connecting for V2 clients

var StepInconsistentError = errors.New("StepInconsistentError")

Functions ¶

func GetQueueFileName ¶

func GetQueueFileName(dataRoot string, base string, fileNum int64) string

func GetTopicFullName ¶

func GetTopicFullName(topic string, part int) string

func IsValidChannelName ¶

func IsValidChannelName(name string) bool

IsValidChannelName checks a channel name for correctness

func IsValidTopicName ¶

func IsValidTopicName(name string) bool

IsValidTopicName checks a topic name for correctness
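The rule these two checks apply is not spelled out here. As a rough sketch, upstream NSQ accepts names of 1 to 64 characters drawn from letters, digits, `.`, `_` and `-`, with an optional `#ephemeral` suffix; whether this package uses exactly the same rule is an assumption, and `isValidName` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// validName follows the upstream NSQ naming rule (assumed, not confirmed
// for this package): allowed characters plus an optional "#ephemeral" suffix.
var validName = regexp.MustCompile(`^[.a-zA-Z0-9_-]+(#ephemeral)?$`)

// isValidName is an illustrative stand-in for the documented checks.
func isValidName(name string) bool {
	if len(name) < 1 || len(name) > 64 {
		return false
	}
	return validName.MatchString(name)
}

func main() {
	for _, n := range []string{"orders", "orders#ephemeral", "bad name", ""} {
		fmt.Printf("%q valid=%v\n", n, isValidName(n))
	}
}
```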

func NewGUIDFactory ¶

func NewGUIDFactory(nodeID int64) *guidFactory

func ReadResponse ¶

func ReadResponse(r io.Reader) ([]byte, error)

ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data

func ReadUnpackedResponse ¶

func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)

ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error

func UnpackResponse ¶

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data

Returns a triple of: frame type, data ([]byte), error
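A minimal sketch of the unpacking step, assuming big-endian byte order as in the upstream NSQ protocol spec. `unpackFrame` is a hypothetical name; the frame type values match the FrameType constants listed earlier.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// unpackFrame splits a frame into its 4-byte frame type and the remaining data.
func unpackFrame(frame []byte) (int32, []byte, error) {
	if len(frame) < 4 {
		return -1, nil, errors.New("frame shorter than 4 bytes")
	}
	return int32(binary.BigEndian.Uint32(frame[:4])), frame[4:], nil
}

func main() {
	// Frame type 2 corresponds to FrameTypeMessage in the constants above.
	frameType, data, err := unpackFrame([]byte{0, 0, 0, 2, 'h', 'i'})
	fmt.Println(frameType, string(data), err) // prints 2 hi <nil>
}
```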

Types ¶

type BackendQueueEnd ¶

type BackendQueueEnd interface {
	Offset() int64
	TotalMsgCnt() int64
	IsSame(BackendQueueEnd) bool
}

type BackendQueueReader ¶

type BackendQueueReader interface {
	Put([]byte) (BackendQueueEnd, error)
	Close() error
	Delete() error
	Empty() error
	Depth() int64
	ReaderFlush() error
	GetQueueReadEnd() BackendQueueEnd
	GetQueueCurMemRead() BackendQueueEnd
	UpdateBackendQueueEnd(BackendQueueEnd)
	TryReadOne() (*ReadResult, bool)
	Confirm(start int64, end int64, endCnt int64) bool
}

BackendQueueReader represents the reader for the current topic's consumer

type BackendQueueWriter ¶

type BackendQueueWriter interface {
	Put(data []byte) (BackendQueueEnd, error)
	Close() error
	Delete() error
	Empty() error
	WriterFlush() (bool, bool, error)
	EndInfo()
	GetQueueReadEnd() BackendQueueEnd
	GetQueueCurWriterEnd() BackendQueueEnd
}

BackendQueueWriter represents the behavior for the secondary message storage system

func NewDiskQueueWriter ¶

func NewDiskQueueWriter(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32, ctx *context) (BackendQueueWriter, error)

type Channel ¶

type Channel struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Channel represents the concrete type for an NSQ channel (and also implements the Queue interface)

There can be multiple channels per topic, each with their own unique set of subscribers (clients).

Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.

func NewChannel ¶

func NewChannel(topicName string, channelName string, chEnd BackendQueueEnd,
	ctx *context, maxWin int64, deleteCallback func(*Channel)) *Channel

NewChannel creates a new instance of the Channel type and returns a pointer

func (*Channel) AddClient ¶

func (c *Channel) AddClient(clientID int64, client Consumer) error

AddClient adds a client to the Channel's client list

func (*Channel) CheckBack ¶

func (c *Channel) CheckBack(m *Message)

func (*Channel) Close ¶

func (c *Channel) Close() error

Close cleanly closes the Channel

func (*Channel) CommitDtPreMsg ¶

func (c *Channel) CommitDtPreMsg(msgId MessageID) error

func (*Channel) Delete ¶

func (c *Channel) Delete() error

Delete empties the channel and closes it

func (*Channel) Depth ¶

func (c *Channel) Depth() int64

func (*Channel) Empty ¶

func (c *Channel) Empty() error

func (*Channel) Exiting ¶

func (c *Channel) Exiting() bool

Exiting returns a boolean indicating if this channel is closed/exiting

func (*Channel) FinishMessage ¶

func (c *Channel) FinishMessage(clientID int64, id MessageID) error

FinishMessage successfully discards an in-flight message

func (*Channel) GetDtPreMsgByCmtMsg ¶

func (c *Channel) GetDtPreMsgByCmtMsg(msgId MessageID) (*Message, error)

func (*Channel) HandleSyncChannelFromSlave ¶

func (c *Channel) HandleSyncChannelFromSlave() ([]byte, error)

func (*Channel) IsPaused ¶

func (c *Channel) IsPaused() bool

func (*Channel) ListMost10Item ¶

func (c *Channel) ListMost10Item() []*Message

func (*Channel) Pause ¶

func (c *Channel) Pause() error

func (*Channel) PutMessageDeferred ¶

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration)

func (*Channel) RemoveClient ¶

func (c *Channel) RemoveClient(clientID int64)

RemoveClient removes a client from the Channel's client list

func (*Channel) RequeueMessage ¶

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error

RequeueMessage requeues a message based on the `timeout` duration, i.e.:

`timeout` == 0 - requeue a message immediately
`timeout` > 0 - asynchronously wait for the specified timeout and requeue a message (aka "deferred requeue")

func (*Channel) RestartPreDtMsgTimeout ¶

func (c *Channel) RestartPreDtMsgTimeout(msg *Message, timeout time.Duration)

func (*Channel) StartDeferredTimeout ¶

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error

func (*Channel) StartInFlightTimeout ¶

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error

func (*Channel) StartPreDtMsgTimeout ¶

func (c *Channel) StartPreDtMsgTimeout(msg *Message, timeout time.Duration) error

func (*Channel) TouchMessage ¶

func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error

TouchMessage resets the timeout for an in-flight message

func (*Channel) UnPause ¶

func (c *Channel) UnPause() error

func (*Channel) UpdateBackendQueueEnd ¶

func (c *Channel) UpdateBackendQueueEnd(backendQueueEnd BackendQueueEnd)

type ChannelStats ¶

type ChannelStats struct {
	ChannelName   string        `json:"channel_name"`
	Depth         int64         `json:"depth"`
	BackendDepth  int64         `json:"backend_depth"`
	InFlightCount int           `json:"in_flight_count"`
	DeferredCount int           `json:"deferred_count"`
	MessageCount  uint64        `json:"message_count"`
	RequeueCount  uint64        `json:"requeue_count"`
	TimeoutCount  uint64        `json:"timeout_count"`
	ClientCount   int           `json:"client_count"`
	Clients       []ClientStats `json:"clients"`
	Paused        bool          `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewChannelStats ¶

func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats

type Channels ¶

type Channels []*Channel

func (Channels) Len ¶

func (c Channels) Len() int

func (Channels) Swap ¶

func (c Channels) Swap(i, j int)

type ChannelsByName ¶

type ChannelsByName struct {
	Channels
}

func (ChannelsByName) Less ¶

func (c ChannelsByName) Less(i, j int) bool

type Client ¶

type Client interface {
	Stats() ClientStats
	IsProducer() bool
}

type ClientStats ¶

type ClientStats struct {
	ClientID        string `json:"client_id"`
	Hostname        string `json:"hostname"`
	Version         string `json:"version"`
	RemoteAddress   string `json:"remote_address"`
	State           int32  `json:"state"`
	ReadyCount      int64  `json:"ready_count"`
	InFlightCount   int64  `json:"in_flight_count"`
	MessageCount    uint64 `json:"message_count"`
	FinishCount     uint64 `json:"finish_count"`
	RequeueCount    uint64 `json:"requeue_count"`
	ConnectTime     int64  `json:"connect_ts"`
	SampleRate      int32  `json:"sample_rate"`
	Deflate         bool   `json:"deflate"`
	Snappy          bool   `json:"snappy"`
	UserAgent       string `json:"user_agent"`
	Authed          bool   `json:"authed,omitempty"`
	AuthIdentity    string `json:"auth_identity,omitempty"`
	AuthIdentityURL string `json:"auth_identity_url,omitempty"`

	PubCounts []PubCount `json:"pub_counts,omitempty"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

type Consumer ¶

type Consumer interface {
	UnPause()
	Pause()
	Close() error
	TimedOutMessage()
	Stats() ClientStats
	Empty()
}

type IntervalTree ¶

type IntervalTree struct {
	// contains filtered or unexported fields
}

func NewIntervalTree ¶

func NewIntervalTree() *IntervalTree

func (*IntervalTree) AddOrMerge ¶

func (IT *IntervalTree) AddOrMerge(inter *QueueInterval) *QueueInterval

func (*IntervalTree) DeleteInterval ¶

func (self *IntervalTree) DeleteInterval(inter *QueueInterval)

func (*IntervalTree) Len ¶

func (IT *IntervalTree) Len() int64

type Iterator ¶

type Iterator interface {
	// Next returns true if the iterator contains subsequent elements
	// and advances its state to the next element if that is possible.
	Next() (ok bool)
	// Previous returns true if the iterator contains previous elements
	// and rewinds its state to the previous element if that is possible.
	Previous() (ok bool)
	// Key returns the current key.
	Key() interface{}
	// Value returns the current value.
	Value() interface{}
	// Seek reduces iterative seek costs for searching forward into the Skip List
	// by remarking the range of keys over which it has scanned before.  If the
	// requested key occurs prior to the point, the Skip List will start searching
	// as a safeguard.  It returns true if the key is within the known range of
	// the list.
	Seek(key interface{}) (ok bool)
	// Close this iterator to reap resources associated with it.  While not
	// strictly required, it will provide extra hints for the garbage collector.
	Close()
}

Iterator is an interface that you can use to iterate through the skip list (in its entirety or fragments). For a usage example, see the documentation of SkipList.

Key and Value return the key and the value of the current node.

type Logger ¶

type Logger lg.Logger

type Message ¶

type Message struct {
	ID        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16

	MovedSize int64

	//for backend queue
	BackendQueueEnd
	// contains filtered or unexported fields
}

func NewMessage ¶

func NewMessage(id MessageID, body []byte) *Message

func (*Message) WriteTo ¶

func (m *Message) WriteTo(w io.Writer) (int64, error)

type MessageDtType ¶

type MessageDtType int

type MessageID ¶

type MessageID [MsgIDLength]byte

type NSQD ¶

type NSQD struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func New ¶

func New(opts *Options) (*NSQD, error)

func (*NSQD) AddClient ¶

func (n *NSQD) AddClient(clientID int64, client Client)

func (*NSQD) ChannelList ¶

func (n *NSQD) ChannelList(topicName string) []string

func (*NSQD) DeleteExistingTopic ¶

func (n *NSQD) DeleteExistingTopic(topicName string) error

DeleteExistingTopic removes a topic only if it exists

func (*NSQD) Exit ¶

func (n *NSQD) Exit()

func (*NSQD) GetError ¶

func (n *NSQD) GetError() error

func (*NSQD) GetExistingTopic ¶

func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)

GetExistingTopic gets a topic only if it exists

func (*NSQD) GetHealth ¶

func (n *NSQD) GetHealth() string

func (*NSQD) GetProducerStats ¶

func (n *NSQD) GetProducerStats() []ClientStats

func (*NSQD) GetStartTime ¶

func (n *NSQD) GetStartTime() time.Time

func (*NSQD) GetStats ¶

func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []TopicStats

func (*NSQD) GetTopic ¶

func (n *NSQD) GetTopic(topicName string) *Topic

GetTopic performs a thread-safe operation to return a pointer to a Topic object (potentially new)

func (*NSQD) GetTopicMapCopy ¶

func (n *NSQD) GetTopicMapCopy() []*Topic

func (*NSQD) GetTopicsAndChannelsBytes ¶

func (n *NSQD) GetTopicsAndChannelsBytes() ([]byte, error)

func (*NSQD) HandleSyncTopicFromSlave ¶

func (n *NSQD) HandleSyncTopicFromSlave(topicName string, totalMsgCnt, filenum, fileoffset, virtutaloffset, maxnum int64) ([]byte, error)

func (*NSQD) IsAuthEnabled ¶

func (n *NSQD) IsAuthEnabled() bool

func (*NSQD) IsHealthy ¶

func (n *NSQD) IsHealthy() bool

func (*NSQD) LoadMetadata ¶

func (n *NSQD) LoadMetadata() error

func (*NSQD) Main ¶

func (n *NSQD) Main() error

func (*NSQD) Notify ¶

func (n *NSQD) Notify(v interface{})

func (*NSQD) PersistMetadata ¶

func (n *NSQD) PersistMetadata() error

func (*NSQD) RealHTTPAddr ¶

func (n *NSQD) RealHTTPAddr() *net.TCPAddr

func (*NSQD) RealHTTPSAddr ¶

func (n *NSQD) RealHTTPSAddr() *net.TCPAddr

func (*NSQD) RealTCPAddr ¶

func (n *NSQD) RealTCPAddr() *net.TCPAddr

func (*NSQD) RemoveClient ¶

func (n *NSQD) RemoveClient(clientID int64)

func (*NSQD) SetHealth ¶

func (n *NSQD) SetHealth(err error)

func (*NSQD) SlaveSyncChannel ¶

func (n *NSQD) SlaveSyncChannel(topicName, channelName string) ([]byte, error)

func (*NSQD) SlaveSyncLoop ¶

func (n *NSQD) SlaveSyncLoop()

func (*NSQD) TopicList ¶

func (n *NSQD) TopicList() []string

type Options ¶

type Options struct {
	// basic options
	ID        int64       `flag:"node-id" cfg:"id"`
	LogLevel  lg.LogLevel `flag:"log-level"`
	LogPrefix string      `flag:"log-prefix"`
	Logger    Logger

	TCPAddress               string        `flag:"tcp-address"`
	HTTPAddress              string        `flag:"http-address"`
	HTTPSAddress             string        `flag:"https-address"`
	BroadcastAddress         string        `flag:"broadcast-address"`
	NSQLookupdTCPAddresses   []string      `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
	AuthHTTPAddresses        []string      `flag:"auth-http-address" cfg:"auth_http_addresses"`
	HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"`
	HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"`
	NsqdMasterAddr           string        `flag:"nsqd-master-addr"`

	// diskqueue options
	DataPath           string        `flag:"data-path"`
	MemQueueSize       int64         `flag:"mem-queue-size"`
	MaxBytesPerFile    int64         `flag:"max-bytes-per-file"`
	SyncEvery          int64         `flag:"sync-every"`
	DtCheckBackTimeout time.Duration `flag:"dt-checkback-timeout"`
	SyncTimeout        time.Duration `flag:"sync-timeout"`
	LoopReadTimeout    time.Duration `flag:"loop-read-timeout"`

	QueueScanInterval        time.Duration
	QueueScanRefreshInterval time.Duration
	QueueScanSelectionCount  int
	QueueScanWorkerPoolMax   int
	QueueScanDirtyPercent    float64

	// msg and command options
	MsgTimeout    time.Duration `flag:"msg-timeout"`
	MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
	MaxMsgSize    int64         `flag:"max-msg-size"`
	MaxBodySize   int64         `flag:"max-body-size"`
	MaxReqTimeout time.Duration `flag:"max-req-timeout"`
	ClientTimeout time.Duration

	// client overridable configuration options
	MaxHeartbeatInterval   time.Duration `flag:"max-heartbeat-interval"`
	MaxRdyCount            int64         `flag:"max-rdy-count"`
	MaxOutputBufferSize    int64         `flag:"max-output-buffer-size"`
	MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
	MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"`
	MaxConfirmWin          int64         `flag:"max-confirm-win"`
	OutputBufferTimeout    time.Duration `flag:"output-buffer-timeout"`
	MaxChannelConsumers    int           `flag:"max-channel-consumers"`

	// statsd integration
	StatsdAddress       string        `flag:"statsd-address"`
	StatsdPrefix        string        `flag:"statsd-prefix"`
	StatsdInterval      time.Duration `flag:"statsd-interval"`
	StatsdMemStats      bool          `flag:"statsd-mem-stats"`
	StatsdUDPPacketSize int           `flag:"statsd-udp-packet-size"`

	// e2e message latency
	E2EProcessingLatencyWindowTime  time.Duration `flag:"e2e-processing-latency-window-time"`
	E2EProcessingLatencyPercentiles []float64     `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"`

	// TLS config
	TLSCert             string `flag:"tls-cert"`
	TLSKey              string `flag:"tls-key"`
	TLSClientAuthPolicy string `flag:"tls-client-auth-policy"`
	TLSRootCAFile       string `flag:"tls-root-ca-file"`
	TLSRequired         int    `flag:"tls-required"`
	TLSMinVersion       uint16 `flag:"tls-min-version"`

	// compression
	DeflateEnabled  bool `flag:"deflate"`
	MaxDeflateLevel int  `flag:"max-deflate-level"`
	SnappyEnabled   bool `flag:"snappy"`
}

func NewOptions ¶

func NewOptions() *Options

type Ordered ¶

type Ordered interface {
	LessThan(other Ordered) bool
}

Ordered is an interface which can be linearly ordered by the LessThan method, whereby this instance is deemed to be less than other. Additionally, Ordered instances should behave properly when compared using == and !=.
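A minimal sketch of a key type that satisfies this contract. The interface is restated locally so the example is self-contained, and `Priority` is a hypothetical type.

```go
package main

import "fmt"

// Ordered restates the documented interface so this example compiles alone.
type Ordered interface {
	LessThan(other Ordered) bool
}

// Priority is a hypothetical key type. As a plain int it also behaves
// properly under == and !=, which the documentation asks for.
type Priority int

// LessThan assumes other is also a Priority; mixing key types would panic.
func (p Priority) LessThan(other Ordered) bool {
	return p < other.(Priority)
}

func main() {
	var a, b Ordered = Priority(1), Priority(2)
	fmt.Println(a.LessThan(b), b.LessThan(a), a == Priority(1)) // prints true false true
}
```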

type PubCount ¶

type PubCount struct {
	Topic string `json:"topic"`
	Count uint64 `json:"count"`
}

type QueueInterval ¶

type QueueInterval struct {
	// contains filtered or unexported fields
}

func (*QueueInterval) End ¶

func (QI *QueueInterval) End() int64

func (*QueueInterval) EndCnt ¶

func (QI *QueueInterval) EndCnt() int64

func (*QueueInterval) HighAtDimension ¶

func (QI *QueueInterval) HighAtDimension(dim uint64) int64

func (*QueueInterval) ID ¶

func (QI *QueueInterval) ID() uint64

ID returns the identifier of the interval. The augmentedtree uses the low value and the ID to determine whether an interval is a duplicate, so the end of the segment is used as the ID here.

func (*QueueInterval) LowAtDimension ¶

func (QI *QueueInterval) LowAtDimension(dim uint64) int64

func (*QueueInterval) OverlapsAtDimension ¶

func (QI *QueueInterval) OverlapsAtDimension(inter augmentedtree.Interval, dim uint64) bool

func (*QueueInterval) Start ¶

func (QI *QueueInterval) Start() int64

type ReadResult ¶

type ReadResult struct {
	Data []byte
	Err  error
	// contains filtered or unexported fields
}

ReadResult represents the result for TryReadOne()

type Response ¶

type Response struct {
	// contains filtered or unexported fields
}

type Set ¶

type Set struct {
	// contains filtered or unexported fields
}

Set is an ordered set data structure.

Its elements must implement the Ordered interface. It uses a SkipList for storage, and it gives you similar performance guarantees.

To iterate over a set (where s is a *Set):

for i := s.Iterator(); i.Next(); {
	// do something with i.Key().
	// i.Value() will be nil.
}

func NewCustomSet ¶

func NewCustomSet(lessThan func(l, r interface{}) bool) *Set

NewCustomSet returns a new Set that will use lessThan as the comparison function. lessThan should define a linear order on elements you intend to use with the Set.

func NewIntSet ¶

func NewIntSet() *Set

NewIntSet returns a new Set that accepts int elements.

func NewSet ¶

func NewSet() *Set

NewSet returns a new Set.

func NewStringSet ¶

func NewStringSet() *Set

NewStringSet returns a new Set that accepts string elements.

func (*Set) Add ¶

func (s *Set) Add(key interface{})

Add adds key to s.

func (*Set) Contains ¶

func (s *Set) Contains(key interface{}) bool

Contains returns true if key is present in s.

func (*Set) GetMaxLevel ¶

func (s *Set) GetMaxLevel() int

GetMaxLevel returns MaxLevel of the underlying skip list.

func (*Set) Iterator ¶

func (s *Set) Iterator() Iterator

func (*Set) Len ¶

func (s *Set) Len() int

Len returns the length of the set.

func (*Set) Range ¶

func (s *Set) Range(from, to interface{}) Iterator

Range returns an iterator that will go through all the elements of the set that are greater than or equal to from, but less than to.

func (*Set) Remove ¶

func (s *Set) Remove(key interface{}) (ok bool)

Remove tries to remove key from the set. It returns true if key was present.

func (*Set) SetMaxLevel ¶

func (s *Set) SetMaxLevel(newMaxLevel int)

SetMaxLevel sets MaxLevel in the underlying skip list.

type SkipList ¶

type SkipList struct {

	// MaxLevel determines how many items the SkipList can store
	// efficiently (2^MaxLevel).
	//
	// It is safe to increase MaxLevel to accommodate more
	// elements. If you decrease MaxLevel and the skip list
	// already contains nodes on higher levels, the effective
	// MaxLevel will be the greater of the new MaxLevel and the
	// level of the highest node.
	//
	// A SkipList with MaxLevel equal to 0 is equivalent to a
	// standard linked list and will not have any of the nice
	// properties of skip lists (probably not what you want).
	MaxLevel int
	// contains filtered or unexported fields
}

A SkipList is a map-like data structure that maintains an ordered collection of key-value pairs. Insertion, lookup, and deletion are all O(log n) operations. A SkipList can efficiently store up to 2^MaxLevel items.

To iterate over a skip list (where s is a *SkipList):

for i := s.Iterator(); i.Next(); {
	// do something with i.Key() and i.Value()
}

func NewCustomMap ¶

func NewCustomMap(lessThan func(l, r interface{}) bool) *SkipList

NewCustomMap returns a new SkipList that will use lessThan as the comparison function. lessThan should define a linear order on keys you intend to use with the SkipList.

func NewIntMap ¶

func NewIntMap() *SkipList

NewIntMap returns a SkipList that accepts int keys.

func NewSkipList ¶

func NewSkipList() *SkipList

NewSkipList returns a new SkipList.

Its keys must implement the Ordered interface.

func NewStringMap ¶

func NewStringMap() *SkipList

NewStringMap returns a SkipList that accepts string keys.

func (*SkipList) Delete ¶

func (s *SkipList) Delete(key interface{}) (value interface{}, ok bool)

Delete removes the node with the given key.

It returns the old value and whether the node was present.

func (*SkipList) Get ¶

func (s *SkipList) Get(key interface{}) (value interface{}, ok bool)

Get returns the value associated with key from s (nil if the key is not present in s). The second return value is true when the key is present.

func (*SkipList) GetGreaterOrEqual ¶

func (s *SkipList) GetGreaterOrEqual(min interface{}) (actualKey, value interface{}, ok bool)

GetGreaterOrEqual finds the first node whose key is greater than or equal to min. It returns the node's actual key, its value, and whether such a node is present in the skip list.

func (*SkipList) Iterator ¶

func (s *SkipList) Iterator() Iterator

Iterator returns an Iterator that will go through all elements of s.

func (*SkipList) Len ¶

func (s *SkipList) Len() int

Len returns the length of s.

func (*SkipList) Range ¶

func (s *SkipList) Range(from, to interface{}) Iterator

Range returns an iterator that will go through all the elements of the skip list that are greater than or equal to from, but less than to.

func (*SkipList) Seek ¶

func (s *SkipList) Seek(key interface{}) Iterator

Seek returns a bidirectional iterator starting with the first element whose key is greater than or equal to key; if there is no such element, a nil iterator is returned.

func (*SkipList) SeekToFirst ¶

func (s *SkipList) SeekToFirst() Iterator

SeekToFirst returns a bidirectional iterator starting from the first element in the list if the list is populated; otherwise, a nil iterator is returned.

func (*SkipList) SeekToLast ¶

func (s *SkipList) SeekToLast() Iterator

SeekToLast returns a bidirectional iterator starting from the last element in the list if the list is populated; otherwise, a nil iterator is returned.

func (*SkipList) Set ¶

func (s *SkipList) Set(key, value interface{})

Set sets the value associated with key in s.

type Slave ¶

type Slave struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSlave ¶

func NewSlave(masterAddr string, slaveToMasterEvent chan struct{}, ctx *context) *Slave

func (*Slave) CheckConn ¶

func (s *Slave) CheckConn() error

func (*Slave) Close ¶

func (s *Slave) Close()

func (*Slave) ConnToMaster ¶

func (s *Slave) ConnToMaster() error

func (*Slave) FilterResponse ¶

func (s *Slave) FilterResponse()

func (*Slave) HandleResponse ¶

func (s *Slave) HandleResponse()

func (*Slave) Sync ¶

func (s *Slave) Sync() error

type Topic ¶

type Topic struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTopic ¶

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic

Topic constructor

func (*Topic) AggregateChannelE2eProcessingLatency ¶

func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile

func (*Topic) Close ¶

func (t *Topic) Close() error

Close persists all outstanding topic data and closes all its channels

func (*Topic) CommitDtPreMsg ¶

func (t *Topic) CommitDtPreMsg(msgId MessageID)

func (*Topic) Delete ¶

func (t *Topic) Delete() error

Delete empties the topic and all its channels, then closes it

func (*Topic) DeleteExistingChannel ¶

func (t *Topic) DeleteExistingChannel(channelName string) error

DeleteExistingChannel removes a channel from the topic only if it exists

func (*Topic) Empty ¶

func (t *Topic) Empty() error

func (*Topic) Exiting ¶

func (t *Topic) Exiting() bool

Exiting returns a boolean indicating if this topic is closed/exiting

func (*Topic) FlushTopicAndChannels ¶

func (t *Topic) FlushTopicAndChannels() error

func (*Topic) GenerateID ¶

func (t *Topic) GenerateID() MessageID

func (*Topic) GetChannel ¶

func (t *Topic) GetChannel(channelName string) *Channel

GetChannel performs a thread-safe operation to return a pointer to a Channel object (potentially new) for the given Topic

func (*Topic) GetChannelMapCopy ¶

func (t *Topic) GetChannelMapCopy() map[string]*Channel

func (*Topic) GetDtPreMsgByCmtMsg ¶

func (t *Topic) GetDtPreMsgByCmtMsg(msgId MessageID) (*Message, error)

func (*Topic) GetExistingChannel ¶

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

func (*Topic) HandleSyncTopicFromSlave ¶

func (t *Topic) HandleSyncTopicFromSlave(totalMsgCnt, filenum, fileoffset, virtutaloffset, maxnum int64) ([]byte, error)

func (*Topic) IsPaused ¶

func (t *Topic) IsPaused() bool

func (*Topic) Pause ¶

func (t *Topic) Pause() error

func (*Topic) PutMessage ¶

func (t *Topic) PutMessage(m *Message) error

func (*Topic) PutMessages ¶

func (t *Topic) PutMessages(msgs []*Message) error

PutMessages writes multiple Messages to the queue

func (*Topic) Start ¶

func (t *Topic) Start()

func (*Topic) UnPause ¶

func (t *Topic) UnPause() error

func (*Topic) UpdatedBackendQueueEndCallback ¶

func (t *Topic) UpdatedBackendQueueEndCallback()

type TopicStats ¶

type TopicStats struct {
	TopicName    string         `json:"topic_name"`
	Channels     []ChannelStats `json:"channels"`
	Depth        int64          `json:"depth"`
	BackendDepth int64          `json:"backend_depth"`
	MessageCount uint64         `json:"message_count"`
	MessageBytes uint64         `json:"message_bytes"`
	Paused       bool           `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewTopicStats ¶

func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats

type Topics ¶

type Topics []*Topic

func (Topics) Len ¶

func (t Topics) Len() int

func (Topics) Swap ¶

func (t Topics) Swap(i, j int)

type TopicsByName ¶

type TopicsByName struct {
	Topics
}

func (TopicsByName) Less ¶

func (t TopicsByName) Less(i, j int) bool

type Uint64Slice ¶

type Uint64Slice []uint64

func (Uint64Slice) Len ¶

func (s Uint64Slice) Len() int

func (Uint64Slice) Less ¶

func (s Uint64Slice) Less(i, j int) bool

func (Uint64Slice) Swap ¶

func (s Uint64Slice) Swap(i, j int)
